X-Git-Url: https://git.8kb.co.uk/?p=slony-i%2Fpg_sched;a=blobdiff_plain;f=pg_sched.pl;h=c921a84c62481fdc82b46003bb3e891c5e538af2;hp=a62b7c055c0153733d46475ca74ad83c5dece655;hb=HEAD;hpb=8bb56fec42d5dc92bceb83389d7b9bf8acb38061 diff --git a/pg_sched.pl b/pg_sched.pl index a62b7c0..c921a84 100755 --- a/pg_sched.pl +++ b/pg_sched.pl @@ -47,7 +47,7 @@ my $g_dbschema = 'public'; my $g_dbtable = 'pg_sched'; my $g_clname = 'auto'; my $g_sets = 'all'; -my $g_usage = "pg_sched.pl -h -p -d -U -n -t -cl -m -l +my $g_usage = "pg_sched.pl -h -p -d -U -n -t
-cl -m -w -l -h Hostname of database containing schedule table DEFAULT = $g_dbhost -p Listening port of database containing schedule table @@ -64,8 +64,10 @@ my $g_usage = "pg_sched.pl -h -p -d -U - DEFAULT = $g_clname -m Comma separated list of slony sets on the master. 'all' for all DEFAULT = $g_sets - -l Lockfile used to prevent concurrent execution of tasks. + -w Lockfile used to prevent concurrent execution of tasks. DEFAULT = not used + -l File to log to. + DEFAULT = log to STDOUT instead "; my @g_databases; @@ -73,8 +75,9 @@ my $g_origin = false; my @g_pronames; my $g_errors; my $g_lockfile; +my $g_logfile; -die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|l=s', 'clname|cl=s', 'msets|m=s'); +die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|w=s', 'clname|cl=s', 'msets|m=s', 'logfile|l=s'); if (defined($opt{host})) { $g_dbhost = $opt{host}; @@ -103,6 +106,9 @@ if (defined($opt{clname})) { if (defined($opt{msets})) { $g_sets = $opt{msets}; } +if (defined($opt{logfile})) { + $g_logfile = $opt{logfile}; +} # If lockfile supplied check if the script is already running if (defined($g_lockfile)) { @@ -124,7 +130,7 @@ foreach my $target_dbname (@g_databases) { @g_pronames = (@g_pronames, loadWork($g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_origin, $target_dbname)); } # Do all the work we have collected for all databases -$g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable); +$g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_logfile); if ($g_errors > 0) { warn ("ERROR: Encountered $g_errors errors processing the schedule\n") if $@; @@ -159,6 +165,19 @@ sub schedLock{ } } +sub logFile { + my $logfile = shift; + my $message = shift; + + eval { + open(LOGFILE, ">>", $logfile); + flock(LOGFILE, 2); + print LOGFILE $message; + close (LOGFILE); + }; + warn ("WARNING: unable to log to file $logfile\n") if $@; +} + sub loadDatabases{ my $dbhost = shift; my $dbport = shift; @@ -321,23 +340,32 @@ sub doWork{ my $dbuser = shift; my $dbschema = shift; my $dbtable = shift; + my $logfile = shift; my @workers; + my $running = 0; my @worker_result; my $bad = 0; foreach my $prodef (@$pronames) { - my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable); + my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable, $logfile); push(@workers,$t); + $running++; } - foreach (@workers) { - @worker_result = $_->join; - if ($worker_result[0] == true) { - print("$worker_result[1] [SUCCEEDED]\n"); - } - else { - print("$worker_result[1] [FAILED]\n"); - $bad++; + while ($running > 0) { + foreach (@workers) { + if ($_->is_joinable()) { + @worker_result = $_->join; + $running--; + if ($worker_result[0] == true) { + print("$worker_result[1] [SUCCEEDED]\n") if (!defined($logfile)); + } + else { + print("$worker_result[1] [FAILED]\n") if (!defined($logfile)); + $bad++; + } + } } + sleep(1); } return $bad; } @@ -350,6 +378,7 @@ sub runWorker{ my $dbuser = shift; my $dbschema = shift; my $dbtable = shift; + my $logfile = shift; my $dsn; my $dbh; @@ -358,17 +387,16 @@ sub runWorker{ my $qw_schedname; my @result; my $success; - my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0]; - my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec); - my $detail = $timestamp . " Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" . + my $run_timestamp = getTimestamp(); + my $completed_timestamp; + my $detail = "Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" . (defined(@$prodef[4])?join(', ',@{@$prodef[4]}):'') . ") Rep Role: @$prodef[7] User: @$prodef[6] Result: "; my $typindex = 0; my $start; my $end; if (!defined(@$prodef[8]) || !kill(0, @$prodef[8])) { - eval { - + eval { eval { $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;"; $dbh = DBI->connect($dsn, $dbuser); @@ -418,6 +446,7 @@ sub runWorker{ $start = gettimeofday(); $sth->execute(); $end = gettimeofday(); + $completed_timestamp = getTimestamp(); while (my @result = $sth->fetchrow) { $success = true; @@ -433,10 +462,11 @@ sub runWorker{ $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable); - $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp WHERE id = ?;"; + $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp, last_completed = ?::timestamp WHERE id = ?;"; $sth = $dbh->prepare($query); - $sth->bind_param(1, $timestamp); - $sth->bind_param(2, @$prodef[0]); + $sth->bind_param(1, $run_timestamp); + $sth->bind_param(2, $completed_timestamp); + $sth->bind_param(3, @$prodef[0]); $sth->execute(); $sth->finish; $dbh->disconnect(); @@ -456,6 +486,17 @@ sub runWorker{ $detail .= 'already running PID ' . @$prodef[8]; } + $detail = ($completed_timestamp // $run_timestamp) . " " . $detail; + + if (defined($logfile)) { + logFile($logfile, $detail . ($success?' [SUCCEEDED]':' [FAILED]') . "\n"); + } + return ($success, $detail); } +sub getTimestamp { + my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0]; + my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec); + return $timestamp; +}