3 # Script: pgbouncer_follower.pl
4 # Copyright: 22/04/2012: v1.0.1 Glyn Astill <glyn@8kb.co.uk>
5 # Requires: Perl 5.10.1+, PostgreSQL 9.0+ Slony-I 2.0+ OR Streaming Replication
7 # This script is a command-line utility to monitor Slony-I clusters (or streaming
8 # replication servers) and reconfigure pgbouncer to follow replication sets.
10 # This script is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This script is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this script. If not, see <http://www.gnu.org/licenses/>.
25 use experimental 'smartmatch';
28 use Getopt::Long qw/GetOptions/;
29 use Digest::MD5 qw/md5 md5_hex md5_base64/;
32 use Time::HiRes qw/usleep/;
33 use sigtrap 'handler' => \&cleanExit, 'HUP', 'INT','ABRT','QUIT','TERM';
34 Getopt::Long::Configure qw/no_ignore_case/;
38 use constant false => 0;
39 use constant true => 1;
41 my $g_usage = 'Pass configuration file: pool_follower.pl -f <configuration_path> [-D] ';
42 my $g_method = "slony";
44 my $g_pidfile = "/tmp/pgbouncer_follower_%mode.pid";
45 my $g_logfile = "/tmp/pgbouncer_follower_%mode.log";
46 my $g_poll_interval = 1000;
49 my $g_clname = "replication";
52 my @g_cluster; # no_id, no_comment, no_prov, orig_sets, conninfo, dbname, host, port
53 my $g_status_file = "/tmp/pgbouncer_follower_%mode.status";
54 my $g_conf_template = "/etc/pgbouncer/pgbouncer_%mode.template";
55 my $g_conf_target = "/etc/pgbouncer/pgbouncer_%mode.ini";
56 my $g_reload_command = "/etc/init.d/pgbouncer_%mode reload";
58 my $g_all_databases=false;
60 my ($year, $month, $day, $hour, $min, $sec);
62 my $g_host = hostname;
63 my ($g_addr)=inet_ntoa((gethostbyname(hostname))[4]);
64 my $g_origins_only = false;
65 my $g_best_config = false;
68 die $g_usage unless GetOptions(\%opt, 'config_file|f=s', 'daemon|D',) and keys %opt and ! @ARGV;
70 unless (getConfig($opt{config_file})){
71 print ("There was a problem reading the configuration.\n");
75 printLogLn($g_logfile, "DEBUG: Logging to my '$g_logfile'");
76 if ($g_method eq 'slony') {
77 printLogLn($g_logfile, "\t Watching sets $g_clsets in Slony-I cluster '$g_clname' polling every ${g_poll_interval}ms");
78 printLogLn($g_logfile, "\t Following " . ($g_all_databases ? "all databases" : "replicated database only") . " on an '$g_mode' node for the above replicated sets");
80 elsif ($g_method eq 'wal') {
81 printLogLn($g_logfile, "\t Watching streaming replication lag polling every ${g_poll_interval}ms");
84 printLogLn($g_logfile, "\t ERROR: Unknown replication method: '$g_method'");
87 printLogLn($g_logfile, "\t Template config '$g_conf_template' Target config '$g_conf_target'");
88 printLogLn($g_logfile, "\t Reload command is '$g_reload_command'");
89 printLogLn($g_logfile, "\t Status stored in '$g_status_file'");
90 printLogLn($g_logfile, "\t Using local address for '$g_host' as '$g_addr'");
91 if (($g_max_lag > 0) && ($g_mode = 'ro')) {
92 printLogLn($g_logfile, "\t Max lag for read only targets will be $g_max_lag seconds");
94 #printLogLn($g_logfile, "\t '$g_user' as '$g_pass'");
97 if (defined($opt{daemon})) {
98 printLogLn($g_logfile, "pgbouncer_follower starting up");
99 if (writePID($g_pidfile)) {
103 printLogLn($g_logfile, "DEBUG: Sleeping for ${g_poll_interval}ms");
105 usleep($g_poll_interval * 1000);
116 if (defined($opt{daemon})) {
117 printLogLn($g_logfile, "pgbouncer_follower shutting down");
118 removePID($g_pidfile);
125 my $conninfo_read = 0;
127 foreach my $conninfo (@g_conninfos) {
130 if ($g_method eq 'slony') {
131 @g_cluster = loadCluster($g_clname, $conninfo, $g_user, $g_pass, $g_addr, $g_clsets);
133 elsif ($g_method eq 'wal') {
134 @g_cluster = loadBinCluster($g_user, $g_pass);
137 printLogLn($g_logfile, "DEBUG: ($conninfo_read) " . (($g_method eq 'slony')?'Slony':'Streaming replication') . " cluster with " . scalar(@g_cluster) . " nodes " . (($g_method eq 'slony')?"read from conninfo: $conninfo":"provided via config conninfos"));
138 foreach (@g_cluster) {
139 printLogLn($g_logfile, "DEBUG: Node #" . @$_[0] . " DETAIL: " . @$_[1] . " " . @$_[2] . " " . (@$_[3] // "<NONE>") . " " . @$_[4] . " " . (@$_[5] // "<NONE>") . " " . @$_[6] . " " . @$_[7] . " " . @$_[8] . " " . @$_[9] . " " . @$_[10] . " " . @$_[11]);
144 printLogLn($g_logfile, "ERROR: Failed using conninfo: $conninfo DETAIL: $@");
146 elsif($g_best_config) {
148 printLogLn($g_logfile, "DEBUG: Found current origin to read config from");
153 unless (checkCluster($g_status_file)) {
155 printLogLn ($g_logfile, "DEBUG: Cluster status unchanged");
159 printLogLn ($g_logfile, "Cluster status changed");
160 $node_to = generateConfig($g_conf_template, $g_conf_target, $g_mode, $g_all_databases, $g_clsets);
161 if (reloadConfig($g_reload_command)) {
162 printLogLn ($g_logfile, "Pool repointed to node #$node_to");
168 my $reload_command = shift;
170 if(length($reload_command // '')) {
171 printLogLn($g_logfile, "Running '$reload_command'");
173 open(RELOAD, "-|", $reload_command . " 2>&1");
175 printLogLn($g_logfile, $_);
178 printLogLn($g_logfile, "Reload command has been run.");
181 printLogLn($g_logfile, "ERROR: Failed to run reload command DETAIL: $@");
189 my $template = shift;
192 my $all_databases = shift;
203 my $target_port = 5432;
204 my $target_is_origin;
205 my $target_auth = "";
208 printLogLn($g_logfile, "DEBUG: All databases = " . ($all_databases ? 'true' : 'false'));
211 if ($g_auth_user ne "") {
212 $target_auth = " auth_user=" . $g_auth_user;
215 if (open(INFILE, "<", $template)) {
216 if (open(OUTFILE, ">", $target)) {
217 print OUTFILE "# Configuration file autogenerated at " . getRuntime() . " from $template\n";
219 if (m/\[databases]/) {
221 # Try and choose a node; we always assign the origin initially regardless of rw/ro status
222 # when in ro mode and if we then find a suitable subscriber we'll reassign to it.
223 foreach my $node (@g_cluster) {
226 # If the node is lagging anyway skip it
227 if (($g_mode eq 'ro') && ($g_max_lag > 0) && ($node->[11])) {
228 printLogLn ($g_logfile, "Lag on node $node->[0] exceeds $g_max_lag seconds");
232 if ($clsets ne 'all') {
233 @sets_to_follow = split(',', $clsets);
234 if (defined($node->[3])) {
235 @sets_origin = split(',', $node->[3]);
240 if (defined($node->[5])) {
241 @sets_subscribed = split(',', $node->[5]);
244 undef @sets_subscribed;
248 if (($clsets eq 'all' && defined($node->[3])) || (@sets_to_follow && @sets_origin && checkProvidesAllSets(\@sets_to_follow, \@sets_origin))) {
249 if (defined($node->[8])) {
250 $target_db = $node->[7];
251 $target_host = $node->[8];
252 $target_node_id = $node->[0];
253 $target_sets = $node->[3];
254 $target_is_origin = true;
256 if (defined($node->[9])) {
257 $target_port = $node->[9];
263 elsif (($mode eq "ro") && (($clsets eq 'all') || (@sets_to_follow && @sets_subscribed && checkProvidesAllSets(\@sets_to_follow, \@sets_subscribed)))) {
264 if (defined($node->[8])) {
265 $target_db = $node->[7];
266 $target_host = $node->[8];
267 $target_node_id = $node->[0];
268 $target_sets = ($node->[5] // $node->[3]);
269 $target_is_origin = false;
271 if (defined($node->[9])) {
272 $target_port = $node->[9];
277 if (defined($target_host)) {
278 $_ = "# Configuration for " . ($target_is_origin ? "origin" : "subscriber") . " of sets $target_sets node #$target_node_id $target_host:$target_port\n" . $_;
280 printLogLn ($g_logfile, "DEBUG: Configuration for " . ($target_is_origin ? "origin" : "subscriber") . " of sets $target_sets node #$target_node_id $target_host:$target_port");
282 if ($all_databases || $target_db eq '*') {
283 $_ =~ s/(\[databases\])/$1\n\* = host=$target_host port=$target_port$target_auth/;
286 $_ =~ s/(\[databases\])/$1\n$target_db = host=$target_host port=$target_port dbname=$target_db$target_auth/;
290 $_ = "# Could not find any node providing sets $g_clsets in mode $mode\n";
291 printLogLn ($g_logfile, "DEBUG: Could not find any node providing sets $g_clsets in mode $mode");
300 print ("ERROR: Can't open file $target\n");
305 print ("ERROR: Can't open file $template\n");
307 return $target_node_id;
313 my $current_state = md5_hex('INIT');
315 foreach (@g_cluster) {
316 if (!$g_origins_only || defined($_->[3])) {
317 $current_state = md5_hex(($current_state // "") . $_->[0] . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6] . $_->[11]);
319 printLogLn($g_logfile, "DEBUG: Node " . $_->[0] . " detail = " . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6] . $_->[11]);
325 if (open(CLUSTERFILE, "<", $infile)) {
326 $previous_state = <CLUSTERFILE>;
330 printLogLn ($g_logfile, "ERROR: Can't open file $infile for reading");
334 unless (-f $infile && ($current_state eq $previous_state)) {
336 printLogLn($g_logfile, "DEBUG: Writing to status file");
338 if (open(CLUSTERFILE, ">", $infile)) {
339 print CLUSTERFILE $current_state;
343 printLogLn ($g_logfile, "ERROR: Can't open file $infile for writing");
347 if ((($previous_state // "") ne "") && ($current_state ne $previous_state)){
356 my $conninfo = shift;
371 $g_best_config = false;
372 $dsn = "DBI:Pg:$conninfo};";
375 $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 1});
376 $qw_clname = $dbh->quote_identifier("_" . $clname);
378 $query = "SELECT $qw_clname.getModuleVersion()";
379 $sth = $dbh->prepare($query);
381 ($version) = $sth->fetchrow;
384 $query = "WITH x AS (
387 COALESCE(b.sub_provider, 0) AS no_prov,
388 NULLIF(array_to_string(array(SELECT set_id FROM $qw_clname.sl_set WHERE set_origin = a.no_id" .
389 ($clsets ne "all" ? " AND set_id IN (" . substr('?, ' x scalar(split(',', $clsets)), 0, -2) . ")" : "")
390 . " ORDER BY set_id), ','), '') AS origin_sets,
391 CASE " . ((substr($version,0,3) >= 2.2) ? "WHEN a.no_failed THEN 'FAILED' " : "") . "WHEN a.no_active THEN 'ACTIVE' ELSE 'INACTIVE' END AS no_status,
392 string_agg(CASE WHEN b.sub_receiver = a.no_id AND b.sub_forward AND b.sub_active" .
393 ($clsets ne "all" ? " AND b.sub_set IN (" . substr('?, ' x scalar(split(',', $clsets)), 0, -2) . ")" : "")
394 . " THEN b.sub_set::text END, ',' ORDER BY b.sub_set) AS prov_sets,
395 COALESCE(c.pa_conninfo,(SELECT pa_conninfo FROM $qw_clname.sl_path WHERE pa_server = $qw_clname.getlocalnodeid(?) LIMIT 1)) AS no_conninfo
396 FROM $qw_clname.sl_node a
397 LEFT JOIN $qw_clname.sl_subscribe b ON a.no_id = b.sub_receiver AND b.sub_set <> 999
398 LEFT JOIN $qw_clname.sl_path c ON c.pa_server = a.no_id AND c.pa_client = $qw_clname.getlocalnodeid(?)
399 LEFT JOIN $qw_clname.sl_set d ON d.set_origin = a.no_id
400 GROUP BY b.sub_provider, a.no_id, a.no_comment, c.pa_conninfo, a.no_active
401 ORDER BY (COALESCE(b.sub_provider, 0) = 0) DESC, a.no_id ASC
404 CASE WHEN x.no_conninfo ilike '%dbname=%' THEN(regexp_matches(x.no_conninfo, E'dbname=(.+?)\\\\M', 'ig'))[1] END AS database,
405 CASE WHEN x.no_conninfo ilike '%host=%' THEN(regexp_matches(x.no_conninfo, E'host=(.+?)(?=\\\\s|\$)', 'ig'))[1] END AS host,
406 CASE WHEN x.no_conninfo ilike '%port=%' THEN(regexp_matches(x.no_conninfo, E'port=(.+?)\\\\M', 'ig'))[1] ELSE '5432' END AS port,
407 (no_id = $qw_clname.getlocalnodeid(?)) AS this_node,
408 COALESCE((? BETWEEN 1 AND extract(epoch from s.st_lag_time)),false) AS lag_exceeded
410 LEFT JOIN $qw_clname.sl_status s ON s.st_received = x.no_id
411 WHERE x.no_conninfo != '<event pending>'
414 ORDER BY origin_sets, @(CASE WHEN (host ~ '^[0-9]{1,3}(.[0-9]{1,3}){3}\$') THEN host::inet ELSE '255.255.255.255'::inet END - ?::inet) ASC";
417 # printLogLn($g_logfile, "DEBUG: " . $query);
420 $sth = $dbh->prepare($query);
422 if ($clsets ne "all") {
424 foreach my $param (split(",", $clsets)) {
425 $sth->bind_param($param_on, $param);
430 # This param is taken 3 times
432 $sth->bind_param($param_on, "_" . $clname);
435 $sth->bind_param($param_on, $g_max_lag);
437 $sth->bind_param($param_on, (isInet($addr) ? $addr : '255.255.255.255'));
440 while (my @node = $sth->fetchrow) {
441 # If some origin sets exist for this node row (we can assume they're the sets we're following since they're filtered in the query)
442 # and the row is flagged as this_node then we have found the best node to read the configuration from.
443 if (defined($node[3]) && $node[10]) {
444 $g_best_config = true;
446 push(@cluster, \@node);
453 printLogLn($g_logfile, "ERROR: Failed to execute query against Postgres server: $@");
475 my $primary_location;
483 $g_best_config = true;
485 foreach my $conninfo (@g_conninfos) {
486 $dsn = "DBI:Pg:$conninfo};";
489 $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 1});
490 # Check to see if the server is a secondary, and also pull the current xlog
491 # location and the apply lag using pg_last_xact_replay_timestamp to get the
492 # last commit timestamp from the primary applied on the secondary.
493 # We will need to compare the current receive location to the primary xlog
494 # location, if they differ we can then use the apply_lag value; we'll have
495 # to post-process this.
496 # In 9.6+ we might want to pull the system identifier from pg_controldata view too
497 $query = "SELECT pg_is_in_recovery(),
499 WHEN pg_is_in_recovery() THEN pg_last_xlog_receive_location()
500 ELSE pg_current_xlog_location()
502 COALESCE(extract(epoch from current_timestamp-pg_last_xact_replay_timestamp()),0)";
503 $sth = $dbh->prepare($query);
505 ($recovery, $xlog_location, $apply_lag) = $sth->fetchrow;
508 ($port) = $conninfo =~ m/port=(.+?)(\s|$)/g;
509 ($hostname) = $conninfo =~ m/host=(.+?)(\s|$)/g;
510 ($database) = $conninfo =~ m/dbname=(.+?)(\s|$)/g;
511 @parts = split('/', $xlog_location, 2);
512 $timeline = qtrim(trim($parts[0]));
513 $location = hex(qtrim(trim($parts[1])));
516 printLogLn($g_logfile, "DEBUG: Server: $hostname:$port " . ($recovery ? 'secondary' : 'primary') . " at $xlog_location ($timeline/$location)");
519 # For WAL replication we assume if we can contact a server then it is active,
520 # which isn't strictly true, but nodes that have fallen behind can be excluded
521 # via the max_ro_lag setting. We also substitute timeline+1 for the slony
525 $primary_location = $xlog_location;
526 my @node=(1,'Primary',0,$timeline,"ACTIVE",($timeline+1),$conninfo,$database,$hostname,$port,1,$xlog_location,$apply_lag);
527 push(@tmp_cluster, \@node);
531 my @node=($node_id,"Secondary".($node_id-1),1,undef,"ACTIVE",($timeline+1),$conninfo,$database,$hostname,$port,$node_id,$xlog_location,$apply_lag);
532 push(@tmp_cluster, \@node);
536 printLogLn($g_logfile, "ERROR: Could not connect to server with conninfo: $conninfo DETAIL: $@");
540 # Error if more than one primary discovered.
541 if ($primaries != 1) {
542 printLogLn($g_logfile, "ERROR: Invalid quantity of primaries: $primaries");
543 die "no primaries found";
545 # Do the post processing we mentioned above, once we know xlog locations differ
546 # then we can say the apply lag is correct, else it's just there has been no
547 # activity on the master. In the case the xlog locations differ but apply_lag
548 # is 0 then it means no WAL has been applied since srver start; cludge here
549 # and set lag to 1 week.
551 foreach (@tmp_cluster) {
552 if (@$_[11] eq $primary_location) {
556 $apply_lag = @$_[12] if (@$_[12] > 0);
557 $apply_lag = 604800 if (@$_[12] = 0);
559 my @node=(@$_[0],@$_[1],@$_[2],@$_[3],@$_[4],@$_[5],@$_[6],@$_[7],@$_[8],@$_[9],@$_[10],$apply_lag);
560 push(@cluster, \@node);
573 if (open(CFGFILE, "<", $infile)) {
574 foreach (<CFGFILE>) {
579 s/#(?=(?:(?:[^']|[^"]*+'){2})*+[^']|[^"]*+\z).*//;
581 if (length(trim($_))) {
582 @fields = split('=', $_, 2);
583 $value = qtrim(trim($fields[1]));
584 given(lc($fields[0])) {
585 when(/\breplication_method\b/i) {
589 $g_debug = checkBoolean($value);
591 when(/\bpid_file\b/i) {
594 when(/\blog_file\b/i) {
597 when(/\bslony_user\b/i) { # Depreciated
600 when(/\bslony_pass\b/i) { # Depreciated
603 when(/\breplication_user\b/i) {
606 when(/\breplication_pass\b/i) {
609 when(/\bslony_cluster_name\b/i) {
612 when(/\bslony_sets_to_follow\b/i) {
615 when(/\bserver_conninfo\b/i) {
616 push(@g_conninfos, $value);
618 when(/\bfollower_poll_interval\b/i) {
619 $g_poll_interval = checkInteger($value);
621 when(/\bstatus_file\b/i) {
622 $g_status_file = $value;
624 when(/\bpool_conf_template\b/i) {
625 $g_conf_template = $value;
627 when(/\bpool_conf_target\b/i) {
628 $g_conf_target = $value;
630 when(/\bpool_reload_command\b/i) {
631 $g_reload_command = $value;
633 when(/\bpool_mode\b/i) {
634 $g_mode = lc($value);
636 when(/\bpool_all_databases\b/i) {
637 $g_all_databases = checkBoolean($value);
639 when(/\bauth_user\b/i) {
640 $g_auth_user = $value;
642 when(/\bonly_follow_origins\b/i) {
643 $g_origins_only = checkBoolean($value);
645 when(/\bmax_ro_lag\b/i) {
646 $g_max_lag = checkInteger($value);
652 if (defined($g_user) && (scalar(@g_conninfos) > 0)) {
655 # Replace %mode and %clname here for actual value
656 for ($g_pidfile, $g_logfile, $g_status_file, $g_conf_template, $g_conf_target, $g_reload_command) {
658 s/\%clname/$g_clname/g;
664 printLogLn($g_logfile, "ERROR: Could not read configuration from '$infile'");
674 open (PIDFILE, ">", $pidfile);
678 printLogLn($g_logfile, "DEBUG: Created PID file '$pidfile' for process $$");
682 printLogLn($g_logfile, "ERROR: unable to write pidfile at '$pidfile' DETAIL $!");
696 printLogLn($g_logfile, "DEBUG: Removed PID file '$pidfile'");
700 printLogLn($g_logfile, "DEBUG: PID file '$pidfile' never existed to be removed");
704 printLogLn($g_logfile, "ERROR: unable to remove pidfile at '$pidfile' DETAIL $!");
713 if ( grep /^$text$/i, ("y","yes","t","true","on") ) {
716 elsif ( grep /^$text$/i, ("n","no","f","false","off") ) {
726 if (($integer * 1) eq $integer) {
727 $value = int($integer);
732 sub checkProvidesAllSets {
733 my ($originSets, $providerSets) = @_;
736 undef @test_hash{@$originSets}; # add a hash key for each element of @$originSets
737 delete @test_hash{@$providerSets}; # remove all keys for elements of @$providerSets
739 return !%test_hash; # return false if any keys are left in the hash
746 my(@octets) = $address =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/;
763 $string =~ s/^('|")+//;
764 $string =~ s/('|")+$//;
776 my ($year, $month, $day, $hour, $min, $sec) = (localtime(time))[5,4,3,2,1,0];
777 my $time = sprintf ("%02d:%02d:%02d on %02d/%02d/%04d", $hour, $min, $sec, $day, $month+1, $year+1900);
787 if (open(LOGFILE, ">>", $logfile)) {
788 print LOGFILE getRuntime() . " " . $message;
792 printLn("ERROR: Unable to write to logfile $logfile");
797 printLog ($_[0], $_[1] . $/);
801 print ((@_ ? join($/, @_) : $_), $/);