]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_query_cache.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pool_query_cache.c
1 /* -*-pgsql-c-*- */
2 /*
3  * $Header: /cvsroot/pgpool/pgpool-II/pool_query_cache.c,v 1.9.2.1 2009/08/22 04:19:49 t-ishii Exp $
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2008      PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  * pool_query_cache.c: query cache
22  *
23  */
24
25 #include <errno.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <time.h>
32 #include <sys/time.h>
33 #include <unistd.h>
34 #ifdef HAVE_SYS_SELECT_H
35 #include <sys/select.h>
36 #endif
37
38 #include "pool.h"
39 #include "md5.h"
40
/* name of the SystemDB table the cache rows are read from and deleted from */
#define QUERY_CACHE_TABLE_NAME			"query_cache"
/* presumably the name of the prepared INSERT statement -- confirm where it is defined */
#define CACHE_REGISTER_PREPARED_STMT	"register_prepared_stmt"
/* presumably the initial size in bytes of the per-query cache buffer -- confirm at its use */
#define DEFAULT_CACHE_SIZE				8192

/* shorthands for the SystemDB connection and the cache table bookkeeping */
#define SYSDB_CON			(SYSDB_CONNECTION->con)			/* connection used for protocol level I/O */
#define SYSDB_MAJOR			(SYSDB_CONNECTION->sp->major)	/* protocol major version of that connection */
#define CACHE_TABLE_INFO	(SYSDB_INFO->query_cache_table_info)
48
/*
 * Accumulates the RowDescription and DataRow messages of one SELECT result
 * until the whole result can be stored in the SystemDB.
 */
