3 * $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.6.2.13 2009/10/02 07:56:42 t-ishii Exp $
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
8 * Copyright (c) 2003-2009 PgPool Global Development Group
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
21 *---------------------------------------------------------------------
22 * pool_proto_modules.c: modules corresponding to message protocols.
23 * used by pool_process_query()
24 *---------------------------------------------------------------------
29 #ifdef HAVE_SYS_TYPES_H
30 #include <sys/types.h>
32 #ifdef HAVE_SYS_TIME_H
35 #ifdef HAVE_SYS_SELECT_H
36 #include <sys/select.h>
43 #include <netinet/in.h>
47 #include "pool_signal.h"
48 #include "pool_proto_modules.h"
49 #include "parser/pool_string.h"
/*
 * File-scope state shared by the protocol message handlers in this file
 * (SimpleQuery, Execute, Parse, ReadyForQuery, ProcessFrontendResponse).
 * Several of these flags (force_replication, master_slave_was_enabled,
 * replication_was_enabled, mismatch_ntuples) are reset in ReadyForQuery().
 *
 * NOTE(review): the trailing comment on copy_schema says "copy table name";
 * it looks copy-pasted from copy_table -- confirm and correct.
 * NOTE(review): execute_select carries the same trailing comment as
 * select_in_transaction and is not referenced in the code visible in this
 * file; confirm its real meaning.
 * NOTE(review): allow_close_transaction is set to 1 for Query/Execute and to
 * 0 for Parse in ProcessFrontendResponse(); ReadyForQuery() only closes an
 * internal transaction while it is non 0.
 */
51 int force_replication;
52 int replication_was_enabled; /* replication mode was enabled */
53 int master_slave_was_enabled; /* master/slave mode was enabled */
54 int internal_transaction_started; /* to issue table lock command a transaction
55 has been started internally */
56 int in_progress = 0; /* indicates while doing something after receiving Query */
57 int mismatch_ntuples; /* non 0 if the numbers of updated tuples differ among nodes; checked in ReadyForQuery() */
58 char *copy_table = NULL; /* copy table name */
59 char *copy_schema = NULL; /* copy table name */
60 char copy_delimiter; /* copy delimiter char */
61 char *copy_null = NULL; /* copy null string */
/*
 * Set by SimpleQuery()/Parse() to add_prepared_list, del_prepared_list,
 * delete_all_prepared_list or add_unnamed_portal, together with
 * pending_prepared_portal; where it is finally invoked is not visible here.
 */
62 void (*pending_function)(PreparedStatementList *p, Portal *portal) = NULL;
63 Portal *pending_prepared_portal = NULL;
64 Portal *unnamed_statement = NULL;
65 Portal *unnamed_portal = NULL;
66 int select_in_transaction = 0; /* non 0 if select query is in transaction */
67 int execute_select = 0; /* non 0 if select query is in transaction */
69 /* non 0 if "BEGIN" query with extended query protocol received */
70 int receive_extended_begin = 0;
73 * Non 0 if allow to close internal transaction. This variable was
74 * introduced on 2008/4/3 not to close an internal transaction when
75 * Sync message is received after receiving Parse message. This hack
78 static int allow_close_transaction = 1;
80 PreparedStatementList prepared_list; /* prepared statement name list */
82 int is_select_pgcatalog = 0;
83 int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR UPDATE */
84 bool is_parallel_table = false;
87 * last query string sent to simpleQuery()
89 char query_string_buffer[QUERY_STRING_BUFFER_LEN];
92 * query string produced by nodeToString() in SimpleQuery().
93 * this variable is only useful when enable_query_cache is true.
/*
 * parsed_query: heap copy (strdup) of the nodeToString() output that
 * SimpleQuery() builds for the query cache lookup.
 *
 * check_errors() / generate_error_message(): static helpers used by
 * SimpleQuery() and Execute() to check the master node for specific errors
 * and log an error message for them.
 */
95 char *parsed_query = NULL;
97 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
98 static void generate_error_message(char *prefix, int specific_error, char *query);
/*
 * NotificationResponse: relay a NotificationResponse ('A') message to the
 * frontend.
 *
 * Writes 'A' to the frontend.  For every valid backend it then reads the
 * notifying process id and the condition string.  Only the master node's
 * condition is kept, as a strdup() copy in condition1.  The pid and the
 * condition are written to the frontend with pool_write_and_flush(), and
 * that call's status is returned.
 *
 * NOTE(review): the strdup() result is not checked in the lines visible
 * here.  If it fails, condition1 stays NULL and is passed to
 * pool_write_and_flush().
 * NOTE(review): confirm that condition1 is freed, and that pid1/len1 are
 * assigned on every path (their assignments are not visible here).
 */
100 POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
101 POOL_CONNECTION_POOL *backend)
104 char *condition, *condition1 = NULL;
109 pool_write(frontend, "A", 1);
111 for (i=0;i<NUM_BACKENDS;i++)
113 if (VALID_BACKEND(i))
115 if (pool_read(CONNECTION(backend, i), &pid, sizeof(pid)) < 0)
117 condition = pool_read_string(CONNECTION(backend, i), &len, 0);
118 if (condition == NULL)
121 if (IS_MASTER_NODE_ID(i))
125 condition1 = strdup(condition);
130 pool_write(frontend, &pid1, sizeof(pid1));
131 status = pool_write_and_flush(frontend, condition1, len1);
137 * Process Query('Q') message
138 * Query messages include a SQL string.
/*
 * SimpleQuery: process a simple Query ('Q') message.
 *
 * frontend: client connection.  The code tests it against NULL before
 *           handling EXECUTE, so callers apparently may pass NULL.
 * backend:  pooled backend connections.
 * query:    SQL text to run.  When NULL, the query is read from the
 *           frontend (length-prefixed under protocol V3, NUL-terminated
 *           otherwise).
 *
 * Steps visible in this function:
 *  - Copy the text into query_string_buffer for logging, update the ps
 *    display, and log it when log_statement is on.
 *  - Parse it with raw_parser().
 *  - When enable_query_cache is on, the system DB is up and the statement
 *    is a SELECT that is not on pg_catalog and has no INTO or locking
 *    clause, try pool_query_cache_lookup().
 *  - In parallel_mode, analyze and rewrite the statement.  The function may
 *    return early with the parallel engine's or the rewriter's status.
 *  - DROP DATABASE: ask the parent (SIGUSR1, CLOSE_IDLE_REQUEST) to close
 *    idle connections, then sleep, re-sleeping while interrupted.
 *  - "show pool_status" is answered locally by process_reporting().
 *  - PREPARE / DEALLOCATE / SET / DISCARD are force-replicated in
 *    master/slave mode unless the transaction state is 'E'.  PREPARE,
 *    DEALLOCATE and DISCARD ALL/PLANS also register a pending_function
 *    that updates the prepared statement list.
 *  - Choose routing (load balancing, master only, or replication).  Start
 *    an internal transaction and take an insert lock when needed.
 *  - In REPLICATION or PARALLEL_MODE, a non COMMIT/ROLLBACK query is sent
 *    to the master first, then to the other nodes.  If check_errors()
 *    reports a specific error on the master, the other nodes receive
 *    POOL_ERROR_QUERY instead so that their transactions abort too.  Per
 *    the in-code comment, COMMIT/ROLLBACK is sent to the master node only.
 *  - Otherwise the query is sent to the master only.
 * Whenever waiting for a response fails, a cancel request is issued using
 * the master connection's pid and key.
 *
 * Returns POOL_CONTINUE on success.  Error returns are not all visible in
 * this listing.
 *
 * NOTE(review): strncpy() into query_string_buffer does not NUL-terminate
 * when the query is at least sizeof(query_string_buffer) bytes long.
 * snprintf(query_string_buffer, sizeof(query_string_buffer), "%s", string)
 * would always terminate it.
 * NOTE(review): when waiting for a non-master node fails, the cancel packet
 * still uses MASTER_CONNECTION(backend)->pid/key.  Confirm this is intended.
 * NOTE(review): node starts as NULL and is only set when raw_parser()
 * returns a non-NIL list.  Confirm the NIL case is handled before node is
 * dereferenced (those lines are not visible).
 * NOTE(review): the cancel-request sequence is repeated four times and
 * could be factored into a helper.
 */
