]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_stream.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pool_stream.c
1 /* -*-pgsql-c-*- */
2 /*
3 * $Header: /cvsroot/pgpool/pgpool-II/pool_stream.c,v 1.14.2.2 2009/08/22 04:19:49 t-ishii Exp $
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2009       PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose.  It is provided "as
19 * is" without express or implied warranty.
20 *
21 * pool_stream.c: stream I/O modules
22 *
23 */
24
25 #include "config.h"
26
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
29 #endif
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <unistd.h>
36
37 #include "pool.h"
38
39 #define READBUFSZ 1024
40 #define WRITEBUFSZ 8192
41
42 static int mystrlen(char *str, int upper, int *flag);
43 static int mystrlinelen(char *str, int upper, int *flag);
44 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len);
45 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len);
46
47 /*
48 * open read/write file descriptors.
49 * returns POOL_CONNECTION on success otherwise NULL.
50 */
51 POOL_CONNECTION *pool_open(int fd)
52 {
53         POOL_CONNECTION *cp;
54
55         cp = (POOL_CONNECTION *)malloc(sizeof(POOL_CONNECTION));
56         if (cp == NULL)
57         {
58                 pool_error("pool_open: malloc failed: %s", strerror(errno));
59                 return NULL;
60         }
61
62         memset(cp, 0, sizeof(*cp));
63
64         /* initialize write buffer */
65         cp->wbuf = malloc(WRITEBUFSZ);
66         if (cp->wbuf == NULL)
67         {
68                 pool_error("pool_open: malloc failed");
69                 return NULL;
70         }
71         cp->wbufsz = WRITEBUFSZ;
72         cp->wbufpo = 0;
73
74         /* initialize pending data buffer */
75         cp->hp = malloc(READBUFSZ);
76         if (cp->hp == NULL)
77         {
78                 pool_error("pool_open: malloc failed");
79                 return NULL;
80         }
81         cp->bufsz = READBUFSZ;
82         cp->po = 0;
83         cp->len = 0;
84         cp->sbuf = NULL;
85         cp->sbufsz = 0;
86         cp->buf2 = NULL;
87         cp->sbufsz = 0;
88
89         cp->fd = fd;
90         return cp;
91 }
92
93 /*
94 * close read/write file descriptors.
95 */
96 void pool_close(POOL_CONNECTION *cp)
97 {
98         /*
99          * shutdown connection to the client so that pgpool is not blocked
100          */
101         if (!cp->isbackend)
102                 shutdown(cp->fd, 1);
103         close(cp->fd);
104
105         free(cp->wbuf);
106         free(cp->hp);
107         if (cp->sbuf)
108                 free(cp->sbuf);
109         if (cp->buf2)
110                 free(cp->buf2);
111         pool_discard_params(&cp->params);
112         free(cp);
113 }
114
115 /*
116 * read len bytes from cp
117 * returns 0 on success otherwise -1.
118 */
119 int pool_read(POOL_CONNECTION *cp, void *buf, int len)
120 {
121         static char readbuf[READBUFSZ];
122
123         int consume_size;
124         int readlen;
125
126         consume_size = consume_pending_data(cp, buf, len);
127         len -= consume_size;
128         buf += consume_size;
129
130         while (len > 0)
131         {
132                 if (pool_check_fd(cp))
133                 {
134                         if (!IS_MASTER_NODE_ID(cp->db_node_id))
135                         {
136                                 pool_log("pool_read: data is not ready in DB node: %d. abort this session",
137                                                  cp->db_node_id);
138                                 exit(1);
139                         }
140                         else
141                         {
142                                 pool_error("pool_read: pool_check_fd failed (%s)", strerror(errno));
143                             return -1;
144                         }
145                 }
146
147                 readlen = read(cp->fd, readbuf, READBUFSZ);
148                 if (readlen == -1)
149                 {
150                         if (errno == EINTR || errno == EAGAIN)
151                         {
152                                 pool_debug("pool_read: retrying due to %s", strerror(errno));
153                                 continue;
154                         }
155
156                         pool_error("pool_read: read failed (%s)", strerror(errno));
157
158                         if (cp->isbackend)
159                         {
160                             /* fatal error, notice to parent and exit */
161                                 notice_backend_error(cp->db_node_id);
162                                 child_exit(1);
163                         }
164                         else
165                         {
166                             return -1;
167                         }
168                 }
169                 else if (readlen == 0)
170                 {
171                         if (cp->isbackend)
172                         {
173                                 pool_error("pool_read: EOF encountered with backend");
174                                 return -1;
175
176 #ifdef NOT_USED
177                             /* fatal error, notice to parent and exit */
178                             notice_backend_error(IS_MASTER_NODE_ID(cp->db_node_id));
179                                 child_exit(1);
180 #endif
181                         }
182                         else
183                         {
184                                 /*
185                                  * if backend offers authentication method, frontend could close connection
186                                  */
187                                 return -1;
188                         }
189                 }
190
191                 if (len < readlen)
192                 {
193                         /* overrun. we need to save remaining data to pending buffer */
194                         if (save_pending_data(cp, readbuf+len, readlen-len))
195                                 return -1;
196                         memmove(buf, readbuf, len);
197                         break;
198                 }
199
200                 memmove(buf, readbuf, readlen);
201                 buf += readlen;
202                 len -= readlen;
203         }
204
205         return 0;
206 }
207
208 /*
209 * read exactly len bytes from cp
210 * returns buffer address on success otherwise NULL.
211 */
212 char *pool_read2(POOL_CONNECTION *cp, int len)
213 {
214         char *buf;
215         int req_size;
216         int alloc_size;
217         int consume_size;
218         int readlen;
219
220         req_size = cp->len + len;
221
222         if (req_size > cp->bufsz2)
223         {
224                 alloc_size = ((req_size+1)/READBUFSZ+1)*READBUFSZ;
225                 cp->buf2 = realloc(cp->buf2, alloc_size);
226                 if (cp->buf2 == NULL)
227                 {
228                         pool_error("pool_read2: failed to realloc");
229                         exit(1);
230                 }
231                 cp->bufsz2 = alloc_size;
232         }
233
234         buf = cp->buf2;
235
236         consume_size = consume_pending_data(cp, buf, len);
237         len -= consume_size;
238         buf += consume_size;
239
240         while (len > 0)
241         {
242                 if (pool_check_fd(cp))
243                 {
244                         if (!IS_MASTER_NODE_ID(cp->db_node_id))
245                         {
246                                 pool_log("pool_read2: data is not ready in DB node:%d. abort this session",
247                                                  cp->db_node_id);
248                                 exit(1);
249                         }
250                         else
251                         {
252                                 pool_error("pool_read2: pool_check_fd failed (%s)", strerror(errno));
253                             return NULL;
254                         }
255                 }
256
257                 readlen = read(cp->fd, buf, len);
258                 if (readlen == -1)
259                 {
260                         if (errno == EINTR || errno == EAGAIN)
261                         {
262                                 pool_debug("pool_read2: retrying due to %s", strerror(errno));
263                                 continue;
264                         }
265
266                         pool_error("pool_read2: read failed (%s)", strerror(errno));
267
268                         if (cp->isbackend)
269                         {
270                             /* fatal error, notice to parent and exit */
271                                 notice_backend_error(cp->db_node_id);
272                                 child_exit(1);
273                         }
274                         else
275                         {
276                             return NULL;
277                         }
278                 }
279                 else if (readlen == 0)
280                 {
281                         if (cp->isbackend)
282                         {
283                                 pool_error("pool_read2: EOF encountered with backend");
284                                 return NULL;
285
286 #ifdef NOT_USED
287                             /* fatal error, notice to parent and exit */
288                             notice_backend_error(IS_MASTER_NODE_ID(cp->db_node_id));
289                                 child_exit(1);
290 #endif
291                         }
292                         else
293                         {
294                                 /*
295                                  * if backend offers authentication method, frontend could close connection
296                                  */
297                                 return NULL;
298                         }
299                 }
300
301                 buf += readlen;
302                 len -= readlen;
303         }
304
305         return cp->buf2;
306 }
307
308 /*
309 * write len bytes to cp the write buffer.
310 * returns 0 on success otherwise -1.
311 */
312 int pool_write(POOL_CONNECTION *cp, void *buf, int len)
313 {
314         if (len < 0)
315         {
316                 pool_error("pool_write: invalid request size: %d", len);
317                 return -1;
318         }
319
320         if (cp->no_forward)
321                 return 0;
322
323         while (len > 0)
324         {
325                 int remainder = WRITEBUFSZ - cp->wbufpo;
326
327                 if (cp->wbufpo >= WRITEBUFSZ)
328                 {
329                         /*
330                          * Write buffer is full. so flush buffer.
331                          * wbufpo is reset in pool_flush_it().
332                          */
333                         if (pool_flush_it(cp) == -1)
334                                 return -1;
335                         remainder = WRITEBUFSZ;
336                 }
337
338                 /* check buffer size */
339                 if (remainder >= len)
340                 {
341                         /* OK, buffer size is enough. */
342                         remainder = len;
343                 }
344                 memcpy(cp->wbuf+cp->wbufpo, buf, remainder);
345                 cp->wbufpo += remainder;
346                 buf += remainder;
347                 len -= remainder;
348         }
349
350         return 0;
351 }
352
353 /*
354  * flush write buffer
355  */
356 int pool_flush_it(POOL_CONNECTION *cp)
357 {
358         int sts;
359         int wlen;
360         int offset;
361         wlen = cp->wbufpo;
362
363         if (wlen == 0)
364         {
365                 return 0;
366         }
367
368         offset = 0;
369
370         for (;;)
371         {
372                 errno = 0;
373
374 #ifdef NOT_USED
375                 if (!cp->isbackend)
376                 {
377                         fd_set  writemask;
378                         fd_set  exceptmask;
379
380                         FD_ZERO(&writemask);
381                         FD_ZERO(&exceptmask);
382                         FD_SET(cp->fd, &writemask);
383                         FD_SET(cp->fd, &exceptmask);
384
385                         sts = select(cp->fd+1, NULL, &writemask, &exceptmask, NULL);
386                         if (sts == -1)
387                         {
388                                 if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
389                                         continue;
390
391                                 pool_error("pool_flush_it: select() failed. reason: %s", strerror(errno));
392                                 cp->wbufpo = 0;
393                                 return -1;
394                         }
395                         else if (sts == 0)
396                         {
397                                 continue;
398                         }
399                         else if (FD_ISSET(cp->fd, &exceptmask))
400                         {
401                                 pool_log("pool_flush_it: exception occured");
402                                 cp->wbufpo = 0;
403                                 return -1;
404                         }
405                 }
406 #endif
407                 sts = write(cp->fd, cp->wbuf + offset, wlen);
408
409                 if (sts > 0)
410                 {
411                         wlen -= sts;
412
413                         if (wlen == 0)
414                         {
415                                 /* write completed */
416                                 break;
417                         }
418
419                         else if (wlen < 0)
420                         {
421                                 pool_error("pool_flush_it: invalid write size %d", sts);
422                                 cp->wbufpo = 0;
423                                 return -1;
424                         }
425
426                         else
427                         {
428                                 /* need to write remaining data */
429                                 offset += sts;
430                                 continue;
431                         }
432                 }
433
434                 else if (errno == EAGAIN || errno == EINTR)
435                 {
436                         continue;
437                 }
438
439                 else
440                 {
441                         pool_error("pool_flush_it: write failed (%s) offset: %d wlen: %d",
442                                            strerror(errno), offset, wlen);
443                         cp->wbufpo = 0;
444                         return -1;
445                 }
446         }
447
448         cp->wbufpo = 0;
449
450         return 0;
451 }
452
453 /*
454 * flush write buffer and degenerate/failover if error occurs
455 */
456 int pool_flush(POOL_CONNECTION *cp)
457 {
458         if (pool_flush_it(cp) == -1)
459         {
460                 if (cp->isbackend)
461                 {
462                         notice_backend_error(cp->db_node_id);
463                         child_exit(1);
464                 }
465                 else
466                 {
467                         /*
468                          * ignore error on frontend. we need to continue the
469                          * processing with backends
470                          */
471                         return 0;
472                 }
473         }
474         return 0;
475 }
476
477 /*
478 * combo of pool_write and pool_flush
479 */
480 int pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len)
481 {
482         if (pool_write(cp, buf, len))
483                 return -1;
484         return pool_flush(cp);
485 }
486
487 /*
488  * read a string until EOF or NULL is encountered.
489  * if line is not 0, read until new line is encountered.
490 */
491 char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
492 {
493         int readp;
494         int readsize;
495         int readlen;
496         int strlength;
497         int flag;
498         int consume_size;
499
500 #ifdef DEBUG
501         static char pbuf[READBUFSZ];
502 #endif
503
504         *len = 0;
505         readp = 0;
506
507         /* initialize read buffer */
508         if (cp->sbufsz == 0)
509         {
510                 cp->sbuf = malloc(READBUFSZ);
511                 if (cp->sbuf == NULL)
512                 {
513                         pool_error("pool_read_string: malloc failed");
514                         return NULL;
515                 }
516                 cp->sbufsz = READBUFSZ;
517                 *cp->sbuf = '\0';
518         }
519
520         /* any pending data? */
521         if (cp->len)
522         {
523                 if (line)
524                         strlength = mystrlinelen(cp->hp+cp->po, cp->len, &flag);
525                 else
526                         strlength = mystrlen(cp->hp+cp->po, cp->len, &flag);
527
528                 /* buffer is too small? */
529                 if ((strlength + 1) > cp->sbufsz)
530                 {
531                         cp->sbufsz = ((strlength+1)/READBUFSZ+1)*READBUFSZ;
532                         cp->sbuf = realloc(cp->sbuf, cp->sbufsz);
533                         if (cp->sbuf == NULL)
534                         {
535                                 pool_error("pool_read_string: realloc failed");
536                                 return NULL;
537                         }
538                 }
539
540                 /* consume pending and save to read string buffer */
541                 consume_size = consume_pending_data(cp, cp->sbuf, strlength);
542
543                 *len = strlength;
544
545                 /* is the string null terminated? */
546                 if (consume_size == strlength && !flag)
547                 {
548                         /* not null or line terminated.
549                          * we need to read more since we have not encountered NULL or new line yet
550                          */
551                         readsize = cp->sbufsz - strlength;
552                         readp = strlength;
553                 }
554                 else
555                 {
556                         pool_debug("pool_read_string: read all from pending data. po:%d len:%d",
557                                            cp->po, cp->len);
558                         return cp->sbuf;
559                 }
560         } else
561         {
562                 readsize = cp->sbufsz;
563         }
564
565         for (;;)
566         {
567                 if (pool_check_fd(cp))
568                 {
569                         if (!IS_MASTER_NODE_ID(cp->db_node_id))
570                         {
571                                 pool_log("pool_read_string: data is not ready in DB node:%d. abort this session",
572                                                  cp->db_node_id);
573                                 exit(1);
574                         }
575                         else
576                         {
577                                 pool_error("pool_read_string: pool_check_fd failed (%s)", strerror(errno));
578                             return NULL;
579                         }
580                 }
581
582                 readlen = read(cp->fd, cp->sbuf+readp, readsize);
583                 if (readlen == -1)
584                 {
585                         pool_error("pool_read_string: read() failed. reason:%s", strerror(errno));
586
587                         if (cp->isbackend)
588                         {
589                                 notice_backend_error(cp->db_node_id);
590                                 child_exit(1);
591                         }
592                         else
593                         {
594                             return NULL;
595                         }
596                 }
597                 else if (readlen == 0)  /* EOF detected */
598                 {
599                         /*
600                          * just returns an error, not trigger failover or degeneration
601                          */
602                         pool_error("pool_read_string: read () EOF detected");
603                         return NULL;
604                 }
605
606                 /* check overrun */
607                 if (line)
608                         strlength = mystrlinelen(cp->sbuf+readp, readlen, &flag);
609                 else
610                         strlength = mystrlen(cp->sbuf+readp, readlen, &flag);
611
612                 if (strlength < readlen)
613                 {
614                         save_pending_data(cp, cp->sbuf+readp+strlength, readlen-strlength);
615                         *len += strlength;
616                         pool_debug("pool_read_string: total result %d with pending data po:%d len:%d", *len, cp->po, cp->len);
617                         return cp->sbuf;
618                 }
619
620                 *len += readlen;
621
622                 /* encountered null or newline? */
623                 if (flag)
624                 {
625                         /* ok we have read all data */
626                         pool_debug("pool_read_string: total result %d ", *len);
627                         break;
628                 }
629
630                 readp += readlen;
631                 readsize = READBUFSZ;
632
633                 if ((*len+readsize) > cp->sbufsz)
634                 {
635                         cp->sbufsz += READBUFSZ;
636
637                         cp->sbuf = realloc(cp->sbuf, cp->sbufsz);
638                         if (cp->sbuf == NULL)
639                         {
640                                 pool_error("pool_read_string: realloc failed");
641                                 return NULL;
642                         }
643                 }
644         }
645         return cp->sbuf;
646 }
647
/*
 * mystrlen: bounded string length that counts the terminator.
 *
 * Scans at most upper bytes of str.  If a '\0' is found, *flag is set to 1
 * and the number of bytes up to and including that '\0' is returned.
 * Otherwise *flag is set to 0 and upper is returned (0 if upper <= 0).
 *
 * examples (value of *flag in parentheses):
 *	mystrlen("abc", 2, &flag) returns 2 (0)
 *	mystrlen("abc", 3, &flag) returns 3 (0)
 *	mystrlen("abc", 4, &flag) returns 4 (1)
 *	mystrlen("abc", 5, &flag) returns 4 (1)
 */