typedef struct
{
	char *md5_query;		/* md5 hash of the query, in string form */
	char *query;			/* the query string itself */
	char *cache;			/* buffer holding the cached messages */
	int   cache_size;		/* size of the cached data buffer */
	int   cache_offset;		/* end of the data currently held in cache */
	char *db_name;			/* name of the database the query ran against */
	char *create_time;		/* cache creation timestamp, ISO format */
} QueryCacheInfo;
60
/* outcome of a cache search in the SystemDB */
typedef enum
{
	CACHE_FOUND,		/* a cached result was found */
	CACHE_NOT_FOUND,	/* the search completed without finding a cache */
	CACHE_ERROR			/* the search itself failed */
} CACHE_STATUS;
65
66 static QueryCacheInfo *query_cache_info;
67
68 static CACHE_STATUS search_system_db_for_cache(POOL_CONNECTION *frontend, char *sql, int sql_len, struct timeval *t, char tstate);
69 static int ForwardCacheToFrontend(POOL_CONNECTION *frontend, char *cache, char tstate);
70 static int init_query_cache_info(POOL_CONNECTION *pc, char *database, char *query);
71 static void free_query_cache_info(void);
72 static int malloc_failed(void *p);
73 static int write_cache(void *buf, int len);
74 static char *pq_time_to_str(time_t t);
75 static int system_db_connection_exists(void);
76 static void define_prepared_statements(void);
77
78 /* --------------------------------
79  * pool_clear_cache - clears cache data from the SystemDB
80  *
81  * return 0 on success, -1 otherwise
82  * --------------------------------
83  */
84 int
85 pool_clear_cache_by_time(Interval *interval, int size)
86 {
87         PGresult *pg_result = NULL;
88         long interval_in_days = 0;
89         long interval_in_seconds = 0;
90         time_t query_delete_timepoint;
91         char *query_delete_timepoint_in_str = NULL;
92         int sql_len;
93         char *sql = NULL;
94         int i;
95
96         if (! system_db_connection_exists())
97                 return -1;
98
99         for (i = 0; i < size; i++)
100         {
101                 int q = interval[i].quantity;
102
103                 switch (interval[i].unit)
104                 {
105                         case millennium:
106                         case millenniums:
107                                 q *= 10;
108
109                         case century:
110                         case centuries:
111                                 q *= 10;
112
113                         case decade:
114                         case decades:
115                                 q *= 10;
116
117                         case year:
118                         case years:
119                                 q *= 12;
120
121                         case month:
122                         case months:
123                                 q *= 31;                /* should I change this according to the month? */
124                                 interval_in_days += q;
125                                 break;
126
127                         case week:
128                         case weeks:
129                                 q *= 7;
130
131                         case day:
132                         case days:
133                                 q *= 24;
134
135                         case hour:
136                         case hours:
137                                 q *= 60;
138
139                         case minute:
140                         case minutes:
141                                 q *= 60;
142
143                         case second:
144                         case seconds:
145                                 interval_in_seconds += q;
146                                 break;
147                 }
148         }
149
150         interval_in_seconds = (interval_in_days * 86400) + interval_in_seconds;
151         query_delete_timepoint = time(NULL) - interval_in_seconds;
152
153         query_delete_timepoint_in_str = pq_time_to_str(query_delete_timepoint);
154         if (malloc_failed(query_delete_timepoint_in_str))
155                 return -1;
156
157         sql_len =
158                 strlen(pool_config->system_db_schema) +
159                 strlen(QUERY_CACHE_TABLE_NAME) +
160                 strlen(query_delete_timepoint_in_str) +
161                 64;
162
163         sql = (char *)malloc(sql_len);
164         if (malloc_failed(sql))
165         {
166                 free(query_delete_timepoint_in_str);
167                 return -1;
168         }
169
170         snprintf(sql, sql_len,
171                          "DELETE FROM %s.%s WHERE create_time <= '%s'",
172                          pool_config->system_db_schema,
173                          QUERY_CACHE_TABLE_NAME,
174                          query_delete_timepoint_in_str);
175
176         pool_debug("pool_clear_cache: delete all query cache created before '%s'", query_delete_timepoint_in_str);
177
178         pg_result = PQexec(system_db_info->pgconn, sql);
179         if (!pg_result || PQresultStatus(pg_result) != PGRES_COMMAND_OK)
180         {
181                 pool_error("pool_clear_cache: PQexec() failed. reason: %s",
182                                    PQerrorMessage(system_db_info->pgconn));
183
184                 PQclear(pg_result);
185                 free(query_delete_timepoint_in_str);
186                 free(sql);
187                 return -1;
188         }
189
190         PQclear(pg_result);
191         free(query_delete_timepoint_in_str);
192         free(sql);
193
194         return 0;
195 }
196
197 /* --------------------------------
198  * pool_query_cache_table_exists - checks if query_cache table exists in the SystemDB
199  *
200  * This function is called once and only once from the pgpool parent process.
201  * return 1 if query_cache table exists, 0 otherwise.
202  * --------------------------------
203  */
204 int
205 pool_query_cache_table_exists(void)
206 {
207         PGresult *pg_result = NULL;
208         char *sql = NULL;
209         int sql_len = strlen(pool_config->system_db_schema) + strlen(QUERY_CACHE_TABLE_NAME) + 64;
210
211         if (! system_db_connection_exists())
212                 return 0;
213
214         sql = (char *)malloc(sql_len);
215         if (malloc_failed(sql))
216                 return 0;
217
218         snprintf(sql, sql_len,
219                          "SELECT hash, query, value, dbname, create_time FROM %s.%s LIMIT 1",
220                          pool_config->system_db_schema,
221                          QUERY_CACHE_TABLE_NAME);
222
223         pg_result = PQexec(system_db_info->pgconn, sql);
224         if (!pg_result || PQresultStatus(pg_result) != PGRES_TUPLES_OK)
225         {
226                 pool_error("pool_query_cache_table_exists: PQexec() failed. reason: %s",
227                                    PQerrorMessage(system_db_info->pgconn));
228
229                 PQclear(pg_result);
230                 free(sql);
231                 return 0;
232         }
233
234         PQclear(pg_result);
235         pool_close_libpq_connection();
236         free(sql);
237
238         return 1;
239 }
240
241 /* --------------------------------
242  * pool_query_cache_lookup - retrieve query cache from the SystemDB
243  *
244  * creates a SQL query string for searching a cache from the SystemDB.
245  *
246  * returns POOL_CONTINUE if cache is found. returns POOL_END if cache was
247  * not found. returns POOL_ERROR if an error has been encountered while
248  * searching.
249  *
250  * Note that POOL_END and POOL_ERROR are treated the same by the caller
251  * (pool_process_query.c).
252  * POOL_END and POOL_ERROR both indicates to the caller that the search
253  * query must be forwarded to the backends in order to retrieve data and
254  * the result be cached.
255  * Only difference is that POOL_ERROR indicates that some fatal error has
256  * occured; query cache function, however, should be seemless to the user
257  * whether cache was not found or error has occured during cache retrieve.
258  * --------------------------------
259  */
260 POOL_STATUS
261 pool_query_cache_lookup(POOL_CONNECTION *frontend, char *query, char *database, char tstate)
262 {
263         char *sql = NULL;
264         int sql_len;
265         char md5_query[33];
266         struct timeval timeout;
267         int status;
268
269         if (! system_db_connection_exists())
270                 return POOL_ERROR;              /* same as POOL_END ... at least for now */
271
272         sql_len =
273                 strlen(pool_config->system_db_schema) +
274                 strlen(QUERY_CACHE_TABLE_NAME) +
275                 sizeof(md5_query) +
276                 strlen(database) +
277                 64;
278         sql = (char *)malloc(sql_len);
279         if (malloc_failed(sql))
280                 return POOL_ERROR;              /* should I exit here rather than returning an error? */
281
282         /* cached data lookup */
283         pool_md5_hash(query, strlen(query), md5_query);
284         snprintf(sql, sql_len, "SELECT value FROM %s.%s WHERE hash = '%s' AND dbname = '%s'",
285                          pool_config->system_db_schema,
286                          QUERY_CACHE_TABLE_NAME,
287                          md5_query,
288                          database);
289
290         /* set timeout value for select */
291         timeout.tv_sec = pool_config->child_life_time;
292         timeout.tv_usec = 0;
293
294         pool_debug("pool_query_cache_lookup: searching cache for query: \"%s\"", query);
295         status = search_system_db_for_cache(frontend, sql, strlen(sql)+1, &timeout, tstate);
296
297         /* make sure that the remaining data is discarded */
298         SYSDB_CON->po = 0;
299         SYSDB_CON->len = 0;
300
301         free(sql);
302
303         /* cache found, and no backend communication needed */
304         if (status == CACHE_FOUND)
305         {
306                 return POOL_CONTINUE;
307         }
308
309         /* cache not found */
310
311         if (status == CACHE_ERROR)
312         {
313                 pool_error("pool_query_cache_lookup: query cache lookup failed");
314                 /* reset the SystemDB connection */
315                 if (system_db_info->pgconn)
316                         pool_close_libpq_connection();
317                 return POOL_ERROR;              /* same as POOL_END ... at least for now */
318         }
319
320         pool_debug("pool_query_cache_lookup: query cache not found");
321         return POOL_END;
322 }
323
324 /* --------------------------------
325  * search_system_db_for_cache - search for query cache in libpq protocol level
326  *
327  * sends a cache searching query string using libpq protocol to the SystemDB.
328  * if the SystemDB returns cache, forward the data to the frontend, and return
329  * CACHE_FOUND. if cache was not found, silently discards the remaining data
330  * returned by the SystemDB, and return CACHE_NOT_FOUND. returns CACHE_ERROR
331  * if an error was encountered.
332  * --------------------------------
333  */
334 static CACHE_STATUS
335 search_system_db_for_cache(POOL_CONNECTION *frontend, char *sql, int sql_len, struct timeval *t, char tstate)
336 {
337         fd_set readmask;
338         int fds;
339         int num_fds;
340         struct timeval *timeout = NULL;
341         char kind;
342         int readlen;
343         char *data = NULL;
344         CACHE_STATUS return_value = CACHE_ERROR;
345         int cache_found = 0;
346
347         pool_debug("pool_query_cache_lookup: executing query: \"%s\"", sql);
348
349         pool_write(SYSDB_CON, "Q", 1);
350         if (SYSDB_MAJOR == PROTO_MAJOR_V3)
351         {
352                 int sendlen = htonl(sql_len + 4);
353                 pool_write(SYSDB_CON, &sendlen, sizeof(sendlen));
354         }
355         if (pool_write_and_flush(SYSDB_CON, sql, sql_len) < 0)
356         {
357                 pool_error("pool_query_cache_lookup: error while sending data to the SystemDB");
358                 return CACHE_ERROR;
359         }
360
361         if ((t->tv_sec + t->tv_usec) == 0)
362                 timeout = NULL;
363         else
364                 timeout = t;
365
366         /* don't really need select() or for(;;) here, but we may need it someday... or not */
367         for (;;)
368         {
369                 FD_ZERO(&readmask);
370                 num_fds = 0;
371
372                 num_fds = SYSDB_CON->fd + 1;
373                 FD_SET(SYSDB_CON->fd, &readmask);
374                 fds = select(num_fds, &readmask, NULL, NULL, timeout);
375                 if (fds == -1)
376                 {
377                         if (errno == EINTR)
378                                 continue;
379
380                         pool_error("pool_query_cache_lookup: select() failed. reason: %s", strerror(errno));
381                         return CACHE_ERROR;
382                 }
383
384                 /* select() timeout */
385                 if (fds == 0)
386                         return CACHE_ERROR;
387
388                 for (;;)
389                 {
390                         if (! FD_ISSET(SYSDB_CON->fd, &readmask))
391                         {
392                                 pool_error("pool_query_cache_lookup: select() failed");
393                                 return CACHE_ERROR;
394                         }
395
396                         /* read kind */
397                         if (pool_read(SYSDB_CON, &kind, sizeof(kind)) < 0)
398                         {
399                                 pool_error("pool_query_cache_lookup: error while reading message kind");
400                                 return CACHE_ERROR;
401                         }
402                         pool_debug("pool_query_cache_lookup: received %c from systemdb", kind);
403
404                         /* just do the routine work of reading data in. data won't be used */
405                         if (kind == 'T')
406                         {
407                                 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
408                                 {
409                                         if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
410                                         {
411                                                 pool_error("pool_query_cache_lookup: error while reading message length");
412                                                 return CACHE_ERROR;
413                                         }
414                                         readlen = ntohl(readlen) - sizeof(int);
415                                         data = pool_read2(SYSDB_CON, readlen);
416                                 }
417                                 else
418                                 {
419                                         data = pool_read_string(SYSDB_CON, &readlen, 0);
420                                 }
421                         }
422                         else if (kind == 'D') /* cache found! forward it to the frontend */
423                         {
424                                 char *cache;
425                                 int status;
426
427                                 cache_found = 1;
428
429                                 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
430                                 {
431                                         if (pool_read(SYSDB_CON, &readlen, sizeof(readlen)) < 0)
432                                         {
433                                                 pool_error("pool_query_cache_lookup: error while reading message length");
434                                                 return CACHE_ERROR;
435                                         }
436                                         readlen = ntohl(readlen) - sizeof(int);
437                                         cache = pool_read2(SYSDB_CON, readlen);
438                                 }
439                                 else
440                                 {
441                                         cache = pool_read_string(SYSDB_CON, &readlen, 0);
442                                 }
443
444                                 if (cache == NULL)
445                                 {
446                                         pool_error("pool_query_cache_lookup: error while reading message body");
447                                         return CACHE_ERROR;
448                                 }
449
450                                 cache[readlen] = '\0';
451
452                                 cache += sizeof(short); /* number of columns in 'D' (we know it's always going to be 1, so skip) */
453                                 cache += sizeof(int); /* length of escaped bytea cache in string format. don't need the length */
454
455                                 status = ForwardCacheToFrontend(frontend, cache, tstate);
456                                 if (status < 0)
457                                 {
458                                         /* fatal error has occured while forwarding cache */
459                                         pool_error("pool_query_cache_lookup: query cache forwarding failed");
460                                         return_value = CACHE_ERROR;
461                                 }
462                         }
463                         else if (kind == 'C') /* see if 'D' was received */
464                         {
465                                 if (cache_found)
466                                         return_value = CACHE_FOUND;
467                                 else
468                                         return_value = CACHE_NOT_FOUND;
469
470                                 /* must discard the remaining data */
471                                 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
472                                 {
473                                         if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
474                                         {
475                                                 pool_error("pool_query_cache_lookup: error while reading message length");
476                                                 return CACHE_ERROR;
477                                         }
478                                         readlen = ntohl(readlen) - sizeof(int);
479                                         data = pool_read2(SYSDB_CON, readlen);
480                                 }
481                                 else
482                                 {
483                                         data = pool_read_string(SYSDB_CON, &readlen, 0);
484                                 }
485                         }
486                         else if (kind == 'Z')
487                         {
488                                 /* must discard the remaining data */
489                                 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
490                                 {
491                                         if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
492                                         {
493                                                 pool_error("pool_query_cache_lookup: error while reading message length");
494                                                 return CACHE_ERROR;
495                                         }
496                                         readlen = ntohl(readlen) - sizeof(int);
497                                         data = pool_read2(SYSDB_CON, readlen);
498                                 }
499                                 else
500                                 {
501                                         data = pool_read_string(SYSDB_CON, &readlen, 0);
502                                 }
503
504                                 break;
505                         }
506                         else if (kind == 'E')
507                         {
508                                 /* must discard the remaining data */
509                                 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
510                                 {
511                                         if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
512                                         {
513                                                 pool_error("pool_query_cache_lookup: error while reading message length");
514                                                 return CACHE_ERROR;
515                                         }
516                                         readlen = ntohl(readlen) - sizeof(int);
517                                         data = pool_read2(SYSDB_CON, readlen);
518                                 }
519                                 else
520                                 {
521                                         data = pool_read_string(SYSDB_CON, &readlen, 0);
522                                 }
523
524                                 return_value = CACHE_ERROR;
525                         }
526                         else
527                         {
528                                 /* shouldn't get here, but just in case */
529                                 return CACHE_ERROR;
530                         }
531                 }
532
533                 break;
534         }
535
536         return return_value;
537 }
538
539 /* --------------------------------
540  * ForwardCacheToFrontend - simply forwards cached data to the frontend
541  *
542  * since the cached data passed from the caller is in escaped binary string
543  * format, unescape it and send it to the frontend appending 'Z' at the end.
544  * returns 0 on success, -1 otherwise.
545  * --------------------------------
546  */
547 static int ForwardCacheToFrontend(POOL_CONNECTION *frontend, char *cache, char tstate)
548 {
549         int sendlen;
550         size_t sz;
551         char *binary_cache = NULL;
552
553         binary_cache = (char *)PQunescapeBytea((unsigned char *)cache, &sz);
554         sendlen = (int) sz;
555         if (malloc_failed(binary_cache))
556                 return -1;
557
558         pool_debug("ForwardCacheToFrontend: query cache found (%d bytes)", sendlen);
559
560         /* forward cache to the frontend */
561         pool_write(frontend, binary_cache, sendlen);
562
563         /* send ReadyForQuery to the frontend*/
564         pool_write(frontend, "Z", 1);
565         sendlen = htonl(5);
566         pool_write(frontend, &sendlen, sizeof(int));
567         if (pool_write_and_flush(frontend, &tstate, 1) < 0)
568         {
569                 pool_error("pool_query_cache_lookup: error while writing data to the frontend");
570                 PQfreemem(binary_cache);
571                 return -1;
572         }
573
574         PQfreemem(binary_cache);
575         return 0;
576 }
577
578 /* --------------------------------
579  * pool_query_cache_register() - register query cache to the SystemDB
580  *
581  * returns 0 on sucess, -1 otherwise
582  * --------------------------------
583  */
584 int
585 pool_query_cache_register(char kind,
586                                                   POOL_CONNECTION *frontend,
587                                                   char *database,
588                                                   char *data,
589                                                   int data_len,
590                                                   char *query)
591 {
592         int ret;
593         int send_len;
594
595         if (! system_db_connection_exists())
596                 return -1;
597         if (! CACHE_TABLE_INFO.has_prepared_statement)
598                 define_prepared_statements();
599
600         switch (kind)
601         {
602                 case 'T':                               /* RowDescription */
603                 {
604                         /* for all SELECT result data from the backend, 'T' must come first */
605                         if (query_cache_info != NULL)
606                         {
607                                 pool_error("pool_query_cache_register: received RowDescription in the wrong order");
608                                 free_query_cache_info();
609                                 return -1;
610                         }
611
612                         pool_debug("pool_query_cache_register: saving cache for query: \"%s\"", query);
613
614                         /* initialize query_cache_info and save the query */
615                         ret = init_query_cache_info(frontend, database, query);
616                         if (ret)
617                                 return ret;
618
619                         /* store data into the cache */
620                         write_cache(&kind, 1);
621                         send_len = htonl(data_len + sizeof(int));
622                         write_cache(&send_len, sizeof(int));
623                         write_cache(data, data_len);
624
625                         break;
626                 }
627
628                 case 'D':                               /* DataRow */
629                 {
630                         /* for all SELECT result data from the backend, 'T' must come first */
631                         if (query_cache_info == NULL)
632                         {
633                                 pool_error("pool_query_cache_register: received DataRow in the wrong order");
634                                 return -1;
635                         }
636
637                         write_cache(&kind, 1);
638                         send_len = htonl(data_len + sizeof(int));
639                         write_cache(&send_len, sizeof(int));
640                         write_cache(data, data_len);
641
642                         break;
643                 }
644
645                 case 'C':                               /* CommandComplete */
646                 {
647                         PGresult *pg_result = NULL;
648                         char *escaped_query = NULL;
649                         size_t escaped_query_len;
650                         time_t now = time(NULL);
651                         char *values[5];
652                         int values_len[5];
653                         int values_format[5];
654                         int i;
655
656                         /* for all SELECT result data from the backend, 'T' must come first */
657                         if (query_cache_info == NULL)
658                         {
659                                 pool_error("pool_query_cache_register: received CommandComplete in the wrong order");
660                                 return -1;
661                         }
662
663                         /* pack CommandComplete data into the cache */
664                         write_cache(&kind, 1);
665                         send_len = htonl(data_len + sizeof(int));
666                         write_cache(&send_len, sizeof(int));
667                         write_cache(data, data_len);
668
669                         query_cache_info->create_time = pq_time_to_str(now);
670                         if (malloc_failed(query_cache_info->create_time))
671                         {
672                                 free_query_cache_info();
673                                 return -1;
674                         }
675
676                         escaped_query = (char *)malloc(strlen(query_cache_info->query) * 2 + 1);
677                         if (malloc_failed(escaped_query))
678                         {
679                                 free_query_cache_info();
680                                 return -1;
681                         }
682
683 /*                      escaped_query_len = PQescapeStringConn(system_db_info->pgconn, */
684 /*                                                                                                 escaped_query, */
685 /*                                                                                                 query_cache_info->query, */
686 /*                                                                                                 strlen(query_cache_info->query))); */
687                         escaped_query_len = PQescapeString(escaped_query, query_cache_info->query, strlen(query_cache_info->query));
688
689                         /* all the result data have been received. store into the SystemDB */
690                         values[0] = strdup(query_cache_info->md5_query);
691                         values[1] = strdup(escaped_query);
692                         values[2] = (char *)malloc(query_cache_info->cache_offset);
693                         memcpy(values[2], query_cache_info->cache, query_cache_info->cache_offset);
694                         values[3] = strdup(query_cache_info->db_name);
695                         values[4] = strdup(query_cache_info->create_time);
696                         for (i = 0; i < 5; i++)
697                         {
698                                 if (malloc_failed(values[i]))
699                                 {
700                                         pool_error("pool_query_cache_register: malloc() failed");
701                                         free_query_cache_info();
702                                         {
703                                                 int j;
704                                                 for (j = 0; j < i; j++)
705                                                         free(values[j]);
706                                         }
707                                         return -1;
708                                 }
709
710                                 values_len[i] = strlen(values[i]);
711                         }
712                         values_format[0] = values_format[1] = values_format[3] = values_format[4] = 0;
713                         values_format[2] = 1;
714                         values_len[2] = query_cache_info->cache_offset;
715
716                         pg_result = PQexecPrepared(system_db_info->pgconn,
717                                                                            CACHE_TABLE_INFO.register_prepared_statement,
718                                                                            5,
719                                                                            (const char * const *)values,
720                                                                            values_len,
721                                                                            values_format,
722                                                                            0);
723                         if (!pg_result || PQresultStatus(pg_result) != PGRES_COMMAND_OK)
724                         {
725                                 pool_error("pool_query_cache_register: PQexecPrepared() failed. reason: %s",
726                                                    PQerrorMessage(system_db_info->pgconn));
727
728                                 PQclear(pg_result);
729                                 PQfreemem(escaped_query);
730                                 free_query_cache_info();
731                                 for (i = 0; i < 5; i++)
732                                         free(values[i]);
733                                 return -1;
734                         }
735
736                         PQclear(pg_result);
737                         PQfreemem(escaped_query);
738                         for (i = 0; i < 5; i++)
739                                 free(values[i]);
740                         free_query_cache_info();
741
742                         break;
743                 }
744
745                 case 'E':
746                 {
747                         pool_debug("pool_query_cache_register: received 'E': free query cache buffer");
748
749                         pool_close_libpq_connection();
750                         free_query_cache_info();
751
752                         break;
753                 }
754         }
755
756         return 0;
757 }
758
759 /* --------------------------------
760  * init_query_cache_info() - allocate memory for query_cache_info and stores query
761  *
762  * returns 0 on success, -1 otherwise
763 `* --------------------------------
764  */
765 static int
766 init_query_cache_info(POOL_CONNECTION *pc, char *database, char *query)
767 {
768         int query_len;                          /* length of the SELECT query to be cached */
769
770         query_cache_info = (QueryCacheInfo *)malloc(sizeof(QueryCacheInfo));
771         if (malloc_failed(query_cache_info))
772                 return -1;
773
774         /* query */
775         query_len = strlen(query);
776         query_cache_info->query = (char *)malloc(query_len + 1);
777         if (malloc_failed(query_cache_info->query))
778                 return -1;
779         memcpy(query_cache_info->query, query, query_len + 1);
780
781         /* md5_query */
782         query_cache_info->md5_query = (char *)malloc(33); /* md5sum is always 33 bytes (including the '\0') */
783         if (malloc_failed(query_cache_info->md5_query))
784                 return -1;
785         pool_md5_hash(query_cache_info->query, query_len, query_cache_info->md5_query);
786
787         /* malloc DEFAULT_CACHE_SIZE for query_cache_info->cache */
788         query_cache_info->cache = (char *)malloc(DEFAULT_CACHE_SIZE);
789         if (malloc_failed(query_cache_info->cache))
790                 return -1;
791         query_cache_info->cache_size = DEFAULT_CACHE_SIZE;
792         query_cache_info->cache_offset = 0;
793
794         /* save database name */
795         query_cache_info->db_name = (char *)malloc(strlen(database)+1);
796         if (malloc_failed(query_cache_info->db_name))
797                 return -1;
798         strcpy(query_cache_info->db_name, database);
799
800         return 0;
801 }
802
803 /* --------------------------------
804  * free_query_cache_info() - free query_cache_info and its members
805  * --------------------------------
806  */
807 static void
808 free_query_cache_info(void)
809 {
810         if (query_cache_info == NULL)
811                 return;
812
813         free(query_cache_info->md5_query);
814         free(query_cache_info->query);
815         free(query_cache_info->cache);
816         free(query_cache_info->db_name);
817         free(query_cache_info->create_time);
818         free(query_cache_info);
819         query_cache_info = NULL;
820 }
821
/* --------------------------------
 * malloc_failed() - checks whether an allocation result is usable
 *
 * p is the pointer returned by the caller's most recent allocation.
 * When it is NULL an error is logged and the global query_cache_info is
 * released through free_query_cache_info().
 *
 * returns 0 if the allocation succeeded (p is not NULL), -1 otherwise
 * --------------------------------
 */
