3 * $Header: /cvsroot/pgpool/pgpool-II/pool_rewrite_query.c,v 1.13.2.1 2009/08/22 04:19:49 t-ishii Exp $
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
8 * Copyright (c) 2003-2009 PgPool Global Development Group
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
21 * pool_rewrite_query.c: rewrite_query
29 #include "pool_rewrite_query.h"
/*
 * Forward declarations of the helpers whose definitions appear further down
 * in this file.
 * NOTE(review): analyze_debug is declared static here, while its definition
 * omits the keyword. The definition still has internal linkage because of
 * this prior declaration, but repeating static there would read more clearly.
 */
32 static int getInsertRule(ListCell *lc,List *list_t ,DistDefInfo *info, int div_key_num);
33 static void examInsertStmt(Node *node,POOL_CONNECTION_POOL *backend,RewriteQuery *message);
34 static void examSelectStmt(Node *node,POOL_CONNECTION_POOL *backend,RewriteQuery *message);
35 static char *delimistr(char *str);
36 static int direct_parallel_query(RewriteQuery *message);
37 static void initMessage(RewriteQuery *message);
38 static void initdblink(ConInfoTodblink *dblink, POOL_CONNECTION_POOL *backend);
39 static void analyze_debug(RewriteQuery *message);
42 /* create error message */
/*
 * pool_error_message: build the text of an error message.
 *
 * Visible steps: start from an empty string obtained from init_string() and
 * append message to it with string_append_char().
 *
 * NOTE(review): the declaration of str and the return statement are on lines
 * not visible here; presumably the character data held by str is what gets
 * returned - confirm.
 */
43 char *pool_error_message(char *message)
47 str = init_string("");
48 string_append_char(str,message);
53 * Search the DistDefInfo (this info is built during the startup process)
54 * and get the id of the node to which the query is sent.
/*
 * getInsertRule: find the node number for the dividing-key value of an INSERT.
 *
 * lc          - list cursor used by the foreach loop below.
 * list_t      - list whose length is checked against 1; its head cell is the
 *               list that gets iterated.
 * info        - distribution definition handed to pool_get_id().
 * div_key_num - position (compared against loop_counter) of the dividing key.
 *
 * The result of pool_get_id() is stored in node_number. The return statement
 * is not visible in this excerpt; presumably node_number is returned - confirm.
 */
56 static int getInsertRule(ListCell *lc,List *list_t ,DistDefInfo *info,int div_key_num)
/* Only a list with exactly one element is handled. */
62 if(list_t->length != 1)
65 cell = list_head(list_t);
/*
 * NOTE(review): with a logical AND, a NULL cell proceeds to IsA(cell, List),
 * and a non-NULL cell is never type-checked. A logical OR, probably applied
 * to lfirst(cell) as the caller does, looks like the intent - confirm.
 */
67 if(!cell && !IsA(cell,List))
70 foreach(lc,lfirst(cell))
/* A TypeCast element gets special handling (the body is not visible here). */
78 if(obj && IsA(obj, TypeCast))
80 TypeCast *type = (TypeCast *) obj;
/* Elements that are not constants are singled out here. */
89 if(obj && !IsA(obj, A_Const))
/* Only the element at the dividing-key position is used for the lookup. */
94 if (loop_counter == div_key_num)
96 constant = (A_Const *) obj;
97 value = constant->val;
/* Integer constants are converted to text before the lookup. */
98 if (value.type == T_Integer)
/*
 * NOTE(review): unbounded sprintf; the size of temp is declared on a line
 * not visible here. snprintf with sizeof temp would be safer - confirm.
 */