static int mystrlen(char *str, int upper, int *flag)
{
	int n = 0;

	*flag = 0;

	while (n < upper)
	{
		/* n is advanced first so the terminator itself is counted */
		if (str[n++] == '\0')
		{
			*flag = 1;
			break;
		}
	}
	return n;
}
674
/*
 * mystrlinelen: bounded line length that counts the terminator.
 *
 * Scans at most upper bytes of str.  If a '\0' or '\n' is found, *flag is
 * set to 1 and the number of bytes up to and including that terminator is
 * returned.  Otherwise *flag is set to 0 and upper is returned (0 if
 * upper <= 0).
 *
 * examples (value of *flag in parentheses):
 *	mystrlinelen("abc", 2, &flag) returns 2 (0)
 *	mystrlinelen("abc", 3, &flag) returns 3 (0)
 *	mystrlinelen("abc", 4, &flag) returns 4 (1)
 *	mystrlinelen("abc", 5, &flag) returns 4 (1)
 *	mystrlinelen("abcd\nefg", 4, &flag) returns 4 (0)
 *	mystrlinelen("abcd\nefg", 5, &flag) returns 5 (1)
 *	mystrlinelen("abcd\nefg", 6, &flag) returns 5 (1)
 */
static int mystrlinelen(char *str, int upper, int *flag)
{
	int n = 0;

	*flag = 0;

	while (n < upper)
	{
		char c = str[n++];	/* n already counts this byte */

		if (c == '\0' || c == '\n')
		{
			*flag = 1;
			break;
		}
	}
	return n;
}
704
705 /*
706  * save pending data
707  */
708 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len)
709 {
710         int reqlen;
711         size_t realloc_size;
712         char *p;
713
714         /* to be safe */
715         if (cp->len == 0)
716                 cp->po = 0;
717
718         reqlen = cp->po + cp->len + len;
719
720         /* pending buffer is enough? */
721         if (reqlen > cp->bufsz)
722         {
723                 /* too small, enlarge it */
724                 realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
725                 p = realloc(cp->hp, realloc_size);
726                 if (p == NULL)
727                 {
728                         pool_error("save_pending_data: realloc failed");
729                         return -1;
730                 }
731
732                 cp->bufsz = realloc_size;
733                 cp->hp = p;
734         }
735
736         memmove(cp->hp + cp->po + cp->len, data, len);
737         cp->len += len;
738
739         return 0;
740 }
741
742 /*
743  * consume pending data. returns actually consumed data length.
744  */
745 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len)
746 {
747         int consume_size;
748
749         if (cp->len <= 0)
750                 return 0;
751
752         consume_size = Min(len, cp->len);
753         memmove(data, cp->hp + cp->po, consume_size);
754         cp->len -= consume_size;
755
756         if (cp->len <= 0)
757                 cp->po = 0;
758         else
759                 cp->po += consume_size;
760
761         return consume_size;
762 }
763
764 /*
765  * pool_unread: Put back data to input buffer
766  */
767 int pool_unread(POOL_CONNECTION *cp, void *data, int len)
768 {
769         void *p = cp->hp;
770         int n = cp->len + len;
771         int realloc_size;
772
773         if (cp->bufsz < n)
774         {
775                 realloc_size = (n/READBUFSZ+1)*READBUFSZ;
776                 p = realloc(cp->hp, realloc_size);
777                 if (p == NULL)
778                 {
779                         pool_error("pool_unread: realloc failed");
780                         return -1;
781                 }
782                 cp->hp = p;
783         }
784         if (cp->len != 0)
785                 memmove(p + len, cp->hp + cp->po, cp->len);
786         memmove(p, data, len);
787         cp->len = n;
788         cp->po = 0;
789         return 0;
790 }