]> git.8kb.co.uk Git - slony-i/pgbouncer_follower/blob - pgbouncer_follower.pl
0b88b19dfd2f987af14dd3c9a32185ff015bb7c2
[slony-i/pgbouncer_follower] / pgbouncer_follower.pl
1 #!/usr/bin/perl
2
3 # Script:   pgbouncer_follower.pl
4 # Copyright:    22/04/2012: v1.0.1 Glyn Astill <glyn@8kb.co.uk>
5 # Requires: Perl 5.10.1+, PostgreSQL 9.0+ Slony-I 2.0+
6 #
7 # This script is a command-line utility to monitor Slony-I clusters
8 # and reconfigure pgbouncer to follow replication sets.
9 #
10 # This script is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This script is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this script.  If not, see <http://www.gnu.org/licenses/>.
22
23 use strict;
24 use warnings;
25 use DBI;
26 use v5.10.1;
27 use Getopt::Long qw/GetOptions/;
28 use Digest::MD5 qw/md5 md5_hex md5_base64/;
29 use Sys::Hostname;
30 use IO::Socket;
31 use Time::HiRes qw/usleep/;
32 use sigtrap 'handler' => \&cleanExit, 'HUP', 'INT','ABRT','QUIT','TERM';
33 Getopt::Long::Configure qw/no_ignore_case/;
34
35 use vars qw{%opt};
36
37 use constant false => 0;
38 use constant true  => 1;
39
40 my $g_usage = 'Pass configuration file: pool_follower.pl -f <configuration_path> [-D]  ';
41 my $g_debug = false;
42 my $g_pidfile = "/tmp/pgbouncer_follower_%mode.pid";
43 my $g_logfile = "/tmp/pgbouncer_follower_%mode.log";
44 my $g_poll_interval = 1000;
45 my $g_user = "slony";
46 my $g_pass;
47 my $g_clname = "replication";
48 my $g_clsets = "1";
49 my @g_conninfos;
50 my @g_cluster;                  # no_id, no_comment, no_prov, orig_sets, conninfo, dbname, host, port
51 my $g_status_file = "/tmp/pgbouncer_follower_%mode.status";
52 my $g_conf_template = "/etc/pgbouncer/pgbouncer_%mode.template";
53 my $g_conf_target = "/etc/pgbouncer/pgbouncer_%mode.ini";
54 my $g_reload_command = "/etc/init.d/pgbouncer_%mode reload";
55 my $g_mode = 'rw';
56 my $g_all_databases=false;
57 my ($year, $month, $day, $hour, $min, $sec);
58 my $change_time;
59 my $g_host = hostname;
60 my ($g_addr)=inet_ntoa((gethostbyname(hostname))[4]);
61 my $g_origins_only = false;
62
63 die $g_usage unless GetOptions(\%opt, 'config_file|f=s', 'daemon|D',) and keys %opt and ! @ARGV;
64
65 unless (getConfig($opt{config_file})){
66     print ("There was a problem reading the configuration.\n");
67 }
68 if (!defined($g_status_file)) {
69     $g_status_file = "/tmp/$g_clname.status";
70 }
71
72  
73 if ($g_debug) {
74     printLogLn($g_logfile, "DEBUG: Logging to my '$g_logfile'");
75     printLogLn($g_logfile, "\t Watching sets $g_clsets in Slony-I cluster '$g_clname' polling every ${g_poll_interval}ms"); 
76     printLogLn($g_logfile, "\t Following " . ($g_all_databases ? "all databases" : "replicated database only") . " on an '$g_mode' node for the above replicated sets");
77     printLogLn($g_logfile, "\t Template config '$g_conf_template' Target config '$g_conf_target'");
78     printLogLn($g_logfile, "\t Reload command is '$g_reload_command'");
79     printLogLn($g_logfile, "\t Status stored in '$g_status_file'");
80     printLogLn($g_logfile, "\t Using local address for '$g_host' as '$g_addr'");
81     #printLogLn($g_logfile, "\t '$g_user' as '$g_pass'");
82 }
83
84 if (defined($opt{daemon})) {
85     printLogLn($g_logfile, "pgbouncer_follower starting up");   
86     if (writePID($g_pidfile)) {
87         while (true) {
88             doAll();
89             if ($g_debug) {
90                 printLogLn($g_logfile, "DEBUG: Sleeping for ${g_poll_interval}ms");
91             }
92             usleep($g_poll_interval * 1000);
93         }
94     }
95 }
96 else {
97     doAll();
98 }
99
100 cleanExit(0);
101
102 sub cleanExit {
103     if (defined($opt{daemon})) {
104         printLogLn($g_logfile, "pgbouncer_follower shutting down");    
105         removePID($g_pidfile);
106     }
107     exit(0);
108 }
109
110 sub doAll {
111     my $node_to;
112
113     foreach my $conninfo (@g_conninfos) {
114         eval {
115             @g_cluster = loadCluster($g_clname, $conninfo, $g_user, $g_pass, $g_addr, $g_clsets);
116             if ($g_debug) {
117                 printLogLn($g_logfile, "DEBUG: Cluster with " . scalar(@g_cluster) . " nodes read from conninfo: $conninfo");
118                 foreach (@g_cluster) {
119                     printLogLn($g_logfile, "DEBUG: Node #" . @$_[0] . " DETAIL: " . @$_[1] . " " . @$_[2] . " " . (@$_[3] // "<NONE>") . " " . @$_[4] . " " . (@$_[5] // "<NONE>") . " " . @$_[6] . " " . @$_[7] . " " . @$_[8] . " " . @$_[9] . " " );
120                 }
121             }
122         };
123         if ($@) {
124             printLogLn($g_logfile, "ERROR: Failed using conninfo: $conninfo DETAIL: $@");
125         }
126         else {
127             last;
128         } 
129     }
130     unless (checkCluster($g_status_file)) {
131         if ($g_debug) {
132              printLogLn ($g_logfile, "DEBUG: Cluster status unchanged");
133         }
134     }
135     else {
136         printLogLn ($g_logfile, "Cluster status changed");
137         $node_to = generateConfig($g_conf_template, $g_conf_target, $g_mode, $g_all_databases, $g_clsets);
138         if (reloadConfig($g_reload_command)) {
139             printLogLn ($g_logfile, "Pool repointed to node #$node_to");
140         }
141     }
142 }
143
144 sub reloadConfig {
145     my $reload_command = shift;
146     my $success = true;
147     if(length($reload_command // '')) {
148         printLogLn($g_logfile, "Running '$reload_command'");
149         eval {
150             open(RELOAD, "-|", $reload_command . " 2>&1");
151             while (<RELOAD>) {
152                 printLogLn($g_logfile, $_);
153             }
154             close(RELOAD);
155             printLogLn($g_logfile, "Reload command has been run.");
156         };
157         if ($@) {
158             printLogLn($g_logfile, "ERROR: Failed to run reload command DETAIL: $@");
159             $success = false;
160         }
161     }
162     return $success;
163 }
164
165 sub generateConfig {
166     my $template = shift;
167     my $target = shift;
168     my $mode = shift;
169     my $all_databases = shift;
170     my $clsets = shift;
171
172     my $success = false;
173     my @sets_to_follow;
174     my @sets_origin;
175     my @sets_subscribed;
176     my $target_node_id;
177     my $target_db;
178     my $target_host;
179     my $target_sets;
180     my $target_port = 5432;
181     my $target_is_origin;
182
183     if ($g_debug) {
184         printLogLn($g_logfile, "DEBUG: All databases = " . ($g_all_databases ? 'true' : 'false'));
185     }
186
187     if (open(INFILE, "<", $template)) {
188         if (open(OUTFILE, ">", $target)) {
189             print OUTFILE "# Configuration file autogenerated at " . getRuntime() . " from $template\n";
190             foreach (<INFILE>) {
191                if (m/\[databases]/) {
192
193                     # Try and choose a node; we always assign the origin initially regardless of rw/ro status
194                     # when in ro mode and if we then  find a suitable subscriber we'll reassign to it.
195                     foreach my $node (@g_cluster) {
196                         if ($clsets ne 'all') {
197                             @sets_to_follow = split(',', $clsets);
198                             if (defined($node->[3])) {
199                                 @sets_origin =  split(',', $node->[3]);
200                             }
201                             else {
202                                 undef @sets_origin;
203                             }
204                             if (defined($node->[6])) {
205                                 @sets_subscribed =  split(',', $node->[6]);
206                             }
207                             else {
208                                 undef @sets_subscribed;
209                             }
210                         }
211
212                         if (($clsets eq 'all' && defined($node->[3])) || (@sets_to_follow && @sets_origin && checkProvidesAllSets(\@sets_to_follow, \@sets_origin))) {
213                             if (defined($node->[8])) {
214                                 $target_db = $node->[7];
215                                 $target_host = $node->[8];
216                                 $target_node_id = $node->[0];
217                                 $target_sets = $node->[3];
218                                 $target_is_origin = true;
219                             }
220                             if (defined($node->[9])) {
221                                 $target_port = $node->[9];
222                             }
223                             if ($mode eq "rw") {
224                                 last;
225                             }
226                         }
227                         elsif (($mode eq "ro") && (($clsets eq 'all') || (@sets_to_follow && @sets_subscribed && checkProvidesAllSets(\@sets_to_follow, \@sets_subscribed)))) {    
228                             if (defined($node->[8])) {
229                                 $target_db = $node->[7];
230                                 $target_host = $node->[8];
231                                 $target_node_id = $node->[0];
232                                 $target_sets = ($node->[6] // $node->[3]);
233                                 $target_is_origin = false;
234                             }
235                             if (defined($node->[9])) {
236                                 $target_port = $node->[9];
237                             }
238                             last;
239                         }
240                     }
241                     if (defined($target_host)) {
242                         $_ = "# Configuration for " . ($target_is_origin ? "origin" : "subscriber") . " of sets $target_sets node #$target_node_id $target_host:$target_port\n" . $_;
243                         if ($g_debug) {
244                             printLogLn ($g_logfile, "DEBUG: Configuration for " . ($target_is_origin ? "origin" : "subscriber") . " of sets $target_sets node #$target_node_id $target_host:$target_port");
245                         }
246                         if ($all_databases) {
247                             $_ =~ s/(\[databases\])/$1\n\* = host=$target_host port=$target_port/;
248                         }
249                         else {
250                             $_ =~ s/(\[databases\])/$1\n$target_db = host=$target_host port=$target_port dbname=$target_db/;
251                         }
252                     }
253                     else {
254                             $_ = "# Could not find any node providing sets $g_clsets in mode $mode\n";
255                             printLogLn ($g_logfile, "DEBUG: Could not find any node providing sets $g_clsets in mode $mode");
256                     }
257                     
258                } 
259                print OUTFILE $_;
260             }
261             close (OUTFILE); 
262         }
263         else {
264             print ("ERROR: Can't open file $target\n");
265         }
266         close(INFILE);
267     }
268     else {
269         print ("ERROR: Can't open file $template\n");
270     }
271     return $target_node_id;
272 }
273
274 sub checkCluster {
275     my $infile = shift;
276     my $changed = false;
277     my $current_cluster;
278     my $previous_cluster;
279     foreach (@g_cluster) {
280         if (!$g_origins_only || defined($_->[3])) {
281             $current_cluster = md5_hex(($current_cluster // "") . $_->[0] . $_->[2] . (defined($_->[3]) ? 't' : 'f'));
282             if ($g_debug) {
283                 printLogLn($g_logfile, "DEBUG: Node " . $_->[0] . " detail = " . $_->[2] . (defined($_->[3]) ? 't' : 'f'));
284             }
285         }
286     }
287    
288     if (-f $infile) {
289         if (open(CLUSTERFILE, "<", $infile)) {
290             $previous_cluster = <CLUSTERFILE>;
291             close(CLUSTERFILE);
292         }
293         else {
294             printLogLn ($g_logfile, "ERROR: Can't open file $infile for reading");
295         }
296     }
297
298     unless (-f $infile && ($current_cluster eq $previous_cluster)) {
299         if ($g_debug) {
300                 printLogLn($g_logfile, "DEBUG: Writing to status file");
301         }
302         if (open(CLUSTERFILE, ">", $infile)) {
303             print CLUSTERFILE $current_cluster;
304             close(CLUSTERFILE);
305         }
306         else {
307             printLogLn ($g_logfile, "ERROR: Can't open file $infile for writing");
308         }
309     }
310
311     if ((($previous_cluster // "") ne "") && (($current_cluster // "") ne "") && ($current_cluster ne $previous_cluster)){
312         $changed = true;
313     }
314
315     return $changed
316 }
317
318 sub loadCluster {
319     my $clname = shift;
320     my $conninfo = shift;
321     my $dbuser = shift;
322     my $dbpass = shift;
323     my $addr = shift;
324     my $clsets = shift;
325     my $param_on = 1;
326
327     my $dsn;
328     my $dbh;
329     my $sth;
330     my $query;
331     my $version;
332     my $qw_clname;
333     my @cluster;
334
335     $dsn = "DBI:Pg:$conninfo};";
336
337     eval {
338         $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 1});
339         $qw_clname = $dbh->quote_identifier("_" . $clname);
340
341         $query = "SELECT $qw_clname.getModuleVersion()";
342         $sth = $dbh->prepare($query);
343         $sth->execute();
344         ($version) = $sth->fetchrow; 
345         $sth->finish;
346
347         $query = "WITH x AS (
348                 SELECT a.no_id, 
349                     a.no_comment, 
350                     COALESCE(b.sub_provider, 0) AS no_prov, 
351                     NULLIF(array_to_string(array(SELECT set_id FROM $qw_clname.sl_set WHERE set_origin = a.no_id" .
352                     ($clsets ne "all" ? " AND set_id IN (" . substr('?, ' x scalar(split(',', $clsets)), 0, -2) . ")" : "") 
353                     . " ORDER BY set_id), ','), '') AS origin_sets,
354                     CASE " . ((substr($version,0,3) >= 2.2) ? "WHEN a.no_failed THEN 'FAILED' " : "") . "WHEN a.no_active THEN 'ACTIVE' ELSE 'INACTIVE' END AS no_status,
355                     string_agg(CASE WHEN b.sub_receiver = a.no_id AND b.sub_forward AND b.sub_active" .
356                     ($clsets ne "all" ? " AND b.sub_set IN (" . substr('?, ' x scalar(split(',', $clsets)), 0, -2) . ")" : "") 
357                     . " THEN b.sub_set::text END, ',' ORDER BY b.sub_set) AS prov_sets,
358                     COALESCE(c.pa_conninfo,(SELECT pa_conninfo FROM $qw_clname.sl_path WHERE pa_server = $qw_clname.getlocalnodeid(?) LIMIT 1)) AS no_conninfo
359                 FROM $qw_clname.sl_node a
360                 LEFT JOIN $qw_clname.sl_subscribe b ON a.no_id = b.sub_receiver AND b.sub_set <> 999 
361                 LEFT JOIN $qw_clname.sl_path c ON c.pa_server = a.no_id AND c.pa_client = $qw_clname.getlocalnodeid(?)
362                 LEFT JOIN $qw_clname.sl_set d ON d.set_origin = a.no_id
363                 GROUP BY b.sub_provider, a.no_id, a.no_comment, c.pa_conninfo, a.no_active
364                 ORDER BY (COALESCE(b.sub_provider, 0) = 0) DESC, a.no_id ASC
365                 ), z AS (
366                 SELECT *,  
367                     CASE WHEN x.no_conninfo ilike '%dbname=%' THEN(regexp_matches(x.no_conninfo, E'dbname=(.+?)\\\\M', 'ig'))[1] END AS database,
368                     CASE WHEN x.no_conninfo ilike '%host=%' THEN(regexp_matches(x.no_conninfo, E'host=(.+?)(?=\\\\s|\$)', 'ig'))[1] END AS host,
369                     CASE WHEN x.no_conninfo ilike '%port=%' THEN(regexp_matches(x.no_conninfo, E'port=(.+?)\\\\M', 'ig'))[1] ELSE '5432' END AS port
370                 FROM x 
371                 )
372                 SELECT * FROM z 
373                 ORDER BY origin_sets, @(CASE WHEN (host ~ '^[0-9]{1,3}(.[0-9]{1,3}){3}\$') THEN host::inet ELSE '255.255.255.255'::inet END - ?::inet) ASC";
374
375         if ($g_debug) { 
376             printLogLn($g_logfile, "DEBUG: " . $query);
377         }
378
379         $sth = $dbh->prepare($query);
380
381         if ($clsets ne "all") {
382             for (0..1) { 
383                 foreach my $param (split(",", $clsets)) {
384                     $sth->bind_param($param_on, $param);
385                     $param_on++;
386                 } 
387             }
388         }
389         $sth->bind_param($param_on, "_" . $clname);
390         $param_on++;
391         $sth->bind_param($param_on, "_" . $clname);
392         $param_on++;
393         $sth->bind_param($param_on, (isInet($addr) ? $addr : '255.255.255.255'));
394         $sth->execute();
395
396         while (my @node = $sth->fetchrow) {
397             push(@cluster,  \@node);
398         }
399
400         $sth->finish;
401         $dbh->disconnect();
402     };
403     if ($@) { 
404         printLogLn($g_logfile, "ERROR: Failed to execute query against Postgres server: $@");
405     }
406
407     return @cluster;
408 }
409
410 sub getConfig {
411     my @fields;
412     my $success = false;
413     my $infile = shift;
414     my $value;
415
416     if (open(CFGFILE, "<", $infile)) {
417         foreach (<CFGFILE>) {
418             chomp $_;
419             for ($_) {
420                 s/\r//;
421                 #s/\#.*//;
422                 s/#(?=(?:(?:[^']|[^"]*+'){2})*+[^']|[^"]*+\z).*//;
423             } 
424             if (length(trim($_))) {
425                 @fields = split('=', $_, 2);
426                 $value = qtrim(trim($fields[1]));
427                 given(lc($fields[0])) {
428                     when(/\bdebug\b/i) {
429                         $g_debug = checkBoolean($value);
430                     }
431                     when(/\bpid_file\b/i) {
432                         $g_pidfile = $value;
433                     }
434                     when(/\blog_file\b/i) {
435                         $g_logfile = $value;
436                     }
437                     when(/\bslony_user\b/i) {
438                         $g_user = $value;
439                     }
440                     when(/\bslony_pass\b/i) {
441                         $g_pass = $value;
442                     }
443                     when(/\bslony_cluster_name\b/i) {
444                         $g_clname = $value;
445                     }
446                     when(/\bslony_sets_to_follow\b/i) {
447                         $g_clsets = $value;
448                     }
449                     when(/\bserver_conninfo\b/i) {
450                         push(@g_conninfos, $value);
451                     }
452                     when(/\bfollower_poll_interval\b/i) {
453                         $g_poll_interval = checkInteger($value);
454                     }
455                     when(/\bstatus_file\b/i) {
456                         $g_status_file = $value;
457                     } 
458                     when(/\bpool_conf_template\b/i) {
459                         $g_conf_template = $value;
460                     } 
461                     when(/\bpool_conf_target\b/i) {
462                         $g_conf_target = $value;
463                     } 
464                     when(/\bpool_reload_command\b/i) {
465                         $g_reload_command = $value;
466                     } 
467                     when(/\bpool_mode\b/i) {
468                         $g_mode = lc($value);
469                     } 
470                     when(/\bpool_all_databases\b/i) {
471                         $g_all_databases = checkBoolean($value);
472                     }
473                     when(/\bonly_follow_origins\b/i) {
474                         $g_origins_only = checkBoolean($value);
475                     }
476                 }  
477             }
478         }
479         close (CFGFILE);
480         if (defined($g_user) && (scalar(@g_conninfos) > 0)) {
481            $success = true;
482         }
483         # Replace %mode and %clname here for actual value
484         for ($g_pidfile, $g_logfile, $g_status_file, $g_conf_template, $g_conf_target, $g_reload_command) {
485             s/\%mode/$g_mode/g;
486             s/\%clname/$g_clname/g;
487         }
488
489
490     }
491     else {
492         printLogLn($g_logfile, "ERROR: Could not read configuration from '$infile'");
493     }
494     return $success;
495 }
496
497 sub writePID {
498     my $pidfile = shift;
499     my $success = true;
500
501     eval {
502         open (PIDFILE, ">", $pidfile);
503         print PIDFILE $$;
504         close (PIDFILE);
505         if ($g_debug) {
506             printLogLn($g_logfile, "DEBUG: Created PID file '$pidfile' for process $$");
507         }
508     };
509     if ($@) {
510         printLogLn($g_logfile, "ERROR: unable to write pidfile at '$pidfile' DETAIL $!");       
511         $success = false;
512     }
513     return $success;
514 }
515
516 sub removePID {
517     my $pidfile = shift;
518     my $success = true;
519
520     eval {
521         if (-f $pidfile) {
522             unlink $pidfile;
523             if ($g_debug) {
524                 printLogLn($g_logfile, "DEBUG: Removed PID file '$pidfile'");
525             }
526         }
527         elsif ($g_debug){
528             printLogLn($g_logfile, "DEBUG: PID file '$pidfile' never existed to be removed");
529         } 
530     };
531     if ($@) {
532         printLogLn($g_logfile, "ERROR: unable to remove pidfile at '$pidfile' DETAIL $!");       
533         $success = false;
534     }
535     return $success
536 }
537
538 sub checkBoolean {
539     my $text = shift;
540     my $value = undef;
541     if ( grep /^$text$/i, ("y","yes","t","true","on") ) {
542         $value = true;
543     }
544     elsif ( grep /^$text$/i, ("n","no","f","false","off") ) {
545         $value = false;
546     }
547     return $value;
548 }
549
550 sub checkInteger {
551     my $integer = shift;
552     my $value = undef;
553
554     if (($integer * 1) eq $integer) {
555         $value = int($integer);
556     }
557     return $value;
558 }
559
560 sub checkProvidesAllSets { 
561     my ($originSets, $providerSets) = @_;
562     my %test_hash;
563
564     undef @test_hash{@$originSets};       # add a hash key for each element of @$originSets
565     delete @test_hash{@$providerSets};    # remove all keys for elements of @$providerSets
566
567     return !%test_hash;              # return false if any keys are left in the hash
568 }
569
570 sub isInet {
571     my $address = shift;
572     my $success = true;
573
574     my(@octets) = $address =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/;
575     if (@octets == 4) {
576         foreach (@octets) {
577             unless ($_ <= 255) {
578                 $success = false;
579             }
580         }
581     }
582     else {
583         $success = false;
584     }
585
586     return $success;
587 }
588
589 sub qtrim {
590     my $string = shift;
591     $string =~ s/^('|")+//;
592     $string =~ s/('|")+$//;
593     return $string;
594 }
595
596 sub trim {
597     my $string = shift;
598     $string =~ s/^\s+//;
599     $string =~ s/\s+$//;
600     return $string;
601 }
602
603 sub getRuntime {
604     my ($year, $month, $day, $hour, $min, $sec) = (localtime(time))[5,4,3,2,1,0];
605     my $time = sprintf ("%02d:%02d:%02d on %02d/%02d/%04d", $hour, $min, $sec, $day, $month+1, $year+1900);
606     return $time;
607 }
608
609 sub printLog {
610     my $logfile = shift;
611     my $message = shift;
612
613     print $message;
614
615     if (open(LOGFILE, ">>", $logfile)) {
616         print LOGFILE getRuntime() . " " . $message;
617         close (LOGFILE);
618     }
619     else {
620         printLn("ERROR: Unable to write to logfile $logfile");
621     }
622 }
623
624 sub printLogLn {
625     printLog ($_[0], $_[1] . $/);
626 }
627
628 sub printLn {
629     print ((@_ ? join($/, @_) : $_), $/);
630 }