101 sprintf(temp,"%ld",value.val.ival);
102 node_number = pool_get_id(info,temp);
/* Otherwise the string form of the constant is looked up directly. */
108 node_number = pool_get_id(info, value.val.str);
116 /* if node_number is -1, cannot get return value from pool_get_id() */
121 * This function decides whether to
122 * distribute the INSERT statement to a node.
/*
 * examInsertStmt: work out which node an INSERT goes to and record the
 * outcome in message.
 *
 * node    - parse tree node, treated as an InsertStmt.
 * backend - connection pool; its master connection supplies the database
 *           name used for the distribution-rule lookup.
 * message - output; type, r_code, r_node and rewrite_query are written.
 *
 * Failure paths set r_code to INSERT_SQL_RESTRICTION or INSERT_DIST_NO_RULE.
 * The restriction paths also set r_node to -1 and store a text built by
 * pool_error_message() in rewrite_query. On the success path r_node receives
 * the node number from getInsertRule() and rewrite_query receives the
 * nodeToString() form of node.
 *
 * The conditions guarding several of these paths, and the early returns,
 * fall on lines that are not visible in this excerpt.
 */
124 static void examInsertStmt(Node *node,POOL_CONNECTION_POOL *backend, RewriteQuery *message)
129 DistDefInfo *info = NULL;
133 int dist_def_flag = 0;
134 InsertStmt *insert = (InsertStmt *) node;
136 message->type = node->type;
139 /* insert target table */
140 table = insert->relation;
143 /* send error message to frontend */
144 message->r_code = INSERT_SQL_RESTRICTION;
145 message->r_node = -1;
146 message->rewrite_query = pool_error_message("cannot find table name");
150 /* pool_debug("exam_InsertStmt insert table_name %s:",table->relname); */
/* Look up the distribution definition for the target table. */
152 info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
158 /* send error message to frontend */
159 message->r_code = INSERT_DIST_NO_RULE;
163 /* the source SELECT ? */
164 if (insert->selectStmt && ((SelectStmt *)insert->selectStmt)->targetList)
166 /* send error message to frontend */
167 message->r_code = INSERT_SQL_RESTRICTION;
168 message->r_node = -1;
169 message->rewrite_query = pool_error_message("cannot use SelectStmt in InsertStmt");
/*
 * NOTE(review): insert->selectStmt is dereferenced here, yet the test above
 * allows it to be NULL and no separate NULL check is visible in this
 * excerpt - confirm.
 */
173 list_t = (List *)(((SelectStmt *)insert->selectStmt)->valuesLists);
177 /* send error message to frontend */
178 message->r_code = INSERT_SQL_RESTRICTION;
179 message->r_node = -1;
180 message->rewrite_query = pool_error_message("cannot find target List");
184 /* number of target list */
/* cell_num is the element count of the single inner list. */
186 if(list_t->length == 1 && IsA(lfirst(list_head(list_t)),List))
188 cell_num = ((List *) lfirst(list_head(list_t)))->length;
192 /* send error message to frontend */
193 message->r_code = INSERT_SQL_RESTRICTION;
194 message->r_node = -1;
/*
 * NOTE(review): the runtime text below misspells analyze. It is left as is,
 * because changing it would alter what is stored for the frontend.
 */
195 message->rewrite_query = pool_error_message("cannot analzye this InsertStmt");
200 /* Is the target columns ?*/
/* Default position of the dividing key comes from the distribution definition. */
203 div_key_num = info->dist_key_col_id;
206 pool_debug("cell number %d, div key num %d, div_key columname %s",cell_num,div_key_num,info->col_list[div_key_num]);
/*
 * NOTE(review): getInsertRule() compares div_key_num with its loop counter.
 * If that counter starts at zero, cell_num equal to div_key_num is also out
 * of range and this test should use less-or-equal - confirm.
 */
