3 * $Header: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v 1.141.2.22 2009/10/02 07:53:08 t-ishii Exp $
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
8 * Copyright (c) 2003-2009 PgPool Global Development Group
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
21 * pool_process_query.c: query processing stuff
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
30 #ifdef HAVE_SYS_TIME_H
33 #ifdef HAVE_SYS_SELECT_H
34 #include <sys/select.h>
41 #include <netinet/in.h>
45 #include "pool_signal.h"
46 #include "pool_proto_modules.h"
49 #define FD_SETSIZE 512
52 #define INIT_STATEMENT_LIST_SIZE 8
54 #define ACTIVE_SQL_TRANSACTION_ERROR_CODE "25001" /* SET TRANSACTION ISOLATION LEVEL must be called before any query */
55 #define DEADLOCK_ERROR_CODE "40P01"
56 #define SERIALIZATION_FAIL_ERROR_CODE "40001"
57 #define QUERY_CANCEL_ERROR_CODE "57014"
58 #define ADMIN_SHUTDOWN_ERROR_CODE "57P01"
59 #define CRASH_SHUTDOWN_ERROR_CODE "57P02"
61 static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
62 static POOL_STATUS do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, int protoMajor, int pid, int key, int no_ready_for_query);
63 static POOL_STATUS do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major);
64 static char *get_insert_command_table_name(InsertStmt *node);
65 static void reset_prepared_list(PreparedStatementList *p);
66 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n);
67 static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
68 static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend);
69 static void query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len);
70 static int extract_ntuples(char *message);
71 static int detect_error(POOL_CONNECTION *master, char *error_code, int major, char class, bool unread);
72 static int detect_postmaster_down_error(POOL_CONNECTION *master, int major);
73 static void free_select_result(POOL_SELECT_RESULT *result);
75 static bool is_internal_transaction_needed(Node *node);
76 static int compare(const void *p1, const void *p2);
78 /* timeout sec for pool_check_fd */
79 static int timeoutsec;
81 int in_load_balance; /* non 0 if in load balance mode */
82 int selected_slot; /* selected DB node */
83 int master_slave_dml; /* non 0 if master/slave mode is specified in config file */
/*
 * pool_process_query: main module for query processing.
 *
 * Shuttles protocol messages between the frontend (client) and the
 * backend nodes: resets reused connections, waits on all live sockets
 * with select(2), enforces client idle limits, detects backend
 * failures, and dispatches each backend message kind to its handler.
 */
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
							   POOL_CONNECTION_POOL *backend,
							   int first_ready_for_query_received)
	char kind;	/* packet kind (backend) */
	char fkind;	/* packet kind (frontend) */
	int state;	/* 0: ok to issue commands 1: waiting for "ready for query" response */

	/* suppress forwarding to the frontend while a reused connection is being reset */
	frontend->no_forward = connection_reuse;

	if (state == 0 && connection_reuse)
		/* send query for resetting connection such as "ROLLBACK" "RESET ALL"... */
		st = reset_backend(backend, qcnt);

		if (st < 0)		/* error? */
			/* probably we don't need this, since caller will
			 * close the connection to frontend after returning with POOL_END. But I
			 * guess I would like to be a paranoid...
			 */
			frontend->no_forward = 0;
		else if (st == 0)	/* no query issued? */
		else if (st == 1)	/* more query remains */
		else	/* no more query(st == 2) */
			/* reset finished: transaction state is idle again, resume forwarding */
			TSTATE(backend) = 'I';
			frontend->no_forward = 0;
			return POOL_CONTINUE;

	/*
	 * if all backends do not have any pending data in the
	 * receiving data cache, then issue select(2) to wait for new
	 */
	if (is_cache_empty(frontend, backend))
		struct timeval timeoutdata;
		struct timeval *timeout;
		int num_fds, was_error = 0;

		/*
		 * frontend idle counters. depends on the following
		 * select(2) call's time out is 1 second.
		 */
		int idle_count = 0;	/* for other than in recovery */
		int idle_count_in_recovery = 0;	/* for in recovery */

		FD_ZERO(&exceptmask);

		/*
		 * Do not read a message from frontend while backends process a query.
		 */
		if (!connection_reuse && !in_progress)
			FD_SET(frontend->fd, &readmask);
			FD_SET(frontend->fd, &exceptmask);
			num_fds = Max(frontend->fd + 1, num_fds);

		/*
		 * If we are in load balance mode and the selected node is
		 * down, we need to re-select load_balancing_node. Note
		 * that we cannot use VALID_BACKEND macro here. If
		 * in_load_balance == 1, VALID_BACKEND macro may return 0.
		 */
		if (pool_config->load_balance_mode &&
			BACKEND_INFO(backend->info->load_balancing_node).backend_status == CON_DOWN)
			/* select load balancing node */
			backend->info->load_balancing_node = select_load_balancing_node();

		/* watch every live backend socket for readable data or exceptions */
		for (i=0;i<NUM_BACKENDS;i++)
			if (VALID_BACKEND(i))
				num_fds = Max(CONNECTION(backend, i)->fd + 1, num_fds);
				FD_SET(CONNECTION(backend, i)->fd, &readmask);
				FD_SET(CONNECTION(backend, i)->fd, &exceptmask);

		/*
		 * wait for data arriving from frontend and backend
		 */
		/* use a 1 second tick only when an idle limit must be enforced */
		if (pool_config->client_idle_limit > 0 ||
			pool_config->client_idle_limit_in_recovery > 0)
			timeoutdata.tv_sec = 1;
			timeoutdata.tv_usec = 0;
			timeout = &timeoutdata;

		fds = select(num_fds, &readmask, &writemask, &exceptmask, timeout);
			pool_error("select() failed. reason: %s", strerror(errno));

		/* select(2) timed out: bump idle counters and kick out idle clients */
		if (*InRecovery == 0 && pool_config->client_idle_limit > 0)
			if (idle_count > pool_config->client_idle_limit)
				pool_log("pool_process_query: child connection forced to terminate due to client_idle_limit(%d) reached", pool_config->client_idle_limit);
		else if (*InRecovery > 0 && pool_config->client_idle_limit_in_recovery > 0)
			idle_count_in_recovery++;
			if (idle_count_in_recovery > pool_config->client_idle_limit_in_recovery)
				pool_log("pool_process_query: child connection forced to terminate due to client_idle_limit_in_recovery(%d) reached", pool_config->client_idle_limit_in_recovery);

		for (i = 0; i < NUM_BACKENDS; i++)
			if (VALID_BACKEND(i))
				/*
				 * make sure that connection slot exists
				 */
				if (CONNECTION_SLOT(backend, i) == 0)
					pool_log("FATAL ERROR: VALID_BACKEND returns non 0 but connection slot is empty. backend id:%d RAW_MODE:%d in_load_balance:%d LOAD_BALANCE_STATUS:%d status:%d",
							 i, RAW_MODE, in_load_balance, LOAD_BALANCE_STATUS(i), BACKEND_INFO(i).backend_status);

				if (FD_ISSET(CONNECTION(backend, i)->fd, &readmask))
					/*
					 * admin shutdown postmaster or postmaster goes down
					 */
					if (detect_postmaster_down_error(CONNECTION(backend, i), MAJOR(backend)) == SPECIFIED_ERROR)
						/* detach backend node. */
						if (!VALID_BACKEND(i))
							notice_backend_error(i);

		status = read_kind_from_backend(frontend, backend, &kind);
		if (status != POOL_CONTINUE)

		/* frontend socket became readable (or raised an exception) */
		if (!connection_reuse && !in_progress)
			if (FD_ISSET(frontend->fd, &exceptmask))
			else if (FD_ISSET(frontend->fd, &readmask))
				status = ProcessFrontendResponse(frontend, backend);
				if (status != POOL_CONTINUE)

		if (FD_ISSET(MASTER(backend)->fd, &exceptmask))

	/* frontend data already buffered: process it without waiting in select(2) */
	if (frontend->len > 0 && !in_progress)
		status = ProcessFrontendResponse(frontend, backend);
		if (status != POOL_CONTINUE)

	/* this is the synchronous point */
	status = read_kind_from_backend(frontend, backend, &kind);
	if (status != POOL_CONTINUE)

	/* reload config file */
	pool_get_config(get_config_file_name(), RELOAD_CONFIG);
	if (pool_config->enable_pool_hba)
		load_hba(get_hba_file_name());
	if (pool_config->parallel_mode)
		pool_memset_system_db_info(system_db_info->info);

	first_ready_for_query_received = 0;

	/*
	 * Process backend Response
	 */
	pool_error("pool_process_query: kind is 0!");

	pool_debug("pool_process_query: kind from backend: %c", kind);

	/* dispatch: V3 protocol first, then the V2 message kinds */
	if (MAJOR(backend) == PROTO_MAJOR_V3)
			/* CopyIn response */
			status = CopyInResponse(frontend, backend);
			/* Parameter Status */
			status = ParameterStatus(frontend, backend);
			/* Ready for query */
			status = ReadyForQuery(frontend, backend, 1);
			/* default: forward the message to the frontend unmodified */
			status = SimpleForwardToFrontend(kind, frontend, backend);
			if (pool_flush(frontend))
		/* Notification response */
		status = NotificationResponse(frontend, backend);
		status = BinaryRow(frontend, backend, num_fields);
		/* Complete command response */
		status = CompleteCommandResponse(frontend, backend);
		status = AsciiRow(frontend, backend, num_fields);
		status = ErrorResponse(frontend, backend);
		/* CopyIn Response */
		status = CopyInResponse(frontend, backend);
		/* CopyOut Response */
		status = CopyOutResponse(frontend, backend);
		/* Empty Query Response */
		status = EmptyQueryResponse(frontend, backend);
		/* Notice Response */
		status = NoticeResponse(frontend, backend);
		status = CursorResponse(frontend, backend);
		status = RowDescription(frontend, backend, &num_fields);
		/* FunctionResultResponse and FunctionVoidResponse */
		status = FunctionResultResponse(frontend, backend);
		/* Ready for query */
		status = ReadyForQuery(frontend, backend, 1);
		pool_error("Unknown message type %c(%02x)", kind, kind);

	if (status != POOL_CONTINUE)

	/* 'Z' (Ready for query) while resetting: the reset state machine advances */
	if (kind == 'Z' && frontend->no_forward && state == 1)

	return POOL_CONTINUE;
 * set_fd, isset_fd and zero_fd are used
 * to check fds in parallel mode
/* used only in pool_parallel_exec */
#define BITS (8 * sizeof(long int))

/*
 * set_fd: mark file descriptor "fd" in the bitmap "setp".
 *
 * setp is an array of unsigned long words holding FD_SETSIZE bits
 * (see donemask in pool_parallel_exec, sized FD_SETSIZE / BITS).
 * The word index and bit position must be derived from the number of
 * bits per word (BITS), not FD_SETSIZE: dividing by FD_SETSIZE always
 * selected word 0 and produced a shift count of up to FD_SETSIZE-1,
 * which is undefined behavior for any fd >= the width of unsigned long.
 */
static void set_fd(unsigned long fd ,unsigned long *setp)
{
	unsigned long tmp = fd / BITS;	/* word index within the bitmap */
	unsigned long rem = fd % BITS;	/* bit position within that word */
	setp[tmp] |= (1UL << rem);
}
/*
 * isset_fd: return non-zero if file descriptor "fd" is marked in the
 * bitmap "setp" (counterpart of set_fd).
 * used only in pool_parallel_exec
 *
 * Indexing is by bits-per-word, not FD_SETSIZE: the old divisor made
 * the shift count undefined for any fd >= the width of unsigned long.
 */
static int isset_fd(unsigned long fd, unsigned long *setp)
{
	unsigned long bits = 8 * sizeof(unsigned long);	/* bits per bitmap word */
	unsigned long tmp = fd / bits;	/* word index within the bitmap */
	unsigned long rem = fd % bits;	/* bit position within that word */
	return (setp[tmp] & (1UL << rem)) != 0;
}
/*
 * zero_fd: clear every word of the fd bitmap.
 * used only in pool_parallel_exec
 */
static void zero_fd(unsigned long *setp)
	unsigned long *tmp = setp;
	int i = FD_SETSIZE / BITS;	/* number of unsigned long words in the bitmap */
/*
 * pool_parallel_exec: transmit a query to every backend in parallel
 * query mode and relay each backend's results to the frontend.
 *
 * "string" is the query text; "node" is its parse tree; when
 * send_to_frontend is false the replies are drained but not forwarded.
 */
