]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_connection_pool.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pool_connection_pool.c
1 /* -*-pgsql-c-*- */
2 /*
3  * $Header: /cvsroot/pgpool/pgpool-II/pool_connection_pool.c,v 1.13.2.3 2009/08/22 04:19:49 t-ishii Exp $
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2009      PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  * poo_connection_pool.c: connection pool stuff
22  */
23 #include "config.h"
24
25 #include <sys/types.h>
26 #include <sys/time.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <sys/un.h>
30 #ifdef HAVE_SYS_SELECT_H
31 #include <sys/select.h>
32 #endif
33 #ifdef HAVE_NETINET_TCP_H
34 #include <netinet/tcp.h>
35 #endif
36 #include <netdb.h>
37
38 #include <stdio.h>
39 #include <errno.h>
40 #include <signal.h>
41 #include <string.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44
45 #include "pool.h"
46
47 POOL_CONNECTION_POOL *pool_connection_pool;     /* connection pool */
48 volatile sig_atomic_t backend_timer_expired = 0; /* flag for connection closed timer is expired */
49
50 static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot);
51 static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p);
52 static int check_socket_status(int fd);
53
54 /*
55 * initialize connection pools. this should be called once at the startup.
56 */
57 int pool_init_cp(void)
58 {
59         int i;
60
61         pool_connection_pool = (POOL_CONNECTION_POOL *)malloc(sizeof(POOL_CONNECTION_POOL)*pool_config->max_pool);
62         if (pool_connection_pool == NULL)
63         {
64                 pool_error("pool_init_cp: malloc() failed");
65                 return -1;
66         }
67         memset(pool_connection_pool, 0, sizeof(POOL_CONNECTION_POOL)*pool_config->max_pool);
68
69         for (i = 0; i < pool_config->max_pool; i++)
70         {
71                 pool_connection_pool[i].info = &(MY_PROCESS_INFO.connection_info[i]);
72                 memset(pool_connection_pool[i].info, 0, sizeof(ConnectionInfo));
73         }
74         return 0;
75 }
76
77 /*
78 * find connection by user and database
79 */
80 POOL_CONNECTION_POOL *pool_get_cp(char *user, char *database, int protoMajor, int check_socket)
81 {
82 #ifdef HAVE_SIGPROCMASK
83         sigset_t oldmask;
84 #else
85         int     oldmask;
86 #endif
87
88         int i, j, freed = 0;
89         ConnectionInfo *info;
90
91         POOL_CONNECTION_POOL *p = pool_connection_pool;
92
93         if (p == NULL)
94         {
95                 pool_error("pool_get_cp: pool_connection_pool is not initialized");
96                 return NULL;
97         }
98
99         POOL_SETMASK2(&BlockSig, &oldmask);
100
101         for (i=0;i<pool_config->max_pool;i++)
102         {
103                 if (MASTER_CONNECTION(p) &&
104                         MASTER_CONNECTION(p)->sp &&
105                         MASTER_CONNECTION(p)->sp->major == protoMajor &&
106                         MASTER_CONNECTION(p)->sp->user != NULL &&
107                         strcmp(MASTER_CONNECTION(p)->sp->user, user) == 0 &&
108                         strcmp(MASTER_CONNECTION(p)->sp->database, database) == 0)
109                 {
110                         int sock_broken = 0;
111
112                         /* mark this connection is under use */
113                         MASTER_CONNECTION(p)->closetime = 0;
114                         p->info->counter++;
115                         POOL_SETMASK(&oldmask);
116
117                         if (check_socket)
118                         {
119                                 for (j=0;j<NUM_BACKENDS;j++)
120                                 {
121                                         if (!VALID_BACKEND(j))
122                                                 continue;
123
124                                         if  (CONNECTION_SLOT(p, j))
125                                         {
126                                                 sock_broken = check_socket_status(CONNECTION(p, j)->fd);
127                                                 if (sock_broken < 0)
128                                                         break;
129                                         }
130                                         else
131                                         {
132                                                 sock_broken = -1;
133                                                 break;
134                                         }
135                                 }
136
137                                 if (sock_broken < 0)
138                                 {
139                                         pool_log("connection closed. retry to create new connection pool.");
140                                         for (j=0;j<NUM_BACKENDS;j++)
141                                         {
142                                                 if (!VALID_BACKEND(j) || (CONNECTION_SLOT(p, j) == NULL))
143                                                         continue;
144
145                                                 if (!freed)
146                                                 {
147                                                         pool_free_startup_packet(CONNECTION_SLOT(p, j)->sp);
148                                                         freed = 1;
149                                                 }
150
151                                                 pool_close(CONNECTION(p, j));
152                                                 free(CONNECTION_SLOT(p, j));
153                                         }
154                                         info = p->info;
155                                         memset(p, 0, sizeof(POOL_CONNECTION_POOL_SLOT));
156                                         p->info = info;
157                                         memset(p->info, 0, sizeof(ConnectionInfo));
158                                         POOL_SETMASK(&oldmask);
159                                         return NULL;
160                                 }
161                         }
162                         POOL_SETMASK(&oldmask);
163                         return p;
164                 }
165                 p++;
166         }
167
168         POOL_SETMASK(&oldmask);
169         return NULL;
170 }
171
172 /*
173  * disconnect and release a connection to the database
174  */
175 void pool_discard_cp(char *user, char *database, int protoMajor)
176 {
177         POOL_CONNECTION_POOL *p = pool_get_cp(user, database, protoMajor, 0);
178         ConnectionInfo *info;
179         int i, freed = 0;
180
181         if (p == NULL)
182         {
183                 pool_error("pool_discard_cp: cannot get connection pool for user %s datbase %s", user, database);
184                 return;
185         }
186
187         for (i=0;i<NUM_BACKENDS;i++)
188         {
189                 if (!VALID_BACKEND(i))
190                         continue;
191
192                 if (!freed)
193                 {
194                         pool_free_startup_packet(CONNECTION_SLOT(p, i)->sp);
195                         freed = 1;
196                 }
197                 pool_close(CONNECTION(p, i));
198                 free(CONNECTION_SLOT(p, i));
199         }
200
201         info = p->info;
202         memset(p, 0, sizeof(POOL_CONNECTION_POOL));
203         p->info = info;
204         memset(p->info, 0, sizeof(ConnectionInfo));
205 }
206
207
208 /*
209 * create a connection pool by user and database
210 */
211 POOL_CONNECTION_POOL *pool_create_cp(void)
212 {
213         int i, freed = 0;
214         time_t closetime;
215         POOL_CONNECTION_POOL *oldestp;
216         ConnectionInfo *info;
217
218         POOL_CONNECTION_POOL *p = pool_connection_pool;
219
220         if (p == NULL)
221         {
222                 pool_error("pool_create_cp: pool_connection_pool is not initialized");
223                 return NULL;
224         }
225
226         for (i=0;i<pool_config->max_pool;i++)
227         {
228                 if (MASTER_CONNECTION(p) == NULL)
229                         return new_connection(p);
230                 p++;
231         }
232
233         pool_debug("no empty connection slot was found");
234
235         /*
236          * no empty connection slot was found. look for the oldest connection and discard it.
237          */
238         oldestp = p = pool_connection_pool;
239         closetime = MASTER_CONNECTION(p)->closetime;
240         for (i=0;i<pool_config->max_pool;i++)
241         {
242                 pool_debug("user: %s database: %s closetime: %ld",
243                                    MASTER_CONNECTION(p)->sp->user,
244                                    MASTER_CONNECTION(p)->sp->database,
245                                    MASTER_CONNECTION(p)->closetime);
246                 if (MASTER_CONNECTION(p)->closetime < closetime)
247                 {
248                         closetime = MASTER_CONNECTION(p)->closetime;
249                         oldestp = p;
250                 }
251                 p++;
252         }
253
254         p = oldestp;
255         pool_send_frontend_exits(p);
256
257         pool_debug("discarding old %d th connection. user: %s database: %s",
258                            oldestp - pool_connection_pool,
259                            MASTER_CONNECTION(p)->sp->user,
260                            MASTER_CONNECTION(p)->sp->database);
261
262         for (i=0;i<NUM_BACKENDS;i++)
263         {
264                 if (!VALID_BACKEND(i))
265                         continue;
266
267                 if (!freed)
268                 {
269                         pool_free_startup_packet(CONNECTION_SLOT(p, i)->sp);
270                         freed = 1;
271                 }
272
273                 pool_close(CONNECTION(p, i));
274                 free(CONNECTION_SLOT(p, i));
275         }
276
277         info = p->info;
278         memset(p, 0, sizeof(POOL_CONNECTION_POOL));
279         p->info = info;
280         memset(p->info, 0, sizeof(ConnectionInfo));
281
282         return new_connection(p);
283 }
284
285 /*
286  * set backend connection close timer
287  */
288 void pool_connection_pool_timer(POOL_CONNECTION_POOL *backend)
289 {
290         POOL_CONNECTION_POOL *p = pool_connection_pool;
291         int i;
292
293         pool_debug("pool_connection_pool_timer: set close time %ld", time(NULL));
294
295         MASTER_CONNECTION(backend)->closetime = time(NULL);             /* set connection close time */
296
297         if (pool_config->connection_life_time == 0)
298                 return;
299
300         /* look for any other timeout */
301         for (i=0;i<pool_config->max_pool;i++, p++)
302         {
303                 if (!MASTER_CONNECTION(p))
304                         continue;
305                 if (!MASTER_CONNECTION(p)->sp)
306                         continue;
307                 if (MASTER_CONNECTION(p)->sp->user == NULL)
308                         continue;
309
310                 if (p != backend && MASTER_CONNECTION(p)->closetime)
311                         return;
312         }
313
314         /* no other timer found. set my timer */
315         pool_debug("pool_connection_pool_timer: set alarm after %d seconds", pool_config->connection_life_time);
316         pool_signal(SIGALRM, pool_backend_timer_handler);
317         alarm(pool_config->connection_life_time);
318 }
319
320 /*
321  * backend connection close timer handler
322  */
323 RETSIGTYPE pool_backend_timer_handler(int sig)
324 {
325         backend_timer_expired = 1;
326 }
327
328 void pool_backend_timer(void)
329 {
330 #define TMINTMAX 0x7fffffff
331
332         POOL_CONNECTION_POOL *p = pool_connection_pool;
333         int i, j;
334         time_t now;
335         time_t nearest = TMINTMAX;
336         ConnectionInfo *info;
337
338         POOL_SETMASK(&BlockSig);
339
340         now = time(NULL);
341
342         pool_debug("pool_backend_timer_handler called at %ld", now);
343
344         for (i=0;i<pool_config->max_pool;i++, p++)
345         {
346                 if (!MASTER_CONNECTION(p))
347                         continue;
348                 if (!MASTER_CONNECTION(p)->sp)
349                         continue;
350                 if (MASTER_CONNECTION(p)->sp->user == NULL)
351                         continue;
352
353                 /* timer expire? */
354                 if (MASTER_CONNECTION(p)->closetime)
355                 {
356                         int freed = 0;
357
358                         pool_debug("pool_backend_timer_handler: expire time: %ld",
359                                            MASTER_CONNECTION(p)->closetime+pool_config->connection_life_time);
360
361                         if (now >= (MASTER_CONNECTION(p)->closetime+pool_config->connection_life_time))
362                         {
363                                 /* discard expired connection */
364                                 pool_debug("pool_backend_timer_handler: expires user %s database %s",
365                                                    MASTER_CONNECTION(p)->sp->user, MASTER_CONNECTION(p)->sp->database);
366
367                                 pool_send_frontend_exits(p);
368
369                                 for (j=0;j<NUM_BACKENDS;j++)
370                                 {
371                                         if (!VALID_BACKEND(j))
372                                                 continue;
373
374                                         if (!freed)
375                                         {
376                                                 pool_free_startup_packet(CONNECTION_SLOT(p, j)->sp);
377                                                 freed = 1;
378                                         }
379
380                                         pool_close(CONNECTION(p, j));
381                                         free(CONNECTION_SLOT(p, j));
382                                 }
383                                 info = p->info;
384                                 memset(p, 0, sizeof(POOL_CONNECTION_POOL));
385                                 p->info = info;
386                                 memset(p->info, 0, sizeof(ConnectionInfo));
387                         }
388                         else
389                         {
390                                 /* look for nearest timer */
391                                 if (MASTER_CONNECTION(p)->closetime < nearest)
392                                         nearest = MASTER_CONNECTION(p)->closetime;
393                         }
394                 }
395         }
396
397         /* any remaining timer */
398         if (nearest != TMINTMAX)
399         {
400                 nearest = pool_config->connection_life_time - (now - nearest);
401                 if (nearest <= 0)
402                   nearest = 1;
403                 pool_signal(SIGALRM, pool_backend_timer_handler);
404                 alarm(nearest);
405         }
406
407         POOL_SETMASK(&UnBlockSig);
408 }
409
410 /*
411  * connect to postmaster through INET domain socket
412  */
413 int connect_inet_domain_socket(int slot)
414 {
415         char *host;
416         int port;
417
418         host = pool_config->backend_desc->backend_info[slot].backend_hostname;
419         port = pool_config->backend_desc->backend_info[slot].backend_port;
420
421         return connect_inet_domain_socket_by_port(host, port);
422 }
423
424 /*
425  * connect to postmaster through UNIX domain socket
426  */
427 int connect_unix_domain_socket(int slot)
428 {
429         int port;
430         char *socket_dir;
431
432         port = pool_config->backend_desc->backend_info[slot].backend_port;
433         socket_dir = pool_config->backend_socket_dir;
434
435         return connect_unix_domain_socket_by_port(port, socket_dir);
436 }
437
438 int connect_unix_domain_socket_by_port(int port, char *socket_dir)
439 {
440         struct sockaddr_un addr;
441         int fd;
442         int len;
443
444         fd = socket(AF_UNIX, SOCK_STREAM, 0);
445         if (fd == -1)
446         {
447                 pool_error("connect_unix_domain_socket_by_port: setsockopt() failed: %s", strerror(errno));
448                 return -1;
449         }
450
451         memset((char *) &addr, 0, sizeof(addr));
452         ((struct sockaddr *)&addr)->sa_family = AF_UNIX;
453         snprintf(addr.sun_path, sizeof(addr.sun_path), "%s/.s.PGSQL.%d", socket_dir, port);
454         len = sizeof(struct sockaddr_un);
455
456         for (;;)
457         {
458                 if (connect(fd, (struct sockaddr *)&addr, len) < 0)
459                 {
460                         if (errno == EINTR || errno == EAGAIN)
461                                 continue;
462
463                         pool_error("connect_unix_domain_socket_by_port: connect() failed: %s", strerror(errno));
464                         close(fd);
465                         return -1;
466                 }
467                 break;
468         }
469
470         return fd;
471 }
472
473 int connect_inet_domain_socket_by_port(char *host, int port)
474 {
475         int fd;
476         int len;
477         int on = 1;
478         struct sockaddr_in addr;
479         struct hostent *hp;
480
481         fd = socket(AF_INET, SOCK_STREAM, 0);
482         if (fd < 0)
483         {
484                 pool_error("connect_inet_domain_socket_by_port: socket() failed: %s", strerror(errno));
485                 return -1;
486         }
487
488         /* set nodelay */
489         if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
490                                    (char *) &on,
491                                    sizeof(on)) < 0)
492         {
493                 pool_error("connect_inet_domain_socket_by_port: setsockopt() failed: %s", strerror(errno));
494                 close(fd);
495                 return -1;
496         }
497
498         memset((char *) &addr, 0, sizeof(addr));
499         ((struct sockaddr *)&addr)->sa_family = AF_INET;
500
501         addr.sin_port = htons(port);
502         len = sizeof(struct sockaddr_in);
503
504         hp = gethostbyname(host);
505         if ((hp == NULL) || (hp->h_addrtype != AF_INET))
506         {
507                 pool_error("connect_inet_domain_socket: gethostbyname() failed: %s host: %s", strerror(errno), host);
508                 close(fd);
509                 return -1;
510         }
511         memmove((char *) &(addr.sin_addr),
512                         (char *) hp->h_addr,
513                         hp->h_length);
514
515         for (;;)
516         {
517                 if (connect(fd, (struct sockaddr *)&addr, len) < 0)
518                 {
519                         if (errno == EINTR || errno == EAGAIN)
520                                 continue;
521
522                         pool_error("connect_inet_domain_socket: connect() failed: %s",strerror(errno));
523                         close(fd);
524                         return -1;
525                 }
526                 break;
527         }
528
529         return fd;
530 }
531
532 /*
533  * create connection pool
534  */
535 static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot)
536 {
537         BackendInfo *b = &pool_config->backend_desc->backend_info[slot];
538         int fd;
539
540         if (*b->backend_hostname == '\0')
541         {
542                 fd = connect_unix_domain_socket(slot);
543         }
544         else
545         {
546                 fd = connect_inet_domain_socket(slot);
547         }
548
549         if (fd < 0)
550         {
551                 pool_error("connection to %s(%d) failed", b->backend_hostname, b->backend_port);
552                 return NULL;
553         }
554
555         cp->con = pool_open(fd);
556         cp->closetime = 0;
557         return cp;
558 }
559
560 /*
561  * create actual connections to backends
562  */
563 static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p)
564 {
565         POOL_CONNECTION_POOL_SLOT *s;
566         int active_backend_count = 0;
567         int i;
568
569         for (i=0;i<NUM_BACKENDS;i++)
570         {
571                 pool_debug("new_connection: connecting %d backend", i);
572
573                 if (!VALID_BACKEND(i))
574                 {
575                         pool_debug("new_connection: skipping slot %d because backend_status = %d",
576                                            i, BACKEND_INFO(i).backend_status);
577                         continue;
578                 }
579
580                 s = malloc(sizeof(POOL_CONNECTION_POOL_SLOT));
581                 if (s == NULL)
582                 {
583                         pool_error("new_connection: malloc() failed");
584                         return NULL;
585                 }
586
587                 if (create_cp(s, i) == NULL)
588                 {
589                         /* connection failed. mark this backend down */
590                         pool_error("new_connection: create_cp() failed");
591
592                         /* send failover request to parent if operated in pgpool-I mode */
593                         /* notice_backend_error() returns immediately if in pgpool-II */
594                         notice_backend_error(i);
595                         child_exit(1);
596                 }
597
598                 p->slots[i] = s;
599
600                 if (pool_init_params(&s->con->params))
601                 {
602                         return NULL;
603                 }
604
605                 BACKEND_INFO(i).backend_status = CON_UP;
606                 active_backend_count++;
607         }
608
609         if (active_backend_count > 0)
610         {
611                 p->info->create_time = time(NULL);
612                 return p;
613         }
614
615         return NULL;
616 }
617
618 /* check_socket_status()
619  * RETURN: 0 => OK
620  *        -1 => broken socket.
621  */
622 static int check_socket_status(int fd)
623 {
624         fd_set rfds;
625         int result;
626         struct timeval t;
627
628         for (;;)
629         {
630                 FD_ZERO(&rfds);
631                 FD_SET(fd, &rfds);
632
633                 t.tv_sec = t.tv_usec = 0;
634
635                 result = select(fd+1, &rfds, NULL, NULL, &t);
636                 if (result < 0 && errno == EINTR)
637                 {
638                         continue;
639                 }
640                 else
641                 {
642                         return (result == 0 ? 0 : -1);
643                 }
644         }
645
646         return -1;
647 }