]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_proto_modules.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pool_proto_modules.c
1 /* -*-pgsql-c-*- */
2 /*
3  * $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.6.2.13 2009/10/02 07:56:42 t-ishii Exp $
4  * 
5  * pgpool: a language independent connection pool server for PostgreSQL 
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2009      PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  *---------------------------------------------------------------------
22  * pool_proto_modules.c: modules corresponding to message protocols.
23  * used by pool_process_query()
24  *---------------------------------------------------------------------
25  */
26 #include "config.h"
27 #include <errno.h>
28
29 #ifdef HAVE_SYS_TYPES_H
30 #include <sys/types.h>
31 #endif
32 #ifdef HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #ifdef HAVE_SYS_SELECT_H
36 #include <sys/select.h>
37 #endif
38
39
40 #include <stdlib.h>
41 #include <unistd.h>
42 #include <string.h>
43 #include <netinet/in.h>
44 #include <ctype.h>
45
46 #include "pool.h"
47 #include "pool_signal.h"
48 #include "pool_proto_modules.h"
49 #include "parser/pool_string.h"
50
51 int force_replication;
52 int replication_was_enabled;            /* replication mode was enabled */
53 int master_slave_was_enabled;   /* master/slave mode was enabled */
54 int internal_transaction_started;               /* to issue table lock command a transaction
55                                                                                                    has been started internally */
56 int in_progress = 0;            /* indicates while doing something after receiving Query */
57 int mismatch_ntuples;   /* number of updated tuples */
58 char *copy_table = NULL;  /* copy table name */
59 char *copy_schema = NULL;  /* copy table name */
60 char copy_delimiter; /* copy delimiter char */
61 char *copy_null = NULL; /* copy null string */
62 void (*pending_function)(PreparedStatementList *p, Portal *portal) = NULL;
63 Portal *pending_prepared_portal = NULL;
64 Portal *unnamed_statement = NULL;
65 Portal *unnamed_portal = NULL;
66 int select_in_transaction = 0; /* non 0 if select query is in transaction */
67 int execute_select = 0; /* non 0 if select query is in transaction */
68
69 /* non 0 if "BEGIN" query with extended query protocol received */
70 int receive_extended_begin = 0;
71
72 /*
73  * Non 0 if allow to close internal transaction.  This variable was
74  * introduced on 2008/4/3 not to close an internal transaction when
75  * Sync message is received after receiving Parse message. This hack
76  * is for PHP-PDO.
77  */
78 static int allow_close_transaction = 1;
79
80 PreparedStatementList prepared_list; /* prepared statement name list */
81
82 int is_select_pgcatalog = 0;
83 int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR UPDATE */
84 bool is_parallel_table = false;
85
86 /*
87  * last query string sent to simpleQuery()
88  */
89 char query_string_buffer[QUERY_STRING_BUFFER_LEN];
90
91 /*
92  * query string produced by nodeToString() in simpleQuery().
93  * this variable only usefull when enable_query_cache is true.
94  */
95 char *parsed_query = NULL;
96
97 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
98 static void generate_error_message(char *prefix, int specific_error, char *query);
99
100 POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
101                                                                                 POOL_CONNECTION_POOL *backend)
102 {
103         int pid, pid1;
104         char *condition, *condition1 = NULL;
105         int len, len1 = 0;
106         int i;
107         POOL_STATUS status;
108
109         pool_write(frontend, "A", 1);
110
111         for (i=0;i<NUM_BACKENDS;i++)
112         {
113                 if (VALID_BACKEND(i))
114                 {
115                         if (pool_read(CONNECTION(backend, i), &pid, sizeof(pid)) < 0)
116                                 return POOL_ERROR;
117                         condition = pool_read_string(CONNECTION(backend, i), &len, 0);
118                         if (condition == NULL)
119                                 return POOL_END;
120
121                         if (IS_MASTER_NODE_ID(i))
122                         {
123                                 pid1 = pid;
124                                 len1 = len;
125                                 condition1 = strdup(condition);
126                         }
127                 }
128         }
129
130         pool_write(frontend, &pid1, sizeof(pid1));
131         status = pool_write_and_flush(frontend, condition1, len1);
132         free(condition1);
133         return status;
134 }
135
136 /*
137  * Process Query('Q') message
138  * Query messages include a SQL string.
139  */
140  POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
141                                                  POOL_CONNECTION_POOL *backend, char *query)
142 {
143         char *string, *string1;
144         int len;
145         static char *sq = "show pool_status";
146         int i, commit;
147         List *parse_tree_list;
148         Node *node = NULL, *node1;
149         POOL_STATUS status;
150         int specific_error;
151
152         POOL_MEMORY_POOL *old_context = NULL;
153         Portal *portal;
154
155         force_replication = 0;
156         if (query == NULL)      /* need to read query from frontend? */
157         {
158                 /* read actual query */
159                 if (MAJOR(backend) == PROTO_MAJOR_V3)
160                 {
161                         if (pool_read(frontend, &len, sizeof(len)) < 0)
162                                 return POOL_END;
163                         len = ntohl(len) - 4;
164                         string = pool_read2(frontend, len);
165                 }
166                 else
167                         string = pool_read_string(frontend, &len, 0);
168
169                 if (string == NULL)
170                         return POOL_END;
171         }
172         else
173         {
174                 len = strlen(query)+1;
175                 string = query;
176         }
177
178         /* save last query string for logging purpose */
179         strncpy(query_string_buffer, string, sizeof(query_string_buffer));
180
181         /* show ps status */
182         query_ps_status(string, backend);
183
184         /* log query to log file if necessary */
185         if (pool_config->log_statement)
186         {
187                 pool_log("statement: %s", string);
188         }
189         else
190         {
191                 pool_debug("statement2: %s", string);
192         }
193
194         /* parse SQL string */
195         parse_tree_list = raw_parser(string);
196
197         if (parse_tree_list != NIL)
198         {
199                 node = (Node *) lfirst(list_head(parse_tree_list));
200
201                 if (PARALLEL_MODE)
202                         is_parallel_table = is_partition_table(backend,node);
203
204                 if (pool_config->enable_query_cache &&
205                         SYSDB_STATUS == CON_UP &&
206                         IsA(node, SelectStmt) &&
207                         !(is_select_pgcatalog = IsSelectpgcatalog(node, backend)))
208                 {
209                         SelectStmt *select = (SelectStmt *)node;
210
211                         if (! (select->intoClause || select->lockingClause))
212                         {
213                                 parsed_query = strdup(nodeToString(node));
214                                 if (parsed_query == NULL)
215                                 {
216                                         pool_error("pool_process_query: malloc failed");
217                                         return POOL_ERROR;
218                                 }
219
220                                 if (parsed_query)
221                                 {
222                                         if (pool_query_cache_lookup(frontend, parsed_query, backend->info->database, TSTATE(backend)) == POOL_CONTINUE)
223                                         {
224                                                 free(parsed_query);
225                                                 parsed_query = NULL;
226                                                 free_parser();
227                                                 return POOL_CONTINUE;
228                                         }
229                                 }
230                                 is_select_for_update = 0;
231                         }
232                         else
233                         {
234                                 is_select_for_update = 1;
235                         }
236                 }
237
238                 if (pool_config->parallel_mode)
239                 {
240       /* The Query is analyzed first in a parallel mode(in_parallel_query),
241        * and, next, the Query is rewritten(rewrite_query_stmt).
242        */
243
244                         /* analyze the query */
245                         RewriteQuery *r_query = is_parallel_query(node,backend);
246
247                         if(r_query->is_loadbalance)
248                         {
249         /* Usual processing of pgpool is done by using the rewritten Query
250          * if judged a possible load-balancing as a result of analyzing
251          * the Query.
252          * Of course, the load is distributed only for load_balance_mode=true.
253          */
254                                 if(r_query->r_code ==  SEND_LOADBALANCE_ENGINE)
255                                 {
256                                         /* use rewritten query */
257                                         string = r_query->rewrite_query;
258                                         /* change query length */
259                                         len = strlen(string)+1;
260                                 }
261                                 pool_debug("SimpleQuery: loadbalance_query =%s",string);
262                         }
263                         else if (r_query->is_parallel)
264                         {
265                                 /*
266                                  * For the Query that the parallel processing is possible.
267                                  * Call parallel exe engine and return status to the upper layer.
268                                  */
269                                 POOL_STATUS stats = pool_parallel_exec(frontend,backend,r_query->rewrite_query, node,true);
270                                 free_parser();
271                                 in_progress = 0;
272                                 return stats;
273                         }
274                         else if(!r_query->is_pg_catalog)
275                         {
276                                 /* rewrite query and execute */
277                                 r_query = rewrite_query_stmt(node,frontend,backend,r_query);
278                                 if(r_query->type == T_InsertStmt)
279                                 {
280                                         free_parser();
281
282                                         if(r_query->r_code != INSERT_DIST_NO_RULE) {
283                                                 in_progress = 0;
284                                                 return r_query->status;
285                                         }
286                                 }
287                                 else if(r_query->type == T_SelectStmt)
288                                 {
289                                         free_parser();
290                                         in_progress = 0;
291                                         return r_query->status;
292                                 }
293                         }
294                         /*
295                          * The same processing as usual pgpool is done to other Query type.
296        */
297                 }
298
299                 /* check COPY FROM STDIN
300                  * if true, set copy_* variable
301                  */
302                 check_copy_from_stdin(node);
303
304                 /*
305                  * if this is DROP DATABASE command, send USR1 signal to parent and
306                  * ask it to close all idle connections.
307                  * XXX This is overkill. It would be better to close the idle
308                  * connection for the database which DROP DATABASE command tries
309                  * to drop. This is impossible at this point, since we have no way
310                  * to pass such info to other processes.
311                  */
312                 if (is_drop_database(node))
313                 {
314                         int stime = 5;  /* XXX give arbitrary time to allow closing idle connections */
315
316                         pool_debug("Query: sending SIGUSR1 signal to parent");
317
318                         Req_info->kind = CLOSE_IDLE_REQUEST;
319                         kill(getppid(), SIGUSR1);               /* send USR1 signal to parent */
320
321                         /* we need to loop over here since we will get USR1 signal while sleeping */
322                         while (stime > 0)
323                         {
324                                 stime = sleep(stime);
325                         }
326                 }
327
328                 /* process status reporting? */
329                 if (IsA(node, VariableShowStmt) && strncasecmp(sq, string, strlen(sq)) == 0)
330                 {
331                         StartupPacket *sp;
332                         char psbuf[1024];
333
334                         pool_debug("process reporting");
335                         process_reporting(frontend, backend);
336                         in_progress = 0;
337
338                         /* show ps status */
339                         sp = MASTER_CONNECTION(backend)->sp;
340                         snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
341                                          sp->user, sp->database, remote_ps_data);
342                         set_ps_display(psbuf, false);
343
344                         free_parser();
345                         return POOL_CONTINUE;
346                 }
347
348                 if (IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
349                         IsA(node, VariableSetStmt) || IsA(node, DiscardStmt))
350                 {
351                         /*
352                          * PREPARE, DEALLOCATE and SET statements must be replicated.
353                          */
354                         if (MASTER_SLAVE && TSTATE(backend) != 'E')
355                                 force_replication = 1;
356
357                         /*
358                          * Before we did followings only when frontend != NULL,
359                          * which was wrong since if, for example, reset_query_list
360                          * contains "DISCARD ALL", then it does not register
361                          * pending function and it causes trying to DEALLOCATE non
362                          * existing prepared statment(2009/4/3 Tatsuo).
363                          */
364                         if (IsA(node, PrepareStmt))
365                         {
366                                 pending_function = add_prepared_list;
367                                 portal = create_portal();
368                                 if (portal == NULL)
369                                 {
370                                         pool_error("SimpleQuery: create_portal() failed");
371                                         return POOL_END;
372                                 }
373
374                                 /* switch memory context */
375                                 old_context = pool_memory;
376                                 pool_memory = portal->prepare_ctxt;
377
378                                 portal->portal_name = NULL;
379                                 portal->stmt = copyObject(node);
380                                 portal->sql_string = NULL;
381                                 pending_prepared_portal = portal;
382                         }
383                         else if (IsA(node, DeallocateStmt))
384                         {
385                                 pending_function = del_prepared_list;
386                                 portal = create_portal();
387                                 if (portal == NULL)
388                                 {
389                                         pool_error("SimpleQuery: create_portal() failed");
390                                         return POOL_END;
391                                 }
392
393                                 /* switch memory context */
394                                 old_context = pool_memory;
395                                 pool_memory = portal->prepare_ctxt;
396
397                                 portal->portal_name = NULL;
398                                 portal->stmt = copyObject(node);
399                                 portal->sql_string = NULL;
400                                 pending_prepared_portal = portal;
401                         }
402                         else if (IsA(node, DiscardStmt))
403                         {
404                                 DiscardStmt *stmt = (DiscardStmt *)node;
405                                 if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS)
406                                 {
407                                         pending_function = delete_all_prepared_list;
408                                         pending_prepared_portal = NULL;
409                                 }
410                         }
411
412                         /* switch old memory context */
413                         if (old_context)
414                                 pool_memory = old_context;
415
416                         /* end of wrong if (see 2009/4/3 comment above) */
417                 }
418
419                 if (frontend && IsA(node, ExecuteStmt))
420                 {
421                         Portal *portal;
422                         PrepareStmt *p_stmt;
423                         ExecuteStmt *e_stmt = (ExecuteStmt *)node;
424
425                         portal = lookup_prepared_statement_by_statement(&prepared_list,
426                                                                                                                         e_stmt->name);
427                         if (!portal)
428                         {
429                                 string1 = string;
430                                 node1 = node;
431                         }
432                         else
433                         {
434                                 p_stmt = (PrepareStmt *)portal->stmt;
435                                 string1 = nodeToString(p_stmt->query);
436                                 node1 = (Node *)p_stmt->query;
437                         }
438                 }
439                 else
440                 {
441                         string1 = string;
442                         node1 = node;
443                 }
444
445                 /* load balance trick */
446                 if (load_balance_enabled(backend, node1, string1))
447                         start_load_balance(backend);
448                 else if (MASTER_SLAVE)
449                 {
450                         pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
451                         master_slave_was_enabled = 1;
452                         MASTER_SLAVE = 0;
453                         master_slave_dml = 1;
454                         if (force_replication)
455                         {
456                                 replication_was_enabled = 0;
457                                 REPLICATION = 1;
458                         }
459                 }
460                 else if (REPLICATION &&
461                                  !pool_config->replicate_select &&
462                                  is_select_query(node1, string1) &&
463                                  !is_sequence_query(node1))
464                 {
465                         selected_slot = MASTER_NODE_ID;
466                         replication_was_enabled = 1;
467                         REPLICATION = 0;
468                         LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
469                         in_load_balance = 1;
470                         select_in_transaction = 1;
471                 }
472
473
474                 /*
475                  * determine if we need to lock the table
476                  * to keep SERIAL data consistency among servers
477                  * conditions:
478                  * - replication is enabled
479                  * - protocol is V3
480                  * - statement is INSERT
481                  * - either "INSERT LOCK" comment exists or insert_lock directive specified
482                  */
483                 if (REPLICATION)
484                 {
485                         /* start a transaction if needed */
486                         if (start_internal_transaction(frontend, backend, (Node *)node) != POOL_CONTINUE)
487                                 return POOL_END;
488
489                         /* check if need lock */
490                         if (need_insert_lock(backend, string, node))
491                         {
492                                 /* if so, issue lock command */
493                                 status = insert_lock(frontend, backend, string, (InsertStmt *)node);
494                                 if (status != POOL_CONTINUE)
495                                 {
496                                         free_parser();
497                                         return status;
498                                 }
499                         }
500                 }
501                 else if (REPLICATION && query == NULL && start_internal_transaction(frontend, backend, node))
502                 {
503                         free_parser();
504                         return POOL_ERROR;
505                 }
506         }
507         else
508         {  /* syntax error */
509                 if (MASTER_SLAVE)
510                 {
511                         pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
512                         master_slave_was_enabled = 1;
513                         MASTER_SLAVE = 0;
514                         master_slave_dml = 1;
515                 }
516         }
517
518         if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
519         {
520                 TSTATE(backend) = 'T';
521         }
522
523         if (REPLICATION || PARALLEL_MODE)
524         {
525                 /* check if query is "COMMIT" or "ROLLBACK" */
526                 commit = is_commit_query(node);
527                 free_parser();
528
529                 /*
530                  * Query is not commit/rollback
531                  */
532                 if (!commit)
533                 {
534                         /* Send the query to master node */
535
536                         if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
537                                 return POOL_END;
538
539                         if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
540                         {
541                                 /* Cancel current transaction */
542                                 CancelPacket cancel_packet;
543
544                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
545                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
546                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
547                                 cancel_request(&cancel_packet);
548
549                                 return POOL_END;
550                         }
551
552                         /* Check specific errors */
553                         specific_error = check_errors(backend, MASTER_NODE_ID);
554                         if (specific_error)
555                         {
556                                 /* log error message */
557                                 generate_error_message("SimpleQuery: ", specific_error, string);
558
559                                 /* Set error query to abort transactions on other nodes */
560                                 string = POOL_ERROR_QUERY;
561                                 len = strlen(string) + 1;
562                         }
563                 }
564
565                 /* send query to other than master nodes */
566                 for (i=0;i<NUM_BACKENDS;i++)
567                 {
568                         if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
569                                 continue;
570
571                         if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
572                                 return POOL_END;
573                 }
574
575                 /* Wait for nodes othan than the master node */
576                 for (i=0;i<NUM_BACKENDS;i++)
577                 {
578                         if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
579                                 continue;
580
581                         if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
582                         {
583                                 /* Cancel current transaction */
584                                 CancelPacket cancel_packet;
585
586                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
587                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
588                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
589                                 cancel_request(&cancel_packet);
590
591                                 return POOL_END;
592                         }
593                 }
594
595                 /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
596                 if (commit)
597                 {
598                         if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
599                                 return POOL_END;
600
601                         if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
602                         {
603                                 /* Cancel current transaction */
604                                 CancelPacket cancel_packet;
605
606                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
607                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
608                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
609                                 cancel_request(&cancel_packet);
610
611                                 return POOL_END;
612                         }
613
614
615                         TSTATE(backend) = 'I';
616                 }
617         }
618         else
619         {
620                 free_parser();
621                 if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
622                         return POOL_END;
623
624                 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
625                 {
626                                 /* Cancel current transaction */
627                                 CancelPacket cancel_packet;
628
629                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
630                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
631                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
632                                 cancel_request(&cancel_packet);
633
634                                 return POOL_END;
635                 }
636         }
637
638         return POOL_CONTINUE;
639 }
640
641 /*
642  * process EXECUTE (V3 only)
643  */
644 POOL_STATUS Execute(POOL_CONNECTION *frontend,
645                                                    POOL_CONNECTION_POOL *backend)
646 {
647         char *string;           /* portal name + null terminate + max_tobe_returned_rows */
648         int len;
649         int i;
650         char kind;
651         int status, commit = 0;
652         Portal *portal;
653         char *string1;
654         PrepareStmt *p_stmt;
655         POOL_STATUS ret;
656         int specific_error = 0;
657
658         /* read Execute packet */
659         if (pool_read(frontend, &len, sizeof(len)) < 0)
660                 return POOL_END;
661
662         len = ntohl(len) - 4;
663         string = pool_read2(frontend, len);
664
665         pool_debug("Execute: portal name <%s>", string);
666
667         portal = lookup_prepared_statement_by_portal(&prepared_list,
668                                                                                                  string);
669
670         /* load balance trick */
671         if (portal)
672         {
673                 Node *node;
674
675                 p_stmt = (PrepareStmt *)portal->stmt;
676
677                 string1 = portal->sql_string;
678                 pool_debug("Execute: query: %s", string1);
679                 node = (Node *)p_stmt->query;
680                 strncpy(query_string_buffer, string1, sizeof(query_string_buffer));
681
682                 if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
683                          IsA(node, VariableSetStmt)) &&
684                         MASTER_SLAVE && TSTATE(backend) != 'E')
685                 {
686                         /*
687                          * PREPARE, DEALLOCATE, SET, DISCARD
688                          * should be executed on all nodes.  So we set
689                          * force_replication.
690                          */
691                         force_replication = 1;
692                 }
693                 /*
694                  * JDBC driver sends "BEGIN" query internally if
695                  * setAutoCommit(false).  But it does not send Sync message
696                  * after "BEGIN" query.  In extended query protocol,
697                  * PostgreSQL returns ReadyForQuery when a client sends Sync
698                  * message.  Problem is, pgpool can't know the transaction
699                  * state without receiving ReadyForQuery. So we remember that
700                  * we need to send Sync message internally afterward, whenever
701                  * we receive BEGIN in extended protocol.
702                  */
703                 else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
704                 {
705                         TransactionStmt *stmt = (TransactionStmt *) node;
706
707                         if (stmt->kind == TRANS_STMT_BEGIN ||
708                                 stmt->kind == TRANS_STMT_START)
709                                 /* Remember we need to send sync later in extended protocol */
710                                 receive_extended_begin = 1;
711                 }
712
713                 if (load_balance_enabled(backend, node, string1))
714                         start_load_balance(backend);
715                 else if (REPLICATION &&
716                                  !pool_config->replicate_select &&
717                                  is_select_query((Node *)p_stmt->query, string1) &&
718                                  !is_sequence_query((Node *)p_stmt->query))
719                 {
720                         selected_slot = MASTER_NODE_ID;
721                         replication_was_enabled = 1;
722                         REPLICATION = 0;
723                         LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
724                         in_load_balance = 1;
725                         select_in_transaction = 1;
726                         execute_select = 1;
727                 }
728 /*
729                 else if (REPLICATION && start_internal_transaction(backend, (Node *)p_stmt->query))
730                 {
731                         return POOL_END;
732                 }
733 */
734                 /* check if query is "COMMIT" or "ROLLBACK" */
735                 commit = is_commit_query((Node *)p_stmt->query);
736         }
737
738         if (MASTER_SLAVE)
739         {
740                 master_slave_was_enabled = 1;
741                 MASTER_SLAVE = 0;
742                 master_slave_dml = 1;
743                 if (force_replication)
744                 {
745                         replication_was_enabled = 0;
746                         REPLICATION = 1;
747                 }
748         }
749
750         if (REPLICATION || PARALLEL_MODE)
751         {
752                 /*
753                  * Query is not commit/rollback
754                  */
755                 if (!commit)
756                 {
757                         /* Send the query to master node */
758
759                         if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
760                                 return POOL_END;
761
762                         if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
763                         {
764                                 /* Cancel current transaction */
765                                 CancelPacket cancel_packet;
766
767                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
768                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
769                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
770                                 cancel_request(&cancel_packet);
771
772                                 return POOL_END;
773                         }
774
775
776                         /* Check specific errors */
777                         specific_error = check_errors(backend, MASTER_NODE_ID);
778                         if (specific_error)
779                         {
780                                 /* log error message */
781                                 generate_error_message("Execute: ", specific_error, string);
782                         }
783                 }
784
785                 /* send query to other nodes */
786                 for (i=0;i<NUM_BACKENDS;i++)
787                 {
788                         if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
789                                 continue;
790
791                         if (specific_error)
792                         {
793                                 char msg[1024] = "pgpoool_error_portal"; /* large enough */
794                                 int len = strlen(msg);
795
796                                 memset(msg + len, 0, sizeof(int));
797                                 if (send_execute_message(backend, i, len + 5, msg))
798                                         return POOL_END;
799                         }
800                         else if (send_execute_message(backend, i, len, string) != POOL_CONTINUE)
801                                 return POOL_END;
802                 }
803
804                 /* Wait for nodes other than the master node */
805                 for (i=0;i<NUM_BACKENDS;i++)
806                 {
807                         if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
808                                 continue;
809
810                         if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
811                         {
812                                 /* Cancel current transaction */
813                                 CancelPacket cancel_packet;
814
815                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
816                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
817                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
818                                 cancel_request(&cancel_packet);
819
820                                 return POOL_END;
821                         }
822                 }
823
824                 /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
825                 if (commit)
826                 {
827                         if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
828                                 return POOL_END;
829
830                         if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
831                         {
832                                 /* Cancel current transaction */
833                                 CancelPacket cancel_packet;
834
835                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
836                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
837                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
838                                 cancel_request(&cancel_packet);
839
840                                 return POOL_END;
841                         }
842                 }
843         }
844         else
845         {
846                 if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
847                         return POOL_END;
848
849                 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
850                 {
851                                 /* Cancel current transaction */
852                                 CancelPacket cancel_packet;
853
854                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
855                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
856                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
857                                 cancel_request(&cancel_packet);
858
859                                 return POOL_END;
860                 }
861         }
862
863         while ((ret = read_kind_from_backend(frontend, backend, &kind)) == POOL_CONTINUE)
864         {
865                 /*
866                  * forward message until receiving CommandComplete,
867                  * ErrorResponse, EmptyQueryResponse or PortalSuspend.
868                  */
869                 if (kind == 'C' || kind == 'E' || kind == 'I' || kind == 's')
870                         break;
871
872                 status = SimpleForwardToFrontend(kind, frontend, backend);
873                 if (status != POOL_CONTINUE)
874                         return status;
875         }
876         if (ret != POOL_CONTINUE)
877                 return ret;
878
879         status = SimpleForwardToFrontend(kind, frontend, backend);
880         if (status != POOL_CONTINUE)
881                 return status;
882
883         return POOL_CONTINUE;
884 }
885
886 /*
887  * process Parse (V3 only)
888  */
889 POOL_STATUS Parse(POOL_CONNECTION *frontend,
890                                                  POOL_CONNECTION_POOL *backend)
891 {
892         char kind;
893         int len;
894         char *string;
895         int i;
896         Portal *portal;
897         POOL_MEMORY_POOL *old_context;
898         PrepareStmt *p_stmt;
899         char *name, *stmt;
900         List *parse_tree_list;
901         Node *node = NULL;
902         int deadlock_detected = 0;
903         int insert_stmt_with_lock = 0;
904         POOL_STATUS status;
905
906         /* read Parse packet */
907         if (pool_read(frontend, &len, sizeof(len)) < 0)
908                 return POOL_END;
909
910         len = ntohl(len) - 4;
911         string = pool_read2(frontend, len);
912
913         pool_debug("Parse: portal name <%s>", string);
914
915         name = string;
916         stmt = string + strlen(string) + 1;
917
918         parse_tree_list = raw_parser(stmt);
919         if (parse_tree_list == NIL)
920         {
921                 free_parser();
922         }
923         else
924         {
925                 /* Save last query string for logging purpose */
926                 snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);
927
928                 node = (Node *) lfirst(list_head(parse_tree_list));
929
930                 insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
931
932                 portal = create_portal();
933                 if (portal == NULL)
934                 {
935                         pool_error("Parse: create_portal() failed");
936                         return POOL_END;
937                 }
938
939                 /* switch memory context */
940                 old_context = pool_memory;
941                 pool_memory = portal->prepare_ctxt;
942
943                 /* translate Parse message to PrepareStmt */
944                 p_stmt = palloc(sizeof(PrepareStmt));
945                 p_stmt->type = T_PrepareStmt;
946                 p_stmt->name = pstrdup(name);
947                 p_stmt->query = copyObject(node);
948                 portal->stmt = (Node *)p_stmt;
949                 portal->portal_name = NULL;
950                 portal->sql_string = pstrdup(stmt);
951
952                 if (*name)
953                 {
954                         pending_function = add_prepared_list;
955                         pending_prepared_portal = portal;
956                 }
957                 else /* unnamed statement */
958                 {
959                         pending_function = add_unnamed_portal;
960                         pfree(p_stmt->name);
961                         p_stmt->name = NULL;
962                         pending_prepared_portal = portal;
963                 }
964
965                 /* switch old memory context */
966                 pool_memory = old_context;
967
968                 if (REPLICATION)
969                 {
970                         char kind;
971
972                         if (TSTATE(backend) != 'T')
973                         {
974                                 /* synchronize transaction state */
975                                 for (i = 0; i < NUM_BACKENDS; i++)
976                                 {
977                                         if (!VALID_BACKEND(i))
978                                                 continue;
979
980                                         /* send sync message */
981                                         send_extended_protocol_message(backend, i, "S", 0, "");
982                                 }
983
984                                 kind = pool_read_kind(backend);
985                                 if (kind != 'Z')
986                                         return POOL_END;
987                                 if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
988                                         return POOL_END;
989                         }
990
991                         if (is_strict_query(node))
992                                 start_internal_transaction(frontend, backend, node);
993
994                         if (insert_stmt_with_lock)
995                         {
996                                 /* start a transaction if needed and lock the table */
997                                 status = insert_lock(frontend, backend, stmt, (InsertStmt *)node);
998                                 if (status != POOL_CONTINUE)
999                                 {
1000                                         return status;
1001                                 }
1002                         }
1003                 }
1004                 free_parser();
1005         }
1006
1007         /* send to master node */
1008         if (send_extended_protocol_message(backend, MASTER_NODE_ID,
1009                                                                            "P", len, string))
1010                 return POOL_END;
1011
1012         if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE)
1013         {
1014                 /*
1015                  * We must synchronize because Parse message acquires table
1016                  * locks.
1017                  */
1018                 pool_debug("Parse: waiting for master completing the query");
1019                 if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
1020                 {
1021                         /* Cancel current transaction */
1022                         CancelPacket cancel_packet;
1023
1024                         cancel_packet.protoVersion = htonl(PROTO_CANCEL);
1025                         cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
1026                         cancel_packet.key= MASTER_CONNECTION(backend)->key;
1027                         cancel_request(&cancel_packet);
1028                         return POOL_END;
1029                 }
1030
1031                 /*
1032                  * We must check deadlock error because a aborted transaction
1033                  * by detecting deadlock isn't same on all nodes.
1034                  * If a transaction is aborted on master node, pgpool send a
1035                  * error query to another nodes.
1036                  */
1037                 deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
1038                 if (deadlock_detected < 0)
1039                         return POOL_END;
1040
1041                 for (i=0;i<NUM_BACKENDS;i++)
1042                 {
1043                         if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1044                         {
1045                                 if (deadlock_detected)
1046                                 {
1047                                         pool_log("Parse: received deadlock error message from master node");
1048
1049                                         if (send_simplequery_message(CONNECTION(backend, i),
1050                                                                                                  strlen(POOL_ERROR_QUERY)+1,
1051                                                                                                  POOL_ERROR_QUERY,
1052                                                                                                  MAJOR(backend)))
1053                                                 return POOL_END;
1054                                 }
1055                                 else if (send_extended_protocol_message(backend, i,
1056                                                                                                                 "P", len, string))
1057                                         return POOL_END;
1058                         }
1059                 }
1060
1061                 /* wait for DB nodes completing query except master node */
1062                 for (i=0;i<NUM_BACKENDS;i++)
1063                 {
1064                         if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1065                                 continue;
1066
1067                         pool_debug("Parse: waiting for %dth backend completing the query", i);
1068                         if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
1069                         {
1070                                 /* Cancel current transaction */
1071                                 CancelPacket cancel_packet;
1072
1073                                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
1074                                 cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
1075                                 cancel_packet.key= MASTER_CONNECTION(backend)->key;
1076                                 cancel_request(&cancel_packet);
1077                                 return POOL_END;
1078                         }
1079                 }
1080         }
1081
1082         for (;;)
1083         {
1084                 POOL_STATUS ret;
1085                 ret = read_kind_from_backend(frontend, backend, &kind);
1086
1087                 if (ret != POOL_CONTINUE)
1088                         return ret;
1089
1090                 SimpleForwardToFrontend(kind, frontend, backend);
1091                 if (pool_flush(frontend) < 0)
1092                         return POOL_ERROR;
1093
1094                 /* Ignore warning messages */
1095                 if (kind != 'N')
1096                         break;
1097         }
1098         return POOL_CONTINUE;
1099 }
1100
1101 /*
1102  * Process ReadyForQuery('Z') message.
1103  *
1104  * - if the global error status "mismatch_ntuples" is set, send an error query
1105  *       to all DB nodes to abort transaction.
1106  * - internal transaction is closed
1107  */
1108 POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
1109                                                                  POOL_CONNECTION_POOL *backend, int send_ready)
1110 {
1111         StartupPacket *sp;
1112         char psbuf[1024];
1113         int i;
1114         int len;
1115         signed char state;
1116
1117         /*
1118          * If the numbers of update tuples are differ, we need to abort transaction
1119          * by using do_error_command. This only works with PROTO_MAJOR_V3.
1120          */
1121         if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
1122         {
1123                 int i;
1124                 signed char state;
1125                 char kind;
1126
1127                 /*
1128                  * XXX: discard rest of ReadyForQuery packet
1129                  */
1130                 if (pool_read_message_length(backend) < 0)
1131                         return POOL_END;
1132
1133                 state = pool_read_kind(backend);
1134                 if (state < 0)
1135                         return POOL_END;
1136
1137                 pool_debug("ReadyForQuery: transaction state: %c", state);
1138
1139                 for (i = 0; i < NUM_BACKENDS; i++)
1140                 {
1141                         if (VALID_BACKEND(i))
1142                         {
1143                                 /* abort transaction on all nodes. */
1144                                 do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
1145                         }
1146                 }
1147
1148                 /* loop through until we get ReadyForQuery */
1149                 for(;;)
1150                 {
1151                         kind = pool_read_kind(backend);
1152                         if (kind < 0)
1153                                 return POOL_END;
1154
1155                         if (kind == 'Z')
1156                                 break;
1157
1158                         /* put the message back to read buffer */
1159                         for (i=0;i<NUM_BACKENDS;i++)
1160                         {
1161                                 if (VALID_BACKEND(i))
1162                                 {
1163                                         pool_unread(CONNECTION(backend,i), &kind, 1);
1164                                 }
1165                         }
1166
1167                         /* discard rest of the packet */
1168                         if (pool_discard_packet(backend) != POOL_CONTINUE)
1169                         {
1170                                 pool_error("ReadyForQuery: pool_discard_packet failed");
1171                                 return POOL_END;
1172                         }
1173                 }
1174                 mismatch_ntuples = 0;
1175         }
1176
1177         /*
1178          * if a transaction is started for insert lock, we need to close
1179          * the transaction.
1180          */
1181         if (internal_transaction_started && allow_close_transaction)
1182         {
1183                 int len;
1184                 signed char state;
1185
1186                 if (MAJOR(backend) == PROTO_MAJOR_V3)
1187                 {
1188                         if ((len = pool_read_message_length(backend)) < 0)
1189                                 return POOL_END;
1190
1191                         pool_debug("ReadyForQuery: message length: %d", len);
1192
1193                         len = htonl(len);
1194
1195                         state = pool_read_kind(backend);
1196                         if (state < 0)
1197                                 return POOL_END;
1198
1199                         /* set transaction state */
1200                         pool_debug("ReadyForQuery: transaction state: %c", state);
1201                 }
1202
1203                 if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
1204                         return POOL_ERROR;
1205         }
1206
1207         if (MAJOR(backend) == PROTO_MAJOR_V3)
1208         {
1209                 if ((len = pool_read_message_length(backend)) < 0)
1210                         return POOL_END;
1211
1212                 pool_debug("ReadyForQuery: message length: %d", len);
1213
1214                 /*
1215                  * Do not check transaction state in master/slave mode.
1216                  * Because SET, PREPARE, DEALLOCATE are replicated.
1217                  * If these queries are executed inside a transaction block,
1218                  * transation state will be inconsistent. But it is no problem.
1219                  */
1220                 if (master_slave_dml)
1221                 {
1222                         char kind, kind1;
1223
1224                         if (pool_read(MASTER(backend), &kind, sizeof(kind)))
1225                                 return POOL_END;
1226
1227                         for (i = 0; i < NUM_BACKENDS; i++)
1228                         {
1229                                 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1230                                         continue;
1231
1232                                 if (pool_read(CONNECTION(backend, i), &kind1, sizeof(kind)))
1233                                         return POOL_END;
1234                         }
1235                         state = kind;
1236                 }
1237                 else
1238                 {
1239                         state = pool_read_kind(backend);
1240                         if (state < 0)
1241                                 return POOL_END;
1242                 }
1243
1244                 /* set transaction state */
1245                 pool_debug("ReadyForQuery: transaction state: %c", state);
1246
1247                 for (i=0;i<NUM_BACKENDS;i++)
1248                 {
1249                         if (!VALID_BACKEND(i))
1250                                 continue;
1251
1252                         CONNECTION(backend, i)->tstate = state;
1253                 }
1254         }
1255
1256         if (send_ready)
1257         {
1258                 pool_write(frontend, "Z", 1);
1259
1260                 if (MAJOR(backend) == PROTO_MAJOR_V3)
1261                 {
1262                         len = htonl(len);
1263                         pool_write(frontend, &len, sizeof(len));
1264                         pool_write(frontend, &state, 1);
1265                 }
1266
1267                 if (pool_flush(frontend))
1268                         return POOL_END;
1269         }
1270
1271         in_progress = 0;
1272
1273         /* end load balance mode */
1274         if (in_load_balance)
1275                 end_load_balance(backend);
1276
1277         if (master_slave_dml)
1278         {
1279                 MASTER_SLAVE = 1;
1280                 master_slave_was_enabled = 0;
1281                 master_slave_dml = 0;
1282                 if (force_replication)
1283                 {
1284                         force_replication = 0;
1285                         REPLICATION = 0;
1286                         replication_was_enabled = 0;
1287                 }
1288         }
1289
1290 #ifdef NOT_USED
1291         return ProcessFrontendResponse(frontend, backend);
1292 #endif
1293
1294         sp = MASTER_CONNECTION(backend)->sp;
1295         if (MASTER(backend)->tstate == 'T')
1296                 snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
1297                                  sp->user, sp->database, remote_ps_data);
1298         else
1299                 snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
1300                                  sp->user, sp->database, remote_ps_data);
1301         set_ps_display(psbuf, false);
1302
1303         return POOL_CONTINUE;
1304 }
1305
1306
1307 POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
1308                                                                 POOL_CONNECTION_POOL *backend)
1309 {
1310         char dummy[2];
1311         int oid;
1312         int argn;
1313         int i;
1314
1315         for (i=0;i<NUM_BACKENDS;i++)
1316         {
1317                 if (VALID_BACKEND(i))
1318                 {
1319                         pool_write(CONNECTION(backend, i), "F", 1);
1320                 }
1321         }
1322
1323         /* dummy */
1324         if (pool_read(frontend, dummy, sizeof(dummy)) < 0)
1325                 return POOL_ERROR;
1326
1327         for (i=0;i<NUM_BACKENDS;i++)
1328         {
1329                 if (VALID_BACKEND(i))
1330                 {
1331                         pool_write(CONNECTION(backend, i), dummy, sizeof(dummy));
1332                 }
1333         }
1334
1335         /* function object id */
1336         if (pool_read(frontend, &oid, sizeof(oid)) < 0)
1337                 return POOL_ERROR;
1338
1339         for (i=0;i<NUM_BACKENDS;i++)
1340         {
1341                 if (VALID_BACKEND(i))
1342                 {
1343                         pool_write(CONNECTION(backend, i), &oid, sizeof(oid));
1344                 }
1345         }
1346
1347         /* number of arguments */
1348         if (pool_read(frontend, &argn, sizeof(argn)) < 0)
1349                 return POOL_ERROR;
1350
1351         for (i=0;i<NUM_BACKENDS;i++)
1352         {
1353                 if (VALID_BACKEND(i))
1354                 {
1355                         pool_write(CONNECTION(backend, i), &argn, sizeof(argn));
1356                 }
1357         }
1358
1359         argn = ntohl(argn);
1360
1361         for (i=0;i<argn;i++)
1362         {
1363                 int len;
1364                 char *arg;
1365
1366                 /* length of each argument in bytes */
1367                 if (pool_read(frontend, &len, sizeof(len)) < 0)
1368                         return POOL_ERROR;
1369
1370                 for (i=0;i<NUM_BACKENDS;i++)
1371                 {
1372                         if (VALID_BACKEND(i))
1373                         {
1374                                 pool_write(CONNECTION(backend, i), &len, sizeof(len));
1375                         }
1376                 }
1377
1378                 len = ntohl(len);
1379
1380                 /* argument value itself */
1381                 if ((arg = pool_read2(frontend, len)) == NULL)
1382                         return POOL_ERROR;
1383
1384                 for (i=0;i<NUM_BACKENDS;i++)
1385                 {
1386                         if (VALID_BACKEND(i))
1387                         {
1388                                 pool_write(CONNECTION(backend, i), arg, len);
1389                         }
1390                 }
1391         }
1392
1393         for (i=0;i<NUM_BACKENDS;i++)
1394         {
1395                 if (VALID_BACKEND(i))
1396                 {
1397                         if (pool_flush(CONNECTION(backend, i)))
1398                                 return POOL_ERROR;
1399                 }
1400         }
1401         return POOL_CONTINUE;
1402 }
1403
1404 POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend,
1405                                                                                   POOL_CONNECTION_POOL *backend)
1406 {
1407         char dummy;
1408         int len;
1409         char *result = 0;
1410         int i;
1411
1412         pool_write(frontend, "V", 1);
1413
1414         for (i=0;i<NUM_BACKENDS;i++)
1415         {
1416                 if (VALID_BACKEND(i))
1417                 {
1418                         if (pool_read(CONNECTION(backend, i), &dummy, 1) < 0)
1419                                 return POOL_ERROR;
1420                 }
1421         }
1422         pool_write(frontend, &dummy, 1);
1423
1424         /* non empty result? */
1425         if (dummy == 'G')
1426         {
1427                 for (i=0;i<NUM_BACKENDS;i++)
1428                 {
1429                         if (VALID_BACKEND(i))
1430                         {
1431                                 /* length of result in bytes */
1432                                 if (pool_read(CONNECTION(backend, i), &len, sizeof(len)) < 0)
1433                                         return POOL_ERROR;
1434                         }
1435                 }
1436                 pool_write(frontend, &len, sizeof(len));
1437
1438                 len = ntohl(len);
1439
1440                 for (i=0;i<NUM_BACKENDS;i++)
1441                 {
1442                         if (VALID_BACKEND(i))
1443                         {
1444                                 /* result value itself */
1445                                 if ((result = pool_read2(MASTER(backend), len)) == NULL)
1446                                         return POOL_ERROR;
1447                         }
1448                 }
1449                 pool_write(frontend, result, len);
1450         }
1451
1452         for (i=0;i<NUM_BACKENDS;i++)
1453         {
1454                 if (VALID_BACKEND(i))
1455                 {
1456                         /* unused ('0') */
1457                         if (pool_read(MASTER(backend), &dummy, 1) < 0)
1458                                 return POOL_ERROR;
1459                 }
1460         }
1461         pool_write(frontend, "0", 1);
1462
1463         return pool_flush(frontend);
1464 }
1465
1466 POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
1467                                                                                    POOL_CONNECTION_POOL *backend)
1468 {
1469         char fkind;
1470         char kind;
1471         POOL_STATUS status;
1472         int i;
1473
1474         if (frontend->len <= 0 && frontend->no_forward != 0)
1475                 return POOL_CONTINUE;
1476
1477         if (pool_read(frontend, &fkind, 1) < 0)
1478         {
1479                 pool_log("ProcessFrontendResponse: failed to read kind from frontend. frontend abnormally exited");
1480                 return POOL_END;
1481         }
1482
1483         pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
1484
1485         /*
1486          * If we have received BEGIN in extended protocol before, we need
1487          * to send a sync message to know the transaction stare.
1488          */
1489         if (receive_extended_begin)
1490         {
1491                 receive_extended_begin = 0;
1492
1493                 /* send sync message */
1494                 send_extended_protocol_message(backend, MASTER_NODE_ID, "S", 0, "");
1495
1496                 kind = pool_read_kind(backend);
1497                 if (kind != 'Z')
1498                         return POOL_END;
1499                 if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
1500                         return POOL_END;
1501         }
1502
1503         switch (fkind)
1504         {
1505
1506                 case 'X':  /* Terminate message*/
1507                         if (MAJOR(backend) == PROTO_MAJOR_V3)
1508                         {
1509                                 int len;
1510                                 pool_read(frontend, &len, sizeof(len));
1511                         }
1512                         return POOL_END;
1513
1514                 case 'Q':  /* Query message*/
1515                         in_progress = 1;
1516                         allow_close_transaction = 1;
1517                         status = SimpleQuery(frontend, backend, NULL);
1518                         break;
1519
1520                 case 'E':  /* Execute message */
1521                         allow_close_transaction = 1;
1522                         status = Execute(frontend, backend);
1523                         break;
1524
1525                 case 'P':  /* Parse message */
1526                         allow_close_transaction = 0;
1527                         status = Parse(frontend, backend);
1528                         break;
1529
1530                 case 'S':  /* Sync message */
1531                         receive_extended_begin = 0;
1532                         /* fall through */
1533
1534                 default:
1535                         if (MAJOR(backend) == PROTO_MAJOR_V3)
1536                         {
1537                                 if (MASTER_SLAVE &&
1538                                         (TSTATE(backend) != 'I' || receive_extended_begin))
1539                                 {
1540                                         pool_debug("kind: %c master_slave_dml enabled", fkind);
1541                                         master_slave_was_enabled = 1;
1542                                         MASTER_SLAVE = 0;
1543                                         master_slave_dml = 1;
1544                                 }
1545         
1546                                 status = SimpleForwardToBackend(fkind, frontend, backend);
1547                                 for (i=0;i<NUM_BACKENDS;i++)
1548                                 {
1549                                         if (VALID_BACKEND(i))
1550                                         {
1551                                                 if (pool_flush(CONNECTION(backend, i)))
1552                                                         status = POOL_ERROR;
1553                                         }
1554                                 }
1555                         }
1556                         else if (MAJOR(backend) == PROTO_MAJOR_V2 && fkind == 'F')
1557                                 status = FunctionCall(frontend, backend);
1558                         else
1559                         {
1560                                 pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
1561                                 status = POOL_ERROR;
1562                         }
1563                         break;
1564         }
1565
1566         if (status != POOL_CONTINUE)
1567                 status = POOL_ERROR;
1568         return status;
1569 }
1570
1571 POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend,
1572                                                                                    POOL_CONNECTION_POOL *backend)
1573 {
1574         int i;
1575         char *string = NULL;
1576         char *string1 = NULL;
1577         int len, len1 = 0;
1578
1579         /* read command tag */
1580         string = pool_read_string(MASTER(backend), &len, 0);
1581         if (string == NULL)
1582                 return POOL_END;
1583         len1 = len;
1584         string1 = strdup(string);
1585
1586         for (i=0;i<NUM_BACKENDS;i++)
1587         {
1588                 if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1589                         continue;
1590
1591                 /* read command tag */
1592                 string = pool_read_string(CONNECTION(backend, i), &len, 0);
1593                 if (string == NULL)
1594                         return POOL_END;
1595
1596                 if (len != len1)
1597                 {
1598                         pool_debug("Complete Command Response: message length does not match between master(%d \"%s\",) and %d th server (%d \"%s\",)",
1599                                            len, string, i, len1, string1);
1600
1601                         free(string1);
1602                         return POOL_END;
1603                 }
1604         }
1605         /* forward to the frontend */
1606         pool_write(frontend, "C", 1);
1607         pool_debug("Complete Command Response: string: \"%s\"", string1);
1608         if (pool_write(frontend, string1, len1) < 0)
1609         {
1610                 free(string1);
1611                 return POOL_END;
1612         }
1613
1614         free(string1);
1615         return pool_flush(frontend);
1616 }
1617
1618 int RowDescription(POOL_CONNECTION *frontend,
1619                                                   POOL_CONNECTION_POOL *backend,
1620                                                   short *result)
1621 {
1622         short num_fields, num_fields1 = 0;
1623         int oid, mod;
1624         int oid1, mod1;
1625         short size, size1;
1626         char *string;
1627         int len, len1;
1628         int i;
1629
1630         pool_read(MASTER(backend), &num_fields, sizeof(short));
1631         num_fields1 = num_fields;
1632         for (i=0;i<NUM_BACKENDS;i++)
1633         {
1634                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1635                 {
1636                         /* # of fields (could be 0) */
1637                         pool_read(CONNECTION(backend, i), &num_fields, sizeof(short));
1638                         if (num_fields != num_fields1)
1639                         {
1640                                 pool_error("RowDescription: num_fields does not match between backends master(%d) and %d th backend(%d)",
1641                                                    num_fields, i, num_fields1);
1642                                 return POOL_FATAL;
1643                         }
1644                 }
1645         }
1646
1647         /* forward it to the frontend */
1648         pool_write(frontend, "T", 1);
1649         pool_write(frontend, &num_fields, sizeof(short));
1650         num_fields = ntohs(num_fields);
1651         for (i = 0;i<num_fields;i++)
1652         {
1653                 int j;
1654
1655                 /* field name */
1656                 string = pool_read_string(MASTER(backend), &len, 0);
1657                 if (string == NULL)
1658                         return POOL_END;
1659                 len1 = len;
1660                 if (pool_write(frontend, string, len) < 0)
1661                         return POOL_END;
1662
1663                 for (j=0;j<NUM_BACKENDS;j++)
1664                 {
1665                         if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1666                         {
1667                                 string = pool_read_string(CONNECTION(backend, j), &len, 0);
1668                                 if (string == NULL)
1669                                         return POOL_END;
1670
1671                                 if (len != len1)
1672                                 {
1673                                         pool_error("RowDescription: field length does not match between backends master(%d) and %d th backend(%d)",
1674                                                            ntohl(len), j, ntohl(len1));
1675                                         return POOL_FATAL;
1676                                 }
1677                         }
1678                 }
1679
1680                 /* type oid */
1681                 pool_read(MASTER(backend), &oid, sizeof(int));
1682                 oid1 = oid;
1683                 pool_debug("RowDescription: type oid: %d", ntohl(oid));
1684                 for (j=0;j<NUM_BACKENDS;j++)
1685                 {
1686                         if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1687                         {
1688                                 pool_read(CONNECTION(backend, j), &oid, sizeof(int));
1689
1690                                 /* we do not regard oid mismatch as fatal */
1691                                 if (oid != oid1)
1692                                 {
1693                                         pool_debug("RowDescription: field oid does not match between backends master(%d) and %d th backend(%d)",
1694                                                            ntohl(oid), j, ntohl(oid1));
1695                                 }
1696                         }
1697                 }
1698                 if (pool_write(frontend, &oid1, sizeof(int)) < 0)
1699                         return POOL_END;
1700
1701                 /* size */
1702                 pool_read(MASTER(backend), &size, sizeof(short));
1703                 size1 = size;
1704                 for (j=0;j<NUM_BACKENDS;j++)
1705                 {
1706                         if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1707                         {
1708                                 pool_read(CONNECTION(backend, j), &size, sizeof(short));
1709                                 if (size1 != size1)
1710                                 {
1711                                         pool_error("RowDescription: field size does not match between backends master(%d) and %d th backend(%d)",
1712                                                            ntohs(size), j, ntohs(size1));
1713                                         return POOL_FATAL;
1714                                 }
1715                         }
1716                 }
1717                 pool_debug("RowDescription: field size: %d", ntohs(size));
1718                 pool_write(frontend, &size1, sizeof(short));
1719
1720                 /* modifier */
1721                 pool_read(MASTER(backend), &mod, sizeof(int));
1722                 pool_debug("RowDescription: modifier: %d", ntohs(mod));
1723                 mod1 = mod;
1724                 for (j=0;j<NUM_BACKENDS;j++)
1725                 {
1726                         if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1727                         {
1728                                 pool_read(CONNECTION(backend, j), &mod, sizeof(int));
1729                                 if (mod != mod1)
1730                                 {
1731                                         pool_debug("RowDescription: modifier does not match between backends master(%d) and %d th backend(%d)",
1732                                                            ntohl(mod), j, ntohl(mod1));
1733                                 }
1734                         }
1735                 }
1736                 if (pool_write(frontend, &mod1, sizeof(int)) < 0)
1737                         return POOL_END;
1738         }
1739
1740         *result = num_fields;
1741
1742         return pool_flush(frontend);
1743 }
1744
1745 POOL_STATUS AsciiRow(POOL_CONNECTION *frontend,
1746                                                         POOL_CONNECTION_POOL *backend,
1747                                                         short num_fields)
1748 {
1749         static char nullmap[8192], nullmap1[8192];
1750         int nbytes;
1751         int i, j;
1752         unsigned char mask;
1753         int size, size1 = 0;
1754         char *buf = NULL, *sendbuf = NULL;
1755         char msgbuf[1024];
1756
1757         pool_write(frontend, "D", 1);
1758
1759         nbytes = (num_fields + 7)/8;
1760
1761         if (nbytes <= 0)
1762                 return POOL_CONTINUE;
1763
1764         /* NULL map */
1765         pool_read(MASTER(backend), nullmap, nbytes);
1766         memcpy(nullmap1, nullmap, nbytes);
1767         for (i=0;i<NUM_BACKENDS;i++)
1768         {
1769                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1770                 {
1771                         pool_read(CONNECTION(backend, i), nullmap, nbytes);
1772                         if (memcmp(nullmap, nullmap1, nbytes))
1773                         {
1774                                 /* XXX: NULLMAP maybe different among
1775                                    backends. If we were a paranoid, we have to treat
1776                                    this as a fatal error. However in the real world
1777                                    we'd better to adapt this situation. Just throw a
1778                                    log... */
1779                                 pool_debug("AsciiRow: NULLMAP differ between master and %d th backend", i);
1780                         }
1781                 }
1782         }
1783
1784         if (pool_write(frontend, nullmap1, nbytes) < 0)
1785                 return POOL_END;
1786
1787         mask = 0;
1788
1789         for (i = 0;i<num_fields;i++)
1790         {
1791                 if (mask == 0)
1792                         mask = 0x80;
1793
1794                 /* NOT NULL? */
1795                 if (mask & nullmap[i/8])
1796                 {
1797                         /* field size */
1798                         if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
1799                                 return POOL_END;
1800
1801                         size1 = ntohl(size) - 4;
1802
1803                         /* read and send actual data only when size > 0 */
1804                         if (size1 > 0)
1805                         {
1806                                 sendbuf = pool_read2(MASTER(backend), size1);
1807                                 if (sendbuf == NULL)
1808                                         return POOL_END;
1809                         }
1810
1811                         /* forward to frontend */
1812                         pool_write(frontend, &size, sizeof(int));
1813                         pool_write(frontend, sendbuf, size1);
1814                         snprintf(msgbuf, Min(sizeof(msgbuf), size1+1), "%s", sendbuf);
1815                         pool_debug("AsciiRow: len: %d data: %s", size1, msgbuf);
1816
1817                         for (j=0;j<NUM_BACKENDS;j++)
1818                         {
1819                                 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1820                                 {
1821                                         /* field size */
1822                                         if (pool_read(CONNECTION(backend, j), &size, sizeof(int)) < 0)
1823                                                 return POOL_END;
1824
1825                                         buf = NULL;
1826                                         size = ntohl(size) - 4;
1827
1828                                         /* XXX: field size maybe different among
1829                                            backends. If we were a paranoid, we have to treat
1830                                            this as a fatal error. However in the real world
1831                                            we'd better to adapt this situation. Just throw a
1832                                            log... */
1833                                         if (size != size1)
1834                                                 pool_debug("AsciiRow: %d th field size does not match between master(%d) and %d th backend(%d)",
1835                                                                    i, ntohl(size), j, ntohl(size1));
1836
1837                                         /* read and send actual data only when size > 0 */
1838                                         if (size > 0)
1839                                         {
1840                                                 buf = pool_read2(CONNECTION(backend, j), size);
1841                                                 if (buf == NULL)
1842                                                         return POOL_END;
1843                                         }
1844                                 }
1845                         }
1846                 }
1847
1848                 mask >>= 1;
1849         }
1850
1851         if (pool_flush(frontend))
1852                 return POOL_END;
1853
1854         return POOL_CONTINUE;
1855 }
1856
1857 POOL_STATUS BinaryRow(POOL_CONNECTION *frontend,
1858                                                          POOL_CONNECTION_POOL *backend,
1859                                                          short num_fields)
1860 {
1861         static char nullmap[8192], nullmap1[8192];
1862         int nbytes;
1863         int i, j;
1864         unsigned char mask;
1865         int size, size1 = 0;
1866         char *buf = NULL;
1867
1868         pool_write(frontend, "B", 1);
1869
1870         nbytes = (num_fields + 7)/8;
1871
1872         if (nbytes <= 0)
1873                 return POOL_CONTINUE;
1874
1875         /* NULL map */
1876         pool_read(MASTER(backend), nullmap, nbytes);
1877         if (pool_write(frontend, nullmap, nbytes) < 0)
1878                 return POOL_END;
1879         memcpy(nullmap1, nullmap, nbytes);
1880         for (i=0;i<NUM_BACKENDS;i++)
1881         {
1882                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1883                 {
1884                         pool_read(CONNECTION(backend, i), nullmap, nbytes);
1885                         if (memcmp(nullmap, nullmap1, nbytes))
1886                         {
1887                                 /* XXX: NULLMAP maybe different among
1888                                    backends. If we were a paranoid, we have to treat
1889                                    this as a fatal error. However in the real world
1890                                    we'd better to adapt this situation. Just throw a
1891                                    log... */
1892                                 pool_debug("BinaryRow: NULLMAP differ between master and %d th backend", i);
1893                         }
1894                 }
1895         }
1896
1897         mask = 0;
1898
1899         for (i = 0;i<num_fields;i++)
1900         {
1901                 if (mask == 0)
1902                         mask = 0x80;
1903
1904                 /* NOT NULL? */
1905                 if (mask & nullmap[i/8])
1906                 {
1907                         /* field size */
1908                         if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
1909                                 return POOL_END;
1910                         for (j=0;j<NUM_BACKENDS;j++)
1911                         {
1912                                 if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1913                                 {
1914                                         /* field size */
1915                                         if (pool_read(CONNECTION(backend, i), &size, sizeof(int)) < 0)
1916                                                 return POOL_END;
1917
1918                                         /* XXX: field size maybe different among
1919                                            backends. If we were a paranoid, we have to treat
1920                                            this as a fatal error. However in the real world
1921                                            we'd better to adapt this situation. Just throw a
1922                                            log... */
1923                                         if (size != size1)
1924                                                 pool_debug("BinaryRow: %d th field size does not match between master(%d) and %d th backend(%d)",
1925                                                                    i, ntohl(size), j, ntohl(size1));
1926                                 }
1927
1928                                 buf = NULL;
1929
1930                                 /* forward to frontend */
1931                                 if (IS_MASTER_NODE_ID(j))
1932                                         pool_write(frontend, &size, sizeof(int));
1933                                 size = ntohl(size) - 4;
1934
1935                                 /* read and send actual data only when size > 0 */
1936                                 if (size > 0)
1937                                 {
1938                                         buf = pool_read2(CONNECTION(backend, j), size);
1939                                         if (buf == NULL)
1940                                                 return POOL_END;
1941
1942                                         if (IS_MASTER_NODE_ID(j))
1943                                         {
1944                                                 pool_write(frontend, buf, size);
1945                                         }
1946                                 }
1947                         }
1948
1949                         mask >>= 1;
1950                 }
1951         }
1952
1953         if (pool_flush(frontend))
1954                 return POOL_END;
1955
1956         return POOL_CONTINUE;
1957 }
1958
1959 POOL_STATUS CursorResponse(POOL_CONNECTION *frontend,
1960                                                                   POOL_CONNECTION_POOL *backend)
1961 {
1962         char *string = NULL;
1963         char *string1 = NULL;
1964         int len, len1 = 0;
1965         int i;
1966
1967         /* read cursor name */
1968         string = pool_read_string(MASTER(backend), &len, 0);
1969         if (string == NULL)
1970                 return POOL_END;
1971         len1 = len;
1972         string1 = strdup(string);
1973
1974         for (i=0;i<NUM_BACKENDS;i++)
1975         {
1976                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1977                 {
1978                         /* read cursor name */
1979                         string = pool_read_string(CONNECTION(backend, i), &len, 0);
1980                         if (string == NULL)
1981                                 return POOL_END;
1982                         if (len != len1)
1983                         {
1984                                 pool_error("CursorResponse: length does not match between master(%d) and %d th backend(%d)",
1985                                                    len, i, len1);
1986                                 pool_error("CursorResponse: master(%s) %d th backend(%s)", string1, i, string);
1987                                 free(string1);
1988                                 return POOL_END;
1989                         }
1990                 }
1991         }
1992
1993         /* forward to the frontend */
1994         pool_write(frontend, "P", 1);
1995         if (pool_write(frontend, string1, len1) < 0)
1996         {
1997                 free(string1);
1998                 return POOL_END;
1999         }
2000         free(string1);
2001
2002         if (pool_flush(frontend))
2003                 return POOL_END;
2004
2005         return POOL_CONTINUE;
2006 }
2007
2008 POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
2009                                                   POOL_CONNECTION_POOL *backend)
2010 {
2011         char *string = NULL;
2012         int len;
2013         int i;
2014
2015         for (i=0;i<NUM_BACKENDS;i++)
2016         {
2017                 if (VALID_BACKEND(i))
2018                 {
2019                         /* read error message */
2020                         string = pool_read_string(CONNECTION(backend, i), &len, 0);
2021                         if (string == NULL)
2022                                 return POOL_END;
2023                 }
2024         }
2025
2026         /* forward to the frontend */
2027         pool_write(frontend, "E", 1);
2028         if (pool_write_and_flush(frontend, string, len) < 0)
2029                 return POOL_END;
2030
2031         /* change transaction state */
2032         if (TSTATE(backend) == 'T')
2033                 TSTATE(backend) = 'E';
2034         else
2035                 TSTATE(backend) = 'I';
2036
2037         return POOL_CONTINUE;
2038 }
2039
2040 POOL_STATUS NoticeResponse(POOL_CONNECTION *frontend,
2041                                                                   POOL_CONNECTION_POOL *backend)
2042 {
2043         char *string = NULL;
2044         int len;
2045         int i;
2046
2047         for (i=0;i<NUM_BACKENDS;i++)
2048         {
2049                 if (VALID_BACKEND(i))
2050                 {
2051                         /* read notice message */
2052                         string = pool_read_string(CONNECTION(backend, i), &len, 0);
2053                         if (string == NULL)
2054                                 return POOL_END;
2055                 }
2056         }
2057
2058         /* forward to the frontend */
2059         pool_write(frontend, "N", 1);
2060         if (pool_write_and_flush(frontend, string, len) < 0)
2061         {
2062                 return POOL_END;
2063         }
2064         return POOL_CONTINUE;
2065 }
2066
2067 POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend,
2068                                                                   POOL_CONNECTION_POOL *backend)
2069 {
2070         POOL_STATUS status;
2071
2072         /* forward to the frontend */
2073         if (MAJOR(backend) == PROTO_MAJOR_V3)
2074         {
2075                 if (SimpleForwardToFrontend('G', frontend, backend) != POOL_CONTINUE)
2076                         return POOL_END;
2077                 if (pool_flush(frontend) != POOL_CONTINUE)
2078                         return POOL_END;
2079         }
2080         else
2081                 if (pool_write_and_flush(frontend, "G", 1) < 0)
2082                         return POOL_END;
2083
2084         status = CopyDataRows(frontend, backend, 1);
2085         return status;
2086 }
2087
2088 POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend,
2089                                                                    POOL_CONNECTION_POOL *backend)
2090 {
2091         POOL_STATUS status;
2092
2093         /* forward to the frontend */
2094         if (MAJOR(backend) == PROTO_MAJOR_V3)
2095         {
2096                 if (SimpleForwardToFrontend('H', frontend, backend) != POOL_CONTINUE)
2097                         return POOL_END;
2098                 if (pool_flush(frontend) != POOL_CONTINUE)
2099                         return POOL_END;
2100         }
2101         else
2102                 if (pool_write_and_flush(frontend, "H", 1) < 0)
2103                         return POOL_END;
2104
2105         status = CopyDataRows(frontend, backend, 0);
2106         return status;
2107 }
2108
2109 POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend,
2110                                                                 POOL_CONNECTION_POOL *backend, int copyin)
2111 {
2112         char *string = NULL;
2113         int len;
2114         int i;
2115         DistDefInfo *info = NULL;
2116
2117 #ifdef DEBUG
2118         int j = 0;
2119         char buf[1024];
2120 #endif
2121
2122         if (copyin && pool_config->parallel_mode == TRUE)
2123         {
2124                 info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
2125                                                                           copy_schema,
2126                                                                           copy_table);
2127         }
2128
2129         for (;;)
2130         {
2131                 if (copyin)
2132                 {
2133                         if (MAJOR(backend) == PROTO_MAJOR_V3)
2134                         {
2135                                 char kind;
2136                                 int sendlen;
2137                                 char *p, *p1;
2138
2139                                 if (pool_read(frontend, &kind, 1) < 0)
2140                                         return POOL_END;
2141
2142                                 if (info && kind == 'd')
2143                                 {
2144                                         int id;
2145                                         if (pool_read(frontend, &sendlen, sizeof(sendlen)))
2146                                         {
2147                                                 return POOL_END;
2148                                         }
2149
2150                                         len = ntohl(sendlen) - 4;
2151
2152                                         if (len <= 0)
2153                                                 return POOL_CONTINUE;
2154
2155                                         p = pool_read2(frontend, len);
2156                                         if (p == NULL)
2157                                                 return POOL_END;
2158
2159                                         /* copy end ? */
2160                                         if (len == 3 && memcmp(p, "\\.\n", 3) == 0)
2161                                         {
2162                                                 for (i=0;i<NUM_BACKENDS;i++)
2163                                                 {
2164                                                         if (VALID_BACKEND(i))
2165                                                         {
2166                                                                 if (pool_write(CONNECTION(backend, i), &kind, 1))
2167                                                                         return POOL_END;
2168                                                                 if (pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen)))
2169                                                                         return POOL_END;
2170                                                                 if (pool_write(CONNECTION(backend, i), p, len))
2171                                                                         return POOL_END;
2172                                                         }
2173                                                 }
2174                                         }
2175                                         else
2176                                         {
2177                                                 p1 = parse_copy_data(p, len, copy_delimiter, info->dist_key_col_id);
2178
2179                                                 if (!p1)
2180                                                 {
2181                                                         pool_error("CopyDataRow: cannot parse data");
2182                                                         return POOL_END;
2183                                                 }
2184                                                 else if (strcmp(p1, copy_null) == 0)
2185                                                 {
2186                                                         pool_error("CopyDataRow: key parameter is NULL");
2187                                                         free(p1);
2188                                                         return POOL_END;
2189                                                 }
2190
2191                                                 id = pool_get_id(info, p1);
2192                                                 pool_debug("CopyDataRow: copying id: %d", id);
2193                                                 free(p1);
2194                                                 if (!VALID_BACKEND(id))
2195                                                 {
2196                                                         exit(1);
2197                                                 }
2198                                                 if (pool_write(CONNECTION(backend, id), &kind, 1))
2199                                                 {
2200                                                         return POOL_END;
2201                                                 }
2202                                                 if (pool_write(CONNECTION(backend, id), &sendlen, sizeof(sendlen)))
2203                                                 {
2204                                                         return POOL_END;
2205                                                 }
2206                                                 if (pool_write_and_flush(CONNECTION(backend, id), p, len))
2207                                                 {
2208                                                         return POOL_END;
2209                                                 }
2210                                         }
2211                                 }
2212                                 else
2213                                 {
2214                                         SimpleForwardToBackend(kind, frontend, backend);
2215                                 }
2216
2217                                 /* CopyData? */
2218                                 if (kind == 'd')
2219                                         continue;
2220                                 else
2221                                 {
2222                                         pool_debug("CopyDataRows: copyin kind other than d (%c)", kind);
2223                                         break;
2224                                 }
2225                         }
2226                         else
2227                                 string = pool_read_string(frontend, &len, 1);
2228                 }
2229                 else
2230                 {
2231                         /* CopyOut */
2232                         if (MAJOR(backend) == PROTO_MAJOR_V3)
2233                         {
2234                                 signed char kind;
2235
2236                                 if ((kind = pool_read_kind(backend)) < 0)
2237                                         return POOL_END;
2238
2239                                 SimpleForwardToFrontend(kind, frontend, backend);
2240
2241                                 /* CopyData? */
2242                                 if (kind == 'd')
2243                                         continue;
2244                                 else
2245                                         break;
2246                         }
2247                         else
2248                         {
2249                                 for (i=0;i<NUM_BACKENDS;i++)
2250                                 {
2251                                         if (VALID_BACKEND(i))
2252                                         {
2253                                                 string = pool_read_string(CONNECTION(backend, i), &len, 1);
2254                                         }
2255                                 }
2256                         }
2257                 }
2258
2259                 if (string == NULL)
2260                         return POOL_END;
2261
2262 #ifdef DEBUG
2263                 strncpy(buf, string, len);
2264                 pool_debug("copy line %d %d bytes :%s:", j++, len, buf);
2265 #endif
2266
2267                 if (copyin)
2268                 {
2269                         for (i=0;i<NUM_BACKENDS;i++)
2270                         {
2271                                 if (VALID_BACKEND(i))
2272                                 {
2273                                         pool_write(CONNECTION(backend, i), string, len);
2274                                 }
2275                         }
2276                 }
2277                 else
2278                         pool_write(frontend, string, len);
2279
2280                 if (len == PROTO_MAJOR_V3)
2281                 {
2282                         /* end of copy? */
2283                         if (string[0] == '\\' &&
2284                                 string[1] == '.' &&
2285                                 string[2] == '\n')
2286                         {
2287                                 break;
2288                         }
2289                 }
2290         }
2291
2292         if (copyin)
2293         {
2294                 for (i=0;i<NUM_BACKENDS;i++)
2295                 {
2296                         if (VALID_BACKEND(i))
2297                         {
2298                                 if (pool_flush(CONNECTION(backend, i)) <0)
2299                                         return POOL_END;
2300
2301                                 if (synchronize(CONNECTION(backend, i)))
2302                                         return POOL_END;
2303                         }
2304                 }
2305         }
2306         else
2307                 if (pool_flush(frontend) <0)
2308                         return POOL_END;
2309
2310         return POOL_CONTINUE;
2311 }
2312
2313 POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend,
2314                                                                           POOL_CONNECTION_POOL *backend)
2315 {
2316         char c;
2317         int i;
2318
2319         for (i=0;i<NUM_BACKENDS;i++)
2320         {
2321                 if (VALID_BACKEND(i))
2322                 {
2323                         if (pool_read(CONNECTION(backend, i), &c, sizeof(c)) < 0)
2324                                 return POOL_END;
2325                 }
2326         }
2327
2328         pool_write(frontend, "I", 1);
2329         return pool_write_and_flush(frontend, "", 1);
2330 }
2331
2332 /*
2333  * Check various errors from backend.  return values: 0: no error 1:
2334  * deadlock detected 2: serialization error detected 3: query cancel
2335  * detected: 4
2336  */
2337 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id)
2338 {
2339
2340         /*
2341          * Check dead lock error on the master node and abort
2342          * transactions on all nodes if so.
2343          */
2344         if (detect_deadlock_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2345                 return 1;
2346
2347         /*
2348          * Check serialization failure error and abort
2349          * transactions on all nodes if so. Otherwise we allow
2350          * data inconsistency among DB nodes. See following
2351          * scenario: (M:master, S:slave)
2352          *
2353          * M:S1:BEGIN;
2354          * M:S2:BEGIN;
2355          * S:S1:BEGIN;
2356          * S:S2:BEGIN;
2357          * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2358          * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2359          * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2360          * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2361          * M:S1:UPDATE t1 SET i = i + 1;
2362          * S:S1:UPDATE t1 SET i = i + 1;
2363          * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
2364          * S:S1:COMMIT;
2365          * M:S1:COMMIT;
2366          * M:S2:ERROR:  could not serialize access due to concurrent update
2367          * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE and data becomes inconsistent!
2368          */
2369         if (detect_serialization_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2370                 return 2;
2371
2372         /*
2373          * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
2374          * This happens in following scenario:
2375          *
2376          * M:S1:BEGIN;
2377          * S:S1:BEGIN;
2378          * M:S1:SELECT 1; <-- only sent to MASTER
2379          * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2380          * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
2381          * M: <-- error
2382          * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
2383          */
2384         if (detect_active_sql_transaction_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2385                 return 3;
2386
2387         /* check query cancel error */
2388         if (detect_query_cancel_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
2389                 return 4;
2390
2391         return 0;
2392 }
2393
/*
 * Log, via pool_error(), a message describing a specific error that
 * was received from a backend.
 *
 * prefix:         text emitted in front of the message
 * specific_error: 1-based index selecting the message:
 *                   1: deadlock
 *                   2: serialization failure
 *                   3: SET TRANSACTION ISOLATION LEVEL must be called
 *                      before any query
 *                   4: query cancel
 *                 any other value is reported as invalid and nothing
 *                 else is logged
 * query:          query text appended after "query: "
 *
 * The pieces are passed to pool_error() as arguments of a fixed format
 * string instead of being concatenated into a format string at
 * runtime, so a '%' inside prefix is printed literally and no
 * temporary buffer has to be allocated.
 */
static void generate_error_message(char *prefix, int specific_error, char *query)
{
        static const char *const error_messages[] = {
                "received deadlock error message from master node. query: ",
                "received serialization failure error message from master node. query: ",
                "received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: ",
                "received query cancel error message from master node. query: "
        };
        /* keep the range check in signed arithmetic */
        const int num_messages = (int) (sizeof(error_messages) / sizeof(error_messages[0]));

        if (specific_error < 1 || specific_error > num_messages)
        {
                pool_error("generate_error_message: invalid specific_error: %d", specific_error);
                return;
        }

        pool_error("%s%s%s", prefix, error_messages[specific_error - 1], query);
}