140 POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
141 POOL_CONNECTION_POOL *backend, char *query)
143 char *string, *string1;
145 static char *sq = "show pool_status";
147 List *parse_tree_list;
148 Node *node = NULL, *node1;
152 POOL_MEMORY_POOL *old_context = NULL;
155 force_replication = 0;
156 if (query == NULL) /* need to read query from frontend? */
158 /* read actual query */
159 if (MAJOR(backend) == PROTO_MAJOR_V3)
161 if (pool_read(frontend, &len, sizeof(len)) < 0)
163 len = ntohl(len) - 4;
164 string = pool_read2(frontend, len);
167 string = pool_read_string(frontend, &len, 0);
174 len = strlen(query)+1;
178 /* save last query string for logging purpose */
/* NOTE(review): strncpy() leaves query_string_buffer unterminated when string is at least sizeof(query_string_buffer) bytes long. */
179 strncpy(query_string_buffer, string, sizeof(query_string_buffer));
182 query_ps_status(string, backend);
184 /* log query to log file if necessary */
185 if (pool_config->log_statement)
187 pool_log("statement: %s", string);
191 pool_debug("statement2: %s", string);
194 /* parse SQL string */
195 parse_tree_list = raw_parser(string);
197 if (parse_tree_list != NIL)
199 node = (Node *) lfirst(list_head(parse_tree_list));
202 is_parallel_table = is_partition_table(backend,node);
204 if (pool_config->enable_query_cache &&
205 SYSDB_STATUS == CON_UP &&
206 IsA(node, SelectStmt) &&
207 !(is_select_pgcatalog = IsSelectpgcatalog(node, backend)))
209 SelectStmt *select = (SelectStmt *)node;
211 if (! (select->intoClause || select->lockingClause))
213 parsed_query = strdup(nodeToString(node));
214 if (parsed_query == NULL)
216 pool_error("pool_process_query: malloc failed");
222 if (pool_query_cache_lookup(frontend, parsed_query, backend->info->database, TSTATE(backend)) == POOL_CONTINUE)
227 return POOL_CONTINUE;
230 is_select_for_update = 0;
234 is_select_for_update = 1;
238 if (pool_config->parallel_mode)
240 /* The Query is analyzed first in a parallel mode(in_parallel_query),
241 * and, next, the Query is rewritten(rewrite_query_stmt).
244 /* analyze the query */
245 RewriteQuery *r_query = is_parallel_query(node,backend);
247 if(r_query->is_loadbalance)
249 /* Usual processing of pgpool is done by using the rewritten Query
250 * if judged a possible load-balancing as a result of analyzing
252 * Of course, the load is distributed only for load_balance_mode=true.
254 if(r_query->r_code == SEND_LOADBALANCE_ENGINE)
256 /* use rewritten query */
257 string = r_query->rewrite_query;
258 /* change query length */
259 len = strlen(string)+1;
261 pool_debug("SimpleQuery: loadbalance_query =%s",string);
263 else if (r_query->is_parallel)
266 * For the Query that the parallel processing is possible.
267 * Call parallel exe engine and return status to the upper layer.
269 POOL_STATUS stats = pool_parallel_exec(frontend,backend,r_query->rewrite_query, node,true);
274 else if(!r_query->is_pg_catalog)
276 /* rewrite query and execute */
277 r_query = rewrite_query_stmt(node,frontend,backend,r_query);
278 if(r_query->type == T_InsertStmt)
282 if(r_query->r_code != INSERT_DIST_NO_RULE) {
284 return r_query->status;
287 else if(r_query->type == T_SelectStmt)
291 return r_query->status;
295 * The same processing as usual pgpool is done to other Query type.
299 /* check COPY FROM STDIN
300 * if true, set copy_* variable
302 check_copy_from_stdin(node);
305 * if this is DROP DATABASE command, send USR1 signal to parent and
306 * ask it to close all idle connections.
307 * XXX This is overkill. It would be better to close the idle
308 * connection for the database which DROP DATABASE command tries
309 * to drop. This is impossible at this point, since we have no way
310 * to pass such info to other processes.
312 if (is_drop_database(node))
314 int stime = 5; /* XXX give arbitrary time to allow closing idle connections */
316 pool_debug("Query: sending SIGUSR1 signal to parent");
318 Req_info->kind = CLOSE_IDLE_REQUEST;
319 kill(getppid(), SIGUSR1); /* send USR1 signal to parent */
321 /* we need to loop over here since we will get USR1 signal while sleeping */
324 stime = sleep(stime);
328 /* process status reporting? */
329 if (IsA(node, VariableShowStmt) && strncasecmp(sq, string, strlen(sq)) == 0)
334 pool_debug("process reporting");
335 process_reporting(frontend, backend);
339 sp = MASTER_CONNECTION(backend)->sp;
340 snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
341 sp->user, sp->database, remote_ps_data);
342 set_ps_display(psbuf, false);
345 return POOL_CONTINUE;
348 if (IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
349 IsA(node, VariableSetStmt) || IsA(node, DiscardStmt))
352 * PREPARE, DEALLOCATE and SET statements must be replicated.
354 if (MASTER_SLAVE && TSTATE(backend) != 'E')
355 force_replication = 1;
358 * Before we did followings only when frontend != NULL,
359 * which was wrong since if, for example, reset_query_list
360 * contains "DISCARD ALL", then it does not register
361 * pending function and it causes trying to DEALLOCATE non
362 * existing prepared statment(2009/4/3 Tatsuo).
364 if (IsA(node, PrepareStmt))
366 pending_function = add_prepared_list;
367 portal = create_portal();
370 pool_error("SimpleQuery: create_portal() failed");
374 /* switch memory context */
375 old_context = pool_memory;
376 pool_memory = portal->prepare_ctxt;
378 portal->portal_name = NULL;
379 portal->stmt = copyObject(node);
380 portal->sql_string = NULL;
381 pending_prepared_portal = portal;
383 else if (IsA(node, DeallocateStmt))
385 pending_function = del_prepared_list;
386 portal = create_portal();
389 pool_error("SimpleQuery: create_portal() failed");
393 /* switch memory context */
394 old_context = pool_memory;
395 pool_memory = portal->prepare_ctxt;
397 portal->portal_name = NULL;
398 portal->stmt = copyObject(node);
399 portal->sql_string = NULL;
400 pending_prepared_portal = portal;
402 else if (IsA(node, DiscardStmt))
404 DiscardStmt *stmt = (DiscardStmt *)node;
405 if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS)
407 pending_function = delete_all_prepared_list;
408 pending_prepared_portal = NULL;
412 /* switch old memory context */
414 pool_memory = old_context;
416 /* end of wrong if (see 2009/4/3 comment above) */
419 if (frontend && IsA(node, ExecuteStmt))
423 ExecuteStmt *e_stmt = (ExecuteStmt *)node;
425 portal = lookup_prepared_statement_by_statement(&prepared_list,
434 p_stmt = (PrepareStmt *)portal->stmt;
435 string1 = nodeToString(p_stmt->query);
436 node1 = (Node *)p_stmt->query;
445 /* load balance trick */
446 if (load_balance_enabled(backend, node1, string1))
447 start_load_balance(backend);
448 else if (MASTER_SLAVE)
450 pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
451 master_slave_was_enabled = 1;
453 master_slave_dml = 1;
454 if (force_replication)
456 replication_was_enabled = 0;
460 else if (REPLICATION &&
461 !pool_config->replicate_select &&
462 is_select_query(node1, string1) &&
463 !is_sequence_query(node1))
465 selected_slot = MASTER_NODE_ID;
466 replication_was_enabled = 1;
468 LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
470 select_in_transaction = 1;
475 * determine if we need to lock the table
476 * to keep SERIAL data consistency among servers
478 * - replication is enabled
480 * - statement is INSERT
481 * - either "INSERT LOCK" comment exists or insert_lock directive specified
485 /* start a transaction if needed */
486 if (start_internal_transaction(frontend, backend, (Node *)node) != POOL_CONTINUE)
489 /* check if need lock */
490 if (need_insert_lock(backend, string, node))
492 /* if so, issue lock command */
493 status = insert_lock(frontend, backend, string, (InsertStmt *)node);
494 if (status != POOL_CONTINUE)
501 else if (REPLICATION && query == NULL && start_internal_transaction(frontend, backend, node))
511 pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
512 master_slave_was_enabled = 1;
514 master_slave_dml = 1;
518 if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
520 TSTATE(backend) = 'T';
523 if (REPLICATION || PARALLEL_MODE)
525 /* check if query is "COMMIT" or "ROLLBACK" */
526 commit = is_commit_query(node);
530 * Query is not commit/rollback
534 /* Send the query to master node */
536 if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
539 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
541 /* Cancel current transaction */
542 CancelPacket cancel_packet;
544 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
545 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
546 cancel_packet.key= MASTER_CONNECTION(backend)->key;
547 cancel_request(&cancel_packet);
552 /* Check specific errors */
553 specific_error = check_errors(backend, MASTER_NODE_ID);
556 /* log error message */
557 generate_error_message("SimpleQuery: ", specific_error, string);
559 /* Set error query to abort transactions on other nodes */
560 string = POOL_ERROR_QUERY;
561 len = strlen(string) + 1;
565 /* send query to other than master nodes */
566 for (i=0;i<NUM_BACKENDS;i++)
568 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
571 if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
575 /* Wait for nodes other than the master node */
576 for (i=0;i<NUM_BACKENDS;i++)
578 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
581 if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
583 /* Cancel current transaction */
584 CancelPacket cancel_packet;
586 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
587 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
588 cancel_packet.key= MASTER_CONNECTION(backend)->key;
589 cancel_request(&cancel_packet);
595 /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
598 if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
601 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
603 /* Cancel current transaction */
604 CancelPacket cancel_packet;
606 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
607 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
608 cancel_packet.key= MASTER_CONNECTION(backend)->key;
609 cancel_request(&cancel_packet);
615 TSTATE(backend) = 'I';
621 if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
624 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
626 /* Cancel current transaction */
627 CancelPacket cancel_packet;
629 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
630 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
631 cancel_packet.key= MASTER_CONNECTION(backend)->key;
632 cancel_request(&cancel_packet);
638 return POOL_CONTINUE;
642 * process EXECUTE (V3 only)
/*
 * Execute: process an Execute ('E') message (protocol V3 only).
 *
 * Reads the message body from the frontend (portal name, its terminator and
 * the maximum number of rows to return).  It then looks up the prepared
 * statement bound to that portal.  The prepared query drives routing:
 *  - PREPARE / DEALLOCATE / SET in master/slave mode, outside an aborted
 *    transaction ('E'), set force_replication.
 *  - BEGIN / START TRANSACTION in master/slave mode set
 *    receive_extended_begin.  ProcessFrontendResponse() then later sends an
 *    internal Sync to learn the transaction state.
 *  - Load balancing, master-only SELECT routing under replication, or an
 *    internal transaction are chosen much as in SimpleQuery().
 *
 * In REPLICATION or PARALLEL_MODE the Execute is sent to the master first.
 * Per the in-code comment, COMMIT/ROLLBACK goes to the master only.  When
 * the master reports an error (the guarding condition is not visible,
 * presumably specific_error from check_errors()), each other node is sent
 * an Execute for the nonexistent portal "pgpoool_error_portal" so that it
 * fails too.  That message body is len + 5 bytes: the name, its NUL, and a
 * zero 4-byte row limit.  memset() clears sizeof(int) bytes after the name,
 * and the array initializer already zero-fills the rest of msg.
 *
 * Finally, backend messages are forwarded to the frontend until
 * CommandComplete, ErrorResponse, EmptyQueryResponse or PortalSuspended
 * ('C', 'E', 'I', 's') arrives.  That final message is forwarded as well.
 *
 * Returns POOL_CONTINUE on success.
 *
 * NOTE(review): strncpy() into query_string_buffer may leave it
 * unterminated.
 * NOTE(review): the comment before force_replication mentions DISCARD, but
 * the condition tests only PrepareStmt, DeallocateStmt and VariableSetStmt.
 * SimpleQuery() also tests DiscardStmt -- confirm which is intended.
 * NOTE(review): start_internal_transaction() is called here with two
 * arguments, while SimpleQuery() and Parse() pass (frontend, backend, node).
 * Check this against the prototype.
 * NOTE(review): the inner "int len = strlen(msg);" shadows the len used to
 * read the packet.  The error-portal send tests its result for non 0, while
 * the other sends compare against POOL_CONTINUE.
 * NOTE(review): "pgpoool" is misspelled.  This is presumably cosmetic,
 * because the name only has to not exist.
 */
644 POOL_STATUS Execute(POOL_CONNECTION *frontend,
645 POOL_CONNECTION_POOL *backend)
647 char *string; /* portal name + null terminate + max_tobe_returned_rows */
651 int status, commit = 0;
656 int specific_error = 0;
658 /* read Execute packet */
659 if (pool_read(frontend, &len, sizeof(len)) < 0)
662 len = ntohl(len) - 4;
663 string = pool_read2(frontend, len);
665 pool_debug("Execute: portal name <%s>", string);
667 portal = lookup_prepared_statement_by_portal(&prepared_list,
670 /* load balance trick */
675 p_stmt = (PrepareStmt *)portal->stmt;
677 string1 = portal->sql_string;
678 pool_debug("Execute: query: %s", string1);
679 node = (Node *)p_stmt->query;
/* NOTE(review): strncpy() leaves query_string_buffer unterminated when string1 is at least sizeof(query_string_buffer) bytes long. */
680 strncpy(query_string_buffer, string1, sizeof(query_string_buffer));
682 if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
683 IsA(node, VariableSetStmt)) &&
684 MASTER_SLAVE && TSTATE(backend) != 'E')
687 * PREPARE, DEALLOCATE, SET, DISCARD
688 * should be executed on all nodes. So we set
691 force_replication = 1;
694 * JDBC driver sends "BEGIN" query internally if
695 * setAutoCommit(false). But it does not send Sync message
696 * after "BEGIN" query. In extended query protocol,
697 * PostgreSQL returns ReadyForQuery when a client sends Sync
698 * message. Problem is, pgpool can't know the transaction
699 * state without receiving ReadyForQuery. So we remember that
700 * we need to send Sync message internally afterward, whenever
701 * we receive BEGIN in extended protocol.
703 else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
705 TransactionStmt *stmt = (TransactionStmt *) node;
707 if (stmt->kind == TRANS_STMT_BEGIN ||
708 stmt->kind == TRANS_STMT_START)
709 /* Remember we need to send sync later in extended protocol */
710 receive_extended_begin = 1;
713 if (load_balance_enabled(backend, node, string1))
714 start_load_balance(backend);
715 else if (REPLICATION &&
716 !pool_config->replicate_select &&
717 is_select_query((Node *)p_stmt->query, string1) &&
718 !is_sequence_query((Node *)p_stmt->query))
720 selected_slot = MASTER_NODE_ID;
721 replication_was_enabled = 1;
723 LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
725 select_in_transaction = 1;
729 else if (REPLICATION && start_internal_transaction(backend, (Node *)p_stmt->query))
734 /* check if query is "COMMIT" or "ROLLBACK" */
735 commit = is_commit_query((Node *)p_stmt->query);
740 master_slave_was_enabled = 1;
742 master_slave_dml = 1;
743 if (force_replication)
745 replication_was_enabled = 0;
750 if (REPLICATION || PARALLEL_MODE)
753 * Query is not commit/rollback
757 /* Send the query to master node */
759 if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
762 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
764 /* Cancel current transaction */
765 CancelPacket cancel_packet;
767 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
768 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
769 cancel_packet.key= MASTER_CONNECTION(backend)->key;
770 cancel_request(&cancel_packet);
776 /* Check specific errors */
777 specific_error = check_errors(backend, MASTER_NODE_ID);
780 /* log error message */
781 generate_error_message("Execute: ", specific_error, string);
785 /* send query to other nodes */
786 for (i=0;i<NUM_BACKENDS;i++)
788 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
793 char msg[1024] = "pgpoool_error_portal"; /* large enough */
794 int len = strlen(msg);
796 memset(msg + len, 0, sizeof(int));
797 if (send_execute_message(backend, i, len + 5, msg))
800 else if (send_execute_message(backend, i, len, string) != POOL_CONTINUE)
804 /* Wait for nodes other than the master node */
805 for (i=0;i<NUM_BACKENDS;i++)
807 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
810 if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
812 /* Cancel current transaction */
813 CancelPacket cancel_packet;
815 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
816 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
817 cancel_packet.key= MASTER_CONNECTION(backend)->key;
818 cancel_request(&cancel_packet);
824 /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
827 if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
830 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
832 /* Cancel current transaction */
833 CancelPacket cancel_packet;
835 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
836 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
837 cancel_packet.key= MASTER_CONNECTION(backend)->key;
838 cancel_request(&cancel_packet);
846 if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
849 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
851 /* Cancel current transaction */
852 CancelPacket cancel_packet;
854 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
855 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
856 cancel_packet.key= MASTER_CONNECTION(backend)->key;
857 cancel_request(&cancel_packet);
863 while ((ret = read_kind_from_backend(frontend, backend, &kind)) == POOL_CONTINUE)
866 * forward message until receiving CommandComplete,
867 * ErrorResponse, EmptyQueryResponse or PortalSuspend.
869 if (kind == 'C' || kind == 'E' || kind == 'I' || kind == 's')
872 status = SimpleForwardToFrontend(kind, frontend, backend);
873 if (status != POOL_CONTINUE)
876 if (ret != POOL_CONTINUE)
879 status = SimpleForwardToFrontend(kind, frontend, backend);
880 if (status != POOL_CONTINUE)
883 return POOL_CONTINUE;
887 * process Parse (V3 only)
/*
 * Parse: process a Parse ('P') message (protocol V3 only).
 *
 * The message body read from the frontend starts with a NUL-terminated
 * name, followed by the query text.  The query text is parsed with
 * raw_parser(), and "Parse: <query>" is saved in query_string_buffer with
 * snprintf().  A portal is then created.  Inside its prepare_ctxt memory
 * context, a PrepareStmt is built that wraps a copy of the parse tree and
 * the SQL text.  pending_function is set to add_prepared_list for a named
 * statement, or add_unnamed_portal for an unnamed one, so the list can be
 * updated later.
 *
 * Before forwarding, when the cached transaction state is not 'T' (other
 * guarding conditions may be hidden in lines not shown), a Sync is sent to
 * every valid backend.  The resulting ReadyForQuery is consumed with
 * send_ready = 0 to refresh the transaction state.  Strict queries start an
 * internal transaction, and INSERTs that need a lock take it via
 * insert_lock().
 *
 * The Parse is sent to the master first.  In REPLICATION, PARALLEL_MODE or
 * MASTER_SLAVE mode the function then waits for the master; the in-code
 * comment says this is because Parse acquires table locks.  It also checks
 * the master for a deadlock error.  If a deadlock is detected, the other
 * nodes are sent POOL_ERROR_QUERY; otherwise they receive the same Parse.
 * The function then waits for them.  Finally, a message kind is read from
 * the backend and forwarded to the frontend; per the in-code comment,
 * warning messages are ignored.
 *
 * Returns POOL_CONTINUE on success.
 *
 * NOTE(review): the debug message labels the first field "portal name".
 * In a Parse message it is the prepared statement name, and the code also
 * stores it as p_stmt->name.
 * NOTE(review): the return values of start_internal_transaction(), of
 * send_extended_protocol_message() for the Sync, and of
 * SimpleForwardToFrontend() are ignored in the visible lines.
 */
