]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_process_query.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pool_process_query.c
1 /* -*-pgsql-c-*- */
2 /*
3  * $Header: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v 1.141.2.22 2009/10/02 07:53:08 t-ishii Exp $
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2009      PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  * pool_process_query.c: query processing stuff
22  *
23 */
24 #include "config.h"
25 #include <errno.h>
26
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif
30 #ifdef HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #ifdef HAVE_SYS_SELECT_H
34 #include <sys/select.h>
35 #endif
36
37
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <string.h>
41 #include <netinet/in.h>
42 #include <ctype.h>
43
44 #include "pool.h"
45 #include "pool_signal.h"
46 #include "pool_proto_modules.h"
47
/*
 * Fallback descriptor-set size, used only when no system header has
 * already defined FD_SETSIZE.  pool_parallel_exec() sizes its "donemask"
 * bitmap as FD_SETSIZE / BITS words.
 */
48 #ifndef FD_SETSIZE
49 #define FD_SETSIZE 512
50 #endif
51
/* NOTE(review): not referenced in the visible part of this file;
 * presumably an initial capacity for a statement list -- confirm. */
52 #define INIT_STATEMENT_LIST_SIZE 8
53
/*
 * PostgreSQL SQLSTATE codes (25001 active_sql_transaction, 40P01
 * deadlock_detected, 40001 serialization_failure, 57014 query_canceled,
 * 57P01 admin_shutdown, 57P02 crash_shutdown).  Presumably matched
 * against backend error responses via detect_error() /
 * detect_postmaster_down_error(); their uses are not visible in this chunk.
 */
54 #define ACTIVE_SQL_TRANSACTION_ERROR_CODE "25001"               /* SET TRANSACTION ISOLATION LEVEL must be called before any query */
55 #define DEADLOCK_ERROR_CODE "40P01"
56 #define SERIALIZATION_FAIL_ERROR_CODE "40001"
57 #define QUERY_CANCEL_ERROR_CODE "57014"
58 #define ADMIN_SHUTDOWN_ERROR_CODE "57P01"
59 #define CRASH_SHUTDOWN_ERROR_CODE "57P02"
60
/* Forward declarations of file-local helpers. */
61 static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
62 static POOL_STATUS do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, int protoMajor, int pid, int key, int no_ready_for_query);
63 static POOL_STATUS do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major);
64 static char *get_insert_command_table_name(InsertStmt *node);
65 static void reset_prepared_list(PreparedStatementList *p);
66 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n);
67 static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
68 static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend);
69 static void query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len);
70 static int extract_ntuples(char *message);
71 static int detect_error(POOL_CONNECTION *master, char *error_code, int major, char class, bool unread);
72 static int detect_postmaster_down_error(POOL_CONNECTION *master, int major);
73 static void free_select_result(POOL_SELECT_RESULT *result);
74
75 static bool is_internal_transaction_needed(Node *node);
76 static int compare(const void *p1, const void *p2);
77
78 /* timeout sec for pool_check_fd */
79 static int timeoutsec;
80
/* Process-wide state; these are non-static, so other modules may reference them. */
81 int in_load_balance;    /* non 0 if in load balance mode */
82 int selected_slot;              /* selected DB node */
83 int master_slave_dml;   /* non 0 if master/slave mode is specified in config file */
84
85 /*
86  * main module for query processing
 *
 * Per-client main loop.  Waits with select(2) on the frontend socket and on
 * every valid backend socket (or uses already-buffered data), then either
 * handles a frontend message via ProcessFrontendResponse() or dispatches a
 * backend message by its kind byte (separate tables for protocol V3 and V2).
 *
 * frontend:  connection to the client.
 * backend:   pool of backend connections for this client.
 * connection_reuse:  stored in frontend->no_forward at entry.  When non 0,
 *     the frontend socket is not polled; instead the reset queries issued
 *     by reset_backend() are run one at a time (waiting for 'Z' between
 *     them), and the function returns POOL_CONTINUE once reset_backend()
 *     reports that no query remains (st == 2).
 * first_ready_for_query_received:  only overwritten with 0 below; its
 *     incoming value is not read in this function.
 *
 * Returns POOL_END / POOL_ERROR on failure or forced termination
 * (client_idle_limit / client_idle_limit_in_recovery expiry, frontend
 * exception, select(2) error), or any non-POOL_CONTINUE status returned by
 * a message handler.  Otherwise the loop keeps running.
 *
 * Side effects: when got_sighup is set, the configuration (and, depending
 * on settings, the hba file and system DB info) is reloaded here.  An
 * unknown V2 message kind terminates the process with exit(1).
87  */
88 POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
89                                                            POOL_CONNECTION_POOL *backend,
90                                                            int connection_reuse,
91                                                            int first_ready_for_query_received)
92 {
93         char kind;      /* packet kind (backend) */
94         char fkind;     /* packet kind (frontend); reset each iteration but not otherwise read here */
95         short num_fields = 0;
96         fd_set  readmask;
97         fd_set  writemask;
98         fd_set  exceptmask;
99         int fds;
100         POOL_STATUS status;
101         int state;      /* 0: ok to issue commands 1: waiting for "ready for query" response */
102         int qcnt;
103         int i;
104
105         frontend->no_forward = connection_reuse;
106         qcnt = 0;
107         state = 0;
108
109         for (;;)
110         {
111                 kind = 0;
112                 fkind = 0;
113
                /* Reset mode: issue the next reset query when the previous one has finished. */
114                 if (state == 0 && connection_reuse)
115                 {
116                         int st;
117
118                         /* send query for resetting connection such as "ROLLBACK" "RESET ALL"... */
119                         st = reset_backend(backend, qcnt);
120
121                         if (st < 0)             /* error? */
122                         {
123                                 /* probably we don't need this, since caller will
124                                  * close the connection to frontend after returning with POOL_END. But I
125                                  * guess I would like to be a paranoid...
126                                  */
127                                 frontend->no_forward = 0;
128                                 return POOL_END;
129                         }
130
131                         else if (st == 0)       /* no query issued? */
132                         {
133                                 qcnt++;
134                                 continue;
135                         }
136
137                         else if (st == 1)       /* more query remains */
138                         {
139                                 state = 1;
140                                 qcnt++;
141                                 continue;
142                         }
143
144                         else    /* no more query(st == 2) */
145                         {
146                                 TSTATE(backend) = 'I';
147                                 frontend->no_forward = 0;
148                                 return POOL_CONTINUE;
149                         }
150
151                 }
152
153                 /*
154                  * if all backends do not have any pending data in the
155                  * receiving data cache, then issue select(2) to wait for new
156                  * data arrival
157                  */
158                 if (is_cache_empty(frontend, backend))
159                 {
160                         struct timeval timeoutdata;
161                         struct timeval *timeout;
162                         int num_fds, was_error = 0;
163
164                     /*
165                          * frontend idle counters. depends on the following
166                          * select(2) call's time out is 1 second.
167                          */
168                         int idle_count = 0;     /* for other than in recovery */
169                         int idle_count_in_recovery = 0; /* for in recovery */
170
171                 SELECT_RETRY:
172                         FD_ZERO(&readmask);
173                         FD_ZERO(&writemask);
174                         FD_ZERO(&exceptmask);
175
176                         num_fds = 0;
177
178                         /*
179                          * Do not read a message from frontend while backends process a query.
180                          */
181                         if (!connection_reuse && !in_progress)
182                         {
183                                 FD_SET(frontend->fd, &readmask);
184                                 FD_SET(frontend->fd, &exceptmask);
185                                 num_fds = Max(frontend->fd + 1, num_fds);
186                         }
187
188                         /*
189                          * If we are in load balance mode and the selected node is
190                          * down, we need to re-select load_balancing_node.  Note
191                          * that we cannnot use VALID_BACKEND macro here.  If
192                          * in_load_balance == 1, VALID_BACKEND macro may return 0.
193                          */
194                         if (pool_config->load_balance_mode &&
195                                 BACKEND_INFO(backend->info->load_balancing_node).backend_status == CON_DOWN)
196                         {
197                                 /* select load balancing node */
198                                 backend->info->load_balancing_node = select_load_balancing_node();
199                         }
200
201                         for (i=0;i<NUM_BACKENDS;i++)
202                         {
203                                 if (VALID_BACKEND(i))
204                                 {
205                                         num_fds = Max(CONNECTION(backend, i)->fd + 1, num_fds);
206                                         FD_SET(CONNECTION(backend, i)->fd, &readmask);
207                                         FD_SET(CONNECTION(backend, i)->fd, &exceptmask);
208                                 }
209                         }
210
211                         /*
212                          * wait for data arriving from frontend and backend
                         * (a 1 second timeout is used only when an idle limit is
                         * configured, so that the idle counters below can tick)
213                          */
214                         if (pool_config->client_idle_limit > 0 ||
215                                 pool_config->client_idle_limit_in_recovery > 0)
216                         {
217                                 timeoutdata.tv_sec = 1;
218                                 timeoutdata.tv_usec = 0;
219                                 timeout = &timeoutdata;
220                         }
221                         else
222                                 timeout = NULL;
223
224                         fds = select(num_fds, &readmask, &writemask, &exceptmask, timeout);
225
226                         if (fds == -1)
227                         {
                                /*
                                 * NOTE(review): this "continue" restarts the outer
                                 * for (;;), which re-enters this block and
                                 * re-initializes idle_count / idle_count_in_recovery
                                 * to 0, so an interrupted select(2) appears to reset
                                 * the idle timers.
                                 */
228                                 if (errno == EINTR)
229                                         continue;
230
231                                 pool_error("select() failed. reason: %s", strerror(errno));
232                                 return POOL_ERROR;
233                         }
234
235                         /* select timeout */
236                         if (fds == 0)
237                         {
238                                 if (*InRecovery == 0 && pool_config->client_idle_limit > 0)
239                                 {
240                                         idle_count++;
241
242                                         if (idle_count > pool_config->client_idle_limit)
243                                         {
244                                                 pool_log("pool_process_query: child connection forced to terminate due to client_idle_limit(%d) reached", pool_config->client_idle_limit);
245                                                 return POOL_END;
246                                         }
247                                 }
248                                 else if (*InRecovery > 0 && pool_config->client_idle_limit_in_recovery > 0)
249                                 {
250                                         idle_count_in_recovery++;
251
252                                         if (idle_count_in_recovery > pool_config->client_idle_limit_in_recovery)
253                                         {
254                                                 pool_log("pool_process_query: child connection forced to terminate due to client_idle_limit_in_recovery(%d) reached", pool_config->client_idle_limit_in_recovery);
255                                                 return POOL_END;
256                                         }
257                                 }
                                /* retry inside this block so the idle counters keep their values */
258                                 goto SELECT_RETRY;
259                         }
260
                        /*
                         * Scan the valid backends.  On the first readable one, check
                         * for a postmaster-down error (detaching the node if found);
                         * otherwise read the next message kind with
                         * read_kind_from_backend() and stop scanning.
                         */
261                         for (i = 0; i < NUM_BACKENDS; i++)
262                         {
263                                 if (VALID_BACKEND(i))
264                                 {
265                                         /*
266                                          * make sure that connection slot exists
267                                          */
268                                         if (CONNECTION_SLOT(backend, i) == 0)
269                                         {
270                                                 pool_log("FATAL ERROR: VALID_BACKEND returns non 0 but connection slot is empty. backend id:%d RAW_MODE:%d in_load_balance:%d LOAD_BALANCE_STATUS:%d status:%d",
271                                                                  i, RAW_MODE, in_load_balance, LOAD_BALANCE_STATUS(i), BACKEND_INFO(i).backend_status);
272                                                 was_error = 1;
273                                                 break;
274                                         }
275
276                                         if (FD_ISSET(CONNECTION(backend, i)->fd, &readmask))
277                                         {
278                                                 /*
279                                                  * admin shutdown postmaster or postmaster goes down
280                                                  */
281                                                 if (detect_postmaster_down_error(CONNECTION(backend, i), MAJOR(backend)) == SPECIFIED_ERROR)
282                                                 {
283                                                         /* detach backend node. */
284                                                         was_error = 1;
285                                                         if (!VALID_BACKEND(i))
286                                                                 break;
287                                                         notice_backend_error(i);
288                                                         sleep(5);
289                                                         break;
290                                                 }
291                                                 status = read_kind_from_backend(frontend, backend, &kind);
292                                                 if (status != POOL_CONTINUE)
293                                                         return status;
294                                                 break;
295                                         }
296                                 }
297                         }
298
299                         if (was_error)
300                                 continue;
301
                        /*
                         * Frontend exception ends the session; a readable frontend
                         * is handled before anything else.  If neither side
                         * produced a backend message kind, wait again.
                         */
302                         if (!connection_reuse && !in_progress)
303                         {
304                                 if (FD_ISSET(frontend->fd, &exceptmask))
305                                         return POOL_END;
306                                 else if (FD_ISSET(frontend->fd, &readmask))
307                                 {
308                                         status = ProcessFrontendResponse(frontend, backend);
309                                         if (status != POOL_CONTINUE)
310                                                 return status;
311
312                                         continue;
313                                 }
314                                 if (kind == 0)
315                                         continue;
316                         }
317
                        /* an exception condition on the master backend is treated as an error */
318                         if (FD_ISSET(MASTER(backend)->fd, &exceptmask))
319                         {
320                                 return POOL_ERROR;
321                         }
322                 }
323                 else
324                 {
                        /* data already buffered: pending frontend data is handled first unless a query is in progress */
325                         if (frontend->len > 0 && !in_progress)
326                         {
327                                 status = ProcessFrontendResponse(frontend, backend);
328                                 if (status != POOL_CONTINUE)
329                                         return status;
330
331                                 continue;
332                         }
333                 }
334
335                 /* this is the synchronous point */
336                 if (kind == 0)
337                 {
338                         status = read_kind_from_backend(frontend, backend, &kind);
339                         if (status != POOL_CONTINUE)
340                                 return status;
341                 }
342
343                 /* reload config file */
344                 if (got_sighup)
345                 {
346                         pool_get_config(get_config_file_name(), RELOAD_CONFIG);
347                         if (pool_config->enable_pool_hba)
348                                 load_hba(get_hba_file_name());
349                         if (pool_config->parallel_mode)
350                                 pool_memset_system_db_info(system_db_info->info);
351                         got_sighup = 0;
352                 }
353
                /* NOTE(review): this parameter is cleared but never read in this function */
354                 first_ready_for_query_received = 0;
355
356                 /*
357                  * Process backend Response
358                  */
359
360                 /*
361                  * Sanity check
362                  */
363                 if (kind == 0)
364                 {
365                         pool_error("pool_process_query: kind is 0!");
366                         return POOL_ERROR;
367                 }
368
369                 pool_debug("pool_process_query: kind from backend: %c", kind);
370
371                 if (MAJOR(backend) == PROTO_MAJOR_V3)
372                 {
                        /* V3: only a few kinds need special handling; the rest are forwarded as-is and flushed */
373                         switch (kind)
374                         {
375                                 case 'G':
376                                         /* CopyIn response */
377                                         status = CopyInResponse(frontend, backend);
378                                         break;
379                                 case 'S':
380                                         /* Parameter Status */
381                                         status = ParameterStatus(frontend, backend);
382                                         break;
383                                 case 'Z':
384                                         /* Ready for query */
385                                         status = ReadyForQuery(frontend, backend, 1);
386                                         break;
387                                 default:
388                                         status = SimpleForwardToFrontend(kind, frontend, backend);
389                                         if (pool_flush(frontend))
390                                                 return POOL_END;
391                                         break;
392                         }
393                 }
394                 else
395                 {
                        /* V2: every message kind has a dedicated handler; an unknown kind terminates the process */
396                         switch (kind)
397                         {
398                                 case 'A':
399                                         /* Notification  response */
400                                         status = NotificationResponse(frontend, backend);
401                                         break;
402
403                                 case 'B':
404                                         /* BinaryRow */
405                                         status = BinaryRow(frontend, backend, num_fields);
406                                         break;
407
408                                 case 'C':
409                                         /* Complete command response */
410                                         status = CompleteCommandResponse(frontend, backend);
411                                         break;
412
413                                 case 'D':
414                                         /* AsciiRow */
415                                         status = AsciiRow(frontend, backend, num_fields);
416                                         break;
417
418                                 case 'E':
419                                         /* Error Response */
420                                         status = ErrorResponse(frontend, backend);
421                                         break;
422
423                                 case 'G':
424                                         /* CopyIn Response */
425                                         status = CopyInResponse(frontend, backend);
426                                         break;
427
428                                 case 'H':
429                                         /* CopyOut Response */
430                                         status = CopyOutResponse(frontend, backend);
431                                         break;
432
433                                 case 'I':
434                                         /* Empty Query Response */
435                                         status = EmptyQueryResponse(frontend, backend);
436                                         break;
437
438                                 case 'N':
439                                         /* Notice Response */
440                                         status = NoticeResponse(frontend, backend);
441                                         break;
442
443                                 case 'P':
444                                         /* CursorResponse */
445                                         status = CursorResponse(frontend, backend);
446                                         break;
447
448                                 case 'T':
449                                         /* RowDescription */
450                                         status = RowDescription(frontend, backend, &num_fields);
451                                         break;
452
453                                 case 'V':
454                                         /* FunctionResultResponse and FunctionVoidResponse */
455                                         status = FunctionResultResponse(frontend, backend);
456                                         break;
457
458                                 case 'Z':
459                                         /* Ready for query */
460                                         status = ReadyForQuery(frontend, backend, 1);
461                                         break;
462
463                                 default:
464                                         pool_error("Unknown message type %c(%02x)", kind, kind);
465                                         exit(1);
466                         }
467                 }
468
469                 if (status != POOL_CONTINUE)
470                         return status;
471
                /* reset mode: ReadyForQuery ends the current reset query, so the next one can be issued */
472                 if (kind == 'Z' && frontend->no_forward && state == 1)
473                 {
474                         state = 0;
475                 }
476
477         }
        /* not reached: the for (;;) above only exits via return */
478         return POOL_CONTINUE;
479 }
480
481
/*
 * set_fd, isset_fd and zero_fd manage a hand-rolled descriptor bitmap used
 * by pool_parallel_exec() to remember which backend sockets have finished
 * ("donemask").  The bitmap is an array of FD_SETSIZE / BITS unsigned longs
 * holding one bit per descriptor: descriptor fd lives in word fd / BITS at
 * bit position fd % BITS.
 *
 * The word index and bit offset must be computed with BITS (the width of
 * one word), not FD_SETSIZE; otherwise any fd >= BITS would shift
 * 1UL by at least its own width, which is undefined behavior.
 */

/* number of bits in one bitmap word; used only in pool_parallel_exec */
#define BITS (8 * sizeof(long int))

/*
 * Mark descriptor fd as set in the bitmap at setp.
 * Descriptors with no slot in the FD_SETSIZE / BITS word bitmap are
 * ignored rather than written out of bounds.
 */
static void set_fd(unsigned long fd, unsigned long *setp)
{
	if (fd / BITS >= FD_SETSIZE / BITS)
		return;

	setp[fd / BITS] |= 1UL << (fd % BITS);
}

/*
 * Return non 0 if descriptor fd is set in the bitmap at setp, 0 otherwise.
 * Descriptors with no slot in the bitmap are reported as not set.
 */
static int isset_fd(unsigned long fd, unsigned long *setp)
{
	if (fd / BITS >= FD_SETSIZE / BITS)
		return 0;

	return (setp[fd / BITS] & (1UL << (fd % BITS))) != 0;
}

