my $g_host = hostname;
my ($g_addr)=inet_ntoa((gethostbyname(hostname))[4]);
my $g_origins_only = false;
+my $g_best_config = false;
+my $g_max_lag = 0;
die $g_usage unless GetOptions(\%opt, 'config_file|f=s', 'daemon|D',) and keys %opt and ! @ARGV;
printLogLn($g_logfile, "\t Reload command is '$g_reload_command'");
printLogLn($g_logfile, "\t Status stored in '$g_status_file'");
printLogLn($g_logfile, "\t Using local address for '$g_host' as '$g_addr'");
+ if (($g_max_lag > 0) && ($g_mode = 'ro')) {
+ printLogLn($g_logfile, "\t Max lag for read only targets will be $g_max_lag seconds");
+ }
#printLogLn($g_logfile, "\t '$g_user' as '$g_pass'");
}
sub doAll {
my $node_to;
+ my $conninfo_read = 0;
foreach my $conninfo (@g_conninfos) {
+ $conninfo_read++;
eval {
@g_cluster = loadCluster($g_clname, $conninfo, $g_user, $g_pass, $g_addr, $g_clsets);
if ($g_debug) {
- printLogLn($g_logfile, "DEBUG: Cluster with " . scalar(@g_cluster) . " nodes read from conninfo: $conninfo");
+ printLogLn($g_logfile, "DEBUG: ($conninfo_read) Cluster with " . scalar(@g_cluster) . " nodes read from conninfo: $conninfo");
foreach (@g_cluster) {
- printLogLn($g_logfile, "DEBUG: Node #" . @$_[0] . " DETAIL: " . @$_[1] . " " . @$_[2] . " " . (@$_[3] // "<NONE>") . " " . @$_[4] . " " . (@$_[5] // "<NONE>") . " " . @$_[6] . " " . @$_[7] . " " . @$_[8] . " " . @$_[9] . " " );
+ printLogLn($g_logfile, "DEBUG: Node #" . @$_[0] . " DETAIL: " . @$_[1] . " " . @$_[2] . " " . (@$_[3] // "<NONE>") . " " . @$_[4] . " " . (@$_[5] // "<NONE>") . " " . @$_[6] . " " . @$_[7] . " " . @$_[8] . " " . @$_[9] . " " . @$_[10] . " " . @$_[11]);
}
}
};
if ($@) {
printLogLn($g_logfile, "ERROR: Failed using conninfo: $conninfo DETAIL: $@");
}
- else {
+ elsif($g_best_config) {
+ if ($g_debug) {
+ printLogLn($g_logfile, "DEBUG: Found current origin to read config from");
+ }
last;
}
}
# Try and choose a node; we always assign the origin initially regardless of rw/ro status
# when in ro mode and if we then find a suitable subscriber we'll reassign to it.
foreach my $node (@g_cluster) {
+
+
+ # If the node is lagging anyway skip it
+ if (($g_mode eq 'ro') && ($g_max_lag > 0) && ($node->[11])) {
+ printLogLn ($g_logfile, "Lag on node $node->[0] exceeds $g_max_lag seconds");
+ next;
+ }
+
if ($clsets ne 'all') {
@sets_to_follow = split(',', $clsets);
if (defined($node->[3])) {
my $previous_cluster;
foreach (@g_cluster) {
if (!$g_origins_only || defined($_->[3])) {
- $current_cluster = md5_hex(($current_cluster // "") . $_->[0] . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6]);
+ $current_cluster = md5_hex(($current_cluster // "") . $_->[0] . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6] . $_->[11]);
if ($g_debug) {
- printLogLn($g_logfile, "DEBUG: Node " . $_->[0] . " detail = " . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6]);
+ printLogLn($g_logfile, "DEBUG: Node " . $_->[0] . " detail = " . $_->[2] . (defined($_->[3]) ? 't' : 'f') . $_->[6] . $_->[11]);
}
}
}
my $qw_clname;
my @cluster;
+ $g_best_config = false;
$dsn = "DBI:Pg:$conninfo};";
eval {
GROUP BY b.sub_provider, a.no_id, a.no_comment, c.pa_conninfo, a.no_active
ORDER BY (COALESCE(b.sub_provider, 0) = 0) DESC, a.no_id ASC
), z AS (
- SELECT *,
+ SELECT x.*,
CASE WHEN x.no_conninfo ilike '%dbname=%' THEN(regexp_matches(x.no_conninfo, E'dbname=(.+?)\\\\M', 'ig'))[1] END AS database,
CASE WHEN x.no_conninfo ilike '%host=%' THEN(regexp_matches(x.no_conninfo, E'host=(.+?)(?=\\\\s|\$)', 'ig'))[1] END AS host,
- CASE WHEN x.no_conninfo ilike '%port=%' THEN(regexp_matches(x.no_conninfo, E'port=(.+?)\\\\M', 'ig'))[1] ELSE '5432' END AS port
+ CASE WHEN x.no_conninfo ilike '%port=%' THEN(regexp_matches(x.no_conninfo, E'port=(.+?)\\\\M', 'ig'))[1] ELSE '5432' END AS port,
+ (no_id = $qw_clname.getlocalnodeid(?)) AS this_node,
+ COALESCE((? BETWEEN 1 AND extract(epoch from s.st_lag_time)),false) AS lag_exceeded
FROM x
+ LEFT JOIN $qw_clname.sl_status s ON s.st_received = x.no_id
)
SELECT * FROM z
ORDER BY origin_sets, @(CASE WHEN (host ~ '^[0-9]{1,3}(.[0-9]{1,3}){3}\$') THEN host::inet ELSE '255.255.255.255'::inet END - ?::inet) ASC";
if ($g_debug) {
- printLogLn($g_logfile, "DEBUG: " . $query);
+# printLogLn($g_logfile, "DEBUG: " . $query);
}
$sth = $dbh->prepare($query);
}
}
}
- $sth->bind_param($param_on, "_" . $clname);
- $param_on++;
- $sth->bind_param($param_on, "_" . $clname);
+ # This param is taken 3 times
+ for (0..2) {
+ $sth->bind_param($param_on, "_" . $clname);
+ $param_on++;
+ }
+ $sth->bind_param($param_on, $g_max_lag);
$param_on++;
$sth->bind_param($param_on, (isInet($addr) ? $addr : '255.255.255.255'));
$sth->execute();
while (my @node = $sth->fetchrow) {
+ # If some origin sets exist for this node row (we can assume they're the sets we're following since they're filtered in the query)
+ # and the row is flagged as this_node then we have found the best node to read the configuration from.
+ if (defined($node[3]) && $node[10]) {
+ $g_best_config = true;
+ }
push(@cluster, \@node);
}
when(/\bonly_follow_origins\b/i) {
$g_origins_only = checkBoolean($value);
}
+ when(/\bmax_ro_lag\b/i) {
+ $g_max_lag = checkInteger($value);
+ }
}
}
}