00001 /****************************************************** 00002 Query graph 00003 00004 (c) 1996 Innobase Oy 00005 00006 Created 5/27/1996 Heikki Tuuri 00007 *******************************************************/ 00008 00009 #include "que0que.h" 00010 00011 #ifdef UNIV_NONINL 00012 #include "que0que.ic" 00013 #endif 00014 00015 #include "srv0que.h" 00016 #include "usr0sess.h" 00017 #include "trx0trx.h" 00018 #include "trx0roll.h" 00019 #include "row0undo.h" 00020 #include "row0ins.h" 00021 #include "row0upd.h" 00022 #include "row0sel.h" 00023 #include "row0purge.h" 00024 #include "dict0crea.h" 00025 #include "log0log.h" 00026 #include "eval0proc.h" 00027 #include "eval0eval.h" 00028 #include "pars0types.h" 00029 00030 #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256) 00031 #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256) 00032 #define QUE_MAX_LOOPS_WITHOUT_CHECK 16 00033 00034 /* If the following flag is set TRUE, the module will print trace info 00035 of SQL execution in the UNIV_SQL_DEBUG version */ 00036 ibool que_trace_on = FALSE; 00037 00038 ibool que_always_false = FALSE; 00039 00040 /* Short introduction to query graphs 00041 ================================== 00042 00043 A query graph consists of nodes linked to each other in various ways. The 00044 execution starts at que_run_threads() which takes a que_thr_t parameter. 00045 que_thr_t contains two fields that control query graph execution: run_node 00046 and prev_node. run_node is the next node to execute and prev_node is the 00047 last node executed. 00048 00049 Each node has a pointer to a 'next' statement, i.e., its brother, and a 00050 pointer to its parent node. The next pointer is NULL in the last statement 00051 of a block. 00052 00053 Loop nodes contain a link to the first statement of the enclosed statement 00054 list. While the loop runs, que_thr_step() checks if execution to the loop 00055 node came from its parent or from one of the statement nodes in the loop. If 00056 it came from the parent of the loop node it starts executing the first 00057 statement node in the loop. If it came from one of the statement nodes in 00058 the loop, then it checks if the statement node has another statement node 00059 following it, and runs it if so. 00060 00061 To signify loop ending, the loop statements (see e.g. while_step()) set 00062 que_thr_t->run_node to the loop node's parent node. This is noticed on the 00063 next call of que_thr_step() and execution proceeds to the node pointed to by 00064 the loop node's 'next' pointer. 00065 00066 For example, the code: 00067 00068 X := 1; 00069 WHILE X < 5 LOOP 00070 X := X + 1; 00071 X := X + 1; 00072 X := 5 00073 00074 will result in the following node hierarchy, with the X-axis indicating 00075 'next' links and the Y-axis indicating parent/child links: 00076 00077 A - W - A 00078 | 00079 | 00080 A - A 00081 00082 A = assign_node_t, W = while_node_t. */ 00083 00084 /* How a stored procedure containing COMMIT or ROLLBACK commands 00085 is executed? 00086 00087 The commit or rollback can be seen as a subprocedure call. 00088 The problem is that if there are several query threads 00089 currently running within the transaction, their action could 00090 mess the commit or rollback operation. Or, at the least, the 00091 operation would be difficult to visualize and keep in control. 00092 00093 Therefore the query thread requesting a commit or a rollback 00094 sends to the transaction a signal, which moves the transaction 00095 to TRX_QUE_SIGNALED state. All running query threads of the 00096 transaction will eventually notice that the transaction is now in 00097 this state and voluntarily suspend themselves. Only the last 00098 query thread which suspends itself will trigger handling of 00099 the signal. 00100 00101 When the transaction starts to handle a rollback or commit 00102 signal, it builds a query graph which, when executed, will 00103 roll back or commit the incomplete transaction. The transaction 00104 is moved to the TRX_QUE_ROLLING_BACK or TRX_QUE_COMMITTING state. 00105 If specified, the SQL cursors opened by the transaction are closed. 00106 When the execution of the graph completes, it is like returning 00107 from a subprocedure: the query thread which requested the operation 00108 starts running again. */ 00109 00110 /************************************************************************** 00111 Moves a thread from another state to the QUE_THR_RUNNING state. Increments 00112 the n_active_thrs counters of the query graph and transaction. 00113 ***NOTE***: This is the only function in which such a transition is allowed 00114 to happen! */ 00115 static 00116 void 00117 que_thr_move_to_run_state( 00118 /*======================*/ 00119 que_thr_t* thr); /* in: an query thread */ 00120 00121 /*************************************************************************** 00122 Adds a query graph to the session's list of graphs. */ 00123 00124 void 00125 que_graph_publish( 00126 /*==============*/ 00127 que_t* graph, /* in: graph */ 00128 sess_t* sess) /* in: session */ 00129 { 00130 #ifdef UNIV_SYNC_DEBUG 00131 ut_ad(mutex_own(&kernel_mutex)); 00132 #endif /* UNIV_SYNC_DEBUG */ 00133 00134 UT_LIST_ADD_LAST(graphs, sess->graphs, graph); 00135 } 00136 00137 /*************************************************************************** 00138 Creates a query graph fork node. */ 00139 00140 que_fork_t* 00141 que_fork_create( 00142 /*============*/ 00143 /* out, own: fork node */ 00144 que_t* graph, /* in: graph, if NULL then this 00145 fork node is assumed to be the 00146 graph root */ 00147 que_node_t* parent, /* in: parent node */ 00148 ulint fork_type, /* in: fork type */ 00149 mem_heap_t* heap) /* in: memory heap where created */ 00150 { 00151 que_fork_t* fork; 00152 00153 ut_ad(heap); 00154 00155 fork = mem_heap_alloc(heap, sizeof(que_fork_t)); 00156 00157 fork->common.type = QUE_NODE_FORK; 00158 fork->n_active_thrs = 0; 00159 00160 fork->state = QUE_FORK_COMMAND_WAIT; 00161 00162 if (graph != NULL) { 00163 fork->graph = graph; 00164 } else { 00165 fork->graph = fork; 00166 } 00167 00168 fork->common.parent = parent; 00169 fork->fork_type = fork_type; 00170 00171 fork->caller = NULL; 00172 00173 UT_LIST_INIT(fork->thrs); 00174 00175 fork->sym_tab = NULL; 00176 fork->info = NULL; 00177 00178 fork->heap = heap; 00179 00180 return(fork); 00181 } 00182 00183 /*************************************************************************** 00184 Creates a query graph thread node. */ 00185 00186 que_thr_t* 00187 que_thr_create( 00188 /*===========*/ 00189 /* out, own: query thread node */ 00190 que_fork_t* parent, /* in: parent node, i.e., a fork node */ 00191 mem_heap_t* heap) /* in: memory heap where created */ 00192 { 00193 que_thr_t* thr; 00194 00195 ut_ad(parent && heap); 00196 00197 thr = mem_heap_alloc(heap, sizeof(que_thr_t)); 00198 00199 thr->common.type = QUE_NODE_THR; 00200 thr->common.parent = parent; 00201 00202 thr->magic_n = QUE_THR_MAGIC_N; 00203 00204 thr->graph = parent->graph; 00205 00206 thr->state = QUE_THR_COMMAND_WAIT; 00207 00208 thr->is_active = FALSE; 00209 00210 thr->run_node = NULL; 00211 thr->resource = 0; 00212 thr->lock_state = QUE_THR_LOCK_NOLOCK; 00213 00214 UT_LIST_ADD_LAST(thrs, parent->thrs, thr); 00215 00216 return(thr); 00217 } 00218 00219 /************************************************************************** 00220 Moves a suspended query thread to the QUE_THR_RUNNING state and may release 00221 a single worker thread to execute it. This function should be used to end 00222 the wait state of a query thread waiting for a lock or a stored procedure 00223 completion. */ 00224 00225 void 00226 que_thr_end_wait( 00227 /*=============*/ 00228 que_thr_t* thr, /* in: query thread in the 00229 QUE_THR_LOCK_WAIT, 00230 or QUE_THR_PROCEDURE_WAIT, or 00231 QUE_THR_SIG_REPLY_WAIT state */ 00232 que_thr_t** next_thr) /* in/out: next query thread to run; 00233 if the value which is passed in is 00234 a pointer to a NULL pointer, then the 00235 calling function can start running 00236 a new query thread; if NULL is passed 00237 as the parameter, it is ignored */ 00238 { 00239 ibool was_active; 00240 00241 #ifdef UNIV_SYNC_DEBUG 00242 ut_ad(mutex_own(&kernel_mutex)); 00243 #endif /* UNIV_SYNC_DEBUG */ 00244 ut_ad(thr); 00245 ut_ad((thr->state == QUE_THR_LOCK_WAIT) 00246 || (thr->state == QUE_THR_PROCEDURE_WAIT) 00247 || (thr->state == QUE_THR_SIG_REPLY_WAIT)); 00248 ut_ad(thr->run_node); 00249 00250 thr->prev_node = thr->run_node; 00251 00252 was_active = thr->is_active; 00253 00254 que_thr_move_to_run_state(thr); 00255 00256 if (was_active) { 00257 00258 return; 00259 } 00260 00261 if (next_thr && *next_thr == NULL) { 00262 *next_thr = thr; 00263 } else { 00264 ut_a(0); 00265 srv_que_task_enqueue_low(thr); 00266 } 00267 } 00268 00269 /************************************************************************** 00270 Same as que_thr_end_wait, but no parameter next_thr available. */ 00271 00272 void 00273 que_thr_end_wait_no_next_thr( 00274 /*=========================*/ 00275 que_thr_t* thr) /* in: query thread in the QUE_THR_LOCK_WAIT, 00276 or QUE_THR_PROCEDURE_WAIT, or 00277 QUE_THR_SIG_REPLY_WAIT state */ 00278 { 00279 ibool was_active; 00280 00281 ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the 00282 only possible state here */ 00283 #ifdef UNIV_SYNC_DEBUG 00284 ut_ad(mutex_own(&kernel_mutex)); 00285 #endif /* UNIV_SYNC_DEBUG */ 00286 ut_ad(thr); 00287 ut_ad((thr->state == QUE_THR_LOCK_WAIT) 00288 || (thr->state == QUE_THR_PROCEDURE_WAIT) 00289 || (thr->state == QUE_THR_SIG_REPLY_WAIT)); 00290 00291 was_active = thr->is_active; 00292 00293 que_thr_move_to_run_state(thr); 00294 00295 if (was_active) { 00296 00297 return; 00298 } 00299 00300 /* In MySQL we let the OS thread (not just the query thread) to wait 00301 for the lock to be released: */ 00302 00303 srv_release_mysql_thread_if_suspended(thr); 00304 00305 /* srv_que_task_enqueue_low(thr); */ 00306 } 00307 00308 /************************************************************************** 00309 Inits a query thread for a command. */ 00310 UNIV_INLINE 00311 void 00312 que_thr_init_command( 00313 /*=================*/ 00314 que_thr_t* thr) /* in: query thread */ 00315 { 00316 thr->run_node = thr; 00317 thr->prev_node = thr->common.parent; 00318 00319 que_thr_move_to_run_state(thr); 00320 } 00321 00322 /************************************************************************** 00323 Starts execution of a command in a query fork. Picks a query thread which 00324 is not in the QUE_THR_RUNNING state and moves it to that state. If none 00325 can be chosen, a situation which may arise in parallelized fetches, NULL 00326 is returned. */ 00327 00328 que_thr_t* 00329 que_fork_start_command( 00330 /*===================*/ 00331 /* out: a query thread of the graph moved to 00332 QUE_THR_RUNNING state, or NULL; the query 00333 thread should be executed by que_run_threads 00334 by the caller */ 00335 que_fork_t* fork) /* in: a query fork */ 00336 { 00337 que_thr_t* thr; 00338 00339 fork->state = QUE_FORK_ACTIVE; 00340 00341 fork->last_sel_node = NULL; 00342 00343 /* Choose the query thread to run: usually there is just one thread, 00344 but in a parallelized select, which necessarily is non-scrollable, 00345 there may be several to choose from */ 00346 00347 /*--------------------------------------------------------------- 00348 First we try to find a query thread in the QUE_THR_COMMAND_WAIT state */ 00349 00350 thr = UT_LIST_GET_FIRST(fork->thrs); 00351 00352 while (thr != NULL) { 00353 if (thr->state == QUE_THR_COMMAND_WAIT) { 00354 00355 /* We have to send the initial message to query thread 00356 to start it */ 00357 00358 que_thr_init_command(thr); 00359 00360 return(thr); 00361 } 00362 00363 ut_ad(thr->state != QUE_THR_LOCK_WAIT); 00364 00365 thr = UT_LIST_GET_NEXT(thrs, thr); 00366 } 00367 00368 /*---------------------------------------------------------------- 00369 Then we try to find a query thread in the QUE_THR_SUSPENDED state */ 00370 00371 thr = UT_LIST_GET_FIRST(fork->thrs); 00372 00373 while (thr != NULL) { 00374 if (thr->state == QUE_THR_SUSPENDED) { 00375 /* In this case the execution of the thread was 00376 suspended: no initial message is needed because 00377 execution can continue from where it was left */ 00378 00379 que_thr_move_to_run_state(thr); 00380 00381 return(thr); 00382 } 00383 00384 thr = UT_LIST_GET_NEXT(thrs, thr); 00385 } 00386 00387 /*----------------------------------------------------------------- 00388 Then we try to find a query thread in the QUE_THR_COMPLETED state */ 00389 00390 thr = UT_LIST_GET_FIRST(fork->thrs); 00391 00392 while (thr != NULL) { 00393 if (thr->state == QUE_THR_COMPLETED) { 00394 que_thr_init_command(thr); 00395 00396 return(thr); 00397 } 00398 00399 thr = UT_LIST_GET_NEXT(thrs, thr); 00400 } 00401 00402 /* Else we return NULL */ 00403 return(NULL); 00404 } 00405 00406 /************************************************************************** 00407 After signal handling is finished, returns control to a query graph error 00408 handling routine. (Currently, just returns the control to the root of the 00409 graph so that the graph can communicate an error message to the client.) */ 00410 00411 void 00412 que_fork_error_handle( 00413 /*==================*/ 00414 trx_t* trx __attribute__((unused)), /* in: trx */ 00415 que_t* fork) /* in: query graph which was run before signal 00416 handling started, NULL not allowed */ 00417 { 00418 que_thr_t* thr; 00419 00420 #ifdef UNIV_SYNC_DEBUG 00421 ut_ad(mutex_own(&kernel_mutex)); 00422 #endif /* UNIV_SYNC_DEBUG */ 00423 ut_ad(trx->sess->state == SESS_ERROR); 00424 ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0); 00425 ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0); 00426 00427 thr = UT_LIST_GET_FIRST(fork->thrs); 00428 00429 while (thr != NULL) { 00430 ut_ad(!thr->is_active); 00431 ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT); 00432 ut_ad(thr->state != QUE_THR_LOCK_WAIT); 00433 00434 thr->run_node = thr; 00435 thr->prev_node = thr->child; 00436 thr->state = QUE_THR_COMPLETED; 00437 00438 thr = UT_LIST_GET_NEXT(thrs, thr); 00439 } 00440 00441 thr = UT_LIST_GET_FIRST(fork->thrs); 00442 00443 que_thr_move_to_run_state(thr); 00444 00445 ut_a(0); 00446 srv_que_task_enqueue_low(thr); 00447 } 00448 00449 /******************************************************************** 00450 Tests if all the query threads in the same fork have a given state. */ 00451 UNIV_INLINE 00452 ibool 00453 que_fork_all_thrs_in_state( 00454 /*=======================*/ 00455 /* out: TRUE if all the query threads in the 00456 same fork were in the given state */ 00457 que_fork_t* fork, /* in: query fork */ 00458 ulint state) /* in: state */ 00459 { 00460 que_thr_t* thr_node; 00461 00462 thr_node = UT_LIST_GET_FIRST(fork->thrs); 00463 00464 while (thr_node != NULL) { 00465 if (thr_node->state != state) { 00466 00467 return(FALSE); 00468 } 00469 00470 thr_node = UT_LIST_GET_NEXT(thrs, thr_node); 00471 } 00472 00473 return(TRUE); 00474 } 00475 00476 /************************************************************************** 00477 Calls que_graph_free_recursive for statements in a statement list. */ 00478 static 00479 void 00480 que_graph_free_stat_list( 00481 /*=====================*/ 00482 que_node_t* node) /* in: first query graph node in the list */ 00483 { 00484 while (node) { 00485 que_graph_free_recursive(node); 00486 00487 node = que_node_get_next(node); 00488 } 00489 } 00490 00491 /************************************************************************** 00492 Frees a query graph, but not the heap where it was created. Does not free 00493 explicit cursor declarations, they are freed in que_graph_free. */ 00494 00495 void 00496 que_graph_free_recursive( 00497 /*=====================*/ 00498 que_node_t* node) /* in: query graph node */ 00499 { 00500 que_fork_t* fork; 00501 que_thr_t* thr; 00502 undo_node_t* undo; 00503 sel_node_t* sel; 00504 ins_node_t* ins; 00505 upd_node_t* upd; 00506 tab_node_t* cre_tab; 00507 ind_node_t* cre_ind; 00508 00509 if (node == NULL) { 00510 00511 return; 00512 } 00513 00514 switch (que_node_get_type(node)) { 00515 00516 case QUE_NODE_FORK: 00517 fork = node; 00518 00519 thr = UT_LIST_GET_FIRST(fork->thrs); 00520 00521 while (thr) { 00522 que_graph_free_recursive(thr); 00523 00524 thr = UT_LIST_GET_NEXT(thrs, thr); 00525 } 00526 00527 break; 00528 case QUE_NODE_THR: 00529 00530 thr = node; 00531 00532 if (thr->magic_n != QUE_THR_MAGIC_N) { 00533 fprintf(stderr, 00534 "que_thr struct appears corrupt; magic n %lu\n", 00535 (unsigned long) thr->magic_n); 00536 mem_analyze_corruption(thr); 00537 ut_error; 00538 } 00539 00540 thr->magic_n = QUE_THR_MAGIC_FREED; 00541 00542 que_graph_free_recursive(thr->child); 00543 00544 break; 00545 case QUE_NODE_UNDO: 00546 00547 undo = node; 00548 00549 mem_heap_free(undo->heap); 00550 00551 break; 00552 case QUE_NODE_SELECT: 00553 00554 sel = node; 00555 00556 sel_node_free_private(sel); 00557 00558 break; 00559 case QUE_NODE_INSERT: 00560 00561 ins = node; 00562 00563 que_graph_free_recursive(ins->select); 00564 00565 mem_heap_free(ins->entry_sys_heap); 00566 00567 break; 00568 case QUE_NODE_UPDATE: 00569 00570 upd = node; 00571 00572 if (upd->in_mysql_interface) { 00573 00574 btr_pcur_free_for_mysql(upd->pcur); 00575 } 00576 00577 que_graph_free_recursive(upd->cascade_node); 00578 00579 if (upd->cascade_heap) { 00580 mem_heap_free(upd->cascade_heap); 00581 } 00582 00583 que_graph_free_recursive(upd->select); 00584 00585 mem_heap_free(upd->heap); 00586 00587 break; 00588 case QUE_NODE_CREATE_TABLE: 00589 cre_tab = node; 00590 00591 que_graph_free_recursive(cre_tab->tab_def); 00592 que_graph_free_recursive(cre_tab->col_def); 00593 que_graph_free_recursive(cre_tab->commit_node); 00594 00595 mem_heap_free(cre_tab->heap); 00596 00597 break; 00598 case QUE_NODE_CREATE_INDEX: 00599 cre_ind = node; 00600 00601 que_graph_free_recursive(cre_ind->ind_def); 00602 que_graph_free_recursive(cre_ind->field_def); 00603 que_graph_free_recursive(cre_ind->commit_node); 00604 00605 mem_heap_free(cre_ind->heap); 00606 00607 break; 00608 case QUE_NODE_PROC: 00609 que_graph_free_stat_list(((proc_node_t*)node)->stat_list); 00610 00611 break; 00612 case QUE_NODE_IF: 00613 que_graph_free_stat_list(((if_node_t*)node)->stat_list); 00614 que_graph_free_stat_list(((if_node_t*)node)->else_part); 00615 que_graph_free_stat_list(((if_node_t*)node)->elsif_list); 00616 00617 break; 00618 case QUE_NODE_ELSIF: 00619 que_graph_free_stat_list(((elsif_node_t*)node)->stat_list); 00620 00621 break; 00622 case QUE_NODE_WHILE: 00623 que_graph_free_stat_list(((while_node_t*)node)->stat_list); 00624 00625 break; 00626 case QUE_NODE_FOR: 00627 que_graph_free_stat_list(((for_node_t*)node)->stat_list); 00628 00629 break; 00630 00631 case QUE_NODE_ASSIGNMENT: 00632 case QUE_NODE_EXIT: 00633 case QUE_NODE_RETURN: 00634 case QUE_NODE_COMMIT: 00635 case QUE_NODE_ROLLBACK: 00636 case QUE_NODE_LOCK: 00637 case QUE_NODE_FUNC: 00638 case QUE_NODE_ORDER: 00639 case QUE_NODE_ROW_PRINTF: 00640 case QUE_NODE_OPEN: 00641 case QUE_NODE_FETCH: 00642 /* No need to do anything */ 00643 00644 break; 00645 default: 00646 fprintf(stderr, 00647 "que_node struct appears corrupt; type %lu\n", 00648 (unsigned long) que_node_get_type(node)); 00649 mem_analyze_corruption(node); 00650 ut_error; 00651 } 00652 } 00653 00654 /************************************************************************** 00655 Frees a query graph. */ 00656 00657 void 00658 que_graph_free( 00659 /*===========*/ 00660 que_t* graph) /* in: query graph; we assume that the memory 00661 heap where this graph was created is private 00662 to this graph: if not, then use 00663 que_graph_free_recursive and free the heap 00664 afterwards! */ 00665 { 00666 ut_ad(graph); 00667 00668 if (graph->sym_tab) { 00669 /* The following call frees dynamic memory allocated 00670 for variables etc. during execution. Frees also explicit 00671 cursor definitions. */ 00672 00673 sym_tab_free_private(graph->sym_tab); 00674 } 00675 00676 if (graph->info && graph->info->graph_owns_us) { 00677 pars_info_free(graph->info); 00678 } 00679 00680 que_graph_free_recursive(graph); 00681 00682 mem_heap_free(graph->heap); 00683 } 00684 00685 /************************************************************************** 00686 Checks if the query graph is in a state where it should be freed, and 00687 frees it in that case. If the session is in a state where it should be 00688 closed, also this is done. */ 00689 00690 ibool 00691 que_graph_try_free( 00692 /*===============*/ 00693 /* out: TRUE if freed */ 00694 que_t* graph) /* in: query graph */ 00695 { 00696 sess_t* sess; 00697 00698 #ifdef UNIV_SYNC_DEBUG 00699 ut_ad(mutex_own(&kernel_mutex)); 00700 #endif /* UNIV_SYNC_DEBUG */ 00701 00702 sess = (graph->trx)->sess; 00703 00704 if ((graph->state == QUE_FORK_BEING_FREED) 00705 && (graph->n_active_thrs == 0)) { 00706 00707 UT_LIST_REMOVE(graphs, sess->graphs, graph); 00708 que_graph_free(graph); 00709 00710 sess_try_close(sess); 00711 00712 return(TRUE); 00713 } 00714 00715 return(FALSE); 00716 } 00717 00718 /******************************************************************** 00719 Performs an execution step on a thr node. */ 00720 static 00721 que_thr_t* 00722 que_thr_node_step( 00723 /*==============*/ 00724 /* out: query thread to run next, or NULL 00725 if none */ 00726 que_thr_t* thr) /* in: query thread where run_node must 00727 be the thread node itself */ 00728 { 00729 ut_ad(thr->run_node == thr); 00730 00731 if (thr->prev_node == thr->common.parent) { 00732 /* If control to the node came from above, it is just passed 00733 on */ 00734 00735 thr->run_node = thr->child; 00736 00737 return(thr); 00738 } 00739 00740 mutex_enter(&kernel_mutex); 00741 00742 if (que_thr_peek_stop(thr)) { 00743 00744 mutex_exit(&kernel_mutex); 00745 00746 return(thr); 00747 } 00748 00749 /* Thread execution completed */ 00750 00751 thr->state = QUE_THR_COMPLETED; 00752 00753 mutex_exit(&kernel_mutex); 00754 00755 return(NULL); 00756 } 00757 00758 /************************************************************************** 00759 Moves a thread from another state to the QUE_THR_RUNNING state. Increments 00760 the n_active_thrs counters of the query graph and transaction if thr was 00761 not active. 00762 ***NOTE***: This and ..._mysql are the only functions in which such a 00763 transition is allowed to happen! */ 00764 static 00765 void 00766 que_thr_move_to_run_state( 00767 /*======================*/ 00768 que_thr_t* thr) /* in: an query thread */ 00769 { 00770 trx_t* trx; 00771 00772 ut_ad(thr->state != QUE_THR_RUNNING); 00773 00774 trx = thr_get_trx(thr); 00775 00776 if (!thr->is_active) { 00777 00778 (thr->graph)->n_active_thrs++; 00779 00780 trx->n_active_thrs++; 00781 00782 thr->is_active = TRUE; 00783 00784 ut_ad((thr->graph)->n_active_thrs == 1); 00785 ut_ad(trx->n_active_thrs == 1); 00786 } 00787 00788 thr->state = QUE_THR_RUNNING; 00789 } 00790 00791 /************************************************************************** 00792 Decrements the query thread reference counts in the query graph and the 00793 transaction. May start signal handling, e.g., a rollback. 00794 *** NOTE ***: 00795 This and que_thr_stop_for_mysql are the only functions where the reference 00796 count can be decremented and this function may only be called from inside 00797 que_run_threads or que_thr_check_if_switch! These restrictions exist to make 00798 the rollback code easier to maintain. */ 00799 static 00800 void 00801 que_thr_dec_refer_count( 00802 /*====================*/ 00803 que_thr_t* thr, /* in: query thread */ 00804 que_thr_t** next_thr) /* in/out: next query thread to run; 00805 if the value which is passed in is 00806 a pointer to a NULL pointer, then the 00807 calling function can start running 00808 a new query thread */ 00809 { 00810 que_fork_t* fork; 00811 trx_t* trx; 00812 sess_t* sess; 00813 ulint fork_type; 00814 ibool stopped; 00815 00816 fork = thr->common.parent; 00817 trx = thr_get_trx(thr); 00818 sess = trx->sess; 00819 00820 mutex_enter(&kernel_mutex); 00821 00822 ut_a(thr->is_active); 00823 00824 if (thr->state == QUE_THR_RUNNING) { 00825 00826 stopped = que_thr_stop(thr); 00827 00828 if (!stopped) { 00829 /* The reason for the thr suspension or wait was 00830 already canceled before we came here: continue 00831 running the thread */ 00832 00833 /* fputs("!!!!!!!! Wait already ended: continue thr\n", 00834 stderr); */ 00835 00836 if (next_thr && *next_thr == NULL) { 00837 /* Normally srv_suspend_mysql_thread resets 00838 the state to DB_SUCCESS before waiting, but 00839 in this case we have to do it here, 00840 otherwise nobody does it. */ 00841 trx->error_state = DB_SUCCESS; 00842 00843 *next_thr = thr; 00844 } else { 00845 ut_a(0); 00846 srv_que_task_enqueue_low(thr); 00847 } 00848 00849 mutex_exit(&kernel_mutex); 00850 00851 return; 00852 } 00853 } 00854 00855 ut_ad(fork->n_active_thrs == 1); 00856 ut_ad(trx->n_active_thrs == 1); 00857 00858 fork->n_active_thrs--; 00859 trx->n_active_thrs--; 00860 00861 thr->is_active = FALSE; 00862 00863 if (trx->n_active_thrs > 0) { 00864 00865 mutex_exit(&kernel_mutex); 00866 00867 return; 00868 } 00869 00870 fork_type = fork->fork_type; 00871 00872 /* Check if all query threads in the same fork are completed */ 00873 00874 if (que_fork_all_thrs_in_state(fork, QUE_THR_COMPLETED)) { 00875 00876 if (fork_type == QUE_FORK_ROLLBACK) { 00877 /* This is really the undo graph used in rollback, 00878 no roll_node in this graph */ 00879 00880 ut_ad(UT_LIST_GET_LEN(trx->signals) > 0); 00881 ut_ad(trx->handling_signals == TRUE); 00882 00883 trx_finish_rollback_off_kernel(fork, trx, next_thr); 00884 00885 } else if (fork_type == QUE_FORK_PURGE) { 00886 00887 /* Do nothing */ 00888 } else if (fork_type == QUE_FORK_RECOVERY) { 00889 00890 /* Do nothing */ 00891 } else if (fork_type == QUE_FORK_MYSQL_INTERFACE) { 00892 00893 /* Do nothing */ 00894 } else { 00895 ut_error; /* not used in MySQL */ 00896 } 00897 } 00898 00899 if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) { 00900 00901 /* If the trx is signaled and its query thread count drops to 00902 zero, then we start processing a signal; from it we may get 00903 a new query thread to run */ 00904 00905 trx_sig_start_handle(trx, next_thr); 00906 } 00907 00908 if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) { 00909 00910 trx_end_signal_handling(trx); 00911 } 00912 00913 mutex_exit(&kernel_mutex); 00914 } 00915 00916 /************************************************************************** 00917 Stops a query thread if graph or trx is in a state requiring it. The 00918 conditions are tested in the order (1) graph, (2) trx. The kernel mutex has 00919 to be reserved. */ 00920 00921 ibool 00922 que_thr_stop( 00923 /*=========*/ 00924 /* out: TRUE if stopped */ 00925 que_thr_t* thr) /* in: query thread */ 00926 { 00927 trx_t* trx; 00928 que_t* graph; 00929 ibool ret = TRUE; 00930 00931 #ifdef UNIV_SYNC_DEBUG 00932 ut_ad(mutex_own(&kernel_mutex)); 00933 #endif /* UNIV_SYNC_DEBUG */ 00934 00935 graph = thr->graph; 00936 trx = graph->trx; 00937 00938 if (graph->state == QUE_FORK_COMMAND_WAIT) { 00939 thr->state = QUE_THR_SUSPENDED; 00940 00941 } else if (trx->que_state == TRX_QUE_LOCK_WAIT) { 00942 00943 UT_LIST_ADD_FIRST(trx_thrs, trx->wait_thrs, thr); 00944 thr->state = QUE_THR_LOCK_WAIT; 00945 00946 } else if (trx->error_state != DB_SUCCESS 00947 && trx->error_state != DB_LOCK_WAIT) { 00948 00949 /* Error handling built for the MySQL interface */ 00950 thr->state = QUE_THR_COMPLETED; 00951 00952 } else if (UT_LIST_GET_LEN(trx->signals) > 0 00953 && graph->fork_type != QUE_FORK_ROLLBACK) { 00954 00955 thr->state = QUE_THR_SUSPENDED; 00956 } else { 00957 ut_ad(graph->state == QUE_FORK_ACTIVE); 00958 00959 ret = FALSE; 00960 } 00961 00962 return(ret); 00963 } 00964 00965 /************************************************************************** 00966 A patch for MySQL used to 'stop' a dummy query thread used in MySQL. The 00967 query thread is stopped and made inactive, except in the case where 00968 it was put to the lock wait state in lock0lock.c, but the lock has already 00969 been granted or the transaction chosen as a victim in deadlock resolution. */ 00970 00971 void 00972 que_thr_stop_for_mysql( 00973 /*===================*/ 00974 que_thr_t* thr) /* in: query thread */ 00975 { 00976 trx_t* trx; 00977 00978 trx = thr_get_trx(thr); 00979 00980 mutex_enter(&kernel_mutex); 00981 00982 if (thr->state == QUE_THR_RUNNING) { 00983 00984 if (trx->error_state != DB_SUCCESS 00985 && trx->error_state != DB_LOCK_WAIT) { 00986 00987 /* Error handling built for the MySQL interface */ 00988 thr->state = QUE_THR_COMPLETED; 00989 } else { 00990 /* It must have been a lock wait but the lock was 00991 already released, or this transaction was chosen 00992 as a victim in selective deadlock resolution */ 00993 00994 mutex_exit(&kernel_mutex); 00995 00996 return; 00997 } 00998 } 00999 01000 ut_ad(thr->is_active == TRUE); 01001 ut_ad(trx->n_active_thrs == 1); 01002 ut_ad(thr->graph->n_active_thrs == 1); 01003 01004 thr->is_active = FALSE; 01005 (thr->graph)->n_active_thrs--; 01006 01007 trx->n_active_thrs--; 01008 01009 mutex_exit(&kernel_mutex); 01010 } 01011 01012 /************************************************************************** 01013 Moves a thread from another state to the QUE_THR_RUNNING state. Increments 01014 the n_active_thrs counters of the query graph and transaction if thr was 01015 not active. */ 01016 01017 void 01018 que_thr_move_to_run_state_for_mysql( 01019 /*================================*/ 01020 que_thr_t* thr, /* in: an query thread */ 01021 trx_t* trx) /* in: transaction */ 01022 { 01023 if (thr->magic_n != QUE_THR_MAGIC_N) { 01024 fprintf(stderr, 01025 "que_thr struct appears corrupt; magic n %lu\n", 01026 (unsigned long) thr->magic_n); 01027 01028 mem_analyze_corruption(thr); 01029 01030 ut_error; 01031 } 01032 01033 if (!thr->is_active) { 01034 01035 thr->graph->n_active_thrs++; 01036 01037 trx->n_active_thrs++; 01038 01039 thr->is_active = TRUE; 01040 } 01041 01042 thr->state = QUE_THR_RUNNING; 01043 } 01044 01045 /************************************************************************** 01046 A patch for MySQL used to 'stop' a dummy query thread used in MySQL 01047 select, when there is no error or lock wait. */ 01048 01049 void 01050 que_thr_stop_for_mysql_no_error( 01051 /*============================*/ 01052 que_thr_t* thr, /* in: query thread */ 01053 trx_t* trx) /* in: transaction */ 01054 { 01055 ut_ad(thr->state == QUE_THR_RUNNING); 01056 ut_ad(thr->is_active == TRUE); 01057 ut_ad(trx->n_active_thrs == 1); 01058 ut_ad(thr->graph->n_active_thrs == 1); 01059 01060 if (thr->magic_n != QUE_THR_MAGIC_N) { 01061 fprintf(stderr, 01062 "que_thr struct appears corrupt; magic n %lu\n", 01063 (unsigned long) thr->magic_n); 01064 01065 mem_analyze_corruption(thr); 01066 01067 ut_error; 01068 } 01069 01070 thr->state = QUE_THR_COMPLETED; 01071 01072 thr->is_active = FALSE; 01073 (thr->graph)->n_active_thrs--; 01074 01075 trx->n_active_thrs--; 01076 } 01077 01078 /******************************************************************** 01079 Get the first containing loop node (e.g. while_node_t or for_node_t) for the 01080 given node, or NULL if the node is not within a loop. */ 01081 01082 que_node_t* 01083 que_node_get_containing_loop_node( 01084 /*==============================*/ 01085 /* out: containing loop node, or NULL. */ 01086 que_node_t* node) /* in: node */ 01087 { 01088 ut_ad(node); 01089 01090 for (;;) { 01091 ulint type; 01092 01093 node = que_node_get_parent(node); 01094 01095 if (!node) { 01096 break; 01097 } 01098 01099 type = que_node_get_type(node); 01100 01101 if ((type == QUE_NODE_FOR) || (type == QUE_NODE_WHILE)) { 01102 break; 01103 } 01104 } 01105 01106 return(node); 01107 } 01108 01109 /************************************************************************** 01110 Prints info of an SQL query graph node. */ 01111 01112 void 01113 que_node_print_info( 01114 /*================*/ 01115 que_node_t* node) /* in: query graph node */ 01116 { 01117 ulint type; 01118 const char* str; 01119 01120 type = que_node_get_type(node); 01121 01122 if (type == QUE_NODE_SELECT) { 01123 str = "SELECT"; 01124 } else if (type == QUE_NODE_INSERT) { 01125 str = "INSERT"; 01126 } else if (type == QUE_NODE_UPDATE) { 01127 str = "UPDATE"; 01128 } else if (type == QUE_NODE_WHILE) { 01129 str = "WHILE"; 01130 } else if (type == QUE_NODE_ASSIGNMENT) { 01131 str = "ASSIGNMENT"; 01132 } else if (type == QUE_NODE_IF) { 01133 str = "IF"; 01134 } else if (type == QUE_NODE_FETCH) { 01135 str = "FETCH"; 01136 } else if (type == QUE_NODE_OPEN) { 01137 str = "OPEN"; 01138 } else if (type == QUE_NODE_PROC) { 01139 str = "STORED PROCEDURE"; 01140 } else if (type == QUE_NODE_FUNC) { 01141 str = "FUNCTION"; 01142 } else if (type == QUE_NODE_LOCK) { 01143 str = "LOCK"; 01144 } else if (type == QUE_NODE_THR) { 01145 str = "QUERY THREAD"; 01146 } else if (type == QUE_NODE_COMMIT) { 01147 str = "COMMIT"; 01148 } else if (type == QUE_NODE_UNDO) { 01149 str = "UNDO ROW"; 01150 } else if (type == QUE_NODE_PURGE) { 01151 str = "PURGE ROW"; 01152 } else if (type == QUE_NODE_ROLLBACK) { 01153 str = "ROLLBACK"; 01154 } else if (type == QUE_NODE_CREATE_TABLE) { 01155 str = "CREATE TABLE"; 01156 } else if (type == QUE_NODE_CREATE_INDEX) { 01157 str = "CREATE INDEX"; 01158 } else if (type == QUE_NODE_FOR) { 01159 str = "FOR LOOP"; 01160 } else if (type == QUE_NODE_RETURN) { 01161 str = "RETURN"; 01162 } else if (type == QUE_NODE_EXIT) { 01163 str = "EXIT"; 01164 } else { 01165 str = "UNKNOWN NODE TYPE"; 01166 } 01167 01168 fprintf(stderr, "Node type %lu: %s, address %p\n", (ulong) type, str, node); 01169 } 01170 01171 /************************************************************************** 01172 Performs an execution step on a query thread. */ 01173 UNIV_INLINE 01174 que_thr_t* 01175 que_thr_step( 01176 /*=========*/ 01177 /* out: query thread to run next: it may 01178 differ from the input parameter if, e.g., a 01179 subprocedure call is made */ 01180 que_thr_t* thr) /* in: query thread */ 01181 { 01182 que_node_t* node; 01183 que_thr_t* old_thr; 01184 trx_t* trx; 01185 ulint type; 01186 01187 trx = thr_get_trx(thr); 01188 01189 ut_ad(thr->state == QUE_THR_RUNNING); 01190 ut_a(trx->error_state == DB_SUCCESS); 01191 01192 thr->resource++; 01193 01194 node = thr->run_node; 01195 type = que_node_get_type(node); 01196 01197 old_thr = thr; 01198 01199 #ifdef UNIV_DEBUG 01200 if (que_trace_on) { 01201 fputs("To execute: ", stderr); 01202 que_node_print_info(node); 01203 } 01204 #endif 01205 if (type & QUE_NODE_CONTROL_STAT) { 01206 if ((thr->prev_node != que_node_get_parent(node)) 01207 && que_node_get_next(thr->prev_node)) { 01208 01209 /* The control statements, like WHILE, always pass the 01210 control to the next child statement if there is any 01211 child left */ 01212 01213 thr->run_node = que_node_get_next(thr->prev_node); 01214 01215 } else if (type == QUE_NODE_IF) { 01216 if_step(thr); 01217 } else if (type == QUE_NODE_FOR) { 01218 for_step(thr); 01219 } else if (type == QUE_NODE_PROC) { 01220 01221 /* We can access trx->undo_no without reserving 01222 trx->undo_mutex, because there cannot be active query 01223 threads doing updating or inserting at the moment! */ 01224 01225 if (thr->prev_node == que_node_get_parent(node)) { 01226 trx->last_sql_stat_start.least_undo_no 01227 = trx->undo_no; 01228 } 01229 01230 proc_step(thr); 01231 } else if (type == QUE_NODE_WHILE) { 01232 while_step(thr); 01233 } else { 01234 ut_error; 01235 } 01236 } else if (type == QUE_NODE_ASSIGNMENT) { 01237 assign_step(thr); 01238 } else if (type == QUE_NODE_SELECT) { 01239 thr = row_sel_step(thr); 01240 } else if (type == QUE_NODE_INSERT) { 01241 thr = row_ins_step(thr); 01242 } else if (type == QUE_NODE_UPDATE) { 01243 thr = row_upd_step(thr); 01244 } else if (type == QUE_NODE_FETCH) { 01245 thr = fetch_step(thr); 01246 } else if (type == QUE_NODE_OPEN) { 01247 thr = open_step(thr); 01248 } else if (type == QUE_NODE_FUNC) { 01249 proc_eval_step(thr); 01250 01251 } else if (type == QUE_NODE_LOCK) { 01252 01253 ut_error; 01254 /* 01255 thr = que_lock_step(thr); 01256 */ 01257 } else if (type == QUE_NODE_THR) { 01258 thr = que_thr_node_step(thr); 01259 } else if (type == QUE_NODE_COMMIT) { 01260 thr = trx_commit_step(thr); 01261 } else if (type == QUE_NODE_UNDO) { 01262 thr = row_undo_step(thr); 01263 } else if (type == QUE_NODE_PURGE) { 01264 thr = row_purge_step(thr); 01265 } else if (type == QUE_NODE_RETURN) { 01266 thr = return_step(thr); 01267 } else if (type == QUE_NODE_EXIT) { 01268 thr = exit_step(thr); 01269 } else if (type == QUE_NODE_ROLLBACK) { 01270 thr = trx_rollback_step(thr); 01271 } else if (type == QUE_NODE_CREATE_TABLE) { 01272 thr = dict_create_table_step(thr); 01273 } else if (type == QUE_NODE_CREATE_INDEX) { 01274 thr = dict_create_index_step(thr); 01275 } else if (type == QUE_NODE_ROW_PRINTF) { 01276 thr = row_printf_step(thr); 01277 } else { 01278 ut_error; 01279 } 01280 01281 if (type == QUE_NODE_EXIT) { 01282 old_thr->prev_node = que_node_get_containing_loop_node(node); 01283 } else { 01284 old_thr->prev_node = node; 01285 } 01286 01287 if (thr) { 01288 ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS); 01289 } 01290 01291 return(thr); 01292 } 01293 01294 /************************************************************************** 01295 Run a query thread until it finishes or encounters e.g. a lock wait. */ 01296 static 01297 void 01298 que_run_threads_low( 01299 /*================*/ 01300 que_thr_t* thr) /* in: query thread */ 01301 { 01302 que_thr_t* next_thr; 01303 ulint cumul_resource; 01304 ulint loop_count; 01305 01306 ut_ad(thr->state == QUE_THR_RUNNING); 01307 ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS); 01308 01309 #ifdef UNIV_SYNC_DEBUG 01310 ut_ad(!mutex_own(&kernel_mutex)); 01311 #endif /* UNIV_SYNC_DEBUG */ 01312 01313 /* cumul_resource counts how much resources the OS thread (NOT the 01314 query thread) has spent in this function */ 01315 01316 loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK; 01317 cumul_resource = 0; 01318 loop: 01319 /* Check that there is enough space in the log to accommodate 01320 possible log entries by this query step; if the operation can touch 01321 more than about 4 pages, checks must be made also within the query 01322 step! */ 01323 01324 log_free_check(); 01325 01326 /* Perform the actual query step: note that the query thread 01327 may change if, e.g., a subprocedure call is made */ 01328 01329 /*-------------------------*/ 01330 next_thr = que_thr_step(thr); 01331 /*-------------------------*/ 01332 01333 ut_a(!next_thr || (thr_get_trx(next_thr)->error_state == DB_SUCCESS)); 01334 01335 loop_count++; 01336 01337 if (next_thr != thr) { 01338 ut_a(next_thr == NULL); 01339 01340 /* This can change next_thr to a non-NULL value if there was 01341 a lock wait that already completed. */ 01342 que_thr_dec_refer_count(thr, &next_thr); 01343 01344 if (next_thr == NULL) { 01345 01346 return; 01347 } 01348 01349 loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK; 01350 01351 thr = next_thr; 01352 } 01353 01354 goto loop; 01355 } 01356 01357 /************************************************************************** 01358 Run a query thread. Handles lock waits. */ 01359 void 01360 que_run_threads( 01361 /*============*/ 01362 que_thr_t* thr) /* in: query thread */ 01363 { 01364 loop: 01365 ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS); 01366 que_run_threads_low(thr); 01367 01368 mutex_enter(&kernel_mutex); 01369 01370 switch (thr->state) { 01371 01372 case QUE_THR_RUNNING: 01373 /* There probably was a lock wait, but it already ended 01374 before we came here: continue running thr */ 01375 01376 mutex_exit(&kernel_mutex); 01377 01378 goto loop; 01379 01380 case QUE_THR_LOCK_WAIT: 01381 mutex_exit(&kernel_mutex); 01382 01383 /* The ..._mysql_... function works also for InnoDB's 01384 internal threads. Let us wait that the lock wait ends. */ 01385 01386 srv_suspend_mysql_thread(thr); 01387 01388 if (thr_get_trx(thr)->error_state != DB_SUCCESS) { 01389 /* thr was chosen as a deadlock victim or there was 01390 a lock wait timeout */ 01391 01392 que_thr_dec_refer_count(thr, NULL); 01393 01394 return; 01395 } 01396 01397 goto loop; 01398 01399 case QUE_THR_COMPLETED: 01400 case QUE_THR_COMMAND_WAIT: 01401 /* Do nothing */ 01402 break; 01403 01404 default: 01405 ut_error; 01406 } 01407 01408 mutex_exit(&kernel_mutex); 01409 } 01410 01411 /************************************************************************* 01412 Evaluate the given SQL. */ 01413 01414 ulint 01415 que_eval_sql( 01416 /*=========*/ 01417 /* out: error code or DB_SUCCESS */ 01418 pars_info_t* info, /* in: info struct, or NULL */ 01419 const char* sql, /* in: SQL string */ 01420 ibool reserve_dict_mutex, 01421 /* in: if TRUE, acquire/release 01422 dict_sys->mutex around call to pars_sql. */ 01423 trx_t* trx) /* in: trx */ 01424 { 01425 que_thr_t* thr; 01426 que_t* graph; 01427 01428 ut_a(trx->error_state == DB_SUCCESS); 01429 01430 if (reserve_dict_mutex) { 01431 mutex_enter(&dict_sys->mutex); 01432 } 01433 01434 graph = pars_sql(info, sql); 01435 01436 if (reserve_dict_mutex) { 01437 mutex_exit(&dict_sys->mutex); 01438 } 01439 01440 ut_a(graph); 01441 01442 graph->trx = trx; 01443 trx->graph = NULL; 01444 01445 graph->fork_type = QUE_FORK_MYSQL_INTERFACE; 01446 01447 ut_a(thr = que_fork_start_command(graph)); 01448 01449 que_run_threads(thr); 01450 01451 que_graph_free(graph); 01452 01453 return(trx->error_state); 01454 }
1.4.7

