]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_system.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pool_system.c
1 /* -*-pgsql-c-*- */
2 /*
3  * $Header: /cvsroot/pgpool/pgpool-II/pool_system.c,v 1.5.2.1 2009/08/22 04:19:49 t-ishii Exp $
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2008      PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  * pool_system.c: systemdb
22  *
23  */
24
25 #include <errno.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include "pool.h"
29
30 static int create_prepared_statement(DistDefInfo *dist_info);
31 static int  get_col_list(DistDefInfo *info);
32 static int  get_col_list2(RepliDefInfo *info);
33
34 static
35 int  get_col_list(DistDefInfo *info)
36 {
37         int i;
38         static char sql[1024];
39         PGresult *result;
40
41         if (!system_db_info->pgconn ||
42                 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
43         {
44                 if (system_db_connect())
45                         return -1;
46         }
47
48         for (i = 0; i < info->col_num; i++)
49         {
50                 snprintf(sql,
51                                  sizeof(sql),
52                                  "SELECT col_list[%d],type_list[%d] FROM %s.dist_def where dbname = '%s' and schema_name = '%s' and table_name = '%s'",
53                                  i + 1,
54                                  i + 1,
55                                  pool_config->system_db_schema,
56                                  info->dbname,info->schema_name,
57                                  info->table_name);
58
59                 result = PQexec(system_db_info->pgconn, sql);
60
61                 if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
62                 {
63                         pool_error("get_col_list :PQexec failed: %s",
64                                            PQerrorMessage(system_db_info->pgconn));
65                         return -1;
66                 }
67                 else
68                 {
69                         info->col_list[i]  = malloc(strlen(PQgetvalue(result,0,0)) + 1);
70                         info->type_list[i] = malloc(strlen(PQgetvalue(result,0,1)) + 1);
71
72                         if (info->col_list[i] == NULL || info->type_list[i] == NULL)
73                         {
74                                 pool_error("get_col_list: malloc failed: %s", strerror(errno));
75                                 PQclear(result);
76                                 return -1;
77                         }
78                         strcpy(info->col_list[i],PQgetvalue(result,0,0));
79                         strcpy(info->type_list[i],PQgetvalue(result,0,1));
80                         if (strcmp(info->col_list[i], info->dist_key_col_name) == 0)
81                                 info->dist_key_col_id = i;
82                         PQclear(result);
83                 }
84         }
85         return 0;
86 }
87
88 static
89 int  get_col_list2(RepliDefInfo *info)
90 {
91         int i;
92         static char sql[1024];
93         PGresult *result;
94
95         if (!system_db_info->pgconn ||
96                 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
97         {
98                 if (system_db_connect())
99                         return -1;
100         }
101
102         for (i = 0; i < info->col_num; i++)
103         {
104                 snprintf(sql,
105                                  sizeof(sql),
106                                  "SELECT col_list[%d],type_list[%d] FROM %s.replicate_def where dbname = '%s' and schema_name = '%s' and table_name = '%s'",
107                                  i + 1,
108                                  i + 1,
109                                  pool_config->system_db_schema,
110                                  info->dbname,info->schema_name,
111                                  info->table_name);
112
113                 result = PQexec(system_db_info->pgconn, sql);
114
115                 if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
116                 {
117                         pool_error("get_col_list2: PQexec failed: %s",
118                                            PQerrorMessage(system_db_info->pgconn));
119                         return -1;
120                 }
121                 else
122                 {
123                         info->col_list[i]  = malloc(strlen(PQgetvalue(result,0,0)) + 1);
124                         info->type_list[i] = malloc(strlen(PQgetvalue(result,0,1)) + 1);
125
126                         if (info->col_list[i] == NULL || info->type_list[i] == NULL)
127                         {
128                                 pool_error("get_col_list2: malloc failed: %s", strerror(errno));
129                                 PQclear(result);
130                                 return -1;
131                         }
132                         strcpy(info->col_list[i],PQgetvalue(result,0,0));
133                         strcpy(info->type_list[i],PQgetvalue(result,0,1));
134                         PQclear(result);
135                 }
136         }
137         return 0;
138 }
139
140 /*
141  * system_db_connect:
142  *     Connects System DB by PQconnectdb().
143  */
144 int system_db_connect (void)
145 {
146         static char conninfo[1024];
147         int i;
148
149         snprintf(conninfo,
150                          sizeof(conninfo),
151                          "host='%s' port=%d dbname='%s' user='%s' password='%s'",
152                          system_db_info->info->hostname,
153                          system_db_info->info->port,
154                          system_db_info->info->database_name,
155                          system_db_info->info->user,
156                          system_db_info->info->password);
157
158         system_db_info->pgconn = PQconnectdb(conninfo);
159
160         if (PQstatus(system_db_info->pgconn) != CONNECTION_OK)
161         {
162                 pool_error("Connection to database failed: %s",
163                                    PQerrorMessage(system_db_info->pgconn));
164                 PQfinish(system_db_info->pgconn);
165                 system_db_info->pgconn = NULL;
166                 return 1;
167         }
168
169         for (i = 0; i < system_db_info->info->dist_def_num; i++)
170         {
171                 DistDefInfo *info = &system_db_info->info->dist_def_slot[i];
172                 info->is_created_prepare = 0;
173         }
174
175         system_db_info->info->query_cache_table_info.has_prepared_statement = 0;
176
177         return 0;
178 }
179
180 /*
181  * pool_memset_system_db_info:
182  *    Initializes distribution rules. Distribution rules are stored in
183  *    System DB. So we have to execute query, and expand results on
184  *    memory.
185  */
186 int pool_memset_system_db_info (SystemDBInfo *info)
187 {
188         int i;
189         static char sql[1024],sql2[1024];
190         PGresult *result;
191         DistDefInfo *dist_info = NULL;
192         RepliDefInfo *repli_info = NULL;
193
194         if (!system_db_info->pgconn ||
195                 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
196         {
197                 if (system_db_connect())
198                         return -1;
199         }
200
201   /* get distribution rules */
202         snprintf(sql,
203                          sizeof(sql),
204                          "SELECT dbname, schema_name, table_name,col_name,array_upper(col_list,1),col_list,type_list, dist_def_func FROM %s.dist_def",
205                          pool_config->system_db_schema);
206
207         result = PQexec(system_db_info->pgconn, sql);
208         if (!result || PQresultStatus(result) != PGRES_TUPLES_OK)
209         {
210                 pool_error("PQexec failed: %s", PQerrorMessage(system_db_info->pgconn));
211                 return -1;
212         }
213         else
214         {
215                 info->dist_def_num = PQntuples(result);
216                 if (info->dist_def_num != 0)
217                 {
218                         dist_info = malloc(sizeof(DistDefInfo) * info->dist_def_num);
219                 }
220
221                 if (dist_info == NULL && info->dist_def_num != 0)
222                 {
223                         pool_error("pool_memset_system_db_info: malloc failed: %s",
224                                            strerror(errno));
225                         PQclear(result);
226                         pool_close_libpq_connection();
227                         return -1;
228                 }
229
230                 info->dist_def_slot = dist_info;
231
232                 for (i = 0; i < PQntuples(result); ++i)
233                 {
234                         char *t_dbname;
235                         char *t_schema_name;
236                         char *t_table_name;
237                         char *t_dist_key_col_name;
238                         char *t_dist_def_func;
239                         int num;
240                         int len;
241
242                         num = atol(PQgetvalue(result, i ,4));
243                         t_dbname = malloc(strlen(PQgetvalue(result,i,0)) + 1);
244                         if (t_dbname == NULL)
245                         {
246                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
247                                                    strerror(errno));
248                                 PQclear(result);
249                                 pool_close_libpq_connection();
250                                 return -1;
251                         }
252                         strcpy(t_dbname, PQgetvalue(result,i,0));
253                         dist_info[i].dbname = t_dbname;
254
255                         t_schema_name = malloc(strlen(PQgetvalue(result,i,1)) + 1);
256                         if (t_schema_name == NULL)
257                         {
258                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
259                                                    strerror(errno));
260                                 PQclear(result);
261                                 pool_close_libpq_connection();
262                                 return -1;
263                         }
264                         strcpy(t_schema_name, PQgetvalue(result,i,1));
265                         dist_info[i].schema_name = t_schema_name;
266
267                         t_table_name = malloc(strlen(PQgetvalue(result,i,2)) + 1);
268                         if (t_table_name == NULL)
269                         {
270                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
271                                                    strerror(errno));
272                                 PQclear(result);
273                                 pool_close_libpq_connection();
274                                 return -1;
275                         }
276                         strcpy(t_table_name, PQgetvalue(result,i,2));
277                         dist_info[i].table_name = t_table_name;
278
279                         t_dist_key_col_name = malloc(strlen(PQgetvalue(result,i,3)) + 1);
280                         if (t_dist_key_col_name == NULL)
281                         {
282                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
283                                                    strerror(errno));
284                                 PQclear(result);
285                                 pool_close_libpq_connection();
286                                 return -1;
287                         }
288                         strcpy(t_dist_key_col_name, PQgetvalue(result,i,3));
289                         dist_info[i].dist_key_col_name = t_dist_key_col_name;
290
291                         t_dist_def_func = malloc(strlen(PQgetvalue(result,i,7)) + 1);
292                         if (t_dist_def_func == NULL)
293                         {
294                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
295                                                    strerror(errno));
296                                 PQclear(result);
297                                 pool_close_libpq_connection();
298                                 return -1;
299                         }
300                         strcpy(t_dist_def_func, PQgetvalue(result,i,7));
301                         dist_info[i].dist_def_func = t_dist_def_func;
302
303                         dist_info[i].col_num = num;
304
305                         dist_info[i].col_list = calloc(num, sizeof(char *));
306                         dist_info[i].type_list = calloc(num, sizeof(char *));
307                         if (dist_info[i].col_list == NULL || dist_info[i].type_list == NULL)
308                         {
309                                 pool_error("pool_memset_system_db_info: calloc failed: %s",
310                                                    strerror(errno));
311                                 PQclear(result);
312                                 pool_close_libpq_connection();
313                                 return -1;
314                         }
315
316                         if (get_col_list(&dist_info[i]) < 0)
317                         {
318                                 pool_error("get_col_list() failed");
319                                 PQclear(result);
320                                 pool_close_libpq_connection();
321                                 return -1;
322                         }
323
324                         /* create PREPARE statement */
325                         len = strlen(t_dbname) + strlen(t_schema_name) +
326                                 strlen(t_table_name) + strlen("pgpool_");
327
328                         dist_info[i].prepare_name = malloc(len + 1);
329                         if (dist_info[i].prepare_name == NULL)
330                         {
331                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
332                                                    strerror(errno));
333                                 return -1;
334                         }
335
336                         snprintf(dist_info[i].prepare_name, len+1, "pgpool_%s%s%s",
337                                          t_dbname, t_schema_name, t_table_name);
338                         dist_info[i].prepare_name[len] = '\0';
339                 }
340         }
341
342         PQclear(result);
343
344   /* get replication rules */
345         snprintf(sql2,
346                          sizeof(sql2),
347                          "SELECT dbname, schema_name, table_name, array_upper(col_list,1),col_list,type_list FROM %s.replicate_def",
348                          pool_config->system_db_schema);
349
350         result = PQexec(system_db_info->pgconn, sql2);
351
352         if (!result)
353         {
354                 pool_error("PQexec failed: %s", PQerrorMessage(system_db_info->pgconn));
355                 return -1;
356         }
357         else if (PQresultStatus(result) != PGRES_TUPLES_OK)
358         {
359                 info->repli_def_num = 0;
360                 info->repli_def_slot = NULL;
361         }
362         else
363         {
364                 info->repli_def_num = PQntuples(result);
365                 if (info->repli_def_num != 0)
366                 {
367                         repli_info = malloc(sizeof(RepliDefInfo) * info->repli_def_num);
368                 }
369
370                 if (repli_info == NULL && info->repli_def_num != 0)
371                 {
372                         pool_error("pool_memset_system_db_info: malloc failed: %s",
373                                            strerror(errno));
374                         PQclear(result);
375                         pool_close_libpq_connection();
376                         return -1;
377                 }
378
379                 info->repli_def_slot = repli_info;
380
381                 for (i = 0; i < PQntuples(result); ++i)
382                 {
383                         char *t_dbname;
384                         char *t_schema_name;
385                         char *t_table_name;
386                         int num;
387                         int len;
388
389                         num = atol(PQgetvalue(result, i ,3));
390                         t_dbname = malloc(strlen(PQgetvalue(result,i,0)) + 1);
391                         if (t_dbname == NULL)
392                         {
393                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
394                                                    strerror(errno));
395                                 PQclear(result);
396                                 pool_close_libpq_connection();
397                                 return -1;
398                         }
399                         strcpy(t_dbname, PQgetvalue(result,i,0));
400                         repli_info[i].dbname = t_dbname;
401
402                         t_schema_name = malloc(strlen(PQgetvalue(result,i,1)) + 1);
403                         if (t_schema_name == NULL)
404                         {
405                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
406                                                    strerror(errno));
407                                 PQclear(result);
408                                 pool_close_libpq_connection();
409                                 return -1;
410                         }
411                         strcpy(t_schema_name, PQgetvalue(result,i,1));
412                         repli_info[i].schema_name = t_schema_name;
413
414                         t_table_name = malloc(strlen(PQgetvalue(result,i,2)) + 1);
415                         if (t_table_name == NULL)
416                         {
417                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
418                                                    strerror(errno));
419                                 PQclear(result);
420                                 pool_close_libpq_connection();
421                                 return -1;
422                         }
423                         strcpy(t_table_name, PQgetvalue(result,i,2));
424                         repli_info[i].table_name = t_table_name;
425
426                         repli_info[i].col_num = num;
427
428                         repli_info[i].col_list = calloc(num, sizeof(char *));
429                         repli_info[i].type_list = calloc(num, sizeof(char *));
430                         if (repli_info[i].col_list == NULL || repli_info[i].type_list == NULL)
431                         {
432                                 pool_error("pool_memset_system_db_info: calloc failed: %s",
433                                                    strerror(errno));
434                                 PQclear(result);
435                                 pool_close_libpq_connection();
436                                 return -1;
437                         }
438
439                         if (get_col_list2(&repli_info[i]) < 0)
440                         {
441                                 pool_error("get_col_list() failed");
442                                 PQclear(result);
443                                 pool_close_libpq_connection();
444                                 return -1;
445                         }
446
447                         /* create PREPARE statement */
448                         len = strlen(t_dbname) + strlen(t_schema_name) +
449                                 strlen(t_table_name) + strlen("pgpool_");
450
451                         repli_info[i].prepare_name = malloc(len + 1);
452                         if (repli_info[i].prepare_name == NULL)
453                         {
454                                 pool_error("pool_memset_system_db_info: malloc failed: %s",
455                                                    strerror(errno));
456                                 return -1;
457                         }
458
459                         snprintf(repli_info[i].prepare_name, len+1, "pgpool_%s%s%s",
460                                          t_dbname, t_schema_name, t_table_name);
461                         repli_info[i].prepare_name[len] = '\0';
462         }
463         }
464
465         PQclear(result);
466
467         pool_close_libpq_connection();
468         return i;
469 }
470
471 /*
472  * pool_get_dist_def_info:
473  *    Looks up distribution rule with dbname, schema_name and table_name.
474  */
475 DistDefInfo *pool_get_dist_def_info (char *dbname, char *schema_name, char *table_name)
476 {
477         int i;
478         int dist_def_num = system_db_info->info->dist_def_num;
479         char *public ="public";
480
481         if (!dbname || !table_name)
482         {
483                 return NULL;
484         }
485
486         if (!schema_name)
487         {
488                 schema_name = public;
489         }
490
491         for (i = 0; i < dist_def_num; i++)
492         {
493                 char *mem_dbname;
494                 char *mem_schema_name;
495                 char *mem_table_name;
496
497                 mem_dbname = system_db_info->info->dist_def_slot[i].dbname;
498                 mem_schema_name = system_db_info->info->dist_def_slot[i].schema_name;
499                 mem_table_name  = system_db_info->info->dist_def_slot[i].table_name;
500
501                 if ((strcmp(mem_dbname, dbname) == 0) &&
502                         (strcmp(mem_schema_name, schema_name) == 0) &&
503                         (strcmp(mem_table_name, table_name) ==0))
504                 {
505                         return &system_db_info->info->dist_def_slot[i];
506                 }
507         }
508         return NULL;
509 }
510
511 /*
512  * pool_get_repli_def_info:
513  *    Looks up replication rule with dbname, schema_name and table_name.
514  */
515 RepliDefInfo *pool_get_repli_def_info (char *dbname, char *schema_name, char *table_name)
516 {
517         int i;
518         int repli_def_num = system_db_info->info->repli_def_num;
519         char *public ="public";
520
521         if (!dbname || !table_name)
522         {
523                 return NULL;
524         }
525
526         if (!schema_name)
527         {
528                 schema_name = public;
529         }
530
531         for (i = 0; i < repli_def_num; i++)
532         {
533                 char *mem_dbname;
534                 char *mem_schema_name;
535                 char *mem_table_name;
536
537                 mem_dbname = system_db_info->info->repli_def_slot[i].dbname;
538                 mem_schema_name = system_db_info->info->repli_def_slot[i].schema_name;
539                 mem_table_name  = system_db_info->info->repli_def_slot[i].table_name;
540
541                 if ((strcmp(mem_dbname, dbname) == 0) &&
542                         (strcmp(mem_schema_name, schema_name) == 0) &&
543                         (strcmp(mem_table_name, table_name) ==0))
544                 {
545                         return &system_db_info->info->repli_def_slot[i];
546                 }
547         }
548         return NULL;
549 }
550
551 /*
552  * pool_get_id:
553  *    Returns the backend node id from value.
554  */
555 int pool_get_id (DistDefInfo *info, const char *value)
556 {
557         int num;
558         PGresult *result;
559         char *type;
560         int length;
561
562         if (!system_db_info->pgconn ||
563                 (PQstatus(system_db_info->pgconn) != CONNECTION_OK))
564         {
565                 if (system_db_connect())
566                         return -1;
567         }
568
569         if (info->is_created_prepare == 0)
570         {
571                 if (create_prepared_statement(info) != 0)
572                         return -1;
573         }
574
575         type=info->type_list[info->dist_key_col_id];
576         length = strlen(value);
577         result = PQexecPrepared(system_db_info->pgconn, info->prepare_name,
578                                                         1, &value, &length, NULL, 0);
579
580         if (!result || PQresultStatus(result) != PGRES_TUPLES_OK ||
581                 PQgetisnull(result, 0, 0))
582         {
583                 pool_error("PQexecPrepared failed: %s", PQerrorMessage(system_db_info->pgconn));
584                 return -1;
585         }
586         else
587         {
588                 char *id;
589                 id = PQgetvalue(result, 0 ,0);
590
591                 if(strlen(id))
592                 {
593                         num = atoi(id);
594                         PQclear(result);
595
596                         if(num < NUM_BACKENDS)
597                         {
598                                 return num;
599                         } else {
600                                 return -1;
601                         }
602                 }
603                 return -1;
604         }
605 }
606
607 /*
608  * pool_close_libpq_connection:
609  *     Closes libpq's connection.
610  */
611 void pool_close_libpq_connection(void)
612 {
613         PQfinish(system_db_info->pgconn);
614         system_db_info->pgconn = NULL;
615 }
616
617 /*
618  * pool_system_db_connection:
619  *     Returns persistent connection to the system DB
620  */
621 POOL_CONNECTION_POOL_SLOT *pool_system_db_connection(void)
622 {
623         return system_db_info->connection;
624 }
625
626 /*
627  * create_prepared_statement:
628  *     Returns 0 if prepared statement is created.
629  *     Returns 1 if prepared statement can't created.
630  */
631 static int create_prepared_statement(DistDefInfo *dist_info)
632 {
633         static char sql[1024];
634         PGresult *result;
635
636 #ifdef HAVE_PQPREPARE
637         snprintf(sql, 1024, "SELECT %s($1::%s)", dist_info->dist_def_func,
638                          dist_info->type_list[dist_info->dist_key_col_id]);
639         result = PQprepare(system_db_info->pgconn,
640                                            dist_info->prepare_name,
641                                            sql, 1, NULL);
642 #else
643         snprintf(sql, 1024, "PREPARE %s (%s) AS SELECT %s($1::%s)",
644                          dist_info->prepare_name,
645                          dist_info->type_list[dist_info->dist_key_col_id],
646                          dist_info->dist_def_func,
647                          dist_info->type_list[dist_info->dist_key_col_id]);
648         result = PQexec(system_db_info->pgconn, sql);
649 #endif /* HAVE_PQPREPARE */
650
651         if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
652         {
653                 pool_error("PQprepare failed: %s", PQerrorMessage(system_db_info->pgconn));
654                 return 1;
655         }
656         dist_info->is_created_prepare = 1;
657         return 0;
658 }