/* Clear every word of the FD_SETSIZE / BITS word bitmap at setp. */
static void zero_fd(unsigned long *setp)
{
	size_t i;

	for (i = 0; i < FD_SETSIZE / BITS; i++)
		setp[i] = 0;
}
517
518 /*
519  * This function transmits to a parallel Query, and does processing
520  * that receives the result to each back end.
521  */
522 POOL_STATUS pool_parallel_exec(POOL_CONNECTION *frontend,
523                                                                           POOL_CONNECTION_POOL *backend, char *string,
524                                                                           Node *node,bool send_to_frontend)
525 {
526         int len;
527         int fds;
528         int i;
529         char kind;
530         fd_set readmask;
531         fd_set writemask;
532         fd_set exceptmask;
533         unsigned long donemask[FD_SETSIZE / BITS];
534         static char *sq = "show pool_status";
535         POOL_STATUS status;
536         struct timeval timeout;
537         int num_fds;
538         int used_count = 0;
539         int error_flag = 0;
540         unsigned long datacount = 0;
541
542         timeout.tv_sec = 1;
543         timeout.tv_usec = 0;
544
545         len = strlen(string) + 1;
546
547         if (is_drop_database(node))
548         {
549                 int stime = 5;  /* XXX give arbitrary time to allow closing idle connections */
550
551                 pool_debug("Query: sending HUP signal to parent");
552
553                 kill(getppid(), SIGHUP);        /* send HUP signal to parent */
554
555                 /* we need to loop over here since we will get HUP signal while sleeping */
556                 while (stime > 0)
557                         stime = sleep(stime);
558         }
559
560         /* process status reporting? */
561         if (strncasecmp(sq, string, strlen(sq)) == 0)
562         {
563                 pool_debug("process reporting");
564                 process_reporting(frontend, backend);
565                 in_progress = 0;
566                 return POOL_CONTINUE;
567         }
568
569         /* In this loop,forward the query to the all backends */
570         for (i=0;i<NUM_BACKENDS;i++)
571         {
572                 if (!VALID_BACKEND(i))
573                         continue;
574
575                 pool_write(CONNECTION(backend, i), "Q", 1);
576
577                 if (MAJOR(backend) == PROTO_MAJOR_V3)
578                 {
579                         int sendlen = htonl(len + 4);
580                         pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen));
581                 }
582
583                 if (pool_write_and_flush(CONNECTION(backend, i), string, len) < 0)
584                 {
585                         return POOL_END;
586                 }
587
588                 /*
589                  * in "strict mode" we need to wait for backend completing the query.
590                  * note that this is not applied if "NO STRICT" is specified as a comment.
591                  */
592                 if (is_strict_query(node))
593                 {
594                         pool_debug("waiting for backend %d completing the query", i);
595                         if (synchronize(CONNECTION(backend, i)))
596                                 return POOL_END;
597                 }
598         }
599
600         if (!is_cache_empty(frontend, backend))
601         {
602                 return POOL_END;
603         }
604
605         zero_fd(donemask);
606
607         /* In this loop, receive data from the all backends and send data to frontend */
608         for (;;)
609         {
610                 FD_ZERO(&readmask);
611                 FD_ZERO(&writemask);
612                 FD_ZERO(&exceptmask);
613                 num_fds = 0;
614
615                 for (i=0;i<NUM_BACKENDS;i++)
616                 {
617                         if (VALID_BACKEND(i))
618                         {
619                                 int fd = CONNECTION(backend,i)->fd;
620                                 num_fds = Max(fd + 1, num_fds);
621                                 if(!isset_fd(fd,donemask))
622                                 {
623                                         FD_SET(fd, &readmask);
624                                         FD_SET(fd, &exceptmask);
625                                         pool_debug("pool_parallel_query:  %d th FD_SET: %d",i, CONNECTION(backend, i)->fd);
626                                 }
627                         }
628                 }
629
630                 pool_debug("pool_parallel_query: num_fds: %d", num_fds);
631
632                 fds = select(num_fds, &readmask, &writemask, &exceptmask, NULL);
633
634                 if (fds == -1)
635                 {
636                         if (errno == EINTR)
637                                 continue;
638
639                                 pool_error("select() failed. reason: %s", strerror(errno));
640                         return POOL_ERROR;
641                  }
642
643                 if (fds == 0)
644                 {
645                         return POOL_CONTINUE;
646                 }
647
648                 /* get header of protocol */
649                 for (i=0;i<NUM_BACKENDS;i++)
650                 {
651                         if (!VALID_BACKEND(i) ||
652                                 !FD_ISSET(CONNECTION(backend, i)->fd, &readmask))
653                         {
654                                 continue;
655                         }
656                         else
657                         {
658                                 status = read_kind_from_one_backend(frontend, backend, &kind,i);
659                                 if (status != POOL_CONTINUE)
660                                         return status;
661
662                                 if (used_count == 0)
663                                 {
664                                         status = ParallelForwardToFrontend(kind,
665                                                                                                                 frontend,
666                                                                                                                 CONNECTION(backend, i),
667                                                                                                                 backend->info->database,
668                                                                                                                 send_to_frontend);
669                                         pool_debug("pool_parallel_exec: kind from backend: %c", kind);
670                                 }
671                                 else
672                                 {
673                                         status = ParallelForwardToFrontend(kind,
674                                                                                                                 frontend,
675                                                                                                                 CONNECTION(backend, i),
676                                                                                                                 backend->info->database,
677                                                                                                                 false);
678                                         pool_debug("pool_parallel_exec: dummy kind from backend: %c", kind);
679                                 }
680
681                                 if (status != POOL_CONTINUE)
682                                         return status;
683
684                                 if(kind == 'C' || kind == 'E' || kind == 'c')
685                                 {
686                                         if(used_count == NUM_BACKENDS -1)
687                                                 return POOL_CONTINUE;
688
689                                         used_count++;
690                                         set_fd(CONNECTION(backend, i)->fd, donemask);
691                                         continue;
692                                 }
693
694                                 /* get body of protocol */
695                                 for(;;)
696                                 {
697                                         if (pool_read(CONNECTION(backend, i), &kind, 1) < 0)
698                                         {
699                                                 pool_error("pool_parallel_exec: failed to read kind from %d th backend", i);
700                                                 return POOL_ERROR;
701                                         }
702
703                                         /*
704                                          * Sanity check
705                                          */
706                                         if (kind == 0)
707                                         {
708                                                 pool_error("pool_parallel_exec: kind is 0!");
709                                                 return POOL_ERROR;
710                                         }
711
712                                         if((kind == 'E' ) &&
713                                                 used_count != NUM_BACKENDS -1)
714                                         {
715                                                 if(error_flag ==0)
716                                                 {
717                                                         pool_debug("pool_parallel_exec: kind from backend: %c", kind);
718
719                                                         status = ParallelForwardToFrontend(kind,
720                                                                                                                         frontend,
721                                                                                                                         CONNECTION(backend, i),
722                                                                                                                         backend->info->database,
723                                                                                                                         send_to_frontend);
724                                                         error_flag++;
725                                                 } else {
726                                                         pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
727                                                         status = ParallelForwardToFrontend(kind,
728                                                                                                                         frontend,
729                                                                                                                         CONNECTION(backend, i),
730                                                                                                                         backend->info->database,
731                                                                                                                         false);
732                                                 }
733                                                 used_count++;
734                                                 set_fd(CONNECTION(backend, i)->fd, donemask);
735                                                 break;
736                                         }
737
738                                         if((kind == 'c' || kind == 'C') &&
739                                            used_count != NUM_BACKENDS -1)
740                                         {
741                                                 pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
742                                                 status = ParallelForwardToFrontend(kind,
743                                                                                                                         frontend,
744                                                                                                                         CONNECTION(backend, i),
745                                                                                                                         backend->info->database,
746                                                                                                                         false);
747                                                 used_count++;
748                                                 set_fd(CONNECTION(backend, i)->fd, donemask);
749                                                 break;
750                                         }
751                                         if((kind == 'C' || kind == 'c' || kind == 'E') &&
752                                                 used_count == NUM_BACKENDS -1)
753                                         {
754                                                 pool_debug("pool_parallel_exec: kind from backend: D %lu", datacount);
755
756                                                 if(error_flag == 0)
757                                                 {
758                                                         pool_debug("pool_parallel_exec: kind from backend: %c", kind);
759                                                         status = ParallelForwardToFrontend(kind,
760                                                                                                                         frontend,
761                                                                                                                         CONNECTION(backend, i),
762                                                                                                                         backend->info->database,
763                                                                                                                         send_to_frontend);
764                                                 } else {
765                                                         pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
766                                                         status = ParallelForwardToFrontend(kind,
767                                                                                                                         frontend,
768                                                                                                                         CONNECTION(backend, i),
769                                                                                                                         backend->info->database,
770                                                                                                                         false);
771                                                 }
772                                                 return POOL_CONTINUE;
773                                         }
774
775                                         if(kind == 'D')
776                                                 datacount++;
777                                         else
778                                                 pool_debug("pool_parallel_exec: kind from backend: %c", kind);
779
780                                         status = ParallelForwardToFrontend(kind,
781                                                                                                                 frontend,
782                                                                                                                 CONNECTION(backend, i),
783                                                                                                                 backend->info->database,
784                                                                                                                 send_to_frontend);
785
786                                         if (status != POOL_CONTINUE)
787                                         {
788                                                 return status;
789                                         }
790                                         else
791                                         {
792                                                 pool_flush(frontend);
793                                         }
794                                 }
795                         }
796                 }
797         }
798 }
799
800
801
802 /*
803  * send SimpleQuery message to a node.
804  */
805 POOL_STATUS send_simplequery_message(POOL_CONNECTION *backend, int len, char *string, int major)
806 {
807         /* forward the query to the backend */
808         pool_write(backend, "Q", 1);
809
810         if (major == PROTO_MAJOR_V3)
811         {
812                 int sendlen = htonl(len + 4);
813                 pool_write(backend, &sendlen, sizeof(sendlen));
814         }
815
816         if (pool_write_and_flush(backend, string, len) < 0)
817         {
818                 return POOL_END;
819         }
820
821         return POOL_CONTINUE;
822 }
823
824 /*
825  * Wait for query response from single node. This checks frontend
826  * connection by writing dummy parameter status packet every 30
827  * seccond, and if the connection broke, returns error since there's
828  * no point in that waiting until backend returns response.
829  */
830 POOL_STATUS wait_for_query_response(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *string, int protoVersion)
831 {
832 #define DUMMY_PARAMETER "pgpool_dummy_param"
833 #define DUMMY_VALUE "pgpool_dummy_value"
834
835         int status;
836         int plen;
837
838         pool_debug("wait_for_query_response: waiting for backend %d completing the query", backend->db_node_id);
839
840         for (;;)
841         {
842                 /* Check to see if data from backend is ready */
843                 pool_set_timeout(30);
844                 status = pool_check_fd(backend);
845                 pool_set_timeout(0);
846
847                 if (status < 0) /* error ? */
848                 {
849                         pool_error("wait_for_query_response: backend error occured while waiting for backend response");
850                         return POOL_END;
851                 }
852                 else if (status > 0)            /* data is not ready */
853                 {
854                         if (protoVersion == PROTO_MAJOR_V3)
855                         {
856                                 /* Write dummy parameter staus packet to check if the socket to frontend is ok */
857                                 if (pool_write(frontend, "S", 1) < 0)
858                                         return POOL_END;
859                                 plen = sizeof(DUMMY_PARAMETER)+sizeof(DUMMY_VALUE)+sizeof(plen);
860                                 plen = htonl(plen);
861                                 if (pool_write(frontend, &plen, sizeof(plen)) < 0)
862                                         return POOL_END;
863                                 if (pool_write(frontend, DUMMY_PARAMETER, sizeof(DUMMY_PARAMETER)) < 0)
864                                         return POOL_END;
865                                 if (pool_write(frontend, DUMMY_VALUE, sizeof(DUMMY_VALUE)) < 0)
866                                         return POOL_END;
867                                 if (pool_flush_it(frontend) < 0)
868                                 {
869                                         pool_error("wait_for_query_response: frontend error occured while waiting for backend reply");
870                                         return POOL_END;
871                                 }
872
873                         } else          /* Protocol version 2 */
874                         {
875 /*
876  * If you want to monitor client connection even if you are using V2 protocol,
877  * define following
878  */
879 #undef SEND_NOTICE_ON_PROTO2
880 #ifdef SEND_NOTICE_ON_PROTO2
881                                 static char *notice_message = {"keep alive checking from pgpool-II"};
882
883                                 /* Write notice message packet to check if the socket to frontend is ok */
884                                 if (pool_write(frontend, "N", 1) < 0)
885                                         return POOL_END;
886                                 if (pool_write(frontend, notice_message, strlen(notice_message)+1) < 0)
887                                         return POOL_END;
888                                 if (pool_flush_it(frontend) < 0)
889                                 {
890                                         pool_error("wait_for_query_response: frontend error occured while waiting for backend reply");
891                                         return POOL_END;
892                                 }
893 #endif
894                         }
895                 }
896                 else
897                         break;
898         }
899
900         return POOL_CONTINUE;
901 }
902
903
904 /*
905  * Extended query protocol has to send Flush message.
906  */
907 POOL_STATUS send_extended_protocol_message(POOL_CONNECTION_POOL *backend,
908                                                                                                   int node_id, char *kind,
909                                                                                                   int len, char *string)
910 {
911         POOL_CONNECTION *cp = CONNECTION(backend, node_id);
912         int sendlen;
913
914         /* forward the query to the backend */
915         pool_write(cp, kind, 1);
916         sendlen = htonl(len + 4);
917         pool_write(cp, &sendlen, sizeof(sendlen));
918         pool_write(cp, string, len);
919
920         /*
921          * send "Flush" message so that backend notices us
922          * the completion of the command
923          */
924         pool_write(cp, "H", 1);
925         sendlen = htonl(4);
926         if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
927         {
928                 return POOL_ERROR;
929         }
930
931         return POOL_CONTINUE;
932 }
933
934 POOL_STATUS send_execute_message(POOL_CONNECTION_POOL *backend,
935                                                                                 int node_id, int len, char *string)
936 {
937         return send_extended_protocol_message(backend, node_id, "E", len, string);
938 }
939
940 /*
941  * wait until read data is ready
942  */
943 int synchronize(POOL_CONNECTION *cp)
944 {
945         return pool_check_fd(cp);
946 }
947
948 /*
949  * set timeout in seconds for pool_check_fd
950  * if timeoutval < 0, we assume no timeout(wait forever).
951  */
952 void pool_set_timeout(int timeoutval)
953 {
954         if (timeoutval > 0)
955                 timeoutsec = timeoutval;
956         else
957                 timeoutsec = 0;
958 }
959
960 /*
961  * Wait until read data is ready.
962  * return values: 0: normal 1: data is not ready -1: error
963  */
964 int pool_check_fd(POOL_CONNECTION *cp)
965 {
966         fd_set readmask;
967         fd_set exceptmask;
968         int fd;
969         int fds;
970         struct timeval timeout;
971         struct timeval *timeoutp;
972
973         fd = cp->fd;
974
975         if (timeoutsec > 0)
976         {
977                 timeout.tv_sec = timeoutsec;
978                 timeout.tv_usec = 0;
979                 timeoutp = &timeout;
980         }
981         else
982                 timeoutp = NULL;
983
984         for (;;)
985         {
986                 FD_ZERO(&readmask);
987                 FD_ZERO(&exceptmask);
988                 FD_SET(fd, &readmask);
989                 FD_SET(fd, &exceptmask);
990
991                 fds = select(fd+1, &readmask, NULL, &exceptmask, timeoutp);
992                 if (fds == -1)
993                 {
994                         if (errno == EAGAIN || errno == EINTR)
995                                 continue;
996
997                         pool_error("pool_check_fd: select() failed. reason %s", strerror(errno));
998                         break;
999                 }
1000                 else if (fds == 0)              /* timeout */
1001                         return 1;
1002
1003                 if (FD_ISSET(fd, &exceptmask))
1004                 {
1005                         pool_error("pool_check_fd: exception occurred");
1006                         break;
1007                 }
1008                 return 0;
1009         }
1010         return -1;
1011 }
1012
1013 /*
1014  * Process "show pool_status" query.
1015  */
1016 void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1017 {
1018         static char *cursorname = "blank";
1019         static short num_fields = 3;
1020         static char *field_names[] = {"item", "value", "description"};
1021         static int oid = 0;
1022         static short fsize = -1;
1023         static int mod = 0;
1024         short n;
1025         int i, j;
1026         short s;
1027         int len;
1028         short colnum;
1029
1030         static unsigned char nullmap[2] = {0xff, 0xff};
1031         int nbytes = (num_fields + 7)/8;
1032
1033 #define POOLCONFIG_MAXNAMELEN 32
1034 #define POOLCONFIG_MAXVALLEN 512
1035 #define POOLCONFIG_MAXDESCLEN 64
1036
1037         typedef struct {
1038                 char name[POOLCONFIG_MAXNAMELEN+1];
1039                 char value[POOLCONFIG_MAXVALLEN+1];
1040                 char desc[POOLCONFIG_MAXDESCLEN+1];
1041         } POOL_REPORT_STATUS;
1042
1043 #define MAXITEMS 128
1044
1045         POOL_REPORT_STATUS status[MAXITEMS];
1046
1047         short nrows;
1048         int size;
1049         int hsize;
1050
1051         i = 0;
1052
1053         strncpy(status[i].name, "listen_addresses", POOLCONFIG_MAXNAMELEN);
1054         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->listen_addresses);
1055         strncpy(status[i].desc, "host name(s) or IP address(es) to listen to", POOLCONFIG_MAXDESCLEN);
1056         i++;
1057
1058         strncpy(status[i].name, "port", POOLCONFIG_MAXNAMELEN);
1059         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->port);
1060         strncpy(status[i].desc, "pgpool accepting port number", POOLCONFIG_MAXDESCLEN);
1061         i++;
1062
1063         strncpy(status[i].name, "socket_dir", POOLCONFIG_MAXNAMELEN);
1064         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->socket_dir);
1065         strncpy(status[i].desc, "pgpool socket directory", POOLCONFIG_MAXDESCLEN);
1066         i++;
1067
1068         strncpy(status[i].name, "num_init_children", POOLCONFIG_MAXNAMELEN);
1069         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_init_children);
1070         strncpy(status[i].desc, "# of children initially pre-forked", POOLCONFIG_MAXDESCLEN);
1071         i++;
1072
1073         strncpy(status[i].name, "child_life_time", POOLCONFIG_MAXNAMELEN);
1074         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_life_time);
1075         strncpy(status[i].desc, "if idle for this seconds, child exits", POOLCONFIG_MAXDESCLEN);
1076         i++;
1077
1078         strncpy(status[i].name, "connection_life_time", POOLCONFIG_MAXNAMELEN);
1079         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_life_time);
1080         strncpy(status[i].desc, "if idle for this seconds, connection closes", POOLCONFIG_MAXDESCLEN);
1081         i++;
1082
1083         strncpy(status[i].name, "client_idle_limit", POOLCONFIG_MAXNAMELEN);
1084         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->client_idle_limit);
1085         strncpy(status[i].desc, "if idle for this seconds, child connection closes", POOLCONFIG_MAXDESCLEN);
1086         i++;
1087
1088         strncpy(status[i].name, "child_max_connections", POOLCONFIG_MAXNAMELEN);
1089         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_max_connections);
1090         strncpy(status[i].desc, "if max_connections received, chile exits", POOLCONFIG_MAXDESCLEN);
1091         i++;
1092
1093         strncpy(status[i].name, "max_pool", POOLCONFIG_MAXNAMELEN);
1094         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->max_pool);
1095         strncpy(status[i].desc, "max # of connection pool per child", POOLCONFIG_MAXDESCLEN);
1096         i++;
1097
1098         strncpy(status[i].name, "authentication_timeout", POOLCONFIG_MAXNAMELEN);
1099         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->authentication_timeout);
1100         strncpy(status[i].desc, "maximum time in seconds to complete client authentication", POOLCONFIG_MAXNAMELEN);
1101         i++;
1102
1103         strncpy(status[i].name, "logdir", POOLCONFIG_MAXNAMELEN);
1104         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->logdir);
1105         strncpy(status[i].desc, "logging directory", POOLCONFIG_MAXDESCLEN);
1106         i++;
1107
1108         strncpy(status[i].name, "pid_file_name", POOLCONFIG_MAXNAMELEN);
1109         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pid_file_name);
1110         strncpy(status[i].desc, "path to pid file", POOLCONFIG_MAXDESCLEN);
1111         i++;
1112
1113         strncpy(status[i].name, "backend_socket_dir", POOLCONFIG_MAXNAMELEN);
1114         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->backend_socket_dir);
1115         strncpy(status[i].desc, "Unix domain socket directory for the PostgreSQL server", POOLCONFIG_MAXDESCLEN);
1116         i++;
1117
1118         strncpy(status[i].name, "replication_mode", POOLCONFIG_MAXNAMELEN);
1119         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_mode);
1120         strncpy(status[i].desc, "non 0 if operating in replication mode", POOLCONFIG_MAXDESCLEN);
1121         i++;
1122
1123         strncpy(status[i].name, "load_balance_mode", POOLCONFIG_MAXNAMELEN);
1124         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->load_balance_mode);
1125         strncpy(status[i].desc, "non 0 if operating in load balancing mode", POOLCONFIG_MAXDESCLEN);
1126         i++;
1127
1128         strncpy(status[i].name, "replication_stop_on_mismatch", POOLCONFIG_MAXNAMELEN);
1129         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_stop_on_mismatch);
1130         strncpy(status[i].desc, "stop replication mode on fatal error", POOLCONFIG_MAXDESCLEN);
1131         i++;
1132
1133         strncpy(status[i].name, "replicate_select", POOLCONFIG_MAXNAMELEN);
1134         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replicate_select);
1135         strncpy(status[i].desc, "non 0 if SELECT statement is replicated", POOLCONFIG_MAXDESCLEN);
1136         i++;
1137
1138         strncpy(status[i].name, "reset_query_list", POOLCONFIG_MAXNAMELEN);
1139         *(status[i].value) = '\0';
1140         for (j=0;j<pool_config->num_reset_queries;j++)
1141         {
1142                 int len;
1143                 len = POOLCONFIG_MAXVALLEN - strlen(status[i].value);
1144                 strncat(status[i].value, pool_config->reset_query_list[j], len);
1145                 len = POOLCONFIG_MAXVALLEN - strlen(status[i].value);
1146                 strncat(status[i].value, ";", len);
1147         }
1148         strncpy(status[i].desc, "queries issued at the end of session", POOLCONFIG_MAXDESCLEN);
1149         i++;
1150
1151         strncpy(status[i].name, "print_timestamp", POOLCONFIG_MAXNAMELEN);
1152         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->print_timestamp);
1153         strncpy(status[i].desc, "if true print time stamp to each log line", POOLCONFIG_MAXDESCLEN);
1154         i++;
1155
1156         strncpy(status[i].name, "master_slave_mode", POOLCONFIG_MAXNAMELEN);
1157         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_mode);
1158         strncpy(status[i].desc, "if true, operate in master/slave mode", POOLCONFIG_MAXDESCLEN);
1159         i++;
1160
1161         strncpy(status[i].name, "connection_cache", POOLCONFIG_MAXNAMELEN);
1162         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_cache);
1163         strncpy(status[i].desc, "if true, cache connection pool", POOLCONFIG_MAXDESCLEN);
1164         i++;
1165
1166         strncpy(status[i].name, "health_check_timeout", POOLCONFIG_MAXNAMELEN);
1167         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_timeout);
1168         strncpy(status[i].desc, "health check timeout", POOLCONFIG_MAXDESCLEN);
1169         i++;
1170
1171         strncpy(status[i].name, "health_check_period", POOLCONFIG_MAXNAMELEN);
1172         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_period);
1173         strncpy(status[i].desc, "health check period", POOLCONFIG_MAXDESCLEN);
1174         i++;
1175
1176         strncpy(status[i].name, "health_check_user", POOLCONFIG_MAXNAMELEN);
1177         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->health_check_user);
1178         strncpy(status[i].desc, "health check user", POOLCONFIG_MAXDESCLEN);
1179         i++;
1180
1181         strncpy(status[i].name, "failover_command", POOLCONFIG_MAXNAMELEN);
1182         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->failover_command);
1183         strncpy(status[i].desc, "failover command", POOLCONFIG_MAXDESCLEN);
1184         i++;
1185
1186         strncpy(status[i].name, "failback_command", POOLCONFIG_MAXNAMELEN);
1187         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->failover_command);
1188         strncpy(status[i].desc, "failback command", POOLCONFIG_MAXDESCLEN);
1189         i++;
1190
1191         strncpy(status[i].name, "insert_lock", POOLCONFIG_MAXNAMELEN);
1192         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->insert_lock);
1193         strncpy(status[i].desc, "insert lock", POOLCONFIG_MAXDESCLEN);
1194         i++;
1195
1196         strncpy(status[i].name, "ignore_leading_white_space", POOLCONFIG_MAXNAMELEN);
1197         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->ignore_leading_white_space);
1198         strncpy(status[i].desc, "ignore leading white spaces", POOLCONFIG_MAXDESCLEN);
1199         i++;
1200
1201         strncpy(status[i].name, "replication_enabled", POOLCONFIG_MAXNAMELEN);
1202         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_enabled);
1203         strncpy(status[i].desc, "non 0 if actually operating in replication mode", POOLCONFIG_MAXDESCLEN);
1204         i++;
1205
1206         strncpy(status[i].name, "master_slave_enabled", POOLCONFIG_MAXNAMELEN);
1207         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_enabled);
1208         strncpy(status[i].desc, "non 0 if actually operating in master/slave", POOLCONFIG_MAXDESCLEN);
1209         i++;
1210
1211         strncpy(status[i].name, "num_reset_queries", POOLCONFIG_MAXNAMELEN);
1212         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_reset_queries);
1213         strncpy(status[i].desc, "number of queries in reset_query_list", POOLCONFIG_MAXDESCLEN);
1214         i++;
1215
1216         strncpy(status[i].name, "pcp_port", POOLCONFIG_MAXNAMELEN);
1217         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_port);
1218         strncpy(status[i].desc, "PCP port # to bind", POOLCONFIG_MAXDESCLEN);
1219         i++;
1220
1221         strncpy(status[i].name, "pcp_socket_dir", POOLCONFIG_MAXNAMELEN);
1222         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pcp_socket_dir);
1223         strncpy(status[i].desc, "PCP socket directory", POOLCONFIG_MAXDESCLEN);
1224         i++;
1225
1226         strncpy(status[i].name, "pcp_timeout", POOLCONFIG_MAXNAMELEN);
1227         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_timeout);
1228         strncpy(status[i].desc, "PCP timeout for an idle client", POOLCONFIG_MAXDESCLEN);
1229         i++;
1230
1231         strncpy(status[i].name, "log_statement", POOLCONFIG_MAXNAMELEN);
1232         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_statement);
1233         strncpy(status[i].desc, "if non 0, logs all SQL statements", POOLCONFIG_MAXDESCLEN);
1234         i++;
1235
1236         strncpy(status[i].name, "log_connections", POOLCONFIG_MAXNAMELEN);
1237         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_connections);
1238         strncpy(status[i].desc, "if true, print incoming connections to the log", POOLCONFIG_MAXDESCLEN);
1239         i++;
1240
1241         strncpy(status[i].name, "log_hostname", POOLCONFIG_MAXNAMELEN);
1242         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_hostname);
1243         strncpy(status[i].desc, "if true, resolve hostname for ps and log print", POOLCONFIG_MAXDESCLEN);
1244         i++;
1245
1246         strncpy(status[i].name, "enable_pool_hba", POOLCONFIG_MAXNAMELEN);
1247         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_pool_hba);
1248         strncpy(status[i].desc, "if true, use pool_hba.conf for client authentication", POOLCONFIG_MAXDESCLEN);
1249         i++;
1250
1251         strncpy(status[i].name, "recovery_user", POOLCONFIG_MAXNAMELEN);
1252         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_user);
1253         strncpy(status[i].desc, "online recovery user", POOLCONFIG_MAXDESCLEN);
1254         i++;
1255
1256         strncpy(status[i].name, "recovery_password", POOLCONFIG_MAXNAMELEN);
1257         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_password);
1258         strncpy(status[i].desc, "online recovery password", POOLCONFIG_MAXDESCLEN);
1259         i++;
1260
1261         strncpy(status[i].name, "recovery_1st_stage_command", POOLCONFIG_MAXNAMELEN);
1262         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_1st_stage_command);
1263         strncpy(status[i].desc, "execute a command in first stage.", POOLCONFIG_MAXDESCLEN);
1264         i++;
1265
1266         strncpy(status[i].name, "recovery_2nd_stage_command", POOLCONFIG_MAXNAMELEN);
1267         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_2nd_stage_command);
1268         strncpy(status[i].desc, "execute a command in second stage.", POOLCONFIG_MAXDESCLEN);
1269         i++;
1270
1271         strncpy(status[i].name, "recovery_timeout", POOLCONFIG_MAXNAMELEN);
1272         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->recovery_timeout);
1273         strncpy(status[i].desc, "max time in seconds to wait for the recovering node's postmaster", POOLCONFIG_MAXDESCLEN);
1274         i++;
1275
1276         strncpy(status[i].name, "client_idle_limit_in_recovery", POOLCONFIG_MAXNAMELEN);
1277         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->client_idle_limit_in_recovery);
1278         strncpy(status[i].desc, "if idle for this seconds, child connection closes in recovery 2nd statge", POOLCONFIG_MAXDESCLEN);
1279         i++;
1280
1281         strncpy(status[i].name, "parallel_mode", POOLCONFIG_MAXNAMELEN);
1282         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->parallel_mode);
1283         strncpy(status[i].desc, "if non 0, run in parallel query mode", POOLCONFIG_MAXDESCLEN);
1284         i++;
1285
1286         strncpy(status[i].name, "enable_query_cache", POOLCONFIG_MAXNAMELEN);
1287         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_query_cache);
1288         strncpy(status[i].desc, "if non 0, use query cache", POOLCONFIG_MAXDESCLEN);
1289         i++;
1290
1291         strncpy(status[i].name, "pgpool2_hostname", POOLCONFIG_MAXNAMELEN);
1292         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pgpool2_hostname);
1293         strncpy(status[i].desc, "pgpool2 hostname", POOLCONFIG_MAXDESCLEN);
1294         i++;
1295
1296         strncpy(status[i].name, "system_db_hostname", POOLCONFIG_MAXNAMELEN);
1297         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_hostname);
1298         strncpy(status[i].desc, "system DB hostname", POOLCONFIG_MAXDESCLEN);
1299         i++;
1300
1301         strncpy(status[i].name, "system_db_port", POOLCONFIG_MAXNAMELEN);
1302         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->system_db_port);
1303         strncpy(status[i].desc, "system DB port number", POOLCONFIG_MAXDESCLEN);
1304         i++;
1305
1306         strncpy(status[i].name, "system_db_dbname", POOLCONFIG_MAXNAMELEN);
1307         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_dbname);
1308         strncpy(status[i].desc, "system DB name", POOLCONFIG_MAXDESCLEN);
1309         i++;
1310
1311         strncpy(status[i].name, "system_db_schema", POOLCONFIG_MAXNAMELEN);
1312         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_schema);
1313         strncpy(status[i].desc, "system DB schema name", POOLCONFIG_MAXDESCLEN);
1314         i++;
1315
1316         strncpy(status[i].name, "system_db_user", POOLCONFIG_MAXNAMELEN);
1317         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_user);
1318         strncpy(status[i].desc, "user name to access system DB", POOLCONFIG_MAXDESCLEN);
1319         i++;
1320
1321         strncpy(status[i].name, "system_db_password", POOLCONFIG_MAXNAMELEN);
1322         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_password);
1323         strncpy(status[i].desc, "password to access system DB", POOLCONFIG_MAXDESCLEN);
1324         i++;
1325
1326         for (j = 0; j < NUM_BACKENDS; j++)
1327         {
1328                 if (BACKEND_INFO(j).backend_port == 0)
1329                         continue;
1330
1331                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_hostname%d", j);
1332                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", BACKEND_INFO(j).backend_hostname);
1333                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d hostname", j);
1334                 i++;
1335
1336                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_port%d", j);
1337                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_port);
1338                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d port number", j);
1339                 i++;
1340
1341                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_weight%d", j);
1342                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%f", BACKEND_INFO(j).backend_weight);
1343                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "weight of backend #%d", j);
1344                 i++;
1345
1346                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend status%d", j);
1347                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_status);
1348                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "status of backend #%d", j);
1349                 i++;
1350         }
1351
1352         nrows = i;
1353
1354         if (MAJOR(backend) == PROTO_MAJOR_V2)
1355         {
1356                 /* cursor response */
1357                 pool_write(frontend, "P", 1);
1358                 pool_write(frontend, cursorname, strlen(cursorname)+1);
1359         }
1360
1361         /* row description */
1362         pool_write(frontend, "T", 1);
1363
1364         if (MAJOR(backend) == PROTO_MAJOR_V3)
1365         {
1366                 len = sizeof(num_fields) + sizeof(len);
1367
1368                 for (i=0;i<num_fields;i++)
1369                 {
1370                         char *f = field_names[i];
1371                         len += strlen(f)+1;
1372                         len += sizeof(oid);
1373                         len += sizeof(colnum);
1374                         len += sizeof(oid);
1375                         len += sizeof(s);
1376                         len += sizeof(mod);
1377                         len += sizeof(s);
1378                 }
1379
1380                 len = htonl(len);
1381                 pool_write(frontend, &len, sizeof(len));
1382         }
1383
1384         n = htons(num_fields);
1385         pool_write(frontend, &n, sizeof(short));
1386
1387         for (i=0;i<num_fields;i++)
1388         {
1389                 char *f = field_names[i];
1390
1391                 pool_write(frontend, f, strlen(f)+1);           /* field name */
1392
1393                 if (MAJOR(backend) == PROTO_MAJOR_V3)
1394                 {
1395                         pool_write(frontend, &oid, sizeof(oid));        /* table oid */
1396                         colnum = htons(i);
1397                         pool_write(frontend, &colnum, sizeof(colnum));  /* column number */
1398                 }
1399
1400                 pool_write(frontend, &oid, sizeof(oid));                /* data type oid */
1401                 s = htons(fsize);
1402                 pool_write(frontend, &s, sizeof(fsize));                /* field size */
1403                 pool_write(frontend, &mod, sizeof(mod));                /* modifier */
1404
1405                 if (MAJOR(backend) == PROTO_MAJOR_V3)
1406                 {
1407                         s = htons(0);
1408                         pool_write(frontend, &s, sizeof(fsize));        /* field format (text) */
1409                 }
1410         }
1411         pool_flush(frontend);
1412
1413         if (MAJOR(backend) == PROTO_MAJOR_V2)
1414         {
1415                 /* ascii row */
1416                 for (i=0;i<nrows;i++)
1417                 {
1418                         pool_write(frontend, "D", 1);
1419                         pool_write_and_flush(frontend, nullmap, nbytes);
1420
1421                         size = strlen(status[i].name);
1422                         hsize = htonl(size+4);
1423                         pool_write(frontend, &hsize, sizeof(hsize));
1424                         pool_write(frontend, status[i].name, size);
1425
1426                         size = strlen(status[i].value);
1427                         hsize = htonl(size+4);
1428                         pool_write(frontend, &hsize, sizeof(hsize));
1429                         pool_write(frontend, status[i].value, size);
1430
1431                         size = strlen(status[i].desc);
1432                         hsize = htonl(size+4);
1433                         pool_write(frontend, &hsize, sizeof(hsize));
1434                         pool_write(frontend, status[i].desc, size);
1435                 }
1436         }
1437         else
1438         {
1439                 /* data row */
1440                 for (i=0;i<nrows;i++)
1441                 {
1442                         pool_write(frontend, "D", 1);
1443                         len = sizeof(len) + sizeof(nrows);
1444                         len += sizeof(int) + strlen(status[i].name);
1445                         len += sizeof(int) + strlen(status[i].value);
1446                         len += sizeof(int) + strlen(status[i].desc);
1447                         len = htonl(len);
1448                         pool_write(frontend, &len, sizeof(len));
1449                         s = htons(3);
1450                         pool_write(frontend, &s, sizeof(s));
1451
1452                         len = htonl(strlen(status[i].name));
1453                         pool_write(frontend, &len, sizeof(len));
1454                         pool_write(frontend, status[i].name, strlen(status[i].name));
1455
1456                         len = htonl(strlen(status[i].value));
1457                         pool_write(frontend, &len, sizeof(len));
1458                         pool_write(frontend, status[i].value, strlen(status[i].value));
1459
1460                         len = htonl(strlen(status[i].desc));
1461                         pool_write(frontend, &len, sizeof(len));
1462                         pool_write(frontend, status[i].desc, strlen(status[i].desc));
1463                 }
1464         }
1465
1466         /* complete command response */
1467         pool_write(frontend, "C", 1);
1468         if (MAJOR(backend) == PROTO_MAJOR_V3)
1469         {
1470                 len = htonl(sizeof(len) + strlen("SELECT")+1);
1471                 pool_write(frontend, &len, sizeof(len));
1472         }
1473         pool_write(frontend, "SELECT", strlen("SELECT")+1);
1474
1475         /* ready for query */
1476         pool_write(frontend, "Z", 1);
1477         if (MAJOR(backend) == PROTO_MAJOR_V3)
1478         {
1479                 len = htonl(sizeof(len) + 1);
1480                 pool_write(frontend, &len, sizeof(len));
1481                 pool_write(frontend, "I", 1);
1482         }
1483
1484         pool_flush(frontend);
1485 }
1486
1487 /*
1488  * send "terminate"(X) message to all backends, indicating that
1489  * backend should prepare to close connection to frontend (actually
1490  * pgpool). Note that caller must be protecedt from a signal
1491  * interruption while calling this function. Otherwise the number of
1492  * valid backends might be changed by failover/failback.
1493  */
1494 void pool_send_frontend_exits(POOL_CONNECTION_POOL *backend)
1495 {
1496         int len;
1497         int i;
1498
1499         for (i=0;i<NUM_BACKENDS;i++)
1500         {
1501                 /*
1502                  * send a terminate message to backend if there's an existing
1503                  * connection
1504                  */
1505                 if (VALID_BACKEND(i) && CONNECTION_SLOT(backend, i))
1506                 {
1507                         pool_write(CONNECTION(backend, i), "X", 1);
1508
1509                         if (MAJOR(backend) == PROTO_MAJOR_V3)
1510                         {
1511                                 len = htonl(4);
1512                                 pool_write(CONNECTION(backend, i), &len, sizeof(len));
1513                         }
1514
1515                         /*
1516                          * XXX we cannot call pool_flush() here since backend may already
1517                          * close the socket and pool_flush() automatically invokes fail
1518                          * over handler. This could happen in copy command (remember the
1519                          * famous "lost synchronization with server, resetting
1520                          * connection" message)
1521                          */
1522                         pool_set_nonblock(CONNECTION(backend, i)->fd);
1523                         pool_flush_it(CONNECTION(backend, i));
1524                         pool_unset_nonblock(CONNECTION(backend, i)->fd);
1525                 }
1526         }
1527 }
1528
1529 /*
1530  * -------------------------------------------------------
1531  * V3 functions
1532  * -------------------------------------------------------
1533  */
1534
1535 /*
1536  * This function transmits to a parallel Query to each backend,
1537  * and receives the results from backends .
1538  *
1539  */
1540 static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend)
1541 {
1542         int len;
1543         char *p;
1544         int status;
1545
1546         if (send_to_frontend)
1547         {
1548                 pool_write(frontend, &kind, 1);
1549         }
1550
1551         status = pool_read(backend, &len, sizeof(len));
1552         if (status < 0)
1553         {
1554                 pool_error("ParallelForwardToFrontend: error while reading message length");
1555                 return POOL_END;
1556         }
1557
1558         if (send_to_frontend)
1559         {
1560                 pool_write(frontend, &len, sizeof(len));
1561         }
1562
1563         len = ntohl(len) - 4 ;
1564
1565         if (len <= 0)
1566                 return POOL_CONTINUE;
1567
1568         p = pool_read2(backend, len);
1569         if (p == NULL)
1570                 return POOL_END;
1571
1572         status = POOL_CONTINUE;
1573         if (send_to_frontend)
1574         {
1575                 status = pool_write(frontend, p, len);
1576                 if (pool_config->enable_query_cache && SYSDB_STATUS == CON_UP && status == 0)
1577                 {
1578                         query_cache_register(kind, frontend, database, p, len);
1579                 }
1580         }
1581
1582         return status;
1583 }
1584
1585 POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1586 {
1587         int len, len1 = 0;
1588         char *p = NULL;
1589         char *p1 = NULL;
1590         char *p2 = NULL;
1591         int status;
1592         int sendlen;
1593         int i;
1594         int command_ok_row_count = 0;
1595         int delete_or_update = 0;
1596         char kind1;
1597         POOL_STATUS ret;
1598
1599         /*
1600          * Check if packet kind == 'C'(Command complete), '1'(Parse
1601          * complete), '3'(Close complete). If so, then register or
1602          * unregister pending prepared statement.
1603          */
1604         if ((kind == 'C' || kind == '1' || kind == '3') &&
1605                 pending_function)
1606         {
1607                 pending_function(&prepared_list, pending_prepared_portal);
1608                 if (pending_prepared_portal &&
1609                         pending_prepared_portal->stmt &&
1610                         IsA(pending_prepared_portal->stmt, DeallocateStmt))
1611                 {
1612                         free(pending_prepared_portal->portal_name);
1613                         pending_prepared_portal->portal_name = NULL;
1614                         pool_memory_delete(pending_prepared_portal->prepare_ctxt, 0);
1615                         free(pending_prepared_portal);
1616                 }
1617         }
1618         else if (kind == 'E' && pending_function)
1619         {
1620                 /* An error occurred with PREPARE or DEALLOCATE command.
1621                  * Free pending portal object.
1622                  */
1623                 if (pending_prepared_portal)
1624                 {
1625                         free(pending_prepared_portal->portal_name);
1626                         pending_prepared_portal->portal_name = NULL;
1627                         pool_memory_delete(pending_prepared_portal->prepare_ctxt, 0);
1628                         free(pending_prepared_portal);
1629                 }
1630         }
1631         else if (kind == 'C' && select_in_transaction)
1632         {
1633                 select_in_transaction = 0;
1634                 execute_select = 0;
1635         }
1636
1637         /*
1638          * Remove a pending function if a received message is not
1639          * NoticeResponse.
1640          */
1641         if (kind != 'N')
1642         {
1643                 pending_function = NULL;
1644                 pending_prepared_portal = NULL;
1645         }
1646
1647         status = pool_read(MASTER(backend), &len, sizeof(len));
1648         len = ntohl(len);
1649         len -= 4;
1650         len1 = len;
1651
1652         p = pool_read2(MASTER(backend), len);
1653         if (p == NULL)
1654                 return POOL_END;
1655         p1 = malloc(len);
1656         if (p1 == NULL)
1657         {
1658                 pool_error("SimpleForwardToFrontend: malloc failed");
1659                 return POOL_ERROR;
1660         }
1661         memcpy(p1, p, len);
1662
1663         if (kind == 'C')        /* packet kind is "Command Complete"? */
1664         {
1665                 command_ok_row_count = extract_ntuples(p);
1666
1667                 /*
1668                  * if we are in the parallel mode, we have to sum up the number
1669                  * of affected rows
1670                  */
1671                 if (PARALLEL_MODE && is_parallel_table &&
1672                         (strstr(p, "UPDATE") || strstr(p, "DELETE")))
1673                 {
1674                         delete_or_update = 1;
1675                 }
1676         }
1677
1678         for (i=0;i<NUM_BACKENDS;i++)
1679         {
1680                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1681                 {
1682                         status = pool_read(CONNECTION(backend, i), &len, sizeof(len));
1683                         if (status < 0)
1684                         {
1685                                 pool_error("SimpleForwardToFrontend: error while reading message length");
1686                                 return POOL_END;
1687                         }
1688
1689                         len = ntohl(len);
1690                         len -= 4;
1691
1692                         p = pool_read2(CONNECTION(backend, i), len);
1693                         if (p == NULL)
1694                                 return POOL_END;
1695
1696                         if (len != len1)
1697                         {
1698                                 pool_debug("SimpleForwardToFrontend: length does not match between backends master(%d) %d th backend(%d) kind:(%c)",
1699                                                    len, i, len1, kind);
1700                         }
1701
1702                         if (kind == 'C')        /* packet kind is "Command Complete"? */
1703                         {
1704                                 int n = extract_ntuples(p);
1705
1706                                 /*
1707                                  * if we are in the parallel mode, we have to sum up the number
1708                                  * of affected rows
1709                                  */
1710                                 if (delete_or_update)
1711                                 {
1712                                         command_ok_row_count += n;
1713                                 }
1714                                 else if (command_ok_row_count != n) /* mismatch update rows */
1715                                 {
1716                                         mismatch_ntuples = 1;
1717                                 }
1718                         }
1719                 }
1720         }
1721
1722         if (mismatch_ntuples)
1723         {
1724                 String *msg = init_string("pgpool detected difference of the number of inserted, updated or deleted tuples. Possible last query was: \"");
1725                 string_append_char(msg, query_string_buffer);
1726                 string_append_char(msg, "\"");
1727                 pool_send_error_message(frontend, MAJOR(backend),
1728                                                                 "XX001", msg->data, "",
1729                                                                 "check data consistency between master and other db node",  __FILE__, __LINE__);
1730                 pool_error(msg->data);
1731                 free_string(msg);
1732         }
1733         else
1734         {
1735                 if (delete_or_update)
1736                 {
1737                         char tmp[32];
1738
1739                         strncpy(tmp, p1, 7);
1740                         sprintf(tmp+7, "%d", command_ok_row_count);
1741
1742                         p2 = strdup(tmp);
1743                         if (p2 == NULL)
1744                         {
1745                                 pool_error("SimpleForwardToFrontend: malloc failed");
1746                                 free(p1);
1747                                 return POOL_ERROR;
1748                         }
1749
1750                         free(p1);
1751                         p1 = p2;
1752                         len1 = strlen(p2) + 1;
1753                 }
1754
1755                 pool_write(frontend, &kind, 1);
1756                 sendlen = htonl(len1+4);
1757                 pool_write(frontend, &sendlen, sizeof(sendlen));
1758                 pool_write(frontend, p1, len1);
1759         }
1760
1761         /* save the received result for each kind */
1762         if (pool_config->enable_query_cache && SYSDB_STATUS == CON_UP)
1763         {
1764                 query_cache_register(kind, frontend, backend->info->database, p1, len1);
1765         }
1766
1767         free(p1);
1768         if (status)
1769                 return POOL_END;
1770
1771         if (kind == 'A')        /* notification response */
1772         {
1773                 pool_flush(frontend);   /* we need to immediately notice to frontend */
1774         }
1775         else if (kind == 'E')           /* error response? */
1776         {
1777                 int i;
1778                 int res1;
1779                 char *p1;
1780
1781                 /*
1782                  * check if the error was PANIC or FATAL. If so, we just flush
1783                  * the message and exit since the backend will close the
1784                  * channel immediately.
1785                  */
1786                 for (;;)
1787                 {
1788                         char e;
1789
1790                         e = *p++;
1791                         if (e == '\0')
1792                                 break;
1793
1794                         if (e == 'S' && (strcasecmp("PANIC", p) == 0 || strcasecmp("FATAL", p) == 0))
1795                         {
1796                                 pool_flush(frontend);
1797                                 return POOL_END;
1798                         }
1799                         else
1800                         {
1801                                 while (*p++)
1802                                         ;
1803                                 continue;
1804                         }
1805                 }
1806
1807                 if (select_in_transaction)
1808                 {
1809                         int i;
1810
1811                         in_load_balance = 0;
1812                         REPLICATION = 1;
1813                         for (i = 0; i < NUM_BACKENDS; i++)
1814                         {
1815                                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1816                                 {
1817                                         /*
1818                                          * We must abort transaction to sync transaction state.
1819                                          * If the error was caused by an Execute message,
1820                                          * we must send invalid Execute message to abort
1821                                          * transaction.
1822                                          *
1823                                          * Because extended query protocol ignores all
1824                                          * messages before receiving Sync message inside error state.
1825                                          */
1826                                         if (execute_select)
1827                                                 do_error_execute_command(backend, i, PROTO_MAJOR_V3);
1828                                         else
1829                                                 do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
1830                                 }
1831                         }
1832                         select_in_transaction = 0;
1833                         execute_select = 0;
1834                 }
1835
1836                 for (i = 0;i < NUM_BACKENDS; i++)
1837                 {
1838                         if (VALID_BACKEND(i))
1839                         {
1840                                 POOL_CONNECTION *cp = CONNECTION(backend, i);
1841
1842                                 /* We need to send "sync" message to backend in extend mode
1843                                  * so that it accepts next command.
1844                                  * Note that this may be overkill since client may send
1845                                  * it by itself. Moreover we do not need it in non-extend mode.
1846                                  * At this point we regard it is not harmful since error response
1847                                  * will not be sent too frequently.
1848                                  */
1849                                 pool_write(cp, "S", 1);
1850                                 res1 = htonl(4);
1851                                 if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0)
1852                                 {
1853                                         return POOL_END;
1854                                 }
1855                         }
1856                 }
1857
1858                 while ((ret = read_kind_from_backend(frontend, backend, &kind1)) == POOL_CONTINUE)
1859                 {
1860                         if (kind1 == 'Z') /* ReadyForQuery? */
1861                                 break;
1862
1863                         ret = SimpleForwardToFrontend(kind1, frontend, backend);
1864                         if (ret != POOL_CONTINUE)
1865                                 return ret;
1866                         pool_flush(frontend);
1867                 }
1868
1869                 if (ret != POOL_CONTINUE)
1870                         return ret;
1871
1872                 for (i = 0; i < NUM_BACKENDS; i++)
1873                 {
1874                         if (VALID_BACKEND(i))
1875                         {
1876                                 status = pool_read(CONNECTION(backend, i), &res1, sizeof(res1));
1877                                 if (status < 0)
1878                                 {
1879                                         pool_error("SimpleForwardToFrontend: error while reading message length");
1880                                         return POOL_END;
1881                                 }
1882                                 res1 = ntohl(res1) - sizeof(res1);
1883                                 p1 = pool_read2(CONNECTION(backend, i), res1);
1884                                 if (p1 == NULL)
1885                                         return POOL_END;
1886                         }
1887                 }
1888         }
1889         return POOL_CONTINUE;
1890 }
1891
1892 POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1893 {
1894         int len;
1895         int sendlen;
1896         char *p;
1897         int i;
1898         char *name;
1899         POOL_STATUS ret;
1900
1901         for (i=0;i<NUM_BACKENDS;i++)
1902         {
1903                 if (VALID_BACKEND(i))
1904                 {
1905                         if (pool_write(CONNECTION(backend, i), &kind, 1))
1906                                 return POOL_END;
1907                 }
1908         }
1909
1910         if (pool_read(frontend, &sendlen, sizeof(sendlen)))
1911         {
1912                 return POOL_END;
1913         }
1914
1915         len = ntohl(sendlen) - 4;
1916
1917         for (i=0;i<NUM_BACKENDS;i++)
1918         {
1919                 if (VALID_BACKEND(i))
1920                 {
1921                         if (pool_write(CONNECTION(backend,i), &sendlen, sizeof(sendlen)))
1922                                 return POOL_END;
1923                 }
1924         }
1925
1926         if (len == 0)
1927                 return POOL_CONTINUE;
1928         else if (len < 0)
1929         {
1930                 pool_error("SimpleForwardToBackend: invalid message length");
1931                 return POOL_END;
1932         }
1933
1934         p = pool_read2(frontend, len);
1935         if (p == NULL)
1936                 return POOL_END;
1937
1938         for (i=0;i<NUM_BACKENDS;i++)
1939         {
1940                 if (VALID_BACKEND(i))
1941                 {
1942                         if (pool_write_and_flush(CONNECTION(backend, i), p, len))
1943                                 return POOL_END;
1944                 }
1945         }
1946
1947         if (kind == 'B') /* Bind message */
1948         {
1949                 Portal *portal = NULL;
1950                 char *stmt_name, *portal_name;
1951
1952                 portal_name = p;
1953                 stmt_name = p + strlen(portal_name) + 1;
1954
1955                 pool_debug("bind message: portal_name %s stmt_name %s", portal_name, stmt_name);
1956
1957                 if (*stmt_name == '\0')
1958                         portal = unnamed_statement;
1959                 else
1960                 {
1961                         portal = lookup_prepared_statement_by_statement(&prepared_list, stmt_name);
1962                 }
1963
1964                 if (*portal_name == '\0'){
1965                         unnamed_portal = portal;
1966                 }
1967                 else if (portal)
1968                 {
1969                         if (portal->portal_name)
1970                                 free(portal->portal_name);
1971                         portal->portal_name = strdup(portal_name);
1972                 }
1973         }
1974
1975         /* Close message with prepared statement name. */
1976         else if (kind == 'C' && *p == 'S' && *(p + 1))
1977         {
1978                 POOL_MEMORY_POOL *old_context = pool_memory;
1979                 DeallocateStmt *deallocate_stmt;
1980
1981                 pending_prepared_portal = create_portal();
1982                 if (pending_prepared_portal == NULL)
1983                 {
1984                         pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
1985                         return POOL_END;
1986                 }
1987
1988                 pool_memory = pending_prepared_portal->prepare_ctxt;
1989                 name = pstrdup(p+1);
1990                 if (name == NULL)
1991                 {
1992                         pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
1993                         pool_memory = old_context;
1994                         return POOL_END;
1995                 }
1996
1997                 /* Translate from Close message to DEALLOCATE statement.*/
1998                 deallocate_stmt = palloc(sizeof(DeallocateStmt));
1999                 if (deallocate_stmt == NULL)
2000                 {
2001                         pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
2002                         pool_memory = old_context;
2003                         return POOL_END;
2004                 }
2005                 deallocate_stmt->name = name;
2006                 pending_prepared_portal->stmt = (Node *)deallocate_stmt;
2007                 pending_prepared_portal->portal_name = NULL;
2008                 pending_function = del_prepared_list;
2009                 pool_memory = old_context;
2010         }
2011
2012         if (kind == 'B' || kind == 'D' || kind == 'C')
2013         {
2014                 int i;
2015                 char kind1;
2016
2017                 for (i = 0;i < NUM_BACKENDS; i++)
2018                 {
2019                         if (VALID_BACKEND(i))
2020                         {
2021                                 POOL_CONNECTION *cp = CONNECTION(backend, i);
2022
2023                                 /*
2024                                  * send "Flush" message so that backend notices us
2025                                  * the completion of the command
2026                                  */
2027                                 pool_write(cp, "H", 1);
2028                                 sendlen = htonl(4);
2029                                 if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
2030                                 {
2031                                         return POOL_END;
2032                                 }
2033                         }
2034                 }
2035
2036                 /*
2037                  * Describe message with a portal name will receive two messages.
2038                  * 1. ParameterDescription
2039                  * 2. RowDescriptions or NoData
2040                  * So we read one message here.
2041                  */
2042                 if (kind == 'D' && *p == 'S')
2043                 {
2044                         ret = read_kind_from_backend(frontend, backend, &kind1);
2045                         if (ret != POOL_CONTINUE)
2046                                 return ret;
2047                         SimpleForwardToFrontend(kind1, frontend, backend);
2048                         if (pool_flush(frontend))
2049                                 return POOL_END;
2050                 }
2051
2052                 /*
2053                  * Forward to frontend until a NOTICE message received.
2054                  */
2055                 for (;;)
2056                 {
2057                         ret = read_kind_from_backend(frontend, backend, &kind1);
2058                         if (ret != POOL_CONTINUE)
2059                                 return ret;
2060                         SimpleForwardToFrontend(kind1, frontend, backend);
2061                         if (pool_flush(frontend) < 0)
2062                                 return POOL_ERROR;
2063
2064                         if (kind1 != 'N')
2065                                 break;
2066                 }
2067         }
2068
2069         return POOL_CONTINUE;
2070 }
2071
2072 POOL_STATUS ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2073 {
2074         int len, len1 = 0;
2075         int *len_array;
2076         int sendlen;
2077         char *p;
2078         char *name;
2079         char *value;
2080         POOL_STATUS status;
2081         char parambuf[1024];            /* parameter + value string buffer. XXX is this enough? */
2082         int i;
2083
2084         pool_write(frontend, "S", 1);
2085
2086         len_array = pool_read_message_length2(backend);
2087
2088         if (len_array == NULL)
2089         {
2090                 return POOL_END;
2091         }
2092
2093         len = len_array[MASTER_NODE_ID];
2094         sendlen = htonl(len);
2095         pool_write(frontend, &sendlen, sizeof(sendlen));
2096
2097         for (i=0;i<NUM_BACKENDS;i++)
2098         {
2099                 if (VALID_BACKEND(i))
2100                 {
2101                         len = len_array[i];
2102                         len -= 4;
2103
2104                         p = pool_read2(CONNECTION(backend, i), len);
2105                         if (p == NULL)
2106                                 return POOL_END;
2107
2108                         name = p;
2109                         value = p + strlen(name) + 1;
2110
2111                         pool_debug("%d th backend: name: %s value: %s", i, name, value);
2112
2113                         if (IS_MASTER_NODE_ID(i))
2114                         {
2115                                 len1 = len;
2116                                 memcpy(parambuf, p, len);
2117                                 pool_add_param(&CONNECTION(backend, i)->params, name, value);
2118                         }
2119
2120 #ifdef DEBUG
2121                         pool_param_debug_print(&MASTER(backend)->params);
2122 #endif
2123                 }
2124         }
2125
2126         status = pool_write(frontend, parambuf, len1);
2127         return status;
2128 }
2129
2130 /*
2131  * Reset backend status. return values are:
2132  * 0: no query was issued 1: a query was issued 2: no more queries remain -1: error
2133  */
2134 static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt)
2135 {
2136         char *query;
2137         int qn;
2138
2139         /*
2140          * Reset all state variables
2141          */
2142         in_progress = 0;
2143         in_load_balance = 0;
2144
2145         /*
2146          * Until pgpool-II 2.2.3 we don't have following 2 lines.
2147          * If we were executing someting in load balance mode, and if
2148          * frontend failed before executing end_loadl_balance() in
2149          * ReadyForQuery(), these variables remained and we may do
2150          * something only in master node!
2151          */
2152         REPLICATION = pool_config->replication_mode;
2153         MASTER_SLAVE = pool_config->master_slave_mode;
2154
2155         force_replication = 0;
2156         internal_transaction_started = 0;
2157         mismatch_ntuples = 0;
2158         select_in_transaction = 0;
2159         execute_select = 0;
2160         receive_extended_begin = 0;
2161
2162         qn = pool_config->num_reset_queries;
2163
2164         /*
2165          * After execution of all SQL commands in the reset_query_list, we
2166          * remove all prepared objects in the prepared_list.
2167          */
2168         if (qcnt >= qn)
2169         {
2170                 if (prepared_list.cnt == 0)
2171                 {
2172                         /*
2173                          * Either no prepared objects were created or DISCARD ALL
2174                          * or DEALLOCATE ALL is on the reset_query_list and they
2175                          * were executed.  The latter causes call to
2176                          * reset_prepared_list which removes all prepared objects.
2177                          */
2178                         reset_prepared_list(&prepared_list);
2179                         return 2;
2180                 }
2181
2182                 /* Delete from prepared list */
2183                 if (send_deallocate(backend, &prepared_list, 0))
2184                 {
2185                         /* Deallocate failed. We are in unknown state. Ask caller
2186                          * to reset backend connection.
2187                          */
2188                         reset_prepared_list(&prepared_list);
2189                         return -1;
2190                 }
2191                 /*
2192                  * If DEALLOCATE returns ERROR response, instead of
2193                  * CommandComplete, del_prepared_list is not called and the
2194                  * prepared object keeps on sitting on the prepared list. This
2195                  * will cause infinite call to reset_backend.  So we call
2196                  * del_prepared_list() again. This is harmless since trying to
2197                  * remove same prepared object will be ignored.
2198                  */
2199                 del_prepared_list(&prepared_list, prepared_list.portal_list[0]);
2200                 return 1;
2201         }
2202
2203         query = pool_config->reset_query_list[qcnt];
2204
2205         /* if transaction state is idle, we don't need to issue ABORT */
2206         if (TSTATE(backend) == 'I' && !strcmp("ABORT", query))
2207                 return 0;
2208
2209         pool_set_timeout(10);
2210
2211         if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE)
2212         {
2213                 pool_set_timeout(0);
2214                 return -1;
2215         }
2216
2217         pool_set_timeout(0);
2218         return 1;
2219 }
2220
2221 /*
2222  * return non 0 if load balance is possible
2223  */
2224 int load_balance_enabled(POOL_CONNECTION_POOL *backend, Node* node, char *sql)
2225 {
2226         return (pool_config->load_balance_mode &&
2227                         (DUAL_MODE || pool_config->parallel_mode) &&
2228                         MAJOR(backend) == PROTO_MAJOR_V3 &&
2229                         TSTATE(backend) == 'I' &&
2230                         is_select_query(node, sql) &&
2231                         !is_sequence_query(node));
2232 }
2233
2234
2235 /*
2236  * returns non 0 if the SQL statement can be load
2237  * balanced. Followings are statemnts go into this category.
2238  *
2239  * - SELECT without FOR UPDATE/SHARE
2240  * - COPY TO STDOUT
2241  * - DECLARE..SELECT (without INTO nor FOR UPDATE/SHARE)
2242  * - FETCH
2243  * - CLOSE
2244  *
2245  * note that for SELECT INTO, this function returns 0
2246  */
2247 int is_select_query(Node *node, char *sql)
2248 {
2249         if (node == NULL)
2250                 return 0;
2251
2252         /*
2253          * 2009/5/1 Tatsuo says: This test is not bogus. As of 2.2, pgpool
2254          * sets Portal->sql_string to NULL for SQL command PREPARE.
2255          * Usually this is ok, since in most cases SQL command EXECUTE
2256          * follows anyway. Problem is, some applications mix PREPARE with
2257          * extended protocol command "EXECUTE" and so on. Execute() seems
2258          * to think this never happens but it is not real. Someday we
2259          * should extract actual query string from PrepareStmt->query and
2260          * set it to Portal->sql_string.
2261          */
2262         if (sql == NULL)
2263                 return 0;
2264
2265         if (pool_config->ignore_leading_white_space)
2266         {
2267                 /* ignore leading white spaces */
2268                 while (*sql && isspace(*sql))
2269                         sql++;
2270         }
2271
2272         if (IsA(node, SelectStmt) || IsA(node, DeclareCursorStmt))
2273         {
2274                 SelectStmt *select_stmt;
2275
2276                 if (IsA(node, SelectStmt))
2277                          select_stmt = (SelectStmt *)node;
2278                 else
2279                         select_stmt = (SelectStmt *)((DeclareCursorStmt *)node)->query;
2280
2281                 if (select_stmt->intoClause || select_stmt->lockingClause)
2282                         return 0;
2283
2284                 if (IsA(node, SelectStmt))
2285                         return (*sql == 's' || *sql == 'S' || *sql == '(');
2286                 else
2287                         return (*sql == 'd' || *sql == 'D');
2288         }
2289         else if (IsA(node, FetchStmt) || IsA(node, ClosePortalStmt))
2290         {
2291                 return (*sql == 'f' || *sql == 'F' || *sql == 'c' || *sql == 'C');
2292         }
2293         else if (IsA(node, CopyStmt))
2294         {
2295                 CopyStmt *copy_stmt = (CopyStmt *)node;
2296                 return (copy_stmt->is_from == FALSE &&
2297                                 copy_stmt->filename == NULL);
2298         }
2299         return 0;
2300 }
2301
2302 /*
2303  * returns non 0 if SQL is SELECT statement including nextval() or
2304  * setval() call
2305  */
2306 int is_sequence_query(Node *node)
2307 {
2308         SelectStmt *select_stmt;
2309         ListCell *lc;
2310
2311         if (node == NULL || !IsA(node, SelectStmt))
2312                 return 0;
2313
2314         select_stmt = (SelectStmt *)node;
2315         foreach (lc, select_stmt->targetList)
2316         {
2317                 if (IsA(lfirst(lc), ResTarget))
2318                 {
2319                         ResTarget *t;
2320                         FuncCall *fc;
2321                         ListCell *c;
2322
2323                         t = (ResTarget *) lfirst(lc);
2324                         if (IsA(t->val, FuncCall))
2325                         {
2326                                 fc = (FuncCall *) t->val;
2327                                 foreach (c, fc->funcname)
2328                                 {
2329                                         Value *v = lfirst(c);
2330                                         if (strncasecmp(v->val.str, "NEXTVAL", 7) == 0)
2331                                                 return 1;
2332                                         else if (strncasecmp(v->val.str, "SETVAL", 6) == 0)
2333                                                 return 1;
2334                                 }
2335                         }
2336                 }
2337         }
2338
2339         return 0;
2340 }
2341
2342 /*
2343  * returns non 0 if SQL is transaction starting command (START
2344  * TRANSACTION or BEGIN)
2345  */
2346 int is_start_transaction_query(Node *node)
2347 {
2348         TransactionStmt *stmt;
2349
2350         if (node == NULL || !IsA(node, TransactionStmt))
2351                 return 0;
2352
2353         stmt = (TransactionStmt *)node;
2354         return stmt->kind == TRANS_STMT_START || stmt->kind == TRANS_STMT_BEGIN;
2355 }
2356
2357 /*
2358  * returns non 0 if SQL is transaction commit or abort command (END
2359  * TRANSACTION or ROLLBACK or ABORT)
2360  */
2361 int is_commit_query(Node *node)
2362 {
2363         TransactionStmt *stmt;
2364
2365         if (node == NULL || !IsA(node, TransactionStmt))
2366                 return 0;
2367
2368         stmt = (TransactionStmt *)node;
2369         return stmt->kind == TRANS_STMT_COMMIT || stmt->kind == TRANS_STMT_ROLLBACK;
2370 }
2371
2372 /*
2373  * start load balance mode
2374  */
2375 void start_load_balance(POOL_CONNECTION_POOL *backend)
2376 {
2377 #ifdef NOT_USED
2378         double total_weight,r;
2379         int i;
2380
2381         /* save backend connection slots */
2382         for (i=0;i<NUM_BACKENDS;i++)
2383         {
2384                 if (VALID_BACKEND(i))
2385                 {
2386                         slots[i] = CONNECTION_SLOT(backend, i);
2387                 }
2388         }
2389 #endif
2390
2391         /* temporarily turn off replication mode */
2392         if (REPLICATION)
2393                 replication_was_enabled = 1;
2394         if (MASTER_SLAVE)
2395                 master_slave_was_enabled = 1;
2396
2397         REPLICATION = 0;
2398         MASTER_SLAVE = 0;
2399
2400 #ifdef NOTUSED
2401         backend->slots[0] = slots[selected_slot];
2402 #endif
2403         LOAD_BALANCE_STATUS(backend->info->load_balancing_node) = LOAD_SELECTED;
2404         selected_slot = backend->info->load_balancing_node;
2405
2406         /* start load balancing */
2407         in_load_balance = 1;
2408 }
2409
2410 /*
2411  * finish load balance mode
2412  */
2413 void end_load_balance(POOL_CONNECTION_POOL *backend)
2414 {
2415         in_load_balance = 0;
2416         LOAD_BALANCE_STATUS(selected_slot) = LOAD_UNSELECTED;
2417
2418 #ifdef NOT_USED
2419         /* restore backend connection slots */
2420
2421         for (i=0;i<NUM_BACKENDS;i++)
2422         {
2423                 if (VALID_BACKEND(i))
2424                 {
2425                         CONNECTION_SLOT(backend, i) = slots[i];
2426                 }
2427         }
2428 #endif
2429
2430         /* turn on replication mode */
2431         REPLICATION = replication_was_enabled;
2432         MASTER_SLAVE = master_slave_was_enabled;
2433
2434         replication_was_enabled = 0;
2435         master_slave_was_enabled = 0;
2436
2437         pool_debug("end_load_balance: end load balance mode");
2438 }
2439
2440 /*
2441  * send error message to frontend
2442  */
2443 void pool_send_error_message(POOL_CONNECTION *frontend, int protoMajor,
2444                                                          char *code,
2445                                                          char *message,
2446                                                          char *detail,
2447                                                          char *hint,
2448                                                          char *file,
2449                                                          int line)
2450 {
2451         pool_send_severity_message(frontend, protoMajor, code, message, detail, hint, file, "ERROR", line);
2452 }
2453
2454 /*
2455  * send fatal message to frontend
2456  */
2457 void pool_send_fatal_message(POOL_CONNECTION *frontend, int protoMajor,
2458                                                          char *code,
2459                                                          char *message,
2460                                                          char *detail,
2461                                                          char *hint,
2462                                                          char *file,
2463                                                          int line)
2464 {
2465         pool_send_severity_message(frontend, protoMajor, code, message, detail, hint, file, "FATAL", line);
2466 }
2467
2468 /*
2469  * send severity message to frontend
2470  */
2471 void pool_send_severity_message(POOL_CONNECTION *frontend, int protoMajor,
2472                                                          char *code,
2473                                                          char *message,
2474                                                          char *detail,
2475                                                          char *hint,
2476                                                          char *file,
2477                                                          char *severity,
2478                                                          int line)
2479 {
2480 /*
2481  * Buffer length for each message part
2482  */
2483 #define MAXMSGBUF 256
2484 /*
2485  * Buffer length for result message buffer.
2486  * Since msg is consisted of 7 parts, msg buffer should be large
2487  * enough to hold those message parts
2488 */
2489 #define MAXDATA (MAXMSGBUF+1)*7+1
2490
2491         pool_set_nonblock(frontend->fd);
2492
2493         if (protoMajor == PROTO_MAJOR_V2)
2494         {
2495                 pool_write(frontend, "E", 1);
2496                 pool_write_and_flush(frontend, message, strlen(message)+1);
2497         }
2498         else if (protoMajor == PROTO_MAJOR_V3)
2499         {
2500                 char data[MAXDATA];
2501                 char msgbuf[MAXMSGBUF];
2502                 int len;
2503                 int thislen;
2504                 int sendlen;
2505
2506                 len = 0;
2507
2508                 pool_write(frontend, "E", 1);
2509
2510                 /* error level */
2511                 thislen = snprintf(msgbuf, MAXMSGBUF, "S%s", severity);
2512                 thislen = Min(thislen, MAXMSGBUF);
2513                 memcpy(data +len, msgbuf, thislen+1);
2514                 len += thislen + 1;
2515
2516                 /* code */
2517                 thislen = snprintf(msgbuf, MAXMSGBUF, "C%s", code);
2518                 thislen = Min(thislen, MAXMSGBUF);
2519                 memcpy(data +len, msgbuf, thislen+1);
2520                 len += thislen + 1;
2521
2522                 /* message */
2523                 thislen = snprintf(msgbuf, MAXMSGBUF, "M%s", message);
2524                 thislen = Min(thislen, MAXMSGBUF);
2525                 memcpy(data +len, msgbuf, thislen+1);
2526                 len += thislen + 1;
2527
2528                 /* detail */
2529                 if (*detail != '\0')
2530                 {
2531                         thislen = snprintf(msgbuf, MAXMSGBUF, "D%s", detail);
2532                         thislen = Min(thislen, MAXMSGBUF);
2533                         memcpy(data +len, msgbuf, thislen+1);
2534                         len += thislen + 1;
2535                 }
2536
2537                 /* hint */
2538                 if (*hint != '\0')
2539                 {
2540                         thislen = snprintf(msgbuf, MAXMSGBUF, "H%s", hint);
2541                         thislen = Min(thislen, MAXMSGBUF);
2542                         memcpy(data +len, msgbuf, thislen+1);
2543                         len += thislen + 1;
2544                 }
2545
2546                 /* file */
2547                 thislen = snprintf(msgbuf, MAXMSGBUF, "F%s", file);
2548                 thislen = Min(thislen, MAXMSGBUF);
2549                 memcpy(data +len, msgbuf, thislen+1);
2550                 len += thislen + 1;
2551
2552                 /* line */
2553                 thislen = snprintf(msgbuf, MAXMSGBUF, "L%d", line);
2554                 thislen = Min(thislen, MAXMSGBUF);
2555                 memcpy(data +len, msgbuf, thislen+1);
2556                 len += thislen + 1;
2557
2558                 /* stop null */
2559                 len++;
2560                 *(data + len - 1) = '\0';
2561
2562                 sendlen = len;
2563                 len = htonl(len + 4);
2564                 pool_write(frontend, &len, sizeof(len));
2565                 pool_write_and_flush(frontend, data, sendlen);
2566         }
2567         else
2568                 pool_error("send_error_message: unknown protocol major %d", protoMajor);
2569
2570         pool_unset_nonblock(frontend->fd);
2571 }
2572
2573 void pool_send_readyforquery(POOL_CONNECTION *frontend)
2574 {
2575         int len;
2576         pool_write(frontend, "Z", 1);
2577         len = 5;
2578         len = htonl(len);
2579         pool_write(frontend, &len, sizeof(len));
2580         pool_write(frontend, "I", 1);
2581         pool_flush(frontend);
2582 }
2583
2584 /*
2585  * Send a query to a backend in sync manner.
2586  * This function sends a query and waits for CommandComplete/ReadyForQuery.
2587  * If an error occured, it returns with POOL_ERROR.
2588  * This function does NOT handle SELECT/SHOW queries.
2589  * If no_ready_for_query is non 0, returns without reading the packet
2590  * length for ReadyForQuery. This mode is necessary when called from ReadyForQuery().
2591  */
2592 static POOL_STATUS do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend,
2593                                                           char *query, int protoMajor, int pid, int key, int no_ready_for_query)
2594 {
2595         int len;
2596         int status;
2597         char kind;
2598         char *string;
2599         int deadlock_detected = 0;
2600
2601         pool_debug("do_command: Query: %s", query);
2602
2603         /* send the query to the backend */
2604         if (send_simplequery_message(backend, strlen(query)+1, query, protoMajor) != POOL_CONTINUE)
2605                 return POOL_END;
2606
2607         /*
2608          * Wait for response from badckend while polling frontend connection is ok.
2609          * If not, cancel the transaction.
2610          */
2611         if (wait_for_query_response(frontend, backend, query, protoMajor) != POOL_CONTINUE)
2612         {
2613                 /* Cancel current transaction */
2614                 CancelPacket cancel_packet;
2615
2616                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
2617                 cancel_packet.pid = pid;
2618                 cancel_packet.key= key;
2619                 cancel_request(&cancel_packet);
2620                 return POOL_END;
2621         }
2622
2623         /*
2624          * We must check deadlock error here. If a deadlock error is
2625          * detected by a backend, other backend might not be noticed the
2626          * error.  In this case caller should send an error query to the
2627          * backend to abort the transaction. Otherwise the transaction
2628          * state might vary among backends(idle in transaction vs. abort).
2629          */
2630         deadlock_detected = detect_deadlock_error(backend, protoMajor);
2631         if (deadlock_detected < 0)
2632                 return POOL_END;
2633
2634         /*
2635          * Continue to read packets until we get ReadForQuery (Z).
2636          * Until that we may recieve one of:
2637          *
2638          * N: Notice response
2639          * E: Error response
2640          * C: Comand complete
2641          *
2642          * XXX: we ignore Notice and Error here. Even notice/error
2643          * messages are not sent to the frontend. May be it's ok since the
2644          * error was caused by our internal use of SQL command (otherwise users
2645          * will be confused).
2646          */
2647         for(;;)
2648         {
2649                 status = pool_read(backend, &kind, sizeof(kind));
2650                 if (status < 0)
2651                 {
2652                         pool_error("do_command: error while reading message kind");
2653                         return POOL_END;
2654                 }
2655
2656                 pool_debug("do_command: kind: %c", kind);
2657
2658                 if (kind == 'Z')                /* Ready for Query? */
2659                         break;          /* get out the loop without reading message lenghth */
2660
2661                 if (protoMajor == PROTO_MAJOR_V3)
2662                 {
2663                         if (pool_read(backend, &len, sizeof(len)) < 0)
2664                         {
2665                                 pool_error("do_command: error while reading message length");
2666                                 return POOL_END;
2667                         }
2668                         len = ntohl(len) - 4;
2669                         
2670                         if (kind != 'N' && kind != 'E' && kind != 'C')
2671                         {
2672                                 pool_error("do_command: error, kind is not N, E or C(%02x)", kind);
2673                                 return POOL_END;
2674                         }
2675                         string = pool_read2(backend, len);
2676                         if (string == NULL)
2677                         {
2678                                 pool_error("do_command: error while reading rest of message");
2679                                 return POOL_END;
2680                         }
2681                 }
2682                 else
2683                 {
2684                         string = pool_read_string(backend, &len, 0);
2685                         if (string == NULL)
2686                         {
2687                                 pool_error("do_command: error while reading rest of message");
2688                                 return POOL_END;
2689                         }
2690                 }
2691         }
2692
2693 /*
2694  * until 2008/11/12 we believed that we never had packets other than
2695  * 'Z' after receiving 'C'. However a counter example was presented by
2696  * a poor customer. So we replaced the whole thing with codes
2697  * above. In a side effect we were be able to get ride of nasty
2698  * "goto". Congratulations.
2699  */
2700 #ifdef NOT_USED
2701         /*
2702          * Expecting CompleteCommand
2703          */
2704 retry_read_packet:
2705         status = pool_read(backend, &kind, sizeof(kind));
2706         if (status < 0)
2707         {
2708                 pool_error("do_command: error while reading message kind");
2709                 return POOL_END;
2710         }
2711
2712         if (kind == 'E')
2713         {
2714                 pool_log("do_command: backend does not successfully complete command %s status %c", query, kind);
2715         }
2716
2717         /*
2718          * read command tag of CommandComplete response
2719          */
2720         if (protoMajor == PROTO_MAJOR_V3)
2721         {
2722                 if (pool_read(backend, &len, sizeof(len)) < 0)
2723                         return POOL_END;
2724                 len = ntohl(len) - 4;
2725                 string = pool_read2(backend, len);
2726                 if (string == NULL)
2727                         return POOL_END;
2728                 pool_debug("command tag: %s", string);
2729         }
2730         else
2731         {
2732                 string = pool_read_string(backend, &len, 0);
2733                 if (string == NULL)
2734                         return POOL_END;
2735         }
2736
2737         if (kind == 'N') /* warning? */
2738                 goto retry_read_packet;
2739
2740         /*
2741          * Expecting ReadyForQuery
2742          */
2743         status = pool_read(backend, &kind, sizeof(kind));
2744         if (status < 0)
2745         {
2746                 pool_error("do_command: error while reading message kind");
2747                 return POOL_END;
2748         }
2749
2750         if (kind != 'Z')
2751         {
2752                 pool_error("do_command: backend returns %c while expecting ReadyForQuery", kind);
2753                 return POOL_END;
2754         }
2755 #endif
2756
2757         if (no_ready_for_query)
2758                 return POOL_CONTINUE;
2759
2760         if (protoMajor == PROTO_MAJOR_V3)
2761         {
2762                 /* read packet lenghth for ready for query */
2763                 if (pool_read(backend, &len, sizeof(len)) < 0)
2764                 {
2765                         pool_error("do_command: error while reading message length");
2766                         return POOL_END;
2767                 }
2768
2769                 /* read transaction state */
2770                 status = pool_read(backend, &kind, sizeof(kind));
2771                 if (status < 0)
2772                 {
2773                         pool_error("do_command: error while reading transaction status");
2774                         return POOL_END;
2775                 }
2776
2777                 /* set transaction state */
2778                 pool_debug("do_command: transaction state: %c", kind);
2779                 backend->tstate = kind;
2780         }
2781
2782         return deadlock_detected ? POOL_DEADLOCK : POOL_CONTINUE;
2783 }
2784
2785 /*
2786  * Send a syntax error query to abort transaction and receive response
2787  * from backend and discard it until we get Error response.
2788  *
2789  * We need to sync transaction status in transaction block.
2790  * SELECT query is sent to master only.
2791  * If SELECT is error, we must abort transaction on other nodes.
2792  */
2793 POOL_STATUS do_error_command(POOL_CONNECTION *backend, int major)
2794 {
2795         char *error_query = POOL_ERROR_QUERY;
2796         int status, len;
2797         char kind;
2798         char *string;
2799
2800         if (send_simplequery_message(backend, strlen(error_query) + 1, error_query, major) != POOL_CONTINUE)
2801         {
2802                 return POOL_END;
2803         }
2804
2805         /*
2806          * Continue to read packets until we get Error response (E).
2807          * Until that we may recieve one of:
2808          *
2809          * N: Notice response
2810          * C: Comand complete
2811          *
2812          * XXX: we ignore Notice here. Even notice messages are not sent
2813          * to the frontend. May be it's ok since the error was caused by
2814          * our internal use of SQL command (otherwise users will be
2815          * confused).
2816          */
2817         do
2818         {
2819                 status = pool_read(backend, &kind, sizeof(kind));
2820                 if (status < 0)
2821                 {
2822                         pool_error("do_error_command: error while reading message kind");
2823                         return POOL_END;
2824                 }
2825
2826                 pool_debug("do_error_command: kind: %c", kind);
2827
2828                 if (major == PROTO_MAJOR_V3)
2829                 {
2830                         if (pool_read(backend, &len, sizeof(len)) < 0)
2831                         {
2832                                 pool_error("do_error_command: error while reading message length");
2833                                 return POOL_END;
2834                         }
2835                         len = ntohl(len) - 4;
2836                         string = pool_read2(backend, len);
2837                         if (string == NULL)
2838                         {
2839                                 pool_error("do_error_command: error while reading rest of message");
2840                                 return POOL_END;
2841                         }
2842                 }
2843                 else
2844                 {
2845                         string = pool_read_string(backend, &len, 0);
2846                         if (string == NULL)
2847                         {
2848                                 pool_error("do_error_command: error while reading rest of message");
2849                                 return POOL_END;
2850                         }
2851                 }
2852         } while (kind != 'E');
2853
2854 #ifdef NOT_USED
2855         /*
2856          * Expecting ErrorResponse
2857          */
2858         status = pool_read(backend, &kind, sizeof(kind));
2859         if (status < 0)
2860         {
2861                 pool_error("do_command: error while reading message kind");
2862                 return POOL_END;
2863         }
2864
2865         /*
2866          * read command tag of CommandComplete response
2867          */
2868         if (major == PROTO_MAJOR_V3)
2869         {
2870                 if (pool_read(backend, &len, sizeof(len)) < 0)
2871                         return POOL_END;
2872                 len = ntohl(len) - 4;
2873                 string = pool_read2(backend, len);
2874                 if (string == NULL)
2875                         return POOL_END;
2876                 pool_debug("command tag: %s", string);
2877         }
2878         else
2879         {
2880                 string = pool_read_string(backend, &len, 0);
2881                 if (string == NULL)
2882                         return POOL_END;
2883         }
2884 #endif
2885         return POOL_CONTINUE;
2886 }
2887
2888 /*
2889  * Send invalid portal execution to abort transaction.
2890  * We need to sync transaction status in transaction block.
2891  * SELECT query is sent to master only.
2892  * If SELECT is error, we must abort transaction on other nodes.
2893  */
2894 static POOL_STATUS do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major)
2895 {
2896         int status;
2897         char kind;
2898         char *string;
2899         char msg[1024] = "pgpoool_error_portal"; /* large enough */
2900         int len = strlen(msg);
2901
2902         memset(msg + len, 0, sizeof(int));
2903         if (send_execute_message(backend, node_id, len + 5, msg))
2904         {
2905                 return POOL_END;
2906         }
2907
2908         /*
2909          * Expecting ErrorResponse
2910          */
2911         status = pool_read(CONNECTION(backend, node_id), &kind, sizeof(kind));
2912         if (status < 0)
2913         {
2914                 pool_error("do_error_execute_command: error while reading message kind");
2915                 return POOL_END;
2916         }
2917
2918         /*
2919          * read command tag of CommandComplete response
2920          */
2921         if (major == PROTO_MAJOR_V3)
2922         {
2923                 if (pool_read(CONNECTION(backend, node_id), &len, sizeof(len)) < 0)
2924                         return POOL_END;
2925                 len = ntohl(len) - 4;
2926                 string = pool_read2(CONNECTION(backend, node_id), len);
2927                 if (string == NULL)
2928                         return POOL_END;
2929                 pool_debug("command tag: %s", string);
2930         }
2931         else
2932         {
2933                 string = pool_read_string(CONNECTION(backend, node_id), &len, 0);
2934                 if (string == NULL)
2935                         return POOL_END;
2936         }
2937
2938         return POOL_CONTINUE;
2939 }
2940
2941 /*
2942  * Transmit an arbitrary Query to a specific node.
2943  * This function is only used in parallel mode
2944  */
2945 POOL_STATUS OneNode_do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, char *database)
2946 {
2947         int len,sendlen;
2948         int status;
2949         char kind;
2950         bool notice = false;
2951
2952         pool_debug("OneNode_do_command: Query: %s", query);
2953
2954         /* send the query to the backend */
2955         pool_write(backend, "Q", 1);
2956         len = strlen(query)+1;
2957
2958         sendlen = htonl(len + 4);
2959         pool_write(backend, &sendlen, sizeof(sendlen));
2960
2961         if (pool_write_and_flush(backend, query, len) < 0)
2962         {
2963                 return POOL_END;
2964         }
2965
2966         for(;;)
2967         {
2968                 status = pool_read(backend, &kind, sizeof(kind));
2969                 if (status < 0)
2970                 {
2971                         pool_error("OneNode_do_command: error while reading message kind");
2972                         return POOL_END;
2973                 }
2974                 
2975                 if (kind == 'N' && strstr(query,"dblink")) {
2976                         notice = true;
2977                         status = ParallelForwardToFrontend(kind, frontend, backend, database, false);
2978                 } else {
2979                         if(notice)
2980                                 status = ParallelForwardToFrontend(kind, frontend, backend, database, false);
2981                         else
2982                                 status = ParallelForwardToFrontend(kind, frontend, backend, database, true);
2983                 }
2984                 if (kind == 'C' || kind =='E')
2985                 {
2986                         break;
2987                 }
2988         }
2989         /*
2990          * Expecting ReadyForQuery
2991          *
2992          */
2993         status = pool_read(backend, &kind, sizeof(kind));
2994
2995         if(notice)
2996                                 pool_send_error_message(frontend, 3, "XX000",
2997                                                                                 "pgpool2 sql restriction(notice from dblink)",query,"", 
2998                                                                                 __FILE__,__LINE__);
2999
3000         if (status < 0)
3001         {
3002                 pool_error("OneNode_do_command: error while reading message kind");
3003                 return POOL_END;
3004         }
3005
3006         if (kind != 'Z')
3007         {
3008                 pool_error("OneNode_do_command: backend does not return ReadyForQuery");
3009                 return POOL_END;
3010         }
3011
3012
3013         status = ParallelForwardToFrontend(kind, frontend, backend, database, true);
3014         pool_flush(frontend);
3015
3016                 return status;
3017 }
3018
3019 /*
3020  * Free POOL_SELECT_RESULT object
3021  */
3022 static void free_select_result(POOL_SELECT_RESULT *result)
3023 {
3024         int i;
3025
3026         if (result->nullflags)
3027                 free(result->nullflags);
3028
3029         if (result->data)
3030         {
3031                 for(i=0;i<result->numrows;i++)
3032                 {
3033                         if (result->data[i])
3034                                 free(result->data[i]);
3035                 }
3036                 free(result->data);
3037         }
3038
3039         if (result->rowdesc)
3040         {
3041                 if (result->rowdesc->attrinfo)
3042                 {
3043                         for(i=0;i<result->rowdesc->num_attrs;i++)
3044                         {
3045                                 if (result->rowdesc->attrinfo[i].attrname)
3046                                         free(result->rowdesc->attrinfo[i].attrname);
3047                         }
3048                         free(result->rowdesc->attrinfo);
3049                 }
3050                 free(result->rowdesc);
3051         }
3052 }
3053
3054 /*
3055  * Send a SELECT to one DB node. This function works for V3 only.
3056  */
3057 POOL_STATUS do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result)
3058 {
3059 #define DO_QUERY_ALLOC_NUM 1024 /* memory allocation unit for POOL_SELECT_RESULT */
3060
3061         int i;
3062         int len;
3063         char kind;
3064         char *packet;
3065         char *p;
3066         short num_fields;
3067         int num_data;
3068         int intval;
3069         short shortval;
3070
3071         POOL_SELECT_RESULT *res;
3072         RowDesc *rowdesc;
3073         AttrInfo *attrinfo;
3074
3075         res = malloc(sizeof(*res));
3076         if (!res)
3077         {
3078                 pool_error("pool_query: malloc failed");
3079                 return POOL_ERROR;
3080         }
3081         rowdesc = malloc(sizeof(*rowdesc));
3082         if (!rowdesc)
3083         {
3084                 pool_error("pool_query: malloc failed");
3085                 return POOL_ERROR;
3086         }
3087         memset(res, 0, sizeof(*res));
3088         memset(rowdesc, 0, sizeof(*rowdesc));
3089         *result = res;
3090
3091         res->rowdesc = rowdesc;
3092
3093         num_data = 0;
3094
3095         res->nullflags = malloc(DO_QUERY_ALLOC_NUM*sizeof(int));
3096         if (!res->nullflags)
3097         {
3098                 pool_error("do_query: malloc failed");
3099                 return POOL_ERROR;
3100         }
3101         res->data = malloc(DO_QUERY_ALLOC_NUM*sizeof(char *));
3102         if (!res->data)
3103         {
3104                 pool_error("do_query: malloc failed");
3105                 return POOL_ERROR;
3106         }
3107
3108         /* send a query to the backend */
3109         if (send_simplequery_message(backend, strlen(query) + 1, query, PROTO_MAJOR_V3) != POOL_CONTINUE)
3110         {
3111                 return POOL_END;
3112         }
3113
3114         /*
3115          * Continue to read packets until we get Ready for command('Z')
3116          *
3117          * XXX: we ignore other than Z here. Even notice messages are not sent
3118          * to the frontend. May be it's ok since the error was caused by
3119          * our internal use of SQL command (otherwise users will be
3120          * confused).
3121          */
3122         for(;;)
3123         {
3124                 if (pool_read(backend, &kind, sizeof(kind)) < 0)
3125                 {
3126                         pool_error("do_query: error while reading message kind");
3127                         return POOL_END;
3128                 }
3129
3130                 pool_debug("do_query: kind: %c", kind);
3131
3132                 if (pool_read(backend, &len, sizeof(len)) < 0)
3133                 {
3134                         pool_error("do_query: error while reading message length");
3135                         return POOL_END;
3136                 }
3137                 len = ntohl(len) - 4;
3138                 packet = pool_read2(backend, len);
3139                 if (packet == NULL)
3140                 {
3141                         pool_error("do_query: error while reading rest of message");
3142                         return POOL_END;
3143                 }
3144
3145                 switch (kind)
3146                 {
3147                         case 'Z':       /* Ready for query */
3148                                 return POOL_CONTINUE;
3149                                 break;
3150
3151                         case 'T':       /* Row Description */
3152                                 p = packet;
3153                                 memcpy(&shortval, p, sizeof(short));
3154                                 num_fields = ntohs(shortval);           /* number of fields */
3155                                 pool_debug("num_fileds: %d", num_fields);
3156
3157                                 if (num_fields > 0)
3158                                 {
3159                                         rowdesc->num_attrs = num_fields;
3160                                         attrinfo = malloc(sizeof(*attrinfo)*num_fields);
3161                                         if (!attrinfo)
3162                                         {
3163                                                 pool_error("do_query: malloc failed");
3164                                                 return POOL_ERROR;
3165                                         }
3166                                         rowdesc->attrinfo = attrinfo;
3167
3168                                         p += sizeof(num_fields);
3169
3170                                         /* extract attribute info */
3171                                         for (i = 0;i<num_fields;i++)
3172                                         {
3173                                                 len = strlen(p) + 1;
3174                                                 attrinfo->attrname = malloc(len);
3175                                                 if (!attrinfo->attrname)
3176                                                 {
3177                                                         pool_error("do_query: malloc failed");
3178                                                         return POOL_ERROR;
3179                                                 }
3180                                                 memcpy(attrinfo->attrname, p, len);
3181                                                 p += len;
3182                                                 memcpy(&intval, p, sizeof(int));
3183                                                 attrinfo->oid = htonl(intval);
3184                                                 p += sizeof(int);
3185                                                 memcpy(&shortval, p, sizeof(short));
3186                                                 attrinfo->attrnumber = htons(shortval);
3187                                                 p += sizeof(short);
3188                                                 memcpy(&intval, p, sizeof(int));
3189                                                 attrinfo->typeoid = htonl(intval);
3190                                                 p += sizeof(int);
3191                                                 memcpy(&shortval, p, sizeof(short));
3192                                                 attrinfo->size = htons(shortval);
3193                                                 p += sizeof(short);
3194                                                 memcpy(&intval, p, sizeof(int));
3195                                                 attrinfo->mod = htonl(intval);
3196                                                 p += sizeof(int);
3197                                                 p += sizeof(short);             /* skip format code since we use "text" anyway */
3198
3199                                                 attrinfo++;
3200                                         }
3201                                 }
3202                                 break;
3203
3204                         case 'D':       /* data row */
3205                                 p = packet;
3206
3207                                 memcpy(&shortval, p, sizeof(short));
3208                                 num_fields = htons(shortval);
3209                                 p += sizeof(short);
3210
3211                                 if (num_fields > 0)
3212                                 {
3213                                         res->numrows++;
3214
3215                                         for (i=0;i<num_fields;i++)
3216                                         {
3217                                                 memcpy(&intval, p, sizeof(int));
3218                                                 len = htonl(intval);
3219                                                 p += sizeof(int);
3220
3221                                                 res->nullflags[num_data] = len;
3222
3223                                                 if (len > 0)    /* NOT NULL? */
3224                                                 {
3225                                                         res->data[num_data] = malloc(len + 1);
3226                                                         if (!res->data[num_data])
3227                                                         {
3228                                                                 pool_error("do_query: malloc failed");
3229                                                                 return POOL_ERROR;
3230                                                         }
3231                                                         memcpy(res->data[num_data], p, len);
3232                                                         *(res->data[num_data] + 1) = '\0';
3233
3234                                                         p += len;
3235                                                 }
3236
3237                                                 num_data++;
3238
3239                                                 if (num_data % DO_QUERY_ALLOC_NUM == 0)
3240                                                 {
3241                                                         res->nullflags = realloc(res->nullflags,
3242                                                                                                          (num_data/DO_QUERY_ALLOC_NUM +1)*DO_QUERY_ALLOC_NUM*sizeof(int));
3243                                                         if (!res->nullflags)
3244                                                         {
3245                                                                 pool_error("do_query: malloc failed");
3246                                                                 return POOL_ERROR;
3247                                                         }
3248                                                         res->data = realloc(res->data,
3249                                                                                                 (num_data/DO_QUERY_ALLOC_NUM +1)*DO_QUERY_ALLOC_NUM*sizeof(char *));
3250                                                         if (!res->data)
3251                                                         {
3252                                                                 pool_error("do_query: malloc failed");
3253                                                                 return POOL_ERROR;
3254                                                         }
3255                                                 }
3256                                         }
3257                                 }
3258                                 break;
3259
3260                         default:
3261                                 break;
3262                 }
3263
3264         }
3265         return POOL_CONTINUE;
3266 }
3267
3268
3269 /*
3270  * judge if we need to lock the table
3271  * to keep SERIAL consistency among servers
3272  */
3273 int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query, Node *node)
3274 {
3275 /*
3276  * Query to know if the target table has SERIAL column or not.
3277  * This query is valid through PostgreSQL 7.3 to 8.3.
3278  */
3279 #define NEXTVALQUERY "SELECT count(*) FROM pg_catalog.pg_attrdef AS d, pg_catalog.pg_class AS c WHERE d.adrelid = c.oid AND d.adsrc ~ 'nextval' AND c.relname = '%s'"
3280
3281 #define INSERT_STATEMENT_MAX_CACHE              16
3282 #define MAX_ITEM_LENGTH 1024
3283
3284         /* table lookup cache structure */
3285         typedef struct {
3286                 char dbname[MAX_ITEM_LENGTH];   /* database name */
3287                 char relname[MAX_ITEM_LENGTH];  /* table name */
3288                 int     use_serial;     /* 1: use SERIAL data type */
3289                 int refcnt;             /* reference count */
3290         } MyRelCache;
3291
3292         static MyRelCache relcache[INSERT_STATEMENT_MAX_CACHE];
3293
3294         int i;
3295         char *str;
3296         char *rel;
3297         int use_serial = 0;
3298         char *dbname;
3299
3300         /*
3301          * for version 2 protocol, we cannot check if it's actually uses
3302          * SERIAL data types or not since the underlying infrastructure
3303          * (do_query) does not support the protocol. So we just return
3304          * false.
3305          */
3306         if (MAJOR(backend) == PROTO_MAJOR_V2)
3307                 return 0;
3308
3309         /* INSERT statement? */
3310         if (!IsA(node, InsertStmt))
3311                 return 0;
3312
3313         /* need to ignore leading white spaces? */
3314         if (pool_config->ignore_leading_white_space)
3315         {
3316                 /* ignore leading white spaces */
3317                 while (*query && isspace(*query))
3318                         query++;
3319         }
3320
3321         /* is there "NO_LOCK" comment? */
3322         if (strncasecmp(query, NO_LOCK_COMMENT, NO_LOCK_COMMENT_SZ) == 0)
3323                 return 0;
3324
3325         /* is there "LOCK" comment? */
3326         if (strncasecmp(query, LOCK_COMMENT, LOCK_COMMENT_SZ) == 0)
3327                 return 1;
3328
3329         if (pool_config->insert_lock == 0)      /* insert_lock is specified? */
3330                 return 0;
3331
3332         /*
3333          * if insert_lock is true, then check if the table actually uses
3334          * SERIAL data type
3335          */
3336
3337         /* obtain table name */
3338         str = get_insert_command_table_name((InsertStmt *)node);
3339         if (str == NULL)
3340         {
3341                 pool_error("need_insert_lock: get_insert_command_table_name failed");
3342                 return 0;
3343         }
3344
3345         /* eliminate double quotes */
3346         rel = malloc(strlen(str)+1);
3347         if (!rel)
3348         {
3349                 pool_error("need_insert_lock: malloc failed");
3350                 return 0;
3351         }
3352         for(i=0;*str;str++)
3353         {
3354                 if (*str != '"')
3355                         rel[i++] = *str;
3356         }
3357         rel[i] = '\0';
3358
3359         /* obtain database name */
3360         dbname = MASTER_CONNECTION(backend)->sp->database;
3361
3362         /* look for cache first */
3363         for (i=0;i<INSERT_STATEMENT_MAX_CACHE;i++)
3364         {
3365                 if (strcasecmp(relcache[i].dbname, dbname) == 0 &&
3366                         strcasecmp(relcache[i].relname, rel) == 0)
3367                 {
3368                         relcache[i].refcnt++;
3369                         use_serial = relcache[i].use_serial;
3370                         break;
3371                 }
3372         }
3373
3374         if (i == INSERT_STATEMENT_MAX_CACHE)            /* not in cache? */
3375         {
3376                 char qbuf[1024];
3377                 int maxrefcnt = INT_MAX;
3378                 POOL_SELECT_RESULT *res = NULL;
3379                 int index = 0;
3380
3381                 snprintf(qbuf, sizeof(qbuf), NEXTVALQUERY, rel);
3382
3383                 /* check the system catalog if the table has SERIAL data type */
3384                 if (do_query(MASTER(backend), qbuf, &res) != POOL_CONTINUE)
3385                 {
3386                         pool_error("need_insert_lock: do_query failed");
3387                         if (res)
3388                                 free_select_result(res);
3389                         return 0;
3390                 }
3391
3392                 /*
3393                  * if the query returns some rows and found nextval() is used,
3394                  * then we assume it uses SERIAL data type
3395                  */
3396                 if (res->numrows >= 1 && strcmp(res->data[0], "0"))
3397                         use_serial = 1;
3398
3399                 free_select_result(res);
3400
3401                 for (i=0;i<INSERT_STATEMENT_MAX_CACHE;i++)
3402                 {
3403                         if (relcache[i].refcnt == 0)
3404                         {
3405                                 index = i;
3406                                 break;
3407                         }
3408                         else if (relcache[i].refcnt < maxrefcnt)
3409                         {
3410                                 maxrefcnt = relcache[i].refcnt;
3411                                 index = i;
3412                         }
3413                 }
3414
3415                 /* register cache */
3416                 strncpy(relcache[index].dbname, dbname, MAX_ITEM_LENGTH);
3417                 strncpy(relcache[index].relname, rel, MAX_ITEM_LENGTH);
3418                 relcache[index].use_serial = use_serial;
3419                 relcache[index].refcnt++;
3420         }
3421         return use_serial;
3422 }
3423
3424 /*
3425  * if a transaction has not already started, start a new one.
3426  * issue LOCK TABLE IN SHARE ROW EXCLUSIVE MODE
3427  */
3428 POOL_STATUS insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query, InsertStmt *node)
3429 {
3430         char *table;
3431         char qbuf[1024];
3432         POOL_STATUS status;
3433         int i, deadlock_detected = 0;
3434
3435         /* insert_lock can be used in V3 only */
3436         if (MAJOR(backend) != PROTO_MAJOR_V3)
3437                 return POOL_CONTINUE;
3438
3439         /* get table name */
3440         table = get_insert_command_table_name(node);
3441
3442         /* could not get table name. probably wrong SQL command */
3443         if (table == NULL)
3444         {
3445                 return POOL_CONTINUE;
3446         }
3447
3448         /* issue lock table command */
3449         snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
3450
3451         status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid,
3452                                                 MASTER_CONNECTION(backend)->key, 0);
3453         if (status == POOL_END)
3454         {
3455                 internal_transaction_started = 0;
3456                 return POOL_END;
3457         }
3458         else if (status == POOL_DEADLOCK)
3459                 deadlock_detected = 1;
3460
3461         for (i=0;i<NUM_BACKENDS;i++)
3462         {
3463                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
3464                 {
3465                         if (deadlock_detected)
3466                                 status = do_command(frontend, CONNECTION(backend, i), POOL_ERROR_QUERY, PROTO_MAJOR_V3,
3467                                                                         MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0);
3468                         else
3469                                 status = do_command(frontend, CONNECTION(backend, i), qbuf, PROTO_MAJOR_V3, 
3470                                                                         MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0);
3471
3472                         if (status != POOL_CONTINUE)
3473                         {
3474                                 internal_transaction_started = 0;
3475                                 return POOL_END;
3476                         }
3477                 }
3478         }
3479
3480         return POOL_CONTINUE;
3481 }
3482
3483 bool is_partition_table(POOL_CONNECTION_POOL *backend, Node *node)
3484 {
3485         DistDefInfo *info = NULL;
3486         RangeVar *var = NULL;;
3487
3488         if (IsA(node, UpdateStmt))
3489         {
3490                 UpdateStmt *update = (UpdateStmt*) node;
3491
3492                 if(!IsA(update->relation,RangeVar))
3493                         return false;
3494
3495                 var = (RangeVar *) update->relation;
3496         }
3497         else if (IsA(node, DeleteStmt))
3498         {
3499                 DeleteStmt *delete = (DeleteStmt*) node;
3500
3501                 if(!IsA(delete->relation,RangeVar))
3502                         return false;
3503
3504                 var = (RangeVar *) delete->relation;
3505         } else
3506                 return false;
3507
3508         info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
3509                                                                           var->schemaname,
3510                                                                           var->relname);
3511         if(info)
3512                 return true;
3513         else
3514                 return false;
3515 }
3516
3517 /*
3518  * obtain table name in INSERT statement
3519  */
3520 static char *get_insert_command_table_name(InsertStmt *node)
3521 {
3522         char *table = nodeToString(node->relation);
3523
3524         pool_debug("get_insert_command_table_name: extracted table name: %s", table);
3525         return table;
3526 }
3527
3528 /* judge if this is a DROP DATABASE command */
3529 int is_drop_database(Node *node)
3530 {
3531         return (IsA(node, DropdbStmt)) ? 1 : 0;
3532 }
3533
3534 /*
3535  * check if any pending data remains.  Also if there's some pending data in
3536  * frontend AND no processing any Query, then returns 0.
3537  * XXX: is this correct thing?
3538 */
3539 static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
3540 {
3541         int i;
3542
3543         if (frontend->len > 0 && !in_progress)
3544                 return 0;
3545
3546         for (i=0;i<NUM_BACKENDS;i++)
3547         {
3548                 if (!VALID_BACKEND(i))
3549                         continue;
3550
3551                 if (CONNECTION(backend, i)->len > 0)
3552                         return 0;
3553         }
3554
3555         return 1;
3556 }
3557
3558 /*
3559  * check if query is needed to wait completion
3560  */
3561 int is_strict_query(Node *node)
3562 {
3563         switch (node->type)
3564         {
3565                 case T_SelectStmt:
3566                 {
3567                         SelectStmt *stmt = (SelectStmt *)node;
3568                         return (stmt->intoClause || stmt->lockingClause) ? 1 : 0;
3569                 }
3570
3571                 case T_UpdateStmt:
3572                 case T_InsertStmt:
3573                 case T_DeleteStmt:
3574                 case T_LockStmt:
3575                         return 1;
3576
3577                 default:
3578                         return 0;
3579         }
3580
3581         return 0;
3582 }
3583
3584 int check_copy_from_stdin(Node *node)
3585 {
3586         if (copy_schema)
3587                 free(copy_schema);
3588         if (copy_table)
3589                 free(copy_table);
3590         if (copy_null)
3591                 free(copy_null);
3592
3593         copy_schema = copy_table = copy_null = NULL;
3594
3595         if (IsA(node, CopyStmt))
3596         {
3597                 CopyStmt *stmt = (CopyStmt *)node;
3598                 if (stmt->is_from == TRUE && stmt->filename == NULL)
3599                 {
3600                         RangeVar *relation = (RangeVar *)stmt->relation;
3601                         ListCell *lc;
3602
3603                         /* query is COPY FROM STDIN */
3604                         if (relation->schemaname)
3605                                 copy_schema = strdup(relation->schemaname);
3606                         else
3607                                 copy_schema = strdup("public");
3608                         copy_table = strdup(relation->relname);
3609
3610                         copy_delimiter = '\t'; /* default delimiter */
3611                         copy_null = strdup("\\N"); /* default null string */
3612
3613                         /* look up delimiter and null string. */
3614                         foreach (lc, stmt->options)
3615                         {
3616                                 DefElem *elem = lfirst(lc);
3617                                 Value *v;
3618
3619                                 if (strcmp(elem->defname, "delimiter") == 0)
3620                                 {
3621                                         v = (Value *)elem->arg;
3622                                         copy_delimiter = v->val.str[0];
3623                                 }
3624                                 else if (strcmp(elem->defname, "null") == 0)
3625                                 {
3626                                         if (copy_null)
3627                                                 free(copy_null);
3628                                         v = (Value *)elem->arg;
3629                                         copy_null = strdup(v->val.str);
3630                                 }
3631                         }
3632                 }
3633                 return 1;
3634         }
3635
3636         return 0;
3637 }
3638
3639 /*
3640  * read kind from one backend
3641  */
3642 POOL_STATUS read_kind_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind, int node)
3643 {
3644         if (VALID_BACKEND(node))
3645         {
3646                 char k;
3647                 if (pool_read(CONNECTION(backend, node), &k, 1) < 0)
3648                 {
3649                         pool_error("read_kind_from_one_backend: failed to read kind from %d th backend", node);
3650                         return POOL_ERROR;
3651                 }
3652
3653                 pool_debug("read_kind_from_one_backend: read kind from %d th backend %c", node, k);
3654
3655                 *kind = k;
3656                 return POOL_CONTINUE;
3657         }
3658         else
3659         {
3660                 pool_error("read_kind_from_one_backend: %d th backend is not valid", node);
3661                 return POOL_ERROR;
3662         }
3663 }
3664
3665 /*
3666  * read_kind_from_backend: read kind from backends.
3667  * the "frontend" parameter is used to send "kind mismatch" error message to the frontend.
3668  * the out parameter "decided_kind" is the packet kind decided by this function.
3669  * this function uses "decide by majority" method if kinds from all backends do not agree.
3670  */
3671 POOL_STATUS read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *decided_kind)
3672 {
3673         int i;
3674         unsigned char kind_list[MAX_NUM_BACKENDS];      /* records each backend's kind */
3675         unsigned char kind_map[256]; /* records which kind gets majority.
3676                                                                   *     256 is the number of distinct values expressed by unsigned char
3677                                                                  */
3678         unsigned char kind;
3679         int trust_kind; /* decided kind */
3680         int max_kind = 0;
3681         double max_count = 0;
3682         int degenerate_node_num = 0;            /* number of backends degeneration requested */
3683         int degenerate_node[MAX_NUM_BACKENDS];          /* degeneration requested backend list */
3684
3685         memset(kind_map, 0, sizeof(kind_map));
3686
3687         for (i=0;i<NUM_BACKENDS;i++)
3688         {
3689                 /* initialize degenerate record */
3690                 degenerate_node[i] = 0;
3691
3692                 if (VALID_BACKEND(i))
3693                 {
3694                         do
3695                         {
3696                                 char *p, *value;
3697                                 int len;
3698
3699                                 if (pool_read(CONNECTION(backend, i), &kind, 1) < 0)
3700                                 {
3701                                         pool_error("read_kind_from_backend: failed to read kind from %d th backend", i);
3702                                         return POOL_ERROR;
3703                                 }
3704
3705                                 /*
3706                                  * Read and discard parameter status
3707                                  */
3708                                 if (kind != 'S')
3709                                 {
3710                                         break;
3711                                 }
3712
3713                                 if (pool_read(CONNECTION(backend, i), &len, sizeof(len)) < 0)
3714                                 {
3715                                         pool_error("read_kind_from_backend: failed to read parameter status packet length from %d th backend", i);
3716                                         return POOL_ERROR;
3717                                 }
3718                                 len = htonl(len) - 4;
3719                                 p = pool_read2(CONNECTION(backend, i), len);
3720                                 if (p == NULL)
3721                                 {
3722                                         pool_error("read_kind_from_backend: failed to read parameter status packet from %d th backend", i);
3723                                 }
3724                                 value = p + strlen(p) + 1;
3725                                 pool_debug("read_kind_from_backend: parameter name: %s value: %s", p, value);
3726                         } while (kind == 'S');
3727
3728                         kind_list[i] = kind;
3729
3730                         pool_debug("read_kind_from_backend: read kind from %d th backend %c NUM_BACKENDS: %d", i, kind_list[i], NUM_BACKENDS);
3731
3732                         kind_map[kind]++;
3733
3734                         if (kind_map[kind] > max_count)
3735                         {
3736                                 max_kind = kind_list[i];
3737                                 max_count = kind_map[kind];
3738                         }
3739                 }
3740                 else
3741                         kind_list[i] = 0;
3742         }
3743
3744 #ifdef NOT_USED
3745         /* register kind map */
3746         for (i = 0; i < NUM_BACKENDS; i++)
3747         {
3748                 /* initialize degenerate record */
3749                 degenerate_node[i] = 0;
3750
3751                 /* kind is signed char.
3752                  * We must check negative number.
3753                  */
3754                 int id = kind_list[i] + 128;
3755
3756                 if (kind_list[i] == -1)
3757                         continue;
3758
3759                 kind_map[id]++;
3760                 if (kind_map[id] > max_count)
3761                 {
3762                         max_kind = kind_list[i];
3763                         max_count = kind_map[id];
3764                 }
3765         }
3766 #endif
3767
3768         if (max_count != NUM_BACKENDS)
3769         {
3770                 /*
3771                  * not all backends agree with kind. We need to do "decide by majority"
3772                  */
3773
3774                 if (max_count <= NUM_BACKENDS / 2.0)
3775                 {
3776                         /* no one gets majority. We trust master node's kind */
3777                         trust_kind = kind_list[MASTER_NODE_ID];
3778                 }
3779                 else /* max_count > NUM_BACKENDS / 2.0 */
3780                 {
3781                         /* trust majority's kind */
3782                         trust_kind = max_kind;
3783                 }
3784
3785                 for (i = 0; i < NUM_BACKENDS; i++)
3786                 {
3787                         if (kind_list[i] != 0 && trust_kind != kind_list[i])
3788                         {
3789                                 /* degenerate */
3790                                 pool_error("read_kind_from_backend: %d th kind %c does not match with master or majority connection kind %c",
3791                                                    i, kind_list[i], trust_kind);
3792                                 degenerate_node[degenerate_node_num++] = i;
3793                         }
3794                 }
3795         }
3796         else
3797                 trust_kind = kind_list[MASTER_NODE_ID];
3798
3799         *decided_kind = trust_kind;
3800
3801         if (degenerate_node_num)
3802         {
3803                 String *msg = init_string("kind mismatch among backends. ");
3804
3805                 string_append_char(msg, "Possible last query was: \"");
3806                 string_append_char(msg, query_string_buffer);
3807                 string_append_char(msg, "\" kind details are:");
3808
3809                 for (i=0;i<NUM_BACKENDS;i++)
3810                 {
3811                         char buf[32];
3812
3813                         if (kind_list[i])
3814                         {
3815                                 snprintf(buf, sizeof(buf), " %d[%c]", i, kind_list[i]);
3816                                 string_append_char(msg, buf);
3817                         }
3818                 }
3819
3820                 pool_send_error_message(frontend, MAJOR(backend), "XX000",
3821                                                                 msg->data, "",
3822                                                                 "check data consistency among db nodes",
3823                                                                 __FILE__, __LINE__);
3824                 pool_error(msg->data);
3825
3826                 free_string(msg);
3827
3828                 if (pool_config->replication_stop_on_mismatch)
3829                 {
3830                         degenerate_backend_set(degenerate_node, degenerate_node_num);
3831                         child_exit(1);
3832                 }
3833                 else
3834                         return POOL_ERROR;
3835         }
3836
3837         return POOL_CONTINUE;
3838 }
3839
3840 /*
3841  * Create portal object
3842  * Return object is allocated from heap memory.
3843  */
3844 Portal *create_portal(void)
3845 {
3846         Portal *p;
3847
3848         if ((p = malloc(sizeof(Portal))) == NULL)
3849                 return NULL;
3850
3851         p->prepare_ctxt = pool_memory_create(PREPARE_BLOCK_SIZE);
3852         if (p->prepare_ctxt == NULL)
3853         {
3854                 free(p);
3855                 return NULL;
3856         }
3857         return p;
3858 }
3859
3860 void init_prepared_list(void)
3861 {
3862         prepared_list.cnt = 0;
3863         prepared_list.size = INIT_STATEMENT_LIST_SIZE;
3864         prepared_list.portal_list = malloc(sizeof(Portal *) * prepared_list.size);
3865         if (prepared_list.portal_list == NULL)
3866         {
3867                 pool_error("init_prepared_list: malloc failed: %s", strerror(errno));
3868                 exit(1);
3869         }
3870 }
3871
3872 void add_prepared_list(PreparedStatementList *p, Portal *portal)
3873 {
3874         if (p->cnt == p->size)
3875         {
3876                 p->size *= 2;
3877                 p->portal_list = realloc(p->portal_list, sizeof(Portal *) * p->size);
3878                 if (p->portal_list == NULL)
3879                 {
3880                         pool_error("add_prepared_list: realloc failed: %s", strerror(errno));
3881                         exit(1);
3882                 }
3883         }
3884         p->portal_list[p->cnt++] = portal;
3885 }
3886
3887 void add_unnamed_portal(PreparedStatementList *p, Portal *portal)
3888 {
3889         if (unnamed_statement)
3890         {
3891                 pool_memory_delete(unnamed_statement->prepare_ctxt, 0);
3892                 free(unnamed_statement);
3893         }
3894
3895         unnamed_portal = NULL;
3896         unnamed_statement = portal;
3897 }
3898
3899 void del_prepared_list(PreparedStatementList *p, Portal *portal)
3900 {
3901         int i;
3902         DeallocateStmt *s = (DeallocateStmt *)portal->stmt;
3903
3904         /* DEALLOCATE ALL? */
3905         if (s->name == NULL)
3906         {
3907                 reset_prepared_list(p);
3908         }
3909         else
3910         {
3911                 for (i = 0; i < p->cnt; i++)
3912                 {
3913                         PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt;
3914                         if (strcmp(p_stmt->name, s->name) == 0)
3915                                 break;
3916                 }
3917
3918                 if (i == p->cnt)
3919                         return;
3920
3921                 pool_memory_delete(p->portal_list[i]->prepare_ctxt, 0);
3922                 free(p->portal_list[i]->portal_name);
3923                 free(p->portal_list[i]);
3924                 if (i != p->cnt - 1)
3925                 {
3926                         memmove(&p->portal_list[i], &p->portal_list[i+1],
3927                                         sizeof(Portal *) * (p->cnt - i - 1));
3928                 }
3929                 p->cnt--;
3930         }
3931 }
3932
3933 void delete_all_prepared_list(PreparedStatementList *p, Portal *portal)
3934 {
3935         reset_prepared_list(p);
3936 }
3937
3938 static void reset_prepared_list(PreparedStatementList *p)
3939 {
3940         int i;
3941
3942         if (p)
3943         {
3944                 for (i = 0; i < p->cnt; i++)
3945                 {
3946                         pool_memory_delete(p->portal_list[i]->prepare_ctxt, 0);
3947                         free(p->portal_list[i]->portal_name);
3948                         free(p->portal_list[i]);
3949                 }
3950                 if (unnamed_statement)
3951                 {
3952                         pool_memory_delete(unnamed_statement->prepare_ctxt, 0);
3953                         free(unnamed_statement);
3954                 }
3955                 unnamed_portal = NULL;
3956                 unnamed_statement = NULL;
3957                 p->cnt = 0;
3958         }
3959 }
3960
3961 Portal *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name)
3962 {
3963         int i;
3964
3965         /* unnamed portal? */
3966         if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
3967                 return unnamed_statement;
3968
3969         for (i = 0; i < p->cnt; i++)
3970         {
3971                 PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt;
3972                 if (strcmp(p_stmt->name, name) == 0)
3973                         return p->portal_list[i];
3974         }
3975
3976         return NULL;
3977 }
3978
3979 Portal *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name)
3980 {
3981         int i;
3982
3983         /* unnamed portal? */
3984         if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
3985                 return unnamed_portal;
3986
3987         for (i = 0; i < p->cnt; i++)
3988         {
3989                 if (p->portal_list[i]->portal_name &&
3990                         strcmp(p->portal_list[i]->portal_name, name) == 0)
3991                         return p->portal_list[i];
3992         }
3993
3994         return NULL;
3995 }
3996
3997 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p,
3998                                         int n)
3999 {
4000         char *query;
4001         int len;
4002         PrepareStmt *p_stmt;
4003
4004         if (p->cnt <= n)
4005                 return 1;
4006
4007         p_stmt = (PrepareStmt *)p->portal_list[n]->stmt;
4008         len = strlen(p_stmt->name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */
4009         query = malloc(len);
4010         if (query == NULL)
4011         {
4012                 pool_error("send_deallocate: malloc failed: %s", strerror(errno));
4013                 exit(1);
4014         }
4015         sprintf(query, "DEALLOCATE \"%s\"", p_stmt->name);
4016
4017         if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE)
4018         {
4019                 free(query);
4020                 return 1;
4021         }
4022         free(query);
4023
4024         return 0;
4025 }
4026
/*
 * parse_copy_data()
 *   Parses one CopyDataRow line in COPY text format and returns the value of
 *   column col_id (0-based) as a malloc'd, NUL-terminated string the caller
 *   must free(). Returns NULL if the row has fewer columns or memory cannot
 *   be allocated.
 *
 *   buf holds len bytes and is expected to end with '\n', which is not part
 *   of any field. Backslash escapes are kept in the returned value as-is,
 *   except that "\<delimiter>" is reduced to the delimiter character. An
 *   escaped backslash "\\" is copied as two bytes and does not escape a
 *   delimiter that follows it.
 */
