]> git.8kb.co.uk Git - slony-i/pgbouncer_follower/blobdiff - pgbouncer_follower.pl
Write correct boolean value for exceede lag, rather than just the lag in seconds.
[slony-i/pgbouncer_follower] / pgbouncer_follower.pl
index aa55bcba58d348d7851f70455c95e615223b9976..8677071634ed6fa2531161fab4aa8d1d13e69dde 100755 (executable)
@@ -2,7 +2,7 @@
 
 # Script:   pgbouncer_follower.pl
 # Copyright:    22/04/2012: v1.0.1 Glyn Astill <glyn@8kb.co.uk>
-# Requires: Perl 5.10.1+, PostgreSQL 9.0+ Slony-I 2.0+
+# Requires: Perl 5.10.1+, PostgreSQL 9.0+ Slony-I 2.0+ OR Streaming Replication
 #
 # This script is a command-line utility to monitor Slony-I clusters
 # and reconfigure pgbouncer to follow replication sets.
@@ -39,6 +39,7 @@ use constant false => 0;
 use constant true  => 1;
 
 my $g_usage = 'Pass configuration file: pool_follower.pl -f <configuration_path> [-D]  ';
+my $g_method = "slony";
 my $g_debug = false;
 my $g_pidfile = "/tmp/pgbouncer_follower_%mode.pid";
 my $g_logfile = "/tmp/pgbouncer_follower_%mode.log";
@@ -55,6 +56,7 @@ my $g_conf_target = "/etc/pgbouncer/pgbouncer_%mode.ini";
 my $g_reload_command = "/etc/init.d/pgbouncer_%mode reload";
 my $g_mode = 'rw';
 my $g_all_databases=false;
+my $g_auth_user='';
 my ($year, $month, $day, $hour, $min, $sec);
 my $change_time;
 my $g_host = hostname;