889 POOL_STATUS Parse(POOL_CONNECTION *frontend,
890 POOL_CONNECTION_POOL *backend)
897 POOL_MEMORY_POOL *old_context;
900 List *parse_tree_list;
902 int deadlock_detected = 0;
903 int insert_stmt_with_lock = 0;
906 /* read Parse packet */
907 if (pool_read(frontend, &len, sizeof(len)) < 0)
910 len = ntohl(len) - 4;
911 string = pool_read2(frontend, len);
913 pool_debug("Parse: portal name <%s>", string);
916 stmt = string + strlen(string) + 1;
918 parse_tree_list = raw_parser(stmt);
919 if (parse_tree_list == NIL)
925 /* Save last query string for logging purpose */
926 snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);
928 node = (Node *) lfirst(list_head(parse_tree_list));
930 insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
932 portal = create_portal();
935 pool_error("Parse: create_portal() failed");
939 /* switch memory context */
940 old_context = pool_memory;
941 pool_memory = portal->prepare_ctxt;
943 /* translate Parse message to PrepareStmt */
944 p_stmt = palloc(sizeof(PrepareStmt));
945 p_stmt->type = T_PrepareStmt;
946 p_stmt->name = pstrdup(name);
947 p_stmt->query = copyObject(node);
948 portal->stmt = (Node *)p_stmt;
949 portal->portal_name = NULL;
950 portal->sql_string = pstrdup(stmt);
954 pending_function = add_prepared_list;
955 pending_prepared_portal = portal;
957 else /* unnamed statement */
959 pending_function = add_unnamed_portal;
962 pending_prepared_portal = portal;
965 /* switch old memory context */
966 pool_memory = old_context;
972 if (TSTATE(backend) != 'T')
974 /* synchronize transaction state */
975 for (i = 0; i < NUM_BACKENDS; i++)
977 if (!VALID_BACKEND(i))
980 /* send sync message */
981 send_extended_protocol_message(backend, i, "S", 0, "");
984 kind = pool_read_kind(backend);
987 if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
/* NOTE(review): the return value of start_internal_transaction() below is not checked. */
991 if (is_strict_query(node))
992 start_internal_transaction(frontend, backend, node);
994 if (insert_stmt_with_lock)
996 /* start a transaction if needed and lock the table */
997 status = insert_lock(frontend, backend, stmt, (InsertStmt *)node);
998 if (status != POOL_CONTINUE)
1007 /* send to master node */
1008 if (send_extended_protocol_message(backend, MASTER_NODE_ID,
1012 if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE)
1015 * We must synchronize because Parse message acquires table
1018 pool_debug("Parse: waiting for master completing the query");
1019 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
1021 /* Cancel current transaction */
1022 CancelPacket cancel_packet;
1024 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
1025 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
1026 cancel_packet.key= MASTER_CONNECTION(backend)->key;
1027 cancel_request(&cancel_packet);
1032 * We must check deadlock error because a aborted transaction
1033 * by detecting deadlock isn't same on all nodes.
1034 * If a transaction is aborted on master node, pgpool send a
1035 * error query to another nodes.
1037 deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
1038 if (deadlock_detected < 0)
1041 for (i=0;i<NUM_BACKENDS;i++)
1043 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1045 if (deadlock_detected)
1047 pool_log("Parse: received deadlock error message from master node");
1049 if (send_simplequery_message(CONNECTION(backend, i),
1050 strlen(POOL_ERROR_QUERY)+1,
1055 else if (send_extended_protocol_message(backend, i,
1061 /* wait for DB nodes completing query except master node */
1062 for (i=0;i<NUM_BACKENDS;i++)
1064 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1067 pool_debug("Parse: waiting for %dth backend completing the query", i);
1068 if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
1070 /* Cancel current transaction */
1071 CancelPacket cancel_packet;
1073 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
1074 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
1075 cancel_packet.key= MASTER_CONNECTION(backend)->key;
1076 cancel_request(&cancel_packet);
1085 ret = read_kind_from_backend(frontend, backend, &kind);
1087 if (ret != POOL_CONTINUE)
1090 SimpleForwardToFrontend(kind, frontend, backend);
1091 if (pool_flush(frontend) < 0)
1094 /* Ignore warning messages */
1098 return POOL_CONTINUE;
1102 * Process ReadyForQuery('Z') message.
1104 * - if the global error status "mismatch_ntuples" is set, send an error query
1105 * to all DB nodes to abort transaction.
1106 * - internal transaction is closed
/*
 * ReadyForQuery: process a ReadyForQuery ('Z') message from the backends.
 *
 * send_ready: Parse() and ProcessFrontendResponse() pass 0 when they
 *             consume a ReadyForQuery produced by an internal Sync.  It
 *             presumably suppresses forwarding 'Z' to the frontend; the
 *             test is not visible here -- confirm.
 *
 * - If mismatch_ntuples is set (V3 only), the rest of the packet is read
 *   and an error command is run on every valid node to abort the
 *   transaction.  Backend messages are then read until ReadyForQuery
 *   arrives.  That kind byte is pushed back with pool_unread(), the rest of
 *   the packet is discarded, and mismatch_ntuples is cleared.
 * - If an internal transaction was started and allow_close_transaction is
 *   non 0, it is closed with end_internal_transaction().
 * - The transaction state byte is read and stored in every valid
 *   connection's tstate.  In master/slave DML mode each node's byte is read
 *   separately without comparing them, per the in-code comment.
 * - 'Z' is written to the frontend, followed (for V3) by the message length
 *   and state byte, and flushed.
 * - Load balancing is ended, and the master/slave and force_replication
 *   flags are reset.
 * - The ps display is set to "idle" or "idle in transaction".
 *
 * Returns POOL_CONTINUE, or the result of ProcessFrontendResponse() on a
 * path whose condition is not visible here.
 *
 * NOTE(review): kind1 is read with sizeof(kind).  This is only correct
 * while both variables have the same type.
 */
1108 POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
1109 POOL_CONNECTION_POOL *backend, int send_ready)
1118 * If the numbers of update tuples are differ, we need to abort transaction
1119 * by using do_error_command. This only works with PROTO_MAJOR_V3.
1121 if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
1128 * XXX: discard rest of ReadyForQuery packet
1130 if (pool_read_message_length(backend) < 0)
1133 state = pool_read_kind(backend);
1137 pool_debug("ReadyForQuery: transaction state: %c", state);
1139 for (i = 0; i < NUM_BACKENDS; i++)
1141 if (VALID_BACKEND(i))
1143 /* abort transaction on all nodes. */
1144 do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
1148 /* loop through until we get ReadyForQuery */
1151 kind = pool_read_kind(backend);
1158 /* put the message back to read buffer */
1159 for (i=0;i<NUM_BACKENDS;i++)
1161 if (VALID_BACKEND(i))
1163 pool_unread(CONNECTION(backend,i), &kind, 1);
1167 /* discard rest of the packet */
1168 if (pool_discard_packet(backend) != POOL_CONTINUE)
1170 pool_error("ReadyForQuery: pool_discard_packet failed");
1174 mismatch_ntuples = 0;
1178 * if a transaction is started for insert lock, we need to close
1181 if (internal_transaction_started && allow_close_transaction)
1186 if (MAJOR(backend) == PROTO_MAJOR_V3)
1188 if ((len = pool_read_message_length(backend)) < 0)
1191 pool_debug("ReadyForQuery: message length: %d", len);
1195 state = pool_read_kind(backend);
1199 /* set transaction state */
1200 pool_debug("ReadyForQuery: transaction state: %c", state);
1203 if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
1207 if (MAJOR(backend) == PROTO_MAJOR_V3)
1209 if ((len = pool_read_message_length(backend)) < 0)
1212 pool_debug("ReadyForQuery: message length: %d", len);
1215 * Do not check transaction state in master/slave mode.
1216 * Because SET, PREPARE, DEALLOCATE are replicated.
1217 * If these queries are executed inside a transaction block,
1218 * transation state will be inconsistent. But it is no problem.
1220 if (master_slave_dml)
1224 if (pool_read(MASTER(backend), &kind, sizeof(kind)))
1227 for (i = 0; i < NUM_BACKENDS; i++)
1229 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1232 if (pool_read(CONNECTION(backend, i), &kind1, sizeof(kind)))
1239 state = pool_read_kind(backend);
1244 /* set transaction state */
1245 pool_debug("ReadyForQuery: transaction state: %c", state);
1247 for (i=0;i<NUM_BACKENDS;i++)
1249 if (!VALID_BACKEND(i))
1252 CONNECTION(backend, i)->tstate = state;
1258 pool_write(frontend, "Z", 1);
1260 if (MAJOR(backend) == PROTO_MAJOR_V3)
1263 pool_write(frontend, &len, sizeof(len));
1264 pool_write(frontend, &state, 1);
1267 if (pool_flush(frontend))
1273 /* end load balance mode */
1274 if (in_load_balance)
1275 end_load_balance(backend);
1277 if (master_slave_dml)
1280 master_slave_was_enabled = 0;
1281 master_slave_dml = 0;
1282 if (force_replication)
1284 force_replication = 0;
1286 replication_was_enabled = 0;
1291 return ProcessFrontendResponse(frontend, backend);
1294 sp = MASTER_CONNECTION(backend)->sp;
1295 if (MASTER(backend)->tstate == 'T')
1296 snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
1297 sp->user, sp->database, remote_ps_data);
1299 snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
1300 sp->user, sp->database, remote_ps_data);
1301 set_ps_display(psbuf, false);
1303 return POOL_CONTINUE;
/*
 * FunctionCall: forward a protocol V2 FunctionCall ('F') message from the
 * frontend to every valid backend.
 *
 * Writes 'F' to each valid backend.  It then copies, field by field, from
 * the frontend to every valid backend:
 *  - the dummy field (sizeof(dummy) bytes),
 *  - the function OID,
 *  - the argument count,
 *  - for each argument, its length and its value.
 * All valid backends are flushed at the end.  Returns POOL_CONTINUE.  Error
 * returns after failed reads or flushes are not visible in this listing.
 *
 * NOTE(review): the argument loop "for (i=0;i<argn;i++)" contains inner
 * backend loops that also use i as their counter.  Unless a separate i is
 * declared in lines not shown, each pass leaves i == NUM_BACKENDS, and the
 * outer increment makes it NUM_BACKENDS + 1.  As a result, only the first
 * argument is forwarded when argn <= NUM_BACKENDS + 1, and the loop never
 * terminates otherwise.  Use a separate counter for the backend loops.
 * NOTE(review): confirm that argn and len are converted from network byte
 * order (ntohl) in the lines not shown before they are used as a count and
 * as a length.
 */
1307 POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
1308 POOL_CONNECTION_POOL *backend)
1315 for (i=0;i<NUM_BACKENDS;i++)
1317 if (VALID_BACKEND(i))
1319 pool_write(CONNECTION(backend, i), "F", 1);
1324 if (pool_read(frontend, dummy, sizeof(dummy)) < 0)
1327 for (i=0;i<NUM_BACKENDS;i++)
1329 if (VALID_BACKEND(i))
1331 pool_write(CONNECTION(backend, i), dummy, sizeof(dummy));
1335 /* function object id */
1336 if (pool_read(frontend, &oid, sizeof(oid)) < 0)
1339 for (i=0;i<NUM_BACKENDS;i++)
1341 if (VALID_BACKEND(i))
1343 pool_write(CONNECTION(backend, i), &oid, sizeof(oid));
1347 /* number of arguments */
1348 if (pool_read(frontend, &argn, sizeof(argn)) < 0)
1351 for (i=0;i<NUM_BACKENDS;i++)
1353 if (VALID_BACKEND(i))
1355 pool_write(CONNECTION(backend, i), &argn, sizeof(argn));
/* NOTE(review): the backend loops nested in this loop reuse i as their counter -- see the function header. */
1361 for (i=0;i<argn;i++)
1366 /* length of each argument in bytes */
1367 if (pool_read(frontend, &len, sizeof(len)) < 0)
1370 for (i=0;i<NUM_BACKENDS;i++)
1372 if (VALID_BACKEND(i))
1374 pool_write(CONNECTION(backend, i), &len, sizeof(len));
1380 /* argument value itself */
1381 if ((arg = pool_read2(frontend, len)) == NULL)
1384 for (i=0;i<NUM_BACKENDS;i++)
1386 if (VALID_BACKEND(i))
1388 pool_write(CONNECTION(backend, i), arg, len);
1393 for (i=0;i<NUM_BACKENDS;i++)
1395 if (VALID_BACKEND(i))
1397 if (pool_flush(CONNECTION(backend, i)))
1401 return POOL_CONTINUE;
/*
 * FunctionResultResponse: relay a protocol V2 FunctionResultResponse ('V')
 * to the frontend.
 *
 * Writes 'V', then reads the next byte from every valid backend.  The byte
 * written to the frontend appears to be the one read from the last valid
 * backend, because dummy is overwritten on each iteration.  For a non-empty
 * result (the test is not visible here), the result length is likewise
 * read from every valid backend and forwarded, followed by the result
 * value.  A trailing byte is then consumed and "0" is written.  Returns the
 * result of pool_flush(frontend).
 *
 * NOTE(review): the result value and the trailing byte are read from
 * MASTER(backend) on every iteration of the per-backend loops, not from
 * CONNECTION(backend, i).  With more than one valid backend this reads the
 * master's stream several times and never drains the other backends.
 * CONNECTION(backend, i) looks like the intended connection -- verify.
 */
1404 POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend,
1405 POOL_CONNECTION_POOL *backend)
1412 pool_write(frontend, "V", 1);
1414 for (i=0;i<NUM_BACKENDS;i++)
1416 if (VALID_BACKEND(i))
1418 if (pool_read(CONNECTION(backend, i), &dummy, 1) < 0)
1422 pool_write(frontend, &dummy, 1);
1424 /* non empty result? */
1427 for (i=0;i<NUM_BACKENDS;i++)
1429 if (VALID_BACKEND(i))
1431 /* length of result in bytes */
1432 if (pool_read(CONNECTION(backend, i), &len, sizeof(len)) < 0)
1436 pool_write(frontend, &len, sizeof(len));
1440 for (i=0;i<NUM_BACKENDS;i++)
1442 if (VALID_BACKEND(i))
1444 /* result value itself */
/* NOTE(review): reads MASTER(backend) on every iteration, not CONNECTION(backend, i) -- see the function header. */
1445 if ((result = pool_read2(MASTER(backend), len)) == NULL)
1449 pool_write(frontend, result, len);
1452 for (i=0;i<NUM_BACKENDS;i++)
1454 if (VALID_BACKEND(i))
1457 if (pool_read(MASTER(backend), &dummy, 1) < 0)
1461 pool_write(frontend, "0", 1);
1463 return pool_flush(frontend);
/*
 * ProcessFrontendResponse: read one message kind byte from the frontend and
 * dispatch it.
 *
 * Returns POOL_CONTINUE immediately when frontend->len <= 0 and
 * frontend->no_forward is set.
 *
 * If a BEGIN was received through the extended protocol
 * (receive_extended_begin), the flag is cleared first.  A Sync is then sent
 * to the master node, and the resulting ReadyForQuery is consumed with
 * send_ready = 0 so that the transaction state is known.
 *
 * Dispatch:
 *   'X' Terminate   - under V3 the length word is read.
 *   'Q' Query       - SimpleQuery(), with allow_close_transaction = 1.
 *   'E' Execute     - Execute(), with allow_close_transaction = 1.
 *   'P' Parse       - Parse(), with allow_close_transaction = 0, so that a
 *                     following Sync does not close an internal transaction.
 *   'S' Sync        - clears receive_extended_begin.  The following
 *                     break/fall-through is not visible here.
 *   other V3 kinds  - may set master_slave_dml, then are forwarded with
 *                     SimpleForwardToBackend(), and every valid backend is
 *                     flushed.
 *   'F' under V2    - FunctionCall().
 *   anything else   - logged as an unknown message type; POOL_ERROR.
 * Any status other than POOL_CONTINUE is reported as POOL_ERROR.
 *
 * NOTE(review): the return values of the internal Sync send and of the 'X'
 * length read are not checked in the visible lines.
 */