char *
parse_copy_data(char *buf, int len, char delimiter, int col_id)
{
	int i, j, field = 0;
	int end = len - 1;		/* index of the trailing '\n' */
	char *str;

	/* a field can never be longer than the whole row */
	str = malloc((len > 0 ? len : 0) + 1);
	if (str == NULL)
	{
		pool_error("parse_copy_data: malloc failed: %s", strerror(errno));
		return NULL;
	}

	/* buf is terminated by '\n'; stop before it. */
	for (i = 0, j = 0; i < end; i++)
	{
		if (buf[i] == '\\' && i + 1 < end)	/* escape not followed by the '\n' */
		{
			if (buf[i + 1] == delimiter)
			{
				/* escaped delimiter: keep just the delimiter */
				i++;
				str[j++] = buf[i];
			}
			else if (buf[i + 1] == '\\')
			{
				/*
				 * escaped backslash: consume both bytes so the second one
				 * cannot be mistaken for an escape of a following delimiter
				 */
				str[j++] = buf[i];
				i++;
				str[j++] = buf[i];
			}
			else
			{
				str[j++] = buf[i];
			}
		}
		else if (buf[i] == delimiter) /* delimiter */
		{
			if (field == col_id)
				break;

			field++;
			j = 0;
		}
		else
		{
			str[j++] = buf[i];
		}
	}

	if (field != col_id)
	{
		free(str);
		return NULL;
	}

	/* str has room for j + 1 bytes, so hand it to the caller directly */
	str[j] = '\0';
	pool_debug("parse_copy_data: divide key value is %s", str);
	return str;
}
4091
4092 static void
4093 query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len)
4094 {
4095         static int inside_T;                    /* flag to see the result data sequence */
4096         int result;
4097
4098         if (is_select_pgcatalog || is_select_for_update)
4099                 return;
4100
4101         if (kind == 'T' && parsed_query)
4102         {
4103                 result = pool_query_cache_register(kind, frontend, database, data, data_len, parsed_query);
4104                 if (result < 0)
4105                 {
4106                         pool_error("pool_query_cache_register: query cache registration failed");
4107                         inside_T = 0;
4108                 }
4109                 else
4110                 {
4111                         inside_T = 1;
4112                 }
4113         }
4114         else if ((kind == 'D' || kind == 'C' || kind == 'E') && inside_T)
4115         {
4116                 result = pool_query_cache_register(kind, frontend, database, data, data_len, NULL);
4117                 if (kind == 'C' || kind == 'E' || result < 0)
4118                 {
4119                         if (result < 0)
4120                                 pool_error("pool_query_cache_register: query cache registration failed");
4121                         else
4122                                 pool_debug("pool_query_cache_register: query cache saved");
4123
4124                         inside_T = 0;
4125                         free(parsed_query);
4126                         parsed_query = NULL;
4127                 }
4128         }
4129 }
4130
4131 void query_ps_status(char *query, POOL_CONNECTION_POOL *backend)
4132 {
4133         StartupPacket *sp;
4134         char psbuf[1024];
4135         int i;
4136
4137         if (*query == '\0')
4138                 return;
4139
4140         sp = MASTER_CONNECTION(backend)->sp;
4141         i = snprintf(psbuf, sizeof(psbuf), "%s %s %s ",
4142                                  sp->user, sp->database, remote_ps_data);
4143
4144         /* skip spaces */
4145         while (*query && isspace(*query))
4146                 query++;
4147
4148         for (; i< sizeof(psbuf); i++)
4149         {
4150                 if (!*query || isspace(*query))
4151                         break;
4152
4153                 psbuf[i] = toupper(*query++);
4154         }
4155         psbuf[i] = '\0';
4156
4157         set_ps_display(psbuf, false);
4158 }
4159
4160 /* compare function for bsearch() */
4161 static int compare(const void *p1, const void *p2)
4162 {
4163         int     v1,     v2;
4164
4165         v1 = *(NodeTag *) p1;
4166         v2 = *(NodeTag *) p2;
4167         return (v1 > v2) ? 1 : ((v1 == v2) ? 0 : -1);
4168 }
4169
4170 /* return true if needed to start a transaction for the nodetag */
4171 static bool is_internal_transaction_needed(Node *node)
4172 {
4173         static NodeTag nodemap[] = {
4174                 T_InsertStmt,
4175                 T_DeleteStmt,
4176                 T_UpdateStmt,
4177                 T_SelectStmt,
4178                 T_AlterTableStmt,
4179                 T_AlterDomainStmt,
4180                 T_GrantStmt,
4181                 T_GrantRoleStmt,
4182                 T_ClosePortalStmt,
4183                 T_ClusterStmt,
4184                 T_CopyStmt,
4185                 T_CreateStmt,   /* CREAE TABLE */
4186                 T_DefineStmt,   /* CREATE AGGREGATE, OPERATOR, TYPE */
4187                 T_DropStmt,             /* DROP TABLE etc. */
4188                 T_TruncateStmt,
4189                 T_CommentStmt,
4190                 T_FetchStmt,
4191                 T_IndexStmt,    /* CREATE INDEX */
4192                 T_CreateFunctionStmt,
4193                 T_AlterFunctionStmt,
4194                 T_RemoveFuncStmt,
4195                 T_RenameStmt,   /* ALTER AGGREGATE etc. */
4196                 T_RuleStmt,             /* CREATE RULE */
4197                 T_NotifyStmt,
4198                 T_ListenStmt,
4199                 T_UnlistenStmt,
4200                 T_ViewStmt,             /* CREATE VIEW */
4201                 T_LoadStmt,
4202                 T_CreateDomainStmt,
4203                 /*
4204                   T_CreatedbStmt,       CREATE DATABASE/DROP DATABASE cannot execute inside a transaction block
4205                   T_DropdbStmt,
4206                 */
4207                 T_CreateSeqStmt,
4208                 T_AlterSeqStmt,
4209                 T_VariableSetStmt,              /* SET */
4210                 T_CreateTrigStmt,
4211                 T_DropPropertyStmt,
4212                 T_CreatePLangStmt,
4213                 T_DropPLangStmt,
4214                 T_CreateRoleStmt,
4215                 T_AlterRoleStmt,
4216                 T_DropRoleStmt,
4217                 T_LockStmt,
4218                 T_ConstraintsSetStmt,
4219                 T_ReindexStmt,
4220                 T_CreateSchemaStmt,
4221                 T_AlterDatabaseStmt,
4222                 T_AlterDatabaseSetStmt,
4223                 T_AlterRoleSetStmt,
4224                 T_CreateConversionStmt,
4225                 T_CreateCastStmt,
4226                 T_DropCastStmt,
4227                 T_CreateOpClassStmt,
4228                 T_CreateOpFamilyStmt,
4229                 T_AlterOpFamilyStmt,
4230                 T_RemoveOpClassStmt,
4231                 T_RemoveOpFamilyStmt,
4232                 T_PrepareStmt,
4233                 T_ExecuteStmt,
4234                 T_DeallocateStmt,
4235                 T_DeclareCursorStmt,
4236                 T_CreateTableSpaceStmt,
4237                 T_DropTableSpaceStmt,
4238                 T_AlterObjectSchemaStmt,
4239                 T_AlterOwnerStmt,
4240                 T_DropOwnedStmt,
4241                 T_ReassignOwnedStmt,
4242                 T_CompositeTypeStmt,    /* CREATE TYPE */
4243                 T_CreateEnumStmt,
4244                 T_AlterTSDictionaryStmt,
4245                 T_AlterTSConfigurationStmt
4246         };
4247
4248         if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap)/sizeof(nodemap[0]), sizeof(NodeTag), compare) != NULL)
4249         {
4250                 /*
4251                  * Check CREATE INDEX CONCURRENTLY. If so, do not start transaction
4252                  */
4253                 if (IsA(node, IndexStmt))
4254                 {
4255                         if (((IndexStmt *)node)->concurrent)
4256                                 return false;
4257                 }
4258
4259                 /*
4260                  * Check CLUSTER with no option. If so, do not start transaction
4261                  */
4262                 else if (IsA(node, ClusterStmt))
4263                 {
4264                         if (((ClusterStmt *)node)->relation == NULL)
4265                                 return false;
4266                 }
4267
4268                 return true;
4269
4270         }
4271         return false;
4272 }
4273
4274 POOL_STATUS start_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node)
4275 {
4276         int i;
4277
4278         if (TSTATE(backend) != 'I')
4279                 return POOL_CONTINUE;
4280
4281         /* if we are not in a transaction block,
4282          * start a new transaction
4283          */
4284         if (is_internal_transaction_needed(node))
4285         {
4286                 for (i=0;i<NUM_BACKENDS;i++)
4287                 {
4288                         if (VALID_BACKEND(i))
4289                         {
4290                                 if (do_command(frontend, CONNECTION(backend, i), "BEGIN", MAJOR(backend), 
4291                                                            MASTER_CONNECTION(backend)->pid,     MASTER_CONNECTION(backend)->key, 0) != POOL_CONTINUE)
4292                                         return POOL_END;
4293                         }
4294                 }
4295
4296                 /* mark that we started new transaction */
4297                 internal_transaction_started = 1;
4298         }
4299         return POOL_CONTINUE;
4300 }
4301
4302
4303 POOL_STATUS end_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
4304 {
4305         int i;
4306 #ifdef HAVE_SIGPROCMASK
4307         sigset_t oldmask;
4308 #else
4309         int     oldmask;
4310 #endif
4311
4312         /*
4313          * We must block all signals. If pgpool SIGTERM, SIGINT or SIGQUIT
4314          * is delivered, it could cause data inconsistency.
4315          */
4316         POOL_SETMASK2(&BlockSig, &oldmask);
4317
4318         /* We need to commit from secondary to master. */
4319         for (i=0;i<NUM_BACKENDS;i++)
4320         {
4321                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
4322                 {
4323                         /* COMMIT success? */
4324                         if (do_command(frontend, CONNECTION(backend, i), "COMMIT", MAJOR(backend), 
4325                                                    MASTER_CONNECTION(backend)->pid,     MASTER_CONNECTION(backend)->key, 1) != POOL_CONTINUE)
4326                         {
4327                                 internal_transaction_started = 0;
4328                                 POOL_SETMASK(&oldmask);
4329                                 return POOL_END;
4330                         }
4331                 }
4332         }
4333
4334         /* commit on master */
4335         if (do_command(frontend, MASTER(backend), "COMMIT", MAJOR(backend), 
4336                                    MASTER_CONNECTION(backend)->pid,     MASTER_CONNECTION(backend)->key, 1) != POOL_CONTINUE)
4337         {
4338                 internal_transaction_started = 0;
4339                 POOL_SETMASK(&oldmask);
4340                 return POOL_END;
4341         }
4342
4343         internal_transaction_started = 0;
4344         POOL_SETMASK(&oldmask);
4345         return POOL_CONTINUE;
4346 }
4347
/*
 * Extract the number of tuples from a CommandComplete message tag.
 *
 * Recognizes "UPDATE <n>", "DELETE <n>" and "INSERT <oid> <n>" and returns
 * <n>. Returns 0 for any other tag or when no count is present. Parsing
 * never advances past the terminating NUL, even for a bare "UPDATE",
 * "DELETE" or "INSERT".
 */