POOL_STATUS pool_parallel_exec(POOL_CONNECTION *frontend,
							   POOL_CONNECTION_POOL *backend, char *string,
							   Node *node,bool send_to_frontend)
	unsigned long donemask[FD_SETSIZE / BITS];	/* bitmap of backend fds that finished */
	static char *sq = "show pool_status";
	struct timeval timeout;
	unsigned long datacount = 0;

	len = strlen(string) + 1;	/* query length including the terminating NUL */

	if (is_drop_database(node))
		int stime = 5;	/* XXX give arbitrary time to allow closing idle connections */

		pool_debug("Query: sending HUP signal to parent");

		kill(getppid(), SIGHUP);		/* send HUP signal to parent */

		/* we need to loop over here since we will get HUP signal while sleeping */
		stime = sleep(stime);

	/* process status reporting? */
	if (strncasecmp(sq, string, strlen(sq)) == 0)
		pool_debug("process reporting");
		process_reporting(frontend, backend);
		return POOL_CONTINUE;

	/* In this loop, forward the query to all of the backends */
	for (i=0;i<NUM_BACKENDS;i++)
		if (!VALID_BACKEND(i))

		/* 'Q' = SimpleQuery message */
		pool_write(CONNECTION(backend, i), "Q", 1);

		if (MAJOR(backend) == PROTO_MAJOR_V3)
			int sendlen = htonl(len + 4);	/* V3 length word is self-inclusive */
			pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen));

		if (pool_write_and_flush(CONNECTION(backend, i), string, len) < 0)

		/*
		 * in "strict mode" we need to wait for backend completing the query.
		 * note that this is not applied if "NO STRICT" is specified as a comment.
		 */
		if (is_strict_query(node))
			pool_debug("waiting for backend %d completing the query", i);
			if (synchronize(CONNECTION(backend, i)))

	if (!is_cache_empty(frontend, backend))

	/* In this loop, receive data from the all backends and send data to frontend */
		FD_ZERO(&exceptmask);

		/* watch only backends that have not finished yet (tracked in donemask) */
		for (i=0;i<NUM_BACKENDS;i++)
			if (VALID_BACKEND(i))
				int fd = CONNECTION(backend,i)->fd;
				num_fds = Max(fd + 1, num_fds);
				if(!isset_fd(fd,donemask))
					FD_SET(fd, &readmask);
					FD_SET(fd, &exceptmask);
					pool_debug("pool_parallel_query: %d th FD_SET: %d",i, CONNECTION(backend, i)->fd);

		pool_debug("pool_parallel_query: num_fds: %d", num_fds);

		/* no timeout: block until some backend has data */
		fds = select(num_fds, &readmask, &writemask, &exceptmask, NULL);
			pool_error("select() failed. reason: %s", strerror(errno));

		return POOL_CONTINUE;

		/* get header of protocol */
		for (i=0;i<NUM_BACKENDS;i++)
			if (!VALID_BACKEND(i) ||
				!FD_ISSET(CONNECTION(backend, i)->fd, &readmask))

			status = read_kind_from_one_backend(frontend, backend, &kind,i);
			if (status != POOL_CONTINUE)

			status = ParallelForwardToFrontend(kind,
											   CONNECTION(backend, i),
											   backend->info->database,
			pool_debug("pool_parallel_exec: kind from backend: %c", kind);
			status = ParallelForwardToFrontend(kind,
											   CONNECTION(backend, i),
											   backend->info->database,
			pool_debug("pool_parallel_exec: dummy kind from backend: %c", kind);

			if (status != POOL_CONTINUE)

			/* 'C' (command complete), 'E' (error) or 'c' (copy done) end this backend */
			if(kind == 'C' || kind == 'E' || kind == 'c')
				/* all but one backend done: the whole query is finished */
				if(used_count == NUM_BACKENDS -1)
					return POOL_CONTINUE;

				set_fd(CONNECTION(backend, i)->fd, donemask);

			/* get body of protocol */
				if (pool_read(CONNECTION(backend, i), &kind, 1) < 0)
					pool_error("pool_parallel_exec: failed to read kind from %d th backend", i);

					pool_error("pool_parallel_exec: kind is 0!");

					used_count != NUM_BACKENDS -1)
					pool_debug("pool_parallel_exec: kind from backend: %c", kind);
					status = ParallelForwardToFrontend(kind,
													   CONNECTION(backend, i),
													   backend->info->database,
					pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
					status = ParallelForwardToFrontend(kind,
													   CONNECTION(backend, i),
													   backend->info->database,
					set_fd(CONNECTION(backend, i)->fd, donemask);

				if((kind == 'c' || kind == 'C') &&
					used_count != NUM_BACKENDS -1)
					pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
					status = ParallelForwardToFrontend(kind,
													   CONNECTION(backend, i),
													   backend->info->database,
					set_fd(CONNECTION(backend, i)->fd, donemask);

				if((kind == 'C' || kind == 'c' || kind == 'E') &&
					used_count == NUM_BACKENDS -1)
					pool_debug("pool_parallel_exec: kind from backend: D %lu", datacount);

					pool_debug("pool_parallel_exec: kind from backend: %c", kind);
					status = ParallelForwardToFrontend(kind,
													   CONNECTION(backend, i),
													   backend->info->database,
					pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
					status = ParallelForwardToFrontend(kind,
													   CONNECTION(backend, i),
													   backend->info->database,
					return POOL_CONTINUE;

				pool_debug("pool_parallel_exec: kind from backend: %c", kind);
				status = ParallelForwardToFrontend(kind,
												   CONNECTION(backend, i),
												   backend->info->database,

				if (status != POOL_CONTINUE)

		pool_flush(frontend);
/*
 * send_simplequery_message: send a SimpleQuery ('Q') message to a node.
 *
 * For protocol V3 a self-inclusive length word (len + 4) is written
 * after the tag byte; "len" is the number of query bytes to send.
 */
POOL_STATUS send_simplequery_message(POOL_CONNECTION *backend, int len, char *string, int major)
	/* forward the query to the backend */
	pool_write(backend, "Q", 1);

	if (major == PROTO_MAJOR_V3)
		int sendlen = htonl(len + 4);
		pool_write(backend, &sendlen, sizeof(sendlen));

	if (pool_write_and_flush(backend, string, len) < 0)

	return POOL_CONTINUE;
/*
 * Wait for query response from single node. This checks frontend
 * connection by writing dummy parameter status packet every 30
 * seconds, and if the connection broke, returns error since there's
 * no point in that waiting until backend returns response.
 */
POOL_STATUS wait_for_query_response(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *string, int protoVersion)
#define DUMMY_PARAMETER "pgpool_dummy_param"
#define DUMMY_VALUE "pgpool_dummy_value"

	pool_debug("wait_for_query_response: waiting for backend %d completing the query", backend->db_node_id);

		/* Check to see if data from backend is ready */
		pool_set_timeout(30);	/* seconds between frontend liveness probes */
		status = pool_check_fd(backend);

		if (status < 0)	/* error ? */
			pool_error("wait_for_query_response: backend error occured while waiting for backend response");
		else if (status > 0)	/* data is not ready */
			if (protoVersion == PROTO_MAJOR_V3)
				/* Write dummy parameter status packet to check if the socket to frontend is ok */
				if (pool_write(frontend, "S", 1) < 0)
				/* self-inclusive length: both NUL-terminated strings plus the length word */
				plen = sizeof(DUMMY_PARAMETER)+sizeof(DUMMY_VALUE)+sizeof(plen);
				if (pool_write(frontend, &plen, sizeof(plen)) < 0)
				if (pool_write(frontend, DUMMY_PARAMETER, sizeof(DUMMY_PARAMETER)) < 0)
				if (pool_write(frontend, DUMMY_VALUE, sizeof(DUMMY_VALUE)) < 0)
				if (pool_flush_it(frontend) < 0)
					pool_error("wait_for_query_response: frontend error occured while waiting for backend reply");
			} else /* Protocol version 2 */
				/*
				 * If you want to monitor client connection even if you are using V2 protocol,
				 */
#undef SEND_NOTICE_ON_PROTO2
#ifdef SEND_NOTICE_ON_PROTO2
				static char *notice_message = {"keep alive checking from pgpool-II"};

				/* Write notice message packet to check if the socket to frontend is ok */
				if (pool_write(frontend, "N", 1) < 0)
				if (pool_write(frontend, notice_message, strlen(notice_message)+1) < 0)
				if (pool_flush_it(frontend) < 0)
					pool_error("wait_for_query_response: frontend error occured while waiting for backend reply");

	return POOL_CONTINUE;
/*
 * send_extended_protocol_message: send one extended-protocol message
 * (tag byte "kind", self-inclusive length, payload) to backend node
 * "node_id", then a Flush ('H') so the backend reports the command's
 * completion without waiting for a Sync.
 * Extended query protocol has to send Flush message.
 */
POOL_STATUS send_extended_protocol_message(POOL_CONNECTION_POOL *backend,
										   int node_id, char *kind,
										   int len, char *string)
	POOL_CONNECTION *cp = CONNECTION(backend, node_id);

	/* forward the query to the backend */
	pool_write(cp, kind, 1);
	sendlen = htonl(len + 4);	/* V3 length word is self-inclusive */
	pool_write(cp, &sendlen, sizeof(sendlen));
	pool_write(cp, string, len);

	/*
	 * send "Flush" message so that backend notices us
	 * the completion of the command
	 */
	pool_write(cp, "H", 1);

	/* NOTE(review): sendlen is presumably reassigned to the Flush message
	 * length (htonl(4)) before this write — confirm against full source */
	if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)

	return POOL_CONTINUE;
/*
 * send_execute_message: send an Execute ('E') message to node "node_id"
 * via send_extended_protocol_message (which also appends a Flush).
 */
POOL_STATUS send_execute_message(POOL_CONNECTION_POOL *backend,
								 int node_id, int len, char *string)
	return send_extended_protocol_message(backend, node_id, "E", len, string);
/*
 * synchronize: wait until read data is ready on the connection.
 * Returns pool_check_fd()'s result (0: normal, 1: data not ready, -1: error).
 */
int synchronize(POOL_CONNECTION *cp)
	return pool_check_fd(cp);
/*
 * set timeout in seconds for pool_check_fd
 * if timeoutval < 0, we assume no timeout(wait forever).
 */
void pool_set_timeout(int timeoutval)
	/* file-scope setting consumed by the next pool_check_fd() call */
	timeoutsec = timeoutval;
/*
 * Wait until read data is ready.
 * return values: 0: normal 1: data is not ready -1: error
 * The wait duration comes from the file-scope "timeoutsec" previously
 * set via pool_set_timeout().
 */
int pool_check_fd(POOL_CONNECTION *cp)
	struct timeval timeout;
	struct timeval *timeoutp;

	timeout.tv_sec = timeoutsec;

	FD_ZERO(&exceptmask);
	FD_SET(fd, &readmask);
	FD_SET(fd, &exceptmask);

	fds = select(fd+1, &readmask, NULL, &exceptmask, timeoutp);
		/* retry on transient failures; EINTR is expected when signals arrive */
		if (errno == EAGAIN || errno == EINTR)
		pool_error("pool_check_fd: select() failed. reason %s", strerror(errno));
	else if (fds == 0)	/* timeout */

	if (FD_ISSET(fd, &exceptmask))
		pool_error("pool_check_fd: exception occurred");
1014 * Process "show pool_status" query.
1016 void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1018 static char *cursorname = "blank";
1019 static short num_fields = 3;
1020 static char *field_names[] = {"item", "value", "description"};
1022 static short fsize = -1;
1030 static unsigned char nullmap[2] = {0xff, 0xff};
1031 int nbytes = (num_fields + 7)/8;
1033 #define POOLCONFIG_MAXNAMELEN 32
1034 #define POOLCONFIG_MAXVALLEN 512
1035 #define POOLCONFIG_MAXDESCLEN 64
1038 char name[POOLCONFIG_MAXNAMELEN+1];
1039 char value[POOLCONFIG_MAXVALLEN+1];
1040 char desc[POOLCONFIG_MAXDESCLEN+1];
1041 } POOL_REPORT_STATUS;
1043 #define MAXITEMS 128
1045 POOL_REPORT_STATUS status[MAXITEMS];
1053 strncpy(status[i].name, "listen_addresses", POOLCONFIG_MAXNAMELEN);
1054 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->listen_addresses);
1055 strncpy(status[i].desc, "host name(s) or IP address(es) to listen to", POOLCONFIG_MAXDESCLEN);
1058 strncpy(status[i].name, "port", POOLCONFIG_MAXNAMELEN);
1059 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->port);
1060 strncpy(status[i].desc, "pgpool accepting port number", POOLCONFIG_MAXDESCLEN);
1063 strncpy(status[i].name, "socket_dir", POOLCONFIG_MAXNAMELEN);
1064 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->socket_dir);
1065 strncpy(status[i].desc, "pgpool socket directory", POOLCONFIG_MAXDESCLEN);
1068 strncpy(status[i].name, "num_init_children", POOLCONFIG_MAXNAMELEN);
1069 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_init_children);
1070 strncpy(status[i].desc, "# of children initially pre-forked", POOLCONFIG_MAXDESCLEN);
1073 strncpy(status[i].name, "child_life_time", POOLCONFIG_MAXNAMELEN);
1074 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_life_time);
1075 strncpy(status[i].desc, "if idle for this seconds, child exits", POOLCONFIG_MAXDESCLEN);
1078 strncpy(status[i].name, "connection_life_time", POOLCONFIG_MAXNAMELEN);
1079 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_life_time);
1080 strncpy(status[i].desc, "if idle for this seconds, connection closes", POOLCONFIG_MAXDESCLEN);
1083 strncpy(status[i].name, "client_idle_limit", POOLCONFIG_MAXNAMELEN);
1084 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->client_idle_limit);
1085 strncpy(status[i].desc, "if idle for this seconds, child connection closes", POOLCONFIG_MAXDESCLEN);
1088 strncpy(status[i].name, "child_max_connections", POOLCONFIG_MAXNAMELEN);
1089 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_max_connections);
1090 strncpy(status[i].desc, "if max_connections received, chile exits", POOLCONFIG_MAXDESCLEN);
1093 strncpy(status[i].name, "max_pool", POOLCONFIG_MAXNAMELEN);
1094 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->max_pool);
1095 strncpy(status[i].desc, "max # of connection pool per child", POOLCONFIG_MAXDESCLEN);
1098 strncpy(status[i].name, "authentication_timeout", POOLCONFIG_MAXNAMELEN);
1099 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->authentication_timeout);
1100 strncpy(status[i].desc, "maximum time in seconds to complete client authentication", POOLCONFIG_MAXNAMELEN);
1103 strncpy(status[i].name, "logdir", POOLCONFIG_MAXNAMELEN);
1104 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->logdir);
1105 strncpy(status[i].desc, "logging directory", POOLCONFIG_MAXDESCLEN);
1108 strncpy(status[i].name, "pid_file_name", POOLCONFIG_MAXNAMELEN);
1109 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pid_file_name);
1110 strncpy(status[i].desc, "path to pid file", POOLCONFIG_MAXDESCLEN);
1113 strncpy(status[i].name, "backend_socket_dir", POOLCONFIG_MAXNAMELEN);
1114 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->backend_socket_dir);
1115 strncpy(status[i].desc, "Unix domain socket directory for the PostgreSQL server", POOLCONFIG_MAXDESCLEN);
1118 strncpy(status[i].name, "replication_mode", POOLCONFIG_MAXNAMELEN);
1119 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_mode);
1120 strncpy(status[i].desc, "non 0 if operating in replication mode", POOLCONFIG_MAXDESCLEN);
1123 strncpy(status[i].name, "load_balance_mode", POOLCONFIG_MAXNAMELEN);
1124 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->load_balance_mode);
1125 strncpy(status[i].desc, "non 0 if operating in load balancing mode", POOLCONFIG_MAXDESCLEN);
1128 strncpy(status[i].name, "replication_stop_on_mismatch", POOLCONFIG_MAXNAMELEN);
1129 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_stop_on_mismatch);
1130 strncpy(status[i].desc, "stop replication mode on fatal error", POOLCONFIG_MAXDESCLEN);
1133 strncpy(status[i].name, "replicate_select", POOLCONFIG_MAXNAMELEN);
1134 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replicate_select);
1135 strncpy(status[i].desc, "non 0 if SELECT statement is replicated", POOLCONFIG_MAXDESCLEN);
1138 strncpy(status[i].name, "reset_query_list", POOLCONFIG_MAXNAMELEN);
1139 *(status[i].value) = '\0';
1140 for (j=0;j<pool_config->num_reset_queries;j++)
1143 len = POOLCONFIG_MAXVALLEN - strlen(status[i].value);
1144 strncat(status[i].value, pool_config->reset_query_list[j], len);
1145 len = POOLCONFIG_MAXVALLEN - strlen(status[i].value);
1146 strncat(status[i].value, ";", len);
1148 strncpy(status[i].desc, "queries issued at the end of session", POOLCONFIG_MAXDESCLEN);
1151 strncpy(status[i].name, "print_timestamp", POOLCONFIG_MAXNAMELEN);
1152 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->print_timestamp);
1153 strncpy(status[i].desc, "if true print time stamp to each log line", POOLCONFIG_MAXDESCLEN);
1156 strncpy(status[i].name, "master_slave_mode", POOLCONFIG_MAXNAMELEN);
1157 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_mode);
1158 strncpy(status[i].desc, "if true, operate in master/slave mode", POOLCONFIG_MAXDESCLEN);
1161 strncpy(status[i].name, "connection_cache", POOLCONFIG_MAXNAMELEN);
1162 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_cache);
1163 strncpy(status[i].desc, "if true, cache connection pool", POOLCONFIG_MAXDESCLEN);
1166 strncpy(status[i].name, "health_check_timeout", POOLCONFIG_MAXNAMELEN);
1167 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_timeout);
1168 strncpy(status[i].desc, "health check timeout", POOLCONFIG_MAXDESCLEN);
1171 strncpy(status[i].name, "health_check_period", POOLCONFIG_MAXNAMELEN);
1172 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_period);
1173 strncpy(status[i].desc, "health check period", POOLCONFIG_MAXDESCLEN);
1176 strncpy(status[i].name, "health_check_user", POOLCONFIG_MAXNAMELEN);
1177 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->health_check_user);
1178 strncpy(status[i].desc, "health check user", POOLCONFIG_MAXDESCLEN);
1181 strncpy(status[i].name, "failover_command", POOLCONFIG_MAXNAMELEN);
1182 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->failover_command);
1183 strncpy(status[i].desc, "failover command", POOLCONFIG_MAXDESCLEN);
1186 strncpy(status[i].name, "failback_command", POOLCONFIG_MAXNAMELEN);
1187 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->failover_command);
1188 strncpy(status[i].desc, "failback command", POOLCONFIG_MAXDESCLEN);
1191 strncpy(status[i].name, "insert_lock", POOLCONFIG_MAXNAMELEN);
1192 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->insert_lock);
1193 strncpy(status[i].desc, "insert lock", POOLCONFIG_MAXDESCLEN);
1196 strncpy(status[i].name, "ignore_leading_white_space", POOLCONFIG_MAXNAMELEN);
1197 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->ignore_leading_white_space);
1198 strncpy(status[i].desc, "ignore leading white spaces", POOLCONFIG_MAXDESCLEN);
1201 strncpy(status[i].name, "replication_enabled", POOLCONFIG_MAXNAMELEN);
1202 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_enabled);
1203 strncpy(status[i].desc, "non 0 if actually operating in replication mode", POOLCONFIG_MAXDESCLEN);
1206 strncpy(status[i].name, "master_slave_enabled", POOLCONFIG_MAXNAMELEN);
1207 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_enabled);
1208 strncpy(status[i].desc, "non 0 if actually operating in master/slave", POOLCONFIG_MAXDESCLEN);
1211 strncpy(status[i].name, "num_reset_queries", POOLCONFIG_MAXNAMELEN);
1212 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_reset_queries);
1213 strncpy(status[i].desc, "number of queries in reset_query_list", POOLCONFIG_MAXDESCLEN);
1216 strncpy(status[i].name, "pcp_port", POOLCONFIG_MAXNAMELEN);
1217 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_port);
1218 strncpy(status[i].desc, "PCP port # to bind", POOLCONFIG_MAXDESCLEN);
1221 strncpy(status[i].name, "pcp_socket_dir", POOLCONFIG_MAXNAMELEN);
1222 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pcp_socket_dir);
1223 strncpy(status[i].desc, "PCP socket directory", POOLCONFIG_MAXDESCLEN);
1226 strncpy(status[i].name, "pcp_timeout", POOLCONFIG_MAXNAMELEN);
1227 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_timeout);
1228 strncpy(status[i].desc, "PCP timeout for an idle client", POOLCONFIG_MAXDESCLEN);
1231 strncpy(status[i].name, "log_statement", POOLCONFIG_MAXNAMELEN);
1232 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_statement);
1233 strncpy(status[i].desc, "if non 0, logs all SQL statements", POOLCONFIG_MAXDESCLEN);
1236 strncpy(status[i].name, "log_connections", POOLCONFIG_MAXNAMELEN);
1237 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_connections);
1238 strncpy(status[i].desc, "if true, print incoming connections to the log", POOLCONFIG_MAXDESCLEN);
1241 strncpy(status[i].name, "log_hostname", POOLCONFIG_MAXNAMELEN);
1242 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_hostname);
1243 strncpy(status[i].desc, "if true, resolve hostname for ps and log print", POOLCONFIG_MAXDESCLEN);
1246 strncpy(status[i].name, "enable_pool_hba", POOLCONFIG_MAXNAMELEN);
1247 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_pool_hba);
1248 strncpy(status[i].desc, "if true, use pool_hba.conf for client authentication", POOLCONFIG_MAXDESCLEN);
1251 strncpy(status[i].name, "recovery_user", POOLCONFIG_MAXNAMELEN);
1252 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_user);
1253 strncpy(status[i].desc, "online recovery user", POOLCONFIG_MAXDESCLEN);
1256 strncpy(status[i].name, "recovery_password", POOLCONFIG_MAXNAMELEN);
1257 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_password);
1258 strncpy(status[i].desc, "online recovery password", POOLCONFIG_MAXDESCLEN);
1261 strncpy(status[i].name, "recovery_1st_stage_command", POOLCONFIG_MAXNAMELEN);
1262 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_1st_stage_command);
1263 strncpy(status[i].desc, "execute a command in first stage.", POOLCONFIG_MAXDESCLEN);
1266 strncpy(status[i].name, "recovery_2nd_stage_command", POOLCONFIG_MAXNAMELEN);
1267 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_2nd_stage_command);
1268 strncpy(status[i].desc, "execute a command in second stage.", POOLCONFIG_MAXDESCLEN);
1271 strncpy(status[i].name, "recovery_timeout", POOLCONFIG_MAXNAMELEN);
1272 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->recovery_timeout);
1273 strncpy(status[i].desc, "max time in seconds to wait for the recovering node's postmaster", POOLCONFIG_MAXDESCLEN);
1276 strncpy(status[i].name, "client_idle_limit_in_recovery", POOLCONFIG_MAXNAMELEN);
1277 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->client_idle_limit_in_recovery);
1278 strncpy(status[i].desc, "if idle for this seconds, child connection closes in recovery 2nd statge", POOLCONFIG_MAXDESCLEN);
1281 strncpy(status[i].name, "parallel_mode", POOLCONFIG_MAXNAMELEN);
1282 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->parallel_mode);
1283 strncpy(status[i].desc, "if non 0, run in parallel query mode", POOLCONFIG_MAXDESCLEN);
1286 strncpy(status[i].name, "enable_query_cache", POOLCONFIG_MAXNAMELEN);
1287 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_query_cache);
1288 strncpy(status[i].desc, "if non 0, use query cache", POOLCONFIG_MAXDESCLEN);
1291 strncpy(status[i].name, "pgpool2_hostname", POOLCONFIG_MAXNAMELEN);
1292 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pgpool2_hostname);
1293 strncpy(status[i].desc, "pgpool2 hostname", POOLCONFIG_MAXDESCLEN);
1296 strncpy(status[i].name, "system_db_hostname", POOLCONFIG_MAXNAMELEN);
1297 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_hostname);
1298 strncpy(status[i].desc, "system DB hostname", POOLCONFIG_MAXDESCLEN);
1301 strncpy(status[i].name, "system_db_port", POOLCONFIG_MAXNAMELEN);
1302 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->system_db_port);
1303 strncpy(status[i].desc, "system DB port number", POOLCONFIG_MAXDESCLEN);
1306 strncpy(status[i].name, "system_db_dbname", POOLCONFIG_MAXNAMELEN);
1307 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_dbname);
1308 strncpy(status[i].desc, "system DB name", POOLCONFIG_MAXDESCLEN);
1311 strncpy(status[i].name, "system_db_schema", POOLCONFIG_MAXNAMELEN);
1312 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_schema);
1313 strncpy(status[i].desc, "system DB schema name", POOLCONFIG_MAXDESCLEN);
1316 strncpy(status[i].name, "system_db_user", POOLCONFIG_MAXNAMELEN);
1317 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_user);
1318 strncpy(status[i].desc, "user name to access system DB", POOLCONFIG_MAXDESCLEN);
1321 strncpy(status[i].name, "system_db_password", POOLCONFIG_MAXNAMELEN);
1322 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_password);
1323 strncpy(status[i].desc, "password to access system DB", POOLCONFIG_MAXDESCLEN);
1326 for (j = 0; j < NUM_BACKENDS; j++)
1328 if (BACKEND_INFO(j).backend_port == 0)
1331 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_hostname%d", j);
1332 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", BACKEND_INFO(j).backend_hostname);
1333 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d hostname", j);
1336 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_port%d", j);
1337 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_port);
1338 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d port number", j);
1341 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_weight%d", j);
1342 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%f", BACKEND_INFO(j).backend_weight);
1343 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "weight of backend #%d", j);
1346 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend status%d", j);
1347 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_status);
1348 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "status of backend #%d", j);
1354 if (MAJOR(backend) == PROTO_MAJOR_V2)
1356 /* cursor response */
1357 pool_write(frontend, "P", 1);
1358 pool_write(frontend, cursorname, strlen(cursorname)+1);
1361 /* row description */
1362 pool_write(frontend, "T", 1);
1364 if (MAJOR(backend) == PROTO_MAJOR_V3)
1366 len = sizeof(num_fields) + sizeof(len);
1368 for (i=0;i<num_fields;i++)
1370 char *f = field_names[i];
1373 len += sizeof(colnum);
1381 pool_write(frontend, &len, sizeof(len));
1384 n = htons(num_fields);
1385 pool_write(frontend, &n, sizeof(short));
1387 for (i=0;i<num_fields;i++)
1389 char *f = field_names[i];
1391 pool_write(frontend, f, strlen(f)+1); /* field name */
1393 if (MAJOR(backend) == PROTO_MAJOR_V3)
1395 pool_write(frontend, &oid, sizeof(oid)); /* table oid */
1397 pool_write(frontend, &colnum, sizeof(colnum)); /* column number */
1400 pool_write(frontend, &oid, sizeof(oid)); /* data type oid */
1402 pool_write(frontend, &s, sizeof(fsize)); /* field size */
1403 pool_write(frontend, &mod, sizeof(mod)); /* modifier */
1405 if (MAJOR(backend) == PROTO_MAJOR_V3)
1408 pool_write(frontend, &s, sizeof(fsize)); /* field format (text) */
1411 pool_flush(frontend);
1413 if (MAJOR(backend) == PROTO_MAJOR_V2)
1416 for (i=0;i<nrows;i++)
1418 pool_write(frontend, "D", 1);
1419 pool_write_and_flush(frontend, nullmap, nbytes);
1421 size = strlen(status[i].name);
1422 hsize = htonl(size+4);
1423 pool_write(frontend, &hsize, sizeof(hsize));
1424 pool_write(frontend, status[i].name, size);
1426 size = strlen(status[i].value);
1427 hsize = htonl(size+4);
1428 pool_write(frontend, &hsize, sizeof(hsize));
1429 pool_write(frontend, status[i].value, size);
1431 size = strlen(status[i].desc);
1432 hsize = htonl(size+4);
1433 pool_write(frontend, &hsize, sizeof(hsize));
1434 pool_write(frontend, status[i].desc, size);
1440 for (i=0;i<nrows;i++)
1442 pool_write(frontend, "D", 1);
1443 len = sizeof(len) + sizeof(nrows);
1444 len += sizeof(int) + strlen(status[i].name);
1445 len += sizeof(int) + strlen(status[i].value);
1446 len += sizeof(int) + strlen(status[i].desc);
1448 pool_write(frontend, &len, sizeof(len));
1450 pool_write(frontend, &s, sizeof(s));
1452 len = htonl(strlen(status[i].name));
1453 pool_write(frontend, &len, sizeof(len));
1454 pool_write(frontend, status[i].name, strlen(status[i].name));
1456 len = htonl(strlen(status[i].value));
1457 pool_write(frontend, &len, sizeof(len));
1458 pool_write(frontend, status[i].value, strlen(status[i].value));
1460 len = htonl(strlen(status[i].desc));
1461 pool_write(frontend, &len, sizeof(len));
1462 pool_write(frontend, status[i].desc, strlen(status[i].desc));
1466 /* complete command response */
1467 pool_write(frontend, "C", 1);
1468 if (MAJOR(backend) == PROTO_MAJOR_V3)
1470 len = htonl(sizeof(len) + strlen("SELECT")+1);
1471 pool_write(frontend, &len, sizeof(len));
1473 pool_write(frontend, "SELECT", strlen("SELECT")+1);
1475 /* ready for query */
1476 pool_write(frontend, "Z", 1);
1477 if (MAJOR(backend) == PROTO_MAJOR_V3)
1479 len = htonl(sizeof(len) + 1);
1480 pool_write(frontend, &len, sizeof(len));
1481 pool_write(frontend, "I", 1);
1484 pool_flush(frontend);
1488 * send "terminate"(X) message to all backends, indicating that
1489 * backend should prepare to close connection to frontend (actually
1490 * pgpool). Note that the caller must be protected from a signal
1491 * interruption while calling this function. Otherwise the number of
1492 * valid backends might be changed by failover/failback.
/*
 * Broadcast a Terminate ('X') message to every live backend connection
 * so each backend can close its channel to pgpool.  Per the comment
 * above, the caller must block signals around this call so the set of
 * valid backends cannot change mid-loop via failover/failback.
 */