static int
malloc_failed(void *p)
{
	if (p == NULL)
	{
		pool_error("pool_query_cache: malloc() failed");
		free_query_cache_info();
		return -1;
	}

	return 0;
}
839
840 /* --------------------------------
841  * write_cache() - append result data to buffer
842  *
843  * returns 0 on success, -1 otherwise
844  * --------------------------------
845  */
846 static int
847 write_cache(void *buf, int len)
848 {
849         int required_len;
850
851         if (len < 0)
852                 return -1;
853
854         required_len = query_cache_info->cache_offset + len;
855         if (required_len > query_cache_info->cache_size)
856         {
857                 char *ptr;
858
859                 required_len = query_cache_info->cache_size * 2;
860                 ptr = (char *)realloc(query_cache_info->cache, required_len);
861                 if (malloc_failed(ptr))
862                         return -1;
863
864                 query_cache_info->cache = ptr;
865                 query_cache_info->cache_size = required_len;
866
867                 pool_debug("pool_query_cache: extended cache buffer size to %d", query_cache_info->cache_size);
868         }
869
870         memcpy(query_cache_info->cache + query_cache_info->cache_offset, buf, len);
871         query_cache_info->cache_offset += len;
872
873         return 0;
874 }
875
/* --------------------------------
 * pq_time_to_str() - convert time_t to ISO standard time output string
 *
 * The string has the form "YYYY-MM-DD HH:MM:SS+ZZZZ" in local time.
 *
 * returns a pointer to a newly allocated string which the caller must
 * free(), or NULL if the time cannot be converted or formatted, or if
 * the allocation fails
 * --------------------------------
 */
