/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
  delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects.
Also 00039 the copied table is not included in the lock, so that the statement 00040 can proceed even if the real table cannot be accessed at this moment. 00041 00042 Copying a table object is not a trivial operation. Besides the TABLE 00043 object there are the field pointer array, the field objects and the 00044 record buffer. After copying the field objects, their pointers into 00045 the record must be "moved" to point to the new record buffer. 00046 00047 After this setup the normal insert logic is used. Only that for 00048 delayed inserts write_delayed() is called instead of write_record(). 00049 It inserts the rows into a queue and signals the delayed insert thread 00050 instead of writing directly to the table. 00051 00052 The delayed insert thread awakes from the signal. It locks the table, 00053 inserts the rows from the queue, unlocks the table, and waits for the 00054 next signal. It does normally live until a FLUSH TABLES or SHUTDOWN. 00055 00056 */ 00057 00058 #include "mysql_priv.h" 00059 #include "sp_head.h" 00060 #include "sql_trigger.h" 00061 #include "sql_select.h" 00062 #include "sql_show.h" 00063 00064 static int check_null_fields(THD *thd,TABLE *entry); 00065 #ifndef EMBEDDED_LIBRARY 00066 static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list); 00067 static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup, 00068 LEX_STRING query, bool ignore, bool log_on); 00069 static void end_delayed_insert(THD *thd); 00070 pthread_handler_t handle_delayed_insert(void *arg); 00071 static void unlink_blobs(register TABLE *table); 00072 #endif 00073 static bool check_view_insertability(THD *thd, TABLE_LIST *view); 00074 00075 /* Define to force use of my_malloc() if the allocated memory block is big */ 00076 00077 #ifndef HAVE_ALLOCA 00078 #define my_safe_alloca(size, min_length) my_alloca(size) 00079 #define my_safe_afree(ptr, size, min_length) my_afree(ptr) 00080 #else 00081 #define my_safe_alloca(size, min_length) ((size <= min_length) 
? my_alloca(size) : my_malloc(size,MYF(0))) 00082 #define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0)) 00083 #endif 00084 00085 00086 /* 00087 Check if insert fields are correct. 00088 00089 SYNOPSIS 00090 check_insert_fields() 00091 thd The current thread. 00092 table The table for insert. 00093 fields The insert fields. 00094 values The insert values. 00095 check_unique If duplicate values should be rejected. 00096 00097 NOTE 00098 Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type 00099 or leaves it as is, depending on if timestamp should be updated or 00100 not. 00101 00102 RETURN 00103 0 OK 00104 -1 Error 00105 */ 00106 00107 static int check_insert_fields(THD *thd, TABLE_LIST *table_list, 00108 List<Item> &fields, List<Item> &values, 00109 bool check_unique) 00110 { 00111 TABLE *table= table_list->table; 00112 00113 if (!table_list->updatable) 00114 { 00115 my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT"); 00116 return -1; 00117 } 00118 00119 if (fields.elements == 0 && values.elements != 0) 00120 { 00121 if (!table) 00122 { 00123 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0), 00124 table_list->view_db.str, table_list->view_name.str); 00125 return -1; 00126 } 00127 if (values.elements != table->s->fields) 00128 { 00129 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L); 00130 return -1; 00131 } 00132 #ifndef NO_EMBEDDED_ACCESS_CHECKS 00133 if (grant_option) 00134 { 00135 Field_iterator_table fields; 00136 fields.set_table(table); 00137 if (check_grant_all_columns(thd, INSERT_ACL, &table->grant, 00138 table->s->db.str, table->s->table_name.str, 00139 &fields)) 00140 return -1; 00141 } 00142 #endif 00143 clear_timestamp_auto_bits(table->timestamp_field_type, 00144 TIMESTAMP_AUTO_SET_ON_INSERT); 00145 /* 00146 No fields are provided so all fields must be provided in the values. 00147 Thus we set all bits in the write set. 
00148 */ 00149 bitmap_set_all(table->write_set); 00150 } 00151 else 00152 { // Part field list 00153 SELECT_LEX *select_lex= &thd->lex->select_lex; 00154 Name_resolution_context *context= &select_lex->context; 00155 Name_resolution_context_state ctx_state; 00156 int res; 00157 00158 if (fields.elements != values.elements) 00159 { 00160 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L); 00161 return -1; 00162 } 00163 00164 thd->dup_field= 0; 00165 select_lex->no_wrap_view_item= TRUE; 00166 00167 /* Save the state of the current name resolution context. */ 00168 ctx_state.save_state(context, table_list); 00169 00170 /* 00171 Perform name resolution only in the first table - 'table_list', 00172 which is the table that is inserted into. 00173 */ 00174 table_list->next_local= 0; 00175 context->resolve_in_table_list_only(table_list); 00176 res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0); 00177 00178 /* Restore the current context. */ 00179 ctx_state.restore_state(context, table_list); 00180 thd->lex->select_lex.no_wrap_view_item= FALSE; 00181 00182 if (res) 00183 return -1; 00184 00185 if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE) 00186 { 00187 /* it is join view => we need to find table for update */ 00188 List_iterator_fast<Item> it(fields); 00189 Item *item; 00190 TABLE_LIST *tbl= 0; // reset for call to check_single_table() 00191 table_map map= 0; 00192 00193 while ((item= it++)) 00194 map|= item->used_tables(); 00195 if (table_list->check_single_table(&tbl, map, table_list) || tbl == 0) 00196 { 00197 my_error(ER_VIEW_MULTIUPDATE, MYF(0), 00198 table_list->view_db.str, table_list->view_name.str); 00199 return -1; 00200 } 00201 table_list->table= table= tbl->table; 00202 } 00203 00204 if (check_unique && thd->dup_field) 00205 { 00206 my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name); 00207 return -1; 00208 } 00209 if (table->timestamp_field) // Don't automaticly set timestamp if used 00210 { 00211 if 
(bitmap_is_set(table->write_set, 00212 table->timestamp_field->field_index)) 00213 clear_timestamp_auto_bits(table->timestamp_field_type, 00214 TIMESTAMP_AUTO_SET_ON_INSERT); 00215 else 00216 { 00217 bitmap_set_bit(table->write_set, 00218 table->timestamp_field->field_index); 00219 } 00220 } 00221 } 00222 // For the values we need select_priv 00223 #ifndef NO_EMBEDDED_ACCESS_CHECKS 00224 table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege); 00225 #endif 00226 00227 if (check_key_in_view(thd, table_list) || 00228 (table_list->view && 00229 check_view_insertability(thd, table_list))) 00230 { 00231 my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT"); 00232 return -1; 00233 } 00234 00235 return 0; 00236 } 00237 00238 00239 /* 00240 Check update fields for the timestamp field. 00241 00242 SYNOPSIS 00243 check_update_fields() 00244 thd The current thread. 00245 insert_table_list The insert table list. 00246 table The table for update. 00247 update_fields The update fields. 00248 00249 NOTE 00250 If the update fields include the timestamp field, 00251 remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type. 00252 00253 RETURN 00254 0 OK 00255 -1 Error 00256 */ 00257 00258 static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list, 00259 List<Item> &update_fields) 00260 { 00261 TABLE *table= insert_table_list->table; 00262 my_bool timestamp_mark; 00263 00264 if (table->timestamp_field) 00265 { 00266 /* 00267 Unmark the timestamp field so that we can check if this is modified 00268 by update_fields 00269 */ 00270 timestamp_mark= bitmap_test_and_clear(table->write_set, 00271 table->timestamp_field->field_index); 00272 } 00273 00274 /* Check the fields we are going to modify */ 00275 if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0)) 00276 return -1; 00277 00278 if (table->timestamp_field) 00279 { 00280 /* Don't set timestamp column if this is modified. 
*/ 00281 if (bitmap_is_set(table->write_set, 00282 table->timestamp_field->field_index)) 00283 clear_timestamp_auto_bits(table->timestamp_field_type, 00284 TIMESTAMP_AUTO_SET_ON_UPDATE); 00285 if (timestamp_mark) 00286 bitmap_set_bit(table->write_set, 00287 table->timestamp_field->field_index); 00288 } 00289 return 0; 00290 } 00291 00292 00293 bool mysql_insert(THD *thd,TABLE_LIST *table_list, 00294 List<Item> &fields, 00295 List<List_item> &values_list, 00296 List<Item> &update_fields, 00297 List<Item> &update_values, 00298 enum_duplicates duplic, 00299 bool ignore) 00300 { 00301 int error, res; 00302 /* 00303 log_on is about delayed inserts only. 00304 By default, both logs are enabled (this won't cause problems if the server 00305 runs without --log-update or --log-bin). 00306 */ 00307 bool log_on= ((thd->options & OPTION_BIN_LOG) || 00308 (!(thd->security_ctx->master_access & SUPER_ACL))); 00309 bool transactional_table, joins_freed= FALSE; 00310 bool changed; 00311 uint value_count; 00312 ulong counter = 1; 00313 ulonglong id; 00314 COPY_INFO info; 00315 TABLE *table= 0; 00316 List_iterator_fast<List_item> its(values_list); 00317 List_item *values; 00318 Name_resolution_context *context; 00319 Name_resolution_context_state ctx_state; 00320 #ifndef EMBEDDED_LIBRARY 00321 char *query= thd->query; 00322 #endif 00323 thr_lock_type lock_type = table_list->lock_type; 00324 Item *unused_conds= 0; 00325 DBUG_ENTER("mysql_insert"); 00326 00327 /* 00328 in safe mode or with skip-new change delayed insert to be regular 00329 if we are told to replace duplicates, the insert cannot be concurrent 00330 delayed insert changed to regular in slave thread 00331 */ 00332 #ifdef EMBEDDED_LIBRARY 00333 if (lock_type == TL_WRITE_DELAYED) 00334 lock_type=TL_WRITE; 00335 #else 00336 if ((lock_type == TL_WRITE_DELAYED && 00337 ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) || 00338 thd->slave_thread || !thd->variables.max_insert_delayed_threads)) || 00339 (lock_type == 
TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) || 00340 (duplic == DUP_UPDATE)) 00341 lock_type=TL_WRITE; 00342 #endif 00343 table_list->lock_type= lock_type; 00344 00345 #ifndef EMBEDDED_LIBRARY 00346 if (lock_type == TL_WRITE_DELAYED) 00347 { 00348 if (thd->locked_tables) 00349 { 00350 DBUG_ASSERT(table_list->db); /* Must be set in the parser */ 00351 if (find_locked_table(thd, table_list->db, table_list->table_name)) 00352 { 00353 my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0), 00354 table_list->table_name); 00355 DBUG_RETURN(TRUE); 00356 } 00357 } 00358 if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error) 00359 { 00360 /* 00361 Open tables used for sub-selects or in stored functions, will also 00362 cache these functions. 00363 */ 00364 res= open_and_lock_tables(thd, table_list->next_global); 00365 /* 00366 First is not processed by open_and_lock_tables() => we need set 00367 updateability flags "by hands". 00368 */ 00369 if (!table_list->derived && !table_list->view) 00370 table_list->updatable= 1; // usual table 00371 } 00372 else 00373 { 00374 /* Too many delayed insert threads; Use a normal insert */ 00375 table_list->lock_type= lock_type= TL_WRITE; 00376 res= open_and_lock_tables(thd, table_list); 00377 } 00378 } 00379 else 00380 #endif /* EMBEDDED_LIBRARY */ 00381 res= open_and_lock_tables(thd, table_list); 00382 if (res || thd->is_fatal_error) 00383 DBUG_RETURN(TRUE); 00384 00385 thd->proc_info="init"; 00386 thd->used_tables=0; 00387 values= its++; 00388 00389 if (mysql_prepare_insert(thd, table_list, table, fields, values, 00390 update_fields, update_values, duplic, &unused_conds, 00391 FALSE)) 00392 goto abort; 00393 00394 /* mysql_prepare_insert set table_list->table if it was not set */ 00395 table= table_list->table; 00396 00397 context= &thd->lex->select_lex.context; 00398 /* Save the state of the current name resolution context. 
*/ 00399 ctx_state.save_state(context, table_list); 00400 00401 /* 00402 Perform name resolution only in the first table - 'table_list', 00403 which is the table that is inserted into. 00404 */ 00405 table_list->next_local= 0; 00406 context->resolve_in_table_list_only(table_list); 00407 00408 value_count= values->elements; 00409 while ((values= its++)) 00410 { 00411 counter++; 00412 if (values->elements != value_count) 00413 { 00414 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter); 00415 goto abort; 00416 } 00417 if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) 00418 goto abort; 00419 } 00420 its.rewind (); 00421 00422 /* Restore the current context. */ 00423 ctx_state.restore_state(context, table_list); 00424 00425 /* 00426 Fill in the given fields and dump it to the table file 00427 */ 00428 info.records= info.deleted= info.copied= info.updated= 0; 00429 info.ignore= ignore; 00430 info.handle_duplicates=duplic; 00431 info.update_fields= &update_fields; 00432 info.update_values= &update_values; 00433 info.view= (table_list->view ? table_list : 0); 00434 00435 /* 00436 Count warnings for all inserts. 00437 For single line insert, generate an error if try to set a NOT NULL field 00438 to NULL. 00439 */ 00440 thd->count_cuted_fields= ((values_list.elements == 1 && 00441 !ignore) ? 00442 CHECK_FIELD_ERROR_FOR_NULL : 00443 CHECK_FIELD_WARN); 00444 thd->cuted_fields = 0L; 00445 table->next_number_field=table->found_next_number_field; 00446 00447 error=0; 00448 thd->proc_info="update"; 00449 if (duplic != DUP_ERROR || ignore) 00450 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); 00451 if (duplic == DUP_REPLACE && 00452 (!table->triggers || !table->triggers->has_delete_triggers())) 00453 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); 00454 /* 00455 let's *try* to start bulk inserts. It won't necessary 00456 start them as values_list.elements should be greater than 00457 some - handler dependent - threshold. 
00458 We should not start bulk inserts if this statement uses 00459 functions or invokes triggers since they may access 00460 to the same table and therefore should not see its 00461 inconsistent state created by this optimization. 00462 So we call start_bulk_insert to perform nesessary checks on 00463 values_list.elements, and - if nothing else - to initialize 00464 the code to make the call of end_bulk_insert() below safe. 00465 */ 00466 if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode) 00467 table->file->ha_start_bulk_insert(values_list.elements); 00468 00469 thd->no_trans_update= 0; 00470 thd->abort_on_warning= (!ignore && 00471 (thd->variables.sql_mode & 00472 (MODE_STRICT_TRANS_TABLES | 00473 MODE_STRICT_ALL_TABLES))); 00474 00475 if ((fields.elements || !value_count) && 00476 check_that_all_fields_are_given_values(thd, table, table_list)) 00477 { 00478 /* thd->net.report_error is now set, which will abort the next loop */ 00479 error= 1; 00480 } 00481 00482 table->mark_columns_needed_for_insert(); 00483 00484 if (table_list->prepare_where(thd, 0, TRUE) || 00485 table_list->prepare_check_option(thd)) 00486 error= 1; 00487 00488 while ((values= its++)) 00489 { 00490 if (fields.elements || !value_count) 00491 { 00492 restore_record(table,s->default_values); // Get empty record 00493 if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0, 00494 table->triggers, 00495 TRG_EVENT_INSERT)) 00496 { 00497 if (values_list.elements != 1 && !thd->net.report_error) 00498 { 00499 info.records++; 00500 continue; 00501 } 00502 /* 00503 TODO: set thd->abort_on_warning if values_list.elements == 1 00504 and check that all items return warning in case of problem with 00505 storing field. 00506 */ 00507 error=1; 00508 break; 00509 } 00510 } 00511 else 00512 { 00513 if (thd->used_tables) // Column used in values() 00514 restore_record(table,s->default_values); // Get empty record 00515 else 00516 { 00517 /* 00518 Fix delete marker. 
No need to restore rest of record since it will 00519 be overwritten by fill_record() anyway (and fill_record() does not 00520 use default values in this case). 00521 */ 00522 table->record[0][0]= table->s->default_values[0]; 00523 } 00524 if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0, 00525 table->triggers, 00526 TRG_EVENT_INSERT)) 00527 { 00528 if (values_list.elements != 1 && ! thd->net.report_error) 00529 { 00530 info.records++; 00531 continue; 00532 } 00533 error=1; 00534 break; 00535 } 00536 } 00537 00538 if ((res= table_list->view_check_option(thd, 00539 (values_list.elements == 1 ? 00540 0 : 00541 ignore))) == 00542 VIEW_CHECK_SKIP) 00543 continue; 00544 else if (res == VIEW_CHECK_ERROR) 00545 { 00546 error= 1; 00547 break; 00548 } 00549 #ifndef EMBEDDED_LIBRARY 00550 if (lock_type == TL_WRITE_DELAYED) 00551 { 00552 LEX_STRING const st_query = { query, thd->query_length }; 00553 error=write_delayed(thd, table, duplic, st_query, ignore, log_on); 00554 query=0; 00555 } 00556 else 00557 #endif 00558 error=write_record(thd, table ,&info); 00559 if (error) 00560 break; 00561 thd->row_count++; 00562 } 00563 00564 free_underlaid_joins(thd, &thd->lex->select_lex); 00565 joins_freed= TRUE; 00566 table->file->ha_release_auto_increment(); 00567 00568 /* 00569 Now all rows are inserted. 
Time to update logs and sends response to 00570 user 00571 */ 00572 #ifndef EMBEDDED_LIBRARY 00573 if (lock_type == TL_WRITE_DELAYED) 00574 { 00575 if (!error) 00576 { 00577 info.copied=values_list.elements; 00578 end_delayed_insert(thd); 00579 } 00580 query_cache_invalidate3(thd, table_list, 1); 00581 } 00582 else 00583 #endif 00584 { 00585 if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error) 00586 { 00587 table->file->print_error(my_errno,MYF(0)); 00588 error=1; 00589 } 00590 transactional_table= table->file->has_transactions(); 00591 00592 if ((changed= (info.copied || info.deleted || info.updated))) 00593 { 00594 /* 00595 Invalidate the table in the query cache if something changed. 00596 For the transactional algorithm to work the invalidation must be 00597 before binlog writing and ha_autocommit_or_rollback 00598 */ 00599 query_cache_invalidate3(thd, table_list, 1); 00600 if (error <= 0 || !transactional_table) 00601 { 00602 if (mysql_bin_log.is_open()) 00603 { 00604 if (error <= 0) 00605 thd->clear_error(); 00606 if (thd->binlog_query(THD::ROW_QUERY_TYPE, 00607 thd->query, thd->query_length, 00608 transactional_table, FALSE) && 00609 transactional_table) 00610 { 00611 error=1; 00612 } 00613 } 00614 if (!transactional_table) 00615 thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; 00616 } 00617 } 00618 if (transactional_table) 00619 error=ha_autocommit_or_rollback(thd,error); 00620 00621 if (thd->lock) 00622 { 00623 mysql_unlock_tables(thd, thd->lock); 00624 /* 00625 Invalidate the table in the query cache if something changed 00626 after unlocking when changes become fisible. 00627 TODO: this is workaround. right way will be move invalidating in 00628 the unlock procedure. 
00629 */ 00630 if (lock_type == TL_WRITE_CONCURRENT_INSERT && changed) 00631 { 00632 query_cache_invalidate3(thd, table_list, 1); 00633 } 00634 thd->lock=0; 00635 } 00636 } 00637 thd->proc_info="end"; 00638 /* 00639 We'll report to the client this id: 00640 - if the table contains an autoincrement column and we successfully 00641 inserted an autogenerated value, the autogenerated value. 00642 - if the table contains no autoincrement column and LAST_INSERT_ID(X) was 00643 called, X. 00644 - if the table contains an autoincrement column, and some rows were 00645 inserted, the id of the last "inserted" row (if IGNORE, that value may not 00646 have been really inserted but ignored). 00647 */ 00648 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ? 00649 thd->first_successful_insert_id_in_cur_stmt : 00650 (thd->arg_of_last_insert_id_function ? 00651 thd->first_successful_insert_id_in_prev_stmt : 00652 ((table->next_number_field && info.copied) ? 00653 table->next_number_field->val_int() : 0)); 00654 table->next_number_field=0; 00655 thd->count_cuted_fields= CHECK_FIELD_IGNORE; 00656 if (duplic != DUP_ERROR || ignore) 00657 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); 00658 if (duplic == DUP_REPLACE && 00659 (!table->triggers || !table->triggers->has_delete_triggers())) 00660 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); 00661 00662 if (error) 00663 goto abort; 00664 if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) || 00665 !thd->cuted_fields)) 00666 { 00667 thd->row_count_func= info.copied+info.deleted+info.updated; 00668 send_ok(thd, (ulong) thd->row_count_func, id); 00669 } 00670 else 00671 { 00672 char buff[160]; 00673 if (ignore) 00674 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, 00675 (lock_type == TL_WRITE_DELAYED) ? 
(ulong) 0 : 00676 (ulong) (info.records - info.copied), (ulong) thd->cuted_fields); 00677 else 00678 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, 00679 (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields); 00680 thd->row_count_func= info.copied+info.deleted+info.updated; 00681 ::send_ok(thd, (ulong) thd->row_count_func, id, buff); 00682 } 00683 thd->abort_on_warning= 0; 00684 DBUG_RETURN(FALSE); 00685 00686 abort: 00687 #ifndef EMBEDDED_LIBRARY 00688 if (lock_type == TL_WRITE_DELAYED) 00689 end_delayed_insert(thd); 00690 #endif 00691 if (table != NULL) 00692 table->file->ha_release_auto_increment(); 00693 if (!joins_freed) 00694 free_underlaid_joins(thd, &thd->lex->select_lex); 00695 thd->abort_on_warning= 0; 00696 DBUG_RETURN(TRUE); 00697 } 00698 00699 00700 /* 00701 Additional check for insertability for VIEW 00702 00703 SYNOPSIS 00704 check_view_insertability() 00705 thd - thread handler 00706 view - reference on VIEW 00707 00708 IMPLEMENTATION 00709 A view is insertable if the folloings are true: 00710 - All columns in the view are columns from a table 00711 - All not used columns in table have a default values 00712 - All field in view are unique (not referring to the same column) 00713 00714 RETURN 00715 FALSE - OK 00716 view->contain_auto_increment is 1 if and only if the view contains an 00717 auto_increment field 00718 00719 TRUE - can't be used for insert 00720 */ 00721 00722 static bool check_view_insertability(THD * thd, TABLE_LIST *view) 00723 { 00724 uint num= view->view->select_lex.item_list.elements; 00725 TABLE *table= view->table; 00726 Field_translator *trans_start= view->field_translation, 00727 *trans_end= trans_start + num; 00728 Field_translator *trans; 00729 Field **field_ptr= table->field; 00730 uint used_fields_buff_size= bitmap_buffer_size(table->s->fields); 00731 uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size); 00732 MY_BITMAP used_fields; 00733 enum_mark_columns save_mark_used_columns= 
thd->mark_used_columns; 00734 DBUG_ENTER("check_key_in_view"); 00735 00736 if (!used_fields_buff) 00737 DBUG_RETURN(TRUE); // EOM 00738 00739 DBUG_ASSERT(view->table != 0 && view->field_translation != 0); 00740 00741 VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0)); 00742 bitmap_clear_all(&used_fields); 00743 00744 view->contain_auto_increment= 0; 00745 /* 00746 we must not set query_id for fields as they're not 00747 really used in this context 00748 */ 00749 thd->mark_used_columns= MARK_COLUMNS_NONE; 00750 /* check simplicity and prepare unique test of view */ 00751 for (trans= trans_start; trans != trans_end; trans++) 00752 { 00753 if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item)) 00754 { 00755 thd->mark_used_columns= save_mark_used_columns; 00756 DBUG_RETURN(TRUE); 00757 } 00758 Item_field *field; 00759 /* simple SELECT list entry (field without expression) */ 00760 if (!(field= trans->item->filed_for_view_update())) 00761 { 00762 thd->mark_used_columns= save_mark_used_columns; 00763 DBUG_RETURN(TRUE); 00764 } 00765 if (field->field->unireg_check == Field::NEXT_NUMBER) 00766 view->contain_auto_increment= 1; 00767 /* prepare unique test */ 00768 /* 00769 remove collation (or other transparent for update function) if we have 00770 it 00771 */ 00772 trans->item= field; 00773 } 00774 thd->mark_used_columns= save_mark_used_columns; 00775 /* unique test */ 00776 for (trans= trans_start; trans != trans_end; trans++) 00777 { 00778 /* Thanks to test above, we know that all columns are of type Item_field */ 00779 Item_field *field= (Item_field *)trans->item; 00780 /* check fields belong to table in which we are inserting */ 00781 if (field->field->table == table && 00782 bitmap_fast_test_and_set(&used_fields, field->field->field_index)) 00783 DBUG_RETURN(TRUE); 00784 } 00785 00786 DBUG_RETURN(FALSE); 00787 } 00788 00789 00790 /* 00791 Check if table can be updated 00792 00793 SYNOPSIS 00794 mysql_prepare_insert_check_table() 
00795 thd Thread handle 00796 table_list Table list 00797 fields List of fields to be updated 00798 where Pointer to where clause 00799 select_insert Check is making for SELECT ... INSERT 00800 00801 RETURN 00802 FALSE ok 00803 TRUE ERROR 00804 */ 00805 00806 static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list, 00807 List<Item> &fields, 00808 bool select_insert) 00809 { 00810 bool insert_into_view= (table_list->view != 0); 00811 DBUG_ENTER("mysql_prepare_insert_check_table"); 00812 00813 /* 00814 first table in list is the one we'll INSERT into, requires INSERT_ACL. 00815 all others require SELECT_ACL only. the ACL requirement below is for 00816 new leaves only anyway (view-constituents), so check for SELECT rather 00817 than INSERT. 00818 */ 00819 00820 if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context, 00821 &thd->lex->select_lex.top_join_list, 00822 table_list, 00823 &thd->lex->select_lex.leaf_tables, 00824 select_insert, SELECT_ACL)) 00825 DBUG_RETURN(TRUE); 00826 00827 if (insert_into_view && !fields.elements) 00828 { 00829 thd->lex->empty_field_list_on_rset= 1; 00830 if (!table_list->table) 00831 { 00832 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0), 00833 table_list->view_db.str, table_list->view_name.str); 00834 DBUG_RETURN(TRUE); 00835 } 00836 DBUG_RETURN(insert_view_fields(thd, &fields, table_list)); 00837 } 00838 00839 DBUG_RETURN(FALSE); 00840 } 00841 00842 00843 /* 00844 Prepare items in INSERT statement 00845 00846 SYNOPSIS 00847 mysql_prepare_insert() 00848 thd Thread handler 00849 table_list Global/local table list 00850 table Table to insert into (can be NULL if table should 00851 be taken from table_list->table) 00852 where Where clause (for insert ... select) 00853 select_insert TRUE if INSERT ... SELECT statement 00854 00855 TODO (in far future) 00856 In cases of: 00857 INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a 00858 ON DUPLICATE KEY ... 
00859 we should be able to refer to sum1 in the ON DUPLICATE KEY part 00860 00861 WARNING 00862 You MUST set table->insert_values to 0 after calling this function 00863 before releasing the table object. 00864 00865 RETURN VALUE 00866 FALSE OK 00867 TRUE error 00868 */ 00869 00870 bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list, 00871 TABLE *table, List<Item> &fields, List_item *values, 00872 List<Item> &update_fields, List<Item> &update_values, 00873 enum_duplicates duplic, 00874 COND **where, bool select_insert) 00875 { 00876 SELECT_LEX *select_lex= &thd->lex->select_lex; 00877 Name_resolution_context *context= &select_lex->context; 00878 Name_resolution_context_state ctx_state; 00879 bool insert_into_view= (table_list->view != 0); 00880 bool res= 0; 00881 DBUG_ENTER("mysql_prepare_insert"); 00882 DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d", 00883 (ulong)table_list, (ulong)table, 00884 (int)insert_into_view)); 00885 00886 /* 00887 For subqueries in VALUES() we should not see the table in which we are 00888 inserting (for INSERT ... SELECT this is done by changing table_list, 00889 because INSERT ... SELECT share SELECT_LEX it with SELECT. 00890 */ 00891 if (!select_insert) 00892 { 00893 for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit(); 00894 un; 00895 un= un->next_unit()) 00896 { 00897 for (SELECT_LEX *sl= un->first_select(); 00898 sl; 00899 sl= sl->next_select()) 00900 { 00901 sl->context.outer_context= 0; 00902 } 00903 } 00904 } 00905 00906 if (duplic == DUP_UPDATE) 00907 { 00908 /* it should be allocated before Item::fix_fields() */ 00909 if (table_list->set_insert_values(thd->mem_root)) 00910 DBUG_RETURN(TRUE); 00911 } 00912 00913 if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert)) 00914 DBUG_RETURN(TRUE); 00915 00916 /* Save the state of the current name resolution context. 
*/ 00917 ctx_state.save_state(context, table_list); 00918 00919 /* 00920 Perform name resolution only in the first table - 'table_list', 00921 which is the table that is inserted into. 00922 */ 00923 table_list->next_local= 0; 00924 context->resolve_in_table_list_only(table_list); 00925 00926 /* Prepare the fields in the statement. */ 00927 if (values && 00928 !(res= check_insert_fields(thd, context->table_list, fields, *values, 00929 !insert_into_view) || 00930 setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) && 00931 duplic == DUP_UPDATE) 00932 { 00933 select_lex->no_wrap_view_item= TRUE; 00934 res= check_update_fields(thd, context->table_list, update_fields); 00935 select_lex->no_wrap_view_item= FALSE; 00936 /* 00937 When we are not using GROUP BY we can refer to other tables in the 00938 ON DUPLICATE KEY part. 00939 */ 00940 if (select_lex->group_list.elements == 0) 00941 { 00942 context->table_list->next_local= ctx_state.save_next_local; 00943 /* first_name_resolution_table was set by resolve_in_table_list_only() */ 00944 context->first_name_resolution_table-> 00945 next_name_resolution_table= ctx_state.save_next_local; 00946 } 00947 if (!res) 00948 res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0); 00949 } 00950 00951 /* Restore the current context. 
*/ 00952 ctx_state.restore_state(context, table_list); 00953 00954 if (res) 00955 DBUG_RETURN(res); 00956 00957 if (!table) 00958 table= table_list->table; 00959 00960 if (!select_insert) 00961 { 00962 Item *fake_conds= 0; 00963 TABLE_LIST *duplicate; 00964 if ((duplicate= unique_table(thd, table_list, table_list->next_global))) 00965 { 00966 update_non_unique_table_error(table_list, "INSERT", duplicate); 00967 DBUG_RETURN(TRUE); 00968 } 00969 select_lex->fix_prepare_information(thd, &fake_conds); 00970 select_lex->first_execution= 0; 00971 } 00972 if (duplic == DUP_UPDATE || duplic == DUP_REPLACE) 00973 table->prepare_for_position(); 00974 DBUG_RETURN(FALSE); 00975 } 00976 00977 00978 /* Check if there is more uniq keys after field */ 00979 00980 static int last_uniq_key(TABLE *table,uint keynr) 00981 { 00982 while (++keynr < table->s->keys) 00983 if (table->key_info[keynr].flags & HA_NOSAME) 00984 return 0; 00985 return 1; 00986 } 00987 00988 00989 /* 00990 Write a record to table with optional deleting of conflicting records, 00991 invoke proper triggers if needed. 00992 00993 SYNOPSIS 00994 write_record() 00995 thd - thread context 00996 table - table to which record should be written 00997 info - COPY_INFO structure describing handling of duplicates 00998 and which is used for counting number of records inserted 00999 and deleted. 01000 01001 NOTE 01002 Once this record will be written to table after insert trigger will 01003 be invoked. If instead of inserting new record we will update old one 01004 then both on update triggers will work instead. Similarly both on 01005 delete triggers will be invoked if we will delete conflicting records. 01006 01007 Sets thd->no_trans_update if table which is updated didn't have 01008 transactions. 

  RETURN VALUE
    0     - success
    non-0 - error
*/


