]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_process_query.c
5cf2b6960765475e6446493818dea63a149e7540
[pgpool-ii/pgpool-ii_2.2.5] / pool_process_query.c
1 /* -*-pgsql-c-*- */
2 /*
3  * $Header: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v 1.141.2.22 2009/10/02 07:53:08 t-ishii Exp $
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2009      PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  * pool_process_query.c: query processing stuff
22  *
23 */
24 #include "config.h"
25 #include <errno.h>
26
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif
30 #ifdef HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #ifdef HAVE_SYS_SELECT_H
34 #include <sys/select.h>
35 #endif
36
37
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <string.h>
41 #include <netinet/in.h>
42 #include <ctype.h>
43
44 #include "pool.h"
45 #include "pool_signal.h"
46 #include "pool_proto_modules.h"
47
48 #ifndef FD_SETSIZE
49 #define FD_SETSIZE 512
50 #endif
51
/*
 * Initial size of a prepared statement list.
 * NOTE(review): meaning inferred from the macro name -- confirm at the
 * place where PreparedStatementList is allocated.
 */
#define INIT_STATEMENT_LIST_SIZE	8

/*
 * SQLSTATE codes of backend errors this file is interested in.
 * The names in parentheses are the condition names used by PostgreSQL.
 */

/*
 * (active_sql_transaction)
 * SET TRANSACTION ISOLATION LEVEL must be called before any query
 */
#define ACTIVE_SQL_TRANSACTION_ERROR_CODE	"25001"

/* (deadlock_detected) */
#define DEADLOCK_ERROR_CODE					"40P01"

/* (serialization_failure) */
#define SERIALIZATION_FAIL_ERROR_CODE		"40001"

/* (query_canceled) */
#define QUERY_CANCEL_ERROR_CODE				"57014"

/* (admin_shutdown) */
#define ADMIN_SHUTDOWN_ERROR_CODE			"57P01"

/* (crash_shutdown) */
#define CRASH_SHUTDOWN_ERROR_CODE			"57P02"
60
61 static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
62 static POOL_STATUS do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, int protoMajor, int pid, int key, int no_ready_for_query);
63 static POOL_STATUS do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major);
64 static char *get_insert_command_table_name(InsertStmt *node);
65 static void reset_prepared_list(PreparedStatementList *p);
66 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n);
67 static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
68 static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend);
69 static void query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len);
70 static int extract_ntuples(char *message);
71 static int detect_error(POOL_CONNECTION *master, char *error_code, int major, char class, bool unread);
72 static int detect_postmaster_down_error(POOL_CONNECTION *master, int major);
73 static void free_select_result(POOL_SELECT_RESULT *result);
74
75 static bool is_internal_transaction_needed(Node *node);
76 static int compare(const void *p1, const void *p2);
77
78 /* timeout sec for pool_check_fd */
79 static int timeoutsec;
80
81 int in_load_balance;    /* non 0 if in load balance mode */
82 int selected_slot;              /* selected DB node */
83 int master_slave_dml;   /* non 0 if master/slave mode is specified in config file */
84
85 /*
86  * main module for query processing
87  */
88 POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
89                                                            POOL_CONNECTION_POOL *backend,
90                                                            int connection_reuse,
91                                                            int first_ready_for_query_received)
92 {
93         char kind;      /* packet kind (backend) */
94         char fkind;     /* packet kind (frontend) */
95         short num_fields = 0;
96         fd_set  readmask;
97         fd_set  writemask;
98         fd_set  exceptmask;
99         int fds;
100         POOL_STATUS status;
101         int state;      /* 0: ok to issue commands 1: waiting for "ready for query" response */
102         int qcnt;
103         int i;
104
105         frontend->no_forward = connection_reuse;
106         qcnt = 0;
107         state = 0;
108
109         for (;;)
110         {
111                 kind = 0;
112                 fkind = 0;
113
114                 if (state == 0 && connection_reuse)
115                 {
116                         int st;
117
118                         /* send query for resetting connection such as "ROLLBACK" "RESET ALL"... */
119                         st = reset_backend(backend, qcnt);
120
121                         if (st < 0)             /* error? */
122                         {
123                                 /* probably we don't need this, since caller will
124                                  * close the connection to frontend after returning with POOL_END. But I
125                                  * guess I would like to be a paranoid...
126                                  */
127                                 frontend->no_forward = 0;
128                                 return POOL_END;
129                         }
130
131                         else if (st == 0)       /* no query issued? */
132                         {
133                                 qcnt++;
134                                 continue;
135                         }
136
137                         else if (st == 1)       /* more query remains */
138                         {
139                                 state = 1;
140                                 qcnt++;
141                                 continue;
142                         }
143
144                         else    /* no more query(st == 2) */
145                         {
146                                 TSTATE(backend) = 'I';
147                                 frontend->no_forward = 0;
148                                 return POOL_CONTINUE;
149                         }
150
151                 }
152
153                 /*
154                  * if all backends do not have any pending data in the
155                  * receiving data cache, then issue select(2) to wait for new
156                  * data arrival
157                  */
158                 if (is_cache_empty(frontend, backend))
159                 {
160                         struct timeval timeoutdata;
161                         struct timeval *timeout;
162                         int num_fds, was_error = 0;
163
164                     /*
165                          * frontend idle counters. depends on the following
166                          * select(2) call's time out is 1 second.
167                          */
168                         int idle_count = 0;     /* for other than in recovery */
169                         int idle_count_in_recovery = 0; /* for in recovery */
170
171                 SELECT_RETRY:
172                         FD_ZERO(&readmask);
173                         FD_ZERO(&writemask);
174                         FD_ZERO(&exceptmask);
175
176                         num_fds = 0;
177
178                         /*
179                          * Do not read a message from frontend while backends process a query.
180                          */
181                         if (!connection_reuse && !in_progress)
182                         {
183                                 FD_SET(frontend->fd, &readmask);
184                                 FD_SET(frontend->fd, &exceptmask);
185                                 num_fds = Max(frontend->fd + 1, num_fds);
186                         }
187
188                         /*
189                          * If we are in load balance mode and the selected node is
190                          * down, we need to re-select load_balancing_node.  Note
191                          * that we cannnot use VALID_BACKEND macro here.  If
192                          * in_load_balance == 1, VALID_BACKEND macro may return 0.
193                          */
194                         if (pool_config->load_balance_mode &&
195                                 BACKEND_INFO(backend->info->load_balancing_node).backend_status == CON_DOWN)
196                         {
197                                 /* select load balancing node */
198                                 backend->info->load_balancing_node = select_load_balancing_node();
199                         }
200
201                         for (i=0;i<NUM_BACKENDS;i++)
202                         {
203                                 if (VALID_BACKEND(i))
204                                 {
205                                         num_fds = Max(CONNECTION(backend, i)->fd + 1, num_fds);
206                                         FD_SET(CONNECTION(backend, i)->fd, &readmask);
207                                         FD_SET(CONNECTION(backend, i)->fd, &exceptmask);
208                                 }
209                         }
210
211                         /*
212                          * wait for data arriving from frontend and backend
213                          */
214                         if (pool_config->client_idle_limit > 0 ||
215                                 pool_config->client_idle_limit_in_recovery > 0)
216                         {
217                                 timeoutdata.tv_sec = 1;
218                                 timeoutdata.tv_usec = 0;
219                                 timeout = &timeoutdata;
220                         }
221                         else
222                                 timeout = NULL;
223
224                         fds = select(num_fds, &readmask, &writemask, &exceptmask, timeout);
225
226                         if (fds == -1)
227                         {
228                                 if (errno == EINTR)
229                                         continue;
230
231                                 pool_error("select() failed. reason: %s", strerror(errno));
232                                 return POOL_ERROR;
233                         }
234
235                         /* select timeout */
236                         if (fds == 0)
237                         {
238                                 if (*InRecovery == 0 && pool_config->client_idle_limit > 0)
239                                 {
240                                         idle_count++;
241
242                                         if (idle_count > pool_config->client_idle_limit)
243                                         {
244                                                 pool_log("pool_process_query: child connection forced to terminate due to client_idle_limit(%d) reached", pool_config->client_idle_limit);
245                                                 return POOL_END;
246                                         }
247                                 }
248                                 else if (*InRecovery > 0 && pool_config->client_idle_limit_in_recovery > 0)
249                                 {
250                                         idle_count_in_recovery++;
251
252                                         if (idle_count_in_recovery > pool_config->client_idle_limit_in_recovery)
253                                         {
254                                                 pool_log("pool_process_query: child connection forced to terminate due to client_idle_limit_in_recovery(%d) reached", pool_config->client_idle_limit_in_recovery);
255                                                 return POOL_END;
256                                         }
257                                 }
258                                 goto SELECT_RETRY;
259                         }
260
261                         for (i = 0; i < NUM_BACKENDS; i++)
262                         {
263                                 if (VALID_BACKEND(i))
264                                 {
265                                         /*
266                                          * make sure that connection slot exists
267                                          */
268                                         if (CONNECTION_SLOT(backend, i) == 0)
269                                         {
270                                                 pool_log("FATAL ERROR: VALID_BACKEND returns non 0 but connection slot is empty. backend id:%d RAW_MODE:%d in_load_balance:%d LOAD_BALANCE_STATUS:%d status:%d",
271                                                                  i, RAW_MODE, in_load_balance, LOAD_BALANCE_STATUS(i), BACKEND_INFO(i).backend_status);
272                                                 was_error = 1;
273                                                 break;
274                                         }
275
276                                         if (FD_ISSET(CONNECTION(backend, i)->fd, &readmask))
277                                         {
278                                                 /*
279                                                  * admin shutdown postmaster or postmaster goes down
280                                                  */
281                                                 if (detect_postmaster_down_error(CONNECTION(backend, i), MAJOR(backend)) == SPECIFIED_ERROR)
282                                                 {
283                                                         /* detach backend node. */
284                                                         was_error = 1;
285                                                         if (!VALID_BACKEND(i))
286                                                                 break;
287                                                         notice_backend_error(i);
288                                                         sleep(5);
289                                                         break;
290                                                 }
291                                                 status = read_kind_from_backend(frontend, backend, &kind);
292                                                 if (status != POOL_CONTINUE)
293                                                         return status;
294                                                 break;
295                                         }
296                                 }
297                         }
298
299                         if (was_error)
300                                 continue;
301
302                         if (!connection_reuse && !in_progress)
303                         {
304                                 if (FD_ISSET(frontend->fd, &exceptmask))
305                                         return POOL_END;
306                                 else if (FD_ISSET(frontend->fd, &readmask))
307                                 {
308                                         status = ProcessFrontendResponse(frontend, backend);
309                                         if (status != POOL_CONTINUE)
310                                                 return status;
311
312                                         continue;
313                                 }
314                                 if (kind == 0)
315                                         continue;
316                         }
317
318                         if (FD_ISSET(MASTER(backend)->fd, &exceptmask))
319                         {
320                                 return POOL_ERROR;
321                         }
322                 }
323                 else
324                 {
325                         if (frontend->len > 0 && !in_progress)
326                         {
327                                 status = ProcessFrontendResponse(frontend, backend);
328                                 if (status != POOL_CONTINUE)
329                                         return status;
330
331                                 continue;
332                         }
333                 }
334
335                 /* this is the synchronous point */
336                 if (kind == 0)
337                 {
338                         status = read_kind_from_backend(frontend, backend, &kind);
339                         if (status != POOL_CONTINUE)
340                                 return status;
341                 }
342
343                 /* reload config file */
344                 if (got_sighup)
345                 {
346                         pool_get_config(get_config_file_name(), RELOAD_CONFIG);
347                         if (pool_config->enable_pool_hba)
348                                 load_hba(get_hba_file_name());
349                         if (pool_config->parallel_mode)
350                                 pool_memset_system_db_info(system_db_info->info);
351                         got_sighup = 0;
352                 }
353
354                 first_ready_for_query_received = 0;
355
356                 /*
357                  * Process backend Response
358                  */
359
360                 /*
361                  * Sanity check
362                  */
363                 if (kind == 0)
364                 {
365                         pool_error("pool_process_query: kind is 0!");
366                         return POOL_ERROR;
367                 }
368
369                 pool_debug("pool_process_query: kind from backend: %c", kind);
370
371                 if (MAJOR(backend) == PROTO_MAJOR_V3)
372                 {
373                         switch (kind)
374                         {
375                                 case 'G':
376                                         /* CopyIn response */
377                                         status = CopyInResponse(frontend, backend);
378                                         break;
379                                 case 'S':
380                                         /* Parameter Status */
381                                         status = ParameterStatus(frontend, backend);
382                                         break;
383                                 case 'Z':
384                                         /* Ready for query */
385                                         status = ReadyForQuery(frontend, backend, 1);
386                                         break;
387                                 default:
388                                         status = SimpleForwardToFrontend(kind, frontend, backend);
389                                         if (pool_flush(frontend))
390                                                 return POOL_END;
391                                         break;
392                         }
393                 }
394                 else
395                 {
396                         switch (kind)
397                         {
398                                 case 'A':
399                                         /* Notification  response */
400                                         status = NotificationResponse(frontend, backend);
401                                         break;
402
403                                 case 'B':
404                                         /* BinaryRow */
405                                         status = BinaryRow(frontend, backend, num_fields);
406                                         break;
407
408                                 case 'C':
409                                         /* Complete command response */
410                                         status = CompleteCommandResponse(frontend, backend);
411                                         break;
412
413                                 case 'D':
414                                         /* AsciiRow */
415                                         status = AsciiRow(frontend, backend, num_fields);
416                                         break;
417
418                                 case 'E':
419                                         /* Error Response */
420                                         status = ErrorResponse(frontend, backend);
421                                         break;
422
423                                 case 'G':
424                                         /* CopyIn Response */
425                                         status = CopyInResponse(frontend, backend);
426                                         break;
427
428                                 case 'H':
429                                         /* CopyOut Response */
430                                         status = CopyOutResponse(frontend, backend);
431                                         break;
432
433                                 case 'I':
434                                         /* Empty Query Response */
435                                         status = EmptyQueryResponse(frontend, backend);
436                                         break;
437
438                                 case 'N':
439                                         /* Notice Response */
440                                         status = NoticeResponse(frontend, backend);
441                                         break;
442
443                                 case 'P':
444                                         /* CursorResponse */
445                                         status = CursorResponse(frontend, backend);
446                                         break;
447
448                                 case 'T':
449                                         /* RowDescription */
450                                         status = RowDescription(frontend, backend, &num_fields);
451                                         break;
452
453                                 case 'V':
454                                         /* FunctionResultResponse and FunctionVoidResponse */
455                                         status = FunctionResultResponse(frontend, backend);
456                                         break;
457
458                                 case 'Z':
459                                         /* Ready for query */
460                                         status = ReadyForQuery(frontend, backend, 1);
461                                         break;
462
463                                 default:
464                                         pool_error("Unknown message type %c(%02x)", kind, kind);
465                                         exit(1);
466                         }
467                 }
468
469                 if (status != POOL_CONTINUE)
470                         return status;
471
472                 if (kind == 'Z' && frontend->no_forward && state == 1)
473                 {
474                         state = 0;
475                 }
476
477         }
478         return POOL_CONTINUE;
479 }
480
481
/*
 * set_fd, isset_fd and zero_fd are used
 * for check fd in parallel mode.
 *
 * They operate on a bitmap holding one bit per file descriptor. The
 * bitmap is an array of FD_SETSIZE / BITS unsigned longs; descriptor
 * fd lives in bit (fd % BITS) of element (fd / BITS).
 */

/* used only in pool_parallel_exec */
/* number of bits in one element of the bitmap */
#define BITS (8 * sizeof(long int))

/*
 * Mark fd as a member of the bitmap setp.
 * Descriptors that do not fit in the bitmap (fd >= FD_SETSIZE) are
 * ignored so that memory outside the bitmap is never written.
 */
static void set_fd(unsigned long fd ,unsigned long *setp)
{
	unsigned long word;
	unsigned long bit;

	if (fd >= (unsigned long) FD_SETSIZE)
		return;

	/*
	 * Index by BITS, not by FD_SETSIZE: the shift count must stay
	 * below the width of unsigned long, otherwise the shift is
	 * undefined.
	 */
	word = fd / BITS;
	bit = fd % BITS;
	setp[word] |= (1UL << bit);
}

/* used only in pool_parallel_exec */
/*
 * Return 1 if fd is a member of the bitmap setp, 0 otherwise.
 * Descriptors that do not fit in the bitmap (fd >= FD_SETSIZE) are
 * reported as not being members.
 */
static int isset_fd(unsigned long fd, unsigned long *setp)
{
	unsigned long word;
	unsigned long bit;

	if (fd >= (unsigned long) FD_SETSIZE)
		return 0;

	word = fd / BITS;
	bit = fd % BITS;
	return (setp[word] & (1UL << bit)) != 0;
}

/* used only in pool_parallel_exec */
/*
 * Clear the whole bitmap setp, which must provide room for
 * FD_SETSIZE / BITS unsigned longs.
 */
