ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/gclib/scripts/gridx
Revision: 71
Committed: Fri Sep 23 19:23:49 2011 UTC (8 years ago) by gpertea
File size: 66008 byte(s)
Log Message:
gridx fixed to export PERL5LIB and PYTHONPATH

Line User Rev File contents
1 gpertea 23 #!/usr/bin/perl
2     require 5.8.0;
3     use strict;
4     use Getopt::Std;
5     use File::Basename; #for basename, dirname
6     use POSIX "sys_wait_h"; # mostly for the handy uname() function
7     use Cwd qw(abs_path cwd);
8     use Fcntl qw(:DEFAULT :seek); #mostly for the SEEK constants
9     use FindBin; use lib "$FindBin::Bin";
10    
# Set to 0 while the script is working and back to 1 on every normal ending
# of the script (i.e. any exit that is not caused by die()).
my $NORMAL_ENDING = 1;
# USER may be missing from the environment of condor workers
my $USER = $ENV{USER} || POSIX::cuserid();
my $PWD = cwd();   # from the Cwd module
my $PERL_BIN = '/usr/bin/perl';
my $MAX_RETRIES = 3;   # how many times to try a failed task

#--- bookkeeping file names
my $F_WRKCOUNT   = '.wrkCount';        # count of currently running workers
my $F_ALLDONE    = '.all.Done.';
my $F_WRKSTART   = '.wrkStarted';      # count of (ever) started workers
my $F_LASTTASK   = '.lastTask';        # maximum task# ever started
my $F_TASKSDONE  = 'tasksDone';        # number of tasks successfully finished
my $F_ERRTASKS   = 'err.tasks';        # LIST of task#s which returned non-zero
                                       # status even after MAX_RETRIES
my $F_RETRYTASKS = 'retry.tasks';      # stack of task#s which returned non-zero
                                       # status less than MAX_RETRIES times
my $F_WRKRUNNING = '.running-worker';  # semaphore file in each worker directory
my $F_ENDCMD     = 'epilogue';
my $F_WRKDIR     = 'workdir';
my $F_NOTIFY     = 'notify';
my $F_TASKDB     = 'taskDb';

my $GRID_DEBUG;
my $starting_dir;
my $STARTED_GRID_TASK;
my $SMPChildren = 0;    # SMP case: number of children running
my %SMPChildren = ();   # SMP case: set of child PIDs
my %Locks = ();         # filehandle -> [lockfilelink, hostlockfile]

# The HOST envvar can't be trusted under condor: it is either inherited from
# the submitter environment (getenv=TRUE) or not set at all (getenv=FALSE),
# so the host name is taken from uname() instead.
my $HOSTNAME = (POSIX::uname())[1];
chomp($HOSTNAME);
$HOSTNAME = lc($HOSTNAME);
my $HOST = $HOSTNAME;
my ($DOMAIN) = ($HOST=~m/^[\w\-]+\.(.+)/);
if (!$DOMAIN) {
  # unqualified host name: guess the domain from a few known host names,
  # otherwise from the current working directory
  if ($HOST=~m/^flicker|^wren|^ibis/) {
    $DOMAIN = 'umiacs.umd.edu';
  }
  elsif ($PWD=~/^\/export|^\/home/) {
    $DOMAIN = 'dfci.harvard.edu';
  }
  else {
    $DOMAIN = 'umiacs.umd.edu';
  }
}
# from here on HOST is the short host name only
($HOST) = ($HOST=~m/^([\w\-]+)/);
# export the normalized identity to every child process
@ENV{qw(HOST MACHINE HOSTNAME USER)} = ($HOST, $HOST, $HOSTNAME, $USER);
#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! - site specific -
#---------- modify the SITE-SPECIFIC paths here if needed:
#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&!
# ===== PATH management ====
# Default: assume architecture/PATH/LIB uniformity across nodes, so the
# submitter's own search paths are taken as the worker paths.
# Put your own worker/server paths here if that does not hold.
my ($binpath, $libpath, $perllib, $perl5lib, $pythonpath) =
  @ENV{qw(PATH LD_LIBRARY_PATH PERLLIB PERL5LIB PYTHONPATH)};

# Base directory for the per-user monitoring links: HOME with its last
# path component removed.
my $homebase = $ENV{HOME};
$homebase =~ s/(\/[^\/]+$)//;
# HOME is sometimes not defined within a condor job; this fallback is only
# valid at UMIACS -- use your own globally mounted path instead.
$homebase ||= '/fs/wrenhomes';

# default monitoring base dir and grid engine, chosen by domain
# (the engine can be overridden with -g: 'sge', 'condor' or 'smp')
my ($GRID_MONHOME, $GRID_ENGINE);
if ($DOMAIN=~/dfci/) {
  ($GRID_MONHOME, $GRID_ENGINE) = ('/home', 'sge');
}
else {
  ($GRID_MONHOME, $GRID_ENGINE) = ($homebase, 'condor');
}
# sysopen() flags; O_DIRECT is added everywhere except at UMIACS
my $sysopen_mode = O_RDWR | O_CREAT;
$sysopen_mode |= O_DIRECT unless $DOMAIN=~/umiacs/;