int write_record(THD *thd, TABLE *table,COPY_INFO *info)
{
  int error, trg_error= 0;
  char *key=0;                          /* Lazily allocated key buffer */
  MY_BITMAP *save_read_set, *save_write_set;
  /* Remembered so the auto-increment counter can be rolled back on failure */
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
  DBUG_ENTER("write_record");

  info->records++;
  /* use_all_columns() below may replace the column maps; keep the originals */
  save_read_set= table->read_set;
  save_write_set= table->write_set;

  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
  {
    /* Retry the write until it succeeds or an unrecoverable error occurs */
    while ((error=table->file->ha_write_row(table->record[0])))
    {
      uint key_nr;
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
      bool is_duplicate_key_error;
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
        goto err;
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
      if (!is_duplicate_key_error)
      {
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
        if (info->ignore)
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
      }
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
        goto err;
      }
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
      /*
        Don't allow REPLACE to replace a row when a auto_increment column
        was used.  This ensures that we don't get a problem when the
        whole range of the key has been used.
      */
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
          key_nr == table->s->next_number_index &&
          (insert_id_for_cur_row > 0))
        goto err;
      /*
        Fetch the conflicting row into record[1]: by position when the
        engine sets HA_DUPLICATE_POS, otherwise by an exact lookup on the
        duplicate key built from record[0].
      */
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
      {
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
          goto err;
      }
      else
      {
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
        {
          error=my_errno;
          goto err;
        }

        /* The key buffer is allocated once and reused by later iterations */
        if (!key)
        {
          if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
                                           MAX_KEY_LENGTH)))
          {
            error=ENOMEM;
            goto err;
          }
        }
        key_copy((byte*) key,table->record[0],table->key_info+key_nr,0);
        if ((error=(table->file->index_read_idx(table->record[1],key_nr,
                                                (byte*) key,
                                                table->key_info[key_nr].
                                                key_length,
                                                HA_READ_KEY_EXACT))))
          goto err;
      }
      if (info->handle_duplicates == DUP_UPDATE)
      {
        int res= 0;
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
        */
        DBUG_ASSERT(table->insert_values != NULL);
        /* Save the values to insert, then start from the existing row */
        store_record(table,insert_values);
        restore_record(table,record[1]);
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
                                                 *info->update_values, 0,
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
          goto ok_or_after_trg_err;
        if (res == VIEW_CHECK_ERROR)
          goto before_trg_err;

        if ((error=table->file->ha_update_row(table->record[1],
                                              table->record[0])))
        {
          if (info->ignore &&
              !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
          {
            table->file->restore_auto_increment(prev_insert_id);
            goto ok_or_after_trg_err;
          }
          goto err;
        }
        info->updated++;
        /*
          If ON DUP KEY UPDATE updates a row instead of inserting one, and
          there is an auto_increment column, then SELECT LAST_INSERT_ID()
          returns the id of the updated row:
        */
        if (table->next_number_field)
        {
          longlong field_val= table->next_number_field->val_int();
          thd->record_first_successful_insert_id_in_cur_stmt(field_val);
          table->file->adjust_next_insert_id_after_explicit_value(field_val);
        }
        trg_error= (table->triggers &&
                    table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
                                                      TRG_ACTION_AFTER, TRUE));
        info->copied++;
        goto ok_or_after_trg_err;
      }
      else /* DUP_REPLACE */
      {
        /*
          The manual defines the REPLACE semantics that it is either
          an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
          InnoDB do not function in the defined way if we allow MySQL
          to convert the latter operation internally to an UPDATE.
          We also should not perform this conversion if we have
          timestamp field with ON UPDATE which is different from DEFAULT.
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
        */
        if (last_uniq_key(table,key_nr) &&
            !table->file->referenced_by_foreign_key() &&
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
        {
          if ((error=table->file->ha_update_row(table->record[1],
                                                table->record[0])))
            goto err;
          info->deleted++;
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
          if ((error=table->file->ha_delete_row(table->record[1])))
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
            thd->no_trans_update= 1;
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
        }
      }
    }
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
    /*
      Restore column maps if they were replaced during a duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
  }
  else if ((error=table->file->ha_write_row(table->record[0])))
  {
    /* Plain insert: only a non-fatal error under IGNORE is swallowed */
    if (!info->ignore ||
        table->file->is_fatal_error(error, HA_CHECK_DUP))
      goto err;
    table->file->restore_auto_increment(prev_insert_id);
    goto ok_or_after_trg_err;
  }

  /* The row was written (or REPLACE was done as an update): count it */
