2 * $Header: /cvsroot/pgpool/pgpool-II/pcp/pcp_stream.c,v 1.4 2008/01/29 01:56:38 y-asaba Exp $
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
7 * Copyright (c) 2003-2008 PgPool Global Development Group
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
20 * PCP buffer management module.
28 #include <sys/types.h>
33 #include "pcp_stream.h"
35 static int consume_pending_data(PCP_CONNECTION *pc, void *data, int len);
36 static int save_pending_data(PCP_CONNECTION *pc, void *data, int len);
37 static int pcp_check_fd(PCP_CONNECTION *pc, int notimeout);
39 /* --------------------------------
40 * pcp_open - allocate read & write buffers for PCP_CONNECTION
42 * return newly allocated PCP_CONNECTION on success, NULL if malloc() fails
43 * --------------------------------
50 pc = (PCP_CONNECTION *)malloc(sizeof(PCP_CONNECTION));
56 memset(pc, 0, sizeof(*pc));
58 /* initialize write buffer */
59 pc->wbuf = malloc(WRITEBUFSZ);
62 pc->wbufsz = WRITEBUFSZ;
65 /* initialize pending data buffer */
66 pc->hp = malloc(READBUFSZ);
72 pc->bufsz = READBUFSZ;
81 /* --------------------------------
82 * pcp_close - deallocate read & write buffers for PCP_CONNECTION
83 * --------------------------------
86 pcp_close(PCP_CONNECTION *pc)
94 /* --------------------------------
95 * pcp_read - read 'len' bytes from 'pc'
97 * return 0 on success, -1 otherwise
98 * --------------------------------
101 pcp_read(PCP_CONNECTION *pc, void *buf, int len)
103 static char readbuf[READBUFSZ];
109 consume_size = consume_pending_data(pc, buf, len);
115 if (pcp_check_fd(pc, notimeout))
117 errorcode = TIMEOUTERR;
121 readlen = read(pc->fd, readbuf, READBUFSZ);
124 if (errno == EAGAIN || errno == EINTR)
130 else if (readlen == 0)
138 /* overrun. we need to save remaining data to pending buffer */
139 if (save_pending_data(pc, readbuf+len, readlen-len))
141 errorcode = NOMEMERR;
144 memmove(buf, readbuf, len);
148 memmove(buf, readbuf, readlen);
156 /* --------------------------------
157 * pcp_write - write 'len' bytes to 'pc' buffer
159 * return 0 on success, -1 otherwise
160 * --------------------------------
163 pcp_write(PCP_CONNECTION *pc, void *buf, int len)
169 errorcode = INVALERR;
173 /* check buffer size */
174 reqlen = pc->wbufpo + len;
176 if (reqlen > pc->wbufsz)
180 reqlen = (reqlen/WRITEBUFSZ+1)*WRITEBUFSZ;
181 p = realloc(pc->wbuf, reqlen);
184 errorcode = NOMEMERR;
192 memcpy(pc->wbuf+pc->wbufpo, buf, len);
198 /* --------------------------------
199 * pcp_flush - send pending data in buffer to 'pc'
201 * return 0 on success, -1 otherwise
202 * --------------------------------
205 pcp_flush(PCP_CONNECTION *pc)
223 sts = write(pc->fd, pc->wbuf + offset, wlen);
231 /* write completed */
237 errorcode = WRITEERR;
243 /* need to write remaining data */
248 else if (errno == EAGAIN || errno == EINTR)
254 errorcode = WRITEERR;
264 /* --------------------------------
265 * consume_pending_data - read pending data from 'pc' buffer
267 * return the size of data read in
268 * --------------------------------
271 consume_pending_data(PCP_CONNECTION *pc, void *data, int len)
278 consume_size = Min(len, pc->len);
279 memmove(data, pc->hp + pc->po, consume_size);
280 pc->len -= consume_size;
285 pc->po += consume_size;
290 /* --------------------------------
291 * save_pending_data - save excessively read data into 'pc' buffer
293 * return 0 on success, -1 otherwise
294 * --------------------------------
297 save_pending_data(PCP_CONNECTION *pc, void *data, int len)
307 reqlen = pc->po + pc->len + len;
309 /* pending buffer is enough? */
310 if (reqlen > pc->bufsz)
312 /* too small, enlarge it */
313 realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
314 p = realloc(pc->hp, realloc_size);
317 errorcode = NOMEMERR;
320 pc->bufsz = realloc_size;
324 memmove(pc->hp + pc->po + pc->len, data, len);
330 /* --------------------------------
331 * pcp_check_fd - watch for fd which is ready to be read
333 * return 0 on success, -1 otherwise
334 * --------------------------------
337 pcp_check_fd(PCP_CONNECTION *pc, int notimeout)
343 struct timeval timeout;
351 FD_ZERO(&exceptmask);
352 FD_SET(fd, &readmask);
353 FD_SET(fd, &exceptmask);
355 if (notimeout || (pcp_timeout.tv_sec + pcp_timeout.tv_usec == 0))
359 /***** haven't got timeout option yet. hard-code it *****/
360 timeout.tv_sec = pcp_timeout.tv_sec;
361 timeout.tv_usec = pcp_timeout.tv_usec;
365 fds = select(fd+1, &readmask, NULL, &exceptmask, tp);
369 if (errno == EAGAIN || errno == EINTR)
375 if (FD_ISSET(fd, &exceptmask))