@@ -71,8 +73,17 @@ unless (getConfig($opt{config_file})){
  
 if ($g_debug) {
     printLogLn($g_logfile, "DEBUG: Logging to my '$g_logfile'");
-    printLogLn($g_logfile, "\t Watching sets $g_clsets in Slony-I cluster '$g_clname' polling every ${g_poll_interval}ms"); 
-    printLogLn($g_logfile, "\t Following " . ($g_all_databases ? "all databases" : "replicated database only") . " on an '$g_mode' node for the above replicated sets");
+    if ($g_method eq 'slony') {
+        printLogLn($g_logfile, "\t Watching sets $g_clsets in Slony-I cluster '$g_clname' polling every ${g_poll_interval}ms"); 
+        printLogLn($g_logfile, "\t Following " . ($g_all_databases ? "all databases" : "replicated database only") . " on an '$g_mode' node for the above replicated sets");
+    }
+    elsif ($g_method eq 'wal')  {
+        printLogLn($g_logfile, "\t Watching streaming replication lag polling every ${g_poll_interval}ms"); 
+    }
+    else {
+        printLogLn($g_logfile, "\t ERROR: Unknown replication method: '$g_method'"); 
+       exit(1);
+    }
     printLogLn($g_logfile, "\t Template config '$g_conf_template' Target config '$g_conf_target'");
     printLogLn($g_logfile, "\t Reload command is '$g_reload_command'");
     printLogLn($g_logfile, "\t Status stored in '$g_status_file'");
@@ -116,9 +127,14 @@ sub doAll {
     foreach my $conninfo (@g_conninfos) {
         $conninfo_read++;
         eval {
-            @g_cluster = loadCluster($g_clname, $conninfo, $g_user, $g_pass, $g_addr, $g_clsets);
+            if ($g_method eq 'slony') {
+                @g_cluster = loadCluster($g_clname, $conninfo, $g_user, $g_pass, $g_addr, $g_clsets);
+            }
+            elsif ($g_method eq 'wal') {
+                @g_cluster = loadBinCluster($g_user, $g_pass);
+            }
             if ($g_debug) {
-                printLogLn($g_logfile, "DEBUG: ($conninfo_read) Cluster with " . scalar(@g_cluster) . " nodes read from conninfo: $conninfo");
+                printLogLn($g_logfile, "DEBUG: ($conninfo_read) " . (($g_method eq 'slony')?'Slony':'Streaming replication') . " cluster with " . scalar(@g_cluster) . " nodes " . (($g_method eq 'slony')?"read from conninfo: $conninfo":"provided via config conninfos"));
                 foreach (@g_cluster) {
                     printLogLn($g_logfile, "DEBUG: Node #" . @$_[0] . " DETAIL: " . @$_[1] . " " . @$_[2] . " " . (@$_[3] // "<NONE>") . " " . @$_[4] . " " . (@$_[5] // "<NONE>") . " " . @$_[6] . " " . @$_[7] . " " . @$_[8] . " " . @$_[9] . " " . @$_[10] . " "  . @$_[11]);
                 }
@@ -129,7 +145,12 @@ sub doAll {
         }
         elsif($g_best_config) {
             if ($g_debug) {
-                printLogLn($g_logfile, "DEBUG: Found current origin to read config from");
+                if ($g_method eq 'slony') {
+                    printLogLn($g_logfile, "DEBUG: Found current origin to read config from");
+                }
+                elsif ($g_method eq 'wal') {
+                    printLogLn($g_logfile, "DEBUG: Read config from all contactable nodes");
+                }
             }
             last;
         } 
@@ -186,9 +207,14 @@ sub generateConfig {
     my $target_sets;
     my $target_port = 5432;
     my $target_is_origin;
+    my $target_auth = "";
 
     if ($g_debug) {
-        printLogLn($g_logfile, "DEBUG: All databases = " . ($g_all_databases ? 'true' : 'false'));
+        printLogLn($g_logfile, "DEBUG: All databases = " . ($all_databases ? 'true' : 'false'));
+    }
+
+    if ($g_auth_user ne "") {
+        $target_auth = " auth_user=" . $g_auth_user;
     }
 
     if (open(INFILE, "<", $template)) {
@@ -258,11 +284,11 @@ sub generateConfig {
                         if ($g_debug) {
                             printLogLn ($g_logfile, "DEBUG: Configuration for " . ($target_is_origin ? "origin" : "subscriber") . " of sets $target_sets node #$target_node_id $target_host:$target_port");
                         }
-                        if ($all_databases) {
-                            $_ =~ s/(\[databases\])/$1\n\* = host=$target_host port=$target_port/;
+                        if ($all_databases || $target_db eq '*') {
+                            $_ =~ s/(\[databases\])/$1\n\* = host=$target_host port=$target_port$target_auth/;
                         }
                         else {
-                            $_ =~ s/(\[databases\])/$1\n$target_db = host=$target_host port=$target_port dbname=$target_db/;
+                            $_ =~ s/(\[databases\])/$1\n$target_db = host=$target_host port=$target_port dbname=$target_db$target_auth/;
                         }
                     }
                     else {
@@ -387,6 +413,7 @@ sub loadCluster {
                     COALESCE((? BETWEEN 1 AND extract(epoch from s.st_lag_time)),false) AS lag_exceeded
                 FROM x 
                 LEFT JOIN $qw_clname.sl_status s ON s.st_received = x.no_id
+               WHERE x.no_conninfo != '<event pending>'
                 )
                 SELECT * FROM z 
                 ORDER BY origin_sets, @(CASE WHEN (host ~ '^[0-9]{1,3}(.[0-9]{1,3}){3}\$') THEN host::inet ELSE '255.255.255.255'::inet END - ?::inet) ASC";
@@ -434,6 +461,113 @@ sub loadCluster {
     return @cluster;
 }
 
+sub loadBinCluster {
+    my $dbuser = shift;
+    my $dbpass = shift;
+
+    my $dsn;
+    my $dbh;
+    my $sth;
+    my $query;
+    my $recovery;
+    my $xlog_location;
+    my $apply_lag;
+    my $primaries = 0;
+
+    my @parts;
+    my $timeline;
+    my $location;
+    my $primary_location;
+
+    my $hostname;
+    my $port;
+    my $database;
+    my @tmp_cluster;
+    my @cluster;
+    my $node_id = 1;
+    $g_best_config = true;
+
+    foreach my $conninfo (@g_conninfos) {
+        $dsn = "DBI:Pg:$conninfo};";
+
+        eval {
+            $dbh = DBI->connect($dsn, $dbuser, $dbpass, {RaiseError => 1});
+            # Check to see if the server is a secondary, and also pull the current xlog
+            # location and the apply lag using pg_last_xact_replay_timestamp to get the
+            # last commit timestamp from the primary applied on the secondary.
+            # We will need to compare the current receive location to the primary xlog 
+            # location, if they differ we can then use the apply_lag value; we'll have 
+            # to post-process this.
+            # In 9.6+ we might want to pull the system identifier from pg_controldata view too
+            $query = "SELECT pg_is_in_recovery(), 
+                          CASE 
+                              WHEN pg_is_in_recovery() THEN pg_last_xlog_receive_location() 
+                              ELSE pg_current_xlog_location() 
+                          END,
+                          COALESCE(extract(epoch from current_timestamp-pg_last_xact_replay_timestamp()),0)";
+            $sth = $dbh->prepare($query);
+            $sth->execute();
+            ($recovery, $xlog_location, $apply_lag) = $sth->fetchrow;
+            $sth->finish;
+
+            ($port) = $conninfo =~ m/port=(.+?)(\s|$)/g; 
+            ($hostname) = $conninfo =~ m/host=(.+?)(\s|$)/g; 
+            ($database) = $conninfo =~ m/dbname=(.+?)(\s|$)/g; 
+            @parts = split('/', $xlog_location, 2);
+            $timeline = qtrim(trim($parts[0]));
+            $location = hex(qtrim(trim($parts[1])));
+
+            if ($g_debug) {
+                printLogLn($g_logfile, "DEBUG: Server: $hostname:$port " . ($recovery ? 'secondary' : 'primary') . " at $xlog_location ($timeline/$location)");
+            }
+
+            # For WAL replication we assume if we can contact a server then it is active,
+            # which isn't strictly true, but nodes that have fallen behind can be excluded
+            # via the max_ro_lag setting.  We also substitute timeline+1 for the slony 
+            # replication set.
+           if (!$recovery) {
+               $primaries++;
+               $primary_location = $xlog_location;
+                my @node=(1,'Primary',0,$timeline,"ACTIVE",($timeline+1),$conninfo,$database,$hostname,$port,1,$xlog_location,$apply_lag);
+                push(@tmp_cluster,  \@node);
+            }
+           else {
+                $node_id++;    
+                my @node=($node_id,"Secondary".($node_id-1),1,undef,"ACTIVE",($timeline+1),$conninfo,$database,$hostname,$port,$node_id,$xlog_location,$apply_lag);
+                push(@tmp_cluster,  \@node);
+            }
+        };
+        if ($@) {
+            printLogLn($g_logfile, "ERROR: Could not connect to server with conninfo: $conninfo DETAIL: $@");
+        }
+    }
+
+    # Error if more than one primary discovered.
+    if ($primaries != 1) {
+        printLogLn($g_logfile, "ERROR: Invalid quantity of primaries: $primaries");
+        die "no primaries found";
+    }
+    # Do the post processing we mentioned above, once we know xlog locations differ
+    # then we can say the apply lag is correct, else it's just there has been no
+    # activity on the master. In the case the xlog locations differ but apply_lag 
+    # is 0 then it means no WAL has been applied since srver start; we don't know
+    # a time for lag in this case.
+    else {
+        foreach (@tmp_cluster) {
+            $apply_lag = false;
+            if ($g_max_lag > 0) {
+                if ((@$_[11] ne $primary_location) && (@$_[12] > $g_max_lag || @$_[12] == 0)) {
+                   $apply_lag  = true;
+                }
+            }
+            my @node=(@$_[0],@$_[1],@$_[2],@$_[3],@$_[4],@$_[5],@$_[6],@$_[7],@$_[8],@$_[9],@$_[10],$apply_lag);
+            push(@cluster,  \@node);
+        }
+    }
+
+    return @cluster;
+}
+
 sub getConfig {
     my @fields;
     my $success = false;
@@ -452,6 +586,9 @@ sub getConfig {
                 @fields = split('=', $_, 2);
                 $value = qtrim(trim($fields[1]));
                 given(lc($fields[0])) {
+                    when(/\breplication_method\b/i) {
+                        $g_method = $value;
+                    }
                     when(/\bdebug\b/i) {
                         $g_debug = checkBoolean($value);
                     }
@@ -461,10 +598,16 @@ sub getConfig {
                     when(/\blog_file\b/i) {
                         $g_logfile = $value;
                     }
-                    when(/\bslony_user\b/i) {
+                    when(/\bslony_user\b/i) { # Depreciated
+                        $g_user = $value;
+                    }
+                    when(/\bslony_pass\b/i) { # Depreciated
+                        $g_pass = $value;
+                    }
+                    when(/\breplication_user\b/i) {
                         $g_user = $value;
                     }
-                    when(/\bslony_pass\b/i) {
+                    when(/\breplication_pass\b/i) {
                         $g_pass = $value;
                     }
                     when(/\bslony_cluster_name\b/i) {
@@ -497,6 +640,9 @@ sub getConfig {
                     when(/\bpool_all_databases\b/i) {
                         $g_all_databases = checkBoolean($value);
                     }
+                    when(/\bauth_user\b/i) {
+                        $g_auth_user = $value;
+                    }
                     when(/\bonly_follow_origins\b/i) {
                         $g_origins_only = checkBoolean($value);
                     }