1466 POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
1467 POOL_CONNECTION_POOL *backend)
1474 if (frontend->len <= 0 && frontend->no_forward != 0)
1475 return POOL_CONTINUE;
1477 if (pool_read(frontend, &fkind, 1) < 0)
1479 pool_log("ProcessFrontendResponse: failed to read kind from frontend. frontend abnormally exited");
1483 pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
1486 * If we have received BEGIN in extended protocol before, we need
1487 * to send a sync message to know the transaction stare.
1489 if (receive_extended_begin)
1491 receive_extended_begin = 0;
1493 /* send sync message */
1494 send_extended_protocol_message(backend, MASTER_NODE_ID, "S", 0, "");
1496 kind = pool_read_kind(backend);
1499 if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
1506 case 'X': /* Terminate message*/
1507 if (MAJOR(backend) == PROTO_MAJOR_V3)
1510 pool_read(frontend, &len, sizeof(len));
1514 case 'Q': /* Query message*/
1516 allow_close_transaction = 1;
1517 status = SimpleQuery(frontend, backend, NULL);
1520 case 'E': /* Execute message */
1521 allow_close_transaction = 1;
1522 status = Execute(frontend, backend);
1525 case 'P': /* Parse message */
1526 allow_close_transaction = 0;
1527 status = Parse(frontend, backend);
1530 case 'S': /* Sync message */
1531 receive_extended_begin = 0;
/* NOTE(review): receive_extended_begin was already cleared on entry (and again for 'S'), so its test below appears to be always false on this path. */
1535 if (MAJOR(backend) == PROTO_MAJOR_V3)
1538 (TSTATE(backend) != 'I' || receive_extended_begin))
1540 pool_debug("kind: %c master_slave_dml enabled", fkind);
1541 master_slave_was_enabled = 1;
1543 master_slave_dml = 1;
1546 status = SimpleForwardToBackend(fkind, frontend, backend);
1547 for (i=0;i<NUM_BACKENDS;i++)
1549 if (VALID_BACKEND(i))
1551 if (pool_flush(CONNECTION(backend, i)))
1552 status = POOL_ERROR;
1556 else if (MAJOR(backend) == PROTO_MAJOR_V2 && fkind == 'F')
1557 status = FunctionCall(frontend, backend);
1560 pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
1561 status = POOL_ERROR;
1566 if (status != POOL_CONTINUE)
1567 status = POOL_ERROR;
/*
 * CompleteCommandResponse: read the CommandComplete tag from the master and
 * from every other valid backend, then forward the master's tag to the
 * frontend as a 'C' message and flush.
 *
 * string1/len1 describe the master's tag. string1 is a strdup() copy taken
 * before the loop reassigns string, and that copy is what the frontend
 * receives. The visible handling of a tag mismatch is a pool_debug()
 * report.
 *
 * Fix: the mismatch message names its first pair "master". It now receives
 * the master's values (len1, string1) first and the i-th backend's values
 * (len, string) last; the two pairs were previously swapped.
 *
 * NOTE(review): confirm that string1 is NULL-checked after strdup() and
 * freed on every exit path. Neither is visible in this listing.
 *
 * On the normal path, returns the result of pool_flush(frontend).
 */