208 if (cell_num < div_key_num)
210 /* send error message to frontend */
211 message->r_code = INSERT_SQL_RESTRICTION;
212 message->r_node = -1;
213 message->rewrite_query = pool_error_message("cannot find dividing key in InsertStmt");
/* With an explicit column list, search it for the dividing-key column name. */
220 List *list_cols = (List *) insert->cols;
222 foreach(lc, list_cols)
227 target = (ResTarget *) n;
228 if (strcmp(target->name,info->dist_key_col_name) == 0)
236 if (cell_num < div_key_num)
238 /* send error message to frontend */
239 message->r_code = INSERT_SQL_RESTRICTION;
240 message->r_node = -1;
241 message->rewrite_query = pool_error_message("cannot find dividing key in InsertStmt");
/* dist_def_flag starts at 0; where it becomes 1 is not visible in this excerpt. */
246 if (dist_def_flag != 1)
248 /* send error message to frontend */
249 message->r_code = INSERT_SQL_RESTRICTION;
250 message->r_node = -1;
251 message->rewrite_query = pool_error_message("cannot find dividing key in InsertStmt");
255 /* this loop get insert one args of divide rule */
256 node_number = getInsertRule(lc, list_t, info, div_key_num);
260 /* send error message to frontend */
261 message->r_code = INSERT_SQL_RESTRICTION;
262 message->r_node = -1;
263 message->rewrite_query = pool_error_message("cannot get node_id from system db");
267 pool_debug("insert node_number =%d",node_number);
/* Success: record the target node and the statement text to send. */
269 message->r_node = node_number;
270 message->rewrite_query = nodeToString(node);
273 /* start of rewriting query */
/*
 * examSelectStmt: rewrite a SELECT statement.
 *
 * node    - parse tree node to rewrite.
 * backend - connection pool used to fill in the dblink connection info.
 * message - reset via initMessage(), then given node->type and the
 *           SELECT_DEFAULT r_code before nodeToRewriteString() runs on it.
 *
 * dblink has static storage duration, so the same object is reused (and
 * refilled by initdblink) on every call.
 */
274 static void examSelectStmt(Node *node,POOL_CONNECTION_POOL *backend,RewriteQuery *message)
276 static ConInfoTodblink dblink;
278 /* initialize dblink info */
279 initdblink(&dblink,backend);
281 /* initialize message */
282 initMessage(message);
283 message->type = node->type;
284 message->r_code = SELECT_DEFAULT;
286 /* do rewrite query */
287 nodeToRewriteString(message,&dblink,node);
290 /* initialize Message */
/*
 * initMessage: put a RewriteQuery into its initial state.
 *
 * The visible assignments zero the counters (virtual_num, ret_num), clear
 * the three boolean flags, set the name and query pointers to NULL, and set
 * rewritelock and ignore_rewrite to -1. Further assignments may exist on
 * lines that are not visible in this excerpt.
 */
291 static void initMessage(RewriteQuery *message)
296 message->virtual_num = 0;
297 message->is_pg_catalog = false;
298 message->is_loadbalance = false;
299 message->is_parallel = false;
300 message->table_relname = NULL;
301 message->table_alias = NULL;
302 message->dbname = NULL;
303 message->schemaname = NULL;
304 message->rewrite_query = NULL;
305 message->rewritelock = -1;
306 message->ignore_rewrite = -1;
307 message->ret_num = 0;
310 /* set dblink info */
/*
 * initdblink: fill dblink with the connection parameters.
 *
 * dbname, user and password are taken from the master connection of backend;
 * hostaddr and port are taken from pool_config. The values are assigned
 * directly; no copy is made in the visible lines.
 */
311 static void initdblink(ConInfoTodblink *dblink,POOL_CONNECTION_POOL *backend)
313 dblink->dbname = MASTER_CONNECTION(backend)->sp->database;
314 dblink->hostaddr = pool_config->pgpool2_hostname;
315 dblink->user = MASTER_CONNECTION(backend)->sp->user;
316 dblink->port = pool_config->port;
317 dblink->password = MASTER_CONNECTION(backend)->con->password;
320 /* reference of pg_catalog or not */
/*
 * IsSelectpgcatalog: report whether the statement refers to pg_catalog.
 *
 * node    - parse tree node; its type is recorded in message.type.
 * backend - connection pool used to fill in the dblink connection info.
 *
 * dblink and message have static storage duration, so they are shared by
 * all calls; message is reset by initMessage() each time.
 *
 * NOTE(review): the call that sets message.is_pg_catalog and the return
 * statements are on lines not visible here; presumably nodeToRewriteString()
 * is called, as in examSelectStmt() - confirm.
 */