1494 void pool_send_frontend_exits(POOL_CONNECTION_POOL *backend)
1499 for (i=0;i<NUM_BACKENDS;i++)
1502 * send a terminate message to backend if there's an existing
1505 if (VALID_BACKEND(i) && CONNECTION_SLOT(backend, i))
1507 pool_write(CONNECTION(backend, i), "X", 1);
/* V3 protocol additionally carries a 4-byte length word after 'X' */
1509 if (MAJOR(backend) == PROTO_MAJOR_V3)
1512 pool_write(CONNECTION(backend, i), &len, sizeof(len));
1516 * XXX we cannot call pool_flush() here since backend may already
1517 * close the socket and pool_flush() automatically invokes fail
1518 * over handler. This could happen in copy command (remember the
1519 * famous "lost synchronization with server, resetting
1520 * connection" message)
/* best-effort flush: done non-blocking so a dead backend cannot hang us */
1522 pool_set_nonblock(CONNECTION(backend, i)->fd);
1523 pool_flush_it(CONNECTION(backend, i));
1524 pool_unset_nonblock(CONNECTION(backend, i)->fd);
1530 * -------------------------------------------------------
1532 * -------------------------------------------------------
1536 * This function forwards one message from a single backend (used in
1537 * parallel query mode) and optionally relays it to the frontend.
/*
 * Read one protocol message (kind byte already consumed by the caller;
 * we read the length word and payload) from a single backend and, when
 * send_to_frontend is true, forward kind/length/payload to the frontend.
 * On success the payload is also registered in the query cache when the
 * cache is enabled and the system DB is up.  Returns POOL_CONTINUE on
 * success (error paths visible here log via pool_error).
 */
1540 static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend)
1546 if (send_to_frontend)
1548 pool_write(frontend, &kind, 1);
1551 status = pool_read(backend, &len, sizeof(len));
1554 pool_error("ParallelForwardToFrontend: error while reading message length");
/* forward the length word still in network byte order */
1558 if (send_to_frontend)
1560 pool_write(frontend, &len, sizeof(len));
/* convert to host order and exclude the 4-byte length word itself */
1563 len = ntohl(len) - 4 ;
1566 return POOL_CONTINUE;
1568 p = pool_read2(backend, len);
1572 status = POOL_CONTINUE;
1573 if (send_to_frontend)
1575 status = pool_write(frontend, p, len);
1576 if (pool_config->enable_query_cache && SYSDB_STATUS == CON_UP && status == 0)
1578 query_cache_register(kind, frontend, database, p, len);
/*
 * Forward one message of the given kind from the backend(s) to the
 * frontend.  The master's payload is read first; in replication mode
 * each non-master backend's copy is also consumed so the streams stay
 * in sync.  Handles bookkeeping for pending prepared statements,
 * ntuples-mismatch detection between nodes, parallel-mode row-count
 * summation for UPDATE/DELETE tags, and error ('E') recovery including
 * transaction abort on non-master nodes after a load-balanced SELECT.
 */