1571 POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend,
1572 POOL_CONNECTION_POOL *backend)
1575 char *string = NULL;
1576 char *string1 = NULL;
1579 /* read command tag */
1580 string = pool_read_string(MASTER(backend), &len, 0);
/* private copy of the master's tag: string is reassigned in the loop below */
1584 string1 = strdup(string);
/* compare every other valid backend's tag against the master's */
1586 for (i=0;i<NUM_BACKENDS;i++)
1588 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1591 /* read command tag */
1592 string = pool_read_string(CONNECTION(backend, i), &len, 0);
/* master's (len1, string1) first, then the i-th backend's (len, string) */
1598 pool_debug("Complete Command Response: message length does not match between master(%d \"%s\",) and %d th server (%d \"%s\",)",
1599 len1, string1, i, len, string);
1605 /* forward to the frontend */
1606 pool_write(frontend, "C", 1);
1607 pool_debug("Complete Command Response: string: \"%s\"", string1);
1608 if (pool_write(frontend, string1, len1) < 0)
1615 return pool_flush(frontend);
/*
 * RowDescription: read a RowDescription ('T') message from the master and
 * from every other valid backend, check that they agree, and forward the
 * master's version to the frontend.
 *
 * For each field, the name, type oid, size and modifier are read from every
 * backend. The plain variables (num_fields, len, oid, size, mod) are
 * overwritten by each non-master backend's read. The *1 variables are the
 * ones written to the frontend, i.e. the master's values:
 *  - num_fields1 is set at "num_fields1 = num_fields";
 *  - oid1, size1 and mod1 are the values written after the backend loops.
 * Visible reporting: num_fields, name-length and size mismatches use
 * pool_error(); oid and modifier mismatches use only pool_debug()
 * ("we do not regard oid mismatch as fatal").
 *
 * On success *result receives the number of fields in host byte order.
 *
 * Fixes to the diagnostics (each message reads
 * "master(%d) and %d th backend(%d)"):
 *  - arguments are passed as (master value, backend index, backend value);
 *    they were swapped;
 *  - num_fields is still in network order when compared, so both values are
 *    converted with ntohs() before printing;
 *  - len comes from pool_read_string() and is used directly as a byte count
 *    for pool_write(), so it is printed without ntohl();
 *  - mod is read as sizeof(int), so it is printed with ntohl(), not ntohs();
 *  - the per-field size debug line logs size1, the value forwarded.
 *
 * NOTE(review): len1 is assumed to hold the master's field-name length; its
 * assignment is not visible in this listing, so confirm it.
 */