static void zero_fd(unsigned long *setp)
{
	memset(setp, 0, (FD_SETSIZE / BITS) * sizeof(*setp));
}
517
518 /*
519  * This function transmits to a parallel Query, and does processing
520  * that receives the result to each back end.
521  */
522 POOL_STATUS pool_parallel_exec(POOL_CONNECTION *frontend,
523                                                                           POOL_CONNECTION_POOL *backend, char *string,
524                                                                           Node *node,bool send_to_frontend)
525 {
526         int len;
527         int fds;
528         int i;
529         char kind;
530         fd_set readmask;
531         fd_set writemask;
532         fd_set exceptmask;
533         unsigned long donemask[FD_SETSIZE / BITS];
534         static char *sq = "show pool_status";
535         POOL_STATUS status;
536         struct timeval timeout;
537         int num_fds;
538         int used_count = 0;
539         int error_flag = 0;
540         unsigned long datacount = 0;
541
542         timeout.tv_sec = 1;
543         timeout.tv_usec = 0;
544
545         len = strlen(string) + 1;
546
547         if (is_drop_database(node))
548         {
549                 int stime = 5;  /* XXX give arbitrary time to allow closing idle connections */
550
551                 pool_debug("Query: sending HUP signal to parent");
552
553                 kill(getppid(), SIGHUP);        /* send HUP signal to parent */
554
555                 /* we need to loop over here since we will get HUP signal while sleeping */
556                 while (stime > 0)
557                         stime = sleep(stime);
558         }
559
560         /* process status reporting? */
561         if (strncasecmp(sq, string, strlen(sq)) == 0)
562         {
563                 pool_debug("process reporting");
564                 process_reporting(frontend, backend);
565                 in_progress = 0;
566                 return POOL_CONTINUE;
567         }
568
569         /* In this loop,forward the query to the all backends */
570         for (i=0;i<NUM_BACKENDS;i++)
571         {
572                 if (!VALID_BACKEND(i))
573                         continue;
574
575                 pool_write(CONNECTION(backend, i), "Q", 1);
576
577                 if (MAJOR(backend) == PROTO_MAJOR_V3)
578                 {
579                         int sendlen = htonl(len + 4);
580                         pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen));
581                 }
582
583                 if (pool_write_and_flush(CONNECTION(backend, i), string, len) < 0)
584                 {
585                         return POOL_END;
586                 }
587
588                 /*
589                  * in "strict mode" we need to wait for backend completing the query.
590                  * note that this is not applied if "NO STRICT" is specified as a comment.
591                  */
592                 if (is_strict_query(node))
593                 {
594                         pool_debug("waiting for backend %d completing the query", i);
595                         if (synchronize(CONNECTION(backend, i)))
596                                 return POOL_END;
597                 }
598         }
599
600         if (!is_cache_empty(frontend, backend))
601         {
602                 return POOL_END;
603         }
604
605         zero_fd(donemask);
606
607         /* In this loop, receive data from the all backends and send data to frontend */
608         for (;;)
609         {
610                 FD_ZERO(&readmask);
611                 FD_ZERO(&writemask);
612                 FD_ZERO(&exceptmask);
613                 num_fds = 0;
614
615                 for (i=0;i<NUM_BACKENDS;i++)
616                 {
617                         if (VALID_BACKEND(i))
618                         {
619                                 int fd = CONNECTION(backend,i)->fd;
620                                 num_fds = Max(fd + 1, num_fds);
621                                 if(!isset_fd(fd,donemask))
622                                 {
623                                         FD_SET(fd, &readmask);
624                                         FD_SET(fd, &exceptmask);
625                                         pool_debug("pool_parallel_query:  %d th FD_SET: %d",i, CONNECTION(backend, i)->fd);
626                                 }
627                         }
628                 }
629
630                 pool_debug("pool_parallel_query: num_fds: %d", num_fds);
631
632                 fds = select(num_fds, &readmask, &writemask, &exceptmask, NULL);
633
634                 if (fds == -1)
635                 {
636                         if (errno == EINTR)
637                                 continue;
638
639                                 pool_error("select() failed. reason: %s", strerror(errno));
640                         return POOL_ERROR;
641                  }
642
643                 if (fds == 0)
644                 {
645                         return POOL_CONTINUE;
646                 }
647
648                 /* get header of protocol */
649                 for (i=0;i<NUM_BACKENDS;i++)
650                 {
651                         if (!VALID_BACKEND(i) ||
652                                 !FD_ISSET(CONNECTION(backend, i)->fd, &readmask))
653                         {
654                                 continue;
655                         }
656                         else
657                         {
658                                 status = read_kind_from_one_backend(frontend, backend, &kind,i);
659                                 if (status != POOL_CONTINUE)
660                                         return status;
661
662                                 if (used_count == 0)
663                                 {
664                                         status = ParallelForwardToFrontend(kind,
665                                                                                                                 frontend,
666                                                                                                                 CONNECTION(backend, i),
667                                                                                                                 backend->info->database,
668                                                                                                                 send_to_frontend);
669                                         pool_debug("pool_parallel_exec: kind from backend: %c", kind);
670                                 }
671                                 else
672                                 {
673                                         status = ParallelForwardToFrontend(kind,
674                                                                                                                 frontend,
675                                                                                                                 CONNECTION(backend, i),
676                                                                                                                 backend->info->database,
677                                                                                                                 false);
678                                         pool_debug("pool_parallel_exec: dummy kind from backend: %c", kind);
679                                 }
680
681                                 if (status != POOL_CONTINUE)
682                                         return status;
683
684                                 if(kind == 'C' || kind == 'E' || kind == 'c')
685                                 {
686                                         if(used_count == NUM_BACKENDS -1)
687                                                 return POOL_CONTINUE;
688
689                                         used_count++;
690                                         set_fd(CONNECTION(backend, i)->fd, donemask);
691                                         continue;
692                                 }
693
694                                 /* get body of protocol */
695                                 for(;;)
696                                 {
697                                         if (pool_read(CONNECTION(backend, i), &kind, 1) < 0)
698                                         {
699                                                 pool_error("pool_parallel_exec: failed to read kind from %d th backend", i);
700                                                 return POOL_ERROR;
701                                         }
702
703                                         /*
704                                          * Sanity check
705                                          */
706                                         if (kind == 0)
707                                         {
708                                                 pool_error("pool_parallel_exec: kind is 0!");
709                                                 return POOL_ERROR;
710                                         }
711
712                                         if((kind == 'E' ) &&
713                                                 used_count != NUM_BACKENDS -1)
714                                         {
715                                                 if(error_flag ==0)
716                                                 {
717                                                         pool_debug("pool_parallel_exec: kind from backend: %c", kind);
718
719                                                         status = ParallelForwardToFrontend(kind,
720                                                                                                                         frontend,
721                                                                                                                         CONNECTION(backend, i),
722                                                                                                                         backend->info->database,
723                                                                                                                         send_to_frontend);
724                                                         error_flag++;
725                                                 } else {
726                                                         pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
727                                                         status = ParallelForwardToFrontend(kind,
728                                                                                                                         frontend,
729                                                                                                                         CONNECTION(backend, i),
730                                                                                                                         backend->info->database,
731                                                                                                                         false);
732                                                 }
733                                                 used_count++;
734                                                 set_fd(CONNECTION(backend, i)->fd, donemask);
735                                                 break;
736                                         }
737
738                                         if((kind == 'c' || kind == 'C') &&
739                                            used_count != NUM_BACKENDS -1)
740                                         {
741                                                 pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
742                                                 status = ParallelForwardToFrontend(kind,
743                                                                                                                         frontend,
744                                                                                                                         CONNECTION(backend, i),
745                                                                                                                         backend->info->database,
746                                                                                                                         false);
747                                                 used_count++;
748                                                 set_fd(CONNECTION(backend, i)->fd, donemask);
749                                                 break;
750                                         }
751                                         if((kind == 'C' || kind == 'c' || kind == 'E') &&
752                                                 used_count == NUM_BACKENDS -1)
753                                         {
754                                                 pool_debug("pool_parallel_exec: kind from backend: D %lu", datacount);
755
756                                                 if(error_flag == 0)
757                                                 {
758                                                         pool_debug("pool_parallel_exec: kind from backend: %c", kind);
759                                                         status = ParallelForwardToFrontend(kind,
760                                                                                                                         frontend,
761                                                                                                                         CONNECTION(backend, i),
762                                                                                                                         backend->info->database,
763                                                                                                                         send_to_frontend);
764                                                 } else {
765                                                         pool_debug("pool_parallel_exec: dummy from backend: %c", kind);
766                                                         status = ParallelForwardToFrontend(kind,
767                                                                                                                         frontend,
768                                                                                                                         CONNECTION(backend, i),
769                                                                                                                         backend->info->database,
770                                                                                                                         false);
771                                                 }
772                                                 return POOL_CONTINUE;
773                                         }
774
775                                         if(kind == 'D')
776                                                 datacount++;
777                                         else
778                                                 pool_debug("pool_parallel_exec: kind from backend: %c", kind);
779
780                                         status = ParallelForwardToFrontend(kind,
781                                                                                                                 frontend,
782                                                                                                                 CONNECTION(backend, i),
783                                                                                                                 backend->info->database,
784                                                                                                                 send_to_frontend);
785
786                                         if (status != POOL_CONTINUE)
787                                         {
788                                                 return status;
789                                         }
790                                         else
791                                         {
792                                                 pool_flush(frontend);
793                                         }
794                                 }
795                         }
796                 }
797         }
798 }
799
800
801
802 /*
803  * send SimpleQuery message to a node.
804  */
805 POOL_STATUS send_simplequery_message(POOL_CONNECTION *backend, int len, char *string, int major)
806 {
807         /* forward the query to the backend */
808         pool_write(backend, "Q", 1);
809
810         if (major == PROTO_MAJOR_V3)
811         {
812                 int sendlen = htonl(len + 4);
813                 pool_write(backend, &sendlen, sizeof(sendlen));
814         }
815
816         if (pool_write_and_flush(backend, string, len) < 0)
817         {
818                 return POOL_END;
819         }
820
821         return POOL_CONTINUE;
822 }
823
824 /*
825  * Wait for query response from single node. This checks frontend
826  * connection by writing dummy parameter status packet every 30
827  * seccond, and if the connection broke, returns error since there's
828  * no point in that waiting until backend returns response.
829  */
830 POOL_STATUS wait_for_query_response(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *string, int protoVersion)
831 {
832 #define DUMMY_PARAMETER "pgpool_dummy_param"
833 #define DUMMY_VALUE "pgpool_dummy_value"
834
835         int status;
836         int plen;
837
838         pool_debug("wait_for_query_response: waiting for backend %d completing the query", backend->db_node_id);
839
840         for (;;)
841         {
842                 /* Check to see if data from backend is ready */
843                 pool_set_timeout(30);
844                 status = pool_check_fd(backend);
845                 pool_set_timeout(0);
846
847                 if (status < 0) /* error ? */
848                 {
849                         pool_error("wait_for_query_response: backend error occured while waiting for backend response");
850                         return POOL_END;
851                 }
852                 else if (status > 0)            /* data is not ready */
853                 {
854                         if (protoVersion == PROTO_MAJOR_V3)
855                         {
856                                 /* Write dummy parameter staus packet to check if the socket to frontend is ok */
857                                 if (pool_write(frontend, "S", 1) < 0)
858                                         return POOL_END;
859                                 plen = sizeof(DUMMY_PARAMETER)+sizeof(DUMMY_VALUE)+sizeof(plen);
860                                 plen = htonl(plen);
861                                 if (pool_write(frontend, &plen, sizeof(plen)) < 0)
862                                         return POOL_END;
863                                 if (pool_write(frontend, DUMMY_PARAMETER, sizeof(DUMMY_PARAMETER)) < 0)
864                                         return POOL_END;
865                                 if (pool_write(frontend, DUMMY_VALUE, sizeof(DUMMY_VALUE)) < 0)
866                                         return POOL_END;
867                                 if (pool_flush_it(frontend) < 0)
868                                 {
869                                         pool_error("wait_for_query_response: frontend error occured while waiting for backend reply");
870                                         return POOL_END;
871                                 }
872
873                         } else          /* Protocol version 2 */
874                         {
875 /*
876  * If you want to monitor client connection even if you are using V2 protocol,
877  * define following
878  */
879 #undef SEND_NOTICE_ON_PROTO2
880 #ifdef SEND_NOTICE_ON_PROTO2
881                                 static char *notice_message = {"keep alive checking from pgpool-II"};
882
883                                 /* Write notice message packet to check if the socket to frontend is ok */
884                                 if (pool_write(frontend, "N", 1) < 0)
885                                         return POOL_END;
886                                 if (pool_write(frontend, notice_message, strlen(notice_message)+1) < 0)
887                                         return POOL_END;
888                                 if (pool_flush_it(frontend) < 0)
889                                 {
890                                         pool_error("wait_for_query_response: frontend error occured while waiting for backend reply");
891                                         return POOL_END;
892                                 }
893 #endif
894                         }
895                 }
896                 else
897                         break;
898         }
899
900         return POOL_CONTINUE;
901 }
902
903
904 /*
905  * Extended query protocol has to send Flush message.
906  */
907 POOL_STATUS send_extended_protocol_message(POOL_CONNECTION_POOL *backend,
908                                                                                                   int node_id, char *kind,
909                                                                                                   int len, char *string)
910 {
911         POOL_CONNECTION *cp = CONNECTION(backend, node_id);
912         int sendlen;
913
914         /* forward the query to the backend */
915         pool_write(cp, kind, 1);
916         sendlen = htonl(len + 4);
917         pool_write(cp, &sendlen, sizeof(sendlen));
918         pool_write(cp, string, len);
919
920         /*
921          * send "Flush" message so that backend notices us
922          * the completion of the command
923          */
924         pool_write(cp, "H", 1);
925         sendlen = htonl(4);
926         if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
927         {
928                 return POOL_ERROR;
929         }
930
931         return POOL_CONTINUE;
932 }
933
934 POOL_STATUS send_execute_message(POOL_CONNECTION_POOL *backend,
935                                                                                 int node_id, int len, char *string)
936 {
937         return send_extended_protocol_message(backend, node_id, "E", len, string);
938 }
939
940 /*
941  * wait until read data is ready
942  */
943 int synchronize(POOL_CONNECTION *cp)
944 {
945         return pool_check_fd(cp);
946 }
947
948 /*
949  * set timeout in seconds for pool_check_fd
950  * if timeoutval < 0, we assume no timeout(wait forever).
951  */
952 void pool_set_timeout(int timeoutval)
953 {
954         if (timeoutval > 0)
955                 timeoutsec = timeoutval;
956         else
957                 timeoutsec = 0;
958 }
959
960 /*
961  * Wait until read data is ready.
962  * return values: 0: normal 1: data is not ready -1: error
963  */
964 int pool_check_fd(POOL_CONNECTION *cp)
965 {
966         fd_set readmask;
967         fd_set exceptmask;
968         int fd;
969         int fds;
970         struct timeval timeout;
971         struct timeval *timeoutp;
972
973         fd = cp->fd;
974
975         if (timeoutsec > 0)
976         {
977                 timeout.tv_sec = timeoutsec;
978                 timeout.tv_usec = 0;
979                 timeoutp = &timeout;
980         }
981         else
982                 timeoutp = NULL;
983
984         for (;;)
985         {
986                 FD_ZERO(&readmask);
987                 FD_ZERO(&exceptmask);
988                 FD_SET(fd, &readmask);
989                 FD_SET(fd, &exceptmask);
990
991                 fds = select(fd+1, &readmask, NULL, &exceptmask, timeoutp);
992                 if (fds == -1)
993                 {
994                         if (errno == EAGAIN || errno == EINTR)
995                                 continue;
996
997                         pool_error("pool_check_fd: select() failed. reason %s", strerror(errno));
998                         break;
999                 }
1000                 else if (fds == 0)              /* timeout */
1001                         return 1;
1002
1003                 if (FD_ISSET(fd, &exceptmask))
1004                 {
1005                         pool_error("pool_check_fd: exception occurred");
1006                         break;
1007                 }
1008                 return 0;
1009         }
1010         return -1;
1011 }
1012
1013 /*
1014  * Process "show pool_status" query.
1015  */
1016 void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1017 {
1018         static char *cursorname = "blank";
1019         static short num_fields = 3;
1020         static char *field_names[] = {"item", "value", "description"};
1021         static int oid = 0;
1022         static short fsize = -1;
1023         static int mod = 0;
1024         short n;
1025         int i, j;
1026         short s;
1027         int len;
1028         short colnum;
1029
1030         static unsigned char nullmap[2] = {0xff, 0xff};
1031         int nbytes = (num_fields + 7)/8;
1032
1033 #define POOLCONFIG_MAXNAMELEN 32
1034 #define POOLCONFIG_MAXVALLEN 512
1035 #define POOLCONFIG_MAXDESCLEN 64
1036
1037         typedef struct {
1038                 char name[POOLCONFIG_MAXNAMELEN+1];
1039                 char value[POOLCONFIG_MAXVALLEN+1];
1040                 char desc[POOLCONFIG_MAXDESCLEN+1];
1041         } POOL_REPORT_STATUS;
1042
1043 #define MAXITEMS 128
1044
1045         POOL_REPORT_STATUS status[MAXITEMS];
1046
1047         short nrows;
1048         int size;
1049         int hsize;
1050
1051         i = 0;
1052
1053         strncpy(status[i].name, "listen_addresses", POOLCONFIG_MAXNAMELEN);
1054         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->listen_addresses);
1055         strncpy(status[i].desc, "host name(s) or IP address(es) to listen to", POOLCONFIG_MAXDESCLEN);
1056         i++;
1057
1058         strncpy(status[i].name, "port", POOLCONFIG_MAXNAMELEN);
1059         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->port);
1060         strncpy(status[i].desc, "pgpool accepting port number", POOLCONFIG_MAXDESCLEN);
1061         i++;
1062
1063         strncpy(status[i].name, "socket_dir", POOLCONFIG_MAXNAMELEN);
1064         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->socket_dir);
1065         strncpy(status[i].desc, "pgpool socket directory", POOLCONFIG_MAXDESCLEN);
1066         i++;
1067
1068         strncpy(status[i].name, "num_init_children", POOLCONFIG_MAXNAMELEN);
1069         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_init_children);
1070         strncpy(status[i].desc, "# of children initially pre-forked", POOLCONFIG_MAXDESCLEN);
1071         i++;
1072
1073         strncpy(status[i].name, "child_life_time", POOLCONFIG_MAXNAMELEN);
1074         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_life_time);
1075         strncpy(status[i].desc, "if idle for this seconds, child exits", POOLCONFIG_MAXDESCLEN);
1076         i++;
1077
1078         strncpy(status[i].name, "connection_life_time", POOLCONFIG_MAXNAMELEN);
1079         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_life_time);
1080         strncpy(status[i].desc, "if idle for this seconds, connection closes", POOLCONFIG_MAXDESCLEN);
1081         i++;
1082
1083         strncpy(status[i].name, "client_idle_limit", POOLCONFIG_MAXNAMELEN);
1084         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->client_idle_limit);
1085         strncpy(status[i].desc, "if idle for this seconds, child connection closes", POOLCONFIG_MAXDESCLEN);
1086         i++;
1087
1088         strncpy(status[i].name, "child_max_connections", POOLCONFIG_MAXNAMELEN);
1089         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_max_connections);
1090         strncpy(status[i].desc, "if max_connections received, chile exits", POOLCONFIG_MAXDESCLEN);
1091         i++;
1092
1093         strncpy(status[i].name, "max_pool", POOLCONFIG_MAXNAMELEN);
1094         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->max_pool);
1095         strncpy(status[i].desc, "max # of connection pool per child", POOLCONFIG_MAXDESCLEN);
1096         i++;
1097
1098         strncpy(status[i].name, "authentication_timeout", POOLCONFIG_MAXNAMELEN);
1099         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->authentication_timeout);
1100         strncpy(status[i].desc, "maximum time in seconds to complete client authentication", POOLCONFIG_MAXNAMELEN);
1101         i++;
1102
1103         strncpy(status[i].name, "logdir", POOLCONFIG_MAXNAMELEN);
1104         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->logdir);
1105         strncpy(status[i].desc, "logging directory", POOLCONFIG_MAXDESCLEN);
1106         i++;
1107
1108         strncpy(status[i].name, "pid_file_name", POOLCONFIG_MAXNAMELEN);
1109         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pid_file_name);
1110         strncpy(status[i].desc, "path to pid file", POOLCONFIG_MAXDESCLEN);
1111         i++;
1112
1113         strncpy(status[i].name, "backend_socket_dir", POOLCONFIG_MAXNAMELEN);
1114         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->backend_socket_dir);
1115         strncpy(status[i].desc, "Unix domain socket directory for the PostgreSQL server", POOLCONFIG_MAXDESCLEN);
1116         i++;
1117
1118         strncpy(status[i].name, "replication_mode", POOLCONFIG_MAXNAMELEN);
1119         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_mode);
1120         strncpy(status[i].desc, "non 0 if operating in replication mode", POOLCONFIG_MAXDESCLEN);
1121         i++;
1122
1123         strncpy(status[i].name, "load_balance_mode", POOLCONFIG_MAXNAMELEN);
1124         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->load_balance_mode);
1125         strncpy(status[i].desc, "non 0 if operating in load balancing mode", POOLCONFIG_MAXDESCLEN);
1126         i++;
1127
1128         strncpy(status[i].name, "replication_stop_on_mismatch", POOLCONFIG_MAXNAMELEN);
1129         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_stop_on_mismatch);
1130         strncpy(status[i].desc, "stop replication mode on fatal error", POOLCONFIG_MAXDESCLEN);
1131         i++;
1132
1133         strncpy(status[i].name, "replicate_select", POOLCONFIG_MAXNAMELEN);
1134         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replicate_select);
1135         strncpy(status[i].desc, "non 0 if SELECT statement is replicated", POOLCONFIG_MAXDESCLEN);
1136         i++;
1137
1138         strncpy(status[i].name, "reset_query_list", POOLCONFIG_MAXNAMELEN);
1139         *(status[i].value) = '\0';
1140         for (j=0;j<pool_config->num_reset_queries;j++)
1141         {
1142                 int len;
1143                 len = POOLCONFIG_MAXVALLEN - strlen(status[i].value);
1144                 strncat(status[i].value, pool_config->reset_query_list[j], len);
1145                 len = POOLCONFIG_MAXVALLEN - strlen(status[i].value);
1146                 strncat(status[i].value, ";", len);
1147         }
1148         strncpy(status[i].desc, "queries issued at the end of session", POOLCONFIG_MAXDESCLEN);
1149         i++;
1150
1151         strncpy(status[i].name, "print_timestamp", POOLCONFIG_MAXNAMELEN);
1152         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->print_timestamp);
1153         strncpy(status[i].desc, "if true print time stamp to each log line", POOLCONFIG_MAXDESCLEN);
1154         i++;
1155
1156         strncpy(status[i].name, "master_slave_mode", POOLCONFIG_MAXNAMELEN);
1157         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_mode);
1158         strncpy(status[i].desc, "if true, operate in master/slave mode", POOLCONFIG_MAXDESCLEN);
1159         i++;
1160
1161         strncpy(status[i].name, "connection_cache", POOLCONFIG_MAXNAMELEN);
1162         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_cache);
1163         strncpy(status[i].desc, "if true, cache connection pool", POOLCONFIG_MAXDESCLEN);
1164         i++;
1165
1166         strncpy(status[i].name, "health_check_timeout", POOLCONFIG_MAXNAMELEN);
1167         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_timeout);
1168         strncpy(status[i].desc, "health check timeout", POOLCONFIG_MAXDESCLEN);
1169         i++;
1170
1171         strncpy(status[i].name, "health_check_period", POOLCONFIG_MAXNAMELEN);
1172         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_period);
1173         strncpy(status[i].desc, "health check period", POOLCONFIG_MAXDESCLEN);
1174         i++;
1175
1176         strncpy(status[i].name, "health_check_user", POOLCONFIG_MAXNAMELEN);
1177         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->health_check_user);
1178         strncpy(status[i].desc, "health check user", POOLCONFIG_MAXDESCLEN);
1179         i++;
1180
1181         strncpy(status[i].name, "failover_command", POOLCONFIG_MAXNAMELEN);
1182         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->failover_command);
1183         strncpy(status[i].desc, "failover command", POOLCONFIG_MAXDESCLEN);
1184         i++;
1185
1186         strncpy(status[i].name, "failback_command", POOLCONFIG_MAXNAMELEN);
1187         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->failover_command);
1188         strncpy(status[i].desc, "failback command", POOLCONFIG_MAXDESCLEN);
1189         i++;
1190
1191         strncpy(status[i].name, "insert_lock", POOLCONFIG_MAXNAMELEN);
1192         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->insert_lock);
1193         strncpy(status[i].desc, "insert lock", POOLCONFIG_MAXDESCLEN);
1194         i++;
1195
1196         strncpy(status[i].name, "ignore_leading_white_space", POOLCONFIG_MAXNAMELEN);
1197         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->ignore_leading_white_space);
1198         strncpy(status[i].desc, "ignore leading white spaces", POOLCONFIG_MAXDESCLEN);
1199         i++;
1200
1201         strncpy(status[i].name, "replication_enabled", POOLCONFIG_MAXNAMELEN);
1202         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_enabled);
1203         strncpy(status[i].desc, "non 0 if actually operating in replication mode", POOLCONFIG_MAXDESCLEN);
1204         i++;
1205
1206         strncpy(status[i].name, "master_slave_enabled", POOLCONFIG_MAXNAMELEN);
1207         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_enabled);
1208         strncpy(status[i].desc, "non 0 if actually operating in master/slave", POOLCONFIG_MAXDESCLEN);
1209         i++;
1210
1211         strncpy(status[i].name, "num_reset_queries", POOLCONFIG_MAXNAMELEN);
1212         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_reset_queries);
1213         strncpy(status[i].desc, "number of queries in reset_query_list", POOLCONFIG_MAXDESCLEN);
1214         i++;
1215
1216         strncpy(status[i].name, "pcp_port", POOLCONFIG_MAXNAMELEN);
1217         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_port);
1218         strncpy(status[i].desc, "PCP port # to bind", POOLCONFIG_MAXDESCLEN);
1219         i++;
1220
1221         strncpy(status[i].name, "pcp_socket_dir", POOLCONFIG_MAXNAMELEN);
1222         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pcp_socket_dir);
1223         strncpy(status[i].desc, "PCP socket directory", POOLCONFIG_MAXDESCLEN);
1224         i++;
1225
1226         strncpy(status[i].name, "pcp_timeout", POOLCONFIG_MAXNAMELEN);
1227         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_timeout);
1228         strncpy(status[i].desc, "PCP timeout for an idle client", POOLCONFIG_MAXDESCLEN);
1229         i++;
1230
1231         strncpy(status[i].name, "log_statement", POOLCONFIG_MAXNAMELEN);
1232         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_statement);
1233         strncpy(status[i].desc, "if non 0, logs all SQL statements", POOLCONFIG_MAXDESCLEN);
1234         i++;
1235
1236         strncpy(status[i].name, "log_connections", POOLCONFIG_MAXNAMELEN);
1237         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_connections);
1238         strncpy(status[i].desc, "if true, print incoming connections to the log", POOLCONFIG_MAXDESCLEN);
1239         i++;
1240
1241         strncpy(status[i].name, "log_hostname", POOLCONFIG_MAXNAMELEN);
1242         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_hostname);
1243         strncpy(status[i].desc, "if true, resolve hostname for ps and log print", POOLCONFIG_MAXDESCLEN);
1244         i++;
1245
1246         strncpy(status[i].name, "enable_pool_hba", POOLCONFIG_MAXNAMELEN);
1247         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_pool_hba);
1248         strncpy(status[i].desc, "if true, use pool_hba.conf for client authentication", POOLCONFIG_MAXDESCLEN);
1249         i++;
1250
1251         strncpy(status[i].name, "recovery_user", POOLCONFIG_MAXNAMELEN);
1252         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_user);
1253         strncpy(status[i].desc, "online recovery user", POOLCONFIG_MAXDESCLEN);
1254         i++;
1255
1256         strncpy(status[i].name, "recovery_password", POOLCONFIG_MAXNAMELEN);
1257         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_password);
1258         strncpy(status[i].desc, "online recovery password", POOLCONFIG_MAXDESCLEN);
1259         i++;
1260
1261         strncpy(status[i].name, "recovery_1st_stage_command", POOLCONFIG_MAXNAMELEN);
1262         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_1st_stage_command);
1263         strncpy(status[i].desc, "execute a command in first stage.", POOLCONFIG_MAXDESCLEN);
1264         i++;
1265
1266         strncpy(status[i].name, "recovery_2nd_stage_command", POOLCONFIG_MAXNAMELEN);
1267         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->recovery_2nd_stage_command);
1268         strncpy(status[i].desc, "execute a command in second stage.", POOLCONFIG_MAXDESCLEN);
1269         i++;
1270
1271         strncpy(status[i].name, "recovery_timeout", POOLCONFIG_MAXNAMELEN);
1272         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->recovery_timeout);
1273         strncpy(status[i].desc, "max time in seconds to wait for the recovering node's postmaster", POOLCONFIG_MAXDESCLEN);
1274         i++;
1275
1276         strncpy(status[i].name, "client_idle_limit_in_recovery", POOLCONFIG_MAXNAMELEN);
1277         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->client_idle_limit_in_recovery);
1278         strncpy(status[i].desc, "if idle for this seconds, child connection closes in recovery 2nd statge", POOLCONFIG_MAXDESCLEN);
1279         i++;
1280
1281         strncpy(status[i].name, "parallel_mode", POOLCONFIG_MAXNAMELEN);
1282         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->parallel_mode);
1283         strncpy(status[i].desc, "if non 0, run in parallel query mode", POOLCONFIG_MAXDESCLEN);
1284         i++;
1285
1286         strncpy(status[i].name, "enable_query_cache", POOLCONFIG_MAXNAMELEN);
1287         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_query_cache);
1288         strncpy(status[i].desc, "if non 0, use query cache", POOLCONFIG_MAXDESCLEN);
1289         i++;
1290
1291         strncpy(status[i].name, "pgpool2_hostname", POOLCONFIG_MAXNAMELEN);
1292         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pgpool2_hostname);
1293         strncpy(status[i].desc, "pgpool2 hostname", POOLCONFIG_MAXDESCLEN);
1294         i++;
1295
1296         strncpy(status[i].name, "system_db_hostname", POOLCONFIG_MAXNAMELEN);
1297         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_hostname);
1298         strncpy(status[i].desc, "system DB hostname", POOLCONFIG_MAXDESCLEN);
1299         i++;
1300
1301         strncpy(status[i].name, "system_db_port", POOLCONFIG_MAXNAMELEN);
1302         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->system_db_port);
1303         strncpy(status[i].desc, "system DB port number", POOLCONFIG_MAXDESCLEN);
1304         i++;
1305
1306         strncpy(status[i].name, "system_db_dbname", POOLCONFIG_MAXNAMELEN);
1307         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_dbname);
1308         strncpy(status[i].desc, "system DB name", POOLCONFIG_MAXDESCLEN);
1309         i++;
1310
1311         strncpy(status[i].name, "system_db_schema", POOLCONFIG_MAXNAMELEN);
1312         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_schema);
1313         strncpy(status[i].desc, "system DB schema name", POOLCONFIG_MAXDESCLEN);
1314         i++;
1315
1316         strncpy(status[i].name, "system_db_user", POOLCONFIG_MAXNAMELEN);
1317         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_user);
1318         strncpy(status[i].desc, "user name to access system DB", POOLCONFIG_MAXDESCLEN);
1319         i++;
1320
1321         strncpy(status[i].name, "system_db_password", POOLCONFIG_MAXNAMELEN);
1322         snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_password);
1323         strncpy(status[i].desc, "password to access system DB", POOLCONFIG_MAXDESCLEN);
1324         i++;
1325
1326         for (j = 0; j < NUM_BACKENDS; j++)
1327         {
1328                 if (BACKEND_INFO(j).backend_port == 0)
1329                         continue;
1330
1331                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_hostname%d", j);
1332                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", BACKEND_INFO(j).backend_hostname);
1333                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d hostname", j);
1334                 i++;
1335
1336                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_port%d", j);
1337                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_port);
1338                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d port number", j);
1339                 i++;
1340
1341                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_weight%d", j);
1342                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%f", BACKEND_INFO(j).backend_weight);
1343                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "weight of backend #%d", j);
1344                 i++;
1345
1346                 snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend status%d", j);
1347                 snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_status);
1348                 snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "status of backend #%d", j);
1349                 i++;
1350         }
1351
1352         nrows = i;
1353
1354         if (MAJOR(backend) == PROTO_MAJOR_V2)
1355         {
1356                 /* cursor response */
1357                 pool_write(frontend, "P", 1);
1358                 pool_write(frontend, cursorname, strlen(cursorname)+1);
1359         }
1360
1361         /* row description */
1362         pool_write(frontend, "T", 1);
1363
1364         if (MAJOR(backend) == PROTO_MAJOR_V3)
1365         {
1366                 len = sizeof(num_fields) + sizeof(len);
1367
1368                 for (i=0;i<num_fields;i++)
1369                 {
1370                         char *f = field_names[i];
1371                         len += strlen(f)+1;
1372                         len += sizeof(oid);
1373                         len += sizeof(colnum);
1374                         len += sizeof(oid);
1375                         len += sizeof(s);
1376                         len += sizeof(mod);
1377                         len += sizeof(s);
1378                 }
1379
1380                 len = htonl(len);
1381                 pool_write(frontend, &len, sizeof(len));
1382         }
1383
1384         n = htons(num_fields);
1385         pool_write(frontend, &n, sizeof(short));
1386
1387         for (i=0;i<num_fields;i++)
1388         {
1389                 char *f = field_names[i];
1390
1391                 pool_write(frontend, f, strlen(f)+1);           /* field name */
1392
1393                 if (MAJOR(backend) == PROTO_MAJOR_V3)
1394                 {
1395                         pool_write(frontend, &oid, sizeof(oid));        /* table oid */
1396                         colnum = htons(i);
1397                         pool_write(frontend, &colnum, sizeof(colnum));  /* column number */
1398                 }
1399
1400                 pool_write(frontend, &oid, sizeof(oid));                /* data type oid */
1401                 s = htons(fsize);
1402                 pool_write(frontend, &s, sizeof(fsize));                /* field size */
1403                 pool_write(frontend, &mod, sizeof(mod));                /* modifier */
1404
1405                 if (MAJOR(backend) == PROTO_MAJOR_V3)
1406                 {
1407                         s = htons(0);
1408                         pool_write(frontend, &s, sizeof(fsize));        /* field format (text) */
1409                 }
1410         }
1411         pool_flush(frontend);
1412
1413         if (MAJOR(backend) == PROTO_MAJOR_V2)
1414         {
1415                 /* ascii row */
1416                 for (i=0;i<nrows;i++)
1417                 {
1418                         pool_write(frontend, "D", 1);
1419                         pool_write_and_flush(frontend, nullmap, nbytes);
1420
1421                         size = strlen(status[i].name);
1422                         hsize = htonl(size+4);
1423                         pool_write(frontend, &hsize, sizeof(hsize));
1424                         pool_write(frontend, status[i].name, size);
1425
1426                         size = strlen(status[i].value);
1427                         hsize = htonl(size+4);
1428                         pool_write(frontend, &hsize, sizeof(hsize));
1429                         pool_write(frontend, status[i].value, size);
1430
1431                         size = strlen(status[i].desc);
1432                         hsize = htonl(size+4);
1433                         pool_write(frontend, &hsize, sizeof(hsize));
1434                         pool_write(frontend, status[i].desc, size);
1435                 }
1436         }
1437         else
1438         {
1439                 /* data row */
1440                 for (i=0;i<nrows;i++)
1441                 {
1442                         pool_write(frontend, "D", 1);
1443                         len = sizeof(len) + sizeof(nrows);
1444                         len += sizeof(int) + strlen(status[i].name);
1445                         len += sizeof(int) + strlen(status[i].value);
1446                         len += sizeof(int) + strlen(status[i].desc);
1447                         len = htonl(len);
1448                         pool_write(frontend, &len, sizeof(len));
1449                         s = htons(3);
1450                         pool_write(frontend, &s, sizeof(s));
1451
1452                         len = htonl(strlen(status[i].name));
1453                         pool_write(frontend, &len, sizeof(len));
1454                         pool_write(frontend, status[i].name, strlen(status[i].name));
1455
1456                         len = htonl(strlen(status[i].value));
1457                         pool_write(frontend, &len, sizeof(len));
1458                         pool_write(frontend, status[i].value, strlen(status[i].value));
1459
1460                         len = htonl(strlen(status[i].desc));
1461                         pool_write(frontend, &len, sizeof(len));
1462                         pool_write(frontend, status[i].desc, strlen(status[i].desc));
1463                 }
1464         }
1465
1466         /* complete command response */
1467         pool_write(frontend, "C", 1);
1468         if (MAJOR(backend) == PROTO_MAJOR_V3)
1469         {
1470                 len = htonl(sizeof(len) + strlen("SELECT")+1);
1471                 pool_write(frontend, &len, sizeof(len));
1472         }
1473         pool_write(frontend, "SELECT", strlen("SELECT")+1);
1474
1475         /* ready for query */
1476         pool_write(frontend, "Z", 1);
1477         if (MAJOR(backend) == PROTO_MAJOR_V3)
1478         {
1479                 len = htonl(sizeof(len) + 1);
1480                 pool_write(frontend, &len, sizeof(len));
1481                 pool_write(frontend, "I", 1);
1482         }
1483
1484         pool_flush(frontend);
1485 }
1486
1487 /*
1488  * send "terminate"(X) message to all backends, indicating that
1489  * backend should prepare to close connection to frontend (actually
1490  * pgpool). Note that caller must be protecedt from a signal
1491  * interruption while calling this function. Otherwise the number of
1492  * valid backends might be changed by failover/failback.
1493  */
1494 void pool_send_frontend_exits(POOL_CONNECTION_POOL *backend)
1495 {
1496         int len;
1497         int i;
1498
1499         for (i=0;i<NUM_BACKENDS;i++)
1500         {
1501                 /*
1502                  * send a terminate message to backend if there's an existing
1503                  * connection
1504                  */
1505                 if (VALID_BACKEND(i) && CONNECTION_SLOT(backend, i))
1506                 {
1507                         pool_write(CONNECTION(backend, i), "X", 1);
1508
1509                         if (MAJOR(backend) == PROTO_MAJOR_V3)
1510                         {
1511                                 len = htonl(4);
1512                                 pool_write(CONNECTION(backend, i), &len, sizeof(len));
1513                         }
1514
1515                         /*
1516                          * XXX we cannot call pool_flush() here since backend may already
1517                          * close the socket and pool_flush() automatically invokes fail
1518                          * over handler. This could happen in copy command (remember the
1519                          * famous "lost synchronization with server, resetting
1520                          * connection" message)
1521                          */
1522                         pool_set_nonblock(CONNECTION(backend, i)->fd);
1523                         pool_flush_it(CONNECTION(backend, i));
1524                         pool_unset_nonblock(CONNECTION(backend, i)->fd);
1525                 }
1526         }
1527 }
1528
1529 /*
1530  * -------------------------------------------------------
1531  * V3 functions
1532  * -------------------------------------------------------
1533  */
1534
1535 /*
1536  * This function transmits to a parallel Query to each backend,
1537  * and receives the results from backends .
1538  *
1539  */
1540 static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend)
1541 {
1542         int len;
1543         char *p;
1544         int status;
1545
1546         if (send_to_frontend)
1547         {
1548                 pool_write(frontend, &kind, 1);
1549         }
1550
1551         status = pool_read(backend, &len, sizeof(len));
1552         if (status < 0)
1553         {
1554                 pool_error("ParallelForwardToFrontend: error while reading message length");
1555                 return POOL_END;
1556         }
1557
1558         if (send_to_frontend)
1559         {
1560                 pool_write(frontend, &len, sizeof(len));
1561         }
1562
1563         len = ntohl(len) - 4 ;
1564
1565         if (len <= 0)
1566                 return POOL_CONTINUE;
1567
1568         p = pool_read2(backend, len);
1569         if (p == NULL)
1570                 return POOL_END;
1571
1572         status = POOL_CONTINUE;
1573         if (send_to_frontend)
1574         {
1575                 status = pool_write(frontend, p, len);
1576                 if (pool_config->enable_query_cache && SYSDB_STATUS == CON_UP && status == 0)
1577                 {
1578                         query_cache_register(kind, frontend, database, p, len);
1579                 }
1580         }
1581
1582         return status;
1583 }
1584
1585 POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1586 {
1587         int len, len1 = 0;
1588         char *p = NULL;
1589         char *p1 = NULL;
1590         char *p2 = NULL;
1591         int status;
1592         int sendlen;
1593         int i;
1594         int command_ok_row_count = 0;
1595         int delete_or_update = 0;
1596         char kind1;
1597         POOL_STATUS ret;
1598
1599         /*
1600          * Check if packet kind == 'C'(Command complete), '1'(Parse
1601          * complete), '3'(Close complete). If so, then register or
1602          * unregister pending prepared statement.
1603          */
1604         if ((kind == 'C' || kind == '1' || kind == '3') &&
1605                 pending_function)
1606         {
1607                 pending_function(&prepared_list, pending_prepared_portal);
1608                 if (pending_prepared_portal &&
1609                         pending_prepared_portal->stmt &&
1610                         IsA(pending_prepared_portal->stmt, DeallocateStmt))
1611                 {
1612                         free(pending_prepared_portal->portal_name);
1613                         pending_prepared_portal->portal_name = NULL;
1614                         pool_memory_delete(pending_prepared_portal->prepare_ctxt, 0);
1615                         free(pending_prepared_portal);
1616                 }
1617         }
1618         else if (kind == 'E' && pending_function)
1619         {
1620                 /* An error occurred with PREPARE or DEALLOCATE command.
1621                  * Free pending portal object.
1622                  */
1623                 if (pending_prepared_portal)
1624                 {
1625                         free(pending_prepared_portal->portal_name);
1626                         pending_prepared_portal->portal_name = NULL;
1627                         pool_memory_delete(pending_prepared_portal->prepare_ctxt, 0);
1628                         free(pending_prepared_portal);
1629                 }
1630         }
1631         else if (kind == 'C' && select_in_transaction)
1632         {
1633                 select_in_transaction = 0;
1634                 execute_select = 0;
1635         }
1636
1637         /*
1638          * Remove a pending function if a received message is not
1639          * NoticeResponse.
1640          */
1641         if (kind != 'N')
1642         {
1643                 pending_function = NULL;
1644                 pending_prepared_portal = NULL;
1645         }
1646
1647         status = pool_read(MASTER(backend), &len, sizeof(len));
1648         len = ntohl(len);
1649         len -= 4;
1650         len1 = len;
1651
1652         p = pool_read2(MASTER(backend), len);
1653         if (p == NULL)
1654                 return POOL_END;
1655         p1 = malloc(len);
1656         if (p1 == NULL)
1657         {
1658                 pool_error("SimpleForwardToFrontend: malloc failed");
1659                 return POOL_ERROR;
1660         }
1661         memcpy(p1, p, len);
1662
1663         if (kind == 'C')        /* packet kind is "Command Complete"? */
1664         {
1665                 command_ok_row_count = extract_ntuples(p);
1666
1667                 /*
1668                  * if we are in the parallel mode, we have to sum up the number
1669                  * of affected rows
1670                  */
1671                 if (PARALLEL_MODE && is_parallel_table &&
1672                         (strstr(p, "UPDATE") || strstr(p, "DELETE")))
1673                 {
1674                         delete_or_update = 1;
1675                 }
1676         }
1677
1678         for (i=0;i<NUM_BACKENDS;i++)
1679         {
1680                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1681                 {
1682                         status = pool_read(CONNECTION(backend, i), &len, sizeof(len));
1683                         if (status < 0)
1684                         {
1685                                 pool_error("SimpleForwardToFrontend: error while reading message length");
1686                                 return POOL_END;
1687                         }
1688
1689                         len = ntohl(len);
1690                         len -= 4;
1691
1692                         p = pool_read2(CONNECTION(backend, i), len);
1693                         if (p == NULL)
1694                                 return POOL_END;
1695
1696                         if (len != len1)
1697                         {
1698                                 pool_debug("SimpleForwardToFrontend: length does not match between backends master(%d) %d th backend(%d) kind:(%c)",
1699                                                    len, i, len1, kind);
1700                         }
1701
1702                         if (kind == 'C')        /* packet kind is "Command Complete"? */
1703                         {
1704                                 int n = extract_ntuples(p);
1705
1706                                 /*
1707                                  * if we are in the parallel mode, we have to sum up the number
1708                                  * of affected rows
1709                                  */
1710                                 if (delete_or_update)
1711                                 {
1712                                         command_ok_row_count += n;
1713                                 }
1714                                 else if (command_ok_row_count != n) /* mismatch update rows */
1715                                 {
1716                                         mismatch_ntuples = 1;
1717                                 }
1718                         }
1719                 }
1720         }
1721
1722         if (mismatch_ntuples)
1723         {
1724                 String *msg = init_string("pgpool detected difference of the number of inserted, updated or deleted tuples. Possible last query was: \"");
1725                 string_append_char(msg, query_string_buffer);
1726                 string_append_char(msg, "\"");
1727                 pool_send_error_message(frontend, MAJOR(backend),
1728                                                                 "XX001", msg->data, "",
1729                                                                 "check data consistency between master and other db node",  __FILE__, __LINE__);
1730                 pool_error(msg->data);
1731                 free_string(msg);
1732         }
1733         else
1734         {
1735                 if (delete_or_update)
1736                 {
1737                         char tmp[32];
1738
1739                         strncpy(tmp, p1, 7);
1740                         sprintf(tmp+7, "%d", command_ok_row_count);
1741
1742                         p2 = strdup(tmp);
1743                         if (p2 == NULL)
1744                         {
1745                                 pool_error("SimpleForwardToFrontend: malloc failed");
1746                                 free(p1);
1747                                 return POOL_ERROR;
1748                         }
1749
1750                         free(p1);
1751                         p1 = p2;
1752                         len1 = strlen(p2) + 1;
1753                 }
1754
1755                 pool_write(frontend, &kind, 1);
1756                 sendlen = htonl(len1+4);
1757                 pool_write(frontend, &sendlen, sizeof(sendlen));
1758                 pool_write(frontend, p1, len1);
1759         }
1760
1761         /* save the received result for each kind */
1762         if (pool_config->enable_query_cache && SYSDB_STATUS == CON_UP)
1763         {
1764                 query_cache_register(kind, frontend, backend->info->database, p1, len1);
1765         }
1766
1767         free(p1);
1768         if (status)
1769                 return POOL_END;
1770
1771         if (kind == 'A')        /* notification response */
1772         {
1773                 pool_flush(frontend);   /* we need to immediately notice to frontend */
1774         }
1775         else if (kind == 'E')           /* error response? */
1776         {
1777                 int i;
1778                 int res1;
1779                 char *p1;
1780
1781                 /*
1782                  * check if the error was PANIC or FATAL. If so, we just flush
1783                  * the message and exit since the backend will close the
1784                  * channel immediately.
1785                  */
1786                 for (;;)
1787                 {
1788                         char e;
1789
1790                         e = *p++;
1791                         if (e == '\0')
1792                                 break;
1793
1794                         if (e == 'S' && (strcasecmp("PANIC", p) == 0 || strcasecmp("FATAL", p) == 0))
1795                         {
1796                                 pool_flush(frontend);
1797                                 return POOL_END;
1798                         }
1799                         else
1800                         {
1801                                 while (*p++)
1802                                         ;
1803                                 continue;
1804                         }
1805                 }
1806
1807                 if (select_in_transaction)
1808                 {
1809                         int i;
1810
1811                         in_load_balance = 0;
1812                         REPLICATION = 1;
1813                         for (i = 0; i < NUM_BACKENDS; i++)
1814                         {
1815                                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
1816                                 {
1817                                         /*
1818                                          * We must abort transaction to sync transaction state.
1819                                          * If the error was caused by an Execute message,
1820                                          * we must send invalid Execute message to abort
1821                                          * transaction.
1822                                          *
1823                                          * Because extended query protocol ignores all
1824                                          * messages before receiving Sync message inside error state.
1825                                          */
1826                                         if (execute_select)
1827                                                 do_error_execute_command(backend, i, PROTO_MAJOR_V3);
1828                                         else
1829                                                 do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
1830                                 }
1831                         }
1832                         select_in_transaction = 0;
1833                         execute_select = 0;
1834                 }
1835
1836                 for (i = 0;i < NUM_BACKENDS; i++)
1837                 {
1838                         if (VALID_BACKEND(i))
1839                         {
1840                                 POOL_CONNECTION *cp = CONNECTION(backend, i);
1841
1842                                 /* We need to send "sync" message to backend in extend mode
1843                                  * so that it accepts next command.
1844                                  * Note that this may be overkill since client may send
1845                                  * it by itself. Moreover we do not need it in non-extend mode.
1846                                  * At this point we regard it is not harmful since error response
1847                                  * will not be sent too frequently.
1848                                  */
1849                                 pool_write(cp, "S", 1);
1850                                 res1 = htonl(4);
1851                                 if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0)
1852                                 {
1853                                         return POOL_END;
1854                                 }
1855                         }
1856                 }
1857
1858                 while ((ret = read_kind_from_backend(frontend, backend, &kind1)) == POOL_CONTINUE)
1859                 {
1860                         if (kind1 == 'Z') /* ReadyForQuery? */
1861                                 break;
1862
1863                         ret = SimpleForwardToFrontend(kind1, frontend, backend);
1864                         if (ret != POOL_CONTINUE)
1865                                 return ret;
1866                         pool_flush(frontend);
1867                 }
1868
1869                 if (ret != POOL_CONTINUE)
1870                         return ret;
1871
1872                 for (i = 0; i < NUM_BACKENDS; i++)
1873                 {
1874                         if (VALID_BACKEND(i))
1875                         {
1876                                 status = pool_read(CONNECTION(backend, i), &res1, sizeof(res1));
1877                                 if (status < 0)
1878                                 {
1879                                         pool_error("SimpleForwardToFrontend: error while reading message length");
1880                                         return POOL_END;
1881                                 }
1882                                 res1 = ntohl(res1) - sizeof(res1);
1883                                 p1 = pool_read2(CONNECTION(backend, i), res1);
1884                                 if (p1 == NULL)
1885                                         return POOL_END;
1886                         }
1887                 }
1888         }
1889         return POOL_CONTINUE;
1890 }
1891
1892 POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1893 {
1894         int len;
1895         int sendlen;
1896         char *p;
1897         int i;
1898         char *name;
1899         POOL_STATUS ret;
1900
1901         for (i=0;i<NUM_BACKENDS;i++)
1902         {
1903                 if (VALID_BACKEND(i))
1904                 {
1905                         if (pool_write(CONNECTION(backend, i), &kind, 1))
1906                                 return POOL_END;
1907                 }
1908         }
1909
1910         if (pool_read(frontend, &sendlen, sizeof(sendlen)))
1911         {
1912                 return POOL_END;
1913         }
1914
1915         len = ntohl(sendlen) - 4;
1916
1917         for (i=0;i<NUM_BACKENDS;i++)
1918         {
1919                 if (VALID_BACKEND(i))
1920                 {
1921                         if (pool_write(CONNECTION(backend,i), &sendlen, sizeof(sendlen)))
1922                                 return POOL_END;
1923                 }
1924         }
1925
1926         if (len == 0)
1927                 return POOL_CONTINUE;
1928         else if (len < 0)
1929         {
1930                 pool_error("SimpleForwardToBackend: invalid message length");
1931                 return POOL_END;
1932         }
1933
1934         p = pool_read2(frontend, len);
1935         if (p == NULL)
1936                 return POOL_END;
1937
1938         for (i=0;i<NUM_BACKENDS;i++)
1939         {
1940                 if (VALID_BACKEND(i))
1941                 {
1942                         if (pool_write_and_flush(CONNECTION(backend, i), p, len))
1943                                 return POOL_END;
1944                 }
1945         }
1946
1947         if (kind == 'B') /* Bind message */
1948         {
1949                 Portal *portal = NULL;
1950                 char *stmt_name, *portal_name;
1951
1952                 portal_name = p;
1953                 stmt_name = p + strlen(portal_name) + 1;
1954
1955                 pool_debug("bind message: portal_name %s stmt_name %s", portal_name, stmt_name);
1956
1957                 if (*stmt_name == '\0')
1958                         portal = unnamed_statement;
1959                 else
1960                 {
1961                         portal = lookup_prepared_statement_by_statement(&prepared_list, stmt_name);
1962                 }
1963
1964                 if (*portal_name == '\0'){
1965                         unnamed_portal = portal;
1966                 }
1967                 else if (portal)
1968                 {
1969                         if (portal->portal_name)
1970                                 free(portal->portal_name);
1971                         portal->portal_name = strdup(portal_name);
1972                 }
1973         }
1974
1975         /* Close message with prepared statement name. */
1976         else if (kind == 'C' && *p == 'S' && *(p + 1))
1977         {
1978                 POOL_MEMORY_POOL *old_context = pool_memory;
1979                 DeallocateStmt *deallocate_stmt;
1980
1981                 pending_prepared_portal = create_portal();
1982                 if (pending_prepared_portal == NULL)
1983                 {
1984                         pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
1985                         return POOL_END;
1986                 }
1987
1988                 pool_memory = pending_prepared_portal->prepare_ctxt;
1989                 name = pstrdup(p+1);
1990                 if (name == NULL)
1991                 {
1992                         pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
1993                         pool_memory = old_context;
1994                         return POOL_END;
1995                 }
1996
1997                 /* Translate from Close message to DEALLOCATE statement.*/
1998                 deallocate_stmt = palloc(sizeof(DeallocateStmt));
1999                 if (deallocate_stmt == NULL)
2000                 {
2001                         pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno));
2002                         pool_memory = old_context;
2003                         return POOL_END;
2004                 }
2005                 deallocate_stmt->name = name;
2006                 pending_prepared_portal->stmt = (Node *)deallocate_stmt;
2007                 pending_prepared_portal->portal_name = NULL;
2008                 pending_function = del_prepared_list;
2009                 pool_memory = old_context;
2010         }
2011
2012         if (kind == 'B' || kind == 'D' || kind == 'C')
2013         {
2014                 int i;
2015                 char kind1;
2016
2017                 for (i = 0;i < NUM_BACKENDS; i++)
2018                 {
2019                         if (VALID_BACKEND(i))
2020                         {
2021                                 POOL_CONNECTION *cp = CONNECTION(backend, i);
2022
2023                                 /*
2024                                  * send "Flush" message so that backend notices us
2025                                  * the completion of the command
2026                                  */
2027                                 pool_write(cp, "H", 1);
2028                                 sendlen = htonl(4);
2029                                 if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
2030                                 {
2031                                         return POOL_END;
2032                                 }
2033                         }
2034                 }
2035
2036                 /*
2037                  * Describe message with a portal name will receive two messages.
2038                  * 1. ParameterDescription
2039                  * 2. RowDescriptions or NoData
2040                  * So we read one message here.
2041                  */
2042                 if (kind == 'D' && *p == 'S')
2043                 {
2044                         ret = read_kind_from_backend(frontend, backend, &kind1);
2045                         if (ret != POOL_CONTINUE)
2046                                 return ret;
2047                         SimpleForwardToFrontend(kind1, frontend, backend);
2048                         if (pool_flush(frontend))
2049                                 return POOL_END;
2050                 }
2051
2052                 /*
2053                  * Forward to frontend until a NOTICE message received.
2054                  */
2055                 for (;;)
2056                 {
2057                         ret = read_kind_from_backend(frontend, backend, &kind1);
2058                         if (ret != POOL_CONTINUE)
2059                                 return ret;
2060                         SimpleForwardToFrontend(kind1, frontend, backend);
2061                         if (pool_flush(frontend) < 0)
2062                                 return POOL_ERROR;
2063
2064                         if (kind1 != 'N')
2065                                 break;
2066                 }
2067         }
2068
2069         return POOL_CONTINUE;
2070 }
2071
2072 POOL_STATUS ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2073 {
2074         int len, len1 = 0;
2075         int *len_array;
2076         int sendlen;
2077         char *p;
2078         char *name;
2079         char *value;
2080         POOL_STATUS status;
2081         char parambuf[1024];            /* parameter + value string buffer. XXX is this enough? */
2082         int i;
2083
2084         pool_write(frontend, "S", 1);
2085
2086         len_array = pool_read_message_length2(backend);
2087
2088         if (len_array == NULL)
2089         {
2090                 return POOL_END;
2091         }
2092
2093         len = len_array[MASTER_NODE_ID];
2094         sendlen = htonl(len);
2095         pool_write(frontend, &sendlen, sizeof(sendlen));
2096
2097         for (i=0;i<NUM_BACKENDS;i++)
2098         {
2099                 if (VALID_BACKEND(i))
2100                 {
2101                         len = len_array[i];
2102                         len -= 4;
2103
2104                         p = pool_read2(CONNECTION(backend, i), len);
2105                         if (p == NULL)
2106                                 return POOL_END;
2107
2108                         name = p;
2109                         value = p + strlen(name) + 1;
2110
2111                         pool_debug("%d th backend: name: %s value: %s", i, name, value);
2112
2113                         if (IS_MASTER_NODE_ID(i))
2114                         {
2115                                 len1 = len;
2116                                 memcpy(parambuf, p, len);
2117                                 pool_add_param(&CONNECTION(backend, i)->params, name, value);
2118                         }
2119
2120 #ifdef DEBUG
2121                         pool_param_debug_print(&MASTER(backend)->params);
2122 #endif
2123                 }
2124         }
2125
2126         status = pool_write(frontend, parambuf, len1);
2127         return status;
2128 }
2129
2130 /*
2131  * Reset backend status. return values are:
2132  * 0: no query was issued 1: a query was issued 2: no more queries remain -1: error
2133  */
2134 static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt)
2135 {
2136         char *query;
2137         int qn;
2138
2139         /*
2140          * Reset all state variables
2141          */
2142         in_progress = 0;
2143         in_load_balance = 0;
2144
2145         /*
2146          * Until pgpool-II 2.2.3 we don't have following 2 lines.
2147          * If we were executing someting in load balance mode, and if
2148          * frontend failed before executing end_loadl_balance() in
2149          * ReadyForQuery(), these variables remained and we may do
2150          * something only in master node!
2151          */
2152         REPLICATION = pool_config->replication_mode;
2153         MASTER_SLAVE = pool_config->master_slave_mode;
2154
2155         force_replication = 0;
2156         internal_transaction_started = 0;
2157         mismatch_ntuples = 0;
2158         select_in_transaction = 0;
2159         execute_select = 0;
2160         receive_extended_begin = 0;
2161
2162         qn = pool_config->num_reset_queries;
2163
2164         /*
2165          * After execution of all SQL commands in the reset_query_list, we
2166          * remove all prepared objects in the prepared_list.
2167          */
2168         if (qcnt >= qn)
2169         {
2170                 if (prepared_list.cnt == 0)
2171                 {
2172                         /*
2173                          * Either no prepared objects were created or DISCARD ALL
2174                          * or DEALLOCATE ALL is on the reset_query_list and they
2175                          * were executed.  The latter causes call to
2176                          * reset_prepared_list which removes all prepared objects.
2177                          */
2178                         reset_prepared_list(&prepared_list);
2179                         return 2;
2180                 }
2181
2182                 /* Delete from prepared list */
2183                 if (send_deallocate(backend, &prepared_list, 0))
2184                 {
2185                         /* Deallocate failed. We are in unknown state. Ask caller
2186                          * to reset backend connection.
2187                          */
2188                         reset_prepared_list(&prepared_list);
2189                         return -1;
2190                 }
2191                 /*
2192                  * If DEALLOCATE returns ERROR response, instead of
2193                  * CommandComplete, del_prepared_list is not called and the
2194                  * prepared object keeps on sitting on the prepared list. This
2195                  * will cause infinite call to reset_backend.  So we call
2196                  * del_prepared_list() again. This is harmless since trying to
2197                  * remove same prepared object will be ignored.
2198                  */
2199                 del_prepared_list(&prepared_list, prepared_list.portal_list[0]);
2200                 return 1;
2201         }
2202
2203         query = pool_config->reset_query_list[qcnt];
2204
2205         /* if transaction state is idle, we don't need to issue ABORT */
2206         if (TSTATE(backend) == 'I' && !strcmp("ABORT", query))
2207                 return 0;
2208
2209         pool_set_timeout(10);
2210
2211         if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE)
2212         {
2213                 pool_set_timeout(0);
2214                 return -1;
2215         }
2216
2217         pool_set_timeout(0);
2218         return 1;
2219 }
2220
2221 /*
2222  * return non 0 if load balance is possible
2223  */
2224 int load_balance_enabled(POOL_CONNECTION_POOL *backend, Node* node, char *sql)
2225 {
2226         return (pool_config->load_balance_mode &&
2227                         (DUAL_MODE || pool_config->parallel_mode) &&
2228                         MAJOR(backend) == PROTO_MAJOR_V3 &&
2229                         TSTATE(backend) == 'I' &&
2230                         is_select_query(node, sql) &&
2231                         !is_sequence_query(node));
2232 }
2233
2234
2235 /*
2236  * returns non 0 if the SQL statement can be load
2237  * balanced. Followings are statemnts go into this category.
2238  *
2239  * - SELECT without FOR UPDATE/SHARE
2240  * - COPY TO STDOUT
2241  * - DECLARE..SELECT (without INTO nor FOR UPDATE/SHARE)
2242  * - FETCH
2243  * - CLOSE
2244  *
2245  * note that for SELECT INTO, this function returns 0
2246  */
2247 int is_select_query(Node *node, char *sql)
2248 {
2249         if (node == NULL)
2250                 return 0;
2251
2252         /*
2253          * 2009/5/1 Tatsuo says: This test is not bogus. As of 2.2, pgpool
2254          * sets Portal->sql_string to NULL for SQL command PREPARE.
2255          * Usually this is ok, since in most cases SQL command EXECUTE
2256          * follows anyway. Problem is, some applications mix PREPARE with
2257          * extended protocol command "EXECUTE" and so on. Execute() seems
2258          * to think this never happens but it is not real. Someday we
2259          * should extract actual query string from PrepareStmt->query and
2260          * set it to Portal->sql_string.
2261          */
2262         if (sql == NULL)
2263                 return 0;
2264
2265         if (pool_config->ignore_leading_white_space)
2266         {
2267                 /* ignore leading white spaces */
2268                 while (*sql && isspace(*sql))
2269                         sql++;
2270         }
2271
2272         if (IsA(node, SelectStmt) || IsA(node, DeclareCursorStmt))
2273         {
2274                 SelectStmt *select_stmt;
2275
2276                 if (IsA(node, SelectStmt))
2277                          select_stmt = (SelectStmt *)node;
2278                 else
2279                         select_stmt = (SelectStmt *)((DeclareCursorStmt *)node)->query;
2280
2281                 if (select_stmt->intoClause || select_stmt->lockingClause)
2282                         return 0;
2283
2284                 if (IsA(node, SelectStmt))
2285                         return (*sql == 's' || *sql == 'S' || *sql == '(');
2286                 else
2287                         return (*sql == 'd' || *sql == 'D');
2288         }
2289         else if (IsA(node, FetchStmt) || IsA(node, ClosePortalStmt))
2290         {
2291                 return (*sql == 'f' || *sql == 'F' || *sql == 'c' || *sql == 'C');
2292         }
2293         else if (IsA(node, CopyStmt))
2294         {
2295                 CopyStmt *copy_stmt = (CopyStmt *)node;
2296                 return (copy_stmt->is_from == FALSE &&
2297                                 copy_stmt->filename == NULL);
2298         }
2299         return 0;
2300 }
2301
2302 /*
2303  * returns non 0 if SQL is SELECT statement including nextval() or
2304  * setval() call
2305  */
2306 int is_sequence_query(Node *node)
2307 {
2308         SelectStmt *select_stmt;
2309         ListCell *lc;
2310
2311         if (node == NULL || !IsA(node, SelectStmt))
2312                 return 0;
2313
2314         select_stmt = (SelectStmt *)node;
2315         foreach (lc, select_stmt->targetList)
2316         {
2317                 if (IsA(lfirst(lc), ResTarget))
2318                 {
2319                         ResTarget *t;
2320                         FuncCall *fc;
2321                         ListCell *c;
2322
2323                         t = (ResTarget *) lfirst(lc);
2324                         if (IsA(t->val, FuncCall))
2325                         {
2326                                 fc = (FuncCall *) t->val;
2327                                 foreach (c, fc->funcname)
2328                                 {
2329                                         Value *v = lfirst(c);
2330                                         if (strncasecmp(v->val.str, "NEXTVAL", 7) == 0)
2331                                                 return 1;
2332                                         else if (strncasecmp(v->val.str, "SETVAL", 6) == 0)
2333                                                 return 1;
2334                                 }
2335                         }
2336                 }
2337         }
2338
2339         return 0;
2340 }
2341
2342 /*
2343  * returns non 0 if SQL is transaction starting command (START
2344  * TRANSACTION or BEGIN)
2345  */
2346 int is_start_transaction_query(Node *node)
2347 {
2348         TransactionStmt *stmt;
2349
2350         if (node == NULL || !IsA(node, TransactionStmt))
2351                 return 0;
2352
2353         stmt = (TransactionStmt *)node;
2354         return stmt->kind == TRANS_STMT_START || stmt->kind == TRANS_STMT_BEGIN;
2355 }
2356
2357 /*
2358  * returns non 0 if SQL is transaction commit or abort command (END
2359  * TRANSACTION or ROLLBACK or ABORT)
2360  */
2361 int is_commit_query(Node *node)
2362 {
2363         TransactionStmt *stmt;
2364
2365         if (node == NULL || !IsA(node, TransactionStmt))
2366                 return 0;
2367
2368         stmt = (TransactionStmt *)node;
2369         return stmt->kind == TRANS_STMT_COMMIT || stmt->kind == TRANS_STMT_ROLLBACK;
2370 }
2371
2372 /*
2373  * start load balance mode
2374  */
2375 void start_load_balance(POOL_CONNECTION_POOL *backend)
2376 {
2377 #ifdef NOT_USED
2378         double total_weight,r;
2379         int i;
2380
2381         /* save backend connection slots */
2382         for (i=0;i<NUM_BACKENDS;i++)
2383         {
2384                 if (VALID_BACKEND(i))
2385                 {
2386                         slots[i] = CONNECTION_SLOT(backend, i);
2387                 }
2388         }
2389 #endif
2390
2391         /* temporarily turn off replication mode */
2392         if (REPLICATION)
2393                 replication_was_enabled = 1;
2394         if (MASTER_SLAVE)
2395                 master_slave_was_enabled = 1;
2396
2397         REPLICATION = 0;
2398         MASTER_SLAVE = 0;
2399
2400 #ifdef NOTUSED
2401         backend->slots[0] = slots[selected_slot];
2402 #endif
2403         LOAD_BALANCE_STATUS(backend->info->load_balancing_node) = LOAD_SELECTED;
2404         selected_slot = backend->info->load_balancing_node;
2405
2406         /* start load balancing */
2407         in_load_balance = 1;
2408 }
2409
2410 /*
2411  * finish load balance mode
2412  */
2413 void end_load_balance(POOL_CONNECTION_POOL *backend)
2414 {
2415         in_load_balance = 0;
2416         LOAD_BALANCE_STATUS(selected_slot) = LOAD_UNSELECTED;
2417
2418 #ifdef NOT_USED
2419         /* restore backend connection slots */
2420
2421         for (i=0;i<NUM_BACKENDS;i++)
2422         {
2423                 if (VALID_BACKEND(i))
2424                 {
2425                         CONNECTION_SLOT(backend, i) = slots[i];
2426                 }
2427         }
2428 #endif
2429
2430         /* turn on replication mode */
2431         REPLICATION = replication_was_enabled;
2432         MASTER_SLAVE = master_slave_was_enabled;
2433
2434         replication_was_enabled = 0;
2435         master_slave_was_enabled = 0;
2436
2437         pool_debug("end_load_balance: end load balance mode");
2438 }
2439
2440 /*
2441  * send error message to frontend
2442  */
2443 void pool_send_error_message(POOL_CONNECTION *frontend, int protoMajor,
2444                                                          char *code,
2445                                                          char *message,
2446                                                          char *detail,
2447                                                          char *hint,
2448                                                          char *file,
2449                                                          int line)
2450 {
2451 /*
2452  * Buffer length for each message part
2453  */
2454 #define MAXMSGBUF 256
2455 /*
2456  * Buffer length for result message buffer.
2457  * Since msg is consisted of 7 parts, msg buffer should be large
2458  * enough to hold those message parts
2459 */
2460 #define MAXDATA (MAXMSGBUF+1)*7+1
2461
2462         pool_set_nonblock(frontend->fd);
2463
2464         if (protoMajor == PROTO_MAJOR_V2)
2465         {
2466                 pool_write(frontend, "E", 1);
2467                 pool_write_and_flush(frontend, message, strlen(message)+1);
2468         }
2469         else if (protoMajor == PROTO_MAJOR_V3)
2470         {
2471                 char data[MAXDATA];
2472                 char msgbuf[MAXMSGBUF];
2473                 int len;
2474                 int thislen;
2475                 int sendlen;
2476
2477                 len = 0;
2478
2479                 pool_write(frontend, "E", 1);
2480
2481                 /* error level */
2482                 thislen = snprintf(msgbuf, MAXMSGBUF, "SERROR");
2483                 thislen = Min(thislen, MAXMSGBUF);
2484                 memcpy(data +len, msgbuf, thislen+1);
2485                 len += thislen + 1;
2486
2487                 /* code */
2488                 thislen = snprintf(msgbuf, MAXMSGBUF, "C%s", code);
2489                 thislen = Min(thislen, MAXMSGBUF);
2490                 memcpy(data +len, msgbuf, thislen+1);
2491                 len += thislen + 1;
2492
2493                 /* message */
2494                 thislen = snprintf(msgbuf, MAXMSGBUF, "M%s", message);
2495                 thislen = Min(thislen, MAXMSGBUF);
2496                 memcpy(data +len, msgbuf, thislen+1);
2497                 len += thislen + 1;
2498
2499                 /* detail */
2500                 if (*detail != '\0')
2501                 {
2502                         thislen = snprintf(msgbuf, MAXMSGBUF, "D%s", detail);
2503                         thislen = Min(thislen, MAXMSGBUF);
2504                         memcpy(data +len, msgbuf, thislen+1);
2505                         len += thislen + 1;
2506                 }
2507
2508                 /* hint */
2509                 if (*hint != '\0')
2510                 {
2511                         thislen = snprintf(msgbuf, MAXMSGBUF, "H%s", hint);
2512                         thislen = Min(thislen, MAXMSGBUF);
2513                         memcpy(data +len, msgbuf, thislen+1);
2514                         len += thislen + 1;
2515                 }
2516
2517                 /* file */
2518                 thislen = snprintf(msgbuf, MAXMSGBUF, "F%s", file);
2519                 thislen = Min(thislen, MAXMSGBUF);
2520                 memcpy(data +len, msgbuf, thislen+1);
2521                 len += thislen + 1;
2522
2523                 /* line */
2524                 thislen = snprintf(msgbuf, MAXMSGBUF, "L%d", line);
2525                 thislen = Min(thislen, MAXMSGBUF);
2526                 memcpy(data +len, msgbuf, thislen+1);
2527                 len += thislen + 1;
2528
2529                 /* stop null */
2530                 len++;
2531                 *(data + len - 1) = '\0';
2532
2533                 sendlen = len;
2534                 len = htonl(len + 4);
2535                 pool_write(frontend, &len, sizeof(len));
2536                 pool_write_and_flush(frontend, data, sendlen);
2537         }
2538         else
2539                 pool_error("send_error_message: unknown protocol major %d", protoMajor);
2540
2541         pool_unset_nonblock(frontend->fd);
2542 }
2543
2544 void pool_send_readyforquery(POOL_CONNECTION *frontend)
2545 {
2546         int len;
2547         pool_write(frontend, "Z", 1);
2548         len = 5;
2549         len = htonl(len);
2550         pool_write(frontend, &len, sizeof(len));
2551         pool_write(frontend, "I", 1);
2552         pool_flush(frontend);
2553 }
2554
2555 /*
2556  * Send a query to a backend in sync manner.
2557  * This function sends a query and waits for CommandComplete/ReadyForQuery.
2558  * If an error occured, it returns with POOL_ERROR.
2559  * This function does NOT handle SELECT/SHOW queries.
2560  * If no_ready_for_query is non 0, returns without reading the packet
2561  * length for ReadyForQuery. This mode is necessary when called from ReadyForQuery().
2562  */
2563 static POOL_STATUS do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend,
2564                                                           char *query, int protoMajor, int pid, int key, int no_ready_for_query)
2565 {
2566         int len;
2567         int status;
2568         char kind;
2569         char *string;
2570         int deadlock_detected = 0;
2571
2572         pool_debug("do_command: Query: %s", query);
2573
2574         /* send the query to the backend */
2575         if (send_simplequery_message(backend, strlen(query)+1, query, protoMajor) != POOL_CONTINUE)
2576                 return POOL_END;
2577
2578         /*
2579          * Wait for response from badckend while polling frontend connection is ok.
2580          * If not, cancel the transaction.
2581          */
2582         if (wait_for_query_response(frontend, backend, query, protoMajor) != POOL_CONTINUE)
2583         {
2584                 /* Cancel current transaction */
2585                 CancelPacket cancel_packet;
2586
2587                 cancel_packet.protoVersion = htonl(PROTO_CANCEL);
2588                 cancel_packet.pid = pid;
2589                 cancel_packet.key= key;
2590                 cancel_request(&cancel_packet);
2591                 return POOL_END;
2592         }
2593
2594         /*
2595          * We must check deadlock error here. If a deadlock error is
2596          * detected by a backend, other backend might not be noticed the
2597          * error.  In this case caller should send an error query to the
2598          * backend to abort the transaction. Otherwise the transaction
2599          * state might vary among backends(idle in transaction vs. abort).
2600          */
2601         deadlock_detected = detect_deadlock_error(backend, protoMajor);
2602         if (deadlock_detected < 0)
2603                 return POOL_END;
2604
2605         /*
2606          * Continue to read packets until we get ReadForQuery (Z).
2607          * Until that we may recieve one of:
2608          *
2609          * N: Notice response
2610          * E: Error response
2611          * C: Comand complete
2612          *
2613          * XXX: we ignore Notice and Error here. Even notice/error
2614          * messages are not sent to the frontend. May be it's ok since the
2615          * error was caused by our internal use of SQL command (otherwise users
2616          * will be confused).
2617          */
2618         for(;;)
2619         {
2620                 status = pool_read(backend, &kind, sizeof(kind));
2621                 if (status < 0)
2622                 {
2623                         pool_error("do_command: error while reading message kind");
2624                         return POOL_END;
2625                 }
2626
2627                 pool_debug("do_command: kind: %c", kind);
2628
2629                 if (kind == 'Z')                /* Ready for Query? */
2630                         break;          /* get out the loop without reading message lenghth */
2631
2632                 if (protoMajor == PROTO_MAJOR_V3)
2633                 {
2634                         if (pool_read(backend, &len, sizeof(len)) < 0)
2635                         {
2636                                 pool_error("do_command: error while reading message length");
2637                                 return POOL_END;
2638                         }
2639                         len = ntohl(len) - 4;
2640                         
2641                         if (kind != 'N' && kind != 'E' && kind != 'C')
2642                         {
2643                                 pool_error("do_command: error, kind is not N, E or C(%02x)", kind);
2644                                 return POOL_END;
2645                         }
2646                         string = pool_read2(backend, len);
2647                         if (string == NULL)
2648                         {
2649                                 pool_error("do_command: error while reading rest of message");
2650                                 return POOL_END;
2651                         }
2652                 }
2653                 else
2654                 {
2655                         string = pool_read_string(backend, &len, 0);
2656                         if (string == NULL)
2657                         {
2658                                 pool_error("do_command: error while reading rest of message");
2659                                 return POOL_END;
2660                         }
2661                 }
2662         }
2663
2664 /*
2665  * until 2008/11/12 we believed that we never had packets other than
2666  * 'Z' after receiving 'C'. However a counter example was presented by
2667  * a poor customer. So we replaced the whole thing with codes
2668  * above. In a side effect we were be able to get ride of nasty
2669  * "goto". Congratulations.
2670  */
2671 #ifdef NOT_USED
2672         /*
2673          * Expecting CompleteCommand
2674          */
2675 retry_read_packet:
2676         status = pool_read(backend, &kind, sizeof(kind));
2677         if (status < 0)
2678         {
2679                 pool_error("do_command: error while reading message kind");
2680                 return POOL_END;
2681         }
2682
2683         if (kind == 'E')
2684         {
2685                 pool_log("do_command: backend does not successfully complete command %s status %c", query, kind);
2686         }
2687
2688         /*
2689          * read command tag of CommandComplete response
2690          */
2691         if (protoMajor == PROTO_MAJOR_V3)
2692         {
2693                 if (pool_read(backend, &len, sizeof(len)) < 0)
2694                         return POOL_END;
2695                 len = ntohl(len) - 4;
2696                 string = pool_read2(backend, len);
2697                 if (string == NULL)
2698                         return POOL_END;
2699                 pool_debug("command tag: %s", string);
2700         }
2701         else
2702         {
2703                 string = pool_read_string(backend, &len, 0);
2704                 if (string == NULL)
2705                         return POOL_END;
2706         }
2707
2708         if (kind == 'N') /* warning? */
2709                 goto retry_read_packet;
2710
2711         /*
2712          * Expecting ReadyForQuery
2713          */
2714         status = pool_read(backend, &kind, sizeof(kind));
2715         if (status < 0)
2716         {
2717                 pool_error("do_command: error while reading message kind");
2718                 return POOL_END;
2719         }
2720
2721         if (kind != 'Z')
2722         {
2723                 pool_error("do_command: backend returns %c while expecting ReadyForQuery", kind);
2724                 return POOL_END;
2725         }
2726 #endif
2727
2728         if (no_ready_for_query)
2729                 return POOL_CONTINUE;
2730
2731         if (protoMajor == PROTO_MAJOR_V3)
2732         {
2733                 /* read packet lenghth for ready for query */
2734                 if (pool_read(backend, &len, sizeof(len)) < 0)
2735                 {
2736                         pool_error("do_command: error while reading message length");
2737                         return POOL_END;
2738                 }
2739
2740                 /* read transaction state */
2741                 status = pool_read(backend, &kind, sizeof(kind));
2742                 if (status < 0)
2743                 {
2744                         pool_error("do_command: error while reading transaction status");
2745                         return POOL_END;
2746                 }
2747
2748                 /* set transaction state */
2749                 pool_debug("do_command: transaction state: %c", kind);
2750                 backend->tstate = kind;
2751         }
2752
2753         return deadlock_detected ? POOL_DEADLOCK : POOL_CONTINUE;
2754 }
2755
2756 /*
2757  * Send a syntax error query to abort transaction and receive response
2758  * from backend and discard it until we get Error response.
2759  *
2760  * We need to sync transaction status in transaction block.
2761  * SELECT query is sent to master only.
2762  * If SELECT is error, we must abort transaction on other nodes.
2763  */
2764 POOL_STATUS do_error_command(POOL_CONNECTION *backend, int major)
2765 {
2766         char *error_query = POOL_ERROR_QUERY;
2767         int status, len;
2768         char kind;
2769         char *string;
2770
2771         if (send_simplequery_message(backend, strlen(error_query) + 1, error_query, major) != POOL_CONTINUE)
2772         {
2773                 return POOL_END;
2774         }
2775
2776         /*
2777          * Continue to read packets until we get Error response (E).
2778          * Until that we may recieve one of:
2779          *
2780          * N: Notice response
2781          * C: Comand complete
2782          *
2783          * XXX: we ignore Notice here. Even notice messages are not sent
2784          * to the frontend. May be it's ok since the error was caused by
2785          * our internal use of SQL command (otherwise users will be
2786          * confused).
2787          */
2788         do
2789         {
2790                 status = pool_read(backend, &kind, sizeof(kind));
2791                 if (status < 0)
2792                 {
2793                         pool_error("do_error_command: error while reading message kind");
2794                         return POOL_END;
2795                 }
2796
2797                 pool_debug("do_error_command: kind: %c", kind);
2798
2799                 if (major == PROTO_MAJOR_V3)
2800                 {
2801                         if (pool_read(backend, &len, sizeof(len)) < 0)
2802                         {
2803                                 pool_error("do_error_command: error while reading message length");
2804                                 return POOL_END;
2805                         }
2806                         len = ntohl(len) - 4;
2807                         string = pool_read2(backend, len);
2808                         if (string == NULL)
2809                         {
2810                                 pool_error("do_error_command: error while reading rest of message");
2811                                 return POOL_END;
2812                         }
2813                 }
2814                 else
2815                 {
2816                         string = pool_read_string(backend, &len, 0);
2817                         if (string == NULL)
2818                         {
2819                                 pool_error("do_error_command: error while reading rest of message");
2820                                 return POOL_END;
2821                         }
2822                 }
2823         } while (kind != 'E');
2824
2825 #ifdef NOT_USED
2826         /*
2827          * Expecting ErrorResponse
2828          */
2829         status = pool_read(backend, &kind, sizeof(kind));
2830         if (status < 0)
2831         {
2832                 pool_error("do_command: error while reading message kind");
2833                 return POOL_END;
2834         }
2835
2836         /*
2837          * read command tag of CommandComplete response
2838          */
2839         if (major == PROTO_MAJOR_V3)
2840         {
2841                 if (pool_read(backend, &len, sizeof(len)) < 0)
2842                         return POOL_END;
2843                 len = ntohl(len) - 4;
2844                 string = pool_read2(backend, len);
2845                 if (string == NULL)
2846                         return POOL_END;
2847                 pool_debug("command tag: %s", string);
2848         }
2849         else
2850         {
2851                 string = pool_read_string(backend, &len, 0);
2852                 if (string == NULL)
2853                         return POOL_END;
2854         }
2855 #endif
2856         return POOL_CONTINUE;
2857 }
2858
2859 /*
2860  * Send invalid portal execution to abort transaction.
2861  * We need to sync transaction status in transaction block.
2862  * SELECT query is sent to master only.
2863  * If SELECT is error, we must abort transaction on other nodes.
2864  */
2865 static POOL_STATUS do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major)
2866 {
2867         int status;
2868         char kind;
2869         char *string;
2870         char msg[1024] = "pgpoool_error_portal"; /* large enough */
2871         int len = strlen(msg);
2872
2873         memset(msg + len, 0, sizeof(int));
2874         if (send_execute_message(backend, node_id, len + 5, msg))
2875         {
2876                 return POOL_END;
2877         }
2878
2879         /*
2880          * Expecting ErrorResponse
2881          */
2882         status = pool_read(CONNECTION(backend, node_id), &kind, sizeof(kind));
2883         if (status < 0)
2884         {
2885                 pool_error("do_error_execute_command: error while reading message kind");
2886                 return POOL_END;
2887         }
2888
2889         /*
2890          * read command tag of CommandComplete response
2891          */
2892         if (major == PROTO_MAJOR_V3)
2893         {
2894                 if (pool_read(CONNECTION(backend, node_id), &len, sizeof(len)) < 0)
2895                         return POOL_END;
2896                 len = ntohl(len) - 4;
2897                 string = pool_read2(CONNECTION(backend, node_id), len);
2898                 if (string == NULL)
2899                         return POOL_END;
2900                 pool_debug("command tag: %s", string);
2901         }
2902         else
2903         {
2904                 string = pool_read_string(CONNECTION(backend, node_id), &len, 0);
2905                 if (string == NULL)
2906                         return POOL_END;
2907         }
2908
2909         return POOL_CONTINUE;
2910 }
2911
2912 /*
2913  * Transmit an arbitrary Query to a specific node.
2914  * This function is only used in parallel mode
2915  */
2916 POOL_STATUS OneNode_do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, char *database)
2917 {
2918         int len,sendlen;
2919         int status;
2920         char kind;
2921         bool notice = false;
2922
2923         pool_debug("OneNode_do_command: Query: %s", query);
2924
2925         /* send the query to the backend */
2926         pool_write(backend, "Q", 1);
2927         len = strlen(query)+1;
2928
2929         sendlen = htonl(len + 4);
2930         pool_write(backend, &sendlen, sizeof(sendlen));
2931
2932         if (pool_write_and_flush(backend, query, len) < 0)
2933         {
2934                 return POOL_END;
2935         }
2936
2937         for(;;)
2938         {
2939                 status = pool_read(backend, &kind, sizeof(kind));
2940                 if (status < 0)
2941                 {
2942                         pool_error("OneNode_do_command: error while reading message kind");
2943                         return POOL_END;
2944                 }
2945                 
2946                 if (kind == 'N' && strstr(query,"dblink")) {
2947                         notice = true;
2948                         status = ParallelForwardToFrontend(kind, frontend, backend, database, false);
2949                 } else {
2950                         if(notice)
2951                                 status = ParallelForwardToFrontend(kind, frontend, backend, database, false);
2952                         else
2953                                 status = ParallelForwardToFrontend(kind, frontend, backend, database, true);
2954                 }
2955                 if (kind == 'C' || kind =='E')
2956                 {
2957                         break;
2958                 }
2959         }
2960         /*
2961          * Expecting ReadyForQuery
2962          *
2963          */
2964         status = pool_read(backend, &kind, sizeof(kind));
2965
2966         if(notice)
2967                                 pool_send_error_message(frontend, 3, "XX000",
2968                                                                                 "pgpool2 sql restriction(notice from dblink)",query,"", 
2969                                                                                 __FILE__,__LINE__);
2970
2971         if (status < 0)
2972         {
2973                 pool_error("OneNode_do_command: error while reading message kind");
2974                 return POOL_END;
2975         }
2976
2977         if (kind != 'Z')
2978         {
2979                 pool_error("OneNode_do_command: backend does not return ReadyForQuery");
2980                 return POOL_END;
2981         }
2982
2983
2984         status = ParallelForwardToFrontend(kind, frontend, backend, database, true);
2985         pool_flush(frontend);
2986
2987                 return status;
2988 }
2989
2990 /*
2991  * Free POOL_SELECT_RESULT object
2992  */
2993 static void free_select_result(POOL_SELECT_RESULT *result)
2994 {
2995         int i;
2996
2997         if (result->nullflags)
2998                 free(result->nullflags);
2999
3000         if (result->data)
3001         {
3002                 for(i=0;i<result->numrows;i++)
3003                 {
3004                         if (result->data[i])
3005                                 free(result->data[i]);
3006                 }
3007                 free(result->data);
3008         }
3009
3010         if (result->rowdesc)
3011         {
3012                 if (result->rowdesc->attrinfo)
3013                 {
3014                         for(i=0;i<result->rowdesc->num_attrs;i++)
3015                         {
3016                                 if (result->rowdesc->attrinfo[i].attrname)
3017                                         free(result->rowdesc->attrinfo[i].attrname);
3018                         }
3019                         free(result->rowdesc->attrinfo);
3020                 }
3021                 free(result->rowdesc);
3022         }
3023 }
3024
3025 /*
3026  * Send a SELECT to one DB node. This function works for V3 only.
3027  */
3028 POOL_STATUS do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result)
3029 {
3030 #define DO_QUERY_ALLOC_NUM 1024 /* memory allocation unit for POOL_SELECT_RESULT */
3031
3032         int i;
3033         int len;
3034         char kind;
3035         char *packet;
3036         char *p;
3037         short num_fields;
3038         int num_data;
3039         int intval;
3040         short shortval;
3041
3042         POOL_SELECT_RESULT *res;
3043         RowDesc *rowdesc;
3044         AttrInfo *attrinfo;
3045
3046         res = malloc(sizeof(*res));
3047         if (!res)
3048         {
3049                 pool_error("pool_query: malloc failed");
3050                 return POOL_ERROR;
3051         }
3052         rowdesc = malloc(sizeof(*rowdesc));
3053         if (!rowdesc)
3054         {
3055                 pool_error("pool_query: malloc failed");
3056                 return POOL_ERROR;
3057         }
3058         memset(res, 0, sizeof(*res));
3059         memset(rowdesc, 0, sizeof(*rowdesc));
3060         *result = res;
3061
3062         res->rowdesc = rowdesc;
3063
3064         num_data = 0;
3065
3066         res->nullflags = malloc(DO_QUERY_ALLOC_NUM*sizeof(int));
3067         if (!res->nullflags)
3068         {
3069                 pool_error("do_query: malloc failed");
3070                 return POOL_ERROR;
3071         }
3072         res->data = malloc(DO_QUERY_ALLOC_NUM*sizeof(char *));
3073         if (!res->data)
3074         {
3075                 pool_error("do_query: malloc failed");
3076                 return POOL_ERROR;
3077         }
3078
3079         /* send a query to the backend */
3080         if (send_simplequery_message(backend, strlen(query) + 1, query, PROTO_MAJOR_V3) != POOL_CONTINUE)
3081         {
3082                 return POOL_END;
3083         }
3084
3085         /*
3086          * Continue to read packets until we get Ready for command('Z')
3087          *
3088          * XXX: we ignore other than Z here. Even notice messages are not sent
3089          * to the frontend. May be it's ok since the error was caused by
3090          * our internal use of SQL command (otherwise users will be
3091          * confused).
3092          */
3093         for(;;)
3094         {
3095                 if (pool_read(backend, &kind, sizeof(kind)) < 0)
3096                 {
3097                         pool_error("do_query: error while reading message kind");
3098                         return POOL_END;
3099                 }
3100
3101                 pool_debug("do_query: kind: %c", kind);
3102
3103                 if (pool_read(backend, &len, sizeof(len)) < 0)
3104                 {
3105                         pool_error("do_query: error while reading message length");
3106                         return POOL_END;
3107                 }
3108                 len = ntohl(len) - 4;
3109                 packet = pool_read2(backend, len);
3110                 if (packet == NULL)
3111                 {
3112                         pool_error("do_query: error while reading rest of message");
3113                         return POOL_END;
3114                 }
3115
3116                 switch (kind)
3117                 {
3118                         case 'Z':       /* Ready for query */
3119                                 return POOL_CONTINUE;
3120                                 break;
3121
3122                         case 'T':       /* Row Description */
3123                                 p = packet;
3124                                 memcpy(&shortval, p, sizeof(short));
3125                                 num_fields = ntohs(shortval);           /* number of fields */
3126                                 pool_debug("num_fileds: %d", num_fields);
3127
3128                                 if (num_fields > 0)
3129                                 {
3130                                         rowdesc->num_attrs = num_fields;
3131                                         attrinfo = malloc(sizeof(*attrinfo)*num_fields);
3132                                         if (!attrinfo)
3133                                         {
3134                                                 pool_error("do_query: malloc failed");
3135                                                 return POOL_ERROR;
3136                                         }
3137                                         rowdesc->attrinfo = attrinfo;
3138
3139                                         p += sizeof(num_fields);
3140
3141                                         /* extract attribute info */
3142                                         for (i = 0;i<num_fields;i++)
3143                                         {
3144                                                 len = strlen(p) + 1;
3145                                                 attrinfo->attrname = malloc(len);
3146                                                 if (!attrinfo->attrname)
3147                                                 {
3148                                                         pool_error("do_query: malloc failed");
3149                                                         return POOL_ERROR;
3150                                                 }
3151                                                 memcpy(attrinfo->attrname, p, len);
3152                                                 p += len;
3153                                                 memcpy(&intval, p, sizeof(int));
3154                                                 attrinfo->oid = htonl(intval);
3155                                                 p += sizeof(int);
3156                                                 memcpy(&shortval, p, sizeof(short));
3157                                                 attrinfo->attrnumber = htons(shortval);
3158                                                 p += sizeof(short);
3159                                                 memcpy(&intval, p, sizeof(int));
3160                                                 attrinfo->typeoid = htonl(intval);
3161                                                 p += sizeof(int);
3162                                                 memcpy(&shortval, p, sizeof(short));
3163                                                 attrinfo->size = htons(shortval);
3164                                                 p += sizeof(short);
3165                                                 memcpy(&intval, p, sizeof(int));
3166                                                 attrinfo->mod = htonl(intval);
3167                                                 p += sizeof(int);
3168                                                 p += sizeof(short);             /* skip format code since we use "text" anyway */
3169
3170                                                 attrinfo++;
3171                                         }
3172                                 }
3173                                 break;
3174
3175                         case 'D':       /* data row */
3176                                 p = packet;
3177
3178                                 memcpy(&shortval, p, sizeof(short));
3179                                 num_fields = htons(shortval);
3180                                 p += sizeof(short);
3181
3182                                 if (num_fields > 0)
3183                                 {
3184                                         res->numrows++;
3185
3186                                         for (i=0;i<num_fields;i++)
3187                                         {
3188                                                 memcpy(&intval, p, sizeof(int));
3189                                                 len = htonl(intval);
3190                                                 p += sizeof(int);
3191
3192                                                 res->nullflags[num_data] = len;
3193
3194                                                 if (len > 0)    /* NOT NULL? */
3195                                                 {
3196                                                         res->data[num_data] = malloc(len + 1);
3197                                                         if (!res->data[num_data])
3198                                                         {
3199                                                                 pool_error("do_query: malloc failed");
3200                                                                 return POOL_ERROR;
3201                                                         }
3202                                                         memcpy(res->data[num_data], p, len);
3203                                                         *(res->data[num_data] + 1) = '\0';
3204
3205                                                         p += len;
3206                                                 }
3207
3208                                                 num_data++;
3209
3210                                                 if (num_data % DO_QUERY_ALLOC_NUM == 0)
3211                                                 {
3212                                                         res->nullflags = realloc(res->nullflags,
3213                                                                                                          (num_data/DO_QUERY_ALLOC_NUM +1)*DO_QUERY_ALLOC_NUM*sizeof(int));
3214                                                         if (!res->nullflags)
3215                                                         {
3216                                                                 pool_error("do_query: malloc failed");
3217                                                                 return POOL_ERROR;
3218                                                         }
3219                                                         res->data = realloc(res->data,
3220                                                                                                 (num_data/DO_QUERY_ALLOC_NUM +1)*DO_QUERY_ALLOC_NUM*sizeof(char *));
3221                                                         if (!res->data)
3222                                                         {
3223                                                                 pool_error("do_query: malloc failed");
3224                                                                 return POOL_ERROR;
3225                                                         }
3226                                                 }
3227                                         }
3228                                 }
3229                                 break;
3230
3231                         default:
3232                                 break;
3233                 }
3234
3235         }
3236         return POOL_CONTINUE;
3237 }
3238
3239
3240 /*
3241  * judge if we need to lock the table
3242  * to keep SERIAL consistency among servers
3243  */
3244 int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query, Node *node)
3245 {
3246 /*
3247  * Query to know if the target table has SERIAL column or not.
3248  * This query is valid through PostgreSQL 7.3 to 8.3.
3249  */
3250 #define NEXTVALQUERY "SELECT count(*) FROM pg_catalog.pg_attrdef AS d, pg_catalog.pg_class AS c WHERE d.adrelid = c.oid AND d.adsrc ~ 'nextval' AND c.relname = '%s'"
3251
3252 #define INSERT_STATEMENT_MAX_CACHE              16
3253 #define MAX_ITEM_LENGTH 1024
3254
3255         /* table lookup cache structure */
3256         typedef struct {
3257                 char dbname[MAX_ITEM_LENGTH];   /* database name */
3258                 char relname[MAX_ITEM_LENGTH];  /* table name */
3259                 int     use_serial;     /* 1: use SERIAL data type */
3260                 int refcnt;             /* reference count */
3261         } MyRelCache;
3262
3263         static MyRelCache relcache[INSERT_STATEMENT_MAX_CACHE];
3264
3265         int i;
3266         char *str;
3267         char *rel;
3268         int use_serial = 0;
3269         char *dbname;
3270
3271         /*
3272          * for version 2 protocol, we cannot check if it's actually uses
3273          * SERIAL data types or not since the underlying infrastructure
3274          * (do_query) does not support the protocol. So we just return
3275          * false.
3276          */
3277         if (MAJOR(backend) == PROTO_MAJOR_V2)
3278                 return 0;
3279
3280         /* INSERT statement? */
3281         if (!IsA(node, InsertStmt))
3282                 return 0;
3283
3284         /* need to ignore leading white spaces? */
3285         if (pool_config->ignore_leading_white_space)
3286         {
3287                 /* ignore leading white spaces */
3288                 while (*query && isspace(*query))
3289                         query++;
3290         }
3291
3292         /* is there "NO_LOCK" comment? */
3293         if (strncasecmp(query, NO_LOCK_COMMENT, NO_LOCK_COMMENT_SZ) == 0)
3294                 return 0;
3295
3296         /* is there "LOCK" comment? */
3297         if (strncasecmp(query, LOCK_COMMENT, LOCK_COMMENT_SZ) == 0)
3298                 return 1;
3299
3300         if (pool_config->insert_lock == 0)      /* insert_lock is specified? */
3301                 return 0;
3302
3303         /*
3304          * if insert_lock is true, then check if the table actually uses
3305          * SERIAL data type
3306          */
3307
3308         /* obtain table name */
3309         str = get_insert_command_table_name((InsertStmt *)node);
3310         if (str == NULL)
3311         {
3312                 pool_error("need_insert_lock: get_insert_command_table_name failed");
3313                 return 0;
3314         }
3315
3316         /* eliminate double quotes */
3317         rel = malloc(strlen(str)+1);
3318         if (!rel)
3319         {
3320                 pool_error("need_insert_lock: malloc failed");
3321                 return 0;
3322         }
3323         for(i=0;*str;str++)
3324         {
3325                 if (*str != '"')
3326                         rel[i++] = *str;
3327         }
3328         rel[i] = '\0';
3329
3330         /* obtain database name */
3331         dbname = MASTER_CONNECTION(backend)->sp->database;
3332
3333         /* look for cache first */
3334         for (i=0;i<INSERT_STATEMENT_MAX_CACHE;i++)
3335         {
3336                 if (strcasecmp(relcache[i].dbname, dbname) == 0 &&
3337                         strcasecmp(relcache[i].relname, rel) == 0)
3338                 {
3339                         relcache[i].refcnt++;
3340                         use_serial = relcache[i].use_serial;
3341                         break;
3342                 }
3343         }
3344
3345         if (i == INSERT_STATEMENT_MAX_CACHE)            /* not in cache? */
3346         {
3347                 char qbuf[1024];
3348                 int maxrefcnt = INT_MAX;
3349                 POOL_SELECT_RESULT *res = NULL;
3350                 int index = 0;
3351
3352                 snprintf(qbuf, sizeof(qbuf), NEXTVALQUERY, rel);
3353
3354                 /* check the system catalog if the table has SERIAL data type */
3355                 if (do_query(MASTER(backend), qbuf, &res) != POOL_CONTINUE)
3356                 {
3357                         pool_error("need_insert_lock: do_query failed");
3358                         if (res)
3359                                 free_select_result(res);
3360                         return 0;
3361                 }
3362
3363                 /*
3364                  * if the query returns some rows and found nextval() is used,
3365                  * then we assume it uses SERIAL data type
3366                  */
3367                 if (res->numrows >= 1 && strcmp(res->data[0], "0"))
3368                         use_serial = 1;
3369
3370                 free_select_result(res);
3371
3372                 for (i=0;i<INSERT_STATEMENT_MAX_CACHE;i++)
3373                 {
3374                         if (relcache[i].refcnt == 0)
3375                         {
3376                                 index = i;
3377                                 break;
3378                         }
3379                         else if (relcache[i].refcnt < maxrefcnt)
3380                         {
3381                                 maxrefcnt = relcache[i].refcnt;
3382                                 index = i;
3383                         }
3384                 }
3385
3386                 /* register cache */
3387                 strncpy(relcache[index].dbname, dbname, MAX_ITEM_LENGTH);
3388                 strncpy(relcache[index].relname, rel, MAX_ITEM_LENGTH);
3389                 relcache[index].use_serial = use_serial;
3390                 relcache[index].refcnt++;
3391         }
3392         return use_serial;
3393 }
3394
3395 /*
3396  * if a transaction has not already started, start a new one.
3397  * issue LOCK TABLE IN SHARE ROW EXCLUSIVE MODE
3398  */
3399 POOL_STATUS insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query, InsertStmt *node)
3400 {
3401         char *table;
3402         char qbuf[1024];
3403         POOL_STATUS status;
3404         int i, deadlock_detected = 0;
3405
3406         /* insert_lock can be used in V3 only */
3407         if (MAJOR(backend) != PROTO_MAJOR_V3)
3408                 return POOL_CONTINUE;
3409
3410         /* get table name */
3411         table = get_insert_command_table_name(node);
3412
3413         /* could not get table name. probably wrong SQL command */
3414         if (table == NULL)
3415         {
3416                 return POOL_CONTINUE;
3417         }
3418
3419         /* issue lock table command */
3420         snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
3421
3422         status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid,
3423                                                 MASTER_CONNECTION(backend)->key, 0);
3424         if (status == POOL_END)
3425         {
3426                 internal_transaction_started = 0;
3427                 return POOL_END;
3428         }
3429         else if (status == POOL_DEADLOCK)
3430                 deadlock_detected = 1;
3431
3432         for (i=0;i<NUM_BACKENDS;i++)
3433         {
3434                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
3435                 {
3436                         if (deadlock_detected)
3437                                 status = do_command(frontend, CONNECTION(backend, i), POOL_ERROR_QUERY, PROTO_MAJOR_V3,
3438                                                                         MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0);
3439                         else
3440                                 status = do_command(frontend, CONNECTION(backend, i), qbuf, PROTO_MAJOR_V3, 
3441                                                                         MASTER_CONNECTION(backend)->pid, MASTER_CONNECTION(backend)->key, 0);
3442
3443                         if (status != POOL_CONTINUE)
3444                         {
3445                                 internal_transaction_started = 0;
3446                                 return POOL_END;
3447                         }
3448                 }
3449         }
3450
3451         return POOL_CONTINUE;
3452 }
3453
3454 bool is_partition_table(POOL_CONNECTION_POOL *backend, Node *node)
3455 {
3456         DistDefInfo *info = NULL;
3457         RangeVar *var = NULL;;
3458
3459         if (IsA(node, UpdateStmt))
3460         {
3461                 UpdateStmt *update = (UpdateStmt*) node;
3462
3463                 if(!IsA(update->relation,RangeVar))
3464                         return false;
3465
3466                 var = (RangeVar *) update->relation;
3467         }
3468         else if (IsA(node, DeleteStmt))
3469         {
3470                 DeleteStmt *delete = (DeleteStmt*) node;
3471
3472                 if(!IsA(delete->relation,RangeVar))
3473                         return false;
3474
3475                 var = (RangeVar *) delete->relation;
3476         } else
3477                 return false;
3478
3479         info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
3480                                                                           var->schemaname,
3481                                                                           var->relname);
3482         if(info)
3483                 return true;
3484         else
3485                 return false;
3486 }
3487
3488 /*
3489  * obtain table name in INSERT statement
3490  */
3491 static char *get_insert_command_table_name(InsertStmt *node)
3492 {
3493         char *table = nodeToString(node->relation);
3494
3495         pool_debug("get_insert_command_table_name: extracted table name: %s", table);
3496         return table;
3497 }
3498
3499 /* judge if this is a DROP DATABASE command */
3500 int is_drop_database(Node *node)
3501 {
3502         return (IsA(node, DropdbStmt)) ? 1 : 0;
3503 }
3504
3505 /*
3506  * check if any pending data remains.  Also if there's some pending data in
3507  * frontend AND no processing any Query, then returns 0.
3508  * XXX: is this correct thing?
3509 */
3510 static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
3511 {
3512         int i;
3513
3514         if (frontend->len > 0 && !in_progress)
3515                 return 0;
3516
3517         for (i=0;i<NUM_BACKENDS;i++)
3518         {
3519                 if (!VALID_BACKEND(i))
3520                         continue;
3521
3522                 if (CONNECTION(backend, i)->len > 0)
3523                         return 0;
3524         }
3525
3526         return 1;
3527 }
3528
3529 /*
3530  * check if query is needed to wait completion
3531  */
3532 int is_strict_query(Node *node)
3533 {
3534         switch (node->type)
3535         {
3536                 case T_SelectStmt:
3537                 {
3538                         SelectStmt *stmt = (SelectStmt *)node;
3539                         return (stmt->intoClause || stmt->lockingClause) ? 1 : 0;
3540                 }
3541
3542                 case T_UpdateStmt:
3543                 case T_InsertStmt:
3544                 case T_DeleteStmt:
3545                 case T_LockStmt:
3546                         return 1;
3547
3548                 default:
3549                         return 0;
3550         }
3551
3552         return 0;
3553 }
3554
3555 int check_copy_from_stdin(Node *node)
3556 {
3557         if (copy_schema)
3558                 free(copy_schema);
3559         if (copy_table)
3560                 free(copy_table);
3561         if (copy_null)
3562                 free(copy_null);
3563
3564         copy_schema = copy_table = copy_null = NULL;
3565
3566         if (IsA(node, CopyStmt))
3567         {
3568                 CopyStmt *stmt = (CopyStmt *)node;
3569                 if (stmt->is_from == TRUE && stmt->filename == NULL)
3570                 {
3571                         RangeVar *relation = (RangeVar *)stmt->relation;
3572                         ListCell *lc;
3573
3574                         /* query is COPY FROM STDIN */
3575                         if (relation->schemaname)
3576                                 copy_schema = strdup(relation->schemaname);
3577                         else
3578                                 copy_schema = strdup("public");
3579                         copy_table = strdup(relation->relname);
3580
3581                         copy_delimiter = '\t'; /* default delimiter */
3582                         copy_null = strdup("\\N"); /* default null string */
3583
3584                         /* look up delimiter and null string. */
3585                         foreach (lc, stmt->options)
3586                         {
3587                                 DefElem *elem = lfirst(lc);
3588                                 Value *v;
3589
3590                                 if (strcmp(elem->defname, "delimiter") == 0)
3591                                 {
3592                                         v = (Value *)elem->arg;
3593                                         copy_delimiter = v->val.str[0];
3594                                 }
3595                                 else if (strcmp(elem->defname, "null") == 0)
3596                                 {
3597                                         if (copy_null)
3598                                                 free(copy_null);
3599                                         v = (Value *)elem->arg;
3600                                         copy_null = strdup(v->val.str);
3601                                 }
3602                         }
3603                 }
3604                 return 1;
3605         }
3606
3607         return 0;
3608 }
3609
3610 /*
3611  * read kind from one backend
3612  */
3613 POOL_STATUS read_kind_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind, int node)
3614 {
3615         if (VALID_BACKEND(node))
3616         {
3617                 char k;
3618                 if (pool_read(CONNECTION(backend, node), &k, 1) < 0)
3619                 {
3620                         pool_error("read_kind_from_one_backend: failed to read kind from %d th backend", node);
3621                         return POOL_ERROR;
3622                 }
3623
3624                 pool_debug("read_kind_from_one_backend: read kind from %d th backend %c", node, k);
3625
3626                 *kind = k;
3627                 return POOL_CONTINUE;
3628         }
3629         else
3630         {
3631                 pool_error("read_kind_from_one_backend: %d th backend is not valid", node);
3632                 return POOL_ERROR;
3633         }
3634 }
3635
3636 /*
3637  * read_kind_from_backend: read kind from backends.
3638  * the "frontend" parameter is used to send "kind mismatch" error message to the frontend.
3639  * the out parameter "decided_kind" is the packet kind decided by this function.
3640  * this function uses "decide by majority" method if kinds from all backends do not agree.
3641  */
3642 POOL_STATUS read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *decided_kind)
3643 {
3644         int i;
3645         unsigned char kind_list[MAX_NUM_BACKENDS];      /* records each backend's kind */
3646         unsigned char kind_map[256]; /* records which kind gets majority.
3647                                                                   *     256 is the number of distinct values expressed by unsigned char
3648                                                                  */
3649         unsigned char kind;
3650         int trust_kind; /* decided kind */
3651         int max_kind = 0;
3652         double max_count = 0;
3653         int degenerate_node_num = 0;            /* number of backends degeneration requested */
3654         int degenerate_node[MAX_NUM_BACKENDS];          /* degeneration requested backend list */
3655
3656         memset(kind_map, 0, sizeof(kind_map));
3657
3658         for (i=0;i<NUM_BACKENDS;i++)
3659         {
3660                 /* initialize degenerate record */
3661                 degenerate_node[i] = 0;
3662
3663                 if (VALID_BACKEND(i))
3664                 {
3665                         do
3666                         {
3667                                 char *p, *value;
3668                                 int len;
3669
3670                                 if (pool_read(CONNECTION(backend, i), &kind, 1) < 0)
3671                                 {
3672                                         pool_error("read_kind_from_backend: failed to read kind from %d th backend", i);
3673                                         return POOL_ERROR;
3674                                 }
3675
3676                                 /*
3677                                  * Read and discard parameter status
3678                                  */
3679                                 if (kind != 'S')
3680                                 {
3681                                         break;
3682                                 }
3683
3684                                 if (pool_read(CONNECTION(backend, i), &len, sizeof(len)) < 0)
3685                                 {
3686                                         pool_error("read_kind_from_backend: failed to read parameter status packet length from %d th backend", i);
3687                                         return POOL_ERROR;
3688                                 }
3689                                 len = htonl(len) - 4;
3690                                 p = pool_read2(CONNECTION(backend, i), len);
3691                                 if (p == NULL)
3692                                 {
3693                                         pool_error("read_kind_from_backend: failed to read parameter status packet from %d th backend", i);
3694                                 }
3695                                 value = p + strlen(p) + 1;
3696                                 pool_debug("read_kind_from_backend: parameter name: %s value: %s", p, value);
3697                         } while (kind == 'S');
3698
3699                         kind_list[i] = kind;
3700
3701                         pool_debug("read_kind_from_backend: read kind from %d th backend %c NUM_BACKENDS: %d", i, kind_list[i], NUM_BACKENDS);
3702
3703                         kind_map[kind]++;
3704
3705                         if (kind_map[kind] > max_count)
3706                         {
3707                                 max_kind = kind_list[i];
3708                                 max_count = kind_map[kind];
3709                         }
3710                 }
3711                 else
3712                         kind_list[i] = 0;
3713         }
3714
3715 #ifdef NOT_USED
3716         /* register kind map */
3717         for (i = 0; i < NUM_BACKENDS; i++)
3718         {
3719                 /* initialize degenerate record */
3720                 degenerate_node[i] = 0;
3721
3722                 /* kind is signed char.
3723                  * We must check negative number.
3724                  */
3725                 int id = kind_list[i] + 128;
3726
3727                 if (kind_list[i] == -1)
3728                         continue;
3729
3730                 kind_map[id]++;
3731                 if (kind_map[id] > max_count)
3732                 {
3733                         max_kind = kind_list[i];
3734                         max_count = kind_map[id];
3735                 }
3736         }
3737 #endif
3738
3739         if (max_count != NUM_BACKENDS)
3740         {
3741                 /*
3742                  * not all backends agree with kind. We need to do "decide by majority"
3743                  */
3744
3745                 if (max_count <= NUM_BACKENDS / 2.0)
3746                 {
3747                         /* no one gets majority. We trust master node's kind */
3748                         trust_kind = kind_list[MASTER_NODE_ID];
3749                 }
3750                 else /* max_count > NUM_BACKENDS / 2.0 */
3751                 {
3752                         /* trust majority's kind */
3753                         trust_kind = max_kind;
3754                 }
3755
3756                 for (i = 0; i < NUM_BACKENDS; i++)
3757                 {
3758                         if (kind_list[i] != 0 && trust_kind != kind_list[i])
3759                         {
3760                                 /* degenerate */
3761                                 pool_error("read_kind_from_backend: %d th kind %c does not match with master or majority connection kind %c",
3762                                                    i, kind_list[i], trust_kind);
3763                                 degenerate_node[degenerate_node_num++] = i;
3764                         }
3765                 }
3766         }
3767         else
3768                 trust_kind = kind_list[MASTER_NODE_ID];
3769
3770         *decided_kind = trust_kind;
3771
3772         if (degenerate_node_num)
3773         {
3774                 String *msg = init_string("kind mismatch among backends. ");
3775
3776                 string_append_char(msg, "Possible last query was: \"");
3777                 string_append_char(msg, query_string_buffer);
3778                 string_append_char(msg, "\" kind details are:");
3779
3780                 for (i=0;i<NUM_BACKENDS;i++)
3781                 {
3782                         char buf[32];
3783
3784                         if (kind_list[i])
3785                         {
3786                                 snprintf(buf, sizeof(buf), " %d[%c]", i, kind_list[i]);
3787                                 string_append_char(msg, buf);
3788                         }
3789                 }
3790
3791                 pool_send_error_message(frontend, MAJOR(backend), "XX000",
3792                                                                 msg->data, "",
3793                                                                 "check data consistency among db nodes",
3794                                                                 __FILE__, __LINE__);
3795                 pool_error(msg->data);
3796
3797                 free_string(msg);
3798
3799                 if (pool_config->replication_stop_on_mismatch)
3800                 {
3801                         degenerate_backend_set(degenerate_node, degenerate_node_num);
3802                         child_exit(1);
3803                 }
3804                 else
3805                         return POOL_ERROR;
3806         }
3807
3808         return POOL_CONTINUE;
3809 }
3810
3811 /*
3812  * Create portal object
3813  * Return object is allocated from heap memory.
3814  */
3815 Portal *create_portal(void)
3816 {
3817         Portal *p;
3818
3819         if ((p = malloc(sizeof(Portal))) == NULL)
3820                 return NULL;
3821
3822         p->prepare_ctxt = pool_memory_create(PREPARE_BLOCK_SIZE);
3823         if (p->prepare_ctxt == NULL)
3824         {
3825                 free(p);
3826                 return NULL;
3827         }
3828         return p;
3829 }
3830
3831 void init_prepared_list(void)
3832 {
3833         prepared_list.cnt = 0;
3834         prepared_list.size = INIT_STATEMENT_LIST_SIZE;
3835         prepared_list.portal_list = malloc(sizeof(Portal *) * prepared_list.size);
3836         if (prepared_list.portal_list == NULL)
3837         {
3838                 pool_error("init_prepared_list: malloc failed: %s", strerror(errno));
3839                 exit(1);
3840         }
3841 }
3842
3843 void add_prepared_list(PreparedStatementList *p, Portal *portal)
3844 {
3845         if (p->cnt == p->size)
3846         {
3847                 p->size *= 2;
3848                 p->portal_list = realloc(p->portal_list, sizeof(Portal *) * p->size);
3849                 if (p->portal_list == NULL)
3850                 {
3851                         pool_error("add_prepared_list: realloc failed: %s", strerror(errno));
3852                         exit(1);
3853                 }
3854         }
3855         p->portal_list[p->cnt++] = portal;
3856 }
3857
3858 void add_unnamed_portal(PreparedStatementList *p, Portal *portal)
3859 {
3860         if (unnamed_statement)
3861         {
3862                 pool_memory_delete(unnamed_statement->prepare_ctxt, 0);
3863                 free(unnamed_statement);
3864         }
3865
3866         unnamed_portal = NULL;
3867         unnamed_statement = portal;
3868 }
3869
3870 void del_prepared_list(PreparedStatementList *p, Portal *portal)
3871 {
3872         int i;
3873         DeallocateStmt *s = (DeallocateStmt *)portal->stmt;
3874
3875         /* DEALLOCATE ALL? */
3876         if (s->name == NULL)
3877         {
3878                 reset_prepared_list(p);
3879         }
3880         else
3881         {
3882                 for (i = 0; i < p->cnt; i++)
3883                 {
3884                         PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt;
3885                         if (strcmp(p_stmt->name, s->name) == 0)
3886                                 break;
3887                 }
3888
3889                 if (i == p->cnt)
3890                         return;
3891
3892                 pool_memory_delete(p->portal_list[i]->prepare_ctxt, 0);
3893                 free(p->portal_list[i]->portal_name);
3894                 free(p->portal_list[i]);
3895                 if (i != p->cnt - 1)
3896                 {
3897                         memmove(&p->portal_list[i], &p->portal_list[i+1],
3898                                         sizeof(Portal *) * (p->cnt - i - 1));
3899                 }
3900                 p->cnt--;
3901         }
3902 }
3903
3904 void delete_all_prepared_list(PreparedStatementList *p, Portal *portal)
3905 {
3906         reset_prepared_list(p);
3907 }
3908
3909 static void reset_prepared_list(PreparedStatementList *p)
3910 {
3911         int i;
3912
3913         if (p)
3914         {
3915                 for (i = 0; i < p->cnt; i++)
3916                 {
3917                         pool_memory_delete(p->portal_list[i]->prepare_ctxt, 0);
3918                         free(p->portal_list[i]->portal_name);
3919                         free(p->portal_list[i]);
3920                 }
3921                 if (unnamed_statement)
3922                 {
3923                         pool_memory_delete(unnamed_statement->prepare_ctxt, 0);
3924                         free(unnamed_statement);
3925                 }
3926                 unnamed_portal = NULL;
3927                 unnamed_statement = NULL;
3928                 p->cnt = 0;
3929         }
3930 }
3931
3932 Portal *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name)
3933 {
3934         int i;
3935
3936         /* unnamed portal? */
3937         if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
3938                 return unnamed_statement;
3939
3940         for (i = 0; i < p->cnt; i++)
3941         {
3942                 PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt;
3943                 if (strcmp(p_stmt->name, name) == 0)
3944                         return p->portal_list[i];
3945         }
3946
3947         return NULL;
3948 }
3949
3950 Portal *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name)
3951 {
3952         int i;
3953
3954         /* unnamed portal? */
3955         if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"'))
3956                 return unnamed_portal;
3957
3958         for (i = 0; i < p->cnt; i++)
3959         {
3960                 if (p->portal_list[i]->portal_name &&
3961                         strcmp(p->portal_list[i]->portal_name, name) == 0)
3962                         return p->portal_list[i];
3963         }
3964
3965         return NULL;
3966 }
3967
3968 static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p,
3969                                         int n)
3970 {
3971         char *query;
3972         int len;
3973         PrepareStmt *p_stmt;
3974
3975         if (p->cnt <= n)
3976                 return 1;
3977
3978         p_stmt = (PrepareStmt *)p->portal_list[n]->stmt;
3979         len = strlen(p_stmt->name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */
3980         query = malloc(len);
3981         if (query == NULL)
3982         {
3983                 pool_error("send_deallocate: malloc failed: %s", strerror(errno));
3984                 exit(1);
3985         }
3986         sprintf(query, "DEALLOCATE \"%s\"", p_stmt->name);
3987
3988         if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE)
3989         {
3990                 free(query);
3991                 return 1;
3992         }
3993         free(query);
3994
3995         return 0;
3996 }
3997
/*
 * parse_copy_data()
 *   Parses CopyDataRow string.
 *
 *   buf       - one row of COPY data; the last byte (normally '\n') is
 *               never examined
 *   len       - number of bytes in buf
 *   delimiter - column delimiter character
 *   col_id    - zero-based index of the column to extract
 *
 *   Returns divide key value as a newly malloc'ed, NUL terminated string
 *   which the caller must free. If cannot parse data (fewer columns than
 *   col_id, invalid arguments or out of memory), returns NULL.
 *
 *   A backslash followed by the delimiter yields the bare delimiter; every
 *   other backslash sequence is copied unchanged.
 */
