X-Git-Url: https://git.8kb.co.uk/?p=slony-i%2Fpgbouncer_follower;a=blobdiff_plain;f=pgbouncer_follower.pl;h=8677071634ed6fa2531161fab4aa8d1d13e69dde;hp=0b88b19dfd2f987af14dd3c9a32185ff015bb7c2;hb=HEAD;hpb=c7a68db8670965d110468e2f2696e60c209c0393 diff --git a/pgbouncer_follower.pl b/pgbouncer_follower.pl index 0b88b19..8677071 100755 --- a/pgbouncer_follower.pl +++ b/pgbouncer_follower.pl @@ -2,7 +2,7 @@ # Script: pgbouncer_follower.pl # Copyright: 22/04/2012: v1.0.1 Glyn Astill -# Requires: Perl 5.10.1+, PostgreSQL 9.0+ Slony-I 2.0+ +# Requires: Perl 5.10.1+, PostgreSQL 9.0+ Slony-I 2.0+ OR Streaming Replication # # This script is a command-line utility to monitor Slony-I clusters # and reconfigure pgbouncer to follow replication sets. @@ -22,6 +22,7 @@ use strict; use warnings; +use experimental 'smartmatch'; use DBI; use v5.10.1; use Getopt::Long qw/GetOptions/; @@ -38,6 +39,7 @@ use constant false => 0; use constant true => 1; my $g_usage = 'Pass configuration file: pool_follower.pl -f [-D] '; +my $g_method = "slony"; my $g_debug = false; my $g_pidfile = "/tmp/pgbouncer_follower_%mode.pid"; my $g_logfile = "/tmp/pgbouncer_follower_%mode.log"; @@ -54,30 +56,41 @@ my $g_conf_target = "/etc/pgbouncer/pgbouncer_%mode.ini"; my $g_reload_command = "/etc/init.d/pgbouncer_%mode reload"; my $g_mode = 'rw'; my $g_all_databases=false; +my $g_auth_user=''; my ($year, $month, $day, $hour, $min, $sec); my $change_time; my $g_host = hostname; my ($g_addr)=inet_ntoa((gethostbyname(hostname))[4]); my $g_origins_only = false; +my $g_best_config = false; +my $g_max_lag = 0; die $g_usage unless GetOptions(\%opt, 'config_file|f=s', 'daemon|D',) and keys %opt and ! @ARGV; unless (getConfig($opt{config_file})){ print ("There was a problem reading the configuration.\n"); } -if (!defined($g_status_file)) { - $g_status_file = "/tmp/$g_clname.status"; -} - if ($g_debug) { printLogLn($g_logfile, "DEBUG: Logging to my '$g_logfile'"); - printLogLn($g_logfile, "\t Watching sets $g_clsets in Slony-I cluster '$g_clname' polling every ${g_poll_interval}ms"); - printLogLn($g_logfile, "\t Following " . ($g_all_databases ? "all databases" : "replicated database only") . " on an '$g_mode' node for the above replicated sets"); + if ($g_method eq 'slony') { + printLogLn($g_logfile, "\t Watching sets $g_clsets in Slony-I cluster '$g_clname' polling every ${g_poll_interval}ms"); + printLogLn($g_logfile, "\t Following " . ($g_all_databases ? "all databases" : "replicated database only") . " on an '$g_mode' node for the above replicated sets"); + } + elsif ($g_method eq 'wal') { + printLogLn($g_logfile, "\t Watching streaming replication lag polling every ${g_poll_interval}ms"); + } + else { + printLogLn($g_logfile, "\t ERROR: Unknown replication method: '$g_method'"); + exit(1); + } printLogLn($g_logfile, "\t Template config '$g_conf_template' Target config '$g_conf_target'"); printLogLn($g_logfile, "\t Reload command is '$g_reload_command'"); printLogLn($g_logfile, "\t Status stored in '$g_status_file'"); printLogLn($g_logfile, "\t Using local address for '$g_host' as '$g_addr'"); + if (($g_max_lag > 0) && ($g_mode = 'ro')) { + printLogLn($g_logfile, "\t Max lag for read only targets will be $g_max_lag seconds"); + } #printLogLn($g_logfile, "\t '$g_user' as '$g_pass'"); } @@ -109,21 +122,36 @@ sub cleanExit { sub doAll { my $node_to; + my $conninfo_read = 0; foreach my $conninfo (@g_conninfos) { + $conninfo_read++; eval { - @g_cluster = loadCluster($g_clname, $conninfo, $g_user, $g_pass, $g_addr, $g_clsets); + if ($g_method eq 'slony') { + @g_cluster = loadCluster($g_clname, $conninfo, $g_user, $g_pass, $g_addr, $g_clsets); + } + elsif ($g_method eq 'wal') { + @g_cluster = loadBinCluster($g_user, $g_pass); + } if ($g_debug) { - printLogLn($g_logfile, "DEBUG: Cluster with " . scalar(@g_cluster) . " nodes read from conninfo: $conninfo"); + printLogLn($g_logfile, "DEBUG: ($conninfo_read) " . (($g_method eq 'slony')?'Slony':'Streaming replication') . " cluster with " . scalar(@g_cluster) . " nodes " . (($g_method eq 'slony')?"read from conninfo: $conninfo":"provided via config conninfos")); foreach (@g_cluster) { - printLogLn($g_logfile, "DEBUG: Node #" . @$_[0] . " DETAIL: " . @$_[1] . " " . @$_[2] . " " . (@$_[3] // "") . " " . @$_[4] . " " . (@$_[5] // "") . " " . @$_[6] . " " . @$_[7] . " " . @$_[8] . " " . @$_[9] . " " ); + printLogLn($g_logfile, "DEBUG: Node #" . @$_[0] . " DETAIL: " . @$_[1] . " " . @$_[2] . " " . (@$_[3] // "") . " " . @$_[4] . " " . (@$_[5] // "") . " " . @$_[6] . " " . @$_[7] . " " . @$_[8] . " " . @$_[9] . " " . @$_[10] . " " . @$_[11]); } } }; if ($@) { printLogLn($g_logfile, "ERROR: Failed using conninfo: $conninfo DETAIL: $@"); } - else { + elsif($g_best_config) { + if ($g_debug) { + if ($g_method eq 'slony') { + printLogLn($g_logfile, "DEBUG: Found current origin to read config from"); + } + elsif ($g_method eq 'wal') { + printLogLn($g_logfile, "DEBUG: Read config from all contactable nodes"); + } + } last; } } @@ -179,9 +207,14 @@ sub generateConfig { my $target_sets; my $target_port = 5432; my $target_is_origin; + my $target_auth = ""; if ($g_debug) { - printLogLn($g_logfile, "DEBUG: All databases = " . ($g_all_databases ? 'true' : 'false')); + printLogLn($g_logfile, "DEBUG: All databases = " . ($all_databases ? 'true' : 'false')); + } + + if ($g_auth_user ne "") { + $target_auth = " auth_user=" . $g_auth_user; } if (open(INFILE, "<", $template)) { @@ -193,6 +226,14 @@ sub generateConfig { # Try and choose a node; we always assign the origin initially regardless of rw/ro status # when in ro mode and if we then find a suitable subscriber we'll reassign to it. foreach my $node (@g_cluster) { + + + # If the node is lagging anyway skip it + if (($g_mode eq 'ro') && ($g_max_lag > 0) && ($node->[11])) { + printLogLn ($g_logfile, "Lag on node $node->[0] exceeds $g_max_lag seconds"); + next; + } + if ($clsets ne 'all') { @sets_to_follow = split(',', $clsets); if (defined($node->[3])) { @@ -201,8 +242,8 @@ sub generateConfig { else { undef @sets_origin; } - if (defined($node->[6])) { - @sets_subscribed = split(',', $node->[6]); + if (defined($node->[5])) { + @sets_subscribed = split(',', $node->[5]); } else { undef @sets_subscribed; @@ -229,7 +270,7 @@ sub generateConfig { $target_db = $node->[7]; $target_host = $node->[8]; $target_node_id = $node->[0]; - $target_sets = ($node->[6] // $node->[3]); + $target_sets = ($node->[5] // $node->[3]); $target_is_origin = false; } if (defined($node->[9])) { @@ -243,11 +284,11 @@ sub generateConfig { if ($g_debug) { printLogLn ($g_logfile, "DEBUG: Configuration for " . ($target_is_origin ? "origin" : "subscriber") . " of sets $target_sets node #$target_node_id $target_host:$target_port"); } - if ($all_databases) { - $_ =~ s/(\[databases\])/$1\n\* = host=$target_host port=$target_port/; + if ($all_databases || $target_db eq '*') { + $_ =~ s/(\[databases\])/$1\n\* = host=$target_host port=$target_port$target_auth/; } else { - $_ =~ s/(\[databases\])/$1\n$target_db = host=$target_host port=$target_port dbname=$target_db/; + $_ =~ s/(\[databases\])/$1\n$target_db = host=$target_host port=$target_port dbname=$target_db$target_auth/; } } else { @@ -274,20 +315,20 @@ sub generateConfig { sub checkCluster { my $infile = shift; my $changed = false; - my $current_cluster; - my $previous_cluster; + my $current_state = md5_hex('INIT'); + my $previous_state; foreach (@g_cluster) { if (!$g_origins_only || defined($_->[3])) { - $current_cluster = md5_hex(($current_cluster // "") . $_->[0] . $_->[2] . (defined($_->[3]) ? 't' : 'f')); + $current_state = md5_hex(($current_state // "") . $_->[0] . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6] . $_->[11]); if ($g_debug) { - printLogLn($g_logfile, "DEBUG: Node " . $_->[0] . " detail = " . $_->[2] . (defined($_->[3]) ? 't' : 'f')); + printLogLn($g_logfile, "DEBUG: Node " . $_->[0] . " detail = " . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6] . $_->[11]); } } } if (-f $infile) { if (open(CLUSTERFILE, "<", $infile)) { - $previous_cluster = ; + $previous_state = ; close(CLUSTERFILE); } else { @@ -295,12 +336,12 @@ sub checkCluster { } } - unless (-f $infile && ($current_cluster eq $previous_cluster)) { + unless (-f $infile && ($current_state eq $previous_state)) { if ($g_debug) { printLogLn($g_logfile, "DEBUG: Writing to status file"); } if (open(CLUSTERFILE, ">", $infile)) { - print CLUSTERFILE $current_cluster; + print CLUSTERFILE $current_state; close(CLUSTERFILE); } else { @@ -308,7 +349,7 @@ sub checkCluster { } } - if ((($previous_cluster // "") ne "") && (($current_cluster // "") ne "") && ($current_cluster ne $previous_cluster)){ + if ((($previous_state // "") ne "") && ($current_state ne $previous_state)){ $changed = true; } @@ -332,6 +373,7 @@ sub loadCluster { my $qw_clname; my @cluster; + $g_best_config = false; $dsn = "DBI:Pg:$conninfo};"; eval { @@ -363,17 +405,21 @@ sub loadCluster { GROUP BY b.sub_provider, a.no_id, a.no_comment, c.pa_conninfo, a.no_active ORDER BY (COALESCE(b.sub_provider, 0) = 0) DESC, a.no_id ASC ), z AS ( - SELECT *, + SELECT x.*, CASE WHEN x.no_conninfo ilike '%dbname=%' THEN(regexp_matches(x.no_conninfo, E'dbname=(.+?)\\\\M', 'ig'))[1] END AS database, CASE WHEN x.no_conninfo ilike '%host=%' THEN(regexp_matches(x.no_conninfo, E'host=(.+?)(?=\\\\s|\$)', 'ig'))[1] END AS host, - CASE WHEN x.no_conninfo ilike '%port=%' THEN(regexp_matches(x.no_conninfo, E'port=(.+?)\\\\M', 'ig'))[1] ELSE '5432' END AS port + CASE WHEN x.no_conninfo ilike '%port=%' THEN(regexp_matches(x.no_conninfo, E'port=(.+?)\\\\M', 'ig'))[1] ELSE '5432' END AS port, + (no_id = $qw_clname.getlocalnodeid(?)) AS this_node, + COALESCE((? BETWEEN 1 AND extract(epoch from s.st_lag_time)),false) AS lag_exceeded FROM x + LEFT JOIN $qw_clname.sl_status s ON s.st_received = x.no_id + WHERE x.no_conninfo != '' ) SELECT * FROM z ORDER BY origin_sets, @(CASE WHEN (host ~ '^[0-9]{1,3}(.[0-9]{1,3}){3}\$') THEN host::inet ELSE '255.255.255.255'::inet END - ?::inet) ASC"; if ($g_debug) { - printLogLn($g_logfile, "DEBUG: " . $query); +# printLogLn($g_logfile, "DEBUG: " . $query); } $sth = $dbh->prepare($query); @@ -386,14 +432,22 @@ sub loadCluster { } } } - $sth->bind_param($param_on, "_" . $clname); - $param_on++; - $sth->bind_param($param_on, "_" . $clname); + # This param is taken 3 times + for (0..2) { + $sth->bind_param($param_on, "_" . $clname); + $param_on++; + } + $sth->bind_param($param_on, $g_max_lag); $param_on++; $sth->bind_param($param_on, (isInet($addr) ? $addr : '255.255.255.255')); $sth->execute(); while (my @node = $sth->fetchrow) { + # If some origin sets exist for this node row (we can assume they're the sets we're following since they're filtered in the query) + # and the row is flagged as this_node then we have found the best node to read the configuration from. + if (defined($node[3]) && $node[10]) { + $g_best_config = true; + } push(@cluster, \@node); } @@ -407,6 +461,113 @@ sub loadCluster { return @cluster; } +sub loadBinCluster { + my $dbuser = shift; + my $dbpass = shift; + + my $dsn; + my $dbh; + my $sth; + my $query; + my $recovery; + my $xlog_location; + my $apply_lag; + my $primaries = 0; + + my @parts; + my $timeline; + my $location; + my $primary_location; + + my $hostname; + my $port; + my $database; + my @tmp_cluster; + my @cluster; + my $node_id = 1; + $g_best_config = true; + + foreach my $conninfo (@g_conninfos) { + $dsn = "DBI:Pg:$conninfo};"; + + eval { + $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 1}); + # Check to see if the server is a secondary, and also pull the current xlog + # location and the apply lag using pg_last_xact_replay_timestamp to get the + # last commit timestamp from the primary applied on the secondary. + # We will need to compare the current receive location to the primary xlog + # location, if they differ we can then use the apply_lag value; we'll have + # to post-process this. + # In 9.6+ we might want to pull the system identifier from pg_controldata view too + $query = "SELECT pg_is_in_recovery(), + CASE + WHEN pg_is_in_recovery() THEN pg_last_xlog_receive_location() + ELSE pg_current_xlog_location() + END, + COALESCE(extract(epoch from current_timestamp-pg_last_xact_replay_timestamp()),0)"; + $sth = $dbh->prepare($query); + $sth->execute(); + ($recovery, $xlog_location, $apply_lag) = $sth->fetchrow; + $sth->finish; + + ($port) = $conninfo =~ m/port=(.+?)(\s|$)/g; + ($hostname) = $conninfo =~ m/host=(.+?)(\s|$)/g; + ($database) = $conninfo =~ m/dbname=(.+?)(\s|$)/g; + @parts = split('/', $xlog_location, 2); + $timeline = qtrim(trim($parts[0])); + $location = hex(qtrim(trim($parts[1]))); + + if ($g_debug) { + printLogLn($g_logfile, "DEBUG: Server: $hostname:$port " . ($recovery ? 'secondary' : 'primary') . " at $xlog_location ($timeline/$location)"); + } + + # For WAL replication we assume if we can contact a server then it is active, + # which isn't strictly true, but nodes that have fallen behind can be excluded + # via the max_ro_lag setting. We also substitute timeline+1 for the slony + # replication set. + if (!$recovery) { + $primaries++; + $primary_location = $xlog_location; + my @node=(1,'Primary',0,$timeline,"ACTIVE",($timeline+1),$conninfo,$database,$hostname,$port,1,$xlog_location,$apply_lag); + push(@tmp_cluster, \@node); + } + else { + $node_id++; + my @node=($node_id,"Secondary".($node_id-1),1,undef,"ACTIVE",($timeline+1),$conninfo,$database,$hostname,$port,$node_id,$xlog_location,$apply_lag); + push(@tmp_cluster, \@node); + } + }; + if ($@) { + printLogLn($g_logfile, "ERROR: Could not connect to server with conninfo: $conninfo DETAIL: $@"); + } + } + + # Error if more than one primary discovered. + if ($primaries != 1) { + printLogLn($g_logfile, "ERROR: Invalid quantity of primaries: $primaries"); + die "no primaries found"; + } + # Do the post processing we mentioned above, once we know xlog locations differ + # then we can say the apply lag is correct, else it's just there has been no + # activity on the master. In the case the xlog locations differ but apply_lag + # is 0 then it means no WAL has been applied since srver start; we don't know + # a time for lag in this case. + else { + foreach (@tmp_cluster) { + $apply_lag = false; + if ($g_max_lag > 0) { + if ((@$_[11] ne $primary_location) && (@$_[12] > $g_max_lag || @$_[12] == 0)) { + $apply_lag = true; + } + } + my @node=(@$_[0],@$_[1],@$_[2],@$_[3],@$_[4],@$_[5],@$_[6],@$_[7],@$_[8],@$_[9],@$_[10],$apply_lag); + push(@cluster, \@node); + } + } + + return @cluster; +} + sub getConfig { my @fields; my $success = false; @@ -425,6 +586,9 @@ sub getConfig { @fields = split('=', $_, 2); $value = qtrim(trim($fields[1])); given(lc($fields[0])) { + when(/\breplication_method\b/i) { + $g_method = $value; + } when(/\bdebug\b/i) { $g_debug = checkBoolean($value); } @@ -434,10 +598,16 @@ sub getConfig { when(/\blog_file\b/i) { $g_logfile = $value; } - when(/\bslony_user\b/i) { + when(/\bslony_user\b/i) { # Depreciated $g_user = $value; } - when(/\bslony_pass\b/i) { + when(/\bslony_pass\b/i) { # Depreciated + $g_pass = $value; + } + when(/\breplication_user\b/i) { + $g_user = $value; + } + when(/\breplication_pass\b/i) { $g_pass = $value; } when(/\bslony_cluster_name\b/i) { @@ -470,9 +640,15 @@ sub getConfig { when(/\bpool_all_databases\b/i) { $g_all_databases = checkBoolean($value); } + when(/\bauth_user\b/i) { + $g_auth_user = $value; + } when(/\bonly_follow_origins\b/i) { $g_origins_only = checkBoolean($value); } + when(/\bmax_ro_lag\b/i) { + $g_max_lag = checkInteger($value); + } } } }