1618 int RowDescription(POOL_CONNECTION *frontend,
1619 POOL_CONNECTION_POOL *backend,
1622 short num_fields, num_fields1 = 0;
1630 pool_read(MASTER(backend), &num_fields, sizeof(short));
1631 num_fields1 = num_fields;
1632 for (i=0;i<NUM_BACKENDS;i++)
1634 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1636 /* # of fields (could be 0) */
1637 pool_read(CONNECTION(backend, i), &num_fields, sizeof(short));
1638 if (num_fields != num_fields1)
1640 pool_error("RowDescription: num_fields does not match between backends master(%d) and %d th backend(%d)",
1641 ntohs(num_fields1), i, ntohs(num_fields));
1647 /* forward it to the frontend */
1648 pool_write(frontend, "T", 1);
1649 pool_write(frontend, &num_fields, sizeof(short));
/* num_fields is in host byte order from here on */
1650 num_fields = ntohs(num_fields);
1651 for (i = 0;i<num_fields;i++)
1656 string = pool_read_string(MASTER(backend), &len, 0);
1660 if (pool_write(frontend, string, len) < 0)
1663 for (j=0;j<NUM_BACKENDS;j++)
1665 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1667 string = pool_read_string(CONNECTION(backend, j), &len, 0);
1673 pool_error("RowDescription: field length does not match between backends master(%d) and %d th backend(%d)",
1674 len1, j, len);
/* type oid: read raw (network order); mismatches are only logged */
1681 pool_read(MASTER(backend), &oid, sizeof(int));
1683 pool_debug("RowDescription: type oid: %d", ntohl(oid));
1684 for (j=0;j<NUM_BACKENDS;j++)
1686 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1688 pool_read(CONNECTION(backend, j), &oid, sizeof(int));
1690 /* we do not regard oid mismatch as fatal */
1693 pool_debug("RowDescription: field oid does not match between backends master(%d) and %d th backend(%d)",
1694 ntohl(oid1), j, ntohl(oid));
1698 if (pool_write(frontend, &oid1, sizeof(int)) < 0)
/* field size: a 2-byte value */
1702 pool_read(MASTER(backend), &size, sizeof(short));
1704 for (j=0;j<NUM_BACKENDS;j++)
1706 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1708 pool_read(CONNECTION(backend, j), &size, sizeof(short));
1711 pool_error("RowDescription: field size does not match between backends master(%d) and %d th backend(%d)",
1712 ntohs(size1), j, ntohs(size));
1717 pool_debug("RowDescription: field size: %d", ntohs(size1));
1718 pool_write(frontend, &size1, sizeof(short));
/* type modifier: a 4-byte value, hence ntohl() */
1721 pool_read(MASTER(backend), &mod, sizeof(int));
1722 pool_debug("RowDescription: modifier: %d", ntohl(mod));
1724 for (j=0;j<NUM_BACKENDS;j++)
1726 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1728 pool_read(CONNECTION(backend, j), &mod, sizeof(int));
1731 pool_debug("RowDescription: modifier does not match between backends master(%d) and %d th backend(%d)",
1732 ntohl(mod1), j, ntohl(mod));
1736 if (pool_write(frontend, &mod1, sizeof(int)) < 0)
1740 *result = num_fields;
1742 return pool_flush(frontend);
/*
 * AsciiRow: relay one text row ('D' message) from the backends to the
 * frontend.
 *
 * The null bitmap ((num_fields + 7) / 8 bytes) is read from the master and
 * from every other valid backend. Differences are only logged, and the
 * master's copy (nullmap1) is forwarded. Then, for each field whose bit is
 * set:
 *  - the master's length word is read; the payload length is taken as
 *    ntohl(size) - 4;
 *  - the master's length word and data are forwarded to the frontend;
 *  - the corresponding length word and data are read from every other valid
 *    backend. Size differences are only logged with pool_debug(), and the
 *    visible lines do not forward the other backends' data.
 *
 * Fixes:
 *  - size and size1 already hold host-order payload lengths
 *    (ntohl(...) - 4), so the field-size mismatch message prints them
 *    directly instead of applying ntohl() a second time. It also passes the
 *    master's value (size1) in the "master(%d)" slot and the j-th backend's
 *    value (size) in the "backend(%d)" slot; they were swapped.
 *  - the debug copy into msgbuf uses "%.*s" bounded by size1. The buffer
 *    returned by pool_read2() is not known to be NUL-terminated, and plain
 *    "%s" makes snprintf() scan until it finds a NUL.
 *
 * NOTE(review): the per-field loop tests bits in nullmap, which after the
 * comparison loop holds the last non-master backend's bitmap. The frontend,
 * however, was sent nullmap1 (the master's). If the bitmaps differ, the
 * master/frontend stream can get out of step; confirm the intended behavior.
 */
1745 POOL_STATUS AsciiRow(POOL_CONNECTION *frontend,
1746 POOL_CONNECTION_POOL *backend,
1749 static char nullmap[8192], nullmap1[8192];
1753 int size, size1 = 0;
1754 char *buf = NULL, *sendbuf = NULL;
1757 pool_write(frontend, "D", 1);
1759 nbytes = (num_fields + 7)/8;
1762 return POOL_CONTINUE;
1765 pool_read(MASTER(backend), nullmap, nbytes);
1766 memcpy(nullmap1, nullmap, nbytes);
1767 for (i=0;i<NUM_BACKENDS;i++)
1769 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1771 pool_read(CONNECTION(backend, i), nullmap, nbytes);
1772 if (memcmp(nullmap, nullmap1, nbytes))
1774 /* XXX: NULLMAP maybe different among
1775 backends. If we were a paranoid, we have to treat
1776 this as a fatal error. However in the real world
1777 we'd better to adapt this situation. Just throw a
1779 pool_debug("AsciiRow: NULLMAP differ between master and %d th backend", i);
1784 if (pool_write(frontend, nullmap1, nbytes) < 0)
1789 for (i = 0;i<num_fields;i++)
1795 if (mask & nullmap[i/8])
1798 if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
1801 size1 = ntohl(size) - 4;
1803 /* read and send actual data only when size > 0 */
1806 sendbuf = pool_read2(MASTER(backend), size1);
1807 if (sendbuf == NULL)
1811 /* forward to frontend */
1812 pool_write(frontend, &size, sizeof(int));
1813 pool_write(frontend, sendbuf, size1);
/* debug copy bounded by size1: sendbuf need not be NUL-terminated */
1814 snprintf(msgbuf, Min(sizeof(msgbuf), size1+1), "%.*s", size1, sendbuf);
1815 pool_debug("AsciiRow: len: %d data: %s", size1, msgbuf);
1817 for (j=0;j<NUM_BACKENDS;j++)
1819 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1822 if (pool_read(CONNECTION(backend, j), &size, sizeof(int)) < 0)
1826 size = ntohl(size) - 4;
1828 /* XXX: field size maybe different among
1829 backends. If we were a paranoid, we have to treat
1830 this as a fatal error. However in the real world
1831 we'd better to adapt this situation. Just throw a
1834 pool_debug("AsciiRow: %d th field size does not match between master(%d) and %d th backend(%d)",
1835 i, size1, j, size);
1837 /* read and send actual data only when size > 0 */
1840 buf = pool_read2(CONNECTION(backend, j), size);
1851 if (pool_flush(frontend))
1854 return POOL_CONTINUE;
/*
 * BinaryRow: relay one binary row ('B' message) from the backends to the
 * frontend.
 *
 * The master's null bitmap ((num_fields + 7) / 8 bytes) is forwarded to the
 * frontend. Every other valid backend's bitmap is compared with it, and
 * differences are only logged. For each field whose bit is set, the
 * field-size word and data are read from the backends, but only the master
 * node's size word and data are written to the frontend (the
 * IS_MASTER_NODE_ID(j) checks).
 *
 * Fix: inside the "for (j...)" backend loop, the per-backend field-size
 * read used CONNECTION(backend, i). Here i is the field index, so the read
 * went to a connection chosen by field number, possibly invalid or out of
 * range. It now reads from CONNECTION(backend, j), which is the backend the
 * loop is visiting and the connection its field data is read from.
 *
 * NOTE(review):
 *  - the field-size mismatch message compares size with size1. No
 *    assignment to size1 other than its "= 0" initializer is visible in
 *    this listing; confirm it is set to the master's size word. Also
 *    confirm the argument order against the "master(%d) ... backend(%d)"
 *    labels.
 *  - as in AsciiRow, the per-field loop tests bits in nullmap. After the
 *    comparison loop this holds the last non-master backend's bitmap, not
 *    the master's copy (nullmap1) that was sent to the frontend.
 */
1857 POOL_STATUS BinaryRow(POOL_CONNECTION *frontend,
1858 POOL_CONNECTION_POOL *backend,
1861 static char nullmap[8192], nullmap1[8192];
1865 int size, size1 = 0;
1868 pool_write(frontend, "B", 1);
1870 nbytes = (num_fields + 7)/8;
1873 return POOL_CONTINUE;
1876 pool_read(MASTER(backend), nullmap, nbytes);
1877 if (pool_write(frontend, nullmap, nbytes) < 0)
1879 memcpy(nullmap1, nullmap, nbytes);
1880 for (i=0;i<NUM_BACKENDS;i++)
1882 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1884 pool_read(CONNECTION(backend, i), nullmap, nbytes);
1885 if (memcmp(nullmap, nullmap1, nbytes))
1887 /* XXX: NULLMAP maybe different among
1888 backends. If we were a paranoid, we have to treat
1889 this as a fatal error. However in the real world
1890 we'd better to adapt this situation. Just throw a
1892 pool_debug("BinaryRow: NULLMAP differ between master and %d th backend", i);
1899 for (i = 0;i<num_fields;i++)
1905 if (mask & nullmap[i/8])
1908 if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
1910 for (j=0;j<NUM_BACKENDS;j++)
1912 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1915 if (pool_read(CONNECTION(backend, j), &size, sizeof(int)) < 0)
1918 /* XXX: field size maybe different among
1919 backends. If we were a paranoid, we have to treat
1920 this as a fatal error. However in the real world
1921 we'd better to adapt this situation. Just throw a
1924 pool_debug("BinaryRow: %d th field size does not match between master(%d) and %d th backend(%d)",
1925 i, ntohl(size), j, ntohl(size1));
1930 /* forward to frontend */
1931 if (IS_MASTER_NODE_ID(j))
1932 pool_write(frontend, &size, sizeof(int));
1933 size = ntohl(size) - 4;
1935 /* read and send actual data only when size > 0 */
1938 buf = pool_read2(CONNECTION(backend, j), size);
1942 if (IS_MASTER_NODE_ID(j))
1944 pool_write(frontend, buf, size);
1953 if (pool_flush(frontend))
1956 return POOL_CONTINUE;
/*
 * CursorResponse: read the cursor name from the master and from every other
 * valid backend, then forward the master's name to the frontend as a 'P'
 * message and flush.
 *
 * string1 is a strdup() copy of the master's name, taken before the loop
 * reassigns string, and len1 is the length written with it. Mismatches are
 * reported with pool_error(): one message about the length and one that
 * shows both names.
 *
 * NOTE(review): the arguments of the length-mismatch pool_error() are on a
 * line not shown. Also confirm that string1 is NULL-checked and freed on
 * every exit path.
 */
1959 POOL_STATUS CursorResponse(POOL_CONNECTION *frontend,
1960 POOL_CONNECTION_POOL *backend)
1962 char *string = NULL;
1963 char *string1 = NULL;
1967 /* read cursor name */
1968 string = pool_read_string(MASTER(backend), &len, 0);
/* private copy of the master's name: string is reassigned in the loop */
1972 string1 = strdup(string);
1974 for (i=0;i<NUM_BACKENDS;i++)
1976 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1978 /* read cursor name */
1979 string = pool_read_string(CONNECTION(backend, i), &len, 0);
1984 pool_error("CursorResponse: length does not match between master(%d) and %d th backend(%d)",
1986 pool_error("CursorResponse: master(%s) %d th backend(%s)", string1, i, string);
1993 /* forward to the frontend */
1994 pool_write(frontend, "P", 1);
1995 if (pool_write(frontend, string1, len1) < 0)
2002 if (pool_flush(frontend))
2005 return POOL_CONTINUE;
/*
 * ErrorResponse: read an error message string from every valid backend,
 * forward one to the frontend as an 'E' message (written and flushed), and
 * update the pool's transaction state: 'T' becomes 'E'.
 *
 * NOTE(review): string/len are overwritten on each loop iteration, so the
 * text forwarded is the one read from the last valid backend in node order,
 * not necessarily the master's.
 */
2008 POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
2009 POOL_CONNECTION_POOL *backend)
2011 char *string = NULL;
2015 for (i=0;i<NUM_BACKENDS;i++)
2017 if (VALID_BACKEND(i))
2019 /* read error message */
2020 string = pool_read_string(CONNECTION(backend, i), &len, 0);
2026 /* forward to the frontend */
2027 pool_write(frontend, "E", 1);
2028 if (pool_write_and_flush(frontend, string, len) < 0)
2031 /* change transaction state */
2032 if (TSTATE(backend) == 'T')
2033 TSTATE(backend) = 'E';
/* presumably the else-branch (the "else" line is not shown): reset to idle */
2035 TSTATE(backend) = 'I';
2037 return POOL_CONTINUE;
/*
 * NoticeResponse: read a notice message string from every valid backend and
 * forward one to the frontend as an 'N' message (written and flushed).
 *
 * NOTE(review): as in ErrorResponse, string/len are overwritten on each
 * iteration, so the forwarded notice is the one read from the last valid
 * backend in node order.
 */
2040 POOL_STATUS NoticeResponse(POOL_CONNECTION *frontend,
2041 POOL_CONNECTION_POOL *backend)
2043 char *string = NULL;
2047 for (i=0;i<NUM_BACKENDS;i++)
2049 if (VALID_BACKEND(i))
2051 /* read notice message */
2052 string = pool_read_string(CONNECTION(backend, i), &len, 0);
2058 /* forward to the frontend */
2059 pool_write(frontend, "N", 1);
2060 if (pool_write_and_flush(frontend, string, len) < 0)
2064 return POOL_CONTINUE;
/*
 * CopyInResponse: forward a CopyInResponse ('G') to the frontend, then relay
 * the COPY data with CopyDataRows(frontend, backend, 1).
 *
 * For protocol V3, the backend's 'G' message is forwarded with
 * SimpleForwardToFrontend() and the frontend is flushed. In the other
 * (non-V3) branch, a single "G" byte is written and flushed.
 *
 * NOTE(review): here pool_flush() is compared with POOL_CONTINUE, but
 * elsewhere in this file it is tested with "< 0" or as a truth value. These
 * agree only if POOL_CONTINUE is 0; confirm.
 */
2067 POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend,
2068 POOL_CONNECTION_POOL *backend)
2072 /* forward to the frontend */
2073 if (MAJOR(backend) == PROTO_MAJOR_V3)
2075 if (SimpleForwardToFrontend('G', frontend, backend) != POOL_CONTINUE)
2077 if (pool_flush(frontend) != POOL_CONTINUE)
2081 if (pool_write_and_flush(frontend, "G", 1) < 0)
2084 status = CopyDataRows(frontend, backend, 1);
/*
 * CopyOutResponse: forward a CopyOutResponse ('H') to the frontend, then
 * relay the COPY data with CopyDataRows(frontend, backend, 0).
 *
 * For protocol V3, the backend's 'H' message is forwarded with
 * SimpleForwardToFrontend() and the frontend is flushed. In the other
 * (non-V3) branch, a single "H" byte is written and flushed.
 *
 * NOTE(review): pool_flush() is compared with POOL_CONTINUE here. This only
 * matches the "< 0" / truth-value tests used elsewhere in this file if
 * POOL_CONTINUE is 0; confirm.
 */
2088 POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend,
2089 POOL_CONNECTION_POOL *backend)
2093 /* forward to the frontend */
2094 if (MAJOR(backend) == PROTO_MAJOR_V3)
2096 if (SimpleForwardToFrontend('H', frontend, backend) != POOL_CONTINUE)
2098 if (pool_flush(frontend) != POOL_CONTINUE)
2102 if (pool_write_and_flush(frontend, "H", 1) < 0)
2105 status = CopyDataRows(frontend, backend, 0);
/*
 * CopyDataRows: relay COPY data between the frontend and the backends.
 * copyin is 1 when called from CopyInResponse() and 0 when called from
 * CopyOutResponse().
 *
 * Visible paths:
 *  - Protocol V3, data read from the frontend: each message kind is read.
 *    In parallel mode with copyin set, distribution info is looked up with
 *    pool_get_dist_def_info(). Each 'd' message is then routed to the single
 *    node that pool_get_id() picks from the distribution-key column, which
 *    parse_copy_data() extracts using copy_delimiter and
 *    info->dist_key_col_id. A 3-byte payload of backslash, '.', newline
 *    (the end-of-copy line) is instead sent to every valid backend. An
 *    unparsable row, or a key equal to copy_null, is reported with
 *    pool_error(). Other kinds go to SimpleForwardToBackend().
 *  - Protocol V3, data read from the backends: each kind is read with
 *    pool_read_kind() and passed to SimpleForwardToFrontend().
 *  - Otherwise: lines are read with pool_read_string(..., 1) in one of two
 *    directions. Either they come from the frontend and are written to
 *    every valid backend, or they come from every valid backend and are
 *    written to the frontend. A line starting with a backslash is checked
 *    further (rest of the test not shown).
 * Finally, every valid backend is flushed and passed to synchronize(), and
 * the frontend is flushed.
 *
 * NOTE(review): the if/else structure selecting these paths is on lines not
 * shown. The grouping above is inferred from which connection each read
 * uses.
 */
2109 POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend,
2110 POOL_CONNECTION_POOL *backend, int copyin)
2112 char *string = NULL;
2115 DistDefInfo *info = NULL;
2122 if (copyin && pool_config->parallel_mode == TRUE)
2124 info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
2133 if (MAJOR(backend) == PROTO_MAJOR_V3)
2139 if (pool_read(frontend, &kind, 1) < 0)
/* parallel mode: route each CopyData row by its distribution key */
2142 if (info && kind == 'd')
2145 if (pool_read(frontend, &sendlen, sizeof(sendlen)))
2150 len = ntohl(sendlen) - 4;
2153 return POOL_CONTINUE;
2155 p = pool_read2(frontend, len);
/* end-of-copy line: every valid backend must see it */
2160 if (len == 3 && memcmp(p, "\\.\n", 3) == 0)
2162 for (i=0;i<NUM_BACKENDS;i++)
2164 if (VALID_BACKEND(i))
2166 if (pool_write(CONNECTION(backend, i), &kind, 1))
2168 if (pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen)))
2170 if (pool_write(CONNECTION(backend, i), p, len))
/* ordinary row: extract the distribution-key column and pick one node */
2177 p1 = parse_copy_data(p, len, copy_delimiter, info->dist_key_col_id);
2181 pool_error("CopyDataRow: cannot parse data");
2184 else if (strcmp(p1, copy_null) == 0)
2186 pool_error("CopyDataRow: key parameter is NULL");
2191 id = pool_get_id(info, p1);
2192 pool_debug("CopyDataRow: copying id: %d", id);
2194 if (!VALID_BACKEND(id))
2198 if (pool_write(CONNECTION(backend, id), &kind, 1))
2202 if (pool_write(CONNECTION(backend, id), &sendlen, sizeof(sendlen)))
2206 if (pool_write_and_flush(CONNECTION(backend, id), p, len))
/* NOTE(review): the POOL_STATUS returned by SimpleForwardToBackend() is discarded here */
2214 SimpleForwardToBackend(kind, frontend, backend);
2222 pool_debug("CopyDataRows: copyin kind other than d (%c)", kind);
2227 string = pool_read_string(frontend, &len, 1);
2232 if (MAJOR(backend) == PROTO_MAJOR_V3)
2236 if ((kind = pool_read_kind(backend)) < 0)
/* NOTE(review): the POOL_STATUS returned by SimpleForwardToFrontend() is discarded here */
2239 SimpleForwardToFrontend(kind, frontend, backend);
2249 for (i=0;i<NUM_BACKENDS;i++)
2251 if (VALID_BACKEND(i))
2253 string = pool_read_string(CONNECTION(backend, i), &len, 1);
/*
 * NOTE(review): buf is declared on a line not shown. strncpy() does not
 * NUL-terminate when string has no NUL within len bytes, and the %s below
 * needs a terminated buf of at least len+1 bytes -- confirm both.
 */