after_trg_n_copied_inc:
  info->copied++;
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));

  /* Common exit for success, ignored errors and AFTER trigger failures */
ok_or_after_trg_err:
  if (key)
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
  if (!table->file->has_transactions())
    thd->no_trans_update= 1;
  DBUG_RETURN(trg_error);

  /* Handler error: record and report it, then fall into the cleanup below */
err:
  info->last_errno= error;
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;            // Give error
  table->file->print_error(error,MYF(0));

  /* Failure exit without printing a handler error */
before_trg_err:
  table->file->restore_auto_increment(prev_insert_id);
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
  table->column_bitmaps_set(save_read_set, save_write_set);
  DBUG_RETURN(1);
}


/******************************************************************************
  Check that all fields which have no default value are given values.

  A warning is pushed for every field that is not in the write set, has
  NO_DEFAULT_VALUE_FLAG set and is not an ENUM. When table_list is given
  and its top table is a view, the warning names the view instead of the
  field.

  Returns 1 only if at least one such field was found and
  thd->abort_on_warning is set; otherwise 0.
******************************************************************************/

int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
{
  int err= 0;
  MY_BITMAP *write_set= entry->write_set;

  for (Field **field=entry->field ; *field ; field++)
  {
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
        ((*field)->real_type() != FIELD_TYPE_ENUM))
    {
      bool view= FALSE;
      if (table_list)
      {
        /* Report against the outermost table/view of the statement */
        table_list= table_list->top_table();
        view= test(table_list->view);
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
      err= 1;
    }
  }
  return thd->abort_on_warning ? err : 0;
}

