3 * $Header: /cvsroot/pgpool/pgpool-II/pool_system.c,v 1.5.2.1 2009/08/22 04:19:49 t-ishii Exp $
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
8 * Copyright (c) 2003-2008 PgPool Global Development Group
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
21 * pool_system.c: systemdb
/*
 * Forward declarations of the helpers that are declared static in this file.
 */
/* Prepares the distribution function call for one dist_def rule. */
30 static int create_prepared_statement(DistDefInfo *dist_info);
/* Fills col_list[] and type_list[] of one dist_def rule. */
31 static int get_col_list(DistDefInfo *info);
/* Fills col_list[] and type_list[] of one replicate_def rule. */
32 static int get_col_list2(RepliDefInfo *info);
/*
 * get_col_list:
 *    Fills info->col_list[] and info->type_list[] for one distribution
 *    rule. One query per column is sent to <system_db_schema>.dist_def,
 *    selecting the rule by dbname, schema_name and table_name. The index
 *    of the column whose name equals info->dist_key_col_name is stored in
 *    info->dist_key_col_id.
 *
 *    info->col_num gives the number of columns. The caller in this file
 *    allocates col_list and type_list with calloc(col_num, ...) before
 *    calling, and treats a negative return value as failure.
 *
 *    NOTE(review): this listing omits lines (the embedded numbering has
 *    gaps), so the return statements and any PQclear() calls are not
 *    visible here. Confirm them against the full file.
 */