321 int IsSelectpgcatalog(Node *node,POOL_CONNECTION_POOL *backend)
323 static ConInfoTodblink dblink;
324 static RewriteQuery message;
326 /* initialize dblink info */
327 initdblink(&dblink,backend);
329 /* initialize message */
330 initMessage(&message);
332 message.type = node->type;
/*
 * NOTE(review): second initdblink call with the same arguments as the one
 * above; it looks redundant - confirm before removing.
 */
334 initdblink(&dblink,backend);
336 if(message.is_pg_catalog)
338 pool_debug("Isselectpgcatalog %d",message.is_pg_catalog);
348 * SELECT and INSERT statements are special:
349 * they need dedicated processing in parallel mode.
/*
 * rewrite_query_stmt: handle a statement that needs special treatment in
 * parallel mode and record the execution status in message->status.
 *
 * node     - parse tree node of the statement.
 * frontend - client connection; receives error and ready-for-query messages.
 * backend  - connection pool the statement is executed against.
 * message  - in/out rewrite state (type, r_code, r_node, rewrite_query, status).
 *
 * Visible paths:
 *  - SELECT INTO: an error is sent to the frontend and status becomes
 *    POOL_CONTINUE.
 *  - SELECT: examSelectStmt() rewrites the query. Unless r_code is
 *    SELECT_PGCATALOG or SELECT_RELATION_ERROR, the rewritten text is run
 *    through OneNode_do_command() using the system db connection. For
 *    SELECT_RELATION_ERROR inside a transaction the original text goes
 *    through pool_parallel_exec(); otherwise it goes through
 *    OneNode_do_command().
 *  - INSERT: examInsertStmt() picks the node. r_code 0 sends the statement
 *    to CONNECTION(backend, r_node); INSERT_SQL_RESTRICTION reports the
 *    stored text as an error.
 *  - Anything else: type is recorded and status is POOL_CONTINUE.
 *
 * The dispatch on the statement type, several conditions, and the return
 * statement are on lines not visible here; presumably message is returned -
 * confirm.
 */
351 RewriteQuery *rewrite_query_stmt(Node *node,POOL_CONNECTION *frontend,POOL_CONNECTION_POOL *backend,RewriteQuery *message)
357 SelectStmt *stmt = (SelectStmt *)node;
359 /* Because "SELECT INTO" cannot be used in a parallel mode,
360 * the error message is generated and send "ready for query" to frontend.
364 pool_send_error_message(frontend, MAJOR(backend), "XX000",
365 "pgpool2 sql restriction",
366 "cannot use select into ...", "", __FILE__,
370 pool_send_readyforquery(frontend);
371 message->status=POOL_CONTINUE;
376 * The Query is actually rewritten based on analytical information on the Query.
378 examSelectStmt(node,backend,message);
380 if (message->r_code != SELECT_PGCATALOG &&
381 message->r_code != SELECT_RELATION_ERROR)
384 * The rewritten Query is transmitted to system db,
385 * and execution status is received.
387 POOL_CONNECTION_POOL_SLOT *system_db = pool_system_db_connection();
388 message->status = OneNode_do_command(frontend,
390 message->rewrite_query,
391 backend->info->database);
395 if(TSTATE(backend) == 'T' &&
396 message->r_code == SELECT_RELATION_ERROR)
399 * In the case of message->r_code == SELECT_RELATION_ERROR and in the transaction,
400 * Transmit the Query to all back ends, and to abort transaction.
402 pool_debug("pool_rewrite_stmt(select): Inside transaction. abort transaction");
403 message->rewrite_query = nodeToString(node);
404 message->status = pool_parallel_exec(frontend,backend,message->rewrite_query,node,true);
409 * Other cases of message->r_code == SELECT_RELATION_ERROR
410 * or SELECT_PGCATALOG,
411 * Transmit the Query to Master node and receive status.
413 pool_debug("pool_rewrite_stmt: executed by Master");
414 message->rewrite_query = nodeToString(node);
415 message->status = OneNode_do_command(frontend,
417 message->rewrite_query,
418 backend->info->database);
421 pool_debug("pool_rewrite_stmt: select message_code %d",message->r_code);
427 /* The distribution of the INSERT sentence. */
428 examInsertStmt(node,backend,message);
/*
 * NOTE(review): the visible lines of examInsertStmt() assign r_code only on
 * failure paths, so 0 here presumably means success as set up by the
 * caller - confirm where r_code is initialised.
 */