static char *
pq_time_to_str(time_t t)
{
	char iso_time[32];
	char *time_p;
	struct tm *tm;
	size_t iso_len;

	/* localtime() returns NULL when t cannot be represented as a broken-down time */
	tm = localtime(&t);
	if (tm == NULL)
		return NULL;

	/* strftime() returns 0 when the result does not fit; the buffer is then unusable */
	iso_len = strftime(iso_time, sizeof(iso_time), "%Y-%m-%d %H:%M:%S%z", tm);
	if (iso_len == 0)
		return NULL;

	/* the length is already known, so copy exactly that many bytes plus the '\0' */
	time_p = (char *)malloc(iso_len + 1);
	if (time_p != NULL)
		memcpy(time_p, iso_time, iso_len + 1);

	return time_p;
}
896
897
898 /* --------------------------------
899  * system_db_connection_exists() - checks and if a connection to the SystemDB exists
900  *
901  * if not connected, it makes an attempt to connect to the SystemDB. If a connection
902  * exists, returns 1, returns 0 otherwise.
903  * --------------------------------
904  */
905 static int
906 system_db_connection_exists(void)
907 {
908         if (!system_db_info->pgconn ||
909                 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
910         {
911                 if (system_db_connect())
912                         return 0;
913         }
914
915         return 1;
916 }
917
918
919 /* --------------------------------
920  * define_prepared_statements() - defines prepared statements for the current session
921  * --------------------------------
922  */
923 static void
924 define_prepared_statements(void)
925 {
926         PGresult *pg_result;
927         char *sql = NULL;
928         int sql_len;
929
930         sql_len =
931                 strlen(pool_config->system_db_schema) +
932                 strlen(QUERY_CACHE_TABLE_NAME) +
933                 1024;
934
935         sql = (char *)malloc(sql_len);
936         if (malloc_failed(sql))
937         {
938                 pool_error("pool_query_cache: malloc() failed");
939                 return;
940         }
941
942         free(CACHE_TABLE_INFO.register_prepared_statement);
943         CACHE_TABLE_INFO.register_prepared_statement
944                 = strdup(CACHE_REGISTER_PREPARED_STMT);
945         if (malloc_failed(CACHE_TABLE_INFO.register_prepared_statement))
946         {
947                 pool_error("pool_query_cache: malloc() failed");
948                 free(sql);
949                 return;
950         }
951
952 #ifdef HAVE_PQPREPARE
953         snprintf(sql, sql_len,
954                          "INSERT INTO %s.%s VALUES ( $1, $2, $3, $4, $5 )",
955                          pool_config->system_db_schema,
956                          QUERY_CACHE_TABLE_NAME);
957         pg_result = PQprepare(system_db_info->pgconn,
958                                                   CACHE_TABLE_INFO.register_prepared_statement,
959                                                   sql,
960                                                   5,
961                                                   NULL);
962 #else
963         snprintf(sql, sql_len,
964                          "PREPARE %s (TEXT, TEXT, BYTEA, TEXT, TIMESTAMP WITH TIME ZONE) AS INSERT INTO %s.%s VALUES ( $1, $2, $3, $4, $5 )",
965                          CACHE_TABLE_INFO.register_prepared_statement,
966                          pool_config->system_db_schema,
967                          QUERY_CACHE_TABLE_NAME);
968         pg_result = PQexec(system_db_info->pgconn, sql);
969 #endif
970         if (!pg_result || PQresultStatus(pg_result) != PGRES_COMMAND_OK)
971         {
972                 pool_error("pool_query_cache: PQprepare() failed: %s", PQerrorMessage(system_db_info->pgconn));
973                 free(CACHE_TABLE_INFO.register_prepared_statement);
974                 free(sql);
975                 return;
976         }
977
978         pool_debug("pool_query_cache: prepared statements created");
979         CACHE_TABLE_INFO.has_prepared_statement = 1;
980
981         free(sql);
982         PQclear(pg_result);
983 }