1585 POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1594 int command_ok_row_count = 0;
1595 int delete_or_update = 0;
1600 * Check if packet kind == 'C'(Command complete), '1'(Parse
1601 * complete), '3'(Close complete). If so, then register or
1602 * unregister pending prepared statement.
1604 if ((kind == 'C' || kind == '1' || kind == '3') &&
1607 pending_function(&prepared_list, pending_prepared_portal);
/* a completed DEALLOCATE means the portal object is no longer needed */
1608 if (pending_prepared_portal &&
1609 pending_prepared_portal->stmt &&
1610 IsA(pending_prepared_portal->stmt, DeallocateStmt))
1612 free(pending_prepared_portal->portal_name);
1613 pending_prepared_portal->portal_name = NULL;
1614 pool_memory_delete(pending_prepared_portal->prepare_ctxt, 0);
1615 free(pending_prepared_portal);
1618 else if (kind == 'E' && pending_function)
1620 /* An error occurred with PREPARE or DEALLOCATE command.
1621 * Free pending portal object.
1623 if (pending_prepared_portal)
1625 free(pending_prepared_portal->portal_name);
1626 pending_prepared_portal->portal_name = NULL;
1627 pool_memory_delete(pending_prepared_portal->prepare_ctxt, 0);
1628 free(pending_prepared_portal);
1631 else if (kind == 'C' && select_in_transaction)
1633 select_in_transaction = 0;
1638 * Remove a pending function if a received message is not
1643 pending_function = NULL;
1644 pending_prepared_portal = NULL;
/* read the master's copy of the message first */
1647 status = pool_read(MASTER(backend), &len, sizeof(len));
1652 p = pool_read2(MASTER(backend), len);
1658 pool_error("SimpleForwardToFrontend: malloc failed");
1663 if (kind == 'C') /* packet kind is "Command Complete"? */
1665 command_ok_row_count = extract_ntuples(p);
1668 * if we are in the parallel mode, we have to sum up the number
1671 if (PARALLEL_MODE && is_parallel_table &&
1672 (strstr(p, "UPDATE") || strstr(p, "DELETE")))
1674 delete_or_update = 1;
/* consume the same message from every other valid backend */
1678 for (i=0;i<NUM_BACKENDS;i++)
1680 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1682 status = pool_read(CONNECTION(backend, i), &len, sizeof(len));
1685 pool_error("SimpleForwardToFrontend: error while reading message length");
1692 p = pool_read2(CONNECTION(backend, i), len);
1698 pool_debug("SimpleForwardToFrontend: length does not match between backends master(%d) %d th backend(%d) kind:(%c)",
1699 len, i, len1, kind);
1702 if (kind == 'C') /* packet kind is "Command Complete"? */
1704 int n = extract_ntuples(p);
1707 * if we are in the parallel mode, we have to sum up the number
1710 if (delete_or_update)
1712 command_ok_row_count += n;
1714 else if (command_ok_row_count != n) /* mismatch update rows */
1716 mismatch_ntuples = 1;
/* replication consistency violation: report it to the client and log */
1722 if (mismatch_ntuples)
1724 String *msg = init_string("pgpool detected difference of the number of inserted, updated or deleted tuples. Possible last query was: \"");
1725 string_append_char(msg, query_string_buffer);
1726 string_append_char(msg, "\"");
1727 pool_send_error_message(frontend, MAJOR(backend),
1728 "XX001", msg->data, "",
1729 "check data consistency between master and other db node", __FILE__, __LINE__);
1730 pool_error(msg->data);
/* parallel mode: rebuild the CommandComplete tag with the summed row count */
1735 if (delete_or_update)
1739 strncpy(tmp, p1, 7);
1740 sprintf(tmp+7, "%d", command_ok_row_count);
1745 pool_error("SimpleForwardToFrontend: malloc failed");
1752 len1 = strlen(p2) + 1;
1755 pool_write(frontend, &kind, 1);
1756 sendlen = htonl(len1+4);
1757 pool_write(frontend, &sendlen, sizeof(sendlen));
1758 pool_write(frontend, p1, len1);
1761 /* save the received result for each kind */
1762 if (pool_config->enable_query_cache && SYSDB_STATUS == CON_UP)
1764 query_cache_register(kind, frontend, backend->info->database, p1, len1);
1771 if (kind == 'A') /* notification response */
1773 pool_flush(frontend); /* we need to immediately notice to frontend */
1775 else if (kind == 'E') /* error response? */
1782 * check if the error was PANIC or FATAL. If so, we just flush
1783 * the message and exit since the backend will close the
1784 * channel immediately.
1794 if (e == 'S' && (strcasecmp("PANIC", p) == 0 || strcasecmp("FATAL", p) == 0))
1796 pool_flush(frontend);
/* a load-balanced SELECT failed inside a transaction: abort on the
 * other nodes too, so transaction state stays in sync across backends */
1807 if (select_in_transaction)
1811 in_load_balance = 0;
1813 for (i = 0; i < NUM_BACKENDS; i++)
1815 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1818 * We must abort transaction to sync transaction state.
1819 * If the error was caused by an Execute message,
1820 * we must send invalid Execute message to abort
1823 * Because extended query protocol ignores all
1824 * messages before receiving Sync message inside error state.
1827 do_error_execute_command(backend, i, PROTO_MAJOR_V3);
1829 do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
1832 select_in_transaction = 0;
1836 for (i = 0;i < NUM_BACKENDS; i++)
1838 if (VALID_BACKEND(i))
1840 POOL_CONNECTION *cp = CONNECTION(backend, i);
1842 /* We need to send "sync" message to backend in extend mode
1843 * so that it accepts next command.
1844 * Note that this may be overkill since client may send
1845 * it by itself. Moreover we do not need it in non-extend mode.
1846 * At this point we regard it is not harmful since error response
1847 * will not be sent too frequently.
1849 pool_write(cp, "S", 1);
1851 if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0)
/* drain everything up to and including ReadyForQuery after the Sync */
1858 while ((ret = read_kind_from_backend(frontend, backend, &kind1)) == POOL_CONTINUE)
1860 if (kind1 == 'Z') /* ReadyForQuery? */
1863 ret = SimpleForwardToFrontend(kind1, frontend, backend);
1864 if (ret != POOL_CONTINUE)
1866 pool_flush(frontend);
1869 if (ret != POOL_CONTINUE)
/* consume the ReadyForQuery payload from every backend */
1872 for (i = 0; i < NUM_BACKENDS; i++)
1874 if (VALID_BACKEND(i))
1876 status = pool_read(CONNECTION(backend, i), &res1, sizeof(res1));
1879 pool_error("SimpleForwardToFrontend: error while reading message length");
1882 res1 = ntohl(res1) - sizeof(res1);
1883 p1 = pool_read2(CONNECTION(backend, i), res1);
1889 return POOL_CONTINUE;
/*
 * Forward one frontend message (kind byte + length + payload) to every
 * valid backend.  Additionally tracks extended-protocol state: a Bind
 * ('B') associates a portal name with a prepared statement, a Close
 * ('C') of a named statement is recorded as a pending DEALLOCATE, and
 * for B/D/C a Flush ('H') is appended so backends respond immediately.
 * For Describe ('D') of a statement, one resulting message is relayed
 * back to the frontend here.
 */
1892 POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1901 for (i=0;i<NUM_BACKENDS;i++)
1903 if (VALID_BACKEND(i))
1905 if (pool_write(CONNECTION(backend, i), &kind, 1))
1910 if (pool_read(frontend, &sendlen, sizeof(sendlen)))
/* payload length excludes the 4-byte length word */
1915 len = ntohl(sendlen) - 4;
1917 for (i=0;i<NUM_BACKENDS;i++)
1919 if (VALID_BACKEND(i))
1921 if (pool_write(CONNECTION(backend,i), &sendlen, sizeof(sendlen)))
1927 return POOL_CONTINUE;
1930 pool_error("SimpleForwardToBackend: invalid message length");
1934 p = pool_read2(frontend, len);
1938 for (i=0;i<NUM_BACKENDS;i++)
1940 if (VALID_BACKEND(i))
1942 if (pool_write_and_flush(CONNECTION(backend, i), p, len))
1947 if (kind == 'B') /* Bind message */
1949 Portal *portal = NULL;
1950 char *stmt_name, *portal_name;
/* Bind payload layout: portal name, NUL, statement name, NUL, ... */
1953 stmt_name = p + strlen(portal_name) + 1;
1955 pool_debug("bind message: portal_name %s stmt_name %s", portal_name, stmt_name);
1957 if (*stmt_name == '\0')
1958 portal = unnamed_statement;
1961 portal = lookup_prepared_statement_by_statement(&prepared_list, stmt_name);
1964 if (*portal_name == '\0'){
1965 unnamed_portal = portal;
1969 if (portal->portal_name)
1970 free(portal->portal_name);
1971 portal->portal_name = strdup(portal_name);
1975 /* Close message with prepared statement name. */
/* NOTE(review): *p == 'S' selects Close-of-statement (not portal),
 * matching the comment's intent of a *prepared statement* name. */
1976 else if (kind == 'C' && *p == 'S' && *(p + 1))
1978 POOL_MEMORY_POOL *old_context = pool_memory;
1979 DeallocateStmt *deallocate_stmt;
1981 pending_prepared_portal = create_portal();
1982 if (pending_prepared_portal == NULL)
1984 pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
/* allocate the statement name inside the portal's own memory context */
1988 pool_memory = pending_prepared_portal->prepare_ctxt;
1989 name = pstrdup(p+1);
1992 pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
1993 pool_memory = old_context;
1997 /* Translate from Close message to DEALLOCATE statement.*/
1998 deallocate_stmt = palloc(sizeof(DeallocateStmt));
1999 if (deallocate_stmt == NULL)
2001 pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
2002 pool_memory = old_context;
2005 deallocate_stmt->name = name;
2006 pending_prepared_portal->stmt = (Node *)deallocate_stmt;
2007 pending_prepared_portal->portal_name = NULL;
2008 pending_function = del_prepared_list;
2009 pool_memory = old_context;
2012 if (kind == 'B' || kind == 'D' || kind == 'C')
2017 for (i = 0;i < NUM_BACKENDS; i++)
2019 if (VALID_BACKEND(i))
2021 POOL_CONNECTION *cp = CONNECTION(backend, i);
2024 * send "Flush" message so that the backend notifies us of
2025 * the completion of the command
2027 pool_write(cp, "H", 1);
2029 if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
2037 * Describe message with a portal name will receive two messages.
2038 * 1. ParameterDescription
2039 * 2. RowDescriptions or NoData
2040 * So we read one message here.
2042 if (kind == 'D' && *p == 'S')
2044 ret = read_kind_from_backend(frontend, backend, &kind1);
2045 if (ret != POOL_CONTINUE)
2047 SimpleForwardToFrontend(kind1, frontend, backend);
2048 if (pool_flush(frontend))
2053 * Forward to frontend until a NOTICE message received.
2057 ret = read_kind_from_backend(frontend, backend, &kind1);
2058 if (ret != POOL_CONTINUE)
2060 SimpleForwardToFrontend(kind1, frontend, backend);
2061 if (pool_flush(frontend) < 0)
2069 return POOL_CONTINUE;
/*
 * Relay a ParameterStatus ('S') message to the frontend.  The message
 * is consumed from every valid backend; the master's copy is the one
 * actually forwarded, and each backend's name/value pair is recorded
 * in its per-connection parameter list.
 */
2072 POOL_STATUS ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2081 char parambuf[1024]; /* parameter + value string buffer. XXX is this enough? */
2084 pool_write(frontend, "S", 1);
2086 len_array = pool_read_message_length2(backend);
2088 if (len_array == NULL)
2093 len = len_array[MASTER_NODE_ID];
2094 sendlen = htonl(len);
2095 pool_write(frontend, &sendlen, sizeof(sendlen));
2097 for (i=0;i<NUM_BACKENDS;i++)
2099 if (VALID_BACKEND(i))
2104 p = pool_read2(CONNECTION(backend, i), len);
/* payload layout: parameter name, NUL, value, NUL */
2109 value = p + strlen(name) + 1;
2111 pool_debug("%d th backend: name: %s value: %s", i, name, value);
2113 if (IS_MASTER_NODE_ID(i))
/* NOTE(review): len comes from the wire but parambuf is fixed at 1024
 * bytes (see XXX above) — an oversized message would overflow; verify
 * that len is bounded upstream. */
2116 memcpy(parambuf, p, len);
2117 pool_add_param(&CONNECTION(backend, i)->params, name, value);
2121 pool_param_debug_print(&MASTER(backend)->params);
2126 status = pool_write(frontend, parambuf, len1);
2131 * Reset backend status. return values are:
2132 * 0: no query was issued 1: a query was issued 2: no more queries remain -1: error
/*
 * Issue the qcnt-th query of the reset_query_list against the backend
 * and reset per-session state variables.  Return values (see comment
 * above): 0 = no query issued, 1 = a query was issued, 2 = no more
 * queries remain, -1 = error.
 */
2134 static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt)
2140 * Reset all state variables
2143 in_load_balance = 0;
2146 * Until pgpool-II 2.2.3 we don't have following 2 lines.
2147 * If we were executing something in load balance mode, and if
2148 * frontend failed before executing end_load_balance() in
2149 * ReadyForQuery(), these variables remained and we may do
2150 * something only in master node!
2152 REPLICATION = pool_config->replication_mode;
2153 MASTER_SLAVE = pool_config->master_slave_mode;
2155 force_replication = 0;
2156 internal_transaction_started = 0;
2157 mismatch_ntuples = 0;
2158 select_in_transaction = 0;
2160 receive_extended_begin = 0;
2162 qn = pool_config->num_reset_queries;
2165 * After execution of all SQL commands in the reset_query_list, we
2166 * remove all prepared objects in the prepared_list.
2170 if (prepared_list.cnt == 0)
2173 * Either no prepared objects were created or DISCARD ALL
2174 * or DEALLOCATE ALL is on the reset_query_list and they
2175 * were executed. The latter causes call to
2176 * reset_prepared_list which removes all prepared objects.
2178 reset_prepared_list(&prepared_list);
2182 /* Delete from prepared list */
2183 if (send_deallocate(backend, &prepared_list, 0))
2185 /* Deallocate failed. We are in unknown state. Ask caller
2186 * to reset backend connection.
2188 reset_prepared_list(&prepared_list);
2192 * If DEALLOCATE returns ERROR response, instead of
2193 * CommandComplete, del_prepared_list is not called and the
2194 * prepared object keeps on sitting on the prepared list. This
2195 * will cause infinite call to reset_backend. So we call
2196 * del_prepared_list() again. This is harmless since trying to
2197 * remove same prepared object will be ignored.
2199 del_prepared_list(&prepared_list, prepared_list.portal_list[0]);
2203 query = pool_config->reset_query_list[qcnt];
2205 /* if transaction state is idle, we don't need to issue ABORT */
2206 if (TSTATE(backend) == 'I' && !strcmp("ABORT", query))
/* bound the reset query with a 10 second timeout */
2209 pool_set_timeout(10);
2211 if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE)
2213 pool_set_timeout(0);
2217 pool_set_timeout(0);
2222 * return non 0 if load balance is possible
/*
 * Return non-zero when this statement may be load balanced: load
 * balancing is configured, we are in dual/parallel mode with a V3
 * client, no transaction is open ('I' = idle), and the statement is a
 * load-balanceable SELECT that does not touch sequences.
 */
2224 int load_balance_enabled(POOL_CONNECTION_POOL *backend, Node* node, char *sql)
2226 return (pool_config->load_balance_mode &&
2227 (DUAL_MODE || pool_config->parallel_mode) &&
2228 MAJOR(backend) == PROTO_MAJOR_V3 &&
2229 TSTATE(backend) == 'I' &&
2230 is_select_query(node, sql) &&
2231 !is_sequence_query(node));
2236 * returns non 0 if the SQL statement can be load
2237 * balanced. The following statements go into this category.
2239 * - SELECT without FOR UPDATE/SHARE
2241 * - DECLARE..SELECT (without INTO nor FOR UPDATE/SHARE)
2245 * note that for SELECT INTO, this function returns 0
/*
 * Return non-zero when the parsed statement (with its original SQL
 * text) is read-only enough to load balance: SELECT / DECLARE..SELECT
 * without INTO or FOR UPDATE/SHARE, FETCH/CLOSE, or COPY TO STDOUT.
 * The first character of the SQL text is also checked so that only
 * statements actually written as such forms qualify.
 */
2247 int is_select_query(Node *node, char *sql)
2253 * 2009/5/1 Tatsuo says: This test is not bogus. As of 2.2, pgpool
2254 * sets Portal->sql_string to NULL for SQL command PREPARE.
2255 * Usually this is ok, since in most cases SQL command EXECUTE
2256 * follows anyway. Problem is, some applications mix PREPARE with
2257 * extended protocol command "EXECUTE" and so on. Execute() seems
2258 * to think this never happens but it is not real. Someday we
2259 * should extract actual query string from PrepareStmt->query and
2260 * set it to Portal->sql_string.
2265 if (pool_config->ignore_leading_white_space)
2267 /* ignore leading white spaces */
/* NOTE(review): isspace() on a plain char can be UB for negative
 * values — assumes ASCII SQL text; confirm inputs. */
