3 * $Header: /cvsroot/pgpool/pgpool-II/pool_stream.c,v 1.14.2.2 2009/08/22 04:19:49 t-ishii Exp $
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
8 * Copyright (c) 2003-2009 PgPool Global Development Group
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
21 * pool_stream.c: stream I/O modules
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
39 #define READBUFSZ 1024
40 #define WRITEBUFSZ 8192
42 static int mystrlen(char *str, int upper, int *flag);
43 static int mystrlinelen(char *str, int upper, int *flag);
44 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len);
45 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len);
48 * open read/write file descriptors.
49 * returns POOL_CONNECTION on success otherwise NULL.
51 POOL_CONNECTION *pool_open(int fd)
55 cp = (POOL_CONNECTION *)malloc(sizeof(POOL_CONNECTION));
58 pool_error("pool_open: malloc failed: %s", strerror(errno));
62 memset(cp, 0, sizeof(*cp));
64 /* initialize write buffer */
65 cp->wbuf = malloc(WRITEBUFSZ);
68 pool_error("pool_open: malloc failed");
71 cp->wbufsz = WRITEBUFSZ;
74 /* initialize pending data buffer */
75 cp->hp = malloc(READBUFSZ);
78 pool_error("pool_open: malloc failed");
81 cp->bufsz = READBUFSZ;
94 * close read/write file descriptors.
96 void pool_close(POOL_CONNECTION *cp)
99 * shutdown connection to the client so that pgpool is not blocked
111 pool_discard_params(&cp->params);
116 * read len bytes from cp
117 * returns 0 on success otherwise -1.
119 int pool_read(POOL_CONNECTION *cp, void *buf, int len)
121 static char readbuf[READBUFSZ];
126 consume_size = consume_pending_data(cp, buf, len);
132 if (pool_check_fd(cp))
134 if (!IS_MASTER_NODE_ID(cp->db_node_id))
136 pool_log("pool_read: data is not ready in DB node: %d. abort this session",
142 pool_error("pool_read: pool_check_fd failed (%s)", strerror(errno));
147 readlen = read(cp->fd, readbuf, READBUFSZ);
150 if (errno == EINTR || errno == EAGAIN)
152 pool_debug("pool_read: retrying due to %s", strerror(errno));
156 pool_error("pool_read: read failed (%s)", strerror(errno));
160 /* fatal error, notice to parent and exit */
161 notice_backend_error(cp->db_node_id);
169 else if (readlen == 0)
173 pool_error("pool_read: EOF encountered with backend");
177 /* fatal error, notice to parent and exit */
178 notice_backend_error(IS_MASTER_NODE_ID(cp->db_node_id));
185 * if backend offers authentication method, frontend could close connection
193 /* overrun. we need to save remaining data to pending buffer */
194 if (save_pending_data(cp, readbuf+len, readlen-len))
196 memmove(buf, readbuf, len);
200 memmove(buf, readbuf, readlen);
209 * read exactly len bytes from cp
210 * returns buffer address on success otherwise NULL.
212 char *pool_read2(POOL_CONNECTION *cp, int len)
220 req_size = cp->len + len;
222 if (req_size > cp->bufsz2)
224 alloc_size = ((req_size+1)/READBUFSZ+1)*READBUFSZ;
225 cp->buf2 = realloc(cp->buf2, alloc_size);
226 if (cp->buf2 == NULL)
228 pool_error("pool_read2: failed to realloc");
231 cp->bufsz2 = alloc_size;
236 consume_size = consume_pending_data(cp, buf, len);
242 if (pool_check_fd(cp))
244 if (!IS_MASTER_NODE_ID(cp->db_node_id))
246 pool_log("pool_read2: data is not ready in DB node:%d. abort this session",
252 pool_error("pool_read2: pool_check_fd failed (%s)", strerror(errno));
257 readlen = read(cp->fd, buf, len);
260 if (errno == EINTR || errno == EAGAIN)
262 pool_debug("pool_read2: retrying due to %s", strerror(errno));
266 pool_error("pool_read2: read failed (%s)", strerror(errno));
270 /* fatal error, notice to parent and exit */
271 notice_backend_error(cp->db_node_id);
279 else if (readlen == 0)
283 pool_error("pool_read2: EOF encountered with backend");
287 /* fatal error, notice to parent and exit */
288 notice_backend_error(IS_MASTER_NODE_ID(cp->db_node_id));
295 * if backend offers authentication method, frontend could close connection
309 * write len bytes to cp the write buffer.
310 * returns 0 on success otherwise -1.
312 int pool_write(POOL_CONNECTION *cp, void *buf, int len)
316 pool_error("pool_write: invalid request size: %d", len);
325 int remainder = WRITEBUFSZ - cp->wbufpo;
327 if (cp->wbufpo >= WRITEBUFSZ)
330 * Write buffer is full. so flush buffer.
331 * wbufpo is reset in pool_flush_it().
333 if (pool_flush_it(cp) == -1)
335 remainder = WRITEBUFSZ;
338 /* check buffer size */
339 if (remainder >= len)
341 /* OK, buffer size is enough. */
344 memcpy(cp->wbuf+cp->wbufpo, buf, remainder);
345 cp->wbufpo += remainder;
356 int pool_flush_it(POOL_CONNECTION *cp)
381 FD_ZERO(&exceptmask);
382 FD_SET(cp->fd, &writemask);
383 FD_SET(cp->fd, &exceptmask);
385 sts = select(cp->fd+1, NULL, &writemask, &exceptmask, NULL);
388 if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
391 pool_error("pool_flush_it: select() failed. reason: %s", strerror(errno));
399 else if (FD_ISSET(cp->fd, &exceptmask))
401 pool_log("pool_flush_it: exception occured");
407 sts = write(cp->fd, cp->wbuf + offset, wlen);
415 /* write completed */
421 pool_error("pool_flush_it: invalid write size %d", sts);
428 /* need to write remaining data */
434 else if (errno == EAGAIN || errno == EINTR)
441 pool_error("pool_flush_it: write failed (%s) offset: %d wlen: %d",
442 strerror(errno), offset, wlen);
454 * flush write buffer and degenerate/failover if error occurs
456 int pool_flush(POOL_CONNECTION *cp)
458 if (pool_flush_it(cp) == -1)
462 notice_backend_error(cp->db_node_id);
468 * ignore error on frontend. we need to continue the
469 * processing with backends
478 * combo of pool_write and pool_flush
480 int pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len)
482 if (pool_write(cp, buf, len))
484 return pool_flush(cp);
488 * read a string until EOF or NULL is encountered.
489 * if line is not 0, read until new line is encountered.
491 char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
501 static char pbuf[READBUFSZ];
507 /* initialize read buffer */
510 cp->sbuf = malloc(READBUFSZ);
511 if (cp->sbuf == NULL)
513 pool_error("pool_read_string: malloc failed");
516 cp->sbufsz = READBUFSZ;
520 /* any pending data? */
524 strlength = mystrlinelen(cp->hp+cp->po, cp->len, &flag);
526 strlength = mystrlen(cp->hp+cp->po, cp->len, &flag);
528 /* buffer is too small? */
529 if ((strlength + 1) > cp->sbufsz)
531 cp->sbufsz = ((strlength+1)/READBUFSZ+1)*READBUFSZ;
532 cp->sbuf = realloc(cp->sbuf, cp->sbufsz);
533 if (cp->sbuf == NULL)
535 pool_error("pool_read_string: realloc failed");
540 /* consume pending and save to read string buffer */
541 consume_size = consume_pending_data(cp, cp->sbuf, strlength);
545 /* is the string null terminated? */
546 if (consume_size == strlength && !flag)
548 /* not null or line terminated.
549 * we need to read more since we have not encountered NULL or new line yet
551 readsize = cp->sbufsz - strlength;
556 pool_debug("pool_read_string: read all from pending data. po:%d len:%d",
562 readsize = cp->sbufsz;
567 if (pool_check_fd(cp))
569 if (!IS_MASTER_NODE_ID(cp->db_node_id))
571 pool_log("pool_read_string: data is not ready in DB node:%d. abort this session",
577 pool_error("pool_read_string: pool_check_fd failed (%s)", strerror(errno));
582 readlen = read(cp->fd, cp->sbuf+readp, readsize);
585 pool_error("pool_read_string: read() failed. reason:%s", strerror(errno));
589 notice_backend_error(cp->db_node_id);
597 else if (readlen == 0) /* EOF detected */
600 * just returns an error, not trigger failover or degeneration
602 pool_error("pool_read_string: read () EOF detected");
608 strlength = mystrlinelen(cp->sbuf+readp, readlen, &flag);
610 strlength = mystrlen(cp->sbuf+readp, readlen, &flag);
612 if (strlength < readlen)
614 save_pending_data(cp, cp->sbuf+readp+strlength, readlen-strlength);
616 pool_debug("pool_read_string: total result %d with pending data po:%d len:%d", *len, cp->po, cp->len);
622 /* encountered null or newline? */
625 /* ok we have read all data */
626 pool_debug("pool_read_string: total result %d ", *len);
631 readsize = READBUFSZ;
633 if ((*len+readsize) > cp->sbufsz)
635 cp->sbufsz += READBUFSZ;
637 cp->sbuf = realloc(cp->sbuf, cp->sbufsz);
638 if (cp->sbuf == NULL)
640 pool_error("pool_read_string: realloc failed");
649 * returns the byte length of str, including \0, no more than upper.
650 * if encountered \0, flag is set to non 0.
652 * mystrlen("abc", 2) returns 2
653 * mystrlen("abc", 3) returns 3
654 * mystrlen("abc", 4) returns 4
655 * mystrlen("abc", 5) returns 4
657 static int mystrlen(char *str, int upper, int *flag)
663 for (len = 0;len < upper; len++, str++)
676 * returns the byte length of str terminated by \n or \0 (including \n or \0), no more than upper.
677 * if encountered \0 or \n, flag is set to non 0.
679 * mystrlinelen("abc", 2) returns 2
680 * mystrlinelen("abc", 3) returns 3
681 * mystrlinelen("abc", 4) returns 4
682 * mystrlinelen("abc", 5) returns 4
683 * mystrlinelen("abcd\nefg", 4) returns 4
684 * mystrlinelen("abcd\nefg", 5) returns 5
685 * mystrlinelen("abcd\nefg", 6) returns 5
687 static int mystrlinelen(char *str, int upper, int *flag)
693 for (len = 0;len < upper; len++, str++)
695 if (!*str || *str == '\n')
708 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len)
718 reqlen = cp->po + cp->len + len;
720 /* pending buffer is enough? */
721 if (reqlen > cp->bufsz)
723 /* too small, enlarge it */
724 realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
725 p = realloc(cp->hp, realloc_size);
728 pool_error("save_pending_data: realloc failed");
732 cp->bufsz = realloc_size;
736 memmove(cp->hp + cp->po + cp->len, data, len);
743 * consume pending data. returns actually consumed data length.
745 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len)
752 consume_size = Min(len, cp->len);
753 memmove(data, cp->hp + cp->po, consume_size);
754 cp->len -= consume_size;
759 cp->po += consume_size;
765 * pool_unread: Put back data to input buffer
767 int pool_unread(POOL_CONNECTION *cp, void *data, int len)
770 int n = cp->len + len;
775 realloc_size = (n/READBUFSZ+1)*READBUFSZ;
776 p = realloc(cp->hp, realloc_size);
779 pool_error("pool_unread: realloc failed");
785 memmove(p + len, cp->hp + cp->po, cp->len);
786 memmove(p, data, len);