static int extract_ntuples(char *message)
{
	char *rows;

	if ((rows = strstr(message, "UPDATE")) || (rows = strstr(message, "DELETE")))
		rows += 6;		/* both tags are exactly 6 characters long */
	else if ((rows = strstr(message, "INSERT")))
	{
		rows += 6;
		/* skip the blank(s) and the OID field that precede the row count */
		while (*rows == ' ')
			rows++;
		while (*rows && *rows != ' ')
			rows++;
	}
	else
		return 0;

	/* strtol skips leading blanks and yields 0 when no digits follow */
	return (int) strtol(rows, NULL, 10);
}
4367
4368 static int detect_postmaster_down_error(POOL_CONNECTION *backend, int major)
4369 {
4370         int r =  detect_error(backend, ADMIN_SHUTDOWN_ERROR_CODE, major, 'E', false);
4371         if (r == SPECIFIED_ERROR)
4372         {
4373                 pool_debug("detect_stop_postmaster_error: receive admin shutdown error from a node.");
4374                 return r;
4375         }
4376
4377         r = detect_error(backend, CRASH_SHUTDOWN_ERROR_CODE, major, 'N', false);
4378         if (r == SPECIFIED_ERROR)
4379         {
4380                 pool_debug("detect_stop_postmaster_error: receive crash shutdown error from a node.");
4381         }
4382         return r;
4383 }
4384
4385 int detect_active_sql_transaction_error(POOL_CONNECTION *backend, int major)
4386 {
4387         int r =  detect_error(backend, ACTIVE_SQL_TRANSACTION_ERROR_CODE, major, 'E', true);
4388         if (r == SPECIFIED_ERROR)
4389         {
4390                 pool_debug("detect_active_sql_transaction_error: receive SET TRANSACTION ISOLATION LEVEL must be called before any query error from a node.");
4391         }
4392         return r;
4393 }
4394
4395 int detect_deadlock_error(POOL_CONNECTION *backend, int major)
4396 {
4397         int r =  detect_error(backend, DEADLOCK_ERROR_CODE, major, 'E', true);
4398         if (r == SPECIFIED_ERROR)
4399                 pool_debug("detect_deadlock_error: received deadlock error message from backend");
4400         return r;
4401 }
4402
4403 int detect_serialization_error(POOL_CONNECTION *backend, int major)
4404 {
4405         int r =  detect_error(backend, SERIALIZATION_FAIL_ERROR_CODE, major, 'E', true);
4406         if (r == SPECIFIED_ERROR)
4407                 pool_debug("detect_serialization_error: received serialization failure message from backend");
4408         return r;
4409 }
4410
4411 int detect_query_cancel_error(POOL_CONNECTION *backend, int major)
4412 {
4413         int r =  detect_error(backend, QUERY_CANCEL_ERROR_CODE, major, 'E', true);
4414         if (r == SPECIFIED_ERROR)
4415                 pool_debug("detect_query_cancel_error: received query cancel error message from backend");
4416         return r;
4417 }
4418
4419 /*
4420  * detect_error: Detect specified error from error code.
4421  */
4422 static int detect_error(POOL_CONNECTION *backend, char *error_code, int major, char class, bool unread)
4423 {
4424         int is_error = 0;
4425         char kind;
4426         int readlen = 0, len;
4427         static char buf[8192]; /* memory space is large enough */
4428         char *p, *str;
4429
4430         if (pool_read(backend, &kind, sizeof(kind)))
4431                 return POOL_END;
4432         readlen += sizeof(kind);
4433         p = buf;
4434         memcpy(p, &kind, sizeof(kind));
4435         p += sizeof(kind);
4436
4437         pool_debug("detect_error: kind: %c", kind);
4438
4439         /* Specified class? */
4440         if (kind == class)
4441         {
4442                 /* read actual message */
4443                 if (major == PROTO_MAJOR_V3)
4444                 {
4445                         char *e;
4446
4447                         if (pool_read(backend, &len, sizeof(len)) < 0)
4448                                 return POOL_END;
4449                         readlen += sizeof(len);
4450                         memcpy(p, &len, sizeof(len));
4451                         p += sizeof(len);
4452
4453                         len = ntohl(len) - 4;
4454                         str = malloc(len);
4455                         pool_read(backend, str, len);
4456                         readlen += len;
4457                         memcpy(p, str, len);
4458
4459                         /*
4460                          * Checks error code which is formatted 'Cxxxxxx'
4461                          * (xxxxxx is error code).
4462                          */
4463                         e = str;
4464                         while (*e)
4465                         {
4466                                 if (*e == 'C')
4467                                 {/* specified error? */
4468                                         is_error = (strcmp(e+1, error_code) == 0) ? SPECIFIED_ERROR : 0;
4469                                         break;
4470                                 }
4471                                 else
4472                                         e = e + strlen(e) + 1;
4473                         }
4474                         free(str);
4475                 }
4476                 else
4477                 {
4478                         str = pool_read_string(backend, &len, 0);
4479                         readlen += len;
4480                         memcpy(p, str, len);
4481                 }
4482         }
4483         if (unread || !is_error)
4484         {
4485                 /* put a message to read buffer */
4486                 if (pool_unread(backend, buf, readlen) != 0)
4487                         is_error = -1;
4488         }
4489
4490         return is_error;
4491 }
4492
4493 /*
4494  * read message length and rest of the packet then discard it
4495  */
4496 POOL_STATUS pool_discard_packet(POOL_CONNECTION_POOL *cp)
4497 {
4498         int status, len, i;
4499         char kind;
4500         char *string;
4501         POOL_CONNECTION *backend;
4502
4503         for (i=0;i<NUM_BACKENDS;i++)
4504         {
4505                 if (!VALID_BACKEND(i))
4506                 {
4507                         continue;
4508                 }
4509
4510                 backend = CONNECTION(cp, i);
4511
4512                 status = pool_read(backend, &kind, sizeof(kind));
4513                 if (status < 0)
4514                 {
4515                         pool_error("pool_discard_packet: error while reading message kind");
4516                         return POOL_END;
4517                 }
4518
4519                 pool_debug("pool_discard_packet: kind: %c", kind);
4520
4521                 if (MAJOR(cp) == PROTO_MAJOR_V3)
4522                 {
4523                         if (pool_read(backend, &len, sizeof(len)) < 0)
4524                         {
4525                                 pool_error("pool_discard_packet: error while reading message length");
4526                                 return POOL_END;
4527                         }
4528                         len = ntohl(len) - 4;
4529                         string = pool_read2(backend, len);
4530                         if (string == NULL)
4531                         {
4532                                 pool_error("pool_discard_packet: error while reading rest of message");
4533                                 return POOL_END;
4534                         }
4535                 }
4536                 else
4537                 {
4538                         string = pool_read_string(backend, &len, 0);
4539                         if (string == NULL)
4540                         {
4541                                 pool_error("pool_discard_packet: error while reading rest of message");
4542                                 return POOL_END;
4543                         }
4544                 }
4545         }
4546         return POOL_CONTINUE;
4547 }