2268 while (*sql && isspace(*sql))
2272 if (IsA(node, SelectStmt) || IsA(node, DeclareCursorStmt))
2274 SelectStmt *select_stmt;
2276 if (IsA(node, SelectStmt))
2277 select_stmt = (SelectStmt *)node;
2279 select_stmt = (SelectStmt *)((DeclareCursorStmt *)node)->query;
/* SELECT INTO and SELECT ... FOR UPDATE/SHARE must not be balanced */
2281 if (select_stmt->intoClause || select_stmt->lockingClause)
2284 if (IsA(node, SelectStmt))
2285 return (*sql == 's' || *sql == 'S' || *sql == '(');
2287 return (*sql == 'd' || *sql == 'D');
2289 else if (IsA(node, FetchStmt) || IsA(node, ClosePortalStmt))
2291 return (*sql == 'f' || *sql == 'F' || *sql == 'c' || *sql == 'C');
2293 else if (IsA(node, CopyStmt))
/* COPY ... TO STDOUT (not FROM, no file) is read-only */
2295 CopyStmt *copy_stmt = (CopyStmt *)node;
2296 return (copy_stmt->is_from == FALSE &&
2297 copy_stmt->filename == NULL);
2303 * returns non 0 if SQL is SELECT statement including nextval() or
/*
 * Return non-zero when the statement is a SELECT whose target list
 * calls nextval() or setval() — such statements have side effects on
 * sequences and must not be load balanced.
 */
2306 int is_sequence_query(Node *node)
2308 SelectStmt *select_stmt;
2311 if (node == NULL || !IsA(node, SelectStmt))
2314 select_stmt = (SelectStmt *)node;
/* scan each output column for a nextval()/setval() function call */
2315 foreach (lc, select_stmt->targetList)
2317 if (IsA(lfirst(lc), ResTarget))
2323 t = (ResTarget *) lfirst(lc);
2324 if (IsA(t->val, FuncCall))
2326 fc = (FuncCall *) t->val;
2327 foreach (c, fc->funcname)
2329 Value *v = lfirst(c);
2330 if (strncasecmp(v->val.str, "NEXTVAL", 7) == 0)
2332 else if (strncasecmp(v->val.str, "SETVAL", 6) == 0)
2343 * returns non 0 if SQL is transaction starting command (START
2344 * TRANSACTION or BEGIN)
/*
 * Return non-zero when the statement is BEGIN or START TRANSACTION.
 */
2346 int is_start_transaction_query(Node *node)
2348 TransactionStmt *stmt;
2350 if (node == NULL || !IsA(node, TransactionStmt))
2353 stmt = (TransactionStmt *)node;
2354 return stmt->kind == TRANS_STMT_START || stmt->kind == TRANS_STMT_BEGIN;
2358 * returns non 0 if SQL is transaction commit or abort command (END
2359 * TRANSACTION or ROLLBACK or ABORT)
2361 int is_commit_query(Node *node)
2363 TransactionStmt *stmt;
2365 if (node == NULL || !IsA(node, TransactionStmt))
2368 stmt = (TransactionStmt *)node;
2369 return stmt->kind == TRANS_STMT_COMMIT || stmt->kind == TRANS_STMT_ROLLBACK;
2373 * start load balance mode
/*
 * Enter load-balance mode: save all backend connection slots, make
 * slot 0 point at the selected load-balancing node (so subsequent
 * queries go only there), temporarily disable replication/master-slave
 * mode, and mark the node as LOAD_SELECTED.  end_load_balance()
 * restores everything.
 */
2375 void start_load_balance(POOL_CONNECTION_POOL *backend)
2378 double total_weight,r;
2381 /* save backend connection slots */
2382 for (i=0;i<NUM_BACKENDS;i++)
2384 if (VALID_BACKEND(i))
2386 slots[i] = CONNECTION_SLOT(backend, i);
2391 /* temporarily turn off replication mode */
/* remember previous modes so end_load_balance() can restore them */
2393 replication_was_enabled = 1;
2395 master_slave_was_enabled = 1;
2401 backend->slots[0] = slots[selected_slot];
2403 LOAD_BALANCE_STATUS(backend->info->load_balancing_node) = LOAD_SELECTED;
2404 selected_slot = backend->info->load_balancing_node;
2406 /* start load balancing */
2407 in_load_balance = 1;
2411 * finish load balance mode
/*
 * Leave load-balance mode: undo start_load_balance() by restoring the
 * saved connection slots, the replication/master-slave flags, and
 * clearing the node's LOAD_SELECTED mark.
 */
2413 void end_load_balance(POOL_CONNECTION_POOL *backend)
2415 in_load_balance = 0;
2416 LOAD_BALANCE_STATUS(selected_slot) = LOAD_UNSELECTED;
2419 /* restore backend connection slots */
2421 for (i=0;i<NUM_BACKENDS;i++)
2423 if (VALID_BACKEND(i))
2425 CONNECTION_SLOT(backend, i) = slots[i];
2430 /* turn on replication mode */
2431 REPLICATION = replication_was_enabled;
2432 MASTER_SLAVE = master_slave_was_enabled;
2434 replication_was_enabled = 0;
2435 master_slave_was_enabled = 0;
2437 pool_debug("end_load_balance: end load balance mode");
2441 * send error message to frontend
/*
 * Convenience wrapper: send an error message to the frontend with
 * severity "ERROR" via pool_send_severity_message().
 */
2443 void pool_send_error_message(POOL_CONNECTION *frontend, int protoMajor,
2451 pool_send_severity_message(frontend, protoMajor, code, message, detail, hint, file, "ERROR", line);
2455 * send fatal message to frontend
/*
 * Convenience wrapper: send an error message to the frontend with
 * severity "FATAL" via pool_send_severity_message().
 */
2457 void pool_send_fatal_message(POOL_CONNECTION *frontend, int protoMajor,
2465 pool_send_severity_message(frontend, protoMajor, code, message, detail, hint, file, "FATAL", line);
2469 * send severity message to frontend
/*
 * Send an ErrorResponse ('E') to the frontend.  V2 protocol carries
 * just the message text; V3 builds the standard field list (Severity,
 * Code, Message, optional Detail/Hint, File, Line) and terminates it
 * with an extra NUL.  The socket is put into non-blocking mode for the
 * duration so a dead client cannot block us.
 */
2471 void pool_send_severity_message(POOL_CONNECTION *frontend, int protoMajor,
2481 * Buffer length for each message part
2483 #define MAXMSGBUF 256
2485 * Buffer length for result message buffer.
2486 * Since msg consists of 7 parts, msg buffer should be large
2487 * enough to hold those message parts
2489 #define MAXDATA (MAXMSGBUF+1)*7+1
2491 pool_set_nonblock(frontend->fd);
2493 if (protoMajor == PROTO_MAJOR_V2)
2495 pool_write(frontend, "E", 1);
2496 pool_write_and_flush(frontend, message, strlen(message)+1);
2498 else if (protoMajor == PROTO_MAJOR_V3)
2501 char msgbuf[MAXMSGBUF];
2508 pool_write(frontend, "E", 1);
/* each field: one-letter code + text, copied with its trailing NUL */
2511 thislen = snprintf(msgbuf, MAXMSGBUF, "S%s", severity);
2512 thislen = Min(thislen, MAXMSGBUF);
2513 memcpy(data +len, msgbuf, thislen+1);
2517 thislen = snprintf(msgbuf, MAXMSGBUF, "C%s", code);
2518 thislen = Min(thislen, MAXMSGBUF);
2519 memcpy(data +len, msgbuf, thislen+1);
2523 thislen = snprintf(msgbuf, MAXMSGBUF, "M%s", message);
2524 thislen = Min(thislen, MAXMSGBUF);
2525 memcpy(data +len, msgbuf, thislen+1);
/* Detail and Hint fields are optional */
2529 if (*detail != '\0')
2531 thislen = snprintf(msgbuf, MAXMSGBUF, "D%s", detail);
2532 thislen = Min(thislen, MAXMSGBUF);
2533 memcpy(data +len, msgbuf, thislen+1);
2540 thislen = snprintf(msgbuf, MAXMSGBUF, "H%s", hint);
2541 thislen = Min(thislen, MAXMSGBUF);
2542 memcpy(data +len, msgbuf, thislen+1);
2547 thislen = snprintf(msgbuf, MAXMSGBUF, "F%s", file);
2548 thislen = Min(thislen, MAXMSGBUF);
2549 memcpy(data +len, msgbuf, thislen+1);
2553 thislen = snprintf(msgbuf, MAXMSGBUF, "L%d", line);
2554 thislen = Min(thislen, MAXMSGBUF);
2555 memcpy(data +len, msgbuf, thislen+1);
/* the field list is terminated by an extra NUL byte */
2560 *(data + len - 1) = '\0';
2563 len = htonl(len + 4);
2564 pool_write(frontend, &len, sizeof(len));
2565 pool_write_and_flush(frontend, data, sendlen);
2568 pool_error("send_error_message: unknown protocol major %d", protoMajor);
2570 pool_unset_nonblock(frontend->fd);
/*
 * Send a ReadyForQuery ('Z') message to the frontend with transaction
 * status 'I' (idle).
 */
2573 void pool_send_readyforquery(POOL_CONNECTION *frontend)
2576 pool_write(frontend, "Z", 1);
2579 pool_write(frontend, &len, sizeof(len));
2580 pool_write(frontend, "I", 1);
2581 pool_flush(frontend);
2585 * Send a query to a backend in sync manner.
2586 * This function sends a query and waits for CommandComplete/ReadyForQuery.
2587 * If an error occurred, it returns with POOL_ERROR.
2588 * This function does NOT handle SELECT/SHOW queries.
2589 * If no_ready_for_query is non 0, returns without reading the packet
2590 * length for ReadyForQuery. This mode is necessary when called from ReadyForQuery().
/*
 * Synchronously run an internal (non-SELECT) query on one backend:
 * send the query, poll that the frontend is still alive (cancelling
 * via the pid/key cancel packet if not), then consume responses until
 * ReadyForQuery.  Detects deadlock errors so the caller can abort the
 * transaction on the other nodes; returns POOL_DEADLOCK in that case,
 * POOL_CONTINUE otherwise.  With no_ready_for_query the trailing
 * ReadyForQuery payload is left unread (for ReadyForQuery()'s use).
 */
2592 static POOL_STATUS do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend,
2593 char *query, int protoMajor, int pid, int key, int no_ready_for_query)
2599 int deadlock_detected = 0;
2601 pool_debug("do_command: Query: %s", query);
2603 /* send the query to the backend */
2604 if (send_simplequery_message(backend, strlen(query)+1, query, protoMajor) != POOL_CONTINUE)
2608 * Wait for response from backend while polling that the frontend connection is ok.
2609 * If not, cancel the transaction.
2611 if (wait_for_query_response(frontend, backend, query, protoMajor) != POOL_CONTINUE)
2613 /* Cancel current transaction */
2614 CancelPacket cancel_packet;
2616 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
2617 cancel_packet.pid = pid;
2618 cancel_packet.key= key;
2619 cancel_request(&cancel_packet);
2624 * We must check deadlock error here. If a deadlock error is
2625 * detected by a backend, other backend might not be noticed the
2626 * error. In this case caller should send an error query to the
2627 * backend to abort the transaction. Otherwise the transaction
2628 * state might vary among backends(idle in transaction vs. abort).
2630 deadlock_detected = detect_deadlock_error(backend, protoMajor);
2631 if (deadlock_detected < 0)
2635 * Continue to read packets until we get ReadForQuery (Z).
2636 * Until that we may receive one of:
2638 * N: Notice response
2640 * C: Command complete
2642 * XXX: we ignore Notice and Error here. Even notice/error
2643 * messages are not sent to the frontend. May be it's ok since the
2644 * error was caused by our internal use of SQL command (otherwise users
2645 * will be confused).
2649 status = pool_read(backend, &kind, sizeof(kind));
2652 pool_error("do_command: error while reading message kind");
2656 pool_debug("do_command: kind: %c", kind);
2658 if (kind == 'Z') /* Ready for Query? */
2659 break; /* get out the loop without reading message length */
2661 if (protoMajor == PROTO_MAJOR_V3)
2663 if (pool_read(backend, &len, sizeof(len)) < 0)
2665 pool_error("do_command: error while reading message length");
2668 len = ntohl(len) - 4;
2670 if (kind != 'N' && kind != 'E' && kind != 'C')
2672 pool_error("do_command: error, kind is not N, E or C(%02x)", kind);
2675 string = pool_read2(backend, len);
2678 pool_error("do_command: error while reading rest of message");
/* V2 protocol: messages here are NUL-terminated strings */
2684 string = pool_read_string(backend, &len, 0);
2687 pool_error("do_command: error while reading rest of message");
2694 * until 2008/11/12 we believed that we never had packets other than
2695 * 'Z' after receiving 'C'. However a counter example was presented by
2696 * a poor customer. So we replaced the whole thing with codes
2697 * above. As a side effect we were able to get rid of the nasty
2698 * "goto". Congratulations.
2702 * Expecting CompleteCommand
2705 status = pool_read(backend, &kind, sizeof(kind));
2708 pool_error("do_command: error while reading message kind");
2714 pool_log("do_command: backend does not successfully complete command %s status %c", query, kind);
2718 * read command tag of CommandComplete response
2720 if (protoMajor == PROTO_MAJOR_V3)
2722 if (pool_read(backend, &len, sizeof(len)) < 0)
2724 len = ntohl(len) - 4;
2725 string = pool_read2(backend, len);
2728 pool_debug("command tag: %s", string);
2732 string = pool_read_string(backend, &len, 0);
2737 if (kind == 'N') /* warning? */
2738 goto retry_read_packet;
2741 * Expecting ReadyForQuery
2743 status = pool_read(backend, &kind, sizeof(kind));
2746 pool_error("do_command: error while reading message kind");
2752 pool_error("do_command: backend returns %c while expecting ReadyForQuery", kind);
/* caller (ReadyForQuery) will consume the remainder itself */
2757 if (no_ready_for_query)
2758 return POOL_CONTINUE;
2760 if (protoMajor == PROTO_MAJOR_V3)
2762 /* read packet length for ready for query */
2763 if (pool_read(backend, &len, sizeof(len)) < 0)
2765 pool_error("do_command: error while reading message length");
2769 /* read transaction state */
2770 status = pool_read(backend, &kind, sizeof(kind));
2773 pool_error("do_command: error while reading transaction status");
2777 /* set transaction state */
2778 pool_debug("do_command: transaction state: %c", kind);
2779 backend->tstate = kind;
2782 return deadlock_detected ? POOL_DEADLOCK : POOL_CONTINUE;
2786 * Send a syntax error query to abort transaction and receive response
2787 * from backend and discard it until we get Error response.
2789 * We need to sync transaction status in transaction block.
2790 * SELECT query is sent to master only.
2791 * If SELECT is error, we must abort transaction on other nodes.
/*
 * do_error_command: ship pgpool's internal error query (POOL_ERROR_QUERY)
 * to a single backend and read/discard replies until an ErrorResponse
 * ('E') is seen, so that node's transaction is aborted and its state
 * resynchronized with the other nodes.  (Listing is elided; brace and
 * return lines are not shown.)
 */
2793 POOL_STATUS do_error_command(POOL_CONNECTION *backend, int major)
2795 char *error_query = POOL_ERROR_QUERY;
/* Send the error query as a simple 'Q' message; bail out on failure. */
2800 if (send_simplequery_message(backend, strlen(error_query) + 1, error_query, major) != POOL_CONTINUE)
2806 * Continue to read packets until we get Error response (E).
2807 * Until that we may recieve one of:
2809 * N: Notice response
2810 * C: Comand complete
2812 * XXX: we ignore Notice here. Even notice messages are not sent
2813 * to the frontend. May be it's ok since the error was caused by
2814 * our internal use of SQL command (otherwise users will be
/* Read one message kind per iteration of the enclosing do-loop. */
2819 status = pool_read(backend, &kind, sizeof(kind));
2822 pool_error("do_error_command: error while reading message kind");
2826 pool_debug("do_error_command: kind: %c", kind);
/* V3: length-prefixed payload; V2: NUL-terminated string payload. */
2828 if (major == PROTO_MAJOR_V3)
2830 if (pool_read(backend, &len, sizeof(len)) < 0)
2832 pool_error("do_error_command: error while reading message length");
2835 len = ntohl(len) - 4;
2836 string = pool_read2(backend, len);
2839 pool_error("do_error_command: error while reading rest of message");
2845 string = pool_read_string(backend, &len, 0);
2848 pool_error("do_error_command: error while reading rest of message");
2852 } while (kind != 'E');
2856 * Expecting ErrorResponse
/* NOTE(review): the log prefix below says "do_command" but this is
 * do_error_command -- misleading message text, left unchanged here. */
2858 status = pool_read(backend, &kind, sizeof(kind));
2861 pool_error("do_command: error while reading message kind");
2866 * read command tag of CommandComplete response
2868 if (major == PROTO_MAJOR_V3)
2870 if (pool_read(backend, &len, sizeof(len)) < 0)
2872 len = ntohl(len) - 4;
2873 string = pool_read2(backend, len);
2876 pool_debug("command tag: %s", string);
2880 string = pool_read_string(backend, &len, 0);
2885 return POOL_CONTINUE;
2889 * Send invalid portal execution to abort transaction.
2890 * We need to sync transaction status in transaction block.
2891 * SELECT query is sent to master only.
2892 * If SELECT is error, we must abort transaction on other nodes.
/*
 * do_error_execute_command: send an Execute ('E') message naming a
 * deliberately nonexistent portal to one backend node, then read the
 * resulting error reply, so that node's transaction aborts and stays in
 * sync with the others (extended-protocol counterpart of
 * do_error_command).
 */
2894 static POOL_STATUS do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major)
/* "pgpoool_error_portal" (sic) is simply a portal name that cannot
 * exist; the oversized buffer leaves room for the NUL padding below. */
2899 char msg[1024] = "pgpoool_error_portal"; /* large enough */
2900 int len = strlen(msg);
/* Append sizeof(int) zero bytes after the name (message tail). */
2902 memset(msg + len, 0, sizeof(int));
2904 if (send_execute_message(backend, node_id, len + 5, msg))
2909 * Expecting ErrorResponse
2911 status = pool_read(CONNECTION(backend, node_id), &kind, sizeof(kind));
2914 pool_error("do_error_execute_command: error while reading message kind");
2919 * read command tag of CommandComplete response
2921 if (major == PROTO_MAJOR_V3)
2923 if (pool_read(CONNECTION(backend, node_id), &len, sizeof(len)) < 0)
2925 len = ntohl(len) - 4;
2926 string = pool_read2(CONNECTION(backend, node_id), len);
2929 pool_debug("command tag: %s", string);
2933 string = pool_read_string(CONNECTION(backend, node_id), &len, 0);
2938 return POOL_CONTINUE;
2942 * Transmit an arbitrary Query to a specific node.
2943 * This function is only used in parallel mode
/*
 * OneNode_do_command: send a simple Query to one backend and relay every
 * response packet to the frontend via ParallelForwardToFrontend(),
 * finishing when ReadyForQuery is forwarded.
 */
2945 POOL_STATUS OneNode_do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, char *database)
2950 bool notice = false;
2952 pool_debug("OneNode_do_command: Query: %s", query);
2954 /* send the query to the backend */
2955 pool_write(backend, "Q", 1);
2956 len = strlen(query)+1;
2958 sendlen = htonl(len + 4);
2959 pool_write(backend, &sendlen, sizeof(sendlen));
2961 if (pool_write_and_flush(backend, query, len) < 0)
2968 status = pool_read(backend, &kind, sizeof(kind));
2971 pool_error("OneNode_do_command: error while reading message kind");
/* A NoticeResponse from a dblink query is handled specially (notice
 * flag); forwarding is done without the final flush in that case. */