#print STDERR "|$GRID_MONHOME|\n";
#&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&! -
#
# Extended help: printed (after $usage) when gridx is run with -H; it lists
# the bookkeeping files that gridx creates in the job directory.
86     my $morehints=qq{
87     The following entries will be created by gridx:
88     <GRID_JOBDIR>/epilogue - only if -e option was given, a file containing the
89     <end_cmd> parameter
90     <GRID_JOBDIR>/notify - a file containing the e-mail address to notify
91     if -m was given
92     <GRID_JOBDIR>/.lastTask - a file keeping track of the last task# scheduled
93     <GRID_JOBDIR>/.wrkStarted - a file keeping track of the number of
94     <GRID_JOBDIR>/.wrkCount - a file keeping track of the number of
95     currenly running workers
96     <GRID_JOBDIR>/err.tasks - list of all error-terminated tasks, after retry
97     (should be zero or empty if all tasks
98     finished successfully)
99     <GRID_JOBDIR>/retry.tasks - list of all error-terminated tasks that will
100     still be retried
101     <GRID_JOBDIR>/taskDb - pseudo-fasta db with the info & status
102     of each task
103     <GRID_JOBDIR>/taskDb.cidx - the cdbfasta index of the above db file
104     <GRID_JOBDIR>/locks/ - lockfiles/links are placed here
105     <GRID_JOBDIR>/running/ - while a task is processed, a file entry
106     called task_<GRID_TASK> will be created in
107     here for locking purposes; such file will have
108     a line with processing info for the current
109     task: <hostname> <pid> <CPU#>
110     <GRID_JOBDIR>/wrk_<CPU#>/ - working directory exclusive to each worker
111     process, one per CPU (1 <= CPU <= maxCPUs);
112     also contains stderr and stdout log files
113     for each worker process (wrk_stderr.log and
114     wrk_stdout.log)
115     * if -i option is used, a slice db file is created for <fastadb> with the
116     positions of all slices of <numseqs> fasta records in the <fastadb> file.
117     This file is called <GRID_JOBDIR>/taskDb
118     * if -b option was given: executes the <begin_script> in the current submit
119     directory (the one above <GRID_JOBDIR>)
120     * if all tasks completed successfully and if -e option was given
121     the <end_script> is executed in the <GRID_JOBDIR> subdirectory
122     };
123    
# Main usage text: printed for -H, and used as the die() message when option
# parsing fails or no command is given. It interpolates $GRID_ENGINE and $USER
# at the point this statement runs (i.e. the site defaults, before -g/-M).
124     my $usage=qq{
125     Submit a list of commands to the grid. Can also automatically
126     slice a multi-fasta file into chunks and pass the slice file names as
127     parameters to a <cmd>:
128    
129     Usage:
130    
131     gridx \{-f <cmds_file> | -i <fastadb> | -r <numtasks> \} -p <numprocs>
132     \{-U|-u <slot#>\} [-n <numseqs>] [-s <skipseqs>] [-t <totalseqs>]
133     [-g <engine>] [-d <dir_prefix>] [-a] [-M <monitoring_basedir>]
134     [-L <local_wrk_dir>] [-O <log_files_dir>] [-S] [-T] [-v]
135     [-x <node_exclude_list>] [-y <node_targets_list>]
136     [-b <begin_script>] [-e <end_script>] [-q] [-m <e-mail>]
137     [-c] <cmd> [-C] [<cmd_args>..]
138    
139     Unless -J option is given, gridx will submit the job to the grid,
140     creating a <GRID_JOBDIR> subdirectory (which on Condor has the format:
141     gridx-<hostname>_<job_id>). A file called 'cmdline-<numtasks>.cmd'
142     will also be created there containing the current gridx commmand line.
143    
144     Options:
145     -g grid engine to use, can be 'smp', 'condor' or 'sge' (default: $GRID_ENGINE)
146     -p maximum number of grid CPUs to allocate for this job
147     -f provide a file with a list of commands to be run on the grid (one
148     command per line per CPU)
149     -i grid processing of slices of the given multi-fasta file <fastadb>;
150     the total number of tasks/iterations is given by the number of slices
151     in <fastadb> (and the -n <numseqs> option); the script will
152     automatically create the slice file in the worker directory before
153     <cmd> is run with these parameters:
154     <fastaslice> <numseqs> <slice#> <is_last> <skipped> <total> <cmd_args>
155     -n slice size: for -i option, how many fasta records from <fastadb>
156     should be placed into a slice file for each iteration (default: 2000)
157     -C legacy option for psx-like usage (to pass <cmd_args>)
158     -r submit an array job with <numtasks> iterations/tasks (default: 1)
159    
160     -S switch to (stay in) the current directory (where gridx was launched from)
161     before each task execution (especially useful for -f option)
162     -a (psx legacy): inherit the submitter environment (default)
163     -b prologue script to be executed BEFORE the grid job is submitted
164     -e epilogue script to be executed AFTER all the grid tasks were completed,
165     and only if ALL the tasks terminated successfully.
166     -m send an e-mail to the given address when all tasks are finished
167     -v do not try to validate <cmd> (assume it is going to be valid on the grid
168     nodes)
169     -x provide a list of machines (host names) that should be excluded
170     from the condor pool for this submitted job ('condor' engine only)
171     -y the submitted job is only allowed to run on the machines on this list
172     ('condor' engine only)
173     -d create <dir_prefix> prefixed symbolic links to worker directories
174     in the current directory
175     -O place all Condor log files (condor stderr/stdout redirects)
176     into <log_files_dir>
177     -M parent directory for creating a "monitoring" symlink; a symbolic link to
178     the current <GRID_JOBDIR> will be created there, under
179     <monitoring_basedir>/$USER/ (default: home directory)
180     -T do not exit after the job is submitted, wait around instead
181     until the last worker terminates and prints some job completion stats
182     -U force one worker per machine (only for 'condor' engine)
183     -u force each worker to run only on CPU <slot#> (between 1 and 12) of each
184     machine (only for 'condor' engine)
185     For -r option, the provided <cmd> could make use of the environment variables
186     GRID_TASK, GRID_TASKLAST to determine the current iteration being executed.
187    
188     Unless -S option was used, each <cmd> iteration will be executed in its own
189     worker subdirectory <GRID_JOBDIR>/wrk_<CPU#>
190    
191     Job monitoring/resuming/stopping (-J mode):
192    
193     gridx [-m e-mail] [-e <end_script>] [-K|-R] -J <jobID>
194    
195     If -J option is provided, gridx will report the status of the job <jobID>
196     which must have been submitted by a previous, "regular" use of gridx; it relies
197     on the the symbolic link <monitor_basedir>/$USER/gridx-<jobID> which must
198     be valid (<monitor_basedir> can be set by -M).
199    
200     Additional/alternate actions for -J mode:
201     -e update the <end_script> for the *running* <jobID> or for the <jobID>
202     rerun (if -R option is also given); does not work with -K option
203     -m update the e-mail notification option for job <jobID>
204     -K kill/abandon/terminate ALL pending tasks for grid job jobID
205     (trying to remove all running/pending tasks on the grid)
206     -R rerun/resubmit all the unfinished/unprocessed or unsuccessful tasks
207     for <GRID_JOB>; this assumes -K -J <JOB_ID> was given first (so there
208     are no pending tasks) and then will submit a new job in the same working
209     directory, renaming the GRID_JOBDIR accordingly while workers
210     will now *skip* all the tasks found with a "Done" status ('.') in the
211     <GRID_JOBDIR>/taskDb file
212     }; #'
213    
214    
# Option parsing starts here. A `gridx -J <jobID> -R` request rebuilds @ARGV
# from the saved cmdline-*.cmd file (prefixed with "-Z <jobID>") and jumps
# back to this label with `goto RESUME_JOBID`, so everything below may run
# twice in the same process.
RESUME_JOBID:
my @ar=@ARGV;
# drop the internal "-Z <jobID>" pair(s) from the command line to be saved
while ($ar[0] eq '-Z') { shift(@ar); shift(@ar); }
my $CMDLINE="$FindBin::Script\t".join("\t",@ar);
# parse script options
print STDERR "Running on $HOST (domain: $DOMAIN): $0 ".join(' ',@ARGV)."\n";
getopts('USWHM:O:NKRDTqvaJZ:L:u:r:x:y:i:Ff:n:t:d:s:p:m:g:b:e:c:C:') || die($usage."\n");
umask 0002;
if ($Getopt::Std::opt_H) {
  print STDERR $usage;
  die($morehints."\n");
}
my $SwitchDir=$Getopt::Std::opt_S;
$NORMAL_ENDING=0;
$GRID_DEBUG=$Getopt::Std::opt_D;

my $GRID_DIRPREFIX=$Getopt::Std::opt_d;
# engine-specific submit/remove handlers
my ($submitJob, $removeJob);
$GRID_ENGINE=lc($Getopt::Std::opt_g) if $Getopt::Std::opt_g;
if ($GRID_ENGINE eq 'sge') {
  ($submitJob, $removeJob)=(\&submitJob_sge, \&removeJob_sge);
}
elsif ($GRID_ENGINE eq 'condor') {
  ($submitJob, $removeJob)=(\&submitJob_condor, \&removeJob_condor);
}
elsif ($GRID_ENGINE eq 'smp') {
  ($submitJob, $removeJob)=(\&submitJob_smp, \&removeJob_smp);
}
else {
  die("Error: invalid grid engine given (only 'sge', 'condor' or 'smp' are accepted)!\n");
}
my $UniqueVM=$Getopt::Std::opt_U;

my $UniqVMreq=$Getopt::Std::opt_u;
$UniqVMreq=int($UniqVMreq) if $UniqVMreq;
die("Error: use either -U or -u option, not both!\n") if $UniqueVM && $UniqVMreq>0;
my @xmachinelist=split(/\,/,$Getopt::Std::opt_x); #do NOT allow jobs to run on these machines
my @ymachinelist=split(/\,/,$Getopt::Std::opt_y); #only allow jobs to run on these machines, not others
#submitJob MUST use the globals:
# GRID_CMD, GRID_TASKLAST, GRID_MONHOME and update GRID_JOB

$GRID_MONHOME=$Getopt::Std::opt_M if $Getopt::Std::opt_M;
# Make the monitoring directory per-user: append "/$USER" unless the path
# already ends with a "$USER" path component (it does on the second pass
# through here, after goto RESUME_JOBID). The user name is matched literally
# and as a whole component -- the previous m/$USER$/ test also accepted e.g.
# "/home/jimbob" as the directory of user "bob".
if (length($USER) && $GRID_MONHOME !~ m{(?:^|/)\Q$USER\E/?$}) {
  $GRID_MONHOME.='/'.$USER;
}

die("Error: directory $GRID_MONHOME should already be created! Aborting..\n")
  unless (-d $GRID_MONHOME);

my $mailnotify=$Getopt::Std::opt_m;
#-------- GLOBALs ---------------------------
my $GRID_JOBDIR;
my $GRID_JOB;
my $GRID_LOCKDIR;
my $GRID_TASKLAST;
my $GRID_CMD; # user's command and arguments
my $GRID_CMDFILE=$Getopt::Std::opt_f; # one line per run.. with arguments for command <cmd>
# `my $x = 1 if COND;` has undefined behaviour in Perl (the variable may keep
# a stale value when COND is false, and this line runs again on resume), so
# the flag is always assigned explicitly.
my $GRID_USECMDLIST=($GRID_CMDFILE || $Getopt::Std::opt_F) ? 1 : undef;

my $GRID_PSXFASTA; #if -i was used
my $GRID_PSXSTEP; #if -i was used (-n value)
my $GRID_PSXSKIP=0; # -s option
my $GRID_PSXTOTAL=0; # -t option

my $GRID_NUMPROCS=0; # -p option
my $GRID_RESUME=$Getopt::Std::opt_R || $Getopt::Std::opt_Z;
my $GRID_LOCAL_JOBDIR=$Getopt::Std::opt_L;
#---------- worker side global vars:
my $GRID_ENVSET=0; #was the environment set?
my $GRID_WORKER=0; # worker#
my $GRID_LOGDIR=$Getopt::Std::opt_O;
my $GRID_TASK; # dynamic -- task iteration#
my $TASK_ERRCOUNT; # dynamic -- current task error (retry) counter
my $TASK_DATA; # current task's user data as stored in taskDb
my $TASK_LOCKH; #file handle for the current task lock file
my $TASK_LOCKF; #file name for the current task lock file
my $GRID_WRKDIR; #only for the worker case, it's the current worker's subdirectory
295    
if ($Getopt::Std::opt_J) {
  #################################################################
  # gridx job monitoring use:
  #   gridx -J [-m <e-mail>] [-M <mondir>] [-R | -K] <jobid>
  # Without a <jobid>, lists the gridx-* entries found in GRID_MONHOME.
  #################################################################
  my ($jobid)=shift(@ARGV);
  $jobid=~tr/ //d;
  unless ($jobid) { # list mode
    my $fmask="$GRID_MONHOME/gridx-*";
    my @jdirs=<${fmask}>;
    if (@jdirs>0) {
      print STDOUT "The following jobs were found (in $GRID_MONHOME):\n";
      foreach my $jdir (@jdirs) {
        my ($listedid)=($jdir=~m/gridx\-(\w[\w\-]+)$/);
        print " $listedid\n";
      }
    }
    else {
      print STDOUT "No jobs were found (in $GRID_MONHOME).\n";
    }
    $NORMAL_ENDING=1;
    exit(0);
  }
  # a job ID was given: look for its directory under GRID_MONHOME first,
  # then in the current directory
  $jobid=lc($jobid);
  my $subdir='gridx-'.$jobid;
  my $jobdir="$GRID_MONHOME/gridx-$jobid";
  unless (-d $jobdir) {
    if (-d $subdir) {
      $jobdir=$subdir;
    }
    else {
      die "No such job found($jobid) - neither $jobdir nor $subdir exist!\n";
    }
  }
  chdir($jobdir) || die("Error at chdir($jobdir)!\n");
  my $msg=jobSummary();
  print STDOUT $msg."\n";
  if ($Getopt::Std::opt_K) {
    $removeJob->($jobid);
  }
  elsif ($Getopt::Std::opt_R) { #resume/rerun!
    # the original command line was saved, tab-separated, in
    # cmdline-<numtasks>.cmd inside the job directory
    my @cmdfile=<cmdline-*.cmd>;
    die "Error getting the cmdline-*.cmd from current directory!\n"
      unless @cmdfile;
    my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
    die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
    my $cmdline=readFile($cmdfile[0]);
    chomp($cmdline);
    my @args=split(/\t/,$cmdline);
    die("$jobdir/$F_TASKDB and/or index not valid - cannot resume!\n")
      unless -s $F_TASKDB && -s "$F_TASKDB.cidx";
    shift(@args); #discard gridx command itself
    chdir('..'); #go in the original working dir
    $PWD=cwd(); #from Cwd module
    $CMDLINE="$FindBin::Script\t".join("\t",@args);
    # re-run option parsing on the saved options, marked as a resume
    @ARGV=('-Z',$jobid, @args);
    undef($Getopt::Std::opt_J);
    undef($Getopt::Std::opt_R);
    goto RESUME_JOBID;
  }
  $NORMAL_ENDING=1;
  exit(0);
}
elsif ($Getopt::Std::opt_W) {
  ##########################################################
  # gridx Worker use:
  # This is the actual job that gets submitted:
  #   gridx -W <cmd> <cmd_args>
  # At runtime the environment should be set accordingly.
  ##########################################################
  beginWorker(); #set up environment and worker directory
                 #(wrk_NNNNN) and chdir() to it
  $GRID_USECMDLIST=$Getopt::Std::opt_F;
  my @cmd=@ARGV; # cmd and cmdargs
  while (my $taskId = getNextTask()) {
    runTask($taskId, @cmd);
  }
  my $runningworkers=endWorker();
  if ($runningworkers==0) { #this was the last worker!
    #run any JOB finishing/clean up code
    chdir($GRID_JOBDIR);
    # (a conditional `my $x = ... if COND;` is undefined behaviour in Perl,
    #  hence the explicit ternaries below)
    my $epilogue = (-s $F_ENDCMD) ? readFile($F_ENDCMD) : undef;
    if ($epilogue) {
      chomp($epilogue);
      #only run it if ALL tasks succeeded
      my $tasksdone=readFile($F_TASKSDONE);
      chomp($tasksdone);
      if ($tasksdone==$GRID_TASKLAST && !(-s $F_ERRTASKS)) {
        runCmd($epilogue);
      }
    }
    my $notify = (-s $F_NOTIFY) ? readFile($F_NOTIFY) : undef;
    chomp($notify) if defined($notify);
    jobSummary($notify);
    # mark the whole job as finished (polled by the submitter under -T)
    open(my $fdone, '>', $GRID_JOBDIR.'/'.$F_ALLDONE)
      || die("Error creating $GRID_JOBDIR/$F_ALLDONE\n");
    print $fdone "done.\n";
    close($fdone);
  } #last worker ending
  $NORMAL_ENDING=1;
  exit(0);
}# -- worker use case
404    
405    
##########################################################
#
# gridx Submit use:
#
##########################################################

my $gridBeginCmd=$Getopt::Std::opt_b;
my $gridEndCmd=$Getopt::Std::opt_e;
$GRID_RESUME=$Getopt::Std::opt_Z; #caused by an initial -J -R request
$GRID_TASKLAST=$Getopt::Std::opt_r;
#--submit specific globals -- temporary
my $TASKDB;
my $JOBID;
#--
unless ($GRID_CMDFILE) {
  $GRID_CMD=$Getopt::Std::opt_c || shift(@ARGV);
  die "$usage\nError: no command given!\n" unless $GRID_CMD;
  unless ($Getopt::Std::opt_v) {
    $GRID_CMD = getCmdPath($GRID_CMD) ||
      die "Error: command $GRID_CMD not found in PATH!\n";
  }
  $GRID_CMD.=' '.$Getopt::Std::opt_C if $Getopt::Std::opt_C;
  $GRID_CMD.=' '.join(' ',@ARGV) if @ARGV;
}
else {
  $GRID_CMD='.';
}
$GRID_NUMPROCS=$Getopt::Std::opt_p || 1; #number of workers submitted

if ($GRID_PSXFASTA=$Getopt::Std::opt_i) { #psx emulation case
  die "Error: -r and -i are mutually exclusive options!\n" if $GRID_TASKLAST;
  # NOTE(review): the usage text documents "(default: 2000)" for -n, but the
  # slice size defaults to 1 record here -- confirm which one is intended.
  $GRID_PSXSTEP=$Getopt::Std::opt_n || 1;
  #it will be moved into GRID_JOBDIR right after submit
  $GRID_PSXSKIP=$Getopt::Std::opt_s || 0;
  $GRID_PSXTOTAL=$Getopt::Std::opt_t || 0;
}
else { $GRID_TASKLAST=1 unless $GRID_TASKLAST; } #one shot run
prepareTaskDb(); #builds a taskDb in the current directory for now
#checks global $GRID_PSXFASTA and $GRID_PSXSTEP
#sets TASKDB to the file name, and GRID_TASKLAST is updated as needed

if ($gridBeginCmd) {
  system($gridBeginCmd) && die("Error: exit status $? returned by prologue command: '$gridBeginCmd'\n");
}
$JOBID = $submitJob->({PATH=> $binpath, PYTHONPATH=> $pythonpath,
          LD_LIBRARY_PATH=>$libpath, PERLLIB=>$perllib, PERL5LIB=>$perl5lib} );
#-- submitJob should also call setupJobDir() to create the gridx-<JOBID> subdirectory
#-- and move/rename the taskDb in there.
die("Error: no job ID has been obtained!\n") unless $JOBID;
if ($Getopt::Std::opt_T) { #wait for all children to finish
  chdir($GRID_JOBDIR);
  # the last worker creates $F_ALLDONE when the whole job is finished
  do {
    sleep(5);
  } until (-e $F_ALLDONE);
  my $msg=jobSummary($mailnotify);
  # Exit status = number of tasks listed in err.tasks. exit() keeps only the
  # low 8 bits, so the count is capped at 255 (256 failures used to exit 0);
  # every line counts, including a last line without a trailing newline.
  my $exitcode=0;
  if (-s $F_ERRTASKS) {
    if (open(my $errfh, '<', $F_ERRTASKS)) {
      while (my $errline = <$errfh>) { $exitcode++; }
      close($errfh);
    }
    $exitcode=1 unless $exitcode; #non-empty file: at least one failure
    $exitcode=255 if $exitcode>255;
  }
  print STDOUT $msg."\n";
  chdir($PWD); #this should be the original working directory
  $NORMAL_ENDING=1;
  exit($exitcode);
}
$NORMAL_ENDING=1;
exit(0);
# if ($notify) {
#  #qsub -o /dev/null -N "$cmd-$jobid-($numtasks)-watcher" -hold_jid $jobid -b y -j y $notify sleep 0
#  my $sgecmd=`which sgepl`;
#  qsub -o $GRID_MONHOME/.gridjob_$jobid/watcher.log -N "$cmdname-$jobid-($numtasks)-watcher" \
#     -hold_jid $jobid -b y -j y -V /bin/tcsh -f $sgecmd -Done $cmdname $jobid $numtasks
#  echo "Submitted watchdog job $cmdname-$jobid-($numtasks)-watcher:"
#  echo " $sgecmd -Done $cmdname $jobid $numtasks"
# }
480    
481     #**********************************************************
482     #******************* SUBROUTINES **************************
483    
484    
485    
486     #******************** SUBMIT SIDE *************************
487    
=head2 ----------- taskDbRec -----------

taskDbRec($fhdb, $taskId, @user_data)

Writes one record line into the taskDb file that is open for writing on
the file handle $fhdb. The line is made of ">" and the task id, then two
tab-separated status fields with a fixed layout
("{-|-|----|----|--------}" and "{" + 25 dashes + "}"), then -- only when
user data values are given -- a tab and those values joined by spaces.

This subroutine should only be used by the program which creates the
taskDb. After all the records are added, the taskDb should eventually be
indexed with cdbfasta.

=cut

sub taskDbRec {
  my ($out, $id, @extra) = @_;
  my $header = '>'.$id."\t{-|-|----|----|--------}\t{".('-' x 25).'}';
  my $line = @extra ? $header."\t".join(' ', @extra) : $header;
  print {$out} $line."\n";
}
508    
509    
# prepareTaskDb() -- build the task database (taskDb) for a new job in the
# current directory and index it with cdbfasta.
# Modes (chosen from the globals set by option parsing):
#   -i  ($GRID_PSXFASTA): one task per slice of $GRID_PSXSTEP fasta records,
#       after skipping $GRID_PSXSKIP records and using at most $GRID_PSXTOTAL
#       records (0 = no limit); each task record holds the file offset of
#       the slice's first record.
#   -f  ($GRID_CMDFILE): one task per command line in the file (comment
#       lines starting with '#' and blank lines are skipped).
#   -r  ($GRID_TASKLAST): $GRID_TASKLAST tasks, each holding $GRID_CMD.
# Sets $TASKDB (the taskDb file name) and $GRID_TASKLAST (number of tasks).
# When resuming ($GRID_RESUME) the existing taskDb is reused and no new one
# is written; in -i mode the task count is read from its cdb index.
sub prepareTaskDb {
  if ($GRID_PSXFASTA) { #-i given, psx mode
    $GRID_PSXFASTA=getFullPath($GRID_PSXFASTA, 1);
    die ("Error: prepareTaskDb() called with PSXFASTA but no PSXSTEP!\n")
      unless $GRID_PSXSTEP;
    my $basename=getFName($GRID_PSXFASTA);
    $TASKDB='gridx-psx-'.$basename.
       ".n$GRID_PSXSTEP.s$GRID_PSXSKIP.t$GRID_PSXTOTAL".'.taskDb';
    if ($GRID_RESUME) {
      #taskDb must already be there!
      my $r=`cdbyank -s gridx-$GRID_RESUME/taskDb.cidx`;
      ($GRID_TASKLAST)=($r=~m/\nNumber of records:\s+(\d+)\n/s);
      die "Error: couldn't determine number of records in gridx-$GRID_RESUME/taskDb\n"
        unless $GRID_TASKLAST>0;
      return;
    }
    #-- iterate through the fasta file, create the slice index
    open(my $fasta, '<', $GRID_PSXFASTA) || die "Error opening file $GRID_PSXFASTA\n";
    binmode($fasta); # offsets are byte offsets
    open(my $taskdb, '>', $TASKDB) || die "Error creating file $TASKDB ($!)\n";
    my $foffset=0; # byte offset of the current line
    my $rcount=0;  # fasta records seen so far
    my $numrecs=0; # records taken after skipping
    $GRID_TASKLAST=0;
    while (my $line = <$fasta>) {
      if ($line =~ m/^>/) {
        $rcount++;
        if ($rcount>$GRID_PSXSKIP) {
          # stop BEFORE taking a record beyond the requested total; checking
          # after counting it used to emit one extra task whenever the total
          # was a multiple of the slice size
          last if ($GRID_PSXTOTAL>0 && $numrecs>=$GRID_PSXTOTAL);
          $numrecs++;
          if (($numrecs-1) % $GRID_PSXSTEP == 0) { # first record of a slice
            $GRID_TASKLAST++;
            taskDbRec($taskdb, $GRID_TASKLAST, $foffset);
          }
        }
      }
      $foffset+=length($line);
    }
    close($fasta);
    close($taskdb) || die "Error writing file $TASKDB ($!)\n";
  }
  elsif ($GRID_CMDFILE) { # -f option given
    return if ($GRID_RESUME); #taskDb must already be there!
    # only the file name is used: a -f argument with a directory part would
    # otherwise give a path inside a non-existent "gridx.<dir>" directory
    $TASKDB='gridx.'.basename($GRID_CMDFILE).'.taskDb';
    open(my $cmdfh, '<', $GRID_CMDFILE) || die "Error opening file $GRID_CMDFILE\n";
    open(my $taskdb, '>', $TASKDB) || die "Error creating file $TASKDB ($!)\n";
    my $i=1;
    while (my $line = <$cmdfh>) {
      next if $line =~ m/^\s*#/;   # comment line
      next unless $line =~ m/\S/;  # blank line: not a command
      chomp($line);
      taskDbRec($taskdb, $i, $line);
      $i++;
    }
    close($cmdfh);
    close($taskdb) || die "Error writing file $TASKDB ($!)\n";
    $GRID_TASKLAST=$i-1;
  }
  elsif ($GRID_TASKLAST) { # -r option given
    return if ($GRID_RESUME); #taskDb must already be there!
    $TASKDB="gridx.r$GRID_TASKLAST.taskDb";
    open(my $taskdb, '>', $TASKDB) || die "Error creating file $TASKDB ($!)\n";
    for my $i (1 .. $GRID_TASKLAST) {
      taskDbRec($taskdb, $i, $GRID_CMD);
    }
    close($taskdb) || die "Error writing file $TASKDB ($!)\n";
  }
  else { return; }
  runCmd("cdbfasta $TASKDB");
}
589    
# getFName($path) -- the file name part of $path (File::Basename::basename)
sub getFName {
  my ($path) = @_;
  my $name = basename($path);
  return $name;
}
593    
# getFDir($path) -- the directory part of $path (File::Basename::dirname)
sub getFDir {
  my ($path) = @_;
  my $dir = dirname($path);
  return $dir;
}
597    
# getFullPath($fname [, $check]) -- absolute path of $fname (Cwd::abs_path);
# when $check is true, dies unless $fname is readable.
sub getFullPath {
  my $fname = shift;
  my $check = shift;
  if ($check and not -r $fname) {
    die("Error: file $fname does not exist!\n");
  }
  return abs_path($fname);
}
603    
#== getCmdPath($cmd) -- locate the executable for <cmd>.
# An absolute path is accepted as given, a "./" (or "../") relative path is
# made absolute, any other name is searched for in the directories of $PATH.
# Only regular executable files qualify: directories also pass -x but cannot
# be run as commands.
# Scalar context: returns the full path (a false value if not found).
# List context: returns ($fullpath, $interpreter), where $interpreter is the
# text of a "#!" line found within the first 10 lines of a readable text
# file ('' if none); returns an empty list if the command was not found.
sub getCmdPath {
  my $cmd=$_[0];
  my $fullpath;
  my $checkBinary=wantarray();
  if ($cmd =~ m/^\//) {
    $fullpath = (-f $cmd && -x _) ? $cmd : '';
  }
  elsif ($cmd =~ m/^\.+\//) { #relative path given
    $fullpath = (-f $cmd && -x _) ? abs_path($cmd) : '';
  }
  else { #we search in the path..
    foreach my $p (split(/:/, $ENV{'PATH'})) {
      my $candidate=$p.'/'.$cmd;
      if (-f $candidate && -x _) {
        $fullpath=$candidate;
        last;
      }
    }
  }#path searching
  return $fullpath unless $checkBinary;
  return () unless $fullpath; #cmd not found
  my $interpreter='';
  if (-r $fullpath && -T $fullpath) { #readable text file, look for bang line
    # lexical handle: always closed, and the caller's $_ is not clobbered
    if (open(my $tfh, '<', $fullpath)) {
      for (my $linesread=0; $linesread<10; $linesread++) { #only the first 10 lines
        my $line=<$tfh>;
        last unless defined($line); # file shorter than 10 lines
        chomp($line);
        if ($line =~ m/^\s*#\!\s*(\S.+)/) {
          $interpreter=$1;
          last;
        }
      }
      close($tfh);
    }
    $interpreter=~s/\s+$//;
  }
  return ($fullpath, $interpreter);
}
648    
# runCmd($cmd [, $jobid]) -- run a shell command through system(). On a
# non-zero exit status, first removes grid job $jobid (when one was given)
# with the engine's remove handler, then dies.
sub runCmd {
  my $cmd = shift;
  my $jobid = shift;
  my $failed = (system($cmd) != 0);
  if ($failed) {
    $removeJob->($jobid) if $jobid;
    die("Error at running system command: $cmd\n");
  }
}
659    
660    
# removeJob_sge($jobid) -- delete SGE job $jobid with `qdel -f`
# (dies through runCmd() if qdel fails).
sub removeJob_sge {
  my ($jobid) = @_;
  return runCmd("qdel -f $jobid");
}
665    
# removeJob_condor($jobid) -- remove a Condor job. $jobid has the form
# <submit_hostname>_<job#>; on the submit host itself a plain condor_rm is
# used, otherwise the submit host is named with -name.
sub removeJob_condor {
  my ($jobid) = @_;
  my ($hostname, $job) = ($jobid=~m/([\w\-]+)_(\d+)$/);
  unless ($hostname && ($job>0)) {
    die("Error parsing hostname, job# from $jobid!\n");
  }
  my $rmcmd = (lc($hostname) eq lc($HOST))
              ? "condor_rm $job"                  # local host
              : "condor_rm -name $hostname $job";
  runCmd($rmcmd);
}
680    
681    
# smpTaskReaper() -- SIGCHLD handler for the 'smp' engine
# (installed with $SIG{CHLD}=\&smpTaskReaper).
# Reaps every terminated child without blocking (WNOHANG) and keeps the
# worker bookkeeping ($SMPChildren count, %SMPChildren pid set) up to date.
sub smpTaskReaper {
  # waitpid() overwrites $! and $?; keep the interrupted code's values intact
  local ($!, $?);
  my $childpid;
  while (($childpid = waitpid(-1, WNOHANG)) > 0) {
    # only forked workers are tracked in %SMPChildren; other children that
    # happen to be reaped here must not decrement the worker count
    $SMPChildren-- if delete($SMPChildren{$childpid});
  }
  # re-install the handler (needed on systems with SysV signal semantics)
  $SIG{CHLD}=\&smpTaskReaper;
}
690    
# smpTaskKiller() -- intended as the SIGINT handler for the 'smp' engine:
# sends SIGINT to every worker PID recorded in %SMPChildren.
sub smpTaskKiller {
  # the children are about to be killed on purpose: ignore their SIGCHLDs
  local $SIG{CHLD} = 'IGNORE';
  my @workerpids = keys(%SMPChildren);
  kill('INT', @workerpids);
}
695    
# removeJob_smp($jobid) -- stop the workers of an 'smp' job by sending them
# SIGINT through smpTaskKiller(). The previous body called taskKiller(),
# which is not defined among the SMP helpers (the handler here is named
# smpTaskKiller), so it would die with "Undefined subroutine".
# NOTE(review): %SMPChildren is only filled in the process that forked the
# workers (submitJob_smp); from a separate `gridx -K -J <jobID>` process it
# is empty and nothing gets killed -- $jobid ('smp_<pid>') is not used yet.
sub removeJob_smp {
  smpTaskKiller();
}
700    
# submitJob_sge(\%env) -- submit the gridx workers as an SGE (array) job.
# Runs `qsub -cwd -b y -v '<vars>' [-t 1-<numprocs>] <perl> <gridx> -W <cmd>`,
# passing the GRID_* globals and the caller-supplied variables of %env
# (e.g. PATH, LD_LIBRARY_PATH) to the workers through qsub -v.
# Returns the SGE job ID parsed from qsub's output, after setupJobDir().
sub submitJob_sge {
  my ($envhash)=@_;
  $envhash ||= {};
  # submit an array job, SGE style: one array task per requested worker
  my $array='';
  if ($GRID_NUMPROCS > 1) {
    $array="-t 1-$GRID_NUMPROCS";
  }
  #append our GRID_ environment
  # NOTE(review): qsub -v splits on commas, so a GRID_CMD (or any value)
  # containing ',' or a single quote will not be passed through correctly.
  my $envparam="-v 'GRID_ENGINE=sge,".
     "GRID_JOBDIR=$PWD,".
     "GRID_TASKLAST=$GRID_TASKLAST,".
     "GRID_MONHOME=$GRID_MONHOME,".
     "GRID_CMD=$GRID_CMD,".
     "GRID_DIRPREFIX=$GRID_DIRPREFIX";
  $envparam.=",GRID_RESUME=$GRID_RESUME" if $GRID_RESUME;
  $envparam.=",GRID_PSXFASTA=$GRID_PSXFASTA".
             ",GRID_PSXSTEP=$GRID_PSXSTEP" if $GRID_PSXFASTA;
  $envparam.= ",GRID_PSXSKIP=$GRID_PSXSKIP" if $GRID_PSXSKIP;
  $envparam.= ",GRID_PSXTOTAL=$GRID_PSXTOTAL" if $GRID_PSXTOTAL;
  $envparam.= ",GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR" if $GRID_LOCAL_JOBDIR;
  # caller-supplied variables: those not set in the submitter environment
  # are skipped rather than exported as empty strings (e.g. "PERLLIB="),
  # and the names are sorted so the command line is reproducible
  my @envars;
  foreach my $env (sort keys(%$envhash)) {
    my $val=$envhash->{$env};
    push(@envars, $env.'='.$val) if defined($val);
  }
  $envparam.=','.join(',',@envars) if @envars;
  $envparam.="'";
  #--

  my $sub="qsub -cwd -b y $envparam";
  my $logdir='';
  if ($GRID_LOGDIR) {
    #separate log dir given
    $logdir=$GRID_LOGDIR;
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
    }
    $logdir.='/' unless $logdir=~m/\/$/;
  }
  # NOTE(review): $logdir is prepared but never passed to qsub (-o/-e); the
  # usage text documents -O for Condor only -- confirm whether SGE should use it.
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;
  my $subcmd= "$sub $array $PERL_BIN $0 $otherflags -W $GRID_CMD";
  print STDERR "$subcmd\n" if $GRID_DEBUG;
  my $subout = `$subcmd`;
  my ($jobid)=($subout=~/Your\s+job\S*\s+(\d+)/s);
  die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid);

  setupJobDir($jobid);

  return $jobid;
}
765    
766    
# submitJob_smp($envhash)
#  Runs the job on the local machine (SMP mode).
#  Exports the GRID_* settings, plus every name=value pair of %$envhash,
#  into %ENV so the workers inherit them.
#  Sets up the job directory, then forks $GRID_NUMPROCS children. Each
#  child exec()s a gridx worker: "$PERL_BIN $0 <flags> -W $GRID_CMD".
#  Returns the job ID, 'smp_<PID of this process>'.
sub submitJob_smp {
  my ($envhash)=@_;
  my $jobid='smp_'.$$;
  #-- worker environment (inherited by the forked children)
  $ENV{GRID_ENGINE}='smp';
  @ENV{'GRID_JOBDIR', 'GRID_TASKLAST', 'GRID_MONHOME', 'GRID_CMD','GRID_DIRPREFIX'}=
     ($PWD."/gridx-$jobid", $GRID_TASKLAST, $GRID_MONHOME, $GRID_CMD,$GRID_DIRPREFIX);
  $ENV{GRID_RESUME}=$GRID_RESUME;
  $ENV{GRID_JOB}=$jobid;
  @ENV{'GRID_PSXFASTA','GRID_PSXSTEP'}=($GRID_PSXFASTA, $GRID_PSXSTEP) if $GRID_PSXFASTA;
  $ENV{GRID_PSXSKIP}=$GRID_PSXSKIP if $GRID_PSXSKIP;
  $ENV{GRID_PSXTOTAL}=$GRID_PSXTOTAL if $GRID_PSXTOTAL;

  $ENV{GRID_LOCAL_JOBDIR}=$GRID_LOCAL_JOBDIR."/gridx-$jobid" if $GRID_LOCAL_JOBDIR;
  if (keys(%$envhash)>0) {
    while (my ($env, $val)= each(%$envhash)) {
      $ENV{$env}=$val;
    }
  }
  # Fork off the children
  my $pid;
  setupJobDir($jobid); #we do this one in advance..
  $SIG{CHLD}=\&smpTaskReaper;
  print STDERR "..forking $GRID_NUMPROCS workers..\n" if $GRID_DEBUG;
  # NOTE: the log dir is only created here; the SMP workers' output is not
  # redirected into it by this sub
  my $logdir='';
  if ($GRID_LOGDIR) {
    #separate log dir given
    $logdir=$GRID_LOGDIR;
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
    }
    $logdir.='/' unless $logdir=~m/\/$/;
  }
  # start from '' (not undef) so the exec command line below never
  # interpolates an undefined value
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  #$otherflags.=' -N' if $GRID_NOWRKLINKS;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;

  for (1 .. $GRID_NUMPROCS) {
    die "Error at fork: $!" unless defined ($pid = fork);
    if ($pid) { # Parent here
      $SMPChildren{$pid} = 1;
      $SMPChildren++;
      next;
    }
    # Child here: it must never fall back into this loop (it would fork
    # workers of its own) nor return from this subroutine. exec() only
    # returns on failure; in that case leave immediately with _exit(),
    # which skips END blocks/destructors that belong to the parent.
    exec("$PERL_BIN $0 $otherflags -W $GRID_CMD")
      or do {
        print STDERR "Error: cannot exec gridx worker ($!)\n";
        POSIX::_exit(127);
      };
  }
  $SIG{INT}=\&smpTaskKiller;
  $SIG{TERM}=\&smpTaskKiller;
  return $jobid;
}
824    
# submitJob_condor($envhash)
#  Submits the job to Condor.
#  Writes a submit description file, condor.<PID>.t<time>.<USER>.<HOST>.cmd.
#  In that file:
#   - the GRID_* settings and every name=value pair of %$envhash go into
#     the ';'-separated "environment" line;
#   - the machine requirements come from the UniqVMreq/UniqueVM options and
#     the exclude (@xmachinelist) / include (@ymachinelist) machine lists.
#  Runs condor_submit on the file and parses the cluster number from its
#  output (dies if it cannot).
#  Then calls setupJobDir(), which chdir()s into the job directory, and
#  moves the submit file there as condor_submit.cmd.
#  Returns the job ID, "<HOST>_<cluster>".
sub submitJob_condor {
  my ($envhash)=@_;
  my $jobid;
  my $queue='queue';
  $queue.=" $GRID_NUMPROCS" if ($GRID_NUMPROCS > 1);
  #append our GRID_ environment
  my $dprefix="gridx-$HOST".'_';
  my $envparam="GRID_ENGINE=condor;".
     "GRID_JOBDIR=$PWD/$dprefix\$(Cluster);".
     "GRID_JOB=$HOST\_\$(Cluster);".
     "GRID_TASKLAST=$GRID_TASKLAST;".
     "GRID_MONHOME=$GRID_MONHOME;".
     "GRID_CMD=$GRID_CMD;".
     "GRID_DIRPREFIX=$GRID_DIRPREFIX";
  $envparam.=";GRID_RESUME=$GRID_RESUME" if $GRID_RESUME;
  $envparam.=";GRID_PSXFASTA=$GRID_PSXFASTA".
             ";GRID_PSXSTEP=$GRID_PSXSTEP" if $GRID_PSXFASTA;
  $envparam.= ";GRID_PSXSKIP=$GRID_PSXSKIP" if $GRID_PSXSKIP;
  $envparam.= ";GRID_PSXTOTAL=$GRID_PSXTOTAL" if $GRID_PSXTOTAL;
  $envparam.=';BLASTMAT='.$ENV{BLASTMAT} if $ENV{BLASTMAT};
  # entries are ';'-separated with the separator *before* each entry;
  # no trailing ';' here, it would create an empty entry
  $envparam.= ";GRID_LOCAL_JOBDIR=$GRID_LOCAL_JOBDIR/$dprefix\$(Cluster)" if $GRID_LOCAL_JOBDIR;
  if (keys(%$envhash)>0) {
    my @envars;
    while (my ($env, $val)= each(%$envhash)) {
      push(@envars, $env.'='.$val);
    }
    $envparam.=';'.join(';',@envars);
  }
  my $logdir='';
  if ($GRID_LOGDIR) {
    #separate log dir given
    $logdir=$GRID_LOGDIR;
    if (-d $GRID_LOGDIR) {
      print STDERR "Warning: log dir $GRID_LOGDIR exists; existing files will be overwritten!\n";
    }
    else {
      mkdir($logdir) || die("Error creating log dir '$logdir'!\n");
    }
    $logdir.='/' unless $logdir=~m/\/$/;
  }
  # start from '' (not undef) so the "arguments" line never interpolates undef
  my $otherflags='';
  $otherflags.=' -D' if $GRID_DEBUG;
  #$otherflags.=' -N' if $GRID_NOWRKLINKS;
  $otherflags.=' -F' if $GRID_USECMDLIST;
  $otherflags.=' -S' if $SwitchDir;

  my $mtime=time();
  my $cmdfile="condor.$$.t$mtime.$USER.$HOST.cmd";
  local *CMDFILE;
  open(CMDFILE, '>', $cmdfile) || die "Cannot create $cmdfile!\n";
  my $requirements = '(OpSys == "LINUX") && (Arch == "INTEL" || Arch == "x86_64")';
  #my $requirements = '(OpSys == "LINUX") && (Arch == "INTEL")';
  my $force_slot;
  if ($UniqVMreq>0) {
    $force_slot=$UniqVMreq;
  }
  elsif ($UniqueVM) {
    $force_slot=3+int(rand(9)); #this is messed up, only works on >11 core machines
  }
  $requirements .= ' && (VirtualMachineId == '.$force_slot.')' if $force_slot;
  # A single list entry containing regexp metacharacters (| ^ ? * \) is
  # used as a regexp as is; otherwise the entries are matched as host name
  # prefixes (everything before the first '.')
  if (@xmachinelist>0) {
    #map { $_='Machine != "'.$_.'.'.$DOMAIN.'"' } @xmachinelist;
    #$requirements.= ' && ('.join(' && ',@xmachinelist).')';
    if (@xmachinelist==1 && ($xmachinelist[0]=~tr/|^?*\\//)) {
      $requirements.=' && (TRUE != regexp("'.$xmachinelist[0].'", Machine, "i"))';
    }
    else {
      $requirements.=' && (TRUE != regexp("^('.join('|',@xmachinelist).
                     ')\..*", Machine, "i"))';
    }
  }
  elsif (@ymachinelist>0) {
    if (@ymachinelist==1 && ($ymachinelist[0]=~tr/|^?*\\//)>0) {
      $requirements.=' && regexp("'.$ymachinelist[0].'", Machine, "i")';
    }
    else {
      #map { $_='Machine == "'.$_.'.'.$DOMAIN.'"' } @ymachinelist;
      #$requirements.= ' && ('.join(' || ',@ymachinelist).')';
      $requirements.=' && regexp("^('.join('|',@ymachinelist).
                     ')\..*", Machine, "i")';
    }
  }
  print CMDFILE qq{universe = vanilla
requirements = $requirements
notification = Never
executable = $0
initialdir = $PWD
};
  if ($logdir) {
    print CMDFILE "error = ${logdir}log_$dprefix\$(Cluster).\$(Process).stderr\n".
                  "output = ${logdir}log_$dprefix\$(Cluster).\$(Process).stdout\n";
  }
  print CMDFILE "arguments = $otherflags -W $GRID_CMD\n".
                "environment = $envparam;\n".
                "$queue\n";
  # buffered write errors (e.g. disk full) only show up at close()
  close(CMDFILE) || die "Error writing $cmdfile ($!)!\n";
  my $subcmd="condor_submit $cmdfile";
  my $subout = `$subcmd`;
  ($jobid)=($subout=~/submitted\s+to\s+cluster\s+(\d+)\./s);
  die "Error: No Job ID# could be parsed!\n($subout)" unless ($jobid);
  $jobid=$HOST.'_'.$jobid;
  setupJobDir($jobid);
  #setupJobDir also chdirs() in the $GRID_JOBDIR
  system("mv ../$cmdfile condor_submit.cmd");
  return $jobid;
}
932    
# jobDie($jobid, @messages)
#  Fatal error on the submit side.
#  Prints "Error: " followed by the messages (joined by newlines) to STDERR,
#  removes the job through the $removeJob callback, then die()s.
sub jobDie {
  my ($jobid, @lines)=@_;
  my $text='Error: '.join("\n", @lines)."\n";
  print STDERR $text;
  $removeJob->($jobid);
  die();
}
939    
# setupJobDir($jobid)
#  Submit side: creates (or, with $GRID_RESUME, takes over and renames) the
#  job directory "gridx-$jobid" in the current directory.
#  - Fresh job: moves the taskDb (and its .cidx index) into it.
#  - Creates the locks/ and running/ subdirectories.
#  - Symlinks the job dir from $GRID_MONHOME when that differs from $PWD.
#  - chdir()s INTO the job directory.
#  - Writes the bookkeeping files: cmdline-<N>.cmd, workdir, and optionally
#    notify and epilogue.
#  - Sets $GRID_JOBDIR.
#  Any fatal error goes through jobDie(), so the submitted job is removed.
sub setupJobDir {
  my ($jobid)=@_;
  my $jobdir = "gridx-$jobid";
  jobDie($jobid, "job directory $jobdir already exists!")
     if (-d $jobdir);
  if ($GRID_RESUME) {
    my $prevjobdir='gridx-'.$GRID_RESUME;
    print STDERR " ..taking over jobdir: $prevjobdir\n";
    unlink("$GRID_MONHOME/$prevjobdir") || warn(" couldn't unlink $GRID_MONHOME/$prevjobdir");
    # drop the scheduling state of the previous run; the taskDb is kept so
    # already finished tasks can be skipped
    unlink("$prevjobdir/$F_LASTTASK");
    unlink("$prevjobdir/$F_ALLDONE");
    rename("$prevjobdir/$F_ERRTASKS", "$prevjobdir/prev_$F_ERRTASKS");
    unlink("$prevjobdir/$F_RETRYTASKS");
    system("/bin/rm -rf $prevjobdir/wrk_*/$F_WRKRUNNING");
    system("/bin/rm -rf $prevjobdir/.wrk*");
    system("/bin/rm -rf $prevjobdir/locks");
    system("/bin/rm -rf $prevjobdir/running");
    # system() returns non-zero on failure
    system("mv $prevjobdir $jobdir") &&
       jobDie($jobid, "cannot 'mv $prevjobdir $jobdir' - $! - Resuming failed!");
  }
  else {
    mkdir($jobdir) || jobDie($jobid, "cannot create subdirectory $jobdir");
    runCmd("mv $TASKDB $jobdir/taskDb", $jobid);
    runCmd("mv $TASKDB.cidx $jobdir/taskDb.cidx", $jobid);
  }

  mkdir("$jobdir/locks") || jobDie($jobid,
           "cannot create subdirectory $jobdir/locks");
  mkdir("$jobdir/running") || jobDie($jobid,
           "cannot create subdirectory $jobdir/running");
  if ($GRID_MONHOME ne $PWD) {
    symlink("$PWD/$jobdir", "$GRID_MONHOME/$jobdir")
      || jobDie($jobid, "cannot symlink $GRID_MONHOME/$jobdir");
  }
  #- CHDIR to the new GRID_JOBDIR
  chdir($jobdir) || jobDie($jobid, "cannot chdir($jobdir) (from $PWD)!");
  readFile($F_TASKSDONE);
  my $cmdfile="cmdline-$GRID_TASKLAST.cmd";
  local *FHND;
  open(FHND, '>', $cmdfile) || jobDie($jobid, "cannot create file $cmdfile\n");
  print FHND $CMDLINE."\n";
  close(FHND);
  # jobSummary() refuses to run without this file, so it is mandatory
  open(FHND, '>', $F_WRKDIR) || jobDie($jobid, "cannot create file $jobdir/$F_WRKDIR ($!)");
  print FHND "$PWD/$jobdir\n";
  close(FHND);

  if ($mailnotify) {
    # optional: without it only the notification is lost, so just warn
    if (open(FHND, '>', $F_NOTIFY)) {
      print FHND "$mailnotify\n";
      close(FHND);
    }
    else {
      print STDERR "Warning: cannot create file $jobdir/$F_NOTIFY ($!)\n";
    }
  }

  if ($gridEndCmd) {
    open(FHND, '>', $F_ENDCMD) || jobDie($jobid, "cannot create file $jobdir/$F_ENDCMD ($!)");
    print FHND $gridEndCmd."\n";
    close(FHND);
  }
  $GRID_JOBDIR = "$PWD/$jobdir";
  print STDOUT "Job $jobid scheduled to run with GRID_JOBDIR = $PWD/$jobdir\n";
} #setupJobDir
1000    
1001    
# jobSummary([$mail])
#  Builds a summary of the job: tasks done vs. total number of tasks.
#  Must be called from inside the job directory; dies if the tasksDone or
#  workdir files, or the cmdline-<N>.cmd file, are missing or unparsable.
#  Removes a non-empty notify file.
#  When $mail is given, a signature (host, worker, working directory,
#  command line) is appended and the summary is sent with send_mail().
#  Returns the summary text.
sub jobSummary {
  my ($mail) = @_;
  #assuming we're in GRID_JOBDIR directory
  #(either directly or by $GRID_MONHOME)!
  die "Error: cannot locate $F_TASKSDONE and $F_WRKDIR in current directory ($ENV{PWD})!\n"
    unless -f $F_TASKSDONE && -f $F_WRKDIR;
  my $tasksdone=readFile($F_TASKSDONE);chomp($tasksdone);
  $tasksdone=0 unless $tasksdone;
  my $wrkdir=readFile($F_WRKDIR);chomp($wrkdir);
  my ($jobid)=($wrkdir=~m/\/gridx\-(\w[\w\-]+)$/);
  die("Error: cannot parse jobid from $F_WRKDIR content ($wrkdir)\n") unless $jobid;
  # the total number of tasks is encoded in the file name: cmdline-<N>.cmd
  my @cmdfile=<cmdline-*.cmd>;
  die "Error getting the cmdline-*.cmd from current directory!\n"
    unless @cmdfile;
  my ($numtasks)=($cmdfile[0]=~m/cmdline\-(\d+)/);
  die "Error parsing the number of tasks from $cmdfile[0]!\n" unless $numtasks>0;
  open(my $cmdfh, '<', $cmdfile[0])
    || die "Error opening $cmdfile[0] ($!)!\n";
  my $cmdline=<$cmdfh>;
  close($cmdfh);
  chomp($cmdline);
  $cmdline=~s/\t/ /g;
  unlink($F_NOTIFY) if -s $F_NOTIFY;
  my ($msg, $subj);
  my $sig='';
  if ($mail) {
    $sig = "\n\n-------------------------\n -= mail sent from $HOST";
    $sig.=" (worker $GRID_WORKER)" if $GRID_WORKER;
    $sig.=" \nWorking directory: $wrkdir\n" if $wrkdir;
    $sig.=" \nCommand line: \n $cmdline\n" if $cmdline;
    $sig.=" =-\n";
  }
  if ($tasksdone!=$numtasks) {
    $msg="Summary of gridx job $jobid: $tasksdone tasks done out of $numtasks\n".
         "Check $wrkdir for more details.\n";
    $subj="gridx job $jobid (done $tasksdone out of $numtasks)";
  }
  else {
    $msg="gridx job $jobid - done all $numtasks tasks\n";
    $subj="gridx job $jobid Done (all $numtasks tasks)";
  }
  $msg.=$sig;
  send_mail( { to=>$mail, subj=>$subj, body=>$msg }) if $mail;
  return $msg;
}
1046    
1047    
1048    
1049     ############## WORKER SIDE subroutines #####################
1050    
1051     #=================== taskDB handling =================
1052    
1053     =head2 taskDbStat (taskdb, taskId [,tstatus, cpu#, host, retries, exitcode])
1054    
  taskDbStat($taskdb, $taskId [, $taskstatus, $CPUno,
             $host, $retrycount, $exitcode])
1057    
1058     gets/sets the status of a task in an existing taskdb file
1059    
If no $taskstatus (or a false one) is given, it works as a getter and
returns the whole $taskdb entry: either the raw record line or, in list
context (wantarray()), the parsed list:
1063    
  ($taskstatus, $userdata, $CPUno, $host, $retrycount, $lastexitcode, $startTime)
1065    
..where $startTime is in minutes since the epoch (time/60), or the string '--------' if no start time is recorded.
1067    
If a (true) $taskstatus is given, it works as a setter for the task status
and the other task related data, and returns 1 on success.
1070    
1071     Valid status values:
1072     '-' = queued/idle/unprocessed
1073     'r' = running (could be a retry)
1074     '.' = finished successfully
'E' = finished with error ending (after max retries); stored lowercased, as 'e'
1076    
1077     Internal details:
1078     -Each record is assumed locked at the time of writing
1079     (no other processes are trying to update the same task record).
1080     -A taskdb record format is:
1081    
1082     >taskId\t{S|R|xxxx|dddd|mmmmmmmm}\t{hostname}[\t<additional data..>]
1083    
1084     where:
S = running status ('-','r','.' or 'e')
R = retry counter (0-9; must be a single digit since records are fixed-length)
1087     xxxx = last exit code, in hex
1088     dddd = last CPU number, in hex (the wrk_<CPU> subdirectory)
1089     mmmmmmmm = start minute since the epoch (time/60) in hexadecimal
1090     hostname = fixed 25 char length machine name
1091     (blank padded as needed)
1092    
Note on errors: the setter applies no retry policy itself. It stores the
given status (lowercased) and, when $retrycount is defined, that retry count.
The caller decides between 'E' and '-' based on the retry count and $MAX_RETRIES.
1096    
1097     =cut
1098    
# taskDbStat($taskdb, $taskId [, $status, $dirno, $host, $retrycount, $exitcode])
#  Getter/setter for one fixed-length taskDb record (format in the POD above).
#  $taskdb may be given either as the db file or as its .cidx index.
#  The record offset comes from "cdbyank -P" on the index.
#  The setter rewrites, in place, only the "S|R|xxxx|dddd|mmmmmmmm}\t{host}"
#  part of the record. It refuses (wrkDie) to write a part of a different
#  length, since that would corrupt the following records.
sub taskDbStat {
  my ($taskdb, $entry, $status, $dirno, $machine, $errcount, $exitcode)=@_;
  my $taskdbidx=$taskdb;
  $taskdbidx.='.cidx' unless ($taskdb=~s/\.cidx$//);
  my $tdberr="taskDbStat(".join(',',@_).") Error:";
  wrkDie("$tdberr db $taskdb (and/or index) not found!".
         "(pwd = $ENV{PWD})") unless (-e $taskdb && -e $taskdbidx);
  wrkDie("$tdberr no entry given!") unless $entry;
  local *TASKDB;
  # byte offset of the record in $taskdb, looked up in the cdb index
  my $dbpos= `cdbyank -a '$entry' -P $taskdbidx`;
  chomp($dbpos);
  wrkDie("$tdberr at retrieving pos of entry $entry\n")
    if ($? || $dbpos!~/^\d+$/);

  my $openmode = O_RDONLY | O_SYNC ;
  if ($status) { # setter code:
    wrkDie("$tdberr invalid update parameters ($status)")
       if (length($status)>1 || ($dirno && ($dirno>65535 || $dirno<1)));
    $openmode= O_RDWR | O_SYNC; # | O_DIRECT might be needed ?
  }
  sysopen(TASKDB, $taskdb, $openmode) ||
     wrkDie("$tdberr sysopening $taskdb failed!");
  binmode(TASKDB);
  unless (sysseek(TASKDB, $dbpos, SEEK_SET)) {
    close(TASKDB);
    wrkDie("$tdberr at sysseek() to $dbpos for $entry");
  }
  #----- read the next line and check the format
  my $targetstr='>'.$entry."\t{-|-|----|----|--------}\t{".('-' x 25)."}";
  local $/="\n";
  my $dbline=readline(*TASKDB);
  binmode(TASKDB);
  my ($pentry, $pstats, $pmachine, $userdata)=
       split(/\t/,$dbline,4);
  chomp($pmachine);chomp($userdata);
  my $tcheck=join("\t",$pentry,$pstats,$pmachine);
  if ($pentry ne '>'.$entry ||
      length($tcheck)!=length($targetstr)) {
    close(TASKDB);
    wrkDie("$tdberr invalid record format for '$entry' ".
      "pos $dbpos, Found:\n'$tcheck'\n ..instead of:\n'$targetstr'\n");
  }

  $pmachine=~tr/{} //d;
  $pstats=~tr/{}//d;
  my ($pstatus, $pfcount, $pxcode, $pdirno, $ptime)=split(/\|/,$pstats);
  # keep the stored start time (8 hex digits or '--------') for rewriting;
  # $ptime becomes the decimal minute count returned by the getter
  my $ptimestr=$ptime;
  $ptime=hex($ptime) || $ptime;
  $pfcount=0 unless $pfcount>0;
  #----
  if ($status) { #---- setter code:
    $status=lc($status);
    if ($status!~/^[\-r\.e]$/) {
      close(TASKDB);
      wrkDie("$tdberr invalid status provided ($status)!");
    }
    my $prefixlen=length(">$entry\t{");
    # the rewritten part must keep the exact size of the original record
    my $reclen=length($targetstr)-$prefixlen;
    $dbpos+=$prefixlen;
    sysseek(TASKDB, $dbpos, SEEK_SET);
    if (defined($exitcode)) {
      $pxcode = $exitcode>0 ? sprintf('%04x', $exitcode) : $pxcode;
    }

    $pfcount=$errcount if defined($errcount);
    if ($status eq 'r') { #mark this entry as "running"
      $ptimestr=sprintf('%08x',int(time()/60));
      $pxcode='----';
    }
    elsif ($status eq '-') { # mark this entry as "available" (idle)
      $ptimestr='--------';
    }
    $pmachine=$machine if $machine;
    $pmachine=sprintf('%25s',$pmachine);
    # the stored dirno is hex: decode it before re-encoding with %04x
    $dirno=hex($pdirno) unless $dirno>=1;
    my $wstr=$status.'|'.int($pfcount).
             '|'.$pxcode.
             '|'.sprintf('%04x', $dirno).
             '|'.$ptimestr."}\t{".$pmachine.'}';
    if (length($wstr)!=$reclen) {
      # e.g. host name longer than 25 chars, retry count >9, exit code >0xffff
      close(TASKDB);
      wrkDie("$tdberr cannot update taskDb record for $entry: length ".
             length($wstr)." instead of $reclen for:\n$wstr\n");
    }
    my $wlen=length($wstr);
    my $w=syswrite(TASKDB, $wstr, $wlen);
    binmode(TASKDB); # 'cause this flushes any pending I/O buffers
    if ($w!=$wlen) {
      close(TASKDB);
      wrkDie("$tdberr failed writing '$wstr' for $entry!");
    }
    close(TASKDB);
    return 1;
  } # setter
  else { # getter code
    close(TASKDB);
    if (wantarray()) { #retrieve the parsed list of values
      return ($pstatus, $userdata, hex($pdirno), $pmachine, int($pfcount),
              hex($pxcode), $ptime);
    }
    else { #return the raw taskDb line
      return($dbline);
    }
  } # getter
}
1200    
1201    
# gridWorkerEnv([$taskId])
#  Worker side: loads the GRID_* settings from %ENV into the script globals.
#  A given task ID is always stored in $GRID_TASK and $ENV{GRID_TASK}.
#  The rest runs only once, guarded by $GRID_ENVSET.
#  - sge: GRID_JOBDIR/GRID_JOB/GRID_LOCAL_JOBDIR are derived from SGE's
#    JOB_ID and exported back into %ENV.
#  - condor, smp: GRID_JOB is taken from the environment as is.
#  - any other GRID_ENGINE: dies.
sub gridWorkerEnv {
  my ($taskid)=@_;
  if ($taskid) {
    $GRID_TASK=$taskid;
    $ENV{GRID_TASK}=$taskid;
  }
  return if $GRID_ENVSET;
  my $engine=lc($ENV{GRID_ENGINE});
  $GRID_JOBDIR       = $ENV{GRID_JOBDIR};
  $GRID_TASKLAST     = $ENV{GRID_TASKLAST};
  $GRID_RESUME       = $ENV{GRID_RESUME};
  $GRID_MONHOME      = $ENV{GRID_MONHOME};
  $GRID_LOCAL_JOBDIR = $ENV{GRID_LOCAL_JOBDIR};
  $GRID_PSXFASTA     = $ENV{GRID_PSXFASTA};
  $GRID_PSXSTEP      = $ENV{GRID_PSXSTEP};
  $GRID_PSXSKIP      = $ENV{GRID_PSXSKIP};
  $GRID_PSXTOTAL     = $ENV{GRID_PSXTOTAL};
  $GRID_DIRPREFIX    = $ENV{GRID_DIRPREFIX};
  if ($engine eq 'condor' || $engine eq 'smp') {
    # the submit side exported the complete environment already
    $GRID_JOB=$ENV{GRID_JOB};
  }
  elsif ($engine eq 'sge') {
    # SGE cannot pass per-job values at submit time: GRID_JOBDIR holds the
    # submit directory, the job-specific paths are built from JOB_ID here
    $GRID_JOB=$ENV{JOB_ID};
    my $subdir='/gridx-'.$GRID_JOB;
    $GRID_JOBDIR.=$subdir;
    $GRID_LOCAL_JOBDIR.=$subdir if $GRID_LOCAL_JOBDIR;
    $ENV{GRID_JOBDIR}=$GRID_JOBDIR;
    $ENV{GRID_JOB}=$GRID_JOB;
    $ENV{GRID_LOCAL_JOBDIR}=$GRID_LOCAL_JOBDIR;
  }
  else {
    die("Error: Invalid GRID_ENGINE (Is this a valid worker run?)\n");
  }
  $GRID_LOCKDIR=$GRID_JOBDIR.'/locks';
  $GRID_ENVSET=1;
}
1238    
# beginWorker()
#  Worker start-up:
#  - loads the worker environment and chdir()s into $GRID_JOBDIR, retrying
#    for a few seconds in case the submit side has not created it yet;
#  - takes a unique worker number from the started-workers counter file;
#  - creates the worker directory wrk_NNNN (and its local-disk copy when
#    $GRID_LOCAL_JOBDIR is set);
#  - increments the running-workers counter;
#  - redirects STDOUT/STDERR into log files inside wrk_NNNN;
#  - finally chdir()s to the directory tasks should run in.
sub beginWorker {
  gridWorkerEnv(); #setup the worker environment
  my $maxretries=5; #just give some time for the submit script to catch up
  my $chdir=0; # (only if execution of a submitted task is extraordinarily fast)
  my $retries=0;
  while (!($chdir=chdir($GRID_JOBDIR))) {
    sleep(1);
    $retries++;
    last if $retries==$maxretries;
  }
  die("Worker error: cannot chdir to $GRID_JOBDIR") unless $chdir;
  # -- we are in GRID_JOBDIR now!
  my $errmsg="Worker PID $$ ($GRID_WORKER) aborted on $HOST..\n";
  my $fh=setXLock($F_WRKSTART,120) || die $errmsg; #update the count of started workers
  $GRID_WORKER=incFValue($fh, $F_WRKSTART);
  endXLock($fh);
  print STDERR "D: worker $GRID_WORKER assigned to host $HOST pid $$ at ".getTime()."\n" if $GRID_DEBUG;
  $GRID_WRKDIR=sprintf("wrk_%04d",$GRID_WORKER);
  # parent of the job directory, i.e. where gridx was launched from
  $starting_dir=$GRID_JOBDIR;
  $starting_dir=~s/[^\/]+\/?$//;
  if ($GRID_LOCAL_JOBDIR) {
    my $localwrk="$GRID_LOCAL_JOBDIR/$GRID_WRKDIR";
    print STDERR "D: creating local dir on $HOST: $localwrk\n" if $GRID_DEBUG;
    mkdir("$GRID_LOCAL_JOBDIR") unless -d $GRID_LOCAL_JOBDIR;
    system("/bin/rm -f $localwrk");
    # test the worker subdirectory itself (its parent was ensured above)
    mkdir($localwrk) unless -d $localwrk;
    wrkDie("Error: couldn't create local worker directory $localwrk on $HOST!\n")
      unless (-d $localwrk);
  }
  unless (-d "$GRID_JOBDIR/$GRID_WRKDIR") { #worker directory doesn't exist
    unless (mkdir("$GRID_JOBDIR/$GRID_WRKDIR")) {
      die "Error at mkdir $GRID_JOBDIR/$GRID_WRKDIR ($!)!\n";
    }
  }
  else { #existing working directory
    my $frunning= "$GRID_WRKDIR/$F_WRKRUNNING";
    if (-f $frunning) { #weird -- should never happen..
      die "Error: another process running in $GRID_WRKDIR?!\n".`cat $frunning`."$errmsg\n";
    }
    else { #normal case: no worker-running semaphore there already
      local *RSEM;
      open(RSEM, '>'.$frunning) || die "Error creating $frunning file! ($!)\n$errmsg";
      print RSEM join(" ",int(time()/60), $HOST, $$)."\n";
      close(RSEM);
    }
  }
  if ($GRID_DIRPREFIX) {
    #needed by PSX emulation: <prefix>_<worker#> link next to the job dir
    my $gwrkdir="../$GRID_DIRPREFIX".'_'.$GRID_WORKER;
    unlink($gwrkdir);
    symlink("gridx-$GRID_JOB/$GRID_WRKDIR", $gwrkdir) ||
      print STDERR "Warning: cannot symlink gridx-$GRID_JOB/$GRID_WRKDIR to $gwrkdir ($!)\n";
  }
  #-- update the "currently running" counter
  $fh=setXLock($F_WRKCOUNT,70,3) || die $errmsg; # update the count of running workers
  incFValue($fh, $F_WRKCOUNT);
  endXLock($fh);
  # we are in GRID_JOBDIR now - descend into wrk_<GRID_WORKER>
  chdir("$GRID_JOBDIR/$GRID_WRKDIR") ||
     die "Error at chdir($GRID_JOBDIR/$GRID_WRKDIR) ($!)\n";

  open(STDERR, ">wrk_err.log");
  print STDERR "worker $GRID_WORKER fully assigned to host $HOST PID $$\n" if $GRID_DEBUG;
  open(STDOUT, ">wrk_log.log");
  local *WRKGRAB;
  open(WRKGRAB, ">.on_$HOST");
  print WRKGRAB join("\t",$GRID_WRKDIR, $HOST, $$, 'start: '.getTime())."\n";
  close(WRKGRAB);
  $ENV{GRID_WORKER}=$GRID_WORKER;
  if ($SwitchDir) {
    chdir($starting_dir) || die "Error at chdir($starting_dir)!\n";
  }
  elsif ($GRID_LOCAL_JOBDIR) {
    # quoted path: unquoted, '/' would be a numeric division
    chdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR") ||
      die "Error at chdir($GRID_LOCAL_JOBDIR/$GRID_WRKDIR)!\n";
  }
}
1314    
# endWorker()
#  Worker shutdown.
#  - Returns -1 at once if no worker is active ($GRID_WORKER/$GRID_WRKDIR unset).
#  - Goes back into wrk_NNNN and decrements the running-workers counter
#    under its lock; dies if the counter would become negative.
#  - Removes the worker-running semaphore.
#  - Appends an "end:" timestamp to the .on_<HOST> marker.
#  - With $GRID_LOCAL_JOBDIR: copies the local files back and removes the
#    local worker directory.
#  Returns the number of workers still running.
sub endWorker {
  return -1 if (!$GRID_WORKER || !$GRID_WRKDIR);
  #make sure we are back in the wrk_<GRID_WORKER> directory
  my $wrkpath="$GRID_JOBDIR/$GRID_WRKDIR";
  chdir($wrkpath) ||
    die("ERROR: endWorker() could not change to $wrkpath\n");

  my $cntfh=setXLock($F_WRKCOUNT, 70, 3)
     || die "Error updating the number of running workers!\n";
  my $left=incFValue($cntfh, $F_WRKCOUNT, -1);
  unlink($F_WRKRUNNING); #remove the "worker here" semaphore..
  endXLock($cntfh);
  die("Error: invalid number of workers ($left) reported in $GRID_JOBDIR/$F_WRKCOUNT\n")
    if ($left<0);
  print STDERR join("\t","D. worker $GRID_WRKDIR", $HOST, $$, 'finished at:',getTime())."\n" if $GRID_DEBUG;

  # position on the last byte of the marker file (its final newline) so the
  # "end:" text presumably continues the existing start line
  local *WRKGRAB;
  sysopen(WRKGRAB, ".on_$HOST", O_RDWR | O_SYNC );
  sysseek(WRKGRAB,-1,SEEK_END);
  print WRKGRAB "\tend: ".getTime()."\n";
  close(WRKGRAB);
  if ($GRID_LOCAL_JOBDIR) {
    my $localwrk="$GRID_LOCAL_JOBDIR/$GRID_WRKDIR";
    if (system("cp -pr $localwrk/* $wrkpath/")) {
      print STDERR "Error at copying $HOST local files back to $wrkpath!\n";
    }
    system("/bin/rm -rf $localwrk");
    rmdir($GRID_LOCAL_JOBDIR); #it'll fail if there are other subdirs there, but that's OK
  }
  undef($GRID_WORKER);
  undef($GRID_WRKDIR);
  return $left; #the number of workers left running
}
1350    
# writeFValue($fh, $value)
#  Replaces the whole content of the open, writable file $fh with
#  int($value) followed by a newline. Returns that integer.
sub writeFValue {
  my ($handle, $value)=@_;
  my $n=int($value);
  truncate($handle, 0);
  seek($handle, 0, SEEK_SET);
  print $handle "$n\n";
  return $n;
}
1359    
# writeFList($fh, \@items [, $delim])
#  Rewrites the open, writable file $fh from the start, truncating it first,
#  with the items joined by $delim (nothing when $delim is false).
#  Returns print()'s result.
sub writeFList { #with truncate
  my ($handle, $items, $sep)=@_;
  $sep='' if !$sep;
  seek($handle, 0, SEEK_SET);
  truncate($handle, 0);
  print $handle join($sep, @$items);
  #truncate($fh, tell($fh));
}
#
# incFValue($fh, $finfo [, $incv])
#  Reads an integer from the start of the open (locked) file $fh.
#  Adds $incv to it; 1 is used when $incv is omitted, 0 or undef.
#  Writes the result back, replacing the file content, and returns it.
#  $finfo only labels the readFile() call (for error messages).
#
sub incFValue {
  my ($fh, $finfo, $incv)=@_;
  seek($fh, 0, SEEK_SET);
  my $step=$incv || 1;
  my $current=readFile($fh, 0, "incFValue(,,$step) in $finfo");
  chomp($current);
  my $updated=int($current)+$step;
  return writeFValue($fh, $updated);
}
1381    
1382    
#******************** worker side:
# getNextTask() -- returns the taskId of the next task to be processed,
#                  or undef if there are no more tasks
#  -tasks in the "retry pool" ($F_RETRYTASKS) are taken first; the first
#   pool entry is removed from the pool
#  -otherwise the task following the one recorded in $F_LASTTASK is taken,
#   as long as it does not exceed $GRID_TASKLAST
#  -when resuming a job ($GRID_RESUME), tasks marked as done ('.') in the
#   taskDb are skipped
#  -the returned task is locked (running/task_<taskId>, handle kept in
#   $TASK_LOCKH) and marked as running ('r') in the taskDb
#--------------- resources used:
#   update $F_RETRYTASKS
#   update $F_LASTTASK
#   update ENV{GRID_TASK} and $GRID_TASK
#   sets $TASK_LOCKF, $TASK_LOCKH, $TASK_ERRCOUNT, $TASK_DATA, $STARTED_GRID_TASK
#
sub getNextTask {
  #check for any tasks in the "retry" queue
  my ($taskID, $retries);
  my $sourcemsg;
  # restart point, via goto, after a task lock failure or a skipped
  # (already done) task
  SKIP_DONE:
  $retries=0;
  if (-s "$GRID_JOBDIR/$F_RETRYTASKS") {
    my $hr=setXLock($F_RETRYTASKS,120,5)
       || wrkDie("Error locking $F_RETRYTASKS ($HOST, $GRID_WORKER)");
    my @errstack=readFile($hr,0,$F_RETRYTASKS); #format: taskid <space> #retries
    ($taskID, $retries)=split(/\s+/, shift(@errstack)); #fetch the first entry of the pool
    if ($taskID>0) { #valid taskID to retry
      chomp($retries);
      writeFList($hr, \@errstack); #write the remaining entries back
      $sourcemsg=' from the retry pool.';
    }
    endXLock($hr);
  }
  # (label not targeted by any goto in this sub)
  NEXT_TASK:
  unless ($taskID) { #getting the next available task
    #no retry tasks, so get the next task not processed yet
    my $fh=setXLock($F_LASTTASK, 70, 3) ||
       wrkDie("Error locking $F_LASTTASK ($HOST, $GRID_WORKER)"); #30 /retries
    my $last=readFile($fh, 0, $F_LASTTASK);chomp($last);
    if ($last<$GRID_TASKLAST) { # valid one, take the next
      $taskID=writeFValue($fh,int($last)+1);
    }
    $sourcemsg='';
    endXLock($fh);
  }
  return undef unless $taskID;
  $GRID_TASK=$taskID;
  # lock this taskID
  $TASK_LOCKF="$GRID_JOBDIR/running/task_$taskID";
  #$TASK_LOCKF="running/task_$taskID";
  catchSigs(1); #install signal handler
  $TASK_LOCKH=setXLock($TASK_LOCKF, 70, 3);
  unless ($TASK_LOCKH) { #this SHOULD be available immediately..
    my $lockedby=`cat $TASK_LOCKF`;chomp($lockedby);
    print STDERR "WARNING: lock-fail on task $taskID $sourcemsg (previously locked in $TASK_LOCKF by [$lockedby])\n";
    #wrkDie("Error: couldn't get a lock on task $taskID..\n");
    # NOTE(review): a task taken from the retry pool was already removed
    # from the pool above, so it is not re-queued here -- confirm intended
    undef($taskID);
    goto SKIP_DONE; # don't kill the worker, just move on
  }
  # record who holds the task lock
  print $TASK_LOCKH "$HOST $$ ".sprintf("wrk_%04d",$GRID_WORKER)."\n";
  # check the status of this task:
  my ($tstatus, $tuserdata, $tdirno, $thost, $terrcount,
      $texcode, $tstartmin)=taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID);
  print STDERR ">task-$taskID assigned to worker $GRID_WORKER (on $HOST) $sourcemsg\n";
  if ($GRID_RESUME && $tstatus eq '.') {
    #skip this one, it's finished (according to taskDb!):
    #release the task lock and look for another task
    print STDERR ">SKIP-done:$taskID \{$tstatus|$terrcount|$texcode|$tdirno|$tstartmin\}\t\{$thost\}\n";
    undef $taskID;
    undef $GRID_TASK;
    endXLock($TASK_LOCKH);
    catchSigs(0);
    unlink($TASK_LOCKF);
    undef($TASK_LOCKH);undef($TASK_LOCKF);
    goto SKIP_DONE;
  }
  #update status of this task to 'running'
  # ($retries is the pool's retry count, or 0 for a fresh task)
  $TASK_ERRCOUNT=$retries;
  taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $taskID, 'r', $GRID_WORKER, $HOST, $TASK_ERRCOUNT);
  #--
  $GRID_TASK=$taskID;
  $TASK_DATA=$tuserdata;
  $ENV{'GRID_TASK'}=$taskID;
  $STARTED_GRID_TASK=$taskID;
  return $taskID;
}
1464    
##############################################
# runTask($taskID, @cmd)
#------------------------
# Runs one task; the task number is taken from $GRID_TASK, not from $taskID.
# *with GRID_LOCAL_JOBDIR it runs in the local copy of wrk_NNNN
# *employs $GRID_TASK, $TASK_DATA, $TASK_ERRCOUNT
# *the command actually run depends on the mode:
#   - PSX emulation (GRID_PSXFASTA): writes a fasta slice of up to
#     $GRID_PSXSTEP records, starting at byte offset $TASK_DATA of the fasta
#     db. It then runs $cmd[0] with these arguments:
#       slicefile, #records, task#, is-last-task (1/0), psxskip,
#       psxtotal, followed by the rest of @cmd
#   - command-list mode (GRID_USECMDLIST): $TASK_DATA is the command
#   - otherwise: "@cmd", which reads its task# from the environment
# *finally calls endTask($exitstatus) and returns its result. Per the
#  design, endTask is responsible for:
#   - set a lock on the ../running/task_$taskID file
#   - IF error exit of cmd (non-zero exit status), use $TASK_ERRCOUNT+1 and $MAX_RETRIES
#     to determine if the job should be put in the $F_RETRYTASKS file
#     or if status should be set to 'E' in taskDb and the entry added
#     to $F_ERRTASKS
#   - IF successful exit : increment $F_TASKSDONE
#   - update taskDb with the current status
#   - remove the lock on ../running/task_$taskID and delete this file!
#
#---------------------------------------------
sub runTask {
  my $taskID=shift(@_);
  my @cmd=@_;
  my $exitstatus;
  my $runcmd; #the actual command run using system();

  catchSigs(1); #install signal handler (should have been done already in getNextTask())
  if ($GRID_LOCAL_JOBDIR) {
    wrkDie("Fatal: cannot chdir to local dir $GRID_LOCAL_JOBDIR/$GRID_WRKDIR!")
      unless chdir("$GRID_LOCAL_JOBDIR/$GRID_WRKDIR");
  }

  #if ($SwitchDir) {
  # we are currently in a wrk_* directory
  # chdir('../..'); #change to the original directory (where gridx was launched from)
  # }
  if ($GRID_PSXFASTA) { #psx emulation
    #prepare the fasta slice here
    my $fslice=sprintf('%s.slice-%08d',getFName($GRID_PSXFASTA), $GRID_TASK);
    #-- write the slicefile
    local *FDB;
    local *FSL;
    open(FSL, '>', $fslice)
      || wrkDie("Cannot create fasta slice $fslice");
    open(FDB, '<', $GRID_PSXFASTA) || wrkDie("Cannot open fasta db $GRID_PSXFASTA");
    # a failed seek would silently slice from the start of the db
    seek(FDB, $TASK_DATA, SEEK_SET)
      || wrkDie("Cannot seek to offset $TASK_DATA in fasta db $GRID_PSXFASTA");
    local $/="\n";
    my $seqnext=$GRID_PSXSTEP*($GRID_TASK-1); #+GRID_PSXSKIP
    my $seqcount=0;
    my $seqmax=($GRID_PSXTOTAL<=0) ? 0 : $GRID_PSXTOTAL;
    while (<FDB>) {
      if (/^>/) { #record start
        $seqnext++;
        last if ($seqcount>=$GRID_PSXSTEP);
        last if $seqmax && ($seqnext>$seqmax);
        $seqcount++;
      }
      print FSL $_;
    }#while input line from fastadb
    # buffered write errors (e.g. disk full) only show up at close()
    close(FSL) || wrkDie("Error writing fasta slice $fslice ($!)");
    close(FDB);

    $runcmd=shift(@cmd);
    $GRID_PSXSKIP=0 unless ($GRID_PSXSKIP);
    $GRID_PSXTOTAL=-1 unless ($GRID_PSXTOTAL>0);
    my $islast=($GRID_TASK==$GRID_TASKLAST) ? 1 : 0;
    #                  1        2          3          4
    $runcmd.=' '.join(' ',$fslice, $seqcount, $GRID_TASK, $islast,
    #                  5              6             7..
                     $GRID_PSXSKIP, $GRID_PSXTOTAL, @cmd);
  }
  elsif ($GRID_USECMDLIST) {
    # $TASK_DATA is the actual command to run
    $runcmd=$TASK_DATA;
  }
  else { #normal repeat cmd -- let the cmd use the ENV accordingly
    $runcmd=join(" ",@cmd);
  }

  #... get $exitstatus for the system() call
  print STDERR ">starting-task-$GRID_TASK by worker $GRID_WORKER (on $HOST): '$runcmd'\n" if $GRID_DEBUG;
  $exitstatus=system($runcmd);
  if ($SwitchDir) {
    chdir("$GRID_JOBDIR/$GRID_WRKDIR");
  }
  endTask($exitstatus); #taskID is taken from $GRID_TASK
}
1551    
1552    
# catchSigs($enable)
# With a true argument, routes SIGINT and SIGTERM to sigHandler();
# with a false argument, restores the default action for both signals.
sub catchSigs { # true/false
  my $handler = $_[0] ? \&sigHandler : 'DEFAULT';
  $SIG{INT}  = $handler;
  $SIG{TERM} = $handler;
}
1563    
# sigHandler($signame)
# Handler installed by catchSigs() for SIGINT/SIGTERM: aborts the worker via
# wrkDie(), reporting the signal name, this process id and the host.
sub sigHandler {
  my ($sig)=@_;
  wrkDie("Signal $sig caught for worker $$ on $HOST, aborting..\n");
}
1568    
# toRetry($taskID [, $taskErrCount])
# Appends a "taskID<TAB>errcount" line to the retry list ($F_RETRYTASKS),
# unless that task ID is already listed there.
#  $taskErrCount defaults to 1 when not given (or 0).
# Does nothing for a false $taskID; if the retry list cannot be locked a
# warning is printed and the task is not queued (instead of crashing on an
# undefined file handle).
sub toRetry {
  my ($taskID, $taskErrCount)=@_;
  return unless $taskID;
  $taskErrCount=1 unless $taskErrCount;
  my $hr=setXLock($F_RETRYTASKS, 70, 3);
  unless ($hr) { # setXLock() returns undef when the lock could not be obtained
    print STDERR "WARNING: could not schedule task $taskID for retry (no lock on $F_RETRYTASKS)!\n";
    return;
  }
  local $/="\n";
  # use a named variable: the caller's $_ must not be clobbered
  while (my $line=<$hr>) {
    chomp($line);
    my ($tid)=split(/\s+/, $line);
    if (defined($tid) && $tid==$taskID) { #double retry, don't bother
      endXLock($hr);
      return;
    }
  }
  fappend($hr, "$taskID\t$taskErrCount\n");
  endXLock($hr);
}
1585    
# endTask([$exitstatus])
# Finishes the current task ($GRID_TASK) of this worker and updates the job
# bookkeeping; does nothing when there is no current task. Always chdir()s
# back to $GRID_JOBDIR/$GRID_WRKDIR first (dies if that fails).
#  - false exit status while $STARTED_GRID_TASK is unset (premature failure):
#    the task is put back on the retry list and the task state, including
#    $GRID_TASK itself, is cleared;
#  - exit status 0: the $F_TASKSDONE count is updated via incFValue(),
#    taskDb status '.';
#  - non-zero exit status: $TASK_ERRCOUNT is incremented; once it exceeds
#    $MAX_RETRIES the task is appended to $F_ERRTASKS (taskDb status 'E'),
#    otherwise it is queued for retry (taskDb status '-');
#  - undefined exit status after the task started (signal/abort): the task is
#    queued for retry, taskDb is not updated.
# In every case the task lock ($TASK_LOCKH/$TASK_LOCKF) is released and the
# SIGINT/SIGTERM handlers are reset to their defaults.
sub endTask {
  return unless $GRID_TASK;
  # we MUST be in GRID_WRKDIR
  chdir("$GRID_JOBDIR/$GRID_WRKDIR") || die("Error: failed to chdir to $GRID_JOBDIR/$GRID_WRKDIR!");
  my ($exitstatus)=@_;

  # NOTE(review): !$exitstatus is true for undef AND for a successful 0 status;
  # this relies on $STARTED_GRID_TASK being set whenever the command really ran - confirm
  if (!$exitstatus && !$STARTED_GRID_TASK) { # could be a premature failure, like cdbyank not found, etc.
    print STDERR "scheduling $GRID_TASK for retry..\n";
    toRetry($GRID_TASK, $TASK_ERRCOUNT);
    endXLock($TASK_LOCKH) if $TASK_LOCKH;
    unlink($TASK_LOCKF) if $TASK_LOCKF;
    undef($TASK_LOCKH);undef($TASK_LOCKF);
    unlink($F_WRKRUNNING);
    catchSigs(0);
    undef $GRID_TASK;
    return;
  }
  if (defined($exitstatus)) {
    my $dbstatus;
    if ($exitstatus==0) { #success
      $dbstatus='.';
      # update $F_TASKSDONE
      my $fh=setXLock($F_TASKSDONE, 110, 5);
      if ($fh) {
        incFValue($fh, $F_TASKSDONE);
        endXLock($fh);
      }
    }
    else { #error status
      $TASK_ERRCOUNT++;
      print STDERR "task $GRID_TASK failed on $HOST (PID=$$, status='$exitstatus') (worker $GRID_WORKER); error count=$TASK_ERRCOUNT\n" if $GRID_DEBUG;
      if ($TASK_ERRCOUNT>$MAX_RETRIES) { #trash it
        my $he=setXLock($F_ERRTASKS, 70, 3);
        if ($he) {
          fappend($he, join("\t",$GRID_TASK,$exitstatus,$HOST,$GRID_WRKDIR)."\n");
          endXLock($he);
        }
        else { # setXLock() returns undef on failure - don't crash the worker over it
          print STDERR "WARNING: could not record failed task $GRID_TASK in $F_ERRTASKS (lock failed)!\n";
        }
        $dbstatus='E';
      }
      else {#give it another retry chance
        toRetry($GRID_TASK, $TASK_ERRCOUNT);
        $dbstatus='-';
      }
    }
    endXLock($TASK_LOCKH) if $TASK_LOCKH;
    unlink($TASK_LOCKF) if $TASK_LOCKF;
    undef($TASK_LOCKH);undef($TASK_LOCKF);
    print STDERR "]task $GRID_TASK ended - updating taskDb with status code '$dbstatus'\n" if $GRID_DEBUG;
    taskDbStat("$GRID_JOBDIR/$F_TASKDB.cidx", $GRID_TASK, $dbstatus,
               $GRID_WORKER, $HOST, $TASK_ERRCOUNT, $exitstatus);
  } # defined $exitstatus
  else { #no exitstatus given, it was an interrupt signal or otherwise failed task
    print STDERR "task $GRID_TASK ended with undefined exit status\n" if $GRID_DEBUG;
    toRetry($GRID_TASK, $TASK_ERRCOUNT);
    endXLock($TASK_LOCKH) if $TASK_LOCKH;
    unlink($TASK_LOCKF) if $TASK_LOCKF;
    undef($TASK_LOCKH);undef($TASK_LOCKF);
    unlink($F_WRKRUNNING);
  }
  catchSigs(0);
  undef($STARTED_GRID_TASK);
}
1646    
# wrkDie($msg)
# Fatal-error exit for a worker: finishes the current task with endTask()
# (called without an exit status), shuts the worker down with endWorker(),
# then dies with $msg. The worker id in the message is read before
# endWorker() runs.
sub wrkDie {
  my $msg=shift(@_);
  endTask();
  my $worker_id=$GRID_WORKER; # read now - presumably endWorker() resets it
  endWorker();
  #remove any other locks left in this worker..
  die("Error at worker $worker_id on $HOST:\n$msg\n");
}
1655    
#--onExit, onEnd trigger
# Runs at program exit. When the script did not end normally ($NORMAL_ENDING
# still false), it finishes the current task, shuts down the worker and
# removes every lock file still registered in %Locks.
sub END {
  unless ($NORMAL_ENDING) {
    $NORMAL_ENDING=1; #to avoid recursion
    # a failure here must not prevent the lock cleanup below
    eval { endTask(); 1 } || print STDERR "Error in endTask() at exit: $@";
    eval { endWorker(); 1 } || print STDERR "Error in endWorker() at exit: $@";
    #remove all locks
    # The keys of %Locks are *stringified* file handles ("GLOB(0x...)"). Passing
    # them to endXLock()/close() dies under 'use strict' (symbolic reference),
    # so remove the lock link and host lock file directly instead.
    foreach my $lock (keys(%Locks)) {
      my $d=delete($Locks{$lock});
      next unless $d;
      my ($locklink, $nodefile)=@$d;
      unlink($locklink);
      unlink($nodefile);
    }
  }
}
1670    
# fappend($fh, @strings)
# Moves to the end of the file open on $fh and writes @strings joined by
# newlines (no trailing newline is added). Returns the result of print().
sub fappend {
  my ($fh, @parts)=@_;
  seek($fh, 0, SEEK_END);
  return print {$fh} join("\n", @parts);
}
1676    
# getLockDir($fname)
# Returns a two-element list: $fname rebuilt as "<dirname>/<basename>"
# (just the basename if dirname() is false) and a "by.<host>.<pid>" tag
# identifying this process.
sub getLockDir {
  my ($fname)=@_;
  my ($base, $dir)=(basename($fname), dirname($fname));
  my $lockdir= $dir ? "$dir/$base" : $base;
  return ($lockdir, "by.$HOST.$$");
}
1685    
# makeLockFile($fname)
# Called by a node just before a lock on $fname is requested. Creates this
# process' own host lock file in $GRID_LOCKDIR, containing the single line
#   "<host> <pid> <minute> <worker>"   (minute = int(time()/60))
#  $fname - MUST be a path relative to $GRID_JOBDIR (a leading
#           "$GRID_JOBDIR/" is stripped)
# Returns ($lockfile, $nodelockfile): the shared lock path that getFileLock()
# tries to hard-link, and the host lock file just written.
# Dies if the host lock file cannot be created or written.
sub makeLockFile {
  my ($fname)=@_;
  if (index($fname,$GRID_JOBDIR.'/')==0) {
    #remove the full path, if there
    $fname=substr($fname, length($GRID_JOBDIR)+1);
  }
  my $fnlock=$fname;
  $fnlock=~tr/\/\\/--/s; #flatten path separators into '-' (runs squeezed)
  my $nodelockf="$GRID_LOCKDIR/$fnlock-lock.by.$HOST.$$";
  $nodelockf.='-'.substr(time(),-4);
  open(my $fh, '>', $nodelockf) || die("Error creating lockfile $nodelockf! ($!)\n");
  my $thismin=int(time()/60); # time when the lock was first initiated
  print $fh "$HOST $$ $thismin $GRID_WORKER\n";
  # setXLock() reads this content to detect stale locks; a write failure is
  # only reported by close() (e.g. on NFS) and must not go unnoticed
  unless (close($fh)) {
    my $err=$!;
    unlink($nodelockf);
    die("Error writing lockfile $nodelockf! ($err)\n");
  }
  return ($GRID_LOCKDIR.'/'.$fnlock, $nodelockf);
}
1706    
# getFileLock($lockfile, $nodefile)
# Tries to take a lock by creating $lockfile as a hard link to this process'
# own (host/process specific) $nodefile.
# Returns undef when link() fails (typically because $lockfile already
# exists); otherwise returns whether $nodefile now has more than one link.
# A link count above 2 is reported as a warning.
sub getFileLock {
  my ($lockfile, $nodefile)=@_;
  return undef unless link($nodefile, $lockfile);
  my $nlinks=(stat($nodefile))[3];
  print STDERR "WARNING: weird lnkcount=$nlinks ($GRID_WORKER, $HOST, task $GRID_TASK)!\n"
    if $nlinks>2;
  return ($nlinks>1); #should never be more than 2..
}
1722    
# setXLock($fname [, $maxretries [, $stalemin]])
# Attempts to acquire an exclusive lock on file $fname (relative to
# $GRID_JOBDIR unless it starts with a path separator). The lock is a hard
# link to a host lock file (see makeLockFile()/getFileLock()). Once locked,
# $fname is opened with sysopen($sysopen_mode).
#  $maxretries - attempts (3 seconds apart) before giving up (default 80)
#  $stalemin   - when the retries are exhausted, a lock older than this many
#                minutes is considered stale and removed (default 7200)
# Returns the open file handle, to be released with endXLock(), or undef if
# the lock could not be obtained. Dies if the locked file cannot be opened
# (the lock is released first).
sub setXLock {
  my ($fname, $maxretries, $stalemin)=@_;
  $maxretries=80 unless $maxretries;
  $stalemin=7200 unless $stalemin;
  my ($lockfile, $nodefile)=makeLockFile($fname);
  my $retries=0;
  my $lock_age;
  my $prevlock;
  my $haveLock;

  while (!($haveLock=getFileLock($lockfile, $nodefile))) {
    if ($retries>$maxretries) {
      #tried too many times - check for a stale lock forgotten here..
      if (-f $lockfile) {
        #anyone ELSE holding it?
        my $l=readFile($lockfile);
        my ($lhost, $lpid, $ltime, $worker)=split(/\s+/,$l);
        $prevlock="$lhost (pid $lpid, worker $worker)" if $lpid;
        $lock_age=int(time()/60)-$ltime;
        if ($lock_age>$stalemin) { #previous lock is older than $stalemin minutes
          print STDERR "WARNING: removing stale $fname lock from $lhost PID $lpid (worker $worker), age $lock_age minutes.\n";
          # try again right away - but only if the stale lock is really gone,
          # otherwise this loop would spin forever without sleeping
          next if (unlink($lockfile) || !-e $lockfile);
        }
      }
      last; #too many retries
    }
    $retries++;
    sleep(3); #pause 3 seconds between attempts
  } #----- while failing at getting a lock
  if ($haveLock) {
    # refresh the host lock file (hard-linked to $lockfile) with the current time
    my $nfh;
    unless (open($nfh, '>', $nodefile)) { #should never happen, really
      my $err=$!;
      unlink($lockfile); #don't leave our lock behind as a stale one
      die "Error re-creating host lock file $nodefile ?! ($err)\n";
    }
    my $thismin= int(time()/60);
    print $nfh "$HOST $$ $thismin $GRID_WORKER\n";
    close($nfh);
    # -- now it's safe to open the actual file being locked..
    # has to be a path relative to $GRID_JOBDIR
    $fname="$GRID_JOBDIR/$fname" unless $fname=~m/^[\/\\]/;
    # -- add O_DIRECT to $sysopen_mode only if you notice syncing/flushing issues for small files
    my $fh;
    unless (sysopen($fh, $fname, $sysopen_mode)) {
      my $err=$!;
      # release the lock we just acquired, it would otherwise become a stale lock
      unlink($lockfile);
      unlink($nodefile);
      die "setXLock($fname) failed at open ($fname): ($err)\n";
    }
    $Locks{$fh}=[$lockfile, $nodefile];
    return $fh;
  }
  else {
    print STDERR "ERROR getting a lock on $fname ($lockfile -> $nodefile) after $retries attempts!\n";
    print STDERR " (blocked by: $prevlock of $lock_age minutes)\n" if $prevlock;
    unlink($nodefile);
    return undef;
  }
}
1792    
# endXLock($fh)
# Releases a lock obtained with setXLock(): closes $fh, removes its %Locks
# entry and deletes both the lock link and the host lock file.
# An undefined $fh (e.g. from a failed setXLock()) only produces a warning.
# A stringified handle (such as a key of %Locks) is not closed, because
# close() on a plain string dies under 'use strict' (symbolic reference),
# but its lock files are still removed.
sub endXLock {
  my $fh=shift(@_);
  unless (defined($fh)) {
    print STDERR "WARNING at endXLock(): undefined file handle!\n";
    return;
  }
  # only real handles (glob refs or globs) can be closed
  close($fh) if (ref($fh) || ref(\$fh) eq 'GLOB');
  my $d = delete($Locks{$fh});
  unless ($d) {
    print STDERR "WARNING at endXLock(): no Locks entry found for file handle $fh!\n";
    return;
  }
  my ($locklink, $nodefile)=@$d;
  unlink($locklink);
  #-- the host lock file is removed too (comment out to keep it for debugging)
  unlink($nodefile);
}
1806    
# readFile($f [, $nonfatal [, $context]])
# Reads from $f, which is either a file handle (GLOB reference) or a file
# name. A named file is opened read/write, is CREATED (empty) if it does not
# exist yet, and is closed again afterwards. A passed-in handle is left open.
# In list context returns all remaining lines; in scalar context returns only
# the next line ('' when nothing is left).
# If a named file cannot be opened: returns undef when $nonfatal is true,
# otherwise dies. $context only annotates that error message.
sub readFile {
  my ($f, $nonfatal, $context)=@_;
  $context=" ($context)" if $context;
  my $fh;
  my $mustclose=0;
  if (ref($f) eq 'GLOB') { # file handle or glob reference..
    $fh=$f;
  }
  else { #scalar: string = filename
    my $mode=(-f $f) ? '+<' : '+>'; #create if not exists
    if (!open($fh, $mode.$f)) {
      return undef if $nonfatal;
      die "readFile($mode $f)$context task $GRID_TASK on $HOST, open error: $!\n";
    }
    $mustclose=1;
  }
  local $/="\n";
  unless (wantarray()) { #first line only
    my $first=<$fh> || '';
    close($fh) if $mustclose;
    return $first;
  }
  my @lines=<$fh>;
  close($fh) if $mustclose;
  return @lines;
}
1839    
#===================================================
# Mailer subroutine
#--- only works for some Linux installations.. as it requires sendmail
# send_mail(\%msg)
# Sends a message through "/usr/lib/sendmail -t -oi". Recognized keys of %msg:
#  from - sender; defaults to $USER@$HOSTNAME when not defined
#  to   - recipient; defaults to $USER@$DOMAIN when false, and gets
#         "@$DOMAIN" appended when it contains no '@'
#  subj - subject line
#  body - message text
#  file - optional file whose whole content is appended to the body
# The 'from'/'to' defaults are written back into %msg.
# Returns an error string if 'file' cannot be opened and dies if the sendmail
# pipe cannot be opened; otherwise returns the result of closing the pipe.
sub send_mail {
  my $hash=shift;
  $hash->{'from'}=$USER.'@'.$HOSTNAME unless defined($hash->{'from'});
  my $to=$hash->{'to'};
  if (!$to) {
    $hash->{'to'}=$USER.'@'.$DOMAIN;
  }
  elsif ($to !~ m/@/) {
    $hash->{'to'}=$to.'@'.$DOMAIN;
  }

  my $file;
  if (defined($hash->{file})) {
    #warning: assumes it's a decent, short text file!
    local $/=undef; #slurp the whole file
    open(my $att, '<'.$hash->{file}) || return "Cannot open file ".$hash->{file}."\n";
    $file=<$att>;
    close($att);
  }
  my $body=$hash->{'body'};
  $body.=$file;
  open(my $mail, '| /usr/lib/sendmail -t -oi') || die "Error: cannot open sendmail pipe!\n";
  print $mail "To: $hash->{to}\n";
  print $mail "From: $hash->{from}\n";
  print $mail "Subject: $hash->{subj}\n\n";
  print $mail $body;
  close($mail);
}
1875    
# getTime()
# Returns the current local time as given by scalar localtime(), with the
# leading weekday field removed (dropped so that Sybase accepts the value).
sub getTime {
  my (undef, $stamp)=split(/\s+/, scalar(localtime()), 2);
  return $stamp;
}

Properties

Name Value
svn:executable *