my $g_dbtable = 'pg_sched';
my $g_clname = 'auto';
my $g_sets = 'all';
-my $g_usage = "pg_sched.pl -h <db host> -p <db port> -d <db name> -U <db user> -n <schema> -t <table> -cl <slony clustername> -m <master sets> -l <lock file>
+my $g_usage = "pg_sched.pl -h <db host> -p <db port> -d <db name> -U <db user> -n <schema> -t <table> -cl <slony clustername> -m <master sets> -w <lock file> -l <logfile>
-h Hostname of database containing schedule table
DEFAULT = $g_dbhost
-p Listening port of database containing schedule table
DEFAULT = $g_clname
-m Comma separated list of slony sets on the master. 'all' for all
DEFAULT = $g_sets
- -l Lockfile used to prevent concurrent execution of tasks.
+ -w Lockfile used to prevent concurrent execution of tasks.
DEFAULT = not used
+ -l File to log to.
+ DEFAULT = log to STDOUT instead
";
my @g_databases;
my @g_pronames;
my $g_errors;
my $g_lockfile;
+my $g_logfile;
-die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|l=s', 'clname|cl=s', 'msets|m=s');
+die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|w=s', 'clname|cl=s', 'msets|m=s', 'logfile|l=s');
if (defined($opt{host})) {
$g_dbhost = $opt{host};
if (defined($opt{msets})) {
$g_sets = $opt{msets};
}
+if (defined($opt{logfile})) {
+ $g_logfile = $opt{logfile};
+}
# If lockfile supplied check if the script is already running
if (defined($g_lockfile)) {
@g_pronames = (@g_pronames, loadWork($g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_origin, $target_dbname));
}
# Do all the work we have collected for all databases
-$g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable);
+$g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_logfile);
if ($g_errors > 0) {
warn ("ERROR: Encountered $g_errors errors processing the schedule\n") if $@;
}
}
+sub logFile {
+ my $logfile = shift;
+ my $message = shift;
+
+ eval {
+ open(LOGFILE, ">>", $logfile);
+ flock(LOGFILE, 2);
+ print LOGFILE $message;
+ close (LOGFILE);
+ };
+ warn ("WARNING: unable to log to file $logfile\n") if $@;
+}
+
sub loadDatabases{
my $dbhost = shift;
my $dbport = shift;
my $dbuser = shift;
my $dbschema = shift;
my $dbtable = shift;
+ my $logfile = shift;
my @workers;
+ my $running = 0;
my @worker_result;
my $bad = 0;
foreach my $prodef (@$pronames) {
- my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable);
+ my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable, $logfile);
push(@workers,$t);
+ $running++;
}
- foreach (@workers) {
- @worker_result = $_->join;
- if ($worker_result[0] == true) {
- print("$worker_result[1] [SUCCEEDED]\n");
- }
- else {
- print("$worker_result[1] [FAILED]\n");
- $bad++;
+ while ($running > 0) {
+ foreach (@workers) {
+ if ($_->is_joinable()) {
+ @worker_result = $_->join;
+ $running--;
+ if ($worker_result[0] == true) {
+ print("$worker_result[1] [SUCCEEDED]\n") if (!defined($logfile));
+ }
+ else {
+ print("$worker_result[1] [FAILED]\n") if (!defined($logfile));
+ $bad++;
+ }
+ }
}
+ sleep(1);
}
return $bad;
}
my $dbuser = shift;
my $dbschema = shift;
my $dbtable = shift;
+ my $logfile = shift;
my $dsn;
my $dbh;
my $qw_schedname;
my @result;
my $success;
- my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0];
- my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec);
- my $detail = $timestamp . " Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" .
+ my $run_timestamp = getTimestamp();
+ my $completed_timestamp;
+ my $detail = "Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" .
(defined(@$prodef[4])?join(', ',@{@$prodef[4]}):'') . ") Rep Role: @$prodef[7] User: @$prodef[6] Result: ";
my $typindex = 0;
my $start;
my $end;
if (!defined(@$prodef[8]) || !kill(0, @$prodef[8])) {
- eval {
-
+ eval {
eval {
$dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
$dbh = DBI->connect($dsn, $dbuser);
$start = gettimeofday();
$sth->execute();
$end = gettimeofday();
+ $completed_timestamp = getTimestamp();
while (my @result = $sth->fetchrow) {
$success = true;
$qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
- $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp WHERE id = ?;";
+ $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp, last_completed = ?::timestamp WHERE id = ?;";
$sth = $dbh->prepare($query);
- $sth->bind_param(1, $timestamp);
- $sth->bind_param(2, @$prodef[0]);
+ $sth->bind_param(1, $run_timestamp);
+ $sth->bind_param(2, $completed_timestamp);
+ $sth->bind_param(3, @$prodef[0]);
$sth->execute();
$sth->finish;
$dbh->disconnect();
$detail .= 'already running PID ' . @$prodef[8];
}
+ $detail = ($completed_timestamp // $run_timestamp) . " " . $detail;
+
+ if (defined($logfile)) {
+ logFile($logfile, $detail . ($success?' [SUCCEEDED]':' [FAILED]') . "\n");
+ }
+
return ($success, $detail);
}
+sub getTimestamp {
+ my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0];
+ my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec);
+ return $timestamp;
+}