]> git.8kb.co.uk Git - pgpool-ii/pgpool-ii_2.2.5/blob - pool_rewrite_query.c
Attempt to send a proper failure message to frontend when authentication
[pgpool-ii/pgpool-ii_2.2.5] / pool_rewrite_query.c
1 /* -*-pgsql-c-*- */
2 /*
3  * $Header: /cvsroot/pgpool/pgpool-II/pool_rewrite_query.c,v 1.13.2.1 2009/08/22 04:19:49 t-ishii Exp $
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2009      PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  * pool_rewrite_query.c: rewrite_query
22  *
23  */
24
25 #include "pool.h"
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include "pool_rewrite_query.h"
30
31
32 static int getInsertRule(ListCell *lc,List *list_t ,DistDefInfo *info, int div_key_num);
33 static void examInsertStmt(Node *node,POOL_CONNECTION_POOL *backend,RewriteQuery *message);
34 static void examSelectStmt(Node *node,POOL_CONNECTION_POOL *backend,RewriteQuery *message);
35 static char *delimistr(char *str);
36 static int direct_parallel_query(RewriteQuery *message);
37 static void initMessage(RewriteQuery *message);
38 static void initdblink(ConInfoTodblink *dblink, POOL_CONNECTION_POOL *backend);
39 static void analyze_debug(RewriteQuery *message);
40
41
42 /* create error message  */
43 char *pool_error_message(char *message)
44 {
45         String *str;
46
47         str = init_string("");
48         string_append_char(str,message);
49         return str->data;
50 }
51
52 /*
53  *  search DistDefInfo(this info is build in starting process
54  *  and get node id where a query send.
55  */
56 static int getInsertRule(ListCell *lc,List *list_t ,DistDefInfo *info,int div_key_num)
57 {
58         int loop_counter = 0;
59         int node_number = -1;
60         ListCell *cell;
61
62         if(list_t->length != 1)
63                 return -1;
64
65   cell = list_head(list_t);
66
67         if(!cell && !IsA(cell,List))
68                 return 1;
69
70         foreach(lc,lfirst(cell))
71         {
72                 A_Const *constant;
73                 Value value;
74                 void *obj = NULL;
75
76                 obj = lfirst(lc);
77
78                 if(obj && IsA(obj, TypeCast))
79                 {
80                         TypeCast *type = (TypeCast *) obj;
81                         obj = type->arg;
82
83                         if(!obj)
84                         {
85                                 return -1;
86                         }
87                 }
88
89                 if(obj && !IsA(obj, A_Const))
90                 {
91                         return -1;
92                 }
93
94                 if (loop_counter == div_key_num)
95                 {
96                         constant = (A_Const *) obj;
97                         value = constant->val;
98                         if (value.type == T_Integer)
99                         {
100                                 char temp[16];
101                                 sprintf(temp,"%ld",value.val.ival);
102                                 node_number = pool_get_id(info,temp);
103                                 break;
104                         }
105                         else
106                         {
107                                 if(value.val.str)
108                                         node_number = pool_get_id(info, value.val.str);
109                                 else
110                                         return -1;
111                                 break;
112                         }
113                 }
114                 loop_counter++;
115         }
116         /* if node_number is -1, cannot get return value from pool_get_id() */
117         return node_number;
118 }
119
120 /*
121  * This function processes the decision whether to
122  * distribute the insert sentence to the node.
123  */
124 static void examInsertStmt(Node *node,POOL_CONNECTION_POOL *backend, RewriteQuery *message)
125 {
126         RangeVar *table;
127         int cell_num;
128         int node_number;
129         DistDefInfo *info = NULL;
130         ListCell *lc = NULL;
131         List *list_t = NULL;
132         int div_key_num = 0;
133         int dist_def_flag = 0;
134         InsertStmt *insert = (InsertStmt *) node;
135
136   message->type = node->type;
137
138
139         /* insert target table */
140         table = insert->relation;
141         if (!table)
142         {
143                 /* send  error message to frontend */
144                 message->r_code = INSERT_SQL_RESTRICTION;
145                 message->r_node = -1;
146                 message->rewrite_query = pool_error_message("cannot find table name");
147                 return;
148         }
149
150         /* pool_debug("exam_InsertStmt insert table_name %s:",table->relname); */
151
152         info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
153                                                                   table->schemaname,
154                                                                   table->relname);
155
156         if (!info)
157         {
158                 /* send  error message to frontend */
159                 message->r_code = INSERT_DIST_NO_RULE;
160                 return;
161         }
162
163         /* the source SELECT ? */
164         if (insert->selectStmt && ((SelectStmt *)insert->selectStmt)->targetList)
165         {
166                 /* send  error message to frontend */
167                 message->r_code = INSERT_SQL_RESTRICTION;
168                 message->r_node = -1;
169                 message->rewrite_query = pool_error_message("cannot use SelectStmt in InsertStmt");
170                 return;
171         }
172
173         list_t = (List *)(((SelectStmt *)insert->selectStmt)->valuesLists);
174
175         if (!list_t)
176         {
177                 /* send  error message to frontend */
178                 message->r_code = INSERT_SQL_RESTRICTION;
179                 message->r_node = -1;
180                 message->rewrite_query = pool_error_message("cannot find target List");
181                 return;
182         }
183
184         /* number of target list */
185
186         if(list_t->length == 1 && IsA(lfirst(list_head(list_t)),List))
187         {
188                 cell_num = ((List *) lfirst(list_head(list_t)))->length;
189         }
190         else
191         {
192                         /* send  error message to frontend */
193                         message->r_code = INSERT_SQL_RESTRICTION;
194                         message->r_node = -1;
195                         message->rewrite_query = pool_error_message("cannot analzye this InsertStmt");
196                         return;
197   }
198
199
200         /* Is the target columns ?*/
201         if (!insert->cols)
202         {
203                 div_key_num = info->dist_key_col_id;
204                 dist_def_flag = 1;
205
206                 pool_debug("cell number %d, div key num %d, div_key columname %s",cell_num,div_key_num,info->col_list[div_key_num]);
207
208                 if (cell_num < div_key_num)
209                 {
210                         /* send  error message to frontend */
211                         message->r_code = INSERT_SQL_RESTRICTION;
212                         message->r_node = -1;
213                         message->rewrite_query = pool_error_message("cannot find dividing key in InsertStmt");
214                         return;
215                 }
216
217   }
218         else
219         {
220                 List *list_cols = (List *) insert->cols;
221
222                 foreach(lc, list_cols)
223                 {
224                         Node *n;
225                         ResTarget *target;
226                         n = lfirst(lc);
227                         target = (ResTarget *) n;
228                         if (strcmp(target->name,info->dist_key_col_name) == 0)
229                         {
230                                 dist_def_flag = 1;
231                                 break;
232                         }
233                         div_key_num++;
234                 }
235
236                 if (cell_num < div_key_num)
237                 {
238                         /* send  error message to frontend */
239                         message->r_code = INSERT_SQL_RESTRICTION;
240                         message->r_node = -1;
241                         message->rewrite_query = pool_error_message("cannot find dividing key in InsertStmt");
242                         return;
243                 }
244         }
245
246         if (dist_def_flag != 1)
247         {
248                 /* send  error message to frontend */
249                 message->r_code = INSERT_SQL_RESTRICTION;
250                 message->r_node = -1;
251                 message->rewrite_query = pool_error_message("cannot find dividing key in InsertStmt");
252                 return;
253         }
254
255         /* this loop get insert one args of divide rule */
256         node_number = getInsertRule(lc, list_t, info, div_key_num);
257
258         if (node_number < 0)
259         {
260                 /* send  error message to frontend */
261                 message->r_code = INSERT_SQL_RESTRICTION;
262                 message->r_node = -1;
263                 message->rewrite_query = pool_error_message("cannot get node_id from system db");
264                 return;
265         }
266
267         pool_debug("insert node_number =%d",node_number);
268         message->r_code = 0;
269         message->r_node = node_number;
270         message->rewrite_query = nodeToString(node);
271 }
272
273 /* start of rewriting query */
274 static void examSelectStmt(Node *node,POOL_CONNECTION_POOL *backend,RewriteQuery *message)
275 {
276         static ConInfoTodblink dblink;
277
278         /* initialize dblink info */
279         initdblink(&dblink,backend);
280
281         /* initialize  message */
282         initMessage(message);
283         message->type = node->type;
284         message->r_code = SELECT_DEFAULT;
285
286   /* do rewrite query */
287         nodeToRewriteString(message,&dblink,node);
288 }
289
290 /* initialize Message */
291 static void initMessage(RewriteQuery *message)
292 {
293         message->r_code = 0;
294         message->r_node = 0;
295         message->column = 0;
296         message->virtual_num = 0;
297         message->is_pg_catalog = false;
298         message->is_loadbalance = false;
299         message->is_parallel = false;
300         message->table_relname = NULL;
301         message->table_alias = NULL;
302         message->dbname = NULL;
303         message->schemaname = NULL;
304         message->rewrite_query = NULL;
305         message->rewritelock = -1;
306         message->ignore_rewrite = -1;
307         message->ret_num = 0;
308 }
309
310 /* set dblink info */
311 static void initdblink(ConInfoTodblink *dblink,POOL_CONNECTION_POOL *backend)
312 {
313         dblink->dbname =  MASTER_CONNECTION(backend)->sp->database;
314         dblink->hostaddr = pool_config->pgpool2_hostname;
315         dblink->user = MASTER_CONNECTION(backend)->sp->user;
316         dblink->port = pool_config->port;
317         dblink->password = MASTER_CONNECTION(backend)->con->password;
318 }
319
320 /* reference of pg_catalog or not */
321 int IsSelectpgcatalog(Node *node,POOL_CONNECTION_POOL *backend)
322 {
323         static ConInfoTodblink dblink;
324         static RewriteQuery message;
325
326         /* initialize dblink info */
327         initdblink(&dblink,backend);
328
329         /* initialize  message */
330         initMessage(&message);
331
332         message.type = node->type;
333
334         initdblink(&dblink,backend);
335
336         if(message.is_pg_catalog)
337         {
338                 pool_debug("Isselectpgcatalog %d",message.is_pg_catalog);
339                 return 1;
340         }
341         else
342         {
343                 return 0;
344         }
345 }
346
347 /*
348  *  SELECT statement or INSERT statement is special,
349  *  peculiar process is needed in parallel mode.
350  */
351 RewriteQuery *rewrite_query_stmt(Node *node,POOL_CONNECTION *frontend,POOL_CONNECTION_POOL *backend,RewriteQuery *message)
352 {
353         switch(node->type)
354         {
355                 case T_SelectStmt:
356                 {
357                         SelectStmt *stmt = (SelectStmt *)node;
358
359                          /* Because "SELECT INTO" cannot be used in a parallel mode,
360                           * the error message is generated and send "ready for query" to frontend.
361                           */
362                         if(stmt->intoClause)
363                         {
364                                 pool_send_error_message(frontend, MAJOR(backend), "XX000",
365                                                                                 "pgpool2 sql restriction",
366                                                                                 "cannot use select into ...", "", __FILE__,
367                                                                                 __LINE__);
368
369
370                                 pool_send_readyforquery(frontend);
371                                 message->status=POOL_CONTINUE;
372                                 break;
373                         }
374
375                         /*
376                          * The Query is actually rewritten based on analytical information on the Query.
377                          */
378                         examSelectStmt(node,backend,message);
379
380                         if (message->r_code != SELECT_PGCATALOG &&
381                                 message->r_code != SELECT_RELATION_ERROR)
382                         {
383                                 /*
384                                  * The rewritten Query is transmitted to system db,
385                                  * and execution status is received.
386                                  */
387                                 POOL_CONNECTION_POOL_SLOT *system_db = pool_system_db_connection();
388                                 message->status = OneNode_do_command(frontend,
389                                                                                                         system_db->con,
390                                                                                                         message->rewrite_query,
391                                                                                                         backend->info->database);
392                         }
393                         else
394                         {
395                                 if(TSTATE(backend) == 'T' &&
396                                    message->r_code == SELECT_RELATION_ERROR)
397                                 {
398                                         /*
399                                          * In the case of message->r_code == SELECT_RELATION_ERROR and in the transaction,
400                                          * Transmit the Query to all back ends, and to abort transaction.
401                                          */
402                                         pool_debug("pool_rewrite_stmt(select): Inside transaction. abort transaction");
403                                         message->rewrite_query = nodeToString(node);
404                                         message->status = pool_parallel_exec(frontend,backend,message->rewrite_query,node,true);
405                                 }
406                                 else
407                                 {
408                                         /*
409                                          * Ohter cases of message->r_code == SELECT_RELATION_ERROR
410                                          * or SELECT_PG_CATALOG,
411                                          * Transmit the Query to Master node and receive status.
412                                          */
413                                         pool_debug("pool_rewrite_stmt: executed by Master");
414                                         message->rewrite_query = nodeToString(node);
415                                         message->status = OneNode_do_command(frontend,
416                                                                                                                 MASTER(backend),
417                                                                                                                 message->rewrite_query,
418                                                                                                                 backend->info->database);
419                                 }
420                         }
421                         pool_debug("pool_rewrite_stmt: select message_code %d",message->r_code);
422                 }
423                 break;
424
425                 case T_InsertStmt:
426
427                   /* The distribution of the INSERT sentence. */
428                         examInsertStmt(node,backend,message);
429
430                         if(message->r_code == 0 )
431                         {
432                                 /* send the INSERT sentence */
433                                 message->status = OneNode_do_command(frontend,
434                                                                                                         CONNECTION(backend,message->r_node),
435                                                                                                         message->rewrite_query,
436                                                                                                         backend->info->database);
437                         }
438                         else if (message->r_code == INSERT_SQL_RESTRICTION)
439                         {
440                                 /* Restriction case of INSERT sentence */
441                                 pool_send_error_message(frontend, MAJOR(backend), "XX000",
442                                                                                 "pgpool2 sql restriction",
443                                                                                 message->rewrite_query, "", __FILE__,
444                                                                                 __LINE__);
445
446                                 if(TSTATE(backend) == 'T')
447                                 {
448                                         /* In Transaction, send the invalid message to backend to abort this transaction */
449                                         pool_debug("rewrite_query_stmt(insert): Inside transaction. Abort transaction");
450                                         message->status = pool_parallel_exec(frontend,backend, "POOL_RESET_TSTATE",node,false);
451                                 }
452                                 else
453                                 {
454                                         /* return "ready for query" to frontend */
455                                         pool_send_readyforquery(frontend);
456                                         message->status=POOL_CONTINUE;
457                                 }
458                         }
459                         break;
460 #if 0
461                 case T_UpdateStmt:
462                         /* Improve UpdateStmt for complex query */
463                         break;
464 #endif
465                 default:
466                         message->type = node->type;
467                         message->status = POOL_CONTINUE;
468                         break;
469         }
470
471         pool_debug("pool_rewrite_stmt: query rule %d",node->type);
472
473         return message;
474 }
475
476 #define POOL_PARALLEL "pool_parallel"
477 #define POOL_LOADBALANCE "pool_loadbalance"
478
479 /*
480  * After analyzing query, check the analyze[0]->state.
481  * if the analyze[0]->state ==`P`, this query can be executed
482  * on parallel engine.
483  */
484 static int direct_parallel_query(RewriteQuery *message)
485 {
486         if(message && message->analyze[0] && message->analyze[0]->state == 'P')
487                 return 1;
488         else
489                 return 0;
490 }
491
492
/*
 * Strip the delimiters from a quoted string.
 *
 * The first and the last character of "str" are dropped, and every pair of
 * consecutive single quotes in between is collapsed into one quote.
 * Example: 'it''s' becomes it's.
 *
 * Input shorter than two characters has no interior and yields an empty
 * string; a NULL input is treated the same way.
 *
 * Returns a NUL-terminated buffer obtained from palloc().
 */
