3 * $Header: /cvsroot/pgpool/pgpool-II/pool_query_cache.c,v 1.9.2.1 2009/08/22 04:19:49 t-ishii Exp $
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
8 * Copyright (c) 2003-2008 PgPool Global Development Group
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
21 * pool_query_cache.c: query cache
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
34 #ifdef HAVE_SYS_SELECT_H
35 #include <sys/select.h>
41 #define QUERY_CACHE_TABLE_NAME "query_cache"
42 #define CACHE_REGISTER_PREPARED_STMT "register_prepared_stmt"
43 #define DEFAULT_CACHE_SIZE 8192
45 #define SYSDB_CON (SYSDB_CONNECTION->con)
46 #define SYSDB_MAJOR (SYSDB_CONNECTION->sp->major)
47 #define CACHE_TABLE_INFO (SYSDB_INFO->query_cache_table_info)
49 /* data structure to store RowDescription and DataRow cache */
52 char *md5_query; /* MD5 hash of the query string (33 bytes including the '\0') */
53 char *query; /* query string */
54 char *cache; /* cached data */
55 int cache_size; /* allocated size of the cache buffer */
56 int cache_offset; /* offset of the end of the cached data (next write position) */
57 char *db_name; /* database name */
58 char *create_time; /* cache create timestamp in ISO format */
63 CACHE_FOUND, CACHE_NOT_FOUND, CACHE_ERROR
66 static QueryCacheInfo *query_cache_info;
68 static CACHE_STATUS search_system_db_for_cache(POOL_CONNECTION *frontend, char *sql, int sql_len, struct timeval *t, char tstate);
69 static int ForwardCacheToFrontend(POOL_CONNECTION *frontend, char *cache, char tstate);
70 static int init_query_cache_info(POOL_CONNECTION *pc, char *database, char *query);
71 static void free_query_cache_info(void);
72 static int malloc_failed(void *p);
73 static int write_cache(void *buf, int len);
74 static char *pq_time_to_str(time_t t);
75 static int system_db_connection_exists(void);
76 static void define_prepared_statements(void);
78 /* --------------------------------
79 * pool_clear_cache_by_time - clears cache data older than the given interval from the SystemDB
81 * return 0 on success, -1 otherwise
82 * --------------------------------
85 pool_clear_cache_by_time(Interval *interval, int size)
87 PGresult *pg_result = NULL;
88 long interval_in_days = 0;
89 long interval_in_seconds = 0;
90 time_t query_delete_timepoint;
91 char *query_delete_timepoint_in_str = NULL;
96 if (! system_db_connection_exists())
99 for (i = 0; i < size; i++)
101 int q = interval[i].quantity;
103 switch (interval[i].unit)
123 q *= 31; /* should I change this according to the month? */
124 interval_in_days += q;
145 interval_in_seconds += q;
150 interval_in_seconds = (interval_in_days * 86400) + interval_in_seconds;
151 query_delete_timepoint = time(NULL) - interval_in_seconds;
153 query_delete_timepoint_in_str = pq_time_to_str(query_delete_timepoint);
154 if (malloc_failed(query_delete_timepoint_in_str))
158 strlen(pool_config->system_db_schema) +
159 strlen(QUERY_CACHE_TABLE_NAME) +
160 strlen(query_delete_timepoint_in_str) +
163 sql = (char *)malloc(sql_len);
164 if (malloc_failed(sql))
166 free(query_delete_timepoint_in_str);
170 snprintf(sql, sql_len,
171 "DELETE FROM %s.%s WHERE create_time <= '%s'",
172 pool_config->system_db_schema,
173 QUERY_CACHE_TABLE_NAME,
174 query_delete_timepoint_in_str);
176 pool_debug("pool_clear_cache: delete all query cache created before '%s'", query_delete_timepoint_in_str);
178 pg_result = PQexec(system_db_info->pgconn, sql);
179 if (!pg_result || PQresultStatus(pg_result) != PGRES_COMMAND_OK)
181 pool_error("pool_clear_cache: PQexec() failed. reason: %s",
182 PQerrorMessage(system_db_info->pgconn));
185 free(query_delete_timepoint_in_str);
191 free(query_delete_timepoint_in_str);
197 /* --------------------------------
198 * pool_query_cache_table_exists - checks if query_cache table exists in the SystemDB
200 * This function is called once and only once from the pgpool parent process.
201 * return 1 if query_cache table exists, 0 otherwise.
202 * --------------------------------
205 pool_query_cache_table_exists(void)
207 PGresult *pg_result = NULL;
209 int sql_len = strlen(pool_config->system_db_schema) + strlen(QUERY_CACHE_TABLE_NAME) + 64;
211 if (! system_db_connection_exists())
214 sql = (char *)malloc(sql_len);
215 if (malloc_failed(sql))
218 snprintf(sql, sql_len,
219 "SELECT hash, query, value, dbname, create_time FROM %s.%s LIMIT 1",
220 pool_config->system_db_schema,
221 QUERY_CACHE_TABLE_NAME);
223 pg_result = PQexec(system_db_info->pgconn, sql);
224 if (!pg_result || PQresultStatus(pg_result) != PGRES_TUPLES_OK)
226 pool_error("pool_query_cache_table_exists: PQexec() failed. reason: %s",
227 PQerrorMessage(system_db_info->pgconn));
235 pool_close_libpq_connection();
241 /* --------------------------------
242 * pool_query_cache_lookup - retrieve query cache from the SystemDB
244 * creates a SQL query string for searching a cache from the SystemDB.
246 * returns POOL_CONTINUE if cache is found. returns POOL_END if cache was
247 * not found. returns POOL_ERROR if an error has been encountered while
250 * Note that POOL_END and POOL_ERROR are treated the same by the caller
251 * (pool_process_query.c).
252 * POOL_END and POOL_ERROR both indicate to the caller that the search
253 * query must be forwarded to the backends in order to retrieve data and
254 * the result be cached.
255 * The only difference is that POOL_ERROR indicates that some fatal error has
256 * occurred; the query cache function, however, should be seamless to the user
257 * whether the cache was not found or an error occurred during cache retrieval.
258 * --------------------------------
261 pool_query_cache_lookup(POOL_CONNECTION *frontend, char *query, char *database, char tstate)
266 struct timeval timeout;
269 if (! system_db_connection_exists())
270 return POOL_ERROR; /* same as POOL_END ... at least for now */
273 strlen(pool_config->system_db_schema) +
274 strlen(QUERY_CACHE_TABLE_NAME) +
278 sql = (char *)malloc(sql_len);
279 if (malloc_failed(sql))
280 return POOL_ERROR; /* should I exit here rather than returning an error? */
282 /* cached data lookup */
283 pool_md5_hash(query, strlen(query), md5_query);
284 snprintf(sql, sql_len, "SELECT value FROM %s.%s WHERE hash = '%s' AND dbname = '%s'",
285 pool_config->system_db_schema,
286 QUERY_CACHE_TABLE_NAME,
290 /* set timeout value for select */
291 timeout.tv_sec = pool_config->child_life_time;
294 pool_debug("pool_query_cache_lookup: searching cache for query: \"%s\"", query);
295 status = search_system_db_for_cache(frontend, sql, strlen(sql)+1, &timeout, tstate);
297 /* make sure that the remaining data is discarded */
303 /* cache found, and no backend communication needed */
304 if (status == CACHE_FOUND)
306 return POOL_CONTINUE;
309 /* cache not found */
311 if (status == CACHE_ERROR)
313 pool_error("pool_query_cache_lookup: query cache lookup failed");
314 /* reset the SystemDB connection */
315 if (system_db_info->pgconn)
316 pool_close_libpq_connection();
317 return POOL_ERROR; /* same as POOL_END ... at least for now */
320 pool_debug("pool_query_cache_lookup: query cache not found");
324 /* --------------------------------
325 * search_system_db_for_cache - search for query cache in libpq protocol level
327 * sends a cache searching query string using libpq protocol to the SystemDB.
328 * if the SystemDB returns cache, forward the data to the frontend, and return
329 * CACHE_FOUND. if cache was not found, silently discards the remaining data
330 * returned by the SystemDB, and return CACHE_NOT_FOUND. returns CACHE_ERROR
331 * if an error was encountered.
332 * --------------------------------
335 search_system_db_for_cache(POOL_CONNECTION *frontend, char *sql, int sql_len, struct timeval *t, char tstate)
340 struct timeval *timeout = NULL;
344 CACHE_STATUS return_value = CACHE_ERROR;
347 pool_debug("pool_query_cache_lookup: executing query: \"%s\"", sql);
349 pool_write(SYSDB_CON, "Q", 1);
350 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
352 int sendlen = htonl(sql_len + 4);
353 pool_write(SYSDB_CON, &sendlen, sizeof(sendlen));
355 if (pool_write_and_flush(SYSDB_CON, sql, sql_len) < 0)
357 pool_error("pool_query_cache_lookup: error while sending data to the SystemDB");
361 if ((t->tv_sec + t->tv_usec) == 0)
366 /* don't really need select() or for(;;) here, but we may need it someday... or not */
372 num_fds = SYSDB_CON->fd + 1;
373 FD_SET(SYSDB_CON->fd, &readmask);
374 fds = select(num_fds, &readmask, NULL, NULL, timeout);
380 pool_error("pool_query_cache_lookup: select() failed. reason: %s", strerror(errno));
384 /* select() timeout */
390 if (! FD_ISSET(SYSDB_CON->fd, &readmask))
392 pool_error("pool_query_cache_lookup: select() failed");
397 if (pool_read(SYSDB_CON, &kind, sizeof(kind)) < 0)
399 pool_error("pool_query_cache_lookup: error while reading message kind");
402 pool_debug("pool_query_cache_lookup: received %c from systemdb", kind);
404 /* just do the routine work of reading data in. data won't be used */
407 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
409 if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
411 pool_error("pool_query_cache_lookup: error while reading message length");
414 readlen = ntohl(readlen) - sizeof(int);
415 data = pool_read2(SYSDB_CON, readlen);
419 data = pool_read_string(SYSDB_CON, &readlen, 0);
422 else if (kind == 'D') /* cache found! forward it to the frontend */
429 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
431 if (pool_read(SYSDB_CON, &readlen, sizeof(readlen)) < 0)
433 pool_error("pool_query_cache_lookup: error while reading message length");
436 readlen = ntohl(readlen) - sizeof(int);
437 cache = pool_read2(SYSDB_CON, readlen);
441 cache = pool_read_string(SYSDB_CON, &readlen, 0);
446 pool_error("pool_query_cache_lookup: error while reading message body");
450 cache[readlen] = '\0';
452 cache += sizeof(short); /* number of columns in 'D' (we know it's always going to be 1, so skip) */
453 cache += sizeof(int); /* length of escaped bytea cache in string format. don't need the length */
455 status = ForwardCacheToFrontend(frontend, cache, tstate);
458 /* fatal error has occurred while forwarding cache */
459 pool_error("pool_query_cache_lookup: query cache forwarding failed");
460 return_value = CACHE_ERROR;
463 else if (kind == 'C') /* see if 'D' was received */
466 return_value = CACHE_FOUND;
468 return_value = CACHE_NOT_FOUND;
470 /* must discard the remaining data */
471 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
473 if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
475 pool_error("pool_query_cache_lookup: error while reading message length");
478 readlen = ntohl(readlen) - sizeof(int);
479 data = pool_read2(SYSDB_CON, readlen);
483 data = pool_read_string(SYSDB_CON, &readlen, 0);
486 else if (kind == 'Z')
488 /* must discard the remaining data */
489 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
491 if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
493 pool_error("pool_query_cache_lookup: error while reading message length");
496 readlen = ntohl(readlen) - sizeof(int);
497 data = pool_read2(SYSDB_CON, readlen);
501 data = pool_read_string(SYSDB_CON, &readlen, 0);
506 else if (kind == 'E')
508 /* must discard the remaining data */
509 if (SYSDB_MAJOR == PROTO_MAJOR_V3)
511 if (pool_read(SYSDB_CON, &readlen, sizeof(int)) < 0)
513 pool_error("pool_query_cache_lookup: error while reading message length");
516 readlen = ntohl(readlen) - sizeof(int);
517 data = pool_read2(SYSDB_CON, readlen);
521 data = pool_read_string(SYSDB_CON, &readlen, 0);
524 return_value = CACHE_ERROR;
528 /* shouldn't get here, but just in case */
539 /* --------------------------------
540 * ForwardCacheToFrontend - simply forwards cached data to the frontend
542 * since the cached data passed from the caller is in escaped binary string
543 * format, unescape it and send it to the frontend appending 'Z' at the end.
544 * returns 0 on success, -1 otherwise.
545 * --------------------------------
547 static int ForwardCacheToFrontend(POOL_CONNECTION *frontend, char *cache, char tstate)
551 char *binary_cache = NULL;
553 binary_cache = (char *)PQunescapeBytea((unsigned char *)cache, &sz);
555 if (malloc_failed(binary_cache))
558 pool_debug("ForwardCacheToFrontend: query cache found (%d bytes)", sendlen);
560 /* forward cache to the frontend */
561 pool_write(frontend, binary_cache, sendlen);
563 /* send ReadyForQuery to the frontend*/
564 pool_write(frontend, "Z", 1);
566 pool_write(frontend, &sendlen, sizeof(int));
567 if (pool_write_and_flush(frontend, &tstate, 1) < 0)
569 pool_error("pool_query_cache_lookup: error while writing data to the frontend");
570 PQfreemem(binary_cache);
574 PQfreemem(binary_cache);
578 /* --------------------------------
579 * pool_query_cache_register() - register query cache to the SystemDB
581 * returns 0 on success, -1 otherwise
582 * --------------------------------
585 pool_query_cache_register(char kind,
586 POOL_CONNECTION *frontend,
595 if (! system_db_connection_exists())
597 if (! CACHE_TABLE_INFO.has_prepared_statement)
598 define_prepared_statements();
602 case 'T': /* RowDescription */
604 /* for all SELECT result data from the backend, 'T' must come first */
605 if (query_cache_info != NULL)
607 pool_error("pool_query_cache_register: received RowDescription in the wrong order");
608 free_query_cache_info();
612 pool_debug("pool_query_cache_register: saving cache for query: \"%s\"", query);
614 /* initialize query_cache_info and save the query */
615 ret = init_query_cache_info(frontend, database, query);
619 /* store data into the cache */
620 write_cache(&kind, 1);
621 send_len = htonl(data_len + sizeof(int));
622 write_cache(&send_len, sizeof(int));
623 write_cache(data, data_len);
628 case 'D': /* DataRow */
630 /* for all SELECT result data from the backend, 'T' must come first */
631 if (query_cache_info == NULL)
633 pool_error("pool_query_cache_register: received DataRow in the wrong order");
637 write_cache(&kind, 1);
638 send_len = htonl(data_len + sizeof(int));
639 write_cache(&send_len, sizeof(int));
640 write_cache(data, data_len);
645 case 'C': /* CommandComplete */
647 PGresult *pg_result = NULL;
648 char *escaped_query = NULL;
649 size_t escaped_query_len;
650 time_t now = time(NULL);
653 int values_format[5];
656 /* for all SELECT result data from the backend, 'T' must come first */
657 if (query_cache_info == NULL)
659 pool_error("pool_query_cache_register: received CommandComplete in the wrong order");
663 /* pack CommandComplete data into the cache */
664 write_cache(&kind, 1);
665 send_len = htonl(data_len + sizeof(int));
666 write_cache(&send_len, sizeof(int));
667 write_cache(data, data_len);
669 query_cache_info->create_time = pq_time_to_str(now);
670 if (malloc_failed(query_cache_info->create_time))
672 free_query_cache_info();
676 escaped_query = (char *)malloc(strlen(query_cache_info->query) * 2 + 1);
677 if (malloc_failed(escaped_query))
679 free_query_cache_info();
683 /* escaped_query_len = PQescapeStringConn(system_db_info->pgconn, */
685 /* query_cache_info->query, */
686 /* strlen(query_cache_info->query))); */
687 escaped_query_len = PQescapeString(escaped_query, query_cache_info->query, strlen(query_cache_info->query));
689 /* all the result data have been received. store into the SystemDB */
690 values[0] = strdup(query_cache_info->md5_query);
691 values[1] = strdup(escaped_query);
692 values[2] = (char *)malloc(query_cache_info->cache_offset);
693 memcpy(values[2], query_cache_info->cache, query_cache_info->cache_offset);
694 values[3] = strdup(query_cache_info->db_name);
695 values[4] = strdup(query_cache_info->create_time);
696 for (i = 0; i < 5; i++)
698 if (malloc_failed(values[i]))
700 pool_error("pool_query_cache_register: malloc() failed");
701 free_query_cache_info();
704 for (j = 0; j < i; j++)
710 values_len[i] = strlen(values[i]);
712 values_format[0] = values_format[1] = values_format[3] = values_format[4] = 0;
713 values_format[2] = 1;
714 values_len[2] = query_cache_info->cache_offset;
716 pg_result = PQexecPrepared(system_db_info->pgconn,
717 CACHE_TABLE_INFO.register_prepared_statement,
719 (const char * const *)values,
723 if (!pg_result || PQresultStatus(pg_result) != PGRES_COMMAND_OK)
725 pool_error("pool_query_cache_register: PQexecPrepared() failed. reason: %s",
726 PQerrorMessage(system_db_info->pgconn));
729 PQfreemem(escaped_query);
730 free_query_cache_info();
731 for (i = 0; i < 5; i++)
737 PQfreemem(escaped_query);
738 for (i = 0; i < 5; i++)
740 free_query_cache_info();
747 pool_debug("pool_query_cache_register: received 'E': free query cache buffer");
749 pool_close_libpq_connection();
750 free_query_cache_info();
759 /* --------------------------------
760 * init_query_cache_info() - allocate memory for query_cache_info and stores query
762 * returns 0 on success, -1 otherwise
763 * --------------------------------
766 init_query_cache_info(POOL_CONNECTION *pc, char *database, char *query)
768 int query_len; /* length of the SELECT query to be cached */
770 query_cache_info = (QueryCacheInfo *)malloc(sizeof(QueryCacheInfo));
771 if (malloc_failed(query_cache_info))
775 query_len = strlen(query);
776 query_cache_info->query = (char *)malloc(query_len + 1);
777 if (malloc_failed(query_cache_info->query))
779 memcpy(query_cache_info->query, query, query_len + 1);
782 query_cache_info->md5_query = (char *)malloc(33); /* md5sum is always 33 bytes (including the '\0') */
783 if (malloc_failed(query_cache_info->md5_query))
785 pool_md5_hash(query_cache_info->query, query_len, query_cache_info->md5_query);
787 /* malloc DEFAULT_CACHE_SIZE for query_cache_info->cache */
788 query_cache_info->cache = (char *)malloc(DEFAULT_CACHE_SIZE);
789 if (malloc_failed(query_cache_info->cache))
791 query_cache_info->cache_size = DEFAULT_CACHE_SIZE;
792 query_cache_info->cache_offset = 0;
794 /* save database name */
795 query_cache_info->db_name = (char *)malloc(strlen(database)+1);
796 if (malloc_failed(query_cache_info->db_name))
798 strcpy(query_cache_info->db_name, database);
803 /* --------------------------------
804 * free_query_cache_info() - free query_cache_info and its members
805 * --------------------------------
808 free_query_cache_info(void)
810 if (query_cache_info == NULL)
813 free(query_cache_info->md5_query);
814 free(query_cache_info->query);
815 free(query_cache_info->cache);
816 free(query_cache_info->db_name);
817 free(query_cache_info->create_time);
818 free(query_cache_info);
819 query_cache_info = NULL;
822 /* --------------------------------
823 * malloc_failed() - checks if the caller's most recent malloc() has succeeded
825 * returns 0 if malloc() was a success, -1 otherwise
826 * --------------------------------
829 malloc_failed(void *p)
834 pool_error("pool_query_cache: malloc() failed");
835 free_query_cache_info();
840 /* --------------------------------
841 * write_cache() - append result data to buffer
843 * returns 0 on success, -1 otherwise
844 * --------------------------------
847 write_cache(void *buf, int len)
854 required_len = query_cache_info->cache_offset + len;
855 if (required_len > query_cache_info->cache_size)
859 required_len = query_cache_info->cache_size * 2;
860 ptr = (char *)realloc(query_cache_info->cache, required_len);
861 if (malloc_failed(ptr))
864 query_cache_info->cache = ptr;
865 query_cache_info->cache_size = required_len;
867 pool_debug("pool_query_cache: extended cache buffer size to %d", query_cache_info->cache_size);
870 memcpy(query_cache_info->cache + query_cache_info->cache_offset, buf, len);
871 query_cache_info->cache_offset += len;
876 /* --------------------------------
877 * pq_time_to_str() - convert time_t to ISO standard time output string
879 * returns a pointer to newly allocated string, NULL if malloc fails
880 * --------------------------------
883 pq_time_to_str(time_t t)
890 strftime(iso_time, sizeof(iso_time), "%Y-%m-%d %H:%M:%S%z", tm);
892 time_p = strdup(iso_time);
898 /* --------------------------------
899 * system_db_connection_exists() - checks whether a connection to the SystemDB exists
901 * if not connected, it makes an attempt to connect to the SystemDB. If a connection
902 * exists, returns 1, returns 0 otherwise.
903 * --------------------------------
906 system_db_connection_exists(void)
908 if (!system_db_info->pgconn ||
909 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
911 if (system_db_connect())
919 /* --------------------------------
920 * define_prepared_statements() - defines prepared statements for the current session
921 * --------------------------------
924 define_prepared_statements(void)
931 strlen(pool_config->system_db_schema) +
932 strlen(QUERY_CACHE_TABLE_NAME) +
935 sql = (char *)malloc(sql_len);
936 if (malloc_failed(sql))
938 pool_error("pool_query_cache: malloc() failed");
942 free(CACHE_TABLE_INFO.register_prepared_statement);
943 CACHE_TABLE_INFO.register_prepared_statement
944 = strdup(CACHE_REGISTER_PREPARED_STMT);
945 if (malloc_failed(CACHE_TABLE_INFO.register_prepared_statement))
947 pool_error("pool_query_cache: malloc() failed");
952 #ifdef HAVE_PQPREPARE
953 snprintf(sql, sql_len,
954 "INSERT INTO %s.%s VALUES ( $1, $2, $3, $4, $5 )",
955 pool_config->system_db_schema,
956 QUERY_CACHE_TABLE_NAME);
957 pg_result = PQprepare(system_db_info->pgconn,
958 CACHE_TABLE_INFO.register_prepared_statement,
963 snprintf(sql, sql_len,
964 "PREPARE %s (TEXT, TEXT, BYTEA, TEXT, TIMESTAMP WITH TIME ZONE) AS INSERT INTO %s.%s VALUES ( $1, $2, $3, $4, $5 )",
965 CACHE_TABLE_INFO.register_prepared_statement,
966 pool_config->system_db_schema,
967 QUERY_CACHE_TABLE_NAME);
968 pg_result = PQexec(system_db_info->pgconn, sql);
970 if (!pg_result || PQresultStatus(pg_result) != PGRES_COMMAND_OK)
972 pool_error("pool_query_cache: PQprepare() failed: %s", PQerrorMessage(system_db_info->pgconn));
973 free(CACHE_TABLE_INFO.register_prepared_statement);
978 pool_debug("pool_query_cache: prepared statements created");
979 CACHE_TABLE_INFO.has_prepared_statement = 1;