]> git.8kb.co.uk Git - slony-i/pg_sched/blob - pg_sched.pl
Add mailmap file
[slony-i/pg_sched] / pg_sched.pl
1 #!/usr/bin/perl
2
3 # Script: pg_sched.pl
4 # Copyright: 26/06/2011: v1.0.0 Glyn Astill <glyn@8kb.co.uk>
5 # PostgreSQL 9.0+ (optional Slony-I)
6 #
7 # This script is a command-line utility to schedule execution of stored
8 # functions against databases in a PostgreSQL cluster, with support for 
9 # selective execution based on the replication role of nodes in Slony-I 
10 # clusters.
11 #
12 # The script is designed to be run from cron and has a maximum resolution
13 # of 1 minute. 
14 #
15 # This script is free software: you can redistribute it and/or modify
16 # it under the terms of the GNU General Public License as published by
17 # the Free Software Foundation, either version 3 of the License, or
18 # (at your option) any later version.
19 #
20 # This script is distributed in the hope that it will be useful,
21 # but WITHOUT ANY WARRANTY; without even the implied warranty of
22 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 # GNU General Public License for more details.
24 #
25 # You should have received a copy of the GNU General Public License
26 # along with this script. If not, see <http://www.gnu.org/licenses/>.
27
28 use strict;
29 use warnings;
30 use threads;
31 use Time::HiRes qw/gettimeofday/;
32 use DBI;
33 use sigtrap 'handler' => \&cleanExit, 'HUP', 'INT','ABRT','QUIT','TERM';
34 use Getopt::Long qw/GetOptions/;
35 Getopt::Long::Configure('no_ignore_case');
36 use vars qw{%opt};
37
38 use constant false => 0;
39 use constant true  => 1;
40
41 my $g_slony_cluster = "test_replication";
42 my $g_dbhost = 'localhost';
43 my $g_dbport = 5432;
44 my $g_dbname = 'postgres';
45 my $g_dbuser = 'pg_sched';
46 my $g_dbschema = 'public';
47 my $g_dbtable = 'pg_sched';
48 my $g_clname = 'auto';
49 my $g_sets = 'all';
50 my $g_usage = "pg_sched.pl -h <db host> -p <db port> -d <db name> -U <db user> -n <schema> -t <table> -cl <slony clustername> -m <master sets> -l <lock file>
51     -h      Hostname of database containing schedule table
52             DEFAULT = $g_dbhost
53     -p      Listening port of database containing schedule table 
54             DEFAULT = $g_dbport
55     -d      Name of database containing schedule table
56             DEFAULT = $g_dbname
57     -U      User to connect to databse with, also used as default user for all tasks
58             DEFAULT = $g_dbuser
59     -n      Name of schema containing schedule table
60             DEFAULT = $g_dbschema
61     -t      Name of schedule table
62             DEFAULT = $g_dbtable
63     -cl     Name of slony cluster. 'auto' to autodetect, 'off' to disable
64             DEFAULT = $g_clname
65     -m      Comma separated list of slony sets on the master. 'all' for all
66             DEFAULT = $g_sets
67     -l      Lockfile used to prevent concurrent execution of tasks.
68             DEFAULT = not used
69 ";
70
71 my @g_databases;
72 my $g_origin = false;
73 my @g_pronames;
74 my $g_errors;
75 my $g_lockfile;
76
77 die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|l=s', 'clname|cl=s', 'msets|m=s');
78
79 if (defined($opt{host})) {
80     $g_dbhost = $opt{host};
81 }
82 if (defined($opt{port})) {
83     $g_dbport = $opt{port};
84 }
85 if (defined($opt{dbname})) {
86     $g_dbname = $opt{dbname};
87 }
88 if (defined($opt{user})) {
89     $g_dbuser = $opt{user};
90 }
91 if (defined($opt{schema})) {
92     $g_dbschema = $opt{schema};
93 }
94 if (defined($opt{table})) {
95     $g_dbtable = $opt{table};
96 }
97 if (defined($opt{lockfile})) {
98     $g_lockfile = $opt{lockfile};
99 }
100 if (defined($opt{clname})) {
101     $g_clname = $opt{clname};
102 }
103 if (defined($opt{msets})) {
104     $g_sets = $opt{msets};
105 }
106
107 # If lockfile supplied check if the script is already running
108 if (defined($g_lockfile)) {
109     schedLock($g_lockfile, 'lock');
110 }
111
112 # Get a list of databases in the postgresql cluster
113 @g_databases = loadDatabases($g_dbhost, $g_dbport, $g_dbname, $g_dbuser);
114
115 # For each database get its replication status then get any work to do
116 undef @g_pronames;
117 foreach my $target_dbname (@g_databases) {
118     if ($g_clname ne 'off') {
119         $g_origin = checkSlony($g_dbhost, $g_dbport, $target_dbname, $g_dbuser, $g_clname, $g_sets);
120     }
121     else {
122         $g_origin = true;
123     }
124     @g_pronames = (@g_pronames, loadWork($g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_origin, $target_dbname));
125 }
126 # Do all the work we have collected for all databases
127 $g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable);
128
129 if ($g_errors > 0) {
130     warn ("ERROR: Encountered $g_errors errors processing the schedule\n") if $@;
131 }
132
133 cleanExit($g_lockfile);
134
135 sub cleanExit{
136     my $lockfile = shift;
137
138     if (defined($lockfile)) {
139         schedLock($lockfile, 'unlock');
140     }
141     exit(0);
142 }
143
144 sub schedLock{
145     my $lockdir = shift;
146     my $op = shift;
147
148     if ($op eq 'lock') {
149         eval {
150             mkdir($lockdir) or die "Can't make directory: $!";
151         };
152         die ("Script already running") if $@;
153     }
154     else {
155         eval {
156             rmdir $lockdir;
157         };
158         die ("unable to remove lock\n") if $@;
159     }
160 }
161
162 sub loadDatabases{
163     my $dbhost = shift;
164     my $dbport = shift;
165     my $dbname = shift;
166     my $dbuser = shift;
167     my $dsn;
168     my $dbh;
169     my $sth;
170     my $query;
171     my @databases;
172
173     $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
174     eval {
175         $dbh = DBI->connect($dsn, $dbuser);
176         {
177             $dbh->do("SET application_name = 'pg_sched'");
178             
179             $query = "SELECT datname FROM pg_catalog.pg_database WHERE datallowconn AND NOT datistemplate;";
180         
181             $sth = $dbh->prepare($query);
182             $sth->execute();
183
184             while (my $databasename = $sth->fetchrow) {
185                 push(@databases,  $databasename);
186             }
187         }
188     };
189     warn ("ERROR: Loading databaees failed\n") if $@;
190     
191     return @databases;
192 }
193
194 sub checkSlony{
195     my $dbhost = shift;
196     my $dbport = shift;
197     my $target_dbname = shift;
198     my $dbuser = shift;
199     my $clname = shift;
200     my $sets = shift;
201     my $qw_clname;
202     my $dsn;
203     my $dbh;
204     my $sth;
205     my $query;
206     my $local_sets;
207     my $all_sets;
208     my $run_master = false;
209     
210     $dsn = "DBI:Pg:dbname=$target_dbname;host=$dbhost;port=$dbport;";
211     eval {
212         $dbh = DBI->connect($dsn, $dbuser);
213         {
214             $dbh->do("SET application_name = 'pg_sched'");
215
216             if ($clname ne 'auto') {           
217                 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_namespace n WHERE n.nspname = ?;";
218             }
219             else {
220                 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
221                           WHERE c.relname = 'sl_set' AND c.relkind = 'r' GROUP BY n.nspname;";
222             }
223         
224             $sth = $dbh->prepare($query);
225
226             if ($clname ne 'auto') {
227                 $sth->bind_param(1, "_" . $clname);
228             }
229
230             $sth->execute();
231             $clname = $sth->fetchrow;
232
233             if (defined($clname)) {
234                 $qw_clname = $dbh->quote_identifier("_" . $clname);
235
236                 $query = "SELECT count(*), sum(CASE WHEN s.set_origin = $qw_clname.getlocalnodeid(?::name) THEN 1 ELSE 0 END)
237                         FROM $qw_clname.sl_set s" . (($sets ne 'all')?' WHERE s.set_id = ANY(?::integer[]);':';');
238
239                 $sth = $dbh->prepare($query);
240                 $sth->bind_param(1, "_" . $clname);
241                 if ($sets ne 'all') {
242                     $sth->bind_param(2, [ split(',', $sets) ]);
243                 }
244             
245                 $sth->execute();
246
247                 ($all_sets, $local_sets) = $sth->fetchrow;
248
249                 if (($all_sets // 0) == ($local_sets // 0)) {
250                     $run_master = true;
251                 }
252                 else {
253                     $run_master = false;
254                 }
255             }
256             else {
257                 $run_master = true;
258             }
259         }
260     };
261     warn ("ERROR: Failed reading slony node config\n") if $@;
262
263     return $run_master;
264 }
265
266 sub loadWork{
267     my $dbhost = shift;
268     my $dbport = shift;
269     my $dbname = shift;
270     my $dbuser = shift;
271     my $dbschema = shift;
272     my $dbtable = shift;
273     my $node_role = shift;
274     my $target_dbname = shift;
275     my $dsn;
276     my $dbh;
277     my $sth;
278     my $query;
279     my $qw_schedname;
280     my @pronames;
281
282     if ($node_role ne 'D') {    
283         $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
284         eval {
285             $dbh = DBI->connect($dsn, $dbuser);
286             {
287                 $dbh->do("SET application_name = 'pg_sched'");
288
289                 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
290
291                 $query = "SELECT p.id, p.datname, p.pronamespace, p.proname, p.proargs, p.proargtypes, COALESCE(p.usename,current_user), ?::text, p.running FROM $qw_schedname p
292                         WHERE p.datname = ? AND (p.enabled = 'A' OR p.enabled = ?)
293                         AND (p.last_run IS NULL OR
294                             (date_trunc('m', current_timestamp)-date_trunc('m',p.last_run))::interval >= p.frequency)
295                         AND (p.isloose OR 
296                             (extract(EPOCH FROM date_trunc('m', current_timestamp-coalesce(frequency_offset,'0s'::interval)))::integer
297                             %extract(EPOCH FROM frequency)::integer) = 0);";
298     
299                 $sth = $dbh->prepare($query);
300                 $sth->bind_param(1, ($node_role?'O':'R'));
301                 $sth->bind_param(2, $target_dbname);
302                 $sth->bind_param(3, ($node_role?'O':'R'));
303                 $sth->execute();             
304     
305                 while (my @prodef = $sth->fetchrow) {
306                     push(@pronames,  \@prodef);
307                 }
308             }
309         };
310         warn ("ERROR: Loading work failed\n") if $@;
311     }
312
313     return @pronames;
314 }
315
316 sub doWork{
317     my $pronames = shift;
318     my $dbhost = shift;
319     my $dbport = shift;
320     my $dbname = shift;
321     my $dbuser = shift;
322     my $dbschema = shift;
323     my $dbtable = shift;
324     my @workers;
325     my @worker_result;
326     my $bad = 0;
327
328     foreach my $prodef (@$pronames) {
329         my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable);
330         push(@workers,$t);
331     }
332     foreach (@workers) {
333         @worker_result = $_->join;
334         if ($worker_result[0] == true) {
335             print("$worker_result[1] [SUCCEEDED]\n");
336         }
337         else {
338             print("$worker_result[1] [FAILED]\n");
339             $bad++;
340         }
341     }
342     return $bad;
343 }
344
345 sub runWorker{
346     my $prodef = shift;
347     my $dbhost = shift;
348     my $dbport = shift;
349     my $dbname = shift;
350     my $dbuser = shift;
351     my $dbschema = shift;
352     my $dbtable = shift;
353
354     my $dsn;
355     my $dbh;
356     my $sth;
357     my $query;
358     my $qw_schedname;
359     my @result;
360     my $success;
361     my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0];
362     my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec);
363     my $detail = $timestamp . " Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" . 
364                 (defined(@$prodef[4])?join(', ',@{@$prodef[4]}):'') . ") Rep Role: @$prodef[7] User: @$prodef[6] Result: ";
365     my $typindex = 0;
366     my $start;
367     my $end;
368     
369     if (!defined(@$prodef[8]) || !kill(0, @$prodef[8])) {
370             eval {
371                         
372             eval {
373                 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
374                 $dbh = DBI->connect($dsn, $dbuser);
375                 $dbh->do("SET application_name = 'pg_sched'");
376
377                 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
378
379                 $query = "UPDATE $qw_schedname SET running = ? WHERE id = ?;";
380                 $sth = $dbh->prepare($query);
381                 $sth->bind_param(1, $$);
382                 $sth->bind_param(2, @$prodef[0]);
383                 $sth->execute(); 
384                 $sth->finish;
385                 $dbh->disconnect();
386             };
387             if ($@) {
388                 warn ("WARNING: pre update of schedule failed\n");
389             }
390                                 
391             $dsn = "DBI:Pg:dbname=@$prodef[1];host=$dbhost;port=$dbport;";
392             $dbh = DBI->connect($dsn, $dbuser);
393             $dbh->do("SET application_name = 'pg_sched'");
394
395             $sth = $dbh->prepare("SET session_authorization = ?");
396             $sth->bind_param(1, @$prodef[6]);
397             $sth->execute();
398
399             $query = "SELECT " . $dbh->quote_identifier(@$prodef[2]) . '.' . $dbh->quote_identifier(@$prodef[3]) . '(';
400             if (defined(@$prodef[4])) {
401                 foreach my $arg (@{@$prodef[4]}) {
402                     #$query = $query . ($typindex>0?',':'') . $dbh->quote($arg) . '::' . @{@$prodef[5]}[$typindex];
403                     $query .= ($typindex>0?',':'') . '?::' . @{@$prodef[5]}[$typindex];
404                     $typindex++;
405                 }
406             }
407             $query .= ');';
408          
409             $sth = $dbh->prepare($query);
410
411             $typindex = 0;
412             if (defined(@$prodef[4])) {
413                 foreach my $arg (@{@$prodef[4]}) {
414                     $typindex++;
415                     $sth->bind_param($typindex, $arg);
416                 }
417             }
418             $start = gettimeofday();
419             $sth->execute();
420             $end = gettimeofday();
421
422             while (my @result = $sth->fetchrow) {
423                 $success = true;
424                 $detail .= ($result[0] // 'NULL') . ' (Timing: ' . sprintf("%.2f sec", $end-$start) . ')';
425             }
426             $sth->finish;
427             $dbh->disconnect();
428
429             eval {
430                 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
431                 $dbh = DBI->connect($dsn, $dbuser);
432                 $dbh->do("SET application_name = 'pg_sched'");
433
434                 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
435
436                 $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp WHERE id = ?;";
437                 $sth = $dbh->prepare($query);
438                 $sth->bind_param(1, $timestamp);
439                 $sth->bind_param(2, @$prodef[0]);
440                 $sth->execute(); 
441                 $sth->finish;
442                 $dbh->disconnect();
443             };
444             if ($@) {
445                 warn ("WARNING: post update of schedule failed\n");
446             }
447         };
448         if ($@) {
449             $success = false;
450             $detail = $@;
451             warn ("WARNING: worker execution failed\n");
452         }
453     }
454     else {
455                 $success = false;
456                 $detail .= 'already running PID ' . @$prodef[8];
457     }
458
459     return ($success, $detail);
460 }
461