static char *delimistr(char *str)
{
	char *result;
	int len = str ? (int) strlen(str) : 0;
	int src;
	int dst = 0;

	/*
	 * The interior holds at most len - 2 characters plus the terminator.
	 * Always reserve at least the terminator, so inputs of length 0 or 1
	 * never request a non-positive size or write past the buffer.
	 */
	result = palloc(len >= 2 ? len - 1 : 1);

	for (src = 1; src < len - 1; src++)
	{
		char c = str[src];

		/* a doubled quote: keep this one and skip its twin */
		if (c == '\'' && str[src + 1] == '\'')
			src++;

		result[dst++] = c;
	}

	result[dst] = '\0';

	return result;
}
518
519 /* for debug */
520 void analyze_debug(RewriteQuery *message)
521 {
522         int analyze_num,i;
523         analyze_num = message->analyze_num;
524
525         for(i = 0; i< analyze_num; i++)
526         {
527                 AnalyzeSelect *analyze = message->analyze[i];
528                 pool_debug("analyze_debug :select no(%d), last select(%d), last_part(%d), state(%c)",
529              analyze->now_select,analyze->last_select,analyze->call_part,analyze->state);
530         }
531 }
532
533 /*
534  * This function checks the KEYWORD(POOL_PARALLEL,POOL_LOADBALANCE)
535  * if the special function(like pool_parallel() or pool_loadbalance())
536  * is used, mark the r_code,is_parallel and is_loadbalance.
537  * In othe cases, It is necessary to analyze the Query.
538  */
539 RewriteQuery *is_parallel_query(Node *node, POOL_CONNECTION_POOL *backend)
540 {
541         static RewriteQuery message;
542         static ConInfoTodblink dblink;
543
544         initMessage(&message);
545
546         if (IsA(node, SelectStmt))
547         {
548                 SelectStmt *stmt;
549                 Node *n;
550                 int direct_ok;
551
552                 stmt = (SelectStmt *) node;
553
554                 /* Check the special function is used in this query*/
555                 if (!(stmt->distinctClause || stmt->intoClause ||
556                         stmt->fromClause || stmt->groupClause || stmt->havingClause ||
557                         stmt->sortClause || stmt->limitOffset || stmt->limitCount ||
558                         stmt->lockingClause || stmt->larg || stmt->rarg) &&
559                         (n = lfirst(list_head(stmt->targetList))) && IsA(n, ResTarget))
560                 {
561                         ResTarget *target = (ResTarget *) n;
562
563                         if (target->val && IsA(target->val, FuncCall))
564                         {
565                                 FuncCall *func = (FuncCall *) target->val;
566                                 if (list_length(func->funcname) == 1 && func->args)
567                                 {
568                                         Node *func_args = (Node *) lfirst(list_head(func->args));
569                                         message.rewrite_query = delimistr(nodeToString(func_args));
570
571                                         /* pool_parallel() is used in this query */
572                                         if(strcmp(strVal(lfirst(list_head(func->funcname))),
573                                                    POOL_PARALLEL) == 0)
574                                         {
575                                                 message.r_code = SEND_PARALLEL_ENGINE;
576                                                 message.is_parallel = true;
577                                                 message.is_loadbalance = false;
578                                                 pool_debug("can pool_parallel_exec %s",message.rewrite_query);
579                                                 return &message;
580                                         }
581                                         else /* pool_loadbalance() is used in this query */
582                                         if(strcmp(strVal(lfirst(list_head(func->funcname))),
583                                                                                                 POOL_LOADBALANCE) == 0)
584                                         {
585                                                 message.r_code = SEND_LOADBALANCE_ENGINE;
586                                                 message.is_loadbalance = true;
587                                                 message.is_parallel = false;
588                                                 pool_debug("can loadbalance_mode %s",message.rewrite_query);
589                                                 return &message;
590                                         }
591                                 }
592                         }
593                 }
594
595     /* ANALYZE QUERY */
596                 message.r_code = SELECT_ANALYZE;
597                 message.is_loadbalance = true;
598
599                 initdblink(&dblink,backend);
600                 nodeToRewriteString(&message,&dblink,node);
601
602                 if(message.is_pg_catalog)
603                 {
604                         message.is_loadbalance = false;
605                         message.is_parallel = false;
606                         pool_debug("is_parallel_query: query is done by loadbalance(pgcatalog)");
607                         return &message;
608                 }
609
610                 if(message.is_loadbalance)
611                 {
612                         message.is_parallel = false;
613                         pool_debug("is_parallel_query: query is done by loadbalance");
614                         return &message;
615                 }
616
617                 /* Analyzing Query Start */
618                 analyze_debug(&message);
619
620                 /* After the analyzing query,
621                  * this query can be executed as parallel exec, is_parallel flag is turned on
622                  */
623                 direct_ok = direct_parallel_query(&message);
624                 if(direct_ok == 1)
625                 {
626                         message.rewrite_query = nodeToString(node);
627                         message.is_parallel = true;
628                         message.is_loadbalance = false;
629                         pool_debug("can pool_parallel_exec %s",message.rewrite_query);
630                         return &message;
631                 }
632         }
633         else if (IsA(node, CopyStmt))
634         {
635                 /* For Copy Statement, check the table name, mark the is_parallel flag. */
636                 CopyStmt *stmt = (CopyStmt *)node;
637
638                 if (stmt->is_from == FALSE && stmt->filename == NULL)
639                 {
640                         RangeVar *relation = (RangeVar *)stmt->relation;
641
642                         /* check on distribution table or replicate table */
643
644                         if(pool_get_dist_def_info (MASTER_CONNECTION(backend)->sp->database, relation->schemaname, relation->relname))
645                         {
646                                 message.rewrite_query = nodeToString(stmt);
647                                 message.is_parallel = true;
648                                 message.is_loadbalance = false;
649                                 message.r_code = SEND_PARALLEL_ENGINE;
650                         }
651                 }
652         }
653
654         return &message;
655 }