]> git.8kb.co.uk Git - slony-i/pg_sched/blob - pg_sched.pl
Improve logging order by actually joining threads as they complete rather than waitin...
[slony-i/pg_sched] / pg_sched.pl
1 #!/usr/bin/perl
2
3 # Script: pg_sched.pl
4 # Copyright: 26/06/2011: v1.0.0 Glyn Astill <glyn@8kb.co.uk>
5 # PostgreSQL 9.0+ (optional Slony-I)
6 #
7 # This script is a command-line utility to schedule execution of stored
8 # functions against databases in a PostgreSQL cluster, with support for 
9 # selective execution based on the replication role of nodes in Slony-I 
10 # clusters.
11 #
12 # The script is designed to be run from cron and has a maximum resolution
13 # of 1 minute. 
14 #
15 # This script is free software: you can redistribute it and/or modify
16 # it under the terms of the GNU General Public License as published by
17 # the Free Software Foundation, either version 3 of the License, or
18 # (at your option) any later version.
19 #
20 # This script is distributed in the hope that it will be useful,
21 # but WITHOUT ANY WARRANTY; without even the implied warranty of
22 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 # GNU General Public License for more details.
24 #
25 # You should have received a copy of the GNU General Public License
26 # along with this script. If not, see <http://www.gnu.org/licenses/>.
27
28 use strict;
29 use warnings;
30 use threads;
31 use Time::HiRes qw/gettimeofday/;
32 use DBI;
33 use sigtrap 'handler' => \&cleanExit, 'HUP', 'INT','ABRT','QUIT','TERM';
34 use Getopt::Long qw/GetOptions/;
35 Getopt::Long::Configure('no_ignore_case');
36 use vars qw{%opt};
37
38 use constant false => 0;
39 use constant true  => 1;
40
41 my $g_slony_cluster = "test_replication";
42 my $g_dbhost = 'localhost';
43 my $g_dbport = 5432;
44 my $g_dbname = 'postgres';
45 my $g_dbuser = 'pg_sched';
46 my $g_dbschema = 'public';
47 my $g_dbtable = 'pg_sched';
48 my $g_clname = 'auto';
49 my $g_sets = 'all';
50 my $g_usage = "pg_sched.pl -h <db host> -p <db port> -d <db name> -U <db user> -n <schema> -t <table> -cl <slony clustername> -m <master sets> -w <lock file> -l <logfile>
51     -h      Hostname of database containing schedule table
52             DEFAULT = $g_dbhost
53     -p      Listening port of database containing schedule table 
54             DEFAULT = $g_dbport
55     -d      Name of database containing schedule table
56             DEFAULT = $g_dbname
57     -U      User to connect to databse with, also used as default user for all tasks
58             DEFAULT = $g_dbuser
59     -n      Name of schema containing schedule table
60             DEFAULT = $g_dbschema
61     -t      Name of schedule table
62             DEFAULT = $g_dbtable
63     -cl     Name of slony cluster. 'auto' to autodetect, 'off' to disable
64             DEFAULT = $g_clname
65     -m      Comma separated list of slony sets on the master. 'all' for all
66             DEFAULT = $g_sets
67     -w      Lockfile used to prevent concurrent execution of tasks.
68             DEFAULT = not used
69     -l      File to log to.
70             DEFAULT = log to STDOUT instead
71 ";
72
73 my @g_databases;
74 my $g_origin = false;
75 my @g_pronames;
76 my $g_errors;
77 my $g_lockfile;
78 my $g_logfile;
79
80 die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|w=s', 'clname|cl=s', 'msets|m=s', 'logfile|l=s');
81
82 if (defined($opt{host})) {
83     $g_dbhost = $opt{host};
84 }
85 if (defined($opt{port})) {
86     $g_dbport = $opt{port};
87 }
88 if (defined($opt{dbname})) {
89     $g_dbname = $opt{dbname};
90 }
91 if (defined($opt{user})) {
92     $g_dbuser = $opt{user};
93 }
94 if (defined($opt{schema})) {
95     $g_dbschema = $opt{schema};
96 }
97 if (defined($opt{table})) {
98     $g_dbtable = $opt{table};
99 }
100 if (defined($opt{lockfile})) {
101     $g_lockfile = $opt{lockfile};
102 }
103 if (defined($opt{clname})) {
104     $g_clname = $opt{clname};
105 }
106 if (defined($opt{msets})) {
107     $g_sets = $opt{msets};
108 }
109 if (defined($opt{logfile})) {
110     $g_logfile = $opt{logfile};
111 }
112
113 # If lockfile supplied check if the script is already running
114 if (defined($g_lockfile)) {
115     schedLock($g_lockfile, 'lock');
116 }
117
118 # Get a list of databases in the postgresql cluster
119 @g_databases = loadDatabases($g_dbhost, $g_dbport, $g_dbname, $g_dbuser);
120
121 # For each database get its replication status then get any work to do
122 undef @g_pronames;
123 foreach my $target_dbname (@g_databases) {
124     if ($g_clname ne 'off') {
125         $g_origin = checkSlony($g_dbhost, $g_dbport, $target_dbname, $g_dbuser, $g_clname, $g_sets);
126     }
127     else {
128         $g_origin = true;
129     }
130     @g_pronames = (@g_pronames, loadWork($g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_origin, $target_dbname));
131 }
132 # Do all the work we have collected for all databases
133 $g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_logfile);
134
135 if ($g_errors > 0) {
136     warn ("ERROR: Encountered $g_errors errors processing the schedule\n") if $@;
137 }
138
139 cleanExit($g_lockfile);
140
141 sub cleanExit{
142     my $lockfile = shift;
143
144     if (defined($lockfile)) {
145         schedLock($lockfile, 'unlock');
146     }
147     exit(0);
148 }
149
150 sub schedLock{
151     my $lockdir = shift;
152     my $op = shift;
153
154     if ($op eq 'lock') {
155         eval {
156             mkdir($lockdir) or die "Can't make directory: $!";
157         };
158         die ("Script already running") if $@;
159     }
160     else {
161         eval {
162             rmdir $lockdir;
163         };
164         die ("unable to remove lock\n") if $@;
165     }
166 }
167
168 sub logFile {
169     my $logfile = shift;
170     my $message = shift;
171
172     eval {
173         open(LOGFILE, ">>", $logfile);
174         flock(LOGFILE, 2);
175         print LOGFILE $message;
176         close (LOGFILE);
177     };
178     warn ("WARNING: unable to log to file $logfile\n") if $@;
179 }
180
181 sub loadDatabases{
182     my $dbhost = shift;
183     my $dbport = shift;
184     my $dbname = shift;
185     my $dbuser = shift;
186     my $dsn;
187     my $dbh;
188     my $sth;
189     my $query;
190     my @databases;
191
192     $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
193     eval {
194         $dbh = DBI->connect($dsn, $dbuser);
195         {
196             $dbh->do("SET application_name = 'pg_sched'");
197             
198             $query = "SELECT datname FROM pg_catalog.pg_database WHERE datallowconn AND NOT datistemplate;";
199         
200             $sth = $dbh->prepare($query);
201             $sth->execute();
202
203             while (my $databasename = $sth->fetchrow) {
204                 push(@databases,  $databasename);
205             }
206         }
207     };
208     warn ("ERROR: Loading databaees failed\n") if $@;
209     
210     return @databases;
211 }
212
213 sub checkSlony{
214     my $dbhost = shift;
215     my $dbport = shift;
216     my $target_dbname = shift;
217     my $dbuser = shift;
218     my $clname = shift;
219     my $sets = shift;
220     my $qw_clname;
221     my $dsn;
222     my $dbh;
223     my $sth;
224     my $query;
225     my $local_sets;
226     my $all_sets;
227     my $run_master = false;
228     
229     $dsn = "DBI:Pg:dbname=$target_dbname;host=$dbhost;port=$dbport;";
230     eval {
231         $dbh = DBI->connect($dsn, $dbuser);
232         {
233             $dbh->do("SET application_name = 'pg_sched'");
234
235             if ($clname ne 'auto') {           
236                 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_namespace n WHERE n.nspname = ?;";
237             }
238             else {
239                 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
240                           WHERE c.relname = 'sl_set' AND c.relkind = 'r' GROUP BY n.nspname;";
241             }
242         
243             $sth = $dbh->prepare($query);
244
245             if ($clname ne 'auto') {
246                 $sth->bind_param(1, "_" . $clname);
247             }
248
249             $sth->execute();
250             $clname = $sth->fetchrow;
251
252             if (defined($clname)) {
253                 $qw_clname = $dbh->quote_identifier("_" . $clname);
254
255                 $query = "SELECT count(*), sum(CASE WHEN s.set_origin = $qw_clname.getlocalnodeid(?::name) THEN 1 ELSE 0 END)
256                         FROM $qw_clname.sl_set s" . (($sets ne 'all')?' WHERE s.set_id = ANY(?::integer[]);':';');
257
258                 $sth = $dbh->prepare($query);
259                 $sth->bind_param(1, "_" . $clname);
260                 if ($sets ne 'all') {
261                     $sth->bind_param(2, [ split(',', $sets) ]);
262                 }
263             
264                 $sth->execute();
265
266                 ($all_sets, $local_sets) = $sth->fetchrow;
267
268                 if (($all_sets // 0) == ($local_sets // 0)) {
269                     $run_master = true;
270                 }
271                 else {
272                     $run_master = false;
273                 }
274             }
275             else {
276                 $run_master = true;
277             }
278         }
279     };
280     warn ("ERROR: Failed reading slony node config\n") if $@;
281
282     return $run_master;
283 }
284
285 sub loadWork{
286     my $dbhost = shift;
287     my $dbport = shift;
288     my $dbname = shift;
289     my $dbuser = shift;
290     my $dbschema = shift;
291     my $dbtable = shift;
292     my $node_role = shift;
293     my $target_dbname = shift;
294     my $dsn;
295     my $dbh;
296     my $sth;
297     my $query;
298     my $qw_schedname;
299     my @pronames;
300
301     if ($node_role ne 'D') {    
302         $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
303         eval {
304             $dbh = DBI->connect($dsn, $dbuser);
305             {
306                 $dbh->do("SET application_name = 'pg_sched'");
307
308                 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
309
310                 $query = "SELECT p.id, p.datname, p.pronamespace, p.proname, p.proargs, p.proargtypes, COALESCE(p.usename,current_user), ?::text, p.running FROM $qw_schedname p
311                         WHERE p.datname = ? AND (p.enabled = 'A' OR p.enabled = ?)
312                         AND (p.last_run IS NULL OR
313                             (date_trunc('m', current_timestamp)-date_trunc('m',p.last_run))::interval >= p.frequency)
314                         AND (p.isloose OR 
315                             (extract(EPOCH FROM date_trunc('m', current_timestamp-coalesce(frequency_offset,'0s'::interval)))::integer
316                             %extract(EPOCH FROM frequency)::integer) = 0);";
317     
318                 $sth = $dbh->prepare($query);
319                 $sth->bind_param(1, ($node_role?'O':'R'));
320                 $sth->bind_param(2, $target_dbname);
321                 $sth->bind_param(3, ($node_role?'O':'R'));
322                 $sth->execute();             
323     
324                 while (my @prodef = $sth->fetchrow) {
325                     push(@pronames,  \@prodef);
326                 }
327             }
328         };
329         warn ("ERROR: Loading work failed\n") if $@;
330     }
331
332     return @pronames;
333 }
334
335 sub doWork{
336     my $pronames = shift;
337     my $dbhost = shift;
338     my $dbport = shift;
339     my $dbname = shift;
340     my $dbuser = shift;
341     my $dbschema = shift;
342     my $dbtable = shift;
343     my $logfile = shift;
344     my @workers;
345     my $running = 0;
346     my @worker_result;
347     my $bad = 0;
348
349     foreach my $prodef (@$pronames) {
350         my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable, $logfile);
351         push(@workers,$t);
352         $running++;
353     }
354     while ($running > 0) {
355         foreach (@workers) {
356             if ($_->is_joinable()) {
357                 @worker_result = $_->join;
358                 $running--;
359                 if ($worker_result[0] == true) {
360                     print("$worker_result[1] [SUCCEEDED]\n") if (!defined($logfile));
361                 }
362                 else {
363                     print("$worker_result[1] [FAILED]\n") if (!defined($logfile));
364                     $bad++;
365                 }
366             }
367         }
368         sleep(1);
369     }
370     return $bad;
371 }
372
373 sub runWorker{
374     my $prodef = shift;
375     my $dbhost = shift;
376     my $dbport = shift;
377     my $dbname = shift;
378     my $dbuser = shift;
379     my $dbschema = shift;
380     my $dbtable = shift;
381     my $logfile = shift;
382
383     my $dsn;
384     my $dbh;
385     my $sth;
386     my $query;
387     my $qw_schedname;
388     my @result;
389     my $success;
390     my $run_timestamp = getTimestamp();
391     my $completed_timestamp;
392     my $detail = "Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" . 
393                 (defined(@$prodef[4])?join(', ',@{@$prodef[4]}):'') . ") Rep Role: @$prodef[7] User: @$prodef[6] Result: ";
394     my $typindex = 0;
395     my $start;
396     my $end;
397     
398     if (!defined(@$prodef[8]) || !kill(0, @$prodef[8])) {
399         eval {
400             eval {
401                 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
402                 $dbh = DBI->connect($dsn, $dbuser);
403                 $dbh->do("SET application_name = 'pg_sched'");
404
405                 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
406
407                 $query = "UPDATE $qw_schedname SET running = ? WHERE id = ?;";
408                 $sth = $dbh->prepare($query);
409                 $sth->bind_param(1, $$);
410                 $sth->bind_param(2, @$prodef[0]);
411                 $sth->execute(); 
412                 $sth->finish;
413                 $dbh->disconnect();
414             };
415             if ($@) {
416                 warn ("WARNING: pre update of schedule failed\n");
417             }
418                                 
419             $dsn = "DBI:Pg:dbname=@$prodef[1];host=$dbhost;port=$dbport;";
420             $dbh = DBI->connect($dsn, $dbuser);
421             $dbh->do("SET application_name = 'pg_sched'");
422
423             $sth = $dbh->prepare("SET session_authorization = ?");
424             $sth->bind_param(1, @$prodef[6]);
425             $sth->execute();
426
427             $query = "SELECT " . $dbh->quote_identifier(@$prodef[2]) . '.' . $dbh->quote_identifier(@$prodef[3]) . '(';
428             if (defined(@$prodef[4])) {
429                 foreach my $arg (@{@$prodef[4]}) {
430                     #$query = $query . ($typindex>0?',':'') . $dbh->quote($arg) . '::' . @{@$prodef[5]}[$typindex];
431                     $query .= ($typindex>0?',':'') . '?::' . @{@$prodef[5]}[$typindex];
432                     $typindex++;
433                 }
434             }
435             $query .= ');';
436          
437             $sth = $dbh->prepare($query);
438
439             $typindex = 0;
440             if (defined(@$prodef[4])) {
441                 foreach my $arg (@{@$prodef[4]}) {
442                     $typindex++;
443                     $sth->bind_param($typindex, $arg);
444                 }
445             }
446             $start = gettimeofday();
447             $sth->execute();
448             $end = gettimeofday();
449             $completed_timestamp = getTimestamp(); 
450
451             while (my @result = $sth->fetchrow) {
452                 $success = true;
453                 $detail .= ($result[0] // 'NULL') . ' (Timing: ' . sprintf("%.2f sec", $end-$start) . ')';
454             }
455             $sth->finish;
456             $dbh->disconnect();
457
458             eval {
459                 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
460                 $dbh = DBI->connect($dsn, $dbuser);
461                 $dbh->do("SET application_name = 'pg_sched'");
462
463                 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
464
465                 $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp, last_completed = ?::timestamp WHERE id = ?;";
466                 $sth = $dbh->prepare($query);
467                 $sth->bind_param(1, $run_timestamp);
468                 $sth->bind_param(2, $completed_timestamp);
469                 $sth->bind_param(3, @$prodef[0]);
470                 $sth->execute(); 
471                 $sth->finish;
472                 $dbh->disconnect();
473             };
474             if ($@) {
475                 warn ("WARNING: post update of schedule failed\n");
476             }
477         };
478         if ($@) {
479             $success = false;
480             $detail = $@;
481             warn ("WARNING: worker execution failed\n");
482         }
483     }
484     else {
485                 $success = false;
486                 $detail .= 'already running PID ' . @$prodef[8];
487     }
488
489     $detail = ($completed_timestamp // $run_timestamp) . " " . $detail;
490
491     if (defined($logfile)) {
492        logFile($logfile, $detail . ($success?' [SUCCEEDED]':' [FAILED]') . "\n");
493     }
494
495     return ($success, $detail);
496 }
497
498 sub getTimestamp {
499     my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0];
500     my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec);
501     return $timestamp;
502 }