35 int get_col_list(DistDefInfo *info)
/* Static storage: the same SQL buffer is reused on every call. */
38 static char sql[1024];
/* Connect to the System DB if there is no usable libpq connection. */
41 if (!system_db_info->pgconn ||
42 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
44 if (system_db_connect())
/* One query per column of the rule. */
48 for (i = 0; i < info->col_num; i++)
/*
 * NOTE(review): dbname, schema_name and table_name are spliced into the
 * SQL text inside single quotes without escaping. The caller in this file
 * takes them from rows of the System DB.
 */
52 "SELECT col_list[%d],type_list[%d] FROM %s.dist_def where dbname = '%s' and schema_name = '%s' and table_name = '%s'",
55 pool_config->system_db_schema,
56 info->dbname,info->schema_name,
59 result = PQexec(system_db_info->pgconn, sql);
/* A missing result or any status other than PGRES_TUPLES_OK is an error. */
61 if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
63 pool_error("get_col_list :PQexec failed: %s",
64 PQerrorMessage(system_db_info->pgconn));
/*
 * Copy column name (field 0) and type name (field 1) of row 0 into
 * freshly allocated strings stored in info.
 * NOTE(review): no PQntuples() check is visible before row 0 is read.
 */
69 info->col_list[i] = malloc(strlen(PQgetvalue(result,0,0)) + 1);
70 info->type_list[i] = malloc(strlen(PQgetvalue(result,0,1)) + 1);
72 if (info->col_list[i] == NULL || info->type_list[i] == NULL)
74 pool_error("get_col_list: malloc failed: %s", strerror(errno));
78 strcpy(info->col_list[i],PQgetvalue(result,0,0));
79 strcpy(info->type_list[i],PQgetvalue(result,0,1));
/* Remember which column is the distribution key. */
80 if (strcmp(info->col_list[i], info->dist_key_col_name) == 0)
81 info->dist_key_col_id = i;
/*
 * get_col_list2:
 *    Fills info->col_list[] and info->type_list[] for one replication
 *    rule. One query per column is sent to
 *    <system_db_schema>.replicate_def, selecting the rule by dbname,
 *    schema_name and table_name.
 *
 *    info->col_num gives the number of columns. The caller in this file
 *    allocates col_list and type_list with calloc(col_num, ...) before
 *    calling, and treats a negative return value as failure.
 *
 *    NOTE(review): this listing omits lines (the embedded numbering has
 *    gaps), so the return statements and any PQclear() calls are not
 *    visible here. Confirm them against the full file.
 */
89 int get_col_list2(RepliDefInfo *info)
/* Static storage: the same SQL buffer is reused on every call. */
92 static char sql[1024];
/* Connect to the System DB if there is no usable libpq connection. */
95 if (!system_db_info->pgconn ||
96 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
98 if (system_db_connect())
/* One query per column of the rule. */
102 for (i = 0; i < info->col_num; i++)
/*
 * NOTE(review): dbname, schema_name and table_name are spliced into the
 * SQL text inside single quotes without escaping. The caller in this file
 * takes them from rows of the System DB.
 */
106 "SELECT col_list[%d],type_list[%d] FROM %s.replicate_def where dbname = '%s' and schema_name = '%s' and table_name = '%s'",
109 pool_config->system_db_schema,
110 info->dbname,info->schema_name,
113 result = PQexec(system_db_info->pgconn, sql);
/* A missing result or any status other than PGRES_TUPLES_OK is an error. */
115 if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
117 pool_error("get_col_list2: PQexec failed: %s",
118 PQerrorMessage(system_db_info->pgconn));
/*
 * Copy column name (field 0) and type name (field 1) of row 0 into
 * freshly allocated strings stored in info.
 * NOTE(review): no PQntuples() check is visible before row 0 is read.
 */
123 info->col_list[i] = malloc(strlen(PQgetvalue(result,0,0)) + 1);
124 info->type_list[i] = malloc(strlen(PQgetvalue(result,0,1)) + 1);
126 if (info->col_list[i] == NULL || info->type_list[i] == NULL)
128 pool_error("get_col_list2: malloc failed: %s", strerror(errno));
132 strcpy(info->col_list[i],PQgetvalue(result,0,0));
133 strcpy(info->type_list[i],PQgetvalue(result,0,1));
142 * Connects to the System DB using PQconnectdb().
/*
 * system_db_connect:
 *    Opens a libpq connection with PQconnectdb() using the hostname,
 *    port, database name, user and password held in system_db_info->info
 *    and stores the handle in system_db_info->pgconn.
 *
 *    If the connection status is not CONNECTION_OK the error is logged,
 *    the handle is released with PQfinish() and pgconn is reset to NULL.
 *    Callers in this file treat a non-zero return value as failure.
 *
 *    NOTE(review): this listing omits lines (the embedded numbering has
 *    gaps), so the return statements are not visible here.
 */
144 int system_db_connect (void)
/* Static storage: the same conninfo buffer is reused on every call. */
146 static char conninfo[1024];
/*
 * NOTE(review): the values are wrapped in single quotes but not escaped,
 * so a quote or backslash inside a value would change the conninfo string.
 */
151 "host='%s' port=%d dbname='%s' user='%s' password='%s'",
152 system_db_info->info->hostname,
153 system_db_info->info->port,
154 system_db_info->info->database_name,
155 system_db_info->info->user,
156 system_db_info->info->password);
158 system_db_info->pgconn = PQconnectdb(conninfo);
/* PQconnectdb() hands back a handle even on failure, so check its status. */
160 if (PQstatus(system_db_info->pgconn) != CONNECTION_OK)
162 pool_error("Connection to database failed: %s",
163 PQerrorMessage(system_db_info->pgconn));
164 PQfinish(system_db_info->pgconn);
165 system_db_info->pgconn = NULL;
/*
 * Clear the prepared-statement flag of every distribution rule and of the
 * query cache table, presumably because statements prepared on an earlier
 * connection do not exist on the new one.
 */
169 for (i = 0; i < system_db_info->info->dist_def_num; i++)
171 DistDefInfo *info = &system_db_info->info->dist_def_slot[i];
172 info->is_created_prepare = 0;
175 system_db_info->info->query_cache_table_info.has_prepared_statement = 0;
181 * pool_memset_system_db_info:
182 * Initializes distribution rules. Distribution rules are stored in
183 * System DB. So we have to execute query, and expand results on
/*
 * pool_memset_system_db_info:
 *    Loads the distribution rules (dist_def) and the replication rules
 *    (replicate_def) from the System DB into info. The rule arrays are
 *    stored in info->dist_def_slot and info->repli_def_slot, their sizes
 *    in info->dist_def_num and info->repli_def_num.
 *
 *    For every rule the name columns are copied into heap strings, the
 *    col_list and type_list arrays are allocated and then filled by
 *    get_col_list() or get_col_list2(), and a prepared statement name of
 *    the form pgpool_<dbname><schema_name><table_name> is built.
 *
 *    pool_close_libpq_connection() is called on the error paths shown and
 *    once more near the end of the function.
 *
 *    NOTE(review): this listing omits lines (the embedded numbering has
 *    gaps), so return statements, PQclear() calls and any freeing of
 *    already allocated strings on error paths are not visible here.
 */
186 int pool_memset_system_db_info (SystemDBInfo *info)
/* Static storage: the same SQL buffers are reused on every call. */
189 static char sql[1024],sql2[1024];
191 DistDefInfo *dist_info = NULL;
192 RepliDefInfo *repli_info = NULL;
/* Connect to the System DB if there is no usable libpq connection. */
194 if (!system_db_info->pgconn ||
195 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
197 if (system_db_connect())
201 /* get distribution rules */
/*
 * Result fields: 0 dbname, 1 schema_name, 2 table_name, 3 col_name,
 * 4 array_upper(col_list,1), 5 col_list, 6 type_list, 7 dist_def_func.
 */
204 "SELECT dbname, schema_name, table_name,col_name,array_upper(col_list,1),col_list,type_list, dist_def_func FROM %s.dist_def",
205 pool_config->system_db_schema);
207 result = PQexec(system_db_info->pgconn, sql);
208 if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
210 pool_error("PQexec failed: %s", PQerrorMessage(system_db_info->pgconn));
/* One DistDefInfo slot per row. Nothing is allocated for zero rows. */
215 info->dist_def_num = PQntuples(result);
216 if (info->dist_def_num != 0)
218 dist_info = malloc(sizeof(DistDefInfo) * info->dist_def_num);
/* NULL is only an error when an allocation was actually attempted. */
221 if (dist_info == NULL && info->dist_def_num != 0)
223 pool_error("pool_memset_system_db_info: malloc failed: %s",
226 pool_close_libpq_connection();
230 info->dist_def_slot = dist_info;
232 for (i = 0; i < PQntuples(result); ++i)
237 char *t_dist_key_col_name;
238 char *t_dist_def_func;
/* Field 4 is array_upper(col_list,1), used as the number of columns. */
242 num = atol(PQgetvalue(result, i ,4));
/* Copy dbname (field 0). */
243 t_dbname = malloc(strlen(PQgetvalue(result,i,0)) + 1);
244 if (t_dbname == NULL)
246 pool_error("pool_memset_system_db_info: malloc failed: %s",
249 pool_close_libpq_connection();
252 strcpy(t_dbname, PQgetvalue(result,i,0));
253 dist_info[i].dbname = t_dbname;
/* Copy schema_name (field 1). */
255 t_schema_name = malloc(strlen(PQgetvalue(result,i,1)) + 1);
256 if (t_schema_name == NULL)
258 pool_error("pool_memset_system_db_info: malloc failed: %s",
261 pool_close_libpq_connection();
264 strcpy(t_schema_name, PQgetvalue(result,i,1));
265 dist_info[i].schema_name = t_schema_name;
/* Copy table_name (field 2). */
267 t_table_name = malloc(strlen(PQgetvalue(result,i,2)) + 1);
268 if (t_table_name == NULL)
270 pool_error("pool_memset_system_db_info: malloc failed: %s",
273 pool_close_libpq_connection();
276 strcpy(t_table_name, PQgetvalue(result,i,2));
277 dist_info[i].table_name = t_table_name;
/* Copy the distribution key column name (field 3, col_name). */
279 t_dist_key_col_name = malloc(strlen(PQgetvalue(result,i,3)) + 1);
280 if (t_dist_key_col_name == NULL)
282 pool_error("pool_memset_system_db_info: malloc failed: %s",
285 pool_close_libpq_connection();
288 strcpy(t_dist_key_col_name, PQgetvalue(result,i,3));
289 dist_info[i].dist_key_col_name = t_dist_key_col_name;
/* Copy the distribution function name (field 7, dist_def_func). */
291 t_dist_def_func = malloc(strlen(PQgetvalue(result,i,7)) + 1);
292 if (t_dist_def_func == NULL)
294 pool_error("pool_memset_system_db_info: malloc failed: %s",
297 pool_close_libpq_connection();
300 strcpy(t_dist_def_func, PQgetvalue(result,i,7));
301 dist_info[i].dist_def_func = t_dist_def_func;
303 dist_info[i].col_num = num;
/* Zero-filled pointer arrays. get_col_list() fills in the entries. */
305 dist_info[i].col_list = calloc(num, sizeof(char *));
306 dist_info[i].type_list = calloc(num, sizeof(char *));
307 if (dist_info[i].col_list == NULL || dist_info[i].type_list == NULL)
309 pool_error("pool_memset_system_db_info: calloc failed: %s",
312 pool_close_libpq_connection();
316 if (get_col_list(&dist_info[i]) < 0)
318 pool_error("get_col_list() failed");
320 pool_close_libpq_connection();
324 /* create PREPARE statement */
/*
 * Only the statement name is built here. len is the exact length of
 * pgpool_ followed by the three names.
 * NOTE(review): the names are concatenated without separators, so two
 * different (dbname, schema_name, table_name) triples can produce the
 * same prepare_name.
 */
325 len = strlen(t_dbname) + strlen(t_schema_name) +
326 strlen(t_table_name) + strlen("pgpool_");
328 dist_info[i].prepare_name = malloc(len + 1);
329 if (dist_info[i].prepare_name == NULL)
331 pool_error("pool_memset_system_db_info: malloc failed: %s",
336 snprintf(dist_info[i].prepare_name, len+1, "pgpool_%s%s%s",
337 t_dbname, t_schema_name, t_table_name);
/* snprintf() already terminates the string, so this store is redundant but harmless. */
338 dist_info[i].prepare_name[len] = '\0';
344 /* get replication rules */
/*
 * Result fields: 0 dbname, 1 schema_name, 2 table_name,
 * 3 array_upper(col_list,1), 4 col_list, 5 type_list.
 */
347 "SELECT dbname, schema_name, table_name, array_upper(col_list,1),col_list,type_list FROM %s.replicate_def",
348 pool_config->system_db_schema);
350 result = PQexec(system_db_info->pgconn, sql2);
/* NOTE(review): the condition guarding this message is not visible; presumably result is NULL. */
354 pool_error("PQexec failed: %s", PQerrorMessage(system_db_info->pgconn));
/* A non-tuple status is recorded as having no replication rules. */
357 else if (PQresultStatus(result) != PGRES_TUPLES_OK)
359 info->repli_def_num = 0;
360 info->repli_def_slot = NULL;
/* One RepliDefInfo slot per row. Nothing is allocated for zero rows. */
364 info->repli_def_num = PQntuples(result);
365 if (info->repli_def_num != 0)
367 repli_info = malloc(sizeof(RepliDefInfo) * info->repli_def_num);
/* NULL is only an error when an allocation was actually attempted. */
370 if (repli_info == NULL && info->repli_def_num != 0)
372 pool_error("pool_memset_system_db_info: malloc failed: %s",
375 pool_close_libpq_connection();
379 info->repli_def_slot = repli_info;
381 for (i = 0; i < PQntuples(result); ++i)
/* Field 3 is array_upper(col_list,1), used as the number of columns. */
389 num = atol(PQgetvalue(result, i ,3));
/* Copy dbname (field 0). */
390 t_dbname = malloc(strlen(PQgetvalue(result,i,0)) + 1);
391 if (t_dbname == NULL)
393 pool_error("pool_memset_system_db_info: malloc failed: %s",
396 pool_close_libpq_connection();
399 strcpy(t_dbname, PQgetvalue(result,i,0));
400 repli_info[i].dbname = t_dbname;
/* Copy schema_name (field 1). */
402 t_schema_name = malloc(strlen(PQgetvalue(result,i,1)) + 1);
403 if (t_schema_name == NULL)
405 pool_error("pool_memset_system_db_info: malloc failed: %s",
408 pool_close_libpq_connection();
411 strcpy(t_schema_name, PQgetvalue(result,i,1));
412 repli_info[i].schema_name = t_schema_name;
/* Copy table_name (field 2). */
414 t_table_name = malloc(strlen(PQgetvalue(result,i,2)) + 1);
415 if (t_table_name == NULL)
417 pool_error("pool_memset_system_db_info: malloc failed: %s",
420 pool_close_libpq_connection();
423 strcpy(t_table_name, PQgetvalue(result,i,2));
424 repli_info[i].table_name = t_table_name;
426 repli_info[i].col_num = num;
/* Zero-filled pointer arrays. get_col_list2() fills in the entries. */
428 repli_info[i].col_list = calloc(num, sizeof(char *));
429 repli_info[i].type_list = calloc(num, sizeof(char *));
430 if (repli_info[i].col_list == NULL || repli_info[i].type_list == NULL)
432 pool_error("pool_memset_system_db_info: calloc failed: %s",
435 pool_close_libpq_connection();
439 if (get_col_list2(&repli_info[i]) < 0)
/* NOTE(review): the message names get_col_list() although get_col_list2() was called. */
441 pool_error("get_col_list() failed");
443 pool_close_libpq_connection();
447 /* create PREPARE statement */
/*
 * Only the statement name is built here, in the same way as for the
 * distribution rules, with the same possibility of name collisions.
 */
448 len = strlen(t_dbname) + strlen(t_schema_name) +
449 strlen(t_table_name) + strlen("pgpool_");
451 repli_info[i].prepare_name = malloc(len + 1);
452 if (repli_info[i].prepare_name == NULL)
454 pool_error("pool_memset_system_db_info: malloc failed: %s",
459 snprintf(repli_info[i].prepare_name, len+1, "pgpool_%s%s%s",
460 t_dbname, t_schema_name, t_table_name);
/* snprintf() already terminates the string, so this store is redundant but harmless. */
461 repli_info[i].prepare_name[len] = '\0';
/* The rules are now held in info, so the libpq connection is closed. */
467 pool_close_libpq_connection();
472 * pool_get_dist_def_info:
473 * Looks up distribution rule with dbname, schema_name and table_name.
/*
 * pool_get_dist_def_info:
 *    Scans system_db_info->info->dist_def_slot for the rule whose dbname,
 *    schema_name and table_name all compare equal (strcmp) to the
 *    arguments, and returns a pointer to that slot.
 *
 *    dbname and table_name are checked for NULL first. schema_name is
 *    replaced by the string public under a condition that is not visible
 *    in this listing (presumably when it is NULL).
 *
 *    NOTE(review): the value returned for NULL arguments or when no rule
 *    matches is not visible here. Confirm against the full file.
 */
475 DistDefInfo *pool_get_dist_def_info (char *dbname, char *schema_name, char *table_name)
478 int dist_def_num = system_db_info->info->dist_def_num;
/* Default schema name. */
479 char *public ="public";
481 if (!dbname || !table_name)
488 schema_name = public;
/* Linear search over the loaded distribution rules. */
491 for (i = 0; i < dist_def_num; i++)
494 char *mem_schema_name;
495 char *mem_table_name;
497 mem_dbname = system_db_info->info->dist_def_slot[i].dbname;
498 mem_schema_name = system_db_info->info->dist_def_slot[i].schema_name;
499 mem_table_name = system_db_info->info->dist_def_slot[i].table_name;
/* All three names must match exactly. */
501 if ((strcmp(mem_dbname, dbname) == 0) &&
502 (strcmp(mem_schema_name, schema_name) == 0) &&
503 (strcmp(mem_table_name, table_name) ==0))
505 return &system_db_info->info->dist_def_slot[i];
512 * pool_get_repli_def_info:
513 * Looks up replication rule with dbname, schema_name and table_name.
/*
 * pool_get_repli_def_info:
 *    Scans system_db_info->info->repli_def_slot for the rule whose dbname,
 *    schema_name and table_name all compare equal (strcmp) to the
 *    arguments, and returns a pointer to that slot.
 *
 *    dbname and table_name are checked for NULL first. schema_name is
 *    replaced by the string public under a condition that is not visible
 *    in this listing (presumably when it is NULL).
 *
 *    NOTE(review): the value returned for NULL arguments or when no rule
 *    matches is not visible here. Confirm against the full file.
 */
515 RepliDefInfo *pool_get_repli_def_info (char *dbname, char *schema_name, char *table_name)
518 int repli_def_num = system_db_info->info->repli_def_num;
/* Default schema name. */
519 char *public ="public";
521 if (!dbname || !table_name)
528 schema_name = public;
/* Linear search over the loaded replication rules. */
531 for (i = 0; i < repli_def_num; i++)
534 char *mem_schema_name;
535 char *mem_table_name;
537 mem_dbname = system_db_info->info->repli_def_slot[i].dbname;
538 mem_schema_name = system_db_info->info->repli_def_slot[i].schema_name;
539 mem_table_name = system_db_info->info->repli_def_slot[i].table_name;
/* All three names must match exactly. */
541 if ((strcmp(mem_dbname, dbname) == 0) &&
542 (strcmp(mem_schema_name, schema_name) == 0) &&
543 (strcmp(mem_table_name, table_name) ==0))
545 return &system_db_info->info->repli_def_slot[i];
553 * Returns the backend node id from value.
/*
 * pool_get_id:
 *    Executes the prepared statement named info->prepare_name on the
 *    System DB connection with value as its single parameter, and reads
 *    field 0 of row 0 of the result as the id.
 *
 *    The connection is established first if needed, and the prepared
 *    statement is created through create_prepared_statement() when
 *    info->is_created_prepare is 0.
 *
 *    NOTE(review): this listing omits lines (the embedded numbering has
 *    gaps). How id is converted to num, what is returned in each case and
 *    whether the result is released with PQclear() are not visible here.
 */
555 int pool_get_id (DistDefInfo *info, const char *value)
/* Connect to the System DB if there is no usable libpq connection. */
562 if (!system_db_info->pgconn ||
563 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
565 if (system_db_connect())
/* Prepare the statement on first use. */
569 if (info->is_created_prepare == 0)
571 if (create_prepared_statement(info) != 0)
/* NOTE(review): no use of type is visible in this listing. */
575 type=info->type_list[info->dist_key_col_id];
576 length = strlen(value);
/*
 * One parameter, paramFormats NULL and result format 0.
 * NOTE(review): PQexecPrepared() takes paramLengths as a pointer to int.
 * The declaration of length is not visible here, so confirm its type.
 */
577 result = PQexecPrepared(system_db_info->pgconn, info->prepare_name,
578 1, &value, &length, NULL, 0);
/* A NULL value in field 0 of row 0 is treated like an execution failure. */
580 if (!result || PQresultStatus(result) != PGRES_TUPLES_OK ||
581 PQgetisnull(result, 0, 0))
583 pool_error("PQexecPrepared failed: %s", PQerrorMessage(system_db_info->pgconn));
589 id = PQgetvalue(result, 0 ,0);
/* Bound check against NUM_BACKENDS. The branches are not visible in this listing. */
596 if(num < NUM_BACKENDS)
608 * pool_close_libpq_connection:
609 * Closes libpq's connection.
/*
 * pool_close_libpq_connection:
 *    Releases the libpq connection held in system_db_info->pgconn with
 *    PQfinish() and resets the pointer to NULL. The other functions in
 *    this file test pgconn for NULL before using it.
 */
611 void pool_close_libpq_connection(void)
613 PQfinish(system_db_info->pgconn);
/* Clear the pointer so the closed handle is not used again. */
614 system_db_info->pgconn = NULL;
618 * pool_system_db_connection:
619 * Returns persistent connection to the system DB
/*
 * pool_system_db_connection:
 *    Accessor that returns system_db_info->connection unchanged.
 */
621 POOL_CONNECTION_POOL_SLOT *pool_system_db_connection(void)
623 return system_db_info->connection;
627 * create_prepared_statement:
628 * Returns 0 if prepared statement is created.
629 * Returns 1 if the prepared statement can't be created.
/*
 * create_prepared_statement:
 *    Prepares, under the name dist_info->prepare_name, a statement that
 *    calls the distribution function dist_info->dist_def_func on $1 cast
 *    to the type of the distribution key column.
 *
 *    When HAVE_PQPREPARE is defined the statement is created with
 *    PQprepare(). The second variant sends a PREPARE command through
 *    PQexec().
 *    NOTE(review): the preprocessor line separating the two variants is
 *    not visible in this listing (presumably #else).
 *
 *    A result status other than PGRES_COMMAND_OK is logged as a failure.
 *    dist_info->is_created_prepare is set to 1 afterwards.
 *    NOTE(review): return statements and any PQclear() call are not
 *    visible here.
 */
631 static int create_prepared_statement(DistDefInfo *dist_info)
/* Static storage: the same SQL buffer is reused on every call. */
633 static char sql[1024];
636 #ifdef HAVE_PQPREPARE
/* Statement body only. The name is passed to PQprepare() separately. */
637 snprintf(sql, 1024, "SELECT %s($1::%s)", dist_info->dist_def_func,
638 dist_info->type_list[dist_info->dist_key_col_id]);
639 result = PQprepare(system_db_info->pgconn,
640 dist_info->prepare_name,
/* SQL-level PREPARE with the parameter type declared explicitly. */
643 snprintf(sql, 1024, "PREPARE %s (%s) AS SELECT %s($1::%s)",
644 dist_info->prepare_name,
645 dist_info->type_list[dist_info->dist_key_col_id],
646 dist_info->dist_def_func,
647 dist_info->type_list[dist_info->dist_key_col_id]);
648 result = PQexec(system_db_info->pgconn, sql);
649 #endif /* HAVE_PQPREPARE */
/* The message text says PQprepare in both build variants. */
651 if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
653 pool_error("PQprepare failed: %s", PQerrorMessage(system_db_info->pgconn));
/* Mark the rule so pool_get_id() does not prepare the statement again. */
656 dist_info->is_created_prepare = 1;