2263 strncpy(buf, string, len);
2264 pool_debug("copy line %d %d bytes :%s:", j++, len, buf);
2269 for (i=0;i<NUM_BACKENDS;i++)
2271 if (VALID_BACKEND(i))
2273 pool_write(CONNECTION(backend, i), string, len);
2278 pool_write(frontend, string, len);
/*
 * NOTE(review): this compares a byte count with PROTO_MAJOR_V3. It
 * presumably relies on that macro being 3 (the length of the end-of-copy
 * line); a literal 3 would state the intent directly.
 */
2280 if (len == PROTO_MAJOR_V3)
2283 if (string[0] == '\\' &&
2294 for (i=0;i<NUM_BACKENDS;i++)
2296 if (VALID_BACKEND(i))
2298 if (pool_flush(CONNECTION(backend, i)) <0)
2301 if (synchronize(CONNECTION(backend, i)))
2307 if (pool_flush(frontend) <0)
2310 return POOL_CONTINUE;
/*
 * EmptyQueryResponse: consume one byte (c) from every valid backend, then
 * send the frontend an 'I' message followed by a single NUL byte ("" written
 * with length 1), flushing it.
 *
 * NOTE(review): the function returns pool_write_and_flush()'s result as a
 * POOL_STATUS. Elsewhere in this file that result is tested with "< 0";
 * confirm the two encodings agree.
 */
2313 POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend,
2314 POOL_CONNECTION_POOL *backend)
2319 for (i=0;i<NUM_BACKENDS;i++)
2321 if (VALID_BACKEND(i))
2323 if (pool_read(CONNECTION(backend, i), &c, sizeof(c)) < 0)
2328 pool_write(frontend, "I", 1);
2329 return pool_write_and_flush(frontend, "", 1);
/*
 * check_errors: run the specific-error detectors against
 * CONNECTION(backend, backend_id), each comparing its result with
 * SPECIFIED_ERROR, in this order:
 *   detect_deadlock_error, detect_serialization_error,
 *   detect_active_sql_transaction_error ("SET TRANSACTION ISOLATION LEVEL
 *   must be called before any query"), detect_query_cancel_error.
 *
 * NOTE(review): the return statements are on lines not shown. There are four
 * checks, and error_messages[] in generate_error_message() lists deadlock,
 * serialization, SET TRANSACTION and query cancel in this same order. The
 * comment below, however, lists "3: query cancel". Confirm the code returned
 * for each check.
 */