/*****************************************************************************
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
01313 *****************************************************************************/ 01314 01315 #ifndef EMBEDDED_LIBRARY 01316 01317 class delayed_row :public ilink { 01318 public: 01319 char *record; 01320 enum_duplicates dup; 01321 time_t start_time; 01322 bool query_start_used, ignore, log_query; 01323 bool stmt_depends_on_first_successful_insert_id_in_prev_stmt; 01324 ulonglong first_successful_insert_id_in_prev_stmt; 01325 timestamp_auto_set_type timestamp_field_type; 01326 LEX_STRING query; 01327 01328 delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg, 01329 bool ignore_arg, bool log_query_arg) 01330 : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg), 01331 query(query_arg) 01332 {} 01333 ~delayed_row() 01334 { 01335 x_free(query.str); 01336 x_free(record); 01337 } 01338 }; 01339 01340 01341 class delayed_insert :public ilink { 01342 uint locks_in_memory; 01343 public: 01344 THD thd; 01345 TABLE *table; 01346 pthread_mutex_t mutex; 01347 pthread_cond_t cond,cond_client; 01348 volatile uint tables_in_use,stacked_inserts; 01349 volatile bool status,dead; 01350 COPY_INFO info; 01351 I_List<delayed_row> rows; 01352 ulong group_count; 01353 TABLE_LIST table_list; // Argument 01354 01355 delayed_insert() 01356 :locks_in_memory(0), 01357 table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0), 01358 group_count(0) 01359 { 01360 thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user; 01361 thd.security_ctx->host=(char*) my_localhost; 01362 thd.current_tablenr=0; 01363 thd.version=refresh_version; 01364 thd.command=COM_DELAYED_INSERT; 01365 thd.lex->current_select= 0; // for my_message_sql 01366 thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock() 01367 /* 01368 Statement-based replication of INSERT DELAYED has problems with RAND() 01369 and user vars, so in mixed mode we go to row-based. 
01370 */ 01371 thd.set_current_stmt_binlog_row_based_if_mixed(); 01372 01373 bzero((char*) &thd.net, sizeof(thd.net)); // Safety 01374 bzero((char*) &table_list, sizeof(table_list)); // Safety 01375 thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT; 01376 thd.security_ctx->host_or_ip= ""; 01377 bzero((char*) &info,sizeof(info)); 01378 pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST); 01379 pthread_cond_init(&cond,NULL); 01380 pthread_cond_init(&cond_client,NULL); 01381 VOID(pthread_mutex_lock(&LOCK_thread_count)); 01382 delayed_insert_threads++; 01383 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 01384 } 01385 ~delayed_insert() 01386 { 01387 /* The following is not really needed, but just for safety */ 01388 delayed_row *row; 01389 while ((row=rows.get())) 01390 delete row; 01391 if (table) 01392 close_thread_tables(&thd); 01393 VOID(pthread_mutex_lock(&LOCK_thread_count)); 01394 pthread_mutex_destroy(&mutex); 01395 pthread_cond_destroy(&cond); 01396 pthread_cond_destroy(&cond_client); 01397 thd.unlink(); // Must be unlinked under lock 01398 x_free(thd.query); 01399 thd.security_ctx->user= thd.security_ctx->host=0; 01400 thread_count--; 01401 delayed_insert_threads--; 01402 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 01403 VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */ 01404 } 01405 01406 /* The following is for checking when we can delete ourselves */ 01407 inline void lock() 01408 { 01409 locks_in_memory++; // Assume LOCK_delay_insert 01410 } 01411 void unlock() 01412 { 01413 pthread_mutex_lock(&LOCK_delayed_insert); 01414 if (!--locks_in_memory) 01415 { 01416 pthread_mutex_lock(&mutex); 01417 if (thd.killed && ! stacked_inserts && ! 
tables_in_use) 01418 { 01419 pthread_cond_signal(&cond); 01420 status=1; 01421 } 01422 pthread_mutex_unlock(&mutex); 01423 } 01424 pthread_mutex_unlock(&LOCK_delayed_insert); 01425 } 01426 inline uint lock_count() { return locks_in_memory; } 01427 01428 TABLE* get_local_table(THD* client_thd); 01429 bool handle_inserts(void); 01430 }; 01431 01432 01433 I_List<delayed_insert> delayed_threads; 01434 01435 01436 delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list) 01437 { 01438 thd->proc_info="waiting for delay_list"; 01439 pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list 01440 I_List_iterator<delayed_insert> it(delayed_threads); 01441 delayed_insert *tmp; 01442 while ((tmp=it++)) 01443 { 01444 if (!strcmp(tmp->thd.db, table_list->db) && 01445 !strcmp(table_list->table_name, tmp->table->s->table_name.str)) 01446 { 01447 tmp->lock(); 01448 break; 01449 } 01450 } 01451 pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list 01452 return tmp; 01453 } 01454 01455 01456 static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list) 01457 { 01458 int error; 01459 delayed_insert *tmp; 01460 TABLE *table; 01461 DBUG_ENTER("delayed_get_table"); 01462 01463 /* Must be set in the parser */ 01464 DBUG_ASSERT(table_list->db); 01465 01466 /* Find the thread which handles this table. */ 01467 if (!(tmp=find_handler(thd,table_list))) 01468 { 01469 /* 01470 No match. Create a new thread to handle the table, but 01471 no more than max_insert_delayed_threads. 01472 */ 01473 if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads) 01474 DBUG_RETURN(0); 01475 thd->proc_info="Creating delayed handler"; 01476 pthread_mutex_lock(&LOCK_delayed_create); 01477 /* 01478 The first search above was done without LOCK_delayed_create. 01479 Another thread might have created the handler in between. Search again. 01480 */ 01481 if (! 
(tmp= find_handler(thd, table_list))) 01482 { 01483 if (!(tmp=new delayed_insert())) 01484 { 01485 my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert)); 01486 goto err1; 01487 } 01488 pthread_mutex_lock(&LOCK_thread_count); 01489 thread_count++; 01490 pthread_mutex_unlock(&LOCK_thread_count); 01491 tmp->thd.set_db(table_list->db, strlen(table_list->db)); 01492 tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME)); 01493 if (tmp->thd.db == NULL || tmp->thd.query == NULL) 01494 { 01495 delete tmp; 01496 my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0)); 01497 goto err1; 01498 } 01499 tmp->table_list= *table_list; // Needed to open table 01500 tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query; 01501 tmp->lock(); 01502 pthread_mutex_lock(&tmp->mutex); 01503 if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib, 01504 handle_delayed_insert,(void*) tmp))) 01505 { 01506 DBUG_PRINT("error", 01507 ("Can't create thread to handle delayed insert (error %d)", 01508 error)); 01509 pthread_mutex_unlock(&tmp->mutex); 01510 tmp->unlock(); 01511 delete tmp; 01512 my_error(ER_CANT_CREATE_THREAD, MYF(0), error); 01513 goto err1; 01514 } 01515 01516 /* Wait until table is open */ 01517 thd->proc_info="waiting for handler open"; 01518 while (!tmp->thd.killed && !tmp->table && !thd->killed) 01519 { 01520 pthread_cond_wait(&tmp->cond_client,&tmp->mutex); 01521 } 01522 pthread_mutex_unlock(&tmp->mutex); 01523 thd->proc_info="got old table"; 01524 if (tmp->thd.killed) 01525 { 01526 if (tmp->thd.is_fatal_error) 01527 { 01528 /* Copy error message and abort */ 01529 thd->fatal_error(); 01530 strmov(thd->net.last_error,tmp->thd.net.last_error); 01531 thd->net.last_errno=tmp->thd.net.last_errno; 01532 } 01533 tmp->unlock(); 01534 goto err; 01535 } 01536 if (thd->killed) 01537 { 01538 tmp->unlock(); 01539 goto err; 01540 } 01541 } 01542 pthread_mutex_unlock(&LOCK_delayed_create); 01543 } 01544 01545 pthread_mutex_lock(&tmp->mutex); 01546 table= 
tmp->get_local_table(thd); 01547 pthread_mutex_unlock(&tmp->mutex); 01548 if (table) 01549 thd->di=tmp; 01550 else if (tmp->thd.is_fatal_error) 01551 thd->fatal_error(); 01552 /* Unlock the delayed insert object after its last access. */ 01553 tmp->unlock(); 01554 DBUG_RETURN((table_list->table=table)); 01555 01556 err1: 01557 thd->fatal_error(); 01558 err: 01559 pthread_mutex_unlock(&LOCK_delayed_create); 01560 DBUG_RETURN(0); // Continue with normal insert 01561 } 01562 01563 01564 /* 01565 As we can't let many threads modify the same TABLE structure, we create 01566 an own structure for each tread. This includes a row buffer to save the 01567 column values and new fields that points to the new row buffer. 01568 The memory is allocated in the client thread and is freed automaticly. 01569 */ 01570 01571 TABLE *delayed_insert::get_local_table(THD* client_thd) 01572 { 01573 my_ptrdiff_t adjust_ptrs; 01574 Field **field,**org_field, *found_next_number_field; 01575 TABLE *copy; 01576 TABLE_SHARE *share= table->s; 01577 byte *bitmap; 01578 DBUG_ENTER("delayed_insert::get_local_table"); 01579 01580 /* First request insert thread to get a lock */ 01581 status=1; 01582 tables_in_use++; 01583 if (!thd.lock) // Table is not locked 01584 { 01585 client_thd->proc_info="waiting for handler lock"; 01586 pthread_cond_signal(&cond); // Tell handler to lock table 01587 while (!dead && !thd.lock && ! client_thd->killed) 01588 { 01589 pthread_cond_wait(&cond_client,&mutex); 01590 } 01591 client_thd->proc_info="got handler lock"; 01592 if (client_thd->killed) 01593 goto error; 01594 if (dead) 01595 { 01596 strmov(client_thd->net.last_error,thd.net.last_error); 01597 client_thd->net.last_errno=thd.net.last_errno; 01598 goto error; 01599 } 01600 } 01601 01602 /* 01603 Allocate memory for the TABLE object, the field pointers array, and 01604 one record buffer of reclength size. Normally a table has three 01605 record buffers of rec_buff_length size, which includes alignment 01606 bytes. 
Since the table copy is used for creating one record only, 01607 the other record buffers and alignment are unnecessary. 01608 */ 01609 client_thd->proc_info="allocating local table"; 01610 copy= (TABLE*) client_thd->alloc(sizeof(*copy)+ 01611 (share->fields+1)*sizeof(Field**)+ 01612 share->reclength + 01613 share->column_bitmap_size*2); 01614 if (!copy) 01615 goto error; 01616 01617 /* Copy the TABLE object. */ 01618 *copy= *table; 01619 /* We don't need to change the file handler here */ 01620 /* Assign the pointers for the field pointers array and the record. */ 01621 field= copy->field= (Field**) (copy + 1); 01622 bitmap= (byte*) (field + share->fields + 1); 01623 copy->record[0]= (bitmap + share->column_bitmap_size * 2); 01624 memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength); 01625 /* 01626 Make a copy of all fields. 01627 The copied fields need to point into the copied record. This is done 01628 by copying the field objects with their old pointer values and then 01629 "move" the pointers by the distance between the original and copied 01630 records. That way we preserve the relative positions in the records. 
01631 */ 01632 adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]); 01633 found_next_number_field= table->found_next_number_field; 01634 for (org_field= table->field; *org_field; org_field++, field++) 01635 { 01636 if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1))) 01637 DBUG_RETURN(0); 01638 (*field)->orig_table= copy; // Remove connection 01639 (*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0] 01640 if (*org_field == found_next_number_field) 01641 (*field)->table->found_next_number_field= *field; 01642 } 01643 *field=0; 01644 01645 /* Adjust timestamp */ 01646 if (table->timestamp_field) 01647 { 01648 /* Restore offset as this may have been reset in handle_inserts */ 01649 copy->timestamp_field= 01650 (Field_timestamp*) copy->field[share->timestamp_field_offset]; 01651 copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check; 01652 copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type(); 01653 } 01654 01655 /* Adjust in_use for pointing to client thread */ 01656 copy->in_use= client_thd; 01657 01658 /* Adjust lock_count. This table object is not part of a lock. 
*/ 01659 copy->lock_count= 0; 01660 01661 /* Adjust bitmaps */ 01662 copy->def_read_set.bitmap= (my_bitmap_map*) bitmap; 01663 copy->def_write_set.bitmap= ((my_bitmap_map*) 01664 (bitmap + share->column_bitmap_size)); 01665 copy->tmp_set.bitmap= 0; // To catch errors 01666 bzero((char*) bitmap, share->column_bitmap_size*2); 01667 copy->read_set= ©->def_read_set; 01668 copy->write_set= ©->def_write_set; 01669 01670 DBUG_RETURN(copy); 01671 01672 /* Got fatal error */ 01673 error: 01674 tables_in_use--; 01675 status=1; 01676 pthread_cond_signal(&cond); // Inform thread about abort 01677 DBUG_RETURN(0); 01678 } 01679 01680 01681 /* Put a question in queue */ 01682 01683 static int 01684 write_delayed(THD *thd,TABLE *table, enum_duplicates duplic, 01685 LEX_STRING query, bool ignore, bool log_on) 01686 { 01687 delayed_row *row; 01688 delayed_insert *di=thd->di; 01689 DBUG_ENTER("write_delayed"); 01690 DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length)); 01691 01692 thd->proc_info="waiting for handler insert"; 01693 pthread_mutex_lock(&di->mutex); 01694 while (di->stacked_inserts >= delayed_queue_size && !thd->killed) 01695 pthread_cond_wait(&di->cond_client,&di->mutex); 01696 thd->proc_info="storing row into queue"; 01697 01698 if (thd->killed) 01699 goto err; 01700 01701 /* 01702 Take a copy of the query string, if there is any. The string will 01703 be free'ed when the row is destroyed. If there is no query string, 01704 we don't do anything special. 
01705 */ 01706 01707 if (query.str) 01708 { 01709 char *str; 01710 if (!(str= my_strndup(query.str, query.length, MYF(MY_WME)))) 01711 goto err; 01712 query.str= str; 01713 } 01714 row= new delayed_row(query, duplic, ignore, log_on); 01715 if (row == NULL) 01716 { 01717 my_free(query.str, MYF(MY_WME)); 01718 goto err; 01719 } 01720 01721 if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME)))) 01722 goto err; 01723 memcpy(row->record, table->record[0], table->s->reclength); 01724 row->start_time= thd->start_time; 01725 row->query_start_used= thd->query_start_used; 01726 /* 01727 those are for the binlog: LAST_INSERT_ID() has been evaluated at this 01728 time, so record does not need it, but statement-based binlogging of the 01729 INSERT will need when the row is actually inserted. 01730 As for SET INSERT_ID, DELAYED does not honour it (BUG#20830). 01731 */ 01732 row->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 01733 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt; 01734 row->first_successful_insert_id_in_prev_stmt= 01735 thd->first_successful_insert_id_in_prev_stmt; 01736 row->timestamp_field_type= table->timestamp_field_type; 01737 01738 di->rows.push_back(row); 01739 di->stacked_inserts++; 01740 di->status=1; 01741 if (table->s->blob_fields) 01742 unlink_blobs(table); 01743 pthread_cond_signal(&di->cond); 01744 01745 thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status); 01746 pthread_mutex_unlock(&di->mutex); 01747 DBUG_RETURN(0); 01748 01749 err: 01750 delete row; 01751 pthread_mutex_unlock(&di->mutex); 01752 DBUG_RETURN(1); 01753 } 01754 01755 01756 static void end_delayed_insert(THD *thd) 01757 { 01758 DBUG_ENTER("end_delayed_insert"); 01759 delayed_insert *di=thd->di; 01760 pthread_mutex_lock(&di->mutex); 01761 DBUG_PRINT("info",("tables in use: %d",di->tables_in_use)); 01762 if (!--di->tables_in_use || di->thd.killed) 01763 { // Unlock table 01764 di->status=1; 01765 pthread_cond_signal(&di->cond); 01766 } 
01767 pthread_mutex_unlock(&di->mutex); 01768 DBUG_VOID_RETURN; 01769 } 01770 01771 01772 /* We kill all delayed threads when doing flush-tables */ 01773 01774 void kill_delayed_threads(void) 01775 { 01776 VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list 01777 01778 I_List_iterator<delayed_insert> it(delayed_threads); 01779 delayed_insert *tmp; 01780 while ((tmp=it++)) 01781 { 01782 /* Ensure that the thread doesn't kill itself while we are looking at it */ 01783 pthread_mutex_lock(&tmp->mutex); 01784 tmp->thd.killed= THD::KILL_CONNECTION; 01785 if (tmp->thd.mysys_var) 01786 { 01787 pthread_mutex_lock(&tmp->thd.mysys_var->mutex); 01788 if (tmp->thd.mysys_var->current_cond) 01789 { 01790 /* 01791 We need the following test because the main mutex may be locked 01792 in handle_delayed_insert() 01793 */ 01794 if (&tmp->mutex != tmp->thd.mysys_var->current_mutex) 01795 pthread_mutex_lock(tmp->thd.mysys_var->current_mutex); 01796 pthread_cond_broadcast(tmp->thd.mysys_var->current_cond); 01797 if (&tmp->mutex != tmp->thd.mysys_var->current_mutex) 01798 pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex); 01799 } 01800 pthread_mutex_unlock(&tmp->thd.mysys_var->mutex); 01801 } 01802 pthread_mutex_unlock(&tmp->mutex); 01803 } 01804 VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list 01805 } 01806 01807 01808 /* 01809 * Create a new delayed insert thread 01810 */ 01811 01812 pthread_handler_t handle_delayed_insert(void *arg) 01813 { 01814 delayed_insert *di=(delayed_insert*) arg; 01815 THD *thd= &di->thd; 01816 01817 pthread_detach_this_thread(); 01818 /* Add thread to THD list so that's it's visible in 'show processlist' */ 01819 pthread_mutex_lock(&LOCK_thread_count); 01820 thd->thread_id=thread_id++; 01821 thd->end_time(); 01822 threads.append(thd); 01823 thd->killed=abort_loop ? 
THD::KILL_CONNECTION : THD::NOT_KILLED; 01824 pthread_mutex_unlock(&LOCK_thread_count); 01825 01826 /* 01827 Wait until the client runs into pthread_cond_wait(), 01828 where we free it after the table is opened and di linked in the list. 01829 If we did not wait here, the client might detect the opened table 01830 before it is linked to the list. It would release LOCK_delayed_create 01831 and allow another thread to create another handler for the same table, 01832 since it does not find one in the list. 01833 */ 01834 pthread_mutex_lock(&di->mutex); 01835 #if !defined( __WIN__) /* Win32 calls this in pthread_create */ 01836 if (my_thread_init()) 01837 { 01838 strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES)); 01839 goto end; 01840 } 01841 #endif 01842 01843 DBUG_ENTER("handle_delayed_insert"); 01844 thd->thread_stack= (char*) &thd; 01845 if (init_thr_lock() || thd->store_globals()) 01846 { 01847 thd->fatal_error(); 01848 strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES)); 01849 goto err; 01850 } 01851 #if !defined(__WIN__) && !defined(__NETWARE__) 01852 sigset_t set; 01853 VOID(sigemptyset(&set)); // Get mask in use 01854 VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); 01855 #endif 01856 01857 /* open table */ 01858 01859 if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED))) 01860 { 01861 thd->fatal_error(); // Abort waiting inserts 01862 goto err; 01863 } 01864 if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED)) 01865 { 01866 thd->fatal_error(); 01867 my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name); 01868 goto err; 01869 } 01870 di->table->copy_blobs=1; 01871 01872 /* One can now use this */ 01873 pthread_mutex_lock(&LOCK_delayed_insert); 01874 delayed_threads.append(di); 01875 pthread_mutex_unlock(&LOCK_delayed_insert); 01876 01877 /* Tell client that the thread is initialized */ 01878 pthread_cond_signal(&di->cond_client); 01879 01880 /* Now wait until we get an insert 
or lock to handle */ 01881 /* We will not abort as long as a client thread uses this thread */ 01882 01883 for (;;) 01884 { 01885 if (thd->killed == THD::KILL_CONNECTION) 01886 { 01887 uint lock_count; 01888 /* 01889 Remove this from delay insert list so that no one can request a 01890 table from this 01891 */ 01892 pthread_mutex_unlock(&di->mutex); 01893 pthread_mutex_lock(&LOCK_delayed_insert); 01894 di->unlink(); 01895 lock_count=di->lock_count(); 01896 pthread_mutex_unlock(&LOCK_delayed_insert); 01897 pthread_mutex_lock(&di->mutex); 01898 if (!lock_count && !di->tables_in_use && !di->stacked_inserts) 01899 break; // Time to die 01900 } 01901 01902 if (!di->status && !di->stacked_inserts) 01903 { 01904 struct timespec abstime; 01905 set_timespec(abstime, delayed_insert_timeout); 01906 01907 /* Information for pthread_kill */ 01908 di->thd.mysys_var->current_mutex= &di->mutex; 01909 di->thd.mysys_var->current_cond= &di->cond; 01910 di->thd.proc_info="Waiting for INSERT"; 01911 01912 DBUG_PRINT("info",("Waiting for someone to insert rows")); 01913 while (!thd->killed) 01914 { 01915 int error; 01916 #if defined(HAVE_BROKEN_COND_TIMEDWAIT) 01917 error=pthread_cond_wait(&di->cond,&di->mutex); 01918 #else 01919 error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime); 01920 #ifdef EXTRA_DEBUG 01921 if (error && error != EINTR && error != ETIMEDOUT) 01922 { 01923 fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error); 01924 DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait", 01925 error)); 01926 } 01927 #endif 01928 #endif 01929 if (thd->killed || di->status) 01930 break; 01931 if (error == ETIMEDOUT || error == ETIME) 01932 { 01933 thd->killed= THD::KILL_CONNECTION; 01934 break; 01935 } 01936 } 01937 /* We can't lock di->mutex and mysys_var->mutex at the same time */ 01938 pthread_mutex_unlock(&di->mutex); 01939 pthread_mutex_lock(&di->thd.mysys_var->mutex); 01940 di->thd.mysys_var->current_mutex= 0; 01941 di->thd.mysys_var->current_cond= 
0; 01942 pthread_mutex_unlock(&di->thd.mysys_var->mutex); 01943 pthread_mutex_lock(&di->mutex); 01944 } 01945 di->thd.proc_info=0; 01946 01947 if (di->tables_in_use && ! thd->lock) 01948 { 01949 bool not_used; 01950 /* 01951 Request for new delayed insert. 01952 Lock the table, but avoid to be blocked by a global read lock. 01953 If we got here while a global read lock exists, then one or more 01954 inserts started before the lock was requested. These are allowed 01955 to complete their work before the server returns control to the 01956 client which requested the global read lock. The delayed insert 01957 handler will close the table and finish when the outstanding 01958 inserts are done. 01959 */ 01960 if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 01961 MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK, 01962 ¬_used))) 01963 { 01964 /* Fatal error */ 01965 di->dead= 1; 01966 thd->killed= THD::KILL_CONNECTION; 01967 } 01968 pthread_cond_broadcast(&di->cond_client); 01969 } 01970 if (di->stacked_inserts) 01971 { 01972 if (di->handle_inserts()) 01973 { 01974 /* Some fatal error */ 01975 di->dead= 1; 01976 thd->killed= THD::KILL_CONNECTION; 01977 } 01978 } 01979 di->status=0; 01980 if (!di->stacked_inserts && !di->tables_in_use && thd->lock) 01981 { 01982 /* 01983 No one is doing a insert delayed 01984 Unlock table so that other threads can use it 01985 */ 01986 MYSQL_LOCK *lock=thd->lock; 01987 thd->lock=0; 01988 pthread_mutex_unlock(&di->mutex); 01989 di->table->file->ha_release_auto_increment(); 01990 mysql_unlock_tables(thd, lock); 01991 di->group_count=0; 01992 pthread_mutex_lock(&di->mutex); 01993 } 01994 if (di->tables_in_use) 01995 pthread_cond_broadcast(&di->cond_client); // If waiting clients 01996 } 01997 01998 err: 01999 /* 02000 mysql_lock_tables() can potentially start a transaction and write 02001 a table map. In the event of an error, that transaction has to be 02002 rolled back. 
We only need to roll back a potential statement 02003 transaction, since real transactions are rolled back in 02004 close_thread_tables(). 02005 */ 02006 ha_rollback_stmt(thd); 02007 02008 end: 02009 /* 02010 di should be unlinked from the thread handler list and have no active 02011 clients 02012 */ 02013 02014 close_thread_tables(thd); // Free the table 02015 di->table=0; 02016 di->dead= 1; // If error 02017 thd->killed= THD::KILL_CONNECTION; // If error 02018 pthread_cond_broadcast(&di->cond_client); // Safety 02019 pthread_mutex_unlock(&di->mutex); 02020 02021 pthread_mutex_lock(&LOCK_delayed_create); // Because of delayed_get_table 02022 pthread_mutex_lock(&LOCK_delayed_insert); 02023 delete di; 02024 pthread_mutex_unlock(&LOCK_delayed_insert); 02025 pthread_mutex_unlock(&LOCK_delayed_create); 02026 02027 my_thread_end(); 02028 pthread_exit(0); 02029 DBUG_RETURN(0); 02030 } 02031 02032 02033 /* Remove pointers from temporary fields to allocated values */ 02034 02035 static void unlink_blobs(register TABLE *table) 02036 { 02037 for (Field **ptr=table->field ; *ptr ; ptr++) 02038 { 02039 if ((*ptr)->flags & BLOB_FLAG) 02040 ((Field_blob *) (*ptr))->clear_temporary(); 02041 } 02042 } 02043 02044 /* Free blobs stored in current row */ 02045 02046 static void free_delayed_insert_blobs(register TABLE *table) 02047 { 02048 for (Field **ptr=table->field ; *ptr ; ptr++) 02049 { 02050 if ((*ptr)->flags & BLOB_FLAG) 02051 { 02052 char *str; 02053 ((Field_blob *) (*ptr))->get_ptr(&str); 02054 my_free(str,MYF(MY_ALLOW_ZERO_PTR)); 02055 ((Field_blob *) (*ptr))->reset(); 02056 } 02057 } 02058 } 02059 02060 02061 bool delayed_insert::handle_inserts(void) 02062 { 02063 int error; 02064 ulong max_rows; 02065 bool using_ignore= 0, using_opt_replace= 0, 02066 using_bin_log= mysql_bin_log.is_open(); 02067 delayed_row *row; 02068 DBUG_ENTER("handle_inserts"); 02069 02070 /* Allow client to insert new rows */ 02071 pthread_mutex_unlock(&mutex); 02072 02073 
table->next_number_field=table->found_next_number_field; 02074 table->use_all_columns(); 02075 02076 thd.proc_info="upgrading lock"; 02077 if (thr_upgrade_write_delay_lock(*thd.lock->locks)) 02078 { 02079 /* This can only happen if thread is killed by shutdown */ 02080 sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str); 02081 goto err; 02082 } 02083 02084 thd.proc_info="insert"; 02085 max_rows= delayed_insert_limit; 02086 if (thd.killed || table->s->version != refresh_version) 02087 { 02088 thd.killed= THD::KILL_CONNECTION; 02089 max_rows= ULONG_MAX; // Do as much as possible 02090 } 02091 02092 /* 02093 We can't use row caching when using the binary log because if 02094 we get a crash, then binary log will contain rows that are not yet 02095 written to disk, which will cause problems in replication. 02096 */ 02097 if (!using_bin_log) 02098 table->file->extra(HA_EXTRA_WRITE_CACHE); 02099 pthread_mutex_lock(&mutex); 02100 02101 while ((row=rows.get())) 02102 { 02103 stacked_inserts--; 02104 pthread_mutex_unlock(&mutex); 02105 memcpy(table->record[0],row->record,table->s->reclength); 02106 02107 thd.start_time=row->start_time; 02108 thd.query_start_used=row->query_start_used; 02109 /* for the binlog, forget auto_increment ids generated by previous rows */ 02110 // thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty(); 02111 thd.first_successful_insert_id_in_prev_stmt= 02112 row->first_successful_insert_id_in_prev_stmt; 02113 thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 02114 row->stmt_depends_on_first_successful_insert_id_in_prev_stmt; 02115 table->timestamp_field_type= row->timestamp_field_type; 02116 02117 info.ignore= row->ignore; 02118 info.handle_duplicates= row->dup; 02119 if (info.ignore || 02120 info.handle_duplicates != DUP_ERROR) 02121 { 02122 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); 02123 using_ignore=1; 02124 } 02125 if (info.handle_duplicates == DUP_REPLACE && 02126 (!table->triggers || 02127 
!table->triggers->has_delete_triggers())) 02128 { 02129 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); 02130 using_opt_replace= 1; 02131 } 02132 thd.clear_error(); // reset error for binlog 02133 if (write_record(&thd, table, &info)) 02134 { 02135 info.error_count++; // Ignore errors 02136 thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status); 02137 row->log_query = 0; 02138 } 02139 02140 if (using_ignore) 02141 { 02142 using_ignore=0; 02143 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); 02144 } 02145 if (using_opt_replace) 02146 { 02147 using_opt_replace= 0; 02148 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); 02149 } 02150 02151 if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open()) 02152 { 02153 /* 02154 If the query has several rows to insert, only the first row will come 02155 here. In row-based binlogging, this means that the first row will be 02156 written to binlog as one Table_map event and one Rows event (due to an 02157 event flush done in binlog_query()), then all other rows of this query 02158 will be binlogged together as one single Table_map event and one 02159 single Rows event. 
02160 */ 02161 thd.binlog_query(THD::ROW_QUERY_TYPE, 02162 row->query.str, row->query.length, 02163 FALSE, FALSE); 02164 } 02165 02166 if (table->s->blob_fields) 02167 free_delayed_insert_blobs(table); 02168 thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status); 02169 thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status); 02170 pthread_mutex_lock(&mutex); 02171 02172 delete row; 02173 /* 02174 Let READ clients do something once in a while 02175 We should however not break in the middle of a multi-line insert 02176 if we have binary logging enabled as we don't want other commands 02177 on this table until all entries has been processed 02178 */ 02179 if (group_count++ >= max_rows && (row= rows.head()) && 02180 (!(row->log_query & using_bin_log))) 02181 { 02182 group_count=0; 02183 if (stacked_inserts || tables_in_use) // Let these wait a while 02184 { 02185 if (tables_in_use) 02186 pthread_cond_broadcast(&cond_client); // If waiting clients 02187 thd.proc_info="reschedule"; 02188 pthread_mutex_unlock(&mutex); 02189 if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) 02190 { 02191 /* This should never happen */ 02192 table->file->print_error(error,MYF(0)); 02193 sql_print_error("%s",thd.net.last_error); 02194 goto err; 02195 } 02196 query_cache_invalidate3(&thd, table, 1); 02197 if (thr_reschedule_write_lock(*thd.lock->locks)) 02198 { 02199 /* This should never happen */ 02200 sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK), 02201 table->s->table_name.str); 02202 } 02203 if (!using_bin_log) 02204 table->file->extra(HA_EXTRA_WRITE_CACHE); 02205 pthread_mutex_lock(&mutex); 02206 thd.proc_info="insert"; 02207 } 02208 if (tables_in_use) 02209 pthread_cond_broadcast(&cond_client); // If waiting clients 02210 } 02211 } 02212 thd.proc_info=0; 02213 pthread_mutex_unlock(&mutex); 02214 02215 #ifdef HAVE_ROW_BASED_REPLICATION 02216 /* 02217 We need to flush the pending event when using row-based 02218 replication since the flushing normally done in 
binlog_query() is 02219 not done last in the statement: for delayed inserts, the insert 02220 statement is logged *before* all rows are inserted. 02221 02222 We can flush the pending event without checking the thd->lock 02223 since the delayed insert *thread* is not inside a stored function 02224 or trigger. 02225 02226 TODO: Move the logging to last in the sequence of rows. 02227 */ 02228 if (thd.current_stmt_binlog_row_based) 02229 thd.binlog_flush_pending_rows_event(TRUE); 02230 #endif /* HAVE_ROW_BASED_REPLICATION */ 02231 02232 if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) 02233 { // This shouldn't happen 02234 table->file->print_error(error,MYF(0)); 02235 sql_print_error("%s",thd.net.last_error); 02236 goto err; 02237 } 02238 query_cache_invalidate3(&thd, table, 1); 02239 pthread_mutex_lock(&mutex); 02240 DBUG_RETURN(0); 02241 02242 err: 02243 /* Remove all not used rows */ 02244 while ((row=rows.get())) 02245 { 02246 delete row; 02247 thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status); 02248 stacked_inserts--; 02249 } 02250 thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status); 02251 pthread_mutex_lock(&mutex); 02252 DBUG_RETURN(1); 02253 } 02254 #endif /* EMBEDDED_LIBRARY */ 02255 02256 /*************************************************************************** 02257 Store records in INSERT ... 
SELECT * 02258 ***************************************************************************/ 02259 02260 02261 /* 02262 make insert specific preparation and checks after opening tables 02263 02264 SYNOPSIS 02265 mysql_insert_select_prepare() 02266 thd thread handler 02267 02268 RETURN 02269 FALSE OK 02270 TRUE Error 02271 */ 02272 02273 bool mysql_insert_select_prepare(THD *thd) 02274 { 02275 LEX *lex= thd->lex; 02276 SELECT_LEX *select_lex= &lex->select_lex; 02277 TABLE_LIST *first_select_leaf_table; 02278 DBUG_ENTER("mysql_insert_select_prepare"); 02279 02280 /* 02281 SELECT_LEX do not belong to INSERT statement, so we can't add WHERE 02282 clause if table is VIEW 02283 */ 02284 02285 if (mysql_prepare_insert(thd, lex->query_tables, 02286 lex->query_tables->table, lex->field_list, 0, 02287 lex->update_list, lex->value_list, 02288 lex->duplicates, 02289 &select_lex->where, TRUE)) 02290 DBUG_RETURN(TRUE); 02291 02292 /* 02293 exclude first table from leaf tables list, because it belong to 02294 INSERT 02295 */ 02296 DBUG_ASSERT(select_lex->leaf_tables != 0); 02297 lex->leaf_tables_insert= select_lex->leaf_tables; 02298 /* skip all leaf tables belonged to view where we are insert */ 02299 for (first_select_leaf_table= select_lex->leaf_tables->next_leaf; 02300 first_select_leaf_table && 02301 first_select_leaf_table->belong_to_view && 02302 first_select_leaf_table->belong_to_view == 02303 lex->leaf_tables_insert->belong_to_view; 02304 first_select_leaf_table= first_select_leaf_table->next_leaf) 02305 {} 02306 select_lex->leaf_tables= first_select_leaf_table; 02307 DBUG_RETURN(FALSE); 02308 } 02309 02310 02311 select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par, 02312 List<Item> *fields_par, 02313 List<Item> *update_fields, 02314 List<Item> *update_values, 02315 enum_duplicates duplic, 02316 bool ignore_check_option_errors) 02317 :table_list(table_list_par), table(table_par), fields(fields_par), 02318 autoinc_value_of_last_inserted_row(0), 02319 
insert_into_view(table_list_par && table_list_par->view != 0) 02320 { 02321 bzero((char*) &info,sizeof(info)); 02322 info.handle_duplicates= duplic; 02323 info.ignore= ignore_check_option_errors; 02324 info.update_fields= update_fields; 02325 info.update_values= update_values; 02326 if (table_list_par) 02327 info.view= (table_list_par->view ? table_list_par : 0); 02328 } 02329 02330 02331 int 02332 select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) 02333 { 02334 LEX *lex= thd->lex; 02335 int res; 02336 SELECT_LEX *lex_current_select_save= lex->current_select; 02337 DBUG_ENTER("select_insert::prepare"); 02338 02339 unit= u; 02340 /* 02341 Since table in which we are going to insert is added to the first 02342 select, LEX::current_select should point to the first select while 02343 we are fixing fields from insert list. 02344 */ 02345 lex->current_select= &lex->select_lex; 02346 res= check_insert_fields(thd, table_list, *fields, values, 02347 !insert_into_view) || 02348 setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0); 02349 02350 if (info.handle_duplicates == DUP_UPDATE) 02351 { 02352 /* Save the state of the current name resolution context. */ 02353 Name_resolution_context *context= &lex->select_lex.context; 02354 Name_resolution_context_state ctx_state; 02355 02356 /* Save the state of the current name resolution context. */ 02357 ctx_state.save_state(context, table_list); 02358 02359 /* Perform name resolution only in the first table - 'table_list'. 
*/ 02360 table_list->next_local= 0; 02361 context->resolve_in_table_list_only(table_list); 02362 02363 lex->select_lex.no_wrap_view_item= TRUE; 02364 res= res || check_update_fields(thd, context->table_list, 02365 *info.update_fields); 02366 lex->select_lex.no_wrap_view_item= FALSE; 02367 /* 02368 When we are not using GROUP BY we can refer to other tables in the 02369 ON DUPLICATE KEY part 02370 */ 02371 if (lex->select_lex.group_list.elements == 0) 02372 { 02373 context->table_list->next_local= ctx_state.save_next_local; 02374 /* first_name_resolution_table was set by resolve_in_table_list_only() */ 02375 context->first_name_resolution_table-> 02376 next_name_resolution_table= ctx_state.save_next_local; 02377 } 02378 res= res || setup_fields(thd, 0, *info.update_values, MARK_COLUMNS_READ, 02379 0, 0); 02380 02381 /* Restore the current context. */ 02382 ctx_state.restore_state(context, table_list); 02383 } 02384 02385 lex->current_select= lex_current_select_save; 02386 if (res) 02387 DBUG_RETURN(1); 02388 /* 02389 if it is INSERT into join view then check_insert_fields already found 02390 real table for insert 02391 */ 02392 table= table_list->table; 02393 02394 /* 02395 Is table which we are changing used somewhere in other parts of 02396 query 02397 */ 02398 if (!(lex->current_select->options & OPTION_BUFFER_RESULT) && 02399 unique_table(thd, table_list, table_list->next_global)) 02400 { 02401 /* Using same table for INSERT and SELECT */ 02402 lex->current_select->options|= OPTION_BUFFER_RESULT; 02403 lex->current_select->join->select_options|= OPTION_BUFFER_RESULT; 02404 } 02405 else if (!thd->prelocked_mode) 02406 { 02407 /* 02408 We must not yet prepare the result table if it is the same as one of the 02409 source tables (INSERT SELECT). The preparation may disable 02410 indexes on the result table, which may be used during the select, if it 02411 is the same table (Bug #6034). Do the preparation after the select phase 02412 in select_insert::prepare2(). 
02413 We won't start bulk inserts at all if this statement uses functions or 02414 should invoke triggers since they may access to the same table too. 02415 */ 02416 table->file->ha_start_bulk_insert((ha_rows) 0); 02417 } 02418 restore_record(table,s->default_values); // Get empty record 02419 table->next_number_field=table->found_next_number_field; 02420 thd->cuted_fields=0; 02421 if (info.ignore || info.handle_duplicates != DUP_ERROR) 02422 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); 02423 if (info.handle_duplicates == DUP_REPLACE && 02424 (!table->triggers || !table->triggers->has_delete_triggers())) 02425 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); 02426 thd->no_trans_update= 0; 02427 thd->abort_on_warning= (!info.ignore && 02428 (thd->variables.sql_mode & 02429 (MODE_STRICT_TRANS_TABLES | 02430 MODE_STRICT_ALL_TABLES))); 02431 res= ((fields->elements && 02432 check_that_all_fields_are_given_values(thd, table, table_list)) || 02433 table_list->prepare_where(thd, 0, TRUE) || 02434 table_list->prepare_check_option(thd)); 02435 02436 if (!res) 02437 table->mark_columns_needed_for_insert(); 02438 02439 DBUG_RETURN(res); 02440 } 02441 02442 02443 /* 02444 Finish the preparation of the result table. 02445 02446 SYNOPSIS 02447 select_insert::prepare2() 02448 void 02449 02450 DESCRIPTION 02451 If the result table is the same as one of the source tables (INSERT SELECT), 02452 the result table is not finally prepared at the join prepair phase. 02453 Do the final preparation now. 
02454 02455 RETURN 02456 0 OK 02457 */ 02458 02459 int select_insert::prepare2(void) 02460 { 02461 DBUG_ENTER("select_insert::prepare2"); 02462 if (thd->lex->current_select->options & OPTION_BUFFER_RESULT && 02463 !thd->prelocked_mode) 02464 table->file->ha_start_bulk_insert((ha_rows) 0); 02465 DBUG_RETURN(0); 02466 } 02467 02468 02469 void select_insert::cleanup() 02470 { 02471 /* select_insert/select_create are never re-used in prepared statement */ 02472 DBUG_ASSERT(0); 02473 } 02474 02475 select_insert::~select_insert() 02476 { 02477 DBUG_ENTER("~select_insert"); 02478 if (table) 02479 { 02480 table->next_number_field=0; 02481 table->file->ha_reset(); 02482 } 02483 thd->count_cuted_fields= CHECK_FIELD_IGNORE; 02484 thd->abort_on_warning= 0; 02485 DBUG_VOID_RETURN; 02486 } 02487 02488 02489 bool select_insert::send_data(List<Item> &values) 02490 { 02491 DBUG_ENTER("select_insert::send_data"); 02492 bool error=0; 02493 02494 if (unit->offset_limit_cnt) 02495 { // using limit offset,count 02496 unit->offset_limit_cnt--; 02497 DBUG_RETURN(0); 02498 } 02499 02500 thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields 02501 store_values(values); 02502 thd->count_cuted_fields= CHECK_FIELD_IGNORE; 02503 if (thd->net.report_error) 02504 DBUG_RETURN(1); 02505 if (table_list) // Not CREATE ... SELECT 02506 { 02507 switch (table_list->view_check_option(thd, info.ignore)) { 02508 case VIEW_CHECK_SKIP: 02509 DBUG_RETURN(0); 02510 case VIEW_CHECK_ERROR: 02511 DBUG_RETURN(1); 02512 } 02513 } 02514 02515 error= write_record(thd, table, &info); 02516 02517 if (!error) 02518 { 02519 if (table->triggers || info.handle_duplicates == DUP_UPDATE) 02520 { 02521 /* 02522 Restore fields of the record since it is possible that they were 02523 changed by ON DUPLICATE KEY UPDATE clause. 02524 02525 If triggers exist then whey can modify some fields which were not 02526 originally touched by INSERT ... SELECT, so we have to restore 02527 their original values for the next row. 
02528 */ 02529 restore_record(table, s->default_values); 02530 } 02531 if (table->next_number_field) 02532 { 02533 /* 02534 If no value has been autogenerated so far, we need to remember the 02535 value we just saw, we may need to send it to client in the end. 02536 */ 02537 if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization 02538 autoinc_value_of_last_inserted_row= 02539 table->next_number_field->val_int(); 02540 /* 02541 Clear auto-increment field for the next record, if triggers are used 02542 we will clear it twice, but this should be cheap. 02543 */ 02544 table->next_number_field->reset(); 02545 } 02546 } 02547 table->file->ha_release_auto_increment(); 02548 DBUG_RETURN(error); 02549 } 02550 02551 02552 void select_insert::store_values(List<Item> &values) 02553 { 02554 if (fields->elements) 02555 fill_record_n_invoke_before_triggers(thd, *fields, values, 1, 02556 table->triggers, TRG_EVENT_INSERT); 02557 else 02558 fill_record_n_invoke_before_triggers(thd, table->field, values, 1, 02559 table->triggers, TRG_EVENT_INSERT); 02560 } 02561 02562 void select_insert::send_error(uint errcode,const char *err) 02563 { 02564 DBUG_ENTER("select_insert::send_error"); 02565 02566 /* Avoid an extra 'unknown error' message if we already reported an error */ 02567 if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) 02568 my_message(errcode, err, MYF(0)); 02569 02570 if (!table) 02571 { 02572 /* 02573 This can only happen when using CREATE ... SELECT and the table was not 02574 created becasue of an syntax error 02575 */ 02576 DBUG_VOID_RETURN; 02577 } 02578 if (!thd->prelocked_mode) 02579 table->file->ha_end_bulk_insert(); 02580 /* 02581 If at least one row has been inserted/modified and will stay in the table 02582 (the table doesn't have transactions) we must write to the binlog (and 02583 the error code will make the slave stop). 
02584 02585 For many errors (example: we got a duplicate key error while 02586 inserting into a MyISAM table), no row will be added to the table, 02587 so passing the error to the slave will not help since there will 02588 be an error code mismatch (the inserts will succeed on the slave 02589 with no error). 02590 02591 If we are using row-based replication we have two cases where this 02592 code is executed: replication of CREATE-SELECT and replication of 02593 INSERT-SELECT. 02594 02595 When replicating a CREATE-SELECT statement, we shall not write the 02596 events to the binary log and should thus not set 02597 OPTION_STATUS_NO_TRANS_UPDATE. 02598 02599 When replicating INSERT-SELECT, we shall not write the events to 02600 the binary log for transactional table, but shall write all events 02601 if there is one or more writes to non-transactional tables. In 02602 this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a 02603 write to a non-transactional table, otherwise it is cleared. 02604 */ 02605 if (info.copied || info.deleted || info.updated) 02606 { 02607 if (!table->file->has_transactions()) 02608 { 02609 if (mysql_bin_log.is_open()) 02610 { 02611 thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, 02612 table->file->has_transactions(), FALSE); 02613 } 02614 if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && 02615 !can_rollback_data()) 02616 thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; 02617 query_cache_invalidate3(thd, table, 1); 02618 } 02619 } 02620 ha_rollback_stmt(thd); 02621 DBUG_VOID_RETURN; 02622 } 02623 02624 02625 bool select_insert::send_eof() 02626 { 02627 int error,error2; 02628 ulonglong id; 02629 DBUG_ENTER("select_insert::send_eof"); 02630 02631 error= (!thd->prelocked_mode) ? 
table->file->ha_end_bulk_insert():0; 02632 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); 02633 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); 02634 02635 if (info.copied || info.deleted || info.updated) 02636 { 02637 /* 02638 We must invalidate the table in the query cache before binlog writing 02639 and ha_autocommit_or_rollback. 02640 */ 02641 query_cache_invalidate3(thd, table, 1); 02642 /* 02643 Mark that we have done permanent changes if all of the below is true 02644 - Table doesn't support transactions 02645 - It's a normal (not temporary) table. (Changes to temporary tables 02646 are not logged in RBR) 02647 - We are using statement based replication 02648 */ 02649 if (!table->file->has_transactions() && 02650 (!table->s->tmp_table || 02651 !thd->current_stmt_binlog_row_based)) 02652 thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; 02653 } 02654 02655 /* 02656 Write to binlog before commiting transaction. No statement will 02657 be written by the binlog_query() below in RBR mode. All the 02658 events are in the transaction cache and will be written when 02659 ha_autocommit_or_rollback() is issued below. 02660 */ 02661 if (mysql_bin_log.is_open()) 02662 { 02663 if (!error) 02664 thd->clear_error(); 02665 thd->binlog_query(THD::ROW_QUERY_TYPE, 02666 thd->query, thd->query_length, 02667 table->file->has_transactions(), FALSE); 02668 } 02669 if ((error2=ha_autocommit_or_rollback(thd,error)) && ! 
error) 02670 error=error2; 02671 if (error) 02672 { 02673 table->file->print_error(error,MYF(0)); 02674 DBUG_RETURN(1); 02675 } 02676 char buff[160]; 02677 if (info.ignore) 02678 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, 02679 (ulong) (info.records - info.copied), (ulong) thd->cuted_fields); 02680 else 02681 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records, 02682 (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields); 02683 thd->row_count_func= info.copied+info.deleted+info.updated; 02684 02685 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ? 02686 thd->first_successful_insert_id_in_cur_stmt : 02687 (thd->arg_of_last_insert_id_function ? 02688 thd->first_successful_insert_id_in_prev_stmt : 02689 (info.copied ? autoinc_value_of_last_inserted_row : 0)); 02690 ::send_ok(thd, (ulong) thd->row_count_func, id, buff); 02691 DBUG_RETURN(0); 02692 } 02693 02694 02695 /*************************************************************************** 02696 CREATE TABLE (SELECT) ... 02697 ***************************************************************************/ 02698 02699 /* 02700 Create table from lists of fields and items (or open existing table 02701 with same name). 02702 02703 SYNOPSIS 02704 create_table_from_items() 02705 thd in Thread object 02706 create_info in Create information (like MAX_ROWS, ENGINE or 02707 temporary table flag) 02708 create_table in Pointer to TABLE_LIST object providing database 02709 and name for table to be created or to be open 02710 extra_fields in/out Initial list of fields for table to be created 02711 keys in List of keys for table to be created 02712 items in List of items which should be used to produce rest 02713 of fields for the table (corresponding fields will 02714 be added to the end of 'extra_fields' list) 02715 lock out Pointer to the MYSQL_LOCK object for table created 02716 (open) will be returned in this parameter. 
Since 02717 this table is not included in THD::lock caller is 02718 responsible for explicitly unlocking this table. 02719 hooks 02720 02721 NOTES 02722 If 'create_info->options' bitmask has HA_LEX_CREATE_IF_NOT_EXISTS 02723 flag and table with name provided already exists then this function will 02724 simply open existing table. 02725 Also note that create, open and lock sequence in this function is not 02726 atomic and thus contains gap for deadlock and can cause other troubles. 02727 Since this function contains some logic specific to CREATE TABLE ... SELECT 02728 it should be changed before it can be used in other contexts. 02729 02730 RETURN VALUES 02731 non-zero Pointer to TABLE object for table created or opened 02732 0 Error 02733 */ 02734 02735 static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info, 02736 TABLE_LIST *create_table, 02737 List<create_field> *extra_fields, 02738 List<Key> *keys, 02739 List<Item> *items, 02740 MYSQL_LOCK **lock, 02741 TABLEOP_HOOKS *hooks) 02742 { 02743 TABLE tmp_table; // Used during 'create_field()' 02744 TABLE_SHARE share; 02745 TABLE *table= 0; 02746 uint select_field_count= items->elements; 02747 /* Add selected items to field list */ 02748 List_iterator_fast<Item> it(*items); 02749 Item *item; 02750 Field *tmp_field; 02751 bool not_used; 02752 DBUG_ENTER("create_table_from_items"); 02753 02754 tmp_table.alias= 0; 02755 tmp_table.timestamp_field= 0; 02756 tmp_table.s= &share; 02757 init_tmp_table_share(&share, "", 0, "", ""); 02758 02759 tmp_table.s->db_create_options=0; 02760 tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr; 02761 tmp_table.s->db_low_byte_first= 02762 test(create_info->db_type == &myisam_hton || 02763 create_info->db_type == &heap_hton); 02764 tmp_table.null_row=tmp_table.maybe_null=0; 02765 02766 while ((item=it++)) 02767 { 02768 create_field *cr_field; 02769 Field *field, *def_field; 02770 if (item->type() == Item::FUNC_ITEM) 02771 field= item->tmp_table_field(&tmp_table); 
02772 else 02773 field= create_tmp_field(thd, &tmp_table, item, item->type(), 02774 (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0, 02775 0); 02776 if (!field || 02777 !(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ? 02778 ((Item_field *)item)->field : 02779 (Field*) 0)))) 02780 DBUG_RETURN(0); 02781 if (item->maybe_null) 02782 cr_field->flags &= ~NOT_NULL_FLAG; 02783 extra_fields->push_back(cr_field); 02784 } 02785 /* 02786 create and lock table 02787 02788 We don't log the statement, it will be logged later. 02789 02790 If this is a HEAP table, the automatic DELETE FROM which is written to the 02791 binlog when a HEAP table is opened for the first time since startup, must 02792 not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we 02793 don't want to delete from it) 2) it would be written before the CREATE 02794 TABLE, which is a wrong order. So we keep binary logging disabled when we 02795 open_table(). 02796 NOTE: By locking table which we just have created (or for which we just 02797 have have found that it already exists) separately from other tables used 02798 by the statement we create potential window for deadlock. 02799 TODO: create and open should be done atomic ! 02800 */ 02801 { 02802 tmp_disable_binlog(thd); 02803 if (!mysql_create_table(thd, create_table->db, create_table->table_name, 02804 create_info, *extra_fields, *keys, 0, 02805 select_field_count, 0)) 02806 { 02807 /* 02808 If we are here in prelocked mode we either create temporary table 02809 or prelocked mode is caused by the SELECT part of this statement. 02810 */ 02811 DBUG_ASSERT(!thd->prelocked_mode || 02812 create_info->options & HA_LEX_CREATE_TMP_TABLE || 02813 thd->lex->requires_prelocking()); 02814 02815 /* 02816 NOTE: We don't want to ignore set of locked tables here if we are 02817 under explicit LOCK TABLES since it will open gap for deadlock 02818 too wide (and also is not backward compatible). 02819 */ 02820 02821 if (! 
(table= open_table(thd, create_table, thd->mem_root, (bool*) 0, 02822 (MYSQL_LOCK_IGNORE_FLUSH | 02823 ((thd->prelocked_mode == PRELOCKED) ? 02824 MYSQL_OPEN_IGNORE_LOCKED_TABLES:0))))) 02825 quick_rm_table(create_info->db_type, create_table->db, 02826 table_case_name(create_info, create_table->table_name), 02827 0); 02828 } 02829 reenable_binlog(thd); 02830 if (!table) // open failed 02831 DBUG_RETURN(0); 02832 } 02833 02834 /* 02835 FIXME: What happens if trigger manages to be created while we are 02836 obtaining this lock ? May be it is sensible just to disable 02837 trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH 02838 save us from that ? 02839 */ 02840 table->reginfo.lock_type=TL_WRITE; 02841 hooks->prelock(&table, 1); // Call prelock hooks 02842 if (! ((*lock)= mysql_lock_tables(thd, &table, 1, 02843 MYSQL_LOCK_IGNORE_FLUSH, ¬_used))) 02844 { 02845 VOID(pthread_mutex_lock(&LOCK_open)); 02846 hash_delete(&open_cache,(byte*) table); 02847 VOID(pthread_mutex_unlock(&LOCK_open)); 02848 quick_rm_table(create_info->db_type, create_table->db, 02849 table_case_name(create_info, create_table->table_name), 0); 02850 DBUG_RETURN(0); 02851 } 02852 table->file->extra(HA_EXTRA_WRITE_CACHE); 02853 DBUG_RETURN(table); 02854 } 02855 02856 02857 int 02858 select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) 02859 { 02860 DBUG_ENTER("select_create::prepare"); 02861 02862 TABLEOP_HOOKS *hook_ptr= NULL; 02863 #ifdef HAVE_ROW_BASED_REPLICATION 02864 class MY_HOOKS : public TABLEOP_HOOKS { 02865 public: 02866 MY_HOOKS(select_create *x) : ptr(x) { } 02867 virtual void do_prelock(TABLE **tables, uint count) 02868 { 02869 if (ptr->get_thd()->current_stmt_binlog_row_based && 02870 !(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE)) 02871 ptr->binlog_show_create_table(tables, count); 02872 } 02873 02874 private: 02875 select_create *ptr; 02876 }; 02877 02878 MY_HOOKS hooks(this); 02879 hook_ptr= &hooks; 02880 #endif 02881 02882 unit= u; 02883 if 
(!(table= create_table_from_items(thd, create_info, create_table, 02884 extra_fields, keys, &values, 02885 &thd->extra_lock, hook_ptr))) 02886 DBUG_RETURN(-1); // abort() deletes table 02887 02888 if (table->s->fields < values.elements) 02889 { 02890 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1); 02891 DBUG_RETURN(-1); 02892 } 02893 02894 /* First field to copy */ 02895 field= table->field+table->s->fields - values.elements; 02896 02897 /* Mark all fields that are given values */ 02898 for (Field **f= field ; *f ; f++) 02899 bitmap_set_bit(table->write_set, (*f)->field_index); 02900 02901 /* Don't set timestamp if used */ 02902 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; 02903 table->next_number_field=table->found_next_number_field; 02904 02905 restore_record(table,s->default_values); // Get empty record 02906 thd->cuted_fields=0; 02907 if (info.ignore || info.handle_duplicates != DUP_ERROR) 02908 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); 02909 if (info.handle_duplicates == DUP_REPLACE && 02910 (!table->triggers || !table->triggers->has_delete_triggers())) 02911 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); 02912 if (!thd->prelocked_mode) 02913 table->file->ha_start_bulk_insert((ha_rows) 0); 02914 thd->no_trans_update= 0; 02915 thd->abort_on_warning= (!info.ignore && 02916 (thd->variables.sql_mode & 02917 (MODE_STRICT_TRANS_TABLES | 02918 MODE_STRICT_ALL_TABLES))); 02919 if (check_that_all_fields_are_given_values(thd, table, table_list)) 02920 DBUG_RETURN(1); 02921 table->mark_columns_needed_for_insert(); 02922 DBUG_RETURN(0); 02923 } 02924 02925 02926 #ifdef HAVE_ROW_BASED_REPLICATION 02927 void 02928 select_create::binlog_show_create_table(TABLE **tables, uint count) 02929 { 02930 /* 02931 Note 1: In RBR mode, we generate a CREATE TABLE statement for the 02932 created table by calling store_create_info() (behaves as SHOW 02933 CREATE TABLE). 
In the event of an error, nothing should be 02934 written to the binary log, even if the table is non-transactional; 02935 therefore we pretend that the generated CREATE TABLE statement is 02936 for a transactional table. The event will then be put in the 02937 transaction cache, and any subsequent events (e.g., table-map 02938 events and binrow events) will also be put there. We can then use 02939 ha_autocommit_or_rollback() to either throw away the entire 02940 kaboodle of events, or write them to the binary log. 02941 02942 We write the CREATE TABLE statement here and not in prepare() 02943 since there potentially are sub-selects or accesses to information 02944 schema that will do a close_thread_tables(), destroying the 02945 statement transaction cache. 02946 */ 02947 DBUG_ASSERT(thd->current_stmt_binlog_row_based); 02948 DBUG_ASSERT(tables && *tables && count > 0); 02949 02950 char buf[2048]; 02951 String query(buf, sizeof(buf), system_charset_info); 02952 int result; 02953 TABLE_LIST table_list; 02954 02955 memset(&table_list, 0, sizeof(table_list)); 02956 table_list.table = *tables; 02957 query.length(0); // Have to zero it since constructor doesn't 02958 02959 result= store_create_info(thd, &table_list, &query, create_info); 02960 DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */ 02961 02962 thd->binlog_query(THD::STMT_QUERY_TYPE, 02963 query.ptr(), query.length(), 02964 /* is_trans */ TRUE, 02965 /* suppress_use */ FALSE); 02966 } 02967 #endif // HAVE_ROW_BASED_REPLICATION 02968 02969 void select_create::store_values(List<Item> &values) 02970 { 02971 fill_record_n_invoke_before_triggers(thd, field, values, 1, 02972 table->triggers, TRG_EVENT_INSERT); 02973 } 02974 02975 02976 void select_create::send_error(uint errcode,const char *err) 02977 { 02978 /* 02979 Disable binlog, because we "roll back" partial inserts in ::abort 02980 by removing the table, even for non-transactional tables. 
02981 */ 02982 tmp_disable_binlog(thd); 02983 select_insert::send_error(errcode, err); 02984 reenable_binlog(thd); 02985 } 02986 02987 02988 bool select_create::send_eof() 02989 { 02990 bool tmp=select_insert::send_eof(); 02991 if (tmp) 02992 abort(); 02993 else 02994 { 02995 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); 02996 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); 02997 VOID(pthread_mutex_lock(&LOCK_open)); 02998 mysql_unlock_tables(thd, thd->extra_lock); 02999 if (!table->s->tmp_table) 03000 { 03001 if (close_thread_table(thd, &table)) 03002 broadcast_refresh(); 03003 } 03004 thd->extra_lock=0; 03005 table=0; 03006 VOID(pthread_mutex_unlock(&LOCK_open)); 03007 } 03008 return tmp; 03009 } 03010 03011 void select_create::abort() 03012 { 03013 VOID(pthread_mutex_lock(&LOCK_open)); 03014 if (thd->extra_lock) 03015 { 03016 mysql_unlock_tables(thd, thd->extra_lock); 03017 thd->extra_lock=0; 03018 } 03019 if (table) 03020 { 03021 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); 03022 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); 03023 handlerton *table_type=table->s->db_type; 03024 if (!table->s->tmp_table) 03025 { 03026 ulong version= table->s->version; 03027 table->s->version= 0; 03028 hash_delete(&open_cache,(byte*) table); 03029 if (!create_info->table_existed) 03030 quick_rm_table(table_type, create_table->db, 03031 create_table->table_name, 0); 03032 /* Tell threads waiting for refresh that something has happened */ 03033 if (version != refresh_version) 03034 broadcast_refresh(); 03035 } 03036 else if (!create_info->table_existed) 03037 close_temporary_table(thd, table, 1, 1); 03038 table=0; // Safety 03039 } 03040 VOID(pthread_mutex_unlock(&LOCK_open)); 03041 } 03042 03043 03044 /***************************************************************************** 03045 Instansiate templates 03046 *****************************************************************************/ 03047 03048 #ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION 03049 template class 
List_iterator_fast<List_item>; 03050 #ifndef EMBEDDED_LIBRARY 03051 template class I_List<delayed_insert>; 03052 template class I_List_iterator<delayed_insert>; 03053 template class I_List<delayed_row>; 03054 #endif /* EMBEDDED_LIBRARY */ 03055 #endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */
1.4.7