2975 if (kind == 'N' && strstr(query,"dblink")) {
2977 status = ParallelForwardToFrontend(kind, frontend, backend, database, false);
2980 status = ParallelForwardToFrontend(kind, frontend, backend, database, false);
2982 status = ParallelForwardToFrontend(kind, frontend, backend, database, true);
2984 if (kind == 'C' || kind =='E')
2990 * Expecting ReadyForQuery
2993 status = pool_read(backend, &kind, sizeof(kind));
/* A recorded dblink notice is reported to the frontend as an XX000
 * error describing the pgpool SQL restriction. */
2996 pool_send_error_message(frontend, 3, "XX000",
2997 "pgpool2 sql restriction(notice from dblink)",query,"",
3002 pool_error("OneNode_do_command: error while reading message kind");
3008 pool_error("OneNode_do_command: backend does not return ReadyForQuery");
3013 status = ParallelForwardToFrontend(kind, frontend, backend, database, true);
3014 pool_flush(frontend);
3020 * Free POOL_SELECT_RESULT object
/*
 * free_select_result: release the heap memory owned by a
 * POOL_SELECT_RESULT -- the null-flags array, each data row, and the row
 * description's attribute names/array.  NOTE(review): freeing of the
 * result struct itself is not visible in this listing; confirm the
 * caller's ownership convention.
 */
3022 static void free_select_result(POOL_SELECT_RESULT *result)
3026 if (result->nullflags)
3027 free(result->nullflags);
3031 for(i=0;i<result->numrows;i++)
3033 if (result->data[i])
3034 free(result->data[i]);
3039 if (result->rowdesc)
3041 if (result->rowdesc->attrinfo)
3043 for(i=0;i<result->rowdesc->num_attrs;i++)
3045 if (result->rowdesc->attrinfo[i].attrname)
3046 free(result->rowdesc->attrinfo[i].attrname);
3048 free(result->rowdesc->attrinfo);
3050 free(result->rowdesc);
3055 * Send a SELECT to one DB node. This function works for V3 only.
/*
 * do_query: run a SELECT on a single backend (V3 protocol only) and
 * collect RowDescription + DataRow packets into a freshly allocated
 * POOL_SELECT_RESULT returned through *result.  The row and null-flag
 * arrays grow in DO_QUERY_ALLOC_NUM-sized steps.  Caller frees via
 * free_select_result().
 */
3057 POOL_STATUS do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result)
3059 #define DO_QUERY_ALLOC_NUM 1024 /* memory allocation unit for POOL_SELECT_RESULT */
3071 POOL_SELECT_RESULT *res;
3075 res = malloc(sizeof(*res));
3078 pool_error("pool_query: malloc failed");
3081 rowdesc = malloc(sizeof(*rowdesc));
3084 pool_error("pool_query: malloc failed");
3087 memset(res, 0, sizeof(*res));
3088 memset(rowdesc, 0, sizeof(*rowdesc));
3091 res->rowdesc = rowdesc;
3095 res->nullflags = malloc(DO_QUERY_ALLOC_NUM*sizeof(int));
3096 if (!res->nullflags)
3098 pool_error("do_query: malloc failed");
3101 res->data = malloc(DO_QUERY_ALLOC_NUM*sizeof(char *));
3104 pool_error("do_query: malloc failed");
3108 /* send a query to the backend */
3109 if (send_simplequery_message(backend, strlen(query) + 1, query, PROTO_MAJOR_V3) != POOL_CONTINUE)
3115 * Continue to read packets until we get Ready for command('Z')
3117 * XXX: we ignore other than Z here. Even notice messages are not sent
3118 * to the frontend. May be it's ok since the error was caused by
3119 * our internal use of SQL command (otherwise users will be
3124 if (pool_read(backend, &kind, sizeof(kind)) < 0)
3126 pool_error("do_query: error while reading message kind");
3130 pool_debug("do_query: kind: %c", kind);
3132 if (pool_read(backend, &len, sizeof(len)) < 0)
3134 pool_error("do_query: error while reading message length");
3137 len = ntohl(len) - 4;
3138 packet = pool_read2(backend, len);
3141 pool_error("do_query: error while reading rest of message");
3147 case 'Z': /* Ready for query */
3148 return POOL_CONTINUE;
3151 case 'T': /* Row Description */
3153 memcpy(&shortval, p, sizeof(short));
3154 num_fields = ntohs(shortval); /* number of fields */
3155 pool_debug("num_fileds: %d", num_fields);
3159 rowdesc->num_attrs = num_fields;
3160 attrinfo = malloc(sizeof(*attrinfo)*num_fields);
3163 pool_error("do_query: malloc failed");
3166 rowdesc->attrinfo = attrinfo;
3168 p += sizeof(num_fields);
3170 /* extract attribute info */
3171 for (i = 0;i<num_fields;i++)
3173 len = strlen(p) + 1;
3174 attrinfo->attrname = malloc(len);
3175 if (!attrinfo->attrname)
3177 pool_error("do_query: malloc failed");
3180 memcpy(attrinfo->attrname, p, len);
/* NOTE(review): htonl/htons below convert values read FROM the wire;
 * the conventional direction is ntohl/ntohs.  The functions are
 * identical on supported platforms, so behavior is unaffected. */
3182 memcpy(&intval, p, sizeof(int));
3183 attrinfo->oid = htonl(intval);
3185 memcpy(&shortval, p, sizeof(short));
3186 attrinfo->attrnumber = htons(shortval);
3188 memcpy(&intval, p, sizeof(int));
3189 attrinfo->typeoid = htonl(intval);
3191 memcpy(&shortval, p, sizeof(short));
3192 attrinfo->size = htons(shortval);
3194 memcpy(&intval, p, sizeof(int));
3195 attrinfo->mod = htonl(intval);
3197 p += sizeof(short); /* skip format code since we use "text" anyway */
3204 case 'D': /* data row */
3207 memcpy(&shortval, p, sizeof(short));
3208 num_fields = htons(shortval);
3215 for (i=0;i<num_fields;i++)
3217 memcpy(&intval, p, sizeof(int));
3218 len = htonl(intval);
/* nullflags records the raw length (-1 means SQL NULL). */
3221 res->nullflags[num_data] = len;
3223 if (len > 0) /* NOT NULL? */
3225 res->data[num_data] = malloc(len + 1);
3226 if (!res->data[num_data])
3228 pool_error("do_query: malloc failed");
3231 memcpy(res->data[num_data], p, len);
/* BUG(review): this writes the terminator at offset 1, leaving the
 * copied field unterminated whenever len > 1.  The buffer was sized
 * malloc(len + 1), so this should almost certainly be
 * *(res->data[num_data] + len) = '\0'; */
3232 *(res->data[num_data] + 1) = '\0';
/* Grow both arrays once per DO_QUERY_ALLOC_NUM rows. */
3239 if (num_data % DO_QUERY_ALLOC_NUM == 0)
/* NOTE(review): realloc result overwrites the original pointer; on
 * failure the old block leaks, though the error path aborts anyway. */
3241 res->nullflags = realloc(res->nullflags,
3242 (num_data/DO_QUERY_ALLOC_NUM +1)*DO_QUERY_ALLOC_NUM*sizeof(int));
3243 if (!res->nullflags)
3245 pool_error("do_query: malloc failed");
3248 res->data = realloc(res->data,
3249 (num_data/DO_QUERY_ALLOC_NUM +1)*DO_QUERY_ALLOC_NUM*sizeof(char *));
3252 pool_error("do_query: malloc failed");
3265 return POOL_CONTINUE;
3270 * judge if we need to lock the table
3271 * to keep SERIAL consistency among servers
/*
 * need_insert_lock: decide whether an INSERT statement needs a table
 * lock so SERIAL (sequence) values stay consistent across replicated
 * nodes.  Honors LOCK/NO_LOCK comment hints and the insert_lock config,
 * then checks (and caches per database+relation) whether the target
 * table has a nextval() column default.
 */
3273 int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query, Node *node)
3276 * Query to know if the target table has SERIAL column or not.
3277 * This query is valid through PostgreSQL 7.3 to 8.3.
3279 #define NEXTVALQUERY "SELECT count(*) FROM pg_catalog.pg_attrdef AS d, pg_catalog.pg_class AS c WHERE d.adrelid = c.oid AND d.adsrc ~ 'nextval' AND c.relname = '%s'"
3281 #define INSERT_STATEMENT_MAX_CACHE 16
3282 #define MAX_ITEM_LENGTH 1024
3284 /* table lookup cache structure */
3286 char dbname[MAX_ITEM_LENGTH]; /* database name */
3287 char relname[MAX_ITEM_LENGTH]; /* table name */
3288 int use_serial; /* 1: use SERIAL data type */
3289 int refcnt; /* reference count */
/* Per-process cache; zero-initialized static storage means unused slots
 * compare as empty strings below. */
3292 static MyRelCache relcache[INSERT_STATEMENT_MAX_CACHE];
3301 * for version 2 protocol, we cannot check if it's actually uses
3302 * SERIAL data types or not since the underlying infrastructure
3303 * (do_query) does not support the protocol. So we just return
3306 if (MAJOR(backend) == PROTO_MAJOR_V2)
3309 /* INSERT statement? */
3310 if (!IsA(node, InsertStmt))
3313 /* need to ignore leading white spaces? */
3314 if (pool_config->ignore_leading_white_space)
3316 /* ignore leading white spaces */
3317 while (*query && isspace(*query))
3321 /* is there "NO_LOCK" comment? */
3322 if (strncasecmp(query, NO_LOCK_COMMENT, NO_LOCK_COMMENT_SZ) == 0)
3325 /* is there "LOCK" comment? */
3326 if (strncasecmp(query, LOCK_COMMENT, LOCK_COMMENT_SZ) == 0)
3329 if (pool_config->insert_lock == 0) /* insert_lock is specified? */
3333 * if insert_lock is true, then check if the table actually uses
3337 /* obtain table name */
3338 str = get_insert_command_table_name((InsertStmt *)node);
3341 pool_error("need_insert_lock: get_insert_command_table_name failed");
3345 /* eliminate double quotes */
3346 rel = malloc(strlen(str)+1);
3349 pool_error("need_insert_lock: malloc failed");
3359 /* obtain database name */
3360 dbname = MASTER_CONNECTION(backend)->sp->database;
3362 /* look for cache first */
3363 for (i=0;i<INSERT_STATEMENT_MAX_CACHE;i++)
3365 if (strcasecmp(relcache[i].dbname, dbname) == 0 &&
3366 strcasecmp(relcache[i].relname, rel) == 0)
3368 relcache[i].refcnt++;
3369 use_serial = relcache[i].use_serial;
3374 if (i == INSERT_STATEMENT_MAX_CACHE) /* not in cache? */
3377 int maxrefcnt = INT_MAX;
3378 POOL_SELECT_RESULT *res = NULL;
3381 snprintf(qbuf, sizeof(qbuf), NEXTVALQUERY, rel);
3383 /* check the system catalog if the table has SERIAL data type */
3384 if (do_query(MASTER(backend), qbuf, &res) != POOL_CONTINUE)
3386 pool_error("need_insert_lock: do_query failed");
3388 free_select_result(res);
3393 * if the query returns some rows and found nextval() is used,
3394 * then we assume it uses SERIAL data type
3396 if (res->numrows >= 1 && strcmp(res->data[0], "0"))
3399 free_select_result(res);
/* Pick a victim slot: first unused one, else the least-referenced. */
3401 for (i=0;i<INSERT_STATEMENT_MAX_CACHE;i++)
3403 if (relcache[i].refcnt == 0)
3408 else if (relcache[i].refcnt < maxrefcnt)
3410 maxrefcnt = relcache[i].refcnt;
3415 /* register cache */
/* NOTE(review): strncpy does not NUL-terminate when the source is
 * MAX_ITEM_LENGTH bytes or longer -- confirm names cannot reach 1024
 * bytes, or terminate explicitly. */
3416 strncpy(relcache[index].dbname, dbname, MAX_ITEM_LENGTH);
3417 strncpy(relcache[index].relname, rel, MAX_ITEM_LENGTH);
3418 relcache[index].use_serial = use_serial;
3419 relcache[index].refcnt++;
3425 * if a transaction has not already started, start a new one.
3426 * issue LOCK TABLE IN SHARE ROW EXCLUSIVE MODE
/*
 * insert_lock: acquire "LOCK TABLE ... IN SHARE ROW EXCLUSIVE MODE" on
 * the INSERT target -- first on the master, then on every other valid
 * backend.  If the master reports a deadlock, the remaining nodes are
 * sent POOL_ERROR_QUERY instead so their transactions abort and all
 * nodes stay consistent.
 */
3428 POOL_STATUS insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query, InsertStmt *node)
3433 int i, deadlock_detected = 0;
3435 /* insert_lock can be used in V3 only */
3436 if (MAJOR(backend) != PROTO_MAJOR_V3)
3437 return POOL_CONTINUE;
3439 /* get table name */
3440 table = get_insert_command_table_name(node);
3442 /* could not get table name. probably wrong SQL command */
3445 return POOL_CONTINUE;
3448 /* issue lock table command */
3449 snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
/* Master first: its outcome decides what the other nodes receive. */
3451 status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid,
3452 MASTER_CONNECTION(backend)->key, 0);
3453 if (status == POOL_END)
3455 internal_transaction_started = 0;
3458 else if (status == POOL_DEADLOCK)
3459 deadlock_detected = 1;
3461 for (i=0;i<NUM_BACKENDS;i++)
3463 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
3465 if (deadlock_detected)
3466 status = do_command(frontend, CONNECTION(backend, i), POOL_ERROR_QUERY, PROTO_MAJOR_V3,
3467 MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0);
3469 status = do_command(frontend, CONNECTION(backend, i), qbuf, PROTO_MAJOR_V3,
3470 MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0);
3472 if (status != POOL_CONTINUE)
3474 internal_transaction_started = 0;
3480 return POOL_CONTINUE;
/*
 * is_partition_table: in parallel mode, report whether the target
 * relation of an UPDATE/DELETE has a distribution (partitioning) rule
 * registered, by looking it up via pool_get_dist_def_info().
 * (Tail of the function, including the return, is elided here.)
 */