2333 * Check various errors from backend. return values: 0: no error 1:
2334 * deadlock detected 2: serialization error detected 3: query cancel
2337 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id)
2341 * Check dead lock error on the master node and abort
2342 * transactions on all nodes if so.
2344 if (detect_deadlock_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2348 * Check serialization failure error and abort
2349 * transactions on all nodes if so. Otherwise we allow
2350 * data inconsistency among DB nodes. See following
2351 * scenario: (M:master, S:slave)
2357 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2358 * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2359 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2360 * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2361 * M:S1:UPDATE t1 SET i = i + 1;
2362 * S:S1:UPDATE t1 SET i = i + 1;
2363 * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
2366 * M:S2:ERROR: could not serialize access due to concurrent update
2367 * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE and data becomes inconsistent!
2369 if (detect_serialization_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2373 * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
2374 * This happens in following scenario:
2378 * M:S1:SELECT 1; <-- only sent to MASTER
2379 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2380 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2382 * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
2384 if (detect_active_sql_transaction_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2387 /* check query cancel error */
2388 if (detect_query_cancel_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2394 static void generate_error_message(char *prefix, int specific_error, char *query)
2396 static char *error_messages[] = {
2397 "received deadlock error message from master node. query: %s",
2398 "received serialization failure error message from master node. query: %s",
2399 "received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s",
2400 "received query cancel error message from master node. query: %s"
2405 if (specific_error < 1 || specific_error > sizeof(error_messages)/sizeof(char *))
2407 pool_error("generate_error_message: invalid specific_error: %d", specific_error);
2413 msg = init_string(prefix);
2414 string_append_char(msg, error_messages[specific_error]);
2415 pool_error(msg->data, query);