char *
parse_copy_data(char *buf, int len, char delimiter, int col_id)
{
	int i, j, field = 0;
	char *str, *p = NULL;

	if (buf == NULL || len < 0)
		return NULL;

	/* a single column can never be longer than the whole row */
	str = malloc(len + 1);
	if (str == NULL)
	{
		pool_error("parse_copy_data: malloc failed: %s", strerror(errno));
		return NULL;
	}

	/* buf is terminated by '\n'. */
	/* skip '\n' in for loop.     */
	for (i = 0, j = 0; i < len - 1; i++)
	{
		if (buf[i] == '\\' && i != len - 2) /* escape */
		{
			if (buf[i+1] == delimiter)
			{
				/* escaped delimiter is data, stored without the backslash */
				i++;
				str[j++] = buf[i];
			}
			else
			{
				/*
				 * Copy the backslash together with the character it
				 * escapes. Consuming both at once keeps an escaped
				 * backslash ("\\\\") from being mistaken for the escape
				 * of a delimiter that follows it.
				 */
				str[j++] = buf[i];
				i++;
				str[j++] = buf[i];
			}
		}
		else if (buf[i] == delimiter) /* delimiter */
		{
			if (field == col_id)
			{
				break;
			}
			else
			{
				/* not the wanted column: start collecting the next one */
				field++;
				j = 0;
			}
		}
		else
		{
			str[j++] = buf[i];
		}
	}

	if (field == col_id)
	{
		str[j] = '\0';
		/* j characters plus the terminating NUL */
		p = malloc(j + 1);
		if (p == NULL)
		{
			pool_error("parse_copy_data: malloc failed: %s", strerror(errno));
			free(str);
			return NULL;
		}
		memcpy(p, str, j + 1);
		pool_debug("parse_copy_data: divide key value is %s", p);
	}

	free(str);
	return p;
}
4062
4063 static void
4064 query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len)
4065 {
4066         static int inside_T;                    /* flag to see the result data sequence */
4067         int result;
4068
4069         if (is_select_pgcatalog || is_select_for_update)
4070                 return;
4071
4072         if (kind == 'T' && parsed_query)
4073         {
4074                 result = pool_query_cache_register(kind, frontend, database, data, data_len, parsed_query);
4075                 if (result < 0)
4076                 {
4077                         pool_error("pool_query_cache_register: query cache registration failed");
4078                         inside_T = 0;
4079                 }
4080                 else
4081                 {
4082                         inside_T = 1;
4083                 }
4084         }
4085         else if ((kind == 'D' || kind == 'C' || kind == 'E') && inside_T)
4086         {
4087                 result = pool_query_cache_register(kind, frontend, database, data, data_len, NULL);
4088                 if (kind == 'C' || kind == 'E' || result < 0)
4089                 {
4090                         if (result < 0)
4091                                 pool_error("pool_query_cache_register: query cache registration failed");
4092                         else
4093                                 pool_debug("pool_query_cache_register: query cache saved");
4094
4095                         inside_T = 0;
4096                         free(parsed_query);
4097                         parsed_query = NULL;
4098                 }
4099         }
4100 }
4101
4102 void query_ps_status(char *query, POOL_CONNECTION_POOL *backend)
4103 {
4104         StartupPacket *sp;
4105         char psbuf[1024];
4106         int i;
4107
4108         if (*query == '\0')
4109                 return;
4110
4111         sp = MASTER_CONNECTION(backend)->sp;
4112         i = snprintf(psbuf, sizeof(psbuf), "%s %s %s ",
4113                                  sp->user, sp->database, remote_ps_data);
4114
4115         /* skip spaces */
4116         while (*query && isspace(*query))
4117                 query++;
4118
4119         for (; i< sizeof(psbuf); i++)
4120         {
4121                 if (!*query || isspace(*query))
4122                         break;
4123
4124                 psbuf[i] = toupper(*query++);
4125         }
4126         psbuf[i] = '\0';
4127
4128         set_ps_display(psbuf, false);
4129 }
4130
4131 /* compare function for bsearch() */
4132 static int compare(const void *p1, const void *p2)
4133 {
4134         int     v1,     v2;
4135
4136         v1 = *(NodeTag *) p1;
4137         v2 = *(NodeTag *) p2;
4138         return (v1 > v2) ? 1 : ((v1 == v2) ? 0 : -1);
4139 }
4140
4141 /* return true if needed to start a transaction for the nodetag */
4142 static bool is_internal_transaction_needed(Node *node)
4143 {
4144         static NodeTag nodemap[] = {
4145                 T_InsertStmt,
4146                 T_DeleteStmt,
4147                 T_UpdateStmt,
4148                 T_SelectStmt,
4149                 T_AlterTableStmt,
4150                 T_AlterDomainStmt,
4151                 T_GrantStmt,
4152                 T_GrantRoleStmt,
4153                 T_ClosePortalStmt,
4154                 T_ClusterStmt,
4155                 T_CopyStmt,
4156                 T_CreateStmt,   /* CREAE TABLE */
4157                 T_DefineStmt,   /* CREATE AGGREGATE, OPERATOR, TYPE */
4158                 T_DropStmt,             /* DROP TABLE etc. */
4159                 T_TruncateStmt,
4160                 T_CommentStmt,
4161                 T_FetchStmt,
4162                 T_IndexStmt,    /* CREATE INDEX */
4163                 T_CreateFunctionStmt,
4164                 T_AlterFunctionStmt,
4165                 T_RemoveFuncStmt,
4166                 T_RenameStmt,   /* ALTER AGGREGATE etc. */
4167                 T_RuleStmt,             /* CREATE RULE */
4168                 T_NotifyStmt,
4169                 T_ListenStmt,
4170                 T_UnlistenStmt,
4171                 T_ViewStmt,             /* CREATE VIEW */
4172                 T_LoadStmt,
4173                 T_CreateDomainStmt,
4174                 /*
4175                   T_CreatedbStmt,       CREATE DATABASE/DROP DATABASE cannot execute inside a transaction block
4176                   T_DropdbStmt,
4177                 */
4178                 T_CreateSeqStmt,
4179                 T_AlterSeqStmt,
4180                 T_VariableSetStmt,              /* SET */
4181                 T_CreateTrigStmt,
4182                 T_DropPropertyStmt,
4183                 T_CreatePLangStmt,
4184                 T_DropPLangStmt,
4185                 T_CreateRoleStmt,
4186                 T_AlterRoleStmt,
4187                 T_DropRoleStmt,
4188                 T_LockStmt,
4189                 T_ConstraintsSetStmt,
4190                 T_ReindexStmt,
4191                 T_CreateSchemaStmt,
4192                 T_AlterDatabaseStmt,
4193                 T_AlterDatabaseSetStmt,
4194                 T_AlterRoleSetStmt,
4195                 T_CreateConversionStmt,
4196                 T_CreateCastStmt,
4197                 T_DropCastStmt,
4198                 T_CreateOpClassStmt,
4199                 T_CreateOpFamilyStmt,
4200                 T_AlterOpFamilyStmt,
4201                 T_RemoveOpClassStmt,
4202                 T_RemoveOpFamilyStmt,
4203                 T_PrepareStmt,
4204                 T_ExecuteStmt,
4205                 T_DeallocateStmt,
4206                 T_DeclareCursorStmt,
4207                 T_CreateTableSpaceStmt,
4208                 T_DropTableSpaceStmt,
4209                 T_AlterObjectSchemaStmt,
4210                 T_AlterOwnerStmt,
4211                 T_DropOwnedStmt,
4212                 T_ReassignOwnedStmt,
4213                 T_CompositeTypeStmt,    /* CREATE TYPE */
4214                 T_CreateEnumStmt,
4215                 T_AlterTSDictionaryStmt,
4216                 T_AlterTSConfigurationStmt
4217         };
4218
4219         if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap)/sizeof(nodemap[0]), sizeof(NodeTag), compare) != NULL)
4220         {
4221                 /*
4222                  * Check CREATE INDEX CONCURRENTLY. If so, do not start transaction
4223                  */
4224                 if (IsA(node, IndexStmt))
4225                 {
4226                         if (((IndexStmt *)node)->concurrent)
4227                                 return false;
4228                 }
4229
4230                 /*
4231                  * Check CLUSTER with no option. If so, do not start transaction
4232                  */
4233                 else if (IsA(node, ClusterStmt))
4234                 {
4235                         if (((ClusterStmt *)node)->relation == NULL)
4236                                 return false;
4237                 }
4238
4239                 return true;
4240
4241         }
4242         return false;
4243 }
4244
4245 POOL_STATUS start_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node)
4246 {
4247         int i;
4248
4249         if (TSTATE(backend) != 'I')
4250                 return POOL_CONTINUE;
4251
4252         /* if we are not in a transaction block,
4253          * start a new transaction
4254          */
4255         if (is_internal_transaction_needed(node))
4256         {
4257                 for (i=0;i<NUM_BACKENDS;i++)
4258                 {
4259                         if (VALID_BACKEND(i))
4260                         {
4261                                 if (do_command(frontend, CONNECTION(backend, i), "BEGIN", MAJOR(backend), 
4262                                                            MASTER_CONNECTION(backend)->pid,     MASTER_CONNECTION(backend)->key, 0) != POOL_CONTINUE)
4263                                         return POOL_END;
4264                         }
4265                 }
4266
4267                 /* mark that we started new transaction */
4268                 internal_transaction_started = 1;
4269         }
4270         return POOL_CONTINUE;
4271 }
4272
4273
4274 POOL_STATUS end_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
4275 {
4276         int i;
4277 #ifdef HAVE_SIGPROCMASK
4278         sigset_t oldmask;
4279 #else
4280         int     oldmask;
4281 #endif
4282
4283         /*
4284          * We must block all signals. If pgpool SIGTERM, SIGINT or SIGQUIT
4285          * is delivered, it could cause data inconsistency.
4286          */
4287         POOL_SETMASK2(&BlockSig, &oldmask);
4288
4289         /* We need to commit from secondary to master. */
4290         for (i=0;i<NUM_BACKENDS;i++)
4291         {
4292                 if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
4293                 {
4294                         /* COMMIT success? */
4295                         if (do_command(frontend, CONNECTION(backend, i), "COMMIT", MAJOR(backend), 
4296                                                    MASTER_CONNECTION(backend)->pid,     MASTER_CONNECTION(backend)->key, 1) != POOL_CONTINUE)
4297                         {
4298                                 internal_transaction_started = 0;
4299                                 POOL_SETMASK(&oldmask);
4300                                 return POOL_END;
4301                         }
4302                 }
4303         }
4304
4305         /* commit on master */
4306         if (do_command(frontend, MASTER(backend), "COMMIT", MAJOR(backend), 
4307                                    MASTER_CONNECTION(backend)->pid,     MASTER_CONNECTION(backend)->key, 1) != POOL_CONTINUE)
4308         {
4309                 internal_transaction_started = 0;
4310                 POOL_SETMASK(&oldmask);
4311                 return POOL_END;
4312         }
4313
4314         internal_transaction_started = 0;
4315         POOL_SETMASK(&oldmask);
4316         return POOL_CONTINUE;
4317 }
4318
/*
 * Extract the number of tuples from CommandComplete message
 *
 * Recognized tags are "UPDATE n", "DELETE n" and "INSERT oid n"; the
 * number n is returned. Returns 0 for any other message and for a tag
 * that is not followed by anything.
 */