3483 bool is_partition_table(POOL_CONNECTION_POOL *backend, Node *node)
3485 DistDefInfo *info = NULL;
/* NOTE(review): stray double semicolon below -- harmless. */
3486 RangeVar *var = NULL;;
3488 if (IsA(node, UpdateStmt))
3490 UpdateStmt *update = (UpdateStmt*) node;
3492 if(!IsA(update->relation,RangeVar))
3495 var = (RangeVar *) update->relation;
3497 else if (IsA(node, DeleteStmt))
3499 DeleteStmt *delete = (DeleteStmt*) node;
3501 if(!IsA(delete->relation,RangeVar))
3504 var = (RangeVar *) delete->relation;
3508 info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
3518 * obtain table name in INSERT statement
/*
 * get_insert_command_table_name: return the INSERT target's table name,
 * as produced by nodeToString() on the statement's relation node.
 */
3520 static char *get_insert_command_table_name(InsertStmt *node)
3522 char *table = nodeToString(node->relation);
3524 pool_debug("get_insert_command_table_name: extracted table name: %s", table);
3528 /* judge if this is a DROP DATABASE command */
3529 int is_drop_database(Node *node)
3531 return (IsA(node, DropdbStmt)) ? 1 : 0;
3535 * check if any pending data remains. Also if there's some pending data in
3536 * frontend AND no processing any Query, then returns 0.
3537 * XXX: is this correct thing?
/*
 * is_cache_empty: report whether no buffered data is pending on the
 * frontend connection or on any valid backend connection.
 */
3539 static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
/* Frontend data while no query is in progress is the special case
 * described in the XXX note above. */
3543 if (frontend->len > 0 && !in_progress)
3546 for (i=0;i<NUM_BACKENDS;i++)
3548 if (!VALID_BACKEND(i))
3551 if (CONNECTION(backend, i)->len > 0)
3559 * check if query is needed to wait completion
/*
 * is_strict_query: a SELECT with an INTO clause or a locking clause
 * (FOR UPDATE/SHARE) is "strict" -- pgpool must wait for its completion
 * before proceeding.
 */
3561 int is_strict_query(Node *node)
3567 SelectStmt *stmt = (SelectStmt *)node;
3568 return (stmt->intoClause || stmt->lockingClause) ? 1 : 0;
/*
 * check_copy_from_stdin: detect "COPY ... FROM STDIN".  When matched,
 * record the target schema and table plus the delimiter and NULL string
 * into the module-level copy_* variables, used later while routing
 * CopyData rows.
 */
3584 int check_copy_from_stdin(Node *node)
3593 copy_schema = copy_table = copy_null = NULL;
3595 if (IsA(node, CopyStmt))
3597 CopyStmt *stmt = (CopyStmt *)node;
/* is_from + no filename == FROM STDIN */
3598 if (stmt->is_from == TRUE && stmt->filename == NULL)
3600 RangeVar *relation = (RangeVar *)stmt->relation;
3603 /* query is COPY FROM STDIN */
3604 if (relation->schemaname)
3605 copy_schema = strdup(relation->schemaname);
3607 copy_schema = strdup("public");
3608 copy_table = strdup(relation->relname);
3610 copy_delimiter = '\t'; /* default delimiter */
3611 copy_null = strdup("\\N"); /* default null string */
3613 /* look up delimiter and null string. */
3614 foreach (lc, stmt->options)
3616 DefElem *elem = lfirst(lc);
3619 if (strcmp(elem->defname, "delimiter") == 0)
3621 v = (Value *)elem->arg;
3622 copy_delimiter = v->val.str[0];
3624 else if (strcmp(elem->defname, "null") == 0)
/* NOTE(review): the default copy_null strdup'ed above is replaced
 * below; unless an elided line frees it first, that is a small leak --
 * confirm against the full source. */
3628 v = (Value *)elem->arg;
3629 copy_null = strdup(v->val.str);
3640 * read kind from one backend
/*
 * read_kind_from_one_backend: read a single message-kind byte from the
 * given backend node and hand it back through *kind; it is an error if
 * the node is not valid.
 */
3642 POOL_STATUS read_kind_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind, int node)
3644 if (VALID_BACKEND(node))
3647 if (pool_read(CONNECTION(backend, node), &k, 1) < 0)
3649 pool_error("read_kind_from_one_backend: failed to read kind from %d th backend", node);
3653 pool_debug("read_kind_from_one_backend: read kind from %d th backend %c", node, k);
3656 return POOL_CONTINUE;
3660 pool_error("read_kind_from_one_backend: %d th backend is not valid", node);
3666 * read_kind_from_backend: read kind from backends.
3667 * the "frontend" parameter is used to send "kind mismatch" error message to the frontend.
3668 * the out parameter "decided_kind" is the packet kind decided by this function.
3669 * this function uses "decide by majority" method if kinds from all backends do not agree.
3671 POOL_STATUS read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *decided_kind)
3674 unsigned char kind_list[MAX_NUM_BACKENDS]; /* records each backend's kind */
3675 unsigned char kind_map[256]; /* records which kind gets majority.
3676 * 256 is the number of distinct values expressed by unsigned char
3679 int trust_kind; /* decided kind */
3681 double max_count = 0;
3682 int degenerate_node_num = 0; /* number of backends degeneration requested */
3683 int degenerate_node[MAX_NUM_BACKENDS]; /* degeneration requested backend list */
3685 memset(kind_map, 0, sizeof(kind_map));
/* First pass: read one kind byte from each valid backend, discarding
 * any ParameterStatus ('S') packets that precede it. */
3687 for (i=0;i<NUM_BACKENDS;i++)
3689 /* initialize degenerate record */
3690 degenerate_node[i] = 0;
3692 if (VALID_BACKEND(i))
3699 if (pool_read(CONNECTION(backend, i), &kind, 1) < 0)
3701 pool_error("read_kind_from_backend: failed to read kind from %d th backend", i);
3706 * Read and discard parameter status
3713 if (pool_read(CONNECTION(backend, i), &len, sizeof(len)) < 0)
3715 pool_error("read_kind_from_backend: failed to read parameter status packet length from %d th backend", i);
3718 len = htonl(len) - 4;
3719 p = pool_read2(CONNECTION(backend, i), len);
3722 pool_error("read_kind_from_backend: failed to read parameter status packet from %d th backend", i);
3724 value = p + strlen(p) + 1;
3725 pool_debug("read_kind_from_backend: parameter name: %s value: %s", p, value);
3726 } while (kind == 'S');
3728 kind_list[i] = kind;
3730 pool_debug("read_kind_from_backend: read kind from %d th backend %c NUM_BACKENDS: %d", i, kind_list[i], NUM_BACKENDS);
/* NOTE(review): here kind (plain char) indexes kind_map directly,
 * while the second pass below offsets kind_list[i] by +128 for signed
 * char -- the two indexing schemes differ; confirm against the full
 * source which of the two is live code. */
3734 if (kind_map[kind] > max_count)
3736 max_kind = kind_list[i];
3737 max_count = kind_map[kind];
3745 /* register kind map */
3746 for (i = 0; i < NUM_BACKENDS; i++)
3748 /* initialize degenerate record */
3749 degenerate_node[i] = 0;
3751 /* kind is signed char.
3752 * We must check negative number.
3754 int id = kind_list[i] + 128;
3756 if (kind_list[i] == -1)
3760 if (kind_map[id] > max_count)
3762 max_kind = kind_list[i];
3763 max_count = kind_map[id];
/* Decide the trusted kind: unanimous, else majority, else master's. */
3768 if (max_count != NUM_BACKENDS)
3771 * not all backends agree with kind. We need to do "decide by majority"
3774 if (max_count <= NUM_BACKENDS / 2.0)
3776 /* no one gets majority. We trust master node's kind */
3777 trust_kind = kind_list[MASTER_NODE_ID];
3779 else /* max_count > NUM_BACKENDS / 2.0 */
3781 /* trust majority's kind */
3782 trust_kind = max_kind;
/* Any node disagreeing with the trusted kind is marked for
 * degeneration (detach). */
3785 for (i = 0; i < NUM_BACKENDS; i++)
3787 if (kind_list[i] != 0 && trust_kind != kind_list[i])
3790 pool_error("read_kind_from_backend: %d th kind %c does not match with master or majority connection kind %c",
3791 i, kind_list[i], trust_kind);
3792 degenerate_node[degenerate_node_num++] = i;
3797 trust_kind = kind_list[MASTER_NODE_ID];
3799 *decided_kind = trust_kind;
/* Report the mismatch, including the last query and per-node kinds. */
3801 if (degenerate_node_num)
3803 String *msg = init_string("kind mismatch among backends. ");
3805 string_append_char(msg, "Possible last query was: \"");
3806 string_append_char(msg, query_string_buffer);
3807 string_append_char(msg, "\" kind details are:");
3809 for (i=0;i<NUM_BACKENDS;i++)
3815 snprintf(buf, sizeof(buf), " %d[%c]", i, kind_list[i]);
3816 string_append_char(msg, buf);
3820 pool_send_error_message(frontend, MAJOR(backend), "XX000",
3822 "check data consistency among db nodes",
3823 __FILE__, __LINE__);
/* NOTE(review): msg->data is passed as the format string; a '%' in the
 * offending query text would be misinterpreted.  Safer:
 * pool_error("%s", msg->data). */
3824 pool_error(msg->data);
3828 if (pool_config->replication_stop_on_mismatch)
3830 degenerate_backend_set(degenerate_node, degenerate_node_num);
3837 return POOL_CONTINUE;
3841 * Create portal object
3842 * Return object is allocated from heap memory.
/*
 * create_portal: allocate a Portal and give it its own memory context
 * (pool_memory_create) for prepared-statement parsing.  Caller owns the
 * returned object; NULL handling is on the elided error paths.
 */
3844 Portal *create_portal(void)
3848 if ((p = malloc(sizeof(Portal))) == NULL)
3851 p->prepare_ctxt = pool_memory_create(PREPARE_BLOCK_SIZE);
3852 if (p->prepare_ctxt == NULL)
/*
 * init_prepared_list: initialize the global prepared-statement list with
 * an empty, INIT_STATEMENT_LIST_SIZE-entry portal array.
 */
3860 void init_prepared_list(void)
3862 prepared_list.cnt = 0;
3863 prepared_list.size = INIT_STATEMENT_LIST_SIZE;
3864 prepared_list.portal_list = malloc(sizeof(Portal *) * prepared_list.size);
3865 if (prepared_list.portal_list == NULL)
3867 pool_error("init_prepared_list: malloc failed: %s", strerror(errno));
/*
 * add_prepared_list: append a portal to the list, growing the array when
 * it is full.  NOTE(review): realloc overwrites p->portal_list directly,
 * so the old array pointer is lost if realloc fails -- acceptable only
 * because the error path below bails out.
 */
3872 void add_prepared_list(PreparedStatementList *p, Portal *portal)
3874 if (p->cnt == p->size)
3877 p->portal_list = realloc(p->portal_list, sizeof(Portal *) * p->size);
3878 if (p->portal_list == NULL)
3880 pool_error("add_prepared_list: realloc failed: %s", strerror(errno));
3884 p->portal_list[p->cnt++] = portal;
/*
 * add_unnamed_portal: install a new unnamed statement, releasing the
 * previous one (its memory context and struct) first and clearing the
 * unnamed portal pointer.
 */
3887 void add_unnamed_portal(PreparedStatementList *p, Portal *portal)
3889 if (unnamed_statement)
3891 pool_memory_delete(unnamed_statement->prepare_ctxt, 0);
3892 free(unnamed_statement);
3895 unnamed_portal = NULL;
3896 unnamed_statement = portal;
/*
 * del_prepared_list: remove the statement named by a DEALLOCATE from the
 * list (DEALLOCATE ALL resets the whole list), freeing its memory
 * context, portal name and struct, then compacting the array.
 */
3899 void del_prepared_list(PreparedStatementList *p, Portal *portal)
3902 DeallocateStmt *s = (DeallocateStmt *)portal->stmt;
3904 /* DEALLOCATE ALL? */
3905 if (s->name == NULL)
3907 reset_prepared_list(p);
3911 for (i = 0; i < p->cnt; i++)
3913 PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt;
3914 if (strcmp(p_stmt->name, s->name) == 0)
3921 pool_memory_delete(p->portal_list[i]->prepare_ctxt, 0);
3922 free(p->portal_list[i]->portal_name);
3923 free(p->portal_list[i]);
/* Shift the tail down to close the gap (unless last element). */
3924 if (i != p->cnt - 1)
3926 memmove(&p->portal_list[i], &p->portal_list[i+1],
3927 sizeof(Portal *) * (p->cnt - i - 1));
3933 void delete_all_prepared_list(PreparedStatementList *p, Portal *portal)
3935 reset_prepared_list(p);
/*
 * reset_prepared_list: free every tracked statement (memory context,
 * portal name, struct) plus the unnamed statement, and reset the list
 * and the unnamed portal/statement globals to empty.
 */
3938 static void reset_prepared_list(PreparedStatementList *p)
3944 for (i = 0; i < p->cnt; i++)
3946 pool_memory_delete(p->portal_list[i]->prepare_ctxt, 0);
3947 free(p->portal_list[i]->portal_name);
3948 free(p->portal_list[i]);
3950 if (unnamed_statement)
3952 pool_memory_delete(unnamed_statement->prepare_ctxt, 0);
3953 free(unnamed_statement);
3955 unnamed_portal = NULL;
3956 unnamed_statement = NULL;
/*
 * lookup_prepared_statement_by_statement: find a prepared statement by
 * its PREPARE name.  NULL, "" or "\"\"" select the unnamed statement.
 */
3961 Portal *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name)
3965 /* unnamed portal? */
3966 if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
3967 return unnamed_statement;
3969 for (i = 0; i < p->cnt; i++)
3971 PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt;
3972 if (strcmp(p_stmt->name, name) == 0)
3973 return p->portal_list[i];
/*
 * lookup_prepared_statement_by_portal: find a prepared statement by its
 * bound portal name.  NULL, "" or "\"\"" select the unnamed portal.
 */
3983 Portal *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name)
3987 /* unnamed portal? */
3984 if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
3985 return unnamed_portal;
3987 for (i = 0; i < p->cnt; i++)
3989 if (p->portal_list[i]->portal_name &&
3990 strcmp(p->portal_list[i]->portal_name, name) == 0)
3991 return p->portal_list[i];
/*
 * send_deallocate: issue DEALLOCATE "<name>" for list entry n via
 * SimpleQuery().  The 14 extra bytes cover the literal text
 * 'DEALLOCATE "' plus closing quote and NUL.
 */
3997 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p,
4002 PrepareStmt *p_stmt;
4007 p_stmt = (PrepareStmt *)p->portal_list[n]->stmt;
4008 len = strlen(p_stmt->name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */
4009 query = malloc(len);
4012 pool_error("send_deallocate: malloc failed: %s", strerror(errno));
4015 sprintf(query, "DEALLOCATE \"%s\"", p_stmt->name);
4017 if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE)
4029 * Parses CopyDataRow string.
4030 * Returns divide key value. If cannot parse data, returns NULL.
/*
 * parse_copy_data: scan one text-format COPY row (terminated by '\n')
 * for the col_id-th delimiter-separated field, honoring backslash
 * escapes, and return it in freshly malloc'ed storage.
 */
4033 parse_copy_data(char *buf, int len, char delimiter, int col_id)
4035 int i, j, field = 0;
4036 char *str, *p = NULL;
4038 str = malloc(len + 1);
4040 /* buf is terminated by '\n'. */
4041 /* skip '\n' in for loop. */
4042 for (i = 0, j = 0; i < len - 1; i++)
4044 if (buf[i] == '\\' && i != len - 2) /* escape */
4046 if (buf[i+1] == delimiter)
4056 else if (buf[i] == delimiter) /* delimiter */
/* End of a field: if it was the requested column, capture it. */
4058 if (field == col_id)
4074 if (field == col_id)
4080 pool_error("parse_copy_data: malloc failed: %s", strerror(errno));
4085 pool_debug("parse_copy_data: divide key value is %s", p);
/*
 * query_cache_register: feed result packets into the query cache.  A
 * RowDescription ('T') for a parsed SELECT opens a cache entry; later
 * 'D'/'C'/'E' packets are appended until CommandComplete or Error closes
 * it (tracked via the static inside_T flag).  Results of pg_catalog
 * SELECTs and SELECT ... FOR UPDATE are never cached.
 */
4093 query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len)
4095 static int inside_T; /* flag to see the result data sequence */
4098 if (is_select_pgcatalog || is_select_for_update)
4101 if (kind == 'T' && parsed_query)
4103 result = pool_query_cache_register(kind, frontend, database, data, data_len, parsed_query);
4106 pool_error("pool_query_cache_register: query cache registration failed");
4114 else if ((kind == 'D' || kind == 'C' || kind == 'E') && inside_T)
4116 result = pool_query_cache_register(kind, frontend, database, data, data_len, NULL);
4117 if (kind == 'C' || kind == 'E' || result < 0)
4120 pool_error("pool_query_cache_register: query cache registration failed");
4122 pool_debug("pool_query_cache_register: query cache saved");
4126 parsed_query = NULL;
/*
 * query_ps_status: build "user database remote-data COMMAND" in psbuf
 * and install it as the process title via set_ps_display(); the first
 * word of the query is copied upper-cased, stopping at whitespace.
 */
4131 void query_ps_status(char *query, POOL_CONNECTION_POOL *backend)
4140 sp = MASTER_CONNECTION(backend)->sp;
4141 i = snprintf(psbuf, sizeof(psbuf), "%s %s %s ",
4142 sp->user, sp->database, remote_ps_data);
/* skip leading whitespace of the query */
4145 while (*query && isspace(*query))
4148 for (; i< sizeof(psbuf); i++)
4150 if (!*query || isspace(*query))
/* NOTE(review): query bytes reach isspace()/toupper() as plain char;
 * casting to unsigned char would be safer for high-bit bytes. */
4153 psbuf[i] = toupper(*query++);
4157 set_ps_display(psbuf, false);
4160 /* compare function for bsearch() */
4161 static int compare(const void *p1, const void *p2)
4165 v1 = *(NodeTag *) p1;
4166 v2 = *(NodeTag *) p2;
4167 return (v1 > v2) ? 1 : ((v1 == v2) ? 0 : -1);
4170 /* return true if needed to start a transaction for the nodetag */
/*
 * is_internal_transaction_needed: true when the statement's NodeTag is
 * in the sorted nodemap table below (searched with bsearch(), so the
 * entries MUST stay in ascending NodeTag order), except for statements
 * that cannot run inside a transaction block (CREATE INDEX CONCURRENTLY,
 * bare CLUSTER).
 */
4171 static bool is_internal_transaction_needed(Node *node)
4173 static NodeTag nodemap[] = {
4185 T_CreateStmt, /* CREAE TABLE */
4186 T_DefineStmt, /* CREATE AGGREGATE, OPERATOR, TYPE */
4187 T_DropStmt, /* DROP TABLE etc. */
4191 T_IndexStmt, /* CREATE INDEX */
4192 T_CreateFunctionStmt,
4193 T_AlterFunctionStmt,
4195 T_RenameStmt, /* ALTER AGGREGATE etc. */
4196 T_RuleStmt, /* CREATE RULE */
4200 T_ViewStmt, /* CREATE VIEW */
/* NOTE(review): the line below reads like a commented-out entry in the
 * upstream source (CREATE/DROP DATABASE cannot run inside a transaction
 * block); as shown it is not valid C -- confirm against the original
 * file. */
4204 T_CreatedbStmt, CREATE DATABASE/DROP DATABASE cannot execute inside a transaction block
4209 T_VariableSetStmt, /* SET */
4218 T_ConstraintsSetStmt,
4221 T_AlterDatabaseStmt,
4222 T_AlterDatabaseSetStmt,
4224 T_CreateConversionStmt,
4227 T_CreateOpClassStmt,
4228 T_CreateOpFamilyStmt,
4229 T_AlterOpFamilyStmt,
4230 T_RemoveOpClassStmt,
4231 T_RemoveOpFamilyStmt,
4235 T_DeclareCursorStmt,
4236 T_CreateTableSpaceStmt,
4237 T_DropTableSpaceStmt,
4238 T_AlterObjectSchemaStmt,
4241 T_ReassignOwnedStmt,
4242 T_CompositeTypeStmt, /* CREATE TYPE */
4244 T_AlterTSDictionaryStmt,
4245 T_AlterTSConfigurationStmt
4248 if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap)/sizeof(nodemap[0]), sizeof(NodeTag), compare) != NULL)
4251 * Check CREATE INDEX CONCURRENTLY. If so, do not start transaction
4253 if (IsA(node, IndexStmt))
4255 if (((IndexStmt *)node)->concurrent)
4260 * Check CLUSTER with no option. If so, do not start transaction
4262 else if (IsA(node, ClusterStmt))
4264 if (((ClusterStmt *)node)->relation == NULL)
/*
 * start_internal_transaction: if no transaction is open (tstate 'I') and
 * the statement class needs one, issue BEGIN on every valid backend and
 * set the internal_transaction_started flag.
 */
4274 POOL_STATUS start_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node)
/* already inside a transaction block -- nothing to do */
4278 if (TSTATE(backend) != 'I')
4279 return POOL_CONTINUE;
4281 /* if we are not in a transaction block,
4282 * start a new transaction
4284 if (is_internal_transaction_needed(node))
4286 for (i=0;i<NUM_BACKENDS;i++)
4288 if (VALID_BACKEND(i))
4290 if (do_command(frontend, CONNECTION(backend, i), "BEGIN", MAJOR(backend),
4291 MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0) != POOL_CONTINUE)
4296 /* mark that we started new transaction */
4297 internal_transaction_started = 1;
4299 return POOL_CONTINUE;
/*
 * end_internal_transaction: COMMIT the internally started transaction on
 * all backends -- secondaries first, master last -- with signals blocked
 * so an interrupt cannot leave nodes half-committed.  Clears the
 * internal_transaction_started flag on every exit path.
 */
4303 POOL_STATUS end_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
4306 #ifdef HAVE_SIGPROCMASK
4313 * We must block all signals. If pgpool SIGTERM, SIGINT or SIGQUIT
4314 * is delivered, it could cause data inconsistency.
4316 POOL_SETMASK2(&BlockSig, &oldmask);
4318 /* We need to commit from secondary to master. */
4319 for (i=0;i<NUM_BACKENDS;i++)
4321 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
4323 /* COMMIT success? */
4324 if (do_command(frontend, CONNECTION(backend, i), "COMMIT", MAJOR(backend),
4325 MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 1) != POOL_CONTINUE)
4327 internal_transaction_started = 0;
4328 POOL_SETMASK(&oldmask);
4334 /* commit on master */
4335 if (do_command(frontend, MASTER(backend), "COMMIT", MAJOR(backend),
4336 MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 1) != POOL_CONTINUE)
4338 internal_transaction_started = 0;
4339 POOL_SETMASK(&oldmask);
4343 internal_transaction_started = 0;
4344 POOL_SETMASK(&oldmask);
4345 return POOL_CONTINUE;
4349 * Extract the number of tuples from CommandComplete message
/*
 * extract_ntuples: CommandComplete tags look like "UPDATE n", "DELETE n"
 * or "INSERT oid n"; locate the tag word, then skip to the count before
 * conversion (conversion lines are elided in this listing).
 */
4351 static int extract_ntuples(char *message)
4355 if ((rows = strstr(message, "UPDATE")) || (rows = strstr(message, "DELETE")))
4357 else if ((rows = strstr(message, "INSERT")))
/* advance past the current word to the following space */
4360 while (*rows && *rows != ' ') rows++;
/*
 * detect_postmaster_down_error: check whether the pending packet is an
 * admin-shutdown ErrorResponse (57P01) or a crash-shutdown
 * NoticeResponse (57P02), i.e. the node's postmaster is going away.
 * Because detect_error() is called with unread=false here, a matching
 * packet is consumed; a non-matching one is pushed back.
 */
4368 static int detect_postmaster_down_error(POOL_CONNECTION *backend, int major)
4370 int r = detect_error(backend, ADMIN_SHUTDOWN_ERROR_CODE, major, 'E', false);
4371 if (r == SPECIFIED_ERROR)
4373 pool_debug("detect_stop_postmaster_error: receive admin shutdown error from a node.");
4377 r = detect_error(backend, CRASH_SHUTDOWN_ERROR_CODE, major, 'N', false);
4378 if (r == SPECIFIED_ERROR)
4380 pool_debug("detect_stop_postmaster_error: receive crash shutdown error from a node.");
4385 int detect_active_sql_transaction_error(POOL_CONNECTION *backend, int major)
4387 int r = detect_error(backend, ACTIVE_SQL_TRANSACTION_ERROR_CODE, major, 'E', true);
4388 if (r == SPECIFIED_ERROR)
4390 pool_debug("detect_active_sql_transaction_error: receive SET TRANSACTION ISOLATION LEVEL must be called before any query error from a node.");
4395 int detect_deadlock_error(POOL_CONNECTION *backend, int major)
4397 int r = detect_error(backend, DEADLOCK_ERROR_CODE, major, 'E', true);
4398 if (r == SPECIFIED_ERROR)
4399 pool_debug("detect_deadlock_error: received deadlock error message from backend");
4403 int detect_serialization_error(POOL_CONNECTION *backend, int major)
4405 int r = detect_error(backend, SERIALIZATION_FAIL_ERROR_CODE, major, 'E', true);
4406 if (r == SPECIFIED_ERROR)
4407 pool_debug("detect_serialization_error: received serialization failure message from backend");
4411 int detect_query_cancel_error(POOL_CONNECTION *backend, int major)
4413 int r = detect_error(backend, QUERY_CANCEL_ERROR_CODE, major, 'E', true);
4414 if (r == SPECIFIED_ERROR)
4415 pool_debug("detect_query_cancel_error: received query cancel error message from backend");
4420 * detect_error: Detect specified error from error code.
/*
 * detect_error: peek at the next backend message.  If its kind matches
 * 'class' (e.g. 'E'/'N') and its 'C' (SQLSTATE) field equals error_code,
 * return SPECIFIED_ERROR.  When unread is true -- or when the message
 * did not match -- all consumed bytes are pushed back with pool_unread()
 * so the caller still sees the packet.
 */
4422 static int detect_error(POOL_CONNECTION *backend, char *error_code, int major, char class, bool unread)
4426 int readlen = 0, len;
/* NOTE(review): fixed 8 KB staging buffer; no bounds check on len is
 * visible before the memcpy below -- confirm oversized messages cannot
 * overflow buf. */
4427 static char buf[8192]; /* memory space is large enough */
4430 if (pool_read(backend, &kind, sizeof(kind)))
4432 readlen += sizeof(kind);
4434 memcpy(p, &kind, sizeof(kind));
4437 pool_debug("detect_error: kind: %c", kind);
4439 /* Specified class? */
4442 /* read actual message */
4443 if (major == PROTO_MAJOR_V3)
4447 if (pool_read(backend, &len, sizeof(len)) < 0)
4449 readlen += sizeof(len);
4450 memcpy(p, &len, sizeof(len));
4453 len = ntohl(len) - 4;
4455 pool_read(backend, str, len);
4457 memcpy(p, str, len);
4460 * Checks error code which is formatted 'Cxxxxxx'
4461 * (xxxxxx is error code).
4467 {/* specified error? */
4468 is_error = (strcmp(e+1, error_code) == 0) ? SPECIFIED_ERROR : 0;
/* advance to the next NUL-separated ErrorResponse field */
4472 e = e + strlen(e) + 1;
4478 str = pool_read_string(backend, &len, 0);
4480 memcpy(p, str, len);
4483 if (unread || !is_error)
4485 /* put a message to read buffer */
4486 if (pool_unread(backend, buf, readlen) != 0)
4494 * read message length and rest of the packet then discard it
/*
 * pool_discard_packet: read and throw away one pending message (kind
 * byte plus payload) from every valid backend connection.
 */
4496 POOL_STATUS pool_discard_packet(POOL_CONNECTION_POOL *cp)
4501 POOL_CONNECTION *backend;
4503 for (i=0;i<NUM_BACKENDS;i++)
4505 if (!VALID_BACKEND(i))
4510 backend = CONNECTION(cp, i);
4512 status = pool_read(backend, &kind, sizeof(kind));
4515 pool_error("pool_discard_packet: error while reading message kind");
4519 pool_debug("pool_discard_packet: kind: %c", kind);
/* V3: length-prefixed payload; V2: NUL-terminated string payload. */
4521 if (MAJOR(cp) == PROTO_MAJOR_V3)
4523 if (pool_read(backend, &len, sizeof(len)) < 0)
4525 pool_error("pool_discard_packet: error while reading message length");
4528 len = ntohl(len) - 4;
4529 string = pool_read2(backend, len);
4532 pool_error("pool_discard_packet: error while reading rest of message");
4538 string = pool_read_string(backend, &len, 0);
4541 pool_error("pool_discard_packet: error while reading rest of message");
4546 return POOL_CONTINUE;