4 # Copyright: 26/06/2011: v1.0.0 Glyn Astill <glyn@8kb.co.uk>
5 # PostgreSQL 9.0+ (optional Slony-I)
7 # This script is a command-line utility to schedule execution of stored
8 # functions against databases in a PostgreSQL cluster, with support for
9 # selective execution based on the replication role of nodes in Slony-I
12 # The script is designed to be run from cron and has a maximum resolution
15 # This script is free software: you can redistribute it and/or modify
16 # it under the terms of the GNU General Public License as published by
17 # the Free Software Foundation, either version 3 of the License, or
18 # (at your option) any later version.
20 # This script is distributed in the hope that it will be useful,
21 # but WITHOUT ANY WARRANTY; without even the implied warranty of
22 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 # GNU General Public License for more details.
25 # You should have received a copy of the GNU General Public License
26 # along with this script. If not, see <http://www.gnu.org/licenses/>.
31 use Time::HiRes qw/gettimeofday/;
33 use sigtrap 'handler' => \&cleanExit, 'HUP', 'INT','ABRT','QUIT','TERM';
34 use Getopt::Long qw/GetOptions/;
35 Getopt::Long::Configure('no_ignore_case');
38 use constant false => 0;
39 use constant true => 1;
41 my $g_slony_cluster = "test_replication";
42 my $g_dbhost = 'localhost';
44 my $g_dbname = 'postgres';
45 my $g_dbuser = 'pg_sched';
46 my $g_dbschema = 'public';
47 my $g_dbtable = 'pg_sched';
48 my $g_clname = 'auto';
50 my $g_usage = "pg_sched.pl -h <db host> -p <db port> -d <db name> -U <db user> -n <schema> -t <table> -cl <slony clustername> -m <master sets> -l <lock file>
51 -h Hostname of database containing schedule table
53 -p Listening port of database containing schedule table
55 -d Name of database containing schedule table
57 -U User to connect to databse with, also used as default user for all tasks
59 -n Name of schema containing schedule table
61 -t Name of schedule table
63 -cl Name of slony cluster. 'auto' to autodetect, 'off' to disable
65 -m Comma separated list of slony sets on the master. 'all' for all
67 -l Lockfile used to prevent concurrent execution of tasks.
77 die $g_usage unless GetOptions(\%opt, 'host|H=s', 'port|p=i', 'dbname|d=s', 'user|U=s', 'schema|n=s', 'table|t=s', 'lockfile|l=s', 'clname|cl=s', 'msets|m=s');
79 if (defined($opt{host})) {
80 $g_dbhost = $opt{host};
82 if (defined($opt{port})) {
83 $g_dbport = $opt{port};
85 if (defined($opt{dbname})) {
86 $g_dbname = $opt{dbname};
88 if (defined($opt{user})) {
89 $g_dbuser = $opt{user};
91 if (defined($opt{schema})) {
92 $g_dbschema = $opt{schema};
94 if (defined($opt{table})) {
95 $g_dbtable = $opt{table};
97 if (defined($opt{lockfile})) {
98 $g_lockfile = $opt{lockfile};
100 if (defined($opt{clname})) {
101 $g_clname = $opt{clname};
103 if (defined($opt{msets})) {
104 $g_sets = $opt{msets};
107 # If lockfile supplied check if the script is already running
108 if (defined($g_lockfile)) {
109 schedLock($g_lockfile, 'lock');
112 # Get a list of databases in the postgresql cluster
113 @g_databases = loadDatabases($g_dbhost, $g_dbport, $g_dbname, $g_dbuser);
115 # For each database get its replication status then get any work to do
117 foreach my $target_dbname (@g_databases) {
118 if ($g_clname ne 'off') {
119 $g_origin = checkSlony($g_dbhost, $g_dbport, $target_dbname, $g_dbuser, $g_clname, $g_sets);
124 @g_pronames = (@g_pronames, loadWork($g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable, $g_origin, $target_dbname));
126 # Do all the work we have collected for all databases
127 $g_errors = doWork(\@g_pronames, $g_dbhost, $g_dbport, $g_dbname, $g_dbuser, $g_dbschema, $g_dbtable);
130 warn ("ERROR: Encountered $g_errors errors processing the schedule\n") if $@;
133 cleanExit($g_lockfile);
136 my $lockfile = shift;
138 if (defined($lockfile)) {
139 schedLock($lockfile, 'unlock');
150 mkdir($lockdir) or die "Can't make directory: $!";
152 die ("Script already running") if $@;
158 die ("unable to remove lock\n") if $@;
173 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
175 $dbh = DBI->connect($dsn, $dbuser);
177 $dbh->do("SET application_name = 'pg_sched'");
179 $query = "SELECT datname FROM pg_catalog.pg_database WHERE datallowconn AND NOT datistemplate;";
181 $sth = $dbh->prepare($query);
184 while (my $databasename = $sth->fetchrow) {
185 push(@databases, $databasename);
189 warn ("ERROR: Loading databaees failed\n") if $@;
197 my $target_dbname = shift;
208 my $run_master = false;
210 $dsn = "DBI:Pg:dbname=$target_dbname;host=$dbhost;port=$dbport;";
212 $dbh = DBI->connect($dsn, $dbuser);
214 $dbh->do("SET application_name = 'pg_sched'");
216 if ($clname ne 'auto') {
217 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_namespace n WHERE n.nspname = ?;";
220 $query = "SELECT substr(n.nspname,2) FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
221 WHERE c.relname = 'sl_set' AND c.relkind = 'r' GROUP BY n.nspname;";
224 $sth = $dbh->prepare($query);
226 if ($clname ne 'auto') {
227 $sth->bind_param(1, "_" . $clname);
231 $clname = $sth->fetchrow;
233 if (defined($clname)) {
234 $qw_clname = $dbh->quote_identifier("_" . $clname);
236 $query = "SELECT count(*), sum(CASE WHEN s.set_origin = $qw_clname.getlocalnodeid(?::name) THEN 1 ELSE 0 END)
237 FROM $qw_clname.sl_set s" . (($sets ne 'all')?' WHERE s.set_id = ANY(?::integer[]);':';');
239 $sth = $dbh->prepare($query);
240 $sth->bind_param(1, "_" . $clname);
241 if ($sets ne 'all') {
242 $sth->bind_param(2, [ split(',', $sets) ]);
247 ($all_sets, $local_sets) = $sth->fetchrow;
249 if (($all_sets // 0) == ($local_sets // 0)) {
261 warn ("ERROR: Failed reading slony node config\n") if $@;
271 my $dbschema = shift;
273 my $node_role = shift;
274 my $target_dbname = shift;
282 if ($node_role ne 'D') {
283 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
285 $dbh = DBI->connect($dsn, $dbuser);
287 $dbh->do("SET application_name = 'pg_sched'");
289 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
291 $query = "SELECT p.id, p.datname, p.pronamespace, p.proname, p.proargs, p.proargtypes, COALESCE(p.usename,current_user), ?::text, p.running FROM $qw_schedname p
292 WHERE p.datname = ? AND (p.enabled = 'A' OR p.enabled = ?)
293 AND (p.last_run IS NULL OR
294 (date_trunc('m', current_timestamp)-date_trunc('m',p.last_run))::interval >= p.frequency)
296 (extract(EPOCH FROM date_trunc('m', current_timestamp-coalesce(frequency_offset,'0s'::interval)))::integer
297 %extract(EPOCH FROM frequency)::integer) = 0);";
299 $sth = $dbh->prepare($query);
300 $sth->bind_param(1, ($node_role?'O':'R'));
301 $sth->bind_param(2, $target_dbname);
302 $sth->bind_param(3, ($node_role?'O':'R'));
305 while (my @prodef = $sth->fetchrow) {
306 push(@pronames, \@prodef);
310 warn ("ERROR: Loading work failed\n") if $@;
317 my $pronames = shift;
322 my $dbschema = shift;
328 foreach my $prodef (@$pronames) {
329 my ($t) = threads->new(\&runWorker, $prodef, $dbhost, $dbport, $dbname, $dbuser, $dbschema, $dbtable);
333 @worker_result = $_->join;
334 if ($worker_result[0] == true) {
335 print("$worker_result[1] [SUCCEEDED]\n");
338 print("$worker_result[1] [FAILED]\n");
351 my $dbschema = shift;
361 my ($g_year, $g_month, $g_day, $g_hour, $g_min, $g_sec) = (localtime(time))[5,4,3,2,1,0];
362 my $timestamp = sprintf ("%02d/%02d/%04d %02d:%02d:%02d", $g_day, $g_month+1, $g_year+1900, $g_hour, $g_min, $g_sec);
363 my $detail = $timestamp . " Schedule: $dbschema.$dbtable Task: id#@$prodef[0] @$prodef[1].@$prodef[2].@$prodef[3](" .
364 (defined(@$prodef[4])?join(', ',@{@$prodef[4]}):'') . ") Rep Role: @$prodef[7] User: @$prodef[6] Result: ";
369 if (!defined(@$prodef[8]) || !kill(0, @$prodef[8])) {
373 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
374 $dbh = DBI->connect($dsn, $dbuser);
375 $dbh->do("SET application_name = 'pg_sched'");
377 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
379 $query = "UPDATE $qw_schedname SET running = ? WHERE id = ?;";
380 $sth = $dbh->prepare($query);
381 $sth->bind_param(1, $$);
382 $sth->bind_param(2, @$prodef[0]);
388 warn ("WARNING: pre update of schedule failed\n");
391 $dsn = "DBI:Pg:dbname=@$prodef[1];host=$dbhost;port=$dbport;";
392 $dbh = DBI->connect($dsn, $dbuser);
393 $dbh->do("SET application_name = 'pg_sched'");
395 $sth = $dbh->prepare("SET session_authorization = ?");
396 $sth->bind_param(1, @$prodef[6]);
399 $query = "SELECT " . $dbh->quote_identifier(@$prodef[2]) . '.' . $dbh->quote_identifier(@$prodef[3]) . '(';
400 if (defined(@$prodef[4])) {
401 foreach my $arg (@{@$prodef[4]}) {
402 #$query = $query . ($typindex>0?',':'') . $dbh->quote($arg) . '::' . @{@$prodef[5]}[$typindex];
403 $query .= ($typindex>0?',':'') . '?::' . @{@$prodef[5]}[$typindex];
409 $sth = $dbh->prepare($query);
412 if (defined(@$prodef[4])) {
413 foreach my $arg (@{@$prodef[4]}) {
415 $sth->bind_param($typindex, $arg);
418 $start = gettimeofday();
420 $end = gettimeofday();
422 while (my @result = $sth->fetchrow) {
424 $detail .= ($result[0] // 'NULL') . ' (Timing: ' . sprintf("%.2f sec", $end-$start) . ')';
430 $dsn = "DBI:Pg:dbname=$dbname;host=$dbhost;port=$dbport;";
431 $dbh = DBI->connect($dsn, $dbuser);
432 $dbh->do("SET application_name = 'pg_sched'");
434 $qw_schedname = $dbh->quote_identifier($dbschema) . '.' . $dbh->quote_identifier($dbtable);
436 $query = "UPDATE $qw_schedname SET running = NULL, last_run = ?::timestamp WHERE id = ?;";
437 $sth = $dbh->prepare($query);
438 $sth->bind_param(1, $timestamp);
439 $sth->bind_param(2, @$prodef[0]);
445 warn ("WARNING: post update of schedule failed\n");
451 warn ("WARNING: worker execution failed\n");
456 $detail .= 'already running PID ' . @$prodef[8];
459 return ($success, $detail);