static int extract_ntuples(char *message)
{
	char *rows;
	int has_oid = 0;	/* INSERT carries an extra word before the count */

	rows = strstr(message, "UPDATE");
	if (rows == NULL)
		rows = strstr(message, "DELETE");
	if (rows == NULL)
	{
		rows = strstr(message, "INSERT");
		has_oid = 1;
	}
	if (rows == NULL)
		return 0;

	/*
	 * Step over the six letter command word. If the message ends right
	 * there, there is no count and stepping further would leave the string.
	 */
	rows += 6;
	if (*rows == '\0')
		return 0;
	rows++;		/* separator after the command word */

	if (has_oid)
	{
		/* skip the word between the command and the count */
		while (*rows && *rows != ' ')
			rows++;
	}

	return atoi(rows);
}
4338
4339 static int detect_postmaster_down_error(POOL_CONNECTION *backend, int major)
4340 {
4341         int r =  detect_error(backend, ADMIN_SHUTDOWN_ERROR_CODE, major, 'E', false);
4342         if (r == SPECIFIED_ERROR)
4343         {
4344                 pool_debug("detect_stop_postmaster_error: receive admin shutdown error from a node.");
4345                 return r;
4346         }
4347
4348         r = detect_error(backend, CRASH_SHUTDOWN_ERROR_CODE, major, 'N', false);
4349         if (r == SPECIFIED_ERROR)
4350         {
4351                 pool_debug("detect_stop_postmaster_error: receive crash shutdown error from a node.");
4352         }
4353         return r;
4354 }
4355
4356 int detect_active_sql_transaction_error(POOL_CONNECTION *backend, int major)
4357 {
4358         int r =  detect_error(backend, ACTIVE_SQL_TRANSACTION_ERROR_CODE, major, 'E', true);
4359         if (r == SPECIFIED_ERROR)
4360         {
4361                 pool_debug("detect_active_sql_transaction_error: receive SET TRANSACTION ISOLATION LEVEL must be called before any query error from a node.");
4362         }
4363         return r;
4364 }
4365
4366 int detect_deadlock_error(POOL_CONNECTION *backend, int major)
4367 {
4368         int r =  detect_error(backend, DEADLOCK_ERROR_CODE, major, 'E', true);
4369         if (r == SPECIFIED_ERROR)
4370                 pool_debug("detect_deadlock_error: received deadlock error message from backend");
4371         return r;
4372 }
4373
4374 int detect_serialization_error(POOL_CONNECTION *backend, int major)
4375 {
4376         int r =  detect_error(backend, SERIALIZATION_FAIL_ERROR_CODE, major, 'E', true);
4377         if (r == SPECIFIED_ERROR)
4378                 pool_debug("detect_serialization_error: received serialization failure message from backend");
4379         return r;
4380 }
4381
4382 int detect_query_cancel_error(POOL_CONNECTION *backend, int major)
4383 {
4384         int r =  detect_error(backend, QUERY_CANCEL_ERROR_CODE, major, 'E', true);
4385         if (r == SPECIFIED_ERROR)
4386                 pool_debug("detect_query_cancel_error: received query cancel error message from backend");
4387         return r;
4388 }
4389
4390 /*
4391  * detect_error: Detect specified error from error code.
4392  */
4393 static int detect_error(POOL_CONNECTION *backend, char *error_code, int major, char class, bool unread)
4394 {
4395         int is_error = 0;
4396         char kind;
4397         int readlen = 0, len;
4398         static char buf[8192]; /* memory space is large enough */
4399         char *p, *str;
4400
4401         if (pool_read(backend, &kind, sizeof(kind)))
4402                 return POOL_END;
4403         readlen += sizeof(kind);
4404         p = buf;
4405         memcpy(p, &kind, sizeof(kind));
4406         p += sizeof(kind);
4407
4408         pool_debug("detect_error: kind: %c", kind);
4409
4410         /* Specified class? */
4411         if (kind == class)
4412         {
4413                 /* read actual message */
4414                 if (major == PROTO_MAJOR_V3)
4415                 {
4416                         char *e;
4417
4418                         if (pool_read(backend, &len, sizeof(len)) < 0)
4419                                 return POOL_END;
4420                         readlen += sizeof(len);
4421                         memcpy(p, &len, sizeof(len));
4422                         p += sizeof(len);
4423
4424                         len = ntohl(len) - 4;
4425                         str = malloc(len);
4426                         pool_read(backend, str, len);
4427                         readlen += len;
4428                         memcpy(p, str, len);
4429
4430                         /*
4431                          * Checks error code which is formatted 'Cxxxxxx'
4432                          * (xxxxxx is error code).
4433                          */
4434                         e = str;
4435                         while (*e)
4436                         {
4437                                 if (*e == 'C')
4438                                 {/* specified error? */
4439                                         is_error = (strcmp(e+1, error_code) == 0) ? SPECIFIED_ERROR : 0;
4440                                         break;
4441                                 }
4442                                 else
4443                                         e = e + strlen(e) + 1;
4444                         }
4445                         free(str);
4446                 }
4447                 else
4448                 {
4449                         str = pool_read_string(backend, &len, 0);
4450                         readlen += len;
4451                         memcpy(p, str, len);
4452                 }
4453         }
4454         if (unread || !is_error)
4455         {
4456                 /* put a message to read buffer */
4457                 if (pool_unread(backend, buf, readlen) != 0)
4458                         is_error = -1;
4459         }
4460
4461         return is_error;
4462 }
4463
4464 /*
4465  * read message length and rest of the packet then discard it
4466  */
4467 POOL_STATUS pool_discard_packet(POOL_CONNECTION_POOL *cp)
4468 {
4469         int status, len, i;
4470         char kind;
4471         char *string;
4472         POOL_CONNECTION *backend;
4473
4474         for (i=0;i<NUM_BACKENDS;i++)
4475         {
4476                 if (!VALID_BACKEND(i))
4477                 {
4478                         continue;
4479                 }
4480
4481                 backend = CONNECTION(cp, i);
4482
4483                 status = pool_read(backend, &kind, sizeof(kind));
4484                 if (status < 0)
4485                 {
4486                         pool_error("pool_discard_packet: error while reading message kind");
4487                         return POOL_END;
4488                 }
4489
4490                 pool_debug("pool_discard_packet: kind: %c", kind);
4491
4492                 if (MAJOR(cp) == PROTO_MAJOR_V3)
4493                 {
4494                         if (pool_read(backend, &len, sizeof(len)) < 0)
4495                         {
4496                                 pool_error("pool_discard_packet: error while reading message length");
4497                                 return POOL_END;
4498                         }
4499                         len = ntohl(len) - 4;
4500                         string = pool_read2(backend, len);
4501                         if (string == NULL)
4502                         {
4503                                 pool_error("pool_discard_packet: error while reading rest of message");
4504                                 return POOL_END;
4505                         }
4506                 }
4507                 else
4508                 {
4509                         string = pool_read_string(backend, &len, 0);
4510                         if (string == NULL)
4511                         {
4512                                 pool_error("pool_discard_packet: error while reading rest of message");
4513                                 return POOL_END;
4514                         }
4515                 }
4516         }
4517         return POOL_CONTINUE;
4518 }