3 * $Header: /cvsroot/pgpool/pgpool-II/child.c,v 1.26.2.8 2009/09/06 03:52:12 t-ishii Exp $
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
8 * Copyright (c) 2003-2009 PgPool Global Development Group
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
21 * child.c: child process main
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
30 #include <arpa/inet.h>
32 #ifdef HAVE_NETINET_TCP_H
33 #include <netinet/tcp.h>
35 #ifdef HAVE_SYS_SELECT_H
36 #include <sys/select.h>
60 static POOL_CONNECTION *do_accept(int unix_fd, int inet_fd, struct timeval *timeout);
61 static StartupPacket *read_startup_packet(POOL_CONNECTION *cp);
62 static POOL_CONNECTION_POOL *connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend);
63 static RETSIGTYPE die(int sig);
64 static RETSIGTYPE close_idle_connection(int sig);
65 static RETSIGTYPE wakeup_handler(int sig);
66 static RETSIGTYPE reload_config_handler(int sig);
67 static RETSIGTYPE authentication_timeout(int sig);
68 static int send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
69 static void send_frontend_exits(void);
70 static int s_do_auth(POOL_CONNECTION_POOL_SLOT *cp, char *password);
71 static void connection_count_up(void);
72 static void connection_count_down(void);
75 * non 0 means SIGTERM(smart shutdown) or SIGINT(fast shutdown) has arrived
77 static int exit_request;
79 static int idle; /* non 0 means this child is in idle state */
80 static int accepted = 0;
85 char remote_ps_data[NI_MAXHOST]; /* used for set_ps_display */
87 volatile sig_atomic_t got_sighup = 0;
92 void do_child(int unix_fd, int inet_fd)
94 POOL_CONNECTION *frontend;
95 POOL_CONNECTION_POOL *backend;
99 struct timeval timeout;
100 static int connected;
101 int connections_count = 0; /* used if child_max_connections > 0 */
102 int first_ready_for_query_received; /* for master/slave mode */
104 char psbuf[NI_MAXHOST + 128];
106 pool_debug("I am %d", getpid());
108 /* Identify myself via ps */
109 init_ps_display("", "", "", "");
111 /* set up signal handlers */
112 signal(SIGALRM, SIG_DFL);
113 signal(SIGTERM, die);
115 signal(SIGHUP, reload_config_handler);
116 signal(SIGQUIT, die);
117 signal(SIGCHLD, SIG_DFL);
118 signal(SIGUSR1, close_idle_connection);
119 signal(SIGUSR2, wakeup_handler);
120 signal(SIGPIPE, SIG_IGN);
123 /* set listen fds to non-blocking mode */
124 pool_set_nonblock(unix_fd);
127 pool_set_nonblock(inet_fd);
131 /* initialize random seed */
132 gettimeofday(&now, &tz);
133 srandom((unsigned int) now.tv_usec);
135 /* initialize systemdb connection */
136 if (pool_config->parallel_mode || pool_config->enable_query_cache)
139 if (PQstatus(system_db_info->pgconn) != CONNECTION_OK)
141 pool_error("Could not make persistent libpq system DB connection");
144 system_db_info->connection = make_persistent_db_connection(pool_config->system_db_hostname,
145 pool_config->system_db_port,
146 pool_config->system_db_dbname,
147 pool_config->system_db_user,
148 pool_config->system_db_password);
149 if (system_db_info->connection == NULL)
151 pool_error("Could not make persistent system DB connection");
155 /* initialize connection pool */
163 timeout.tv_sec = pool_config->child_life_time;
166 init_prepared_list();
170 int connection_reuse = 1;
174 /* pgpool stop request already sent? */
184 /* perform accept() */
185 frontend = do_accept(unix_fd, inet_fd, &timeout);
187 if (frontend == NULL) /* connection request from frontend timed out */
189 /* check select() timeout */
190 if (connected && pool_config->child_life_time > 0 &&
191 timeout.tv_sec == 0 && timeout.tv_usec == 0)
193 pool_debug("child life %d seconds expired", pool_config->child_life_time);
195 * Doesn't need to call this. child_exit() calls it.
196 * send_frontend_exits();
203 /* set frontend fd to blocking */
204 pool_unset_nonblock(frontend->fd);
206 /* set busy flag and clear child idle timer */
210 /* check backend timer is expired */
211 if (backend_timer_expired)
213 pool_backend_timer();
214 backend_timer_expired = 0;
217 /* read the startup packet */
219 sp = read_startup_packet(frontend);
222 /* failed to read the startup packet. return to the accept() loop */
223 pool_close(frontend);
224 connection_count_down();
228 /* cancel request? */
229 if (sp->major == 1234 && sp->minor == 5678)
231 cancel_request((CancelPacket *)sp->startup_packet);
233 pool_close(frontend);
234 pool_free_startup_packet(sp);
235 connection_count_down();
240 if (sp->major == 1234 && sp->minor == 5679)
242 /* SSL not supported */
243 pool_debug("SSLRequest: sent N; retry startup");
246 pool_close(frontend);
247 pool_free_startup_packet(sp);
248 connection_count_down();
253 * say to the frontend "we do not support SSL"
254 * note that this is not a NOTICE response even though it is an 'N'!
256 pool_write_and_flush(frontend, "N", 1);
258 pool_free_startup_packet(sp);
262 if (pool_config->enable_pool_hba)
265 * do client authentication.
266 * Note that ClientAuthentication does not return if frontend
267 * was rejected; it simply terminates this process.
269 frontend->protoVersion = sp->major;
270 frontend->database = strdup(sp->database);
271 if (frontend->database == NULL)
273 pool_error("do_child: strdup failed: %s\n", strerror(errno));
276 frontend->username = strdup(sp->user);
277 if (frontend->username == NULL)
279 pool_error("do_child: strdup failed: %s\n", strerror(errno));
282 ClientAuthentication(frontend);
286 * Ok, negotiation with the frontend has been done. Let's go to the next step.
290 * if there's no connection associated with user and database,
291 * we need to connect to the backend and send the startup packet.
294 first_ready_for_query_received = 0; /* for master/slave mode */
296 /* look for existing connection */
298 backend = pool_get_cp(sp->user, sp->database, sp->major, 1);
304 /* existing connection associated with same user/database/major found.
305 * however we should make sure that the startup packet contents are identical.
306 * OPTION data and others might be different.
308 if (sp->len != MASTER_CONNECTION(backend)->sp->len)
310 pool_debug("pool_process_query: connection exists but startup packet length is not identical");
313 else if(memcmp(sp->startup_packet, MASTER_CONNECTION(backend)->sp->startup_packet, sp->len) != 0)
315 pool_debug("pool_process_query: connection exists but startup packet contents is not identical");
321 /* we need to discard existing connection since startup packet is different */
322 pool_discard_cp(sp->user, sp->database, sp->major);
329 /* create a new connection to backend */
330 connection_reuse = 0;
332 if ((backend = connect_backend(sp, frontend)) == NULL)
334 connection_count_down();
338 /* in master/slave mode, the first "ready for query"
339 * packet should be treated as if we were not in the
343 first_ready_for_query_received = 1;
350 * save startup packet info
352 for (i = 0; i < NUM_BACKENDS; i++)
354 if (VALID_BACKEND(i))
358 pool_free_startup_packet(backend->slots[i]->sp);
361 backend->slots[i]->sp = sp;
365 /* reuse existing connection to backend */
367 if (pool_do_reauth(frontend, backend))
369 pool_close(frontend);
370 connection_count_down();
374 if (MAJOR(backend) == 3)
376 if (send_params(frontend, backend))
378 pool_close(frontend);
379 connection_count_down();
384 /* send ReadyForQuery to frontend */
385 pool_write(frontend, "Z", 1);
387 if (MAJOR(backend) == 3)
393 pool_write(frontend, &len, sizeof(len));
394 tstate = TSTATE(backend);
395 pool_write(frontend, &tstate, 1);
398 if (pool_flush(frontend) < 0)
400 pool_close(frontend);
401 connection_count_down();
410 sp = MASTER_CONNECTION(backend)->sp;
411 snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
412 sp->user, sp->database, remote_ps_data);
413 set_ps_display(psbuf, false);
415 if (MAJOR(backend) == PROTO_MAJOR_V2)
416 TSTATE(backend) = 'I';
418 if (pool_config->load_balance_mode)
420 /* select load balancing node */
421 backend->info->load_balancing_node = select_load_balancing_node();
424 /* query process loop */
429 status = pool_process_query(frontend, backend, 0, first_ready_for_query_received);
431 sp = MASTER_CONNECTION(backend)->sp;
438 * do not cache connection if:
439 * pool_config->connection_cache == 0 or
440 * database name is template0, template1, postgres or regression
442 if (pool_config->connection_cache == 0 ||
443 !strcmp(sp->database, "template0") ||
444 !strcmp(sp->database, "template1") ||
445 !strcmp(sp->database, "postgres") ||
446 !strcmp(sp->database, "regression"))
448 pool_close(frontend);
449 pool_send_frontend_exits(backend);
450 pool_discard_cp(sp->user, sp->database, sp->major);
456 /* send reset request to backend */
457 status1 = pool_process_query(frontend, backend, 1, 0);
458 pool_close(frontend);
460 /* if we detect errors on resetting connection, we need to discard
461 * this connection since it might be in unknown status
463 if (status1 != POOL_CONTINUE)
465 pool_debug("error in resetting connections. discarding connection pools...");
466 pool_send_frontend_exits(backend);
467 pool_discard_cp(sp->user, sp->database, sp->major);
470 pool_connection_pool_timer(backend);
474 /* error occurred. discard backend connection pool
475 and disconnect connection to the frontend */
478 pool_discard_cp(sp->user, sp->database);
479 pool_close(frontend);
480 notice_backend_error();
482 pool_log("do_child: exits with status 1 due to error");
486 /* fatal error occurred. just exit myself... */
488 notice_backend_error(1);
492 /* not implemented yet */
494 do_accept(unix_fd, inet_fd, &timeout);
495 pool_debug("accept while idle");
502 if (status != POOL_CONTINUE)
507 connection_count_down();
509 timeout.tv_sec = pool_config->child_life_time;
512 /* increment queries counter if necessary */
513 if ( pool_config->child_max_connections > 0 )
516 /* check if maximum connections count for this child reached */
517 if ( ( pool_config->child_max_connections > 0 ) &&
518 ( connections_count >= pool_config->child_max_connections ) )
520 pool_log("child exiting, %d connections reached", pool_config->child_max_connections);
521 send_frontend_exits();
528 /* -------------------------------------------------------------------
530 * -------------------------------------------------------------------
536 void pool_set_nonblock(int fd)
540 /* set fd to non-blocking mode */
541 var = fcntl(fd, F_GETFL, 0);
544 pool_error("fcntl failed. %s", strerror(errno));
547 if (fcntl(fd, F_SETFL, var | O_NONBLOCK) == -1)
549 pool_error("fcntl failed. %s", strerror(errno));
555 * unset non-block flag
557 void pool_unset_nonblock(int fd)
561 /* clear the non-blocking flag so that fd is blocking again */
562 var = fcntl(fd, F_GETFL, 0);
565 pool_error("fcntl failed. %s", strerror(errno));
568 if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
570 pool_error("fcntl failed. %s", strerror(errno));
576 * perform accept() and return new fd
578 static POOL_CONNECTION *do_accept(int unix_fd, int inet_fd, struct timeval *timeout)
589 #ifdef ACCEPT_PERFORMANCE
590 struct timeval now1, now2;
594 struct timeval *timeoutval;
595 struct timeval tv1, tv2, tmback = {0, 0};
597 char remote_host[NI_MAXHOST];
598 char remote_port[NI_MAXSERV];
600 set_ps_display("wait for connection request", false);
603 FD_SET(unix_fd, &readmask);
605 FD_SET(inet_fd, &readmask);
607 if (timeout->tv_sec == 0 && timeout->tv_usec == 0)
611 timeoutval = timeout;
612 tmback.tv_sec = timeout->tv_sec;
613 tmback.tv_usec = timeout->tv_usec;
614 gettimeofday(&tv1, NULL);
617 pool_log("before select = {%d, %d}", timeoutval->tv_sec, timeoutval->tv_usec);
618 pool_log("g:before select = {%d, %d}", tv1.tv_sec, tv1.tv_usec);
622 fds = select(Max(unix_fd, inet_fd)+1, &readmask, NULL, NULL, timeoutval);
625 /* check backend timer is expired */
626 if (backend_timer_expired)
628 pool_backend_timer();
629 backend_timer_expired = 0;
633 * following code fragment computes remaining timeout val in a
634 * portable way. Linux does this automatically but other platforms do not.
638 gettimeofday(&tv2, NULL);
640 tmback.tv_usec -= tv2.tv_usec - tv1.tv_usec;
641 tmback.tv_sec -= tv2.tv_sec - tv1.tv_sec;
643 if (tmback.tv_usec < 0)
646 if (tmback.tv_sec < 0)
649 timeout->tv_usec = 0;
653 tmback.tv_usec += 1000000;
654 timeout->tv_sec = tmback.tv_sec;
655 timeout->tv_usec = tmback.tv_usec;
659 pool_log("g:after select = {%d, %d}", tv2.tv_sec, tv2.tv_usec);
660 pool_log("after select = {%d, %d}", timeout->tv_sec, timeout->tv_usec);
668 if (errno == EAGAIN || errno == EINTR)
671 pool_error("select() failed. reason %s", strerror(errno));
681 if (FD_ISSET(unix_fd, &readmask))
686 if (FD_ISSET(inet_fd, &readmask))
693 * Note that some SysV systems do not work here. For those
694 * systems, we need some locking mechanism for the fd.
696 memset(&saddr, 0, sizeof(saddr));
697 saddr.salen = sizeof(saddr.addr);
699 #ifdef ACCEPT_PERFORMANCE
700 gettimeofday(&now1,0);
705 /* wait if recovery is started */
706 while (*InRecovery == 1)
711 afd = accept(fd, (struct sockaddr *)&saddr.addr, &saddr.salen);
714 /* check backend timer is expired */
715 if (backend_timer_expired)
717 pool_backend_timer();
718 backend_timer_expired = 0;
723 if (errno == EINTR && *InRecovery)
727 * "Resource temporarily unavailable" (EAGAIN or EWOULDBLOCK)
728 * can be silently ignored. And EINTR can be ignored.
730 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
731 pool_error("accept() failed. reason: %s", strerror(errno));
734 #ifdef ACCEPT_PERFORMANCE
735 gettimeofday(&now2,0);
736 atime += (now2.tv_sec - now1.tv_sec)*1000000 + (now2.tv_usec - now1.tv_usec);
740 pool_log("cnt: %d atime: %ld", cnt, atime);
744 /* reload config file */
747 pool_get_config(get_config_file_name(), RELOAD_CONFIG);
748 if (pool_config->enable_pool_hba)
749 load_hba(get_hba_file_name());
750 if (pool_config->parallel_mode)
751 pool_memset_system_db_info(system_db_info->info);
755 connection_count_up();
758 if (pool_config->parallel_mode)
761 * do not accept new connection if any of DB node or SystemDB is down when operating in
766 for (i=0;i<NUM_BACKENDS;i++)
768 if (BACKEND_INFO(i).backend_status == CON_DOWN || SYSDB_STATUS == CON_DOWN)
771 char *msg = "pgpool is not available in parallel query mode";
773 if (SYSDB_STATUS == CON_DOWN)
774 pool_log("Cannot accept() new connection. SystemDB is down");
776 pool_log("Cannot accept() new connection. %d th backend is down", i);
778 if ((cp = pool_open(afd)) == NULL)
784 sp = read_startup_packet(cp);
787 /* failed to read the startup packet. return to the accept() loop */
792 pool_debug("do_accept: send error message to frontend");
794 if (sp->major == PROTO_MAJOR_V3)
798 if (SYSDB_STATUS == CON_DOWN)
799 snprintf(buf, sizeof(buf), "SystemDB is down");
801 snprintf(buf, sizeof(buf), "%d th backend is down", i);
803 pool_send_error_message(cp, sp->major, "08S01",
806 ((SYSDB_STATUS == CON_DOWN) ? "repair the SystemDB and restart pgpool"
807 : "repair the backend and restart pgpool"),
813 pool_send_error_message(cp, sp->major,
829 * do not accept new connection if all DB nodes are down when operating in
835 for (i=0;i<NUM_BACKENDS;i++)
837 if (VALID_BACKEND(i))
844 pool_log("Cannot accept() new connection. all backends are down");
849 pool_debug("I am %d accept fd %d", getpid(), afd);
851 pool_getnameinfo_all(&saddr, remote_host, remote_port);
852 snprintf(remote_ps_data, sizeof(remote_ps_data),
853 remote_port[0] == '\0' ? "%s" : "%s(%s)",
854 remote_host, remote_port);
856 set_ps_display("accept connection", false);
858 /* log who is connecting */
859 if (pool_config->log_connections)
861 pool_log("connection received: host=%s%s%s",
862 remote_host, remote_port[0] ? " port=" : "", remote_port);
865 /* set NODELAY and KEEPALIVE options if INET connection */
870 if (setsockopt(afd, IPPROTO_TCP, TCP_NODELAY,
874 pool_error("do_accept: setsockopt() failed: %s", strerror(errno));
878 if (setsockopt(afd, SOL_SOCKET, SO_KEEPALIVE,
882 pool_error("do_accept: setsockopt() failed: %s", strerror(errno));
888 if ((cp = pool_open(afd)) == NULL)
894 /* save ip address for hba */
895 memcpy(&cp->raddr, &saddr, sizeof(SockAddr));
896 if (cp->raddr.addr.ss_family == 0)
897 cp->raddr.addr.ss_family = AF_UNIX;
903 * read startup packet
905 static StartupPacket *read_startup_packet(POOL_CONNECTION *cp)
908 StartupPacket_v2 *sp2;
913 sp = (StartupPacket *)calloc(sizeof(*sp), 1);
916 pool_error("read_startup_packet: out of memory");
920 if (pool_config->authentication_timeout > 0)
922 pool_signal(SIGALRM, authentication_timeout);
923 alarm(pool_config->authentication_timeout);
926 /* read startup packet length */
927 if (pool_read(cp, &len, sizeof(len)))
936 pool_error("read_startup_packet: incorrect packet length (%d)", len);
938 else if (len >= MAX_STARTUP_PACKET_LENGTH)
940 pool_error("read_startup_packet: invalid startup packet");
941 pool_free_startup_packet(sp);
945 sp->startup_packet = calloc(len, 1);
946 if (!sp->startup_packet)
948 pool_error("read_startup_packet: out of memory");
949 pool_free_startup_packet(sp);
951 pool_signal(SIGALRM, SIG_IGN);
955 /* read startup packet */
956 if (pool_read(cp, sp->startup_packet, len))
958 pool_free_startup_packet(sp);
960 pool_signal(SIGALRM, SIG_IGN);
965 memcpy(&protov, sp->startup_packet, sizeof(protov));
966 sp->major = ntohl(protov)>>16;
967 sp->minor = ntohl(protov) & 0x0000ffff;
968 p = sp->startup_packet;
972 case PROTO_MAJOR_V2: /* V2 */
973 sp2 = (StartupPacket_v2 *)(sp->startup_packet);
975 sp->database = calloc(SM_DATABASE+1, 1);
978 pool_error("read_startup_packet: out of memory");
979 pool_free_startup_packet(sp);
981 pool_signal(SIGALRM, SIG_IGN);
984 strncpy(sp->database, sp2->database, SM_DATABASE);
986 sp->user = calloc(SM_USER+1, 1);
989 pool_error("read_startup_packet: out of memory");
990 pool_free_startup_packet(sp);
992 pool_signal(SIGALRM, SIG_IGN);
995 strncpy(sp->user, sp2->user, SM_USER);
999 case PROTO_MAJOR_V3: /* V3 */
1000 p += sizeof(int); /* skip protocol version info */
1004 if (!strcmp("user", p))
1006 p += (strlen(p) + 1);
1007 sp->user = strdup(p);
1010 pool_error("read_startup_packet: out of memory");
1011 pool_free_startup_packet(sp);
1013 pool_signal(SIGALRM, SIG_IGN);
1017 else if (!strcmp("database", p))
1019 p += (strlen(p) + 1);
1020 sp->database = strdup(p);
1023 pool_error("read_startup_packet: out of memory");
1024 pool_free_startup_packet(sp);
1026 pool_signal(SIGALRM, SIG_IGN);
1030 p += (strlen(p) + 1);
1034 case 1234: /* cancel or SSL request */
1035 /* set dummy database, user info */
1036 sp->database = calloc(1, 1);
1039 pool_error("read_startup_packet: out of memory");
1040 pool_free_startup_packet(sp);
1042 pool_signal(SIGALRM, SIG_IGN);
1045 sp->user = calloc(1, 1);
1048 pool_error("read_startup_packet: out of memory");
1049 pool_free_startup_packet(sp);
1051 pool_signal(SIGALRM, SIG_IGN);
1057 pool_error("read_startup_packet: invalid major no: %d", sp->major);
1058 pool_free_startup_packet(sp);
1060 pool_signal(SIGALRM, SIG_IGN);
1064 pool_debug("Protocol Major: %d Minor: %d database: %s user: %s",
1065 sp->major, sp->minor, sp->database, sp->user);
1067 pool_signal(SIGALRM, SIG_IGN);
1072 * send startup packet
1074 int send_startup_packet(POOL_CONNECTION_POOL_SLOT *cp)
1078 len = htonl(cp->sp->len + sizeof(len));
1079 pool_write(cp->con, &len, sizeof(len));
1080 return pool_write_and_flush(cp->con, cp->sp->startup_packet, cp->sp->len);
1084 * process cancel request
1086 void cancel_request(CancelPacket *sp)
1090 POOL_CONNECTION *con;
1092 ConnectionInfo *c = NULL;
1095 pool_debug("Cancel request received");
1097 /* look for cancel key from shmem info */
1098 for (i=0;i<pool_config->num_init_children*pool_config->max_pool;i++)
1102 if (c->pid == sp->pid && c->key == sp->key)
1104 pool_debug("found pid:%d key:%d i:%d",c->pid, c->key,i);
1105 c = &con_info[i/pool_config->max_pool * pool_config->max_pool];
1109 if (i == pool_config->num_init_children*pool_config->max_pool)
1110 return; /* invalid key */
1112 for (i=0;i<NUM_BACKENDS;i++,c++)
1114 if (!VALID_BACKEND(i))
1117 if (*(BACKEND_INFO(i).backend_hostname) == '\0')
1118 fd = connect_unix_domain_socket(i);
1120 fd = connect_inet_domain_socket(i);
1124 pool_error("Could not create socket for sending cancel request for backend %d", i);
1128 con = pool_open(fd);
1132 len = htonl(sizeof(len) + sizeof(CancelPacket));
1133 pool_write(con, &len, sizeof(len));
1135 cp.protoVersion = sp->protoVersion;
1139 pool_debug("pid:%d key: %d",cp.pid,cp.key);
1141 if (pool_write_and_flush(con, &cp, sizeof(CancelPacket)) < 0)
1142 pool_error("Could not send cancel request packet for backend %d", i);
1147 * this is needed to ensure that the next DB node executes the
1148 * query supposed to be canceled.
1154 static POOL_CONNECTION_POOL *connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend)
1156 POOL_CONNECTION_POOL *backend;
1159 /* connect to the backend */
1160 backend = pool_create_cp();
1161 if (backend == NULL)
1163 pool_send_error_message(frontend, sp->major, "XX000", "connection cache is full", "",
1164 "increase max_pool", __FILE__, __LINE__);
1165 pool_close(frontend);
1166 pool_free_startup_packet(sp);
1170 for (i=0;i<NUM_BACKENDS;i++)
1172 if (VALID_BACKEND(i))
1174 /* set DB node id */
1175 CONNECTION(backend, i)->db_node_id = i;
1177 /* mark this is a backend connection */
1178 CONNECTION(backend, i)->isbackend = 1;
1181 * save startup packet info
1183 CONNECTION_SLOT(backend, i)->sp = sp;
1185 /* send startup packet */
1186 if (send_startup_packet(CONNECTION_SLOT(backend, i)) < 0)
1188 pool_error("do_child: fails to send startup packet to the %d th backend", i);
1189 pool_discard_cp(sp->user, sp->database, sp->major);
1190 pool_close(frontend);
1197 * do authentication stuff
1199 if (pool_do_auth(frontend, backend))
1201 pool_close(frontend);
1202 pool_discard_cp(sp->user, sp->database, sp->major);
1210 * signal handler for SIGTERM, SIGINT and SIGQUIT
1212 static RETSIGTYPE die(int sig)
1216 pool_debug("child receives shutdown request signal %d", sig);
1220 case SIGTERM: /* smart shutdown */
1223 pool_debug("child receives smart shutdown request but it's not in idle state");
1228 case SIGINT: /* fast shutdown */
1229 case SIGQUIT: /* immediate shutdown */
1237 * child_exit() does this. So we don't need it.
1238 * send_frontend_exits();
1244 * signal handler for SIGUSR1
1245 * close all idle connections
1247 static RETSIGTYPE close_idle_connection(int sig)
1250 POOL_CONNECTION_POOL *p = pool_connection_pool;
1251 ConnectionInfo *info;
1253 pool_debug("child receives close connection request");
1255 for (j=0;j<pool_config->max_pool;j++, p++)
1257 if (!MASTER_CONNECTION(p))
1259 if (!MASTER_CONNECTION(p)->sp)
1261 if (MASTER_CONNECTION(p)->sp->user == NULL)
1264 if (MASTER_CONNECTION(p)->closetime > 0) /* idle connection? */
1266 pool_debug("close_idle_connection: close idle connection: user %s database %s", MASTER_CONNECTION(p)->sp->user, MASTER_CONNECTION(p)->sp->database);
1267 pool_send_frontend_exits(p);
1269 for (i=0;i<NUM_BACKENDS;i++)
1271 if (!VALID_BACKEND(i))
1276 /* only first backend allocated the memory for the start up packet */
1277 pool_free_startup_packet(CONNECTION_SLOT(p, i)->sp);
1279 pool_close(CONNECTION(p, i));
1282 memset(p, 0, sizeof(POOL_CONNECTION_POOL));
1284 memset(p->info, 0, sizeof(ConnectionInfo));
1290 * signal handler for SIGALRM
1293 static RETSIGTYPE authentication_timeout(int sig)
1295 pool_log("authentication is timeout");
1300 * send frontend exiting messages to all connections. this is called
1301 * in any case when child process exits, for example failover, child
1302 * life time expires or child max connections expires.
1304 static void send_frontend_exits(void)
1307 POOL_CONNECTION_POOL *p = pool_connection_pool;
1309 #ifdef HAVE_SIGPROCMASK
1315 POOL_SETMASK2(&BlockSig, &oldmask);
1317 for (i=0;i<pool_config->max_pool;i++, p++)
1319 if (!MASTER_CONNECTION(p))
1321 if (!MASTER_CONNECTION(p)->sp)
1323 if (MASTER_CONNECTION(p)->sp->user == NULL)
1325 pool_send_frontend_exits(p);
1328 POOL_SETMASK(&oldmask);
1331 static int send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
1338 while (pool_get_param(&MASTER(backend)->params, index++, &name, &value) == 0)
1340 pool_write(frontend, "S", 1);
1341 len = sizeof(sendlen) + strlen(name) + 1 + strlen(value) + 1;
1342 sendlen = htonl(len);
1343 pool_write(frontend, &sendlen, sizeof(sendlen));
1344 pool_write(frontend, name, strlen(name) + 1);
1345 pool_write(frontend, value, strlen(value) + 1);
1348 if (pool_flush(frontend))
1350 pool_error("pool_send_params: pool_flush() failed");
1356 void pool_free_startup_packet(StartupPacket *sp)
1360 if (sp->startup_packet)
1361 free(sp->startup_packet);
1371 * Do housekeeping work when the pgpool child process exits
1373 void child_exit(int code)
1375 /* count down global connection counter */
1377 connection_count_down();
1379 /* prepare to shutdown connections to system db */
1380 if(pool_config->parallel_mode || pool_config->enable_query_cache)
1382 if (system_db_info->pgconn)
1383 pool_close_libpq_connection();
1384 if (pool_system_db_connection())
1385 pool_close(pool_system_db_connection()->con);
1388 /* let backend know now we are exiting */
1389 send_frontend_exits();
1395 * create a persistent connection
1397 POOL_CONNECTION_POOL_SLOT *make_persistent_db_connection(
1398 char *hostname, int port, char *dbname, char *user, char *password)
1400 POOL_CONNECTION_POOL_SLOT *cp;
1403 #define MAX_USER_AND_DATABASE 1024
1405 /* V3 startup packet */
1408 char data[MAX_USER_AND_DATABASE];
1411 static StartupPacket_v3 *startup_packet;
1415 cp = malloc(sizeof(POOL_CONNECTION_POOL));
1418 pool_error("make_persistent_db_connection: could not allocate memory");
1421 memset(cp, 0, sizeof(POOL_CONNECTION_POOL));
1423 startup_packet = malloc(sizeof(*startup_packet));
1424 if (startup_packet == NULL)
1426 pool_error("make_persistent_db_connection: could not allocate memory");
1429 memset(startup_packet, 0, sizeof(*startup_packet));
1430 startup_packet->protoVersion = htonl(0x00030000); /* set V3 proto major/minor */
1435 if (*hostname == '\0')
1437 fd = connect_unix_domain_socket_by_port(port, pool_config->backend_socket_dir);
1441 fd = connect_inet_domain_socket_by_port(hostname, port);
1446 pool_error("make_persistent_db_connection: connection to %s(%d) failed", hostname, port);
1450 cp->con = pool_open(fd);
1452 cp->con->isbackend = 1;
1455 * build V3 startup packet
1457 len = snprintf(startup_packet->data, sizeof(startup_packet->data), "user") + 1;
1458 len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data)-len, "%s", user) + 1;
1459 if (len1 >= (sizeof(startup_packet->data)-len))
1461 pool_error("make_persistent_db_connection: too long user name");
1466 len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data)-len, "database") + 1;
1467 if (len1 >= (sizeof(startup_packet->data)-len))
1469 pool_error("make_persistent_db_connection: too long user name");
1474 len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data)-len, "%s", dbname) + 1;
1475 if (len1 >= (sizeof(startup_packet->data)-len))
1477 pool_error("make_persistent_db_connection: too long database name");
1481 startup_packet->data[len++] = '\0';
1483 cp->sp = malloc(sizeof(StartupPacket));
1486 pool_error("make_persistent_db_connection: could not allocate memory");
1490 cp->sp->startup_packet = (char *)startup_packet;
1491 cp->sp->len = len + 4;
1494 cp->sp->database = strdup(dbname);
1495 if (cp->sp->database == NULL)
1497 pool_error("make_persistent_db_connection: could not allocate memory");
1500 cp->sp->user = strdup(user);
1501 if (cp->sp->user == NULL)
1503 pool_error("make_persistent_db_connection: could not allocate memory");
1508 * send startup packet
1510 status = send_startup_packet(cp);
1513 pool_error("make_persistent_db_connection: send_startup_packet failed");
1520 if (s_do_auth(cp, password))
1522 pool_error("make_persistent_db_connection: s_do_auth failed");
1530 * do authentication for cp
1532 static int s_do_auth(POOL_CONNECTION_POOL_SLOT *cp, char *password)
1543 * read kind expecting 'R' packet (authentication response)
1545 status = pool_read(cp->con, &kind, sizeof(kind));
1548 pool_error("s_do_auth: error while reading message kind");
1554 pool_error("s_do_auth: expecting R got %c", kind);
1558 /* read message length */
1559 status = pool_read(cp->con, &length, sizeof(length));
1562 pool_error("s_do_auth: error while reading message length");
1565 length = ntohl(length);
1567 /* read auth kind */
1568 status = pool_read(cp->con, &auth_kind, sizeof(auth_kind));
1571 pool_error("s_do_auth: error while reading auth kind");
1574 auth_kind = ntohl(auth_kind);
1575 pool_debug("s_do_auth: auth kind: %d", auth_kind);
1577 if (auth_kind == 0) /* trust authentication? */
1579 cp->con->auth_kind = 0;
1581 else if (auth_kind == 3) /* clear text password? */
1583 int size = htonl(strlen(password) + 5);
1585 pool_write(cp->con, "p", 1);
1586 pool_write(cp->con, &size, sizeof(size));
1587 pool_write_and_flush(cp->con, password, strlen(password) + 1);
1588 status = pool_flush(cp->con);
1591 pool_error("s_do_auth: error while sending clear text password");
1594 return s_do_auth(cp, password);
1596 else if (auth_kind == 4) /* crypt password? */
1600 char *crypt_password;
1602 status = pool_read(cp->con, &salt, 2);
1605 pool_error("s_do_auth: error while reading crypt salt");
1610 crypt_password = crypt(password, salt);
1611 size = htonl(strlen(crypt_password) + 5);
1612 pool_write(cp->con, "p", 1);
1613 pool_write(cp->con, &size, sizeof(size));
1614 pool_write_and_flush(cp->con, crypt_password, strlen(crypt_password) + 1);
1615 status = pool_flush(cp->con);
1618 pool_error("s_do_auth: error while sending crypt password");
1621 return s_do_auth(cp, password);
1623 else if (auth_kind == 5) /* md5 password? */
1629 status = pool_read(cp->con, &salt, 4);
1632 pool_error("s_do_auth: error while reading md5 salt");
1636 buf = malloc(2 * (MD5_PASSWD_LEN + 4)); /* hash + "md5" + '\0' */
1639 pool_error("s_do_auth(): malloc failed: %s", strerror(errno));
1642 memset(buf, 0, 2 * (MD5_PASSWD_LEN + 4));
1644 /* build md5 password */
1645 buf1 = buf + MD5_PASSWD_LEN + 4;
1646 pool_md5_encrypt(password, cp->sp->user, strlen(cp->sp->user), buf1);
1647 pool_md5_encrypt(buf1, salt, 4, buf + 3);
1648 memcpy(buf, "md5", 3);
1650 size = htonl(strlen(buf) + 5);
1651 pool_write(cp->con, "p", 1);
1652 pool_write(cp->con, &size, sizeof(size));
1653 pool_write_and_flush(cp->con, buf, strlen(buf) + 1);
1654 status = pool_flush(cp->con);
1657 pool_error("s_do_auth: error while sending md5 password");
1661 status = s_do_auth(cp, password);
1667 pool_error("s_do_auth: auth kind %d not supported yet", auth_kind);
1676 status = pool_read(cp->con, &kind, sizeof(kind));
1679 pool_error("s_do_auth: error while reading message kind");
1685 case 'K': /* backend key data */
1686 pool_debug("s_do_auth: backend key data received");
1688 /* read message length */
1689 status = pool_read(cp->con, &length, sizeof(length));
1692 pool_error("s_do_auth: error while reading message length");
1695 if (ntohl(length) != 12)
1697 pool_error("s_do_auth: backend key data length is not 12 (%d)", ntohl(length));
1701 if (pool_read(cp->con, &pid, sizeof(pid)) < 0)
1703 pool_error("s_do_auth: failed to read pid");
1709 if (pool_read(cp->con, &key, sizeof(key)) < 0)
1711 pool_error("s_do_auth: failed to read key");
1716 /* read kind expecting 'Z' (ready for query) */
1717 status = pool_read(cp->con, &kind, sizeof(kind));
1720 pool_error("s_do_auth: error while reading kind");
1726 pool_error("s_do_auth: expecting Z got %c", kind);
1730 /* read message length */
1731 status = pool_read(cp->con, &length, sizeof(length));
1734 pool_error("s_do_auth: error while reading message length");
1737 length = ntohl(length);
1739 /* read transaction state */
1740 status = pool_read(cp->con, &state, sizeof(state));
1743 pool_error("s_do_auth: error while reading transaction state");
1747 pool_debug("s_do_auth: transaction state: %c", state);
1748 cp->con->tstate = state;
1752 case 'S': /* parameter status */
1753 pool_debug("s_do_auth: parameter status data received");
1755 status = pool_read(cp->con, &length, sizeof(length));
1758 pool_error("s_do_auth: error while reading message length");
1762 length = ntohl(length);
1765 p = pool_read2(cp->con, length);
1771 pool_error("s_do_auth: unknown response \"%c\" before processing BackendKeyData",
1780 * Count up connection counter (from frontend to pgpool)
1783 static void connection_count_up(void)
1785 pool_semaphore_lock(CONN_COUNTER_SEM);
1786 Req_info->conn_counter++;
1787 pool_semaphore_unlock(CONN_COUNTER_SEM);
1791 * Count down connection counter (from frontend to pgpool)
1794 static void connection_count_down(void)
1796 pool_semaphore_lock(CONN_COUNTER_SEM);
1798 * Make sure that we do not decrement too much. If failed to read
1799 * a start up packet, or receive cancel request etc.,
1800 * connection_count_down() is called and goes back to the
1801 * connection accept loop. Problem is, at the very beginning of
1802 * the connection accept loop, if we have received a signal, we
1803 * call child_exit() which calls connection_count_down() again.
1805 if (Req_info->conn_counter > 0)
1806 Req_info->conn_counter--;
1807 pool_semaphore_unlock(CONN_COUNTER_SEM);
1812 * Wakeup all process
1814 static RETSIGTYPE wakeup_handler(int sig)
1820 * Select load balancing node
1822 int select_load_balancing_node(void)
1824 double total_weight,r;
1827 /* choose a backend in random manner with weight */
1828 selected_slot = MASTER_NODE_ID;
1831 for (i=0;i<NUM_BACKENDS;i++)
1833 if (VALID_BACKEND(i))
1835 total_weight += BACKEND_INFO(i).backend_weight;
1838 r = (((double)random())/RAND_MAX) * total_weight;
1840 for (i=0;i<NUM_BACKENDS;i++)
1842 if (VALID_BACKEND(i) && BACKEND_INFO(i).backend_weight > 0.0)
1844 if(r >= total_weight)
1848 total_weight += BACKEND_INFO(i).backend_weight;
1852 pool_debug("select_load_balancing_node: selected backend id is %d", selected_slot);
1853 return selected_slot;
1856 /* SIGHUP handler */
1857 static RETSIGTYPE reload_config_handler(int sig)