430 if(message->r_code == 0 )
432 /* send the INSERT sentence */
433 message->status = OneNode_do_command(frontend,
434 CONNECTION(backend,message->r_node),
435 message->rewrite_query,
436 backend->info->database);
438 else if (message->r_code == INSERT_SQL_RESTRICTION)
440 /* Restriction case of INSERT sentence */
/* Here rewrite_query holds the error text stored by examInsertStmt(). */
441 pool_send_error_message(frontend, MAJOR(backend), "XX000",
442 "pgpool2 sql restriction",
443 message->rewrite_query, "", __FILE__,
446 if(TSTATE(backend) == 'T')
448 /* In Transaction, send the invalid message to backend to abort this transaction */
449 pool_debug("rewrite_query_stmt(insert): Inside transaction. Abort transaction");
450 message->status = pool_parallel_exec(frontend,backend, "POOL_RESET_TSTATE",node,false);
454 /* return "ready for query" to frontend */
455 pool_send_readyforquery(frontend);
456 message->status=POOL_CONTINUE;
462 /* Improve UpdateStmt for complex query */
466 message->type = node->type;
467 message->status = POOL_CONTINUE;
471 pool_debug("pool_rewrite_stmt: query rule %d",node->type);
/*
 * Names of the special SQL functions recognised by is_parallel_query().
 * POOL_LOADBALANCE is compared there against the called function name;
 * POOL_PARALLEL is presumably used the same way (that operand falls on a
 * line not visible in this excerpt).
 */
476 #define POOL_PARALLEL "pool_parallel"
477 #define POOL_LOADBALANCE "pool_loadbalance"
480 * After analyzing the query, check analyze[0]->state.
481 * If analyze[0]->state == 'P', this query can be executed
482 * on the parallel engine.
/*
 * direct_parallel_query: test whether the first analysis entry is in state P.
 *
 * The condition holds only when message and message->analyze[0] are both
 * non-NULL and the state of that entry is the character P.
 *
 * NOTE(review): the returned values are on lines not visible here;
 * presumably non-zero when the condition holds and zero otherwise - confirm.
 */
484 static int direct_parallel_query(RewriteQuery *message)
486 if(message && message->analyze[0] && message->analyze[0]->state == 'P')
493 /* escape delimiter character */
/*
 * delimistr: produce a palloc-allocated string derived from str.
 *
 * The loop singles out every character except the first and the last, and
 * looks for two consecutive single quotes among them. What is done with such
 * a pair, and the terminating and returning of result, are on lines not
 * visible in this excerpt.
 *
 * NOTE(review): palloc(len - 1) leaves room for len - 2 characters plus a
 * terminator, which assumes str has at least two characters. A shorter input
 * gives a zero or negative size - confirm callers always pass a delimited
 * literal.
 */
494 static char *delimistr(char *str)
498 int len = strlen(str);
500 result = palloc(len -1);
502 for(i = 0; i < len; i++)
504 char c = (unsigned char) str[i];
/* Skip the first and last character of the input. */
505 if((i != 0) && (i != len -1))
/* Two single quotes in a row. */
507 if(c=='\'' && (char) str[i+1]=='\'')
/*
 * analyze_debug: log every analysis entry of message through pool_debug().
 *
 * For each of the message->analyze_num entries, the now_select, last_select,
 * call_part and state fields are printed.
 *
 * NOTE(review): the forward declaration of this function is static, while
 * this definition omits the keyword.
 */
520 void analyze_debug(RewriteQuery *message)
523 analyze_num = message->analyze_num;
525 for(i = 0; i< analyze_num; i++)
527 AnalyzeSelect *analyze = message->analyze[i];
528 pool_debug("analyze_debug :select no(%d), last select(%d), last_part(%d), state(%c)",
529 analyze->now_select,analyze->last_select,analyze->call_part,analyze->state);
534 * This function checks for the keywords (POOL_PARALLEL, POOL_LOADBALANCE).
535 * If a special function (like pool_parallel() or pool_loadbalance())
536 * is used, mark r_code, is_parallel and is_loadbalance.
537 * In other cases, it is necessary to analyze the query.
539 RewriteQuery *is_parallel_query(Node *node, POOL_CONNECTION_POOL *backend)
541 static RewriteQuery message;
542 static ConInfoTodblink dblink;
544 initMessage(&message);
546 if (IsA(node, SelectStmt))
552 stmt = (SelectStmt *) node;
554 /* Check the special function is used in this query*/
555 if (!(stmt->distinctClause || stmt->intoClause ||
556 stmt->fromClause || stmt->groupClause || stmt->havingClause ||
557 stmt->sortClause || stmt->limitOffset || stmt->limitCount ||
558 stmt->lockingClause || stmt->larg || stmt->rarg) &&
559 (n = lfirst(list_head(stmt->targetList))) && IsA(n, ResTarget))
561 ResTarget *target = (ResTarget *) n;
563 if (target->val && IsA(target->val, FuncCall))
565 FuncCall *func = (FuncCall *) target->val;
566 if (list_length(func->funcname) == 1 && func->args)
568 Node *func_args = (Node *) lfirst(list_head(func->args));
569 message.rewrite_query = delimistr(nodeToString(func_args));
571 /* pool_parallel() is used in this query */
572 if(strcmp(strVal(lfirst(list_head(func->funcname))),
575 message.r_code = SEND_PARALLEL_ENGINE;
576 message.is_parallel = true;
577 message.is_loadbalance = false;
578 pool_debug("can pool_parallel_exec %s",message.rewrite_query);
581 else /* pool_loadbalance() is used in this query */
582 if(strcmp(strVal(lfirst(list_head(func->funcname))),
583 POOL_LOADBALANCE) == 0)
585 message.r_code = SEND_LOADBALANCE_ENGINE;
586 message.is_loadbalance = true;
587 message.is_parallel = false;
588 pool_debug("can loadbalance_mode %s",message.rewrite_query);
596 message.r_code = SELECT_ANALYZE;
597 message.is_loadbalance = true;
599 initdblink(&dblink,backend);
600 nodeToRewriteString(&message,&dblink,node);
602 if(message.is_pg_catalog)
604 message.is_loadbalance = false;
605 message.is_parallel = false;
606 pool_debug("is_parallel_query: query is done by loadbalance(pgcatalog)");
610 if(message.is_loadbalance)
612 message.is_parallel = false;
613 pool_debug("is_parallel_query: query is done by loadbalance");
617 /* Analyzing Query Start */
618 analyze_debug(&message);
620 /* After the analyzing query,
621 * this query can be executed as parallel exec, is_parallel flag is turned on
623 direct_ok = direct_parallel_query(&message);
626 message.rewrite_query = nodeToString(node);
627 message.is_parallel = true;
628 message.is_loadbalance = false;
629 pool_debug("can pool_parallel_exec %s",message.rewrite_query);
633 else if (IsA(node, CopyStmt))
635 /* For Copy Statement, check the table name, mark the is_parallel flag. */
636 CopyStmt *stmt = (CopyStmt *)node;
638 if (stmt->is_from == FALSE && stmt->filename == NULL)
640 RangeVar *relation = (RangeVar *)stmt->relation;
642 /* check on distribution table or replicate table */
644 if(pool_get_dist_def_info (MASTER_CONNECTION(backend)->sp->database, relation->schemaname, relation->relname))
646 message.rewrite_query = nodeToString(stmt);
647 message.is_parallel = true;
648 message.is_loadbalance = false;
649 message.r_code = SEND_PARALLEL_ENGINE;