4 # Copyright: 26/06/2011: v1.0.0 Glyn Astill <glyn@8kb.co.uk>
5 # PostgreSQL 9.0+ (optional Slony-I)
7 # This script is a command-line utility to schedule execution of stored
8 # functions against databases in a PostgreSQL cluster, with support for
9 # selective execution based on the replication role of nodes in Slony-I
12 # The script is designed to be run from cron and has a maximum resolution
15 # This script is free software: you can redistribute it and/or modify
16 # it under the terms of the GNU General Public License as published by
17 # the Free Software Foundation, either version 3 of the License, or
18 # (at your option) any later version.
20 # This script is distributed in the hope that it will be useful,
21 # but WITHOUT ANY WARRANTY; without even the implied warranty of
22 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 # GNU General Public License for more details.
25 # You should have received a copy of the GNU General Public License
26 # along with this script. If not, see <http://www.gnu.org/licenses/>.
31 use Time::HiRes qw/gettimeofday/;
33 use sigtrap 'handler' => \&cleanExit, 'HUP', 'INT','ABRT','QUIT','TERM';
34 use Getopt::Long qw/GetOptions/;
35 Getopt::Long::Configure('no_ignore_case');
38 use constant false => 0;
39 use constant true => 1;
41 my $g_slony_cluster = "test_replication";
42 my $g_dbhost = 'localhost';
44 my $g_dbname = 'postgres';
45 my $g_dbuser = 'pg_sched';
46 my $g_dbschema = 'public';
47 my $g_dbtable = 'pg_sched';
48 my $g_clname = 'auto';
50 my $g_usage = "pg_sched.pl -h <db host> -p <db port> -d <db name> -U <db user> -n <schema> -t <table> -cl <slony clustername> -m <master sets> -w <lock file> -l <logfile>
51 -h Hostname of database containing schedule table
53 -p Listening port of database containing schedule table
55 -d Name of database containing schedule table
57 -U User to connect to database with, also used as default user for all tasks
59 -n Name of schema containing schedule table
61 -t Name of schedule table
63 -cl Name of slony cluster. 'auto' to autodetect, 'off' to disable
65 -m Comma separated list of slony sets on the master. 'all' for all
67 -w Lockfile used to prevent concurrent execution of tasks.
70 DEFAULT = log to STDOUT instead
80 die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|w=s', 'clname|cl=s', 'msets|m=s', 'logfile|l=s');
82 if (defined($opt{host})) {
83 $g_dbhost = $opt{host};
85 if (defined($opt{port})) {
86 $g_dbport = $opt{port};
88 if (defined($opt{dbname})) {
89 $g_dbname = $opt{dbname};
91 if (defined($opt{user})) {
92 $g_dbuser = $opt{user};
94 if (defined($opt{schema})) {
95 $g_dbschema = $opt{schema};
97 if (defined($opt{table})) {
98 $g_dbtable = $opt{table};
100 if (defined($opt{lockfile})) {
101 $g_lockfile = $opt{lockfile};
103 if (defined($opt{clname})) {
104 $g_clname = $opt{clname};
106 if (defined($opt{msets})) {
107 $g_sets = $opt{msets};
109 if (defined($opt{logfile})) {
110 $g_logfile = $opt{logfile};
113 # If lockfile supplied check if the script is already running
114 if (defined($g_lockfile)) {
115 schedLock($g_lockfile, 'lock');
118 # Get a list of databases in the postgresql cluster
119 @g_databases = loadDatabases($g_dbhost, $g_dbport, $g_dbname, $g_dbuser);
121 # For each database get its replication status then get any work to do
123 foreach my $target_dbname (@g_databases) {
124 if ($g_clname ne 'off') {
125 $g_origin = checkSlony($g_dbhost, $g_dbport, $target_dbname, $g_dbuser, $g_clname, $g_sets);
130 @g_pronames = (@g_pronames, loadWork($g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_origin, $target_dbname));
132 # Do all the work we have collected for all databases
133 $g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_logfile);
136 warn ("ERROR: Encountered $g_errors errors processing the schedule\n") if $@;
139 cleanExit($g_lockfile);
142 my $lockfile = shift;
144 if (defined($lockfile)) {
145 schedLock($lockfile, 'unlock');
156 mkdir($lockdir) or die "Can't make directory: $!";
158 die ("Script already running") if $@;
164 die ("unable to remove lock\n") if $@;
173 open(LOGFILE, ">>", $logfile);
175 print LOGFILE $message;
178 warn ("WARNING: unable to log to file $logfile\n") if $@;
192 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
194 $dbh = DBI->connect($dsn, $dbuser);
196 $dbh->do("SET application_name = 'pg_sched'");
198 $query = "SELECT datname FROM pg_catalog.pg_database WHERE datallowconn AND NOT datistemplate;";
200 $sth = $dbh->prepare($query);
203 while (my $databasename = $sth->fetchrow) {
204 push(@databases, $databasename);
208 warn ("ERROR: Loading databases failed\n") if $@;
216 my $target_dbname = shift;
227 my $run_master = false;
229 $dsn = "DBI:Pg:dbname=$target_dbname;host=$dbhost;port=$dbport;";
231 $dbh = DBI->connect($dsn, $dbuser);
233 $dbh->do("SET application_name = 'pg_sched'");
235 if ($clname ne 'auto') {
236 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_namespace n WHERE n.nspname = ?;";
239 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
240 WHERE c.relname = 'sl_set' AND c.relkind = 'r' GROUP BY n.nspname;";
243 $sth = $dbh->prepare($query);
245 if ($clname ne 'auto') {
246 $sth->bind_param(1, "_" . $clname);
250 $clname = $sth->fetchrow;
252 if (defined($clname)) {
253 $qw_clname = $dbh->quote_identifier("_" . $clname);
255 $query = "SELECT count(*), sum(CASE WHEN s.set_origin = $qw_clname.getlocalnodeid(?::name) THEN 1 ELSE 0 END)
256 FROM $qw_clname.sl_set s" . (($sets ne 'all')?' WHERE s.set_id = ANY(?::integer[]);':';');
258 $sth = $dbh->prepare($query);
259 $sth->bind_param(1, "_" . $clname);
260 if ($sets ne 'all') {
261 $sth->bind_param(2, [ split(',', $sets) ]);
266 ($all_sets, $local_sets) = $sth->fetchrow;
268 if (($all_sets // 0) == ($local_sets // 0)) {
280 warn ("ERROR: Failed reading slony node config\n") if $@;
290 my $dbschema = shift;
292 my $node_role = shift;
293 my $target_dbname = shift;
301 if ($node_role ne 'D') {
302 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
304 $dbh = DBI->connect($dsn, $dbuser);
306 $dbh->do("SET application_name = 'pg_sched'");
308 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
310 $query = "SELECT p.id, p.datname, p.pronamespace, p.proname, p.proargs, p.proargtypes, COALESCE(p.usename,current_user), ?::text, p.running FROM $qw_schedname p
311 WHERE p.datname = ? AND (p.enabled = 'A' OR p.enabled = ?)
312 AND (p.last_run IS NULL OR
313 (date_trunc('m', current_timestamp)-date_trunc('m',p.last_run))::interval >= p.frequency)
315 (extract(EPOCH FROM date_trunc('m', current_timestamp-coalesce(frequency_offset,'0s'::interval)))::integer
316 %extract(EPOCH FROM frequency)::integer) = 0);";
318 $sth = $dbh->prepare($query);
319 $sth->bind_param(1, ($node_role?'O':'R'));
320 $sth->bind_param(2, $target_dbname);
321 $sth->bind_param(3, ($node_role?'O':'R'));
324 while (my @prodef = $sth->fetchrow) {
325 push(@pronames, \@prodef);
329 warn ("ERROR: Loading work failed\n") if $@;
336 my $pronames = shift;
341 my $dbschema = shift;
349 foreach my $prodef (@$pronames) {
350 my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable, $logfile);
354 while ($running > 0) {
356 if ($_->is_joinable()) {
357 @worker_result = $_->join;
359 if ($worker_result[0] == true) {
360 print("$worker_result[1] [SUCCEEDED]\n") if (!defined($logfile));
363 print("$worker_result[1] [FAILED]\n") if (!defined($logfile));
379 my $dbschema = shift;
390 my $run_timestamp = getTimestamp();
391 my $completed_timestamp;
392 my $detail = "Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" .
393 (defined(@$prodef[4])?join(', ',@{@$prodef[4]}):'') . ") Rep Role: @$prodef[7] User: @$prodef[6] Result: ";
398 if (!defined(@$prodef[8]) || !kill(0, @$prodef[8])) {
401 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
402 $dbh = DBI->connect($dsn, $dbuser);
403 $dbh->do("SET application_name = 'pg_sched'");
405 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
407 $query = "UPDATE $qw_schedname SET running = ? WHERE id = ?;";
408 $sth = $dbh->prepare($query);
409 $sth->bind_param(1, $$);
410 $sth->bind_param(2, @$prodef[0]);
416 warn ("WARNING: pre update of schedule failed\n");
419 $dsn = "DBI:Pg:dbname=@$prodef[1];host=$dbhost;port=$dbport;";
420 $dbh = DBI->connect($dsn, $dbuser);
421 $dbh->do("SET application_name = 'pg_sched'");
423 $sth = $dbh->prepare("SET session_authorization = ?");
424 $sth->bind_param(1, @$prodef[6]);
427 $query = "SELECT " . $dbh->quote_identifier(@$prodef[2]) . '.' . $dbh->quote_identifier(@$prodef[3]) . '(';
428 if (defined(@$prodef[4])) {
429 foreach my $arg (@{@$prodef[4]}) {
430 #$query = $query . ($typindex>0?',':'') . $dbh->quote($arg) . '::' . @{@$prodef[5]}[$typindex];
431 $query .= ($typindex>0?',':'') . '?::' . @{@$prodef[5]}[$typindex];
437 $sth = $dbh->prepare($query);
440 if (defined(@$prodef[4])) {
441 foreach my $arg (@{@$prodef[4]}) {
443 $sth->bind_param($typindex, $arg);
446 $start = gettimeofday();
448 $end = gettimeofday();
449 $completed_timestamp = getTimestamp();
451 while (my @result = $sth->fetchrow) {
453 $detail .= ($result[0] // 'NULL') . ' (Timing: ' . sprintf("%.2f sec", $end-$start) . ')';
459 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
460 $dbh = DBI->connect($dsn, $dbuser);
461 $dbh->do("SET application_name = 'pg_sched'");
463 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
465 $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp, last_completed = ?::timestamp WHERE id = ?;";
466 $sth = $dbh->prepare($query);
467 $sth->bind_param(1, $run_timestamp);
468 $sth->bind_param(2, $completed_timestamp);
469 $sth->bind_param(3, @$prodef[0]);
475 warn ("WARNING: post update of schedule failed\n");
481 warn ("WARNING: worker execution failed\n");
486 $detail .= 'already running PID ' . @$prodef[8];
489 $detail = ($completed_timestamp // $run_timestamp) . " " . $detail;
491 if (defined($logfile)) {
492 logFile($logfile, $detail . ($success?' [SUCCEEDED]':' [FAILED]') . "\n");
495 return ($success, $detail);
499 my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0];
500 my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec);