]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pcp/pcp_stream.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pcp / pcp_stream.c
1 /*
2  * $Header: /cvsroot/pgpool/pgpool-II/pcp/pcp_stream.c,v 1.4 2008/01/29 01:56:38 y-asaba Exp $
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL 
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2008      PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  * PCP buffer management module.
21  */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <errno.h>
26 #include <unistd.h>
27 #include <time.h>
28 #include <sys/types.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31
32 #include "pcp.h"
33 #include "pcp_stream.h"
34
35 static int consume_pending_data(PCP_CONNECTION *pc, void *data, int len);
36 static int save_pending_data(PCP_CONNECTION *pc, void *data, int len);
37 static int pcp_check_fd(PCP_CONNECTION *pc, int notimeout);
38
39 /* --------------------------------
40  * pcp_open - allocate read & write buffers for PCP_CONNECION
41  *
42  * return newly allocated PCP_CONNECTION on success, NULL if malloc() fails
43  * --------------------------------
44  */
45 PCP_CONNECTION *
46 pcp_open(int fd)
47 {
48     PCP_CONNECTION *pc;
49
50     pc = (PCP_CONNECTION *)malloc(sizeof(PCP_CONNECTION));
51     if (pc == NULL)
52         {
53                 errorcode = NOMEMERR;
54         return NULL;
55         }
56     memset(pc, 0, sizeof(*pc));
57
58     /* initialize write buffer */
59     pc->wbuf = malloc(WRITEBUFSZ);
60     if (pc->wbuf == NULL)
61         return NULL;
62     pc->wbufsz = WRITEBUFSZ;
63     pc->wbufpo = 0;
64
65     /* initialize pending data buffer */
66     pc->hp = malloc(READBUFSZ);
67     if (pc->hp == NULL)
68         {
69                 errorcode = NOMEMERR;
70         return NULL;
71         }
72     pc->bufsz = READBUFSZ;
73     pc->po = 0;
74     pc->len = 0;
75
76     pc->fd = fd;
77
78     return pc;
79 }
80
81 /* --------------------------------
82  * pcp_close - deallocate read & write buffers for PCP_CONNECION
83  * --------------------------------
84  */
85 void
86 pcp_close(PCP_CONNECTION *pc)
87 {
88     close(pc->fd);
89     free(pc->wbuf);
90     free(pc->hp);
91     free(pc);
92 }
93
94 /* --------------------------------
95  * pcp_read - read 'len' bytes from 'pc'
96  *
97  * return 0 on success, -1 otherwise
98  * --------------------------------
99  */
100 int
101 pcp_read(PCP_CONNECTION *pc, void *buf, int len)
102 {
103     static char readbuf[READBUFSZ];
104
105     int consume_size;
106     int readlen;
107     int notimeout = 0;
108
109     consume_size = consume_pending_data(pc, buf, len);
110     len -= consume_size;
111     buf += consume_size;
112
113     while (len > 0)
114     {
115         if (pcp_check_fd(pc, notimeout))
116                 {
117                         errorcode = TIMEOUTERR;
118                         return -1;
119                 }
120
121         readlen = read(pc->fd, readbuf, READBUFSZ);
122         if (readlen == -1)
123         {
124             if (errno == EAGAIN || errno == EINTR)
125                 continue;
126
127                         errorcode = READERR;
128                         return -1;
129         }
130         else if (readlen == 0)
131                 {
132                         errorcode = EOFERR;
133                         return -1;
134                 }
135
136         if (len < readlen)
137         {
138             /* overrun. we need to save remaining data to pending buffer */
139             if (save_pending_data(pc, readbuf+len, readlen-len))
140                         {
141                                 errorcode = NOMEMERR;
142                 return -1;
143                         }
144             memmove(buf, readbuf, len);
145             break;
146         }
147
148         memmove(buf, readbuf, readlen);
149         buf += readlen;
150         len -= readlen;
151     }
152
153     return 0;
154 }
155
156 /* --------------------------------
157  * pcp_write - write 'len' bytes to 'pc' buffer
158  *
159  * return 0 on success, -1 otherwise
160  * --------------------------------
161  */
162 int
163 pcp_write(PCP_CONNECTION *pc, void *buf, int len)
164 {
165     int reqlen;
166
167     if (len < 0)
168         {
169                 errorcode = INVALERR;
170         return -1;
171         }
172
173     /* check buffer size */
174     reqlen = pc->wbufpo + len;
175
176     if (reqlen > pc->wbufsz)
177     {
178         char *p;
179
180         reqlen = (reqlen/WRITEBUFSZ+1)*WRITEBUFSZ;
181         p = realloc(pc->wbuf, reqlen);
182         if (p == NULL)
183                 {
184                         errorcode = NOMEMERR;
185                         return -1;
186                 }
187
188         pc->wbuf = p;
189         pc->wbufsz = reqlen;
190     }
191
192     memcpy(pc->wbuf+pc->wbufpo, buf, len);
193     pc->wbufpo += len;
194
195     return 0;
196 }
197
198 /* --------------------------------
199  * pcp_flush - send pending data in buffer to 'pc'
200  *
201  * return 0 on success, -1 otherwise
202  * --------------------------------
203  */
204 int
205 pcp_flush(PCP_CONNECTION *pc)
206 {
207     int sts;
208     int wlen;
209     int offset;
210     wlen = pc->wbufpo;
211
212     if (wlen == 0)
213     {
214         return 0;
215     }
216
217     offset = 0;
218
219     for (;;)
220     {
221         errno = 0;
222
223         sts = write(pc->fd, pc->wbuf + offset, wlen);
224
225         if (sts > 0)
226         {
227             wlen -= sts;
228
229             if (wlen == 0)
230             {
231                 /* write completed */
232                 break;
233             }
234
235             else if (wlen < 0)
236             {
237                                 errorcode = WRITEERR;
238                 return -1;
239             }
240
241             else
242             {
243                 /* need to write remaining data */
244                 offset += sts;
245                 continue;
246             }
247         }
248         else if (errno == EAGAIN || errno == EINTR)
249         {
250             continue;
251         }
252         else
253                 {
254                         errorcode = WRITEERR;
255             return -1;
256                 }
257     }
258
259     pc->wbufpo = 0;
260
261     return 0;
262 }
263
264 /* --------------------------------
265  * consume_pending_data - read pending data from 'pc' buffer
266  *
267  * return the size of data read in
268  * --------------------------------
269  */
270 static int
271 consume_pending_data(PCP_CONNECTION *pc, void *data, int len)
272 {
273     int consume_size;
274
275     if (pc->len <= 0)
276         return 0;
277
278     consume_size = Min(len, pc->len);
279     memmove(data, pc->hp + pc->po, consume_size);
280     pc->len -= consume_size;
281
282     if (pc->len <= 0)
283         pc->po = 0;
284     else
285         pc->po += consume_size;
286
287     return consume_size;
288 }
289
290 /* --------------------------------
291  * save_pending_data - save excessively read data into 'pc' buffer
292  *
293  * return 0 on success, -1 otherwise
294  * --------------------------------
295  */
296 static int
297 save_pending_data(PCP_CONNECTION *pc, void *data, int len)
298 {
299     int reqlen;
300     size_t realloc_size;
301     char *p;
302
303     /* to be safe */
304     if (pc->len == 0)
305         pc->po = 0;
306
307     reqlen = pc->po + pc->len + len;
308
309     /* pending buffer is enough? */
310     if (reqlen > pc->bufsz)
311     {
312         /* too small, enlarge it */
313         realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
314         p = realloc(pc->hp, realloc_size);
315         if (p == NULL)
316                 {
317                         errorcode = NOMEMERR;
318             return -1;
319                 }
320         pc->bufsz = realloc_size;
321         pc->hp = p;
322     }
323
324     memmove(pc->hp + pc->po + pc->len, data, len);
325     pc->len += len;
326
327     return 0;
328 }
329
330 /* --------------------------------
331  * pcp_check_fd - watch for fd which is ready to be read
332  *
333  * return 0 on success, -1 otherwise
334  * --------------------------------
335  */
336 static int
337 pcp_check_fd(PCP_CONNECTION *pc, int notimeout)
338 {
339     fd_set readmask;
340     fd_set exceptmask;
341     int fd;
342     int fds;
343     struct timeval timeout;
344     struct timeval *tp;
345
346     fd = pc->fd;
347
348     for (;;)
349     {
350         FD_ZERO(&readmask);
351         FD_ZERO(&exceptmask);
352         FD_SET(fd, &readmask);
353         FD_SET(fd, &exceptmask);
354
355         if (notimeout || (pcp_timeout.tv_sec + pcp_timeout.tv_usec == 0))
356             tp = NULL;
357         else
358         {
359                         /***** haven't got timeout option yet. hard-code it *****/
360                         timeout.tv_sec = pcp_timeout.tv_sec;
361                         timeout.tv_usec = pcp_timeout.tv_usec;
362                         tp = &timeout;
363         }
364
365         fds = select(fd+1, &readmask, NULL, &exceptmask, tp);
366
367         if (fds == -1)
368         {
369             if (errno == EAGAIN || errno == EINTR)
370                 continue;
371
372             break;
373         }
374
375         if (FD_ISSET(fd, &exceptmask))
376             break;
377
378         if (fds == 0)
379             break;
380
381         return 0;
382     }
383
384     return -1;
385 }