00001 /* Copyright (C) 2000-2004 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 00018 #ifndef MYSQL_CLIENT 00019 00020 #ifdef USE_PRAGMA_IMPLEMENTATION 00021 #pragma implementation // gcc: Class implementation 00022 #endif 00023 00024 #include "mysql_priv.h" 00025 #include "slave.h" 00026 #include "rpl_filter.h" 00027 #include <my_dir.h> 00028 #endif /* MYSQL_CLIENT */ 00029 #include <base64.h> 00030 #include <my_bitmap.h> 00031 00032 #define log_cs &my_charset_latin1 00033 00034 /* 00035 pretty_print_str() 00036 */ 00037 00038 #ifdef MYSQL_CLIENT 00039 static void pretty_print_str(FILE* file, char* str, int len) 00040 { 00041 char* end = str + len; 00042 fputc('\'', file); 00043 while (str < end) 00044 { 00045 char c; 00046 switch ((c=*str++)) { 00047 case '\n': fprintf(file, "\\n"); break; 00048 case '\r': fprintf(file, "\\r"); break; 00049 case '\\': fprintf(file, "\\\\"); break; 00050 case '\b': fprintf(file, "\\b"); break; 00051 case '\t': fprintf(file, "\\t"); break; 00052 case '\'': fprintf(file, "\\'"); break; 00053 case 0 : fprintf(file, "\\0"); break; 00054 default: 00055 fputc(c, file); 00056 break; 00057 } 00058 } 00059 fputc('\'', file); 00060 } 00061 #endif /* MYSQL_CLIENT */ 00062 00063 00064 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 00065 00066 static void clear_all_errors(THD *thd, struct st_relay_log_info *rli) 00067 { 00068 thd->query_error = 0; 00069 thd->clear_error(); 00070 *rli->last_slave_error = 0; 00071 rli->last_slave_errno = 0; 00072 } 00073 00074 00075 /* 00076 Ignore error code specified on command line 00077 */ 00078 00079 inline int ignored_error_code(int err_code) 00080 { 00081 #ifdef HAVE_NDB_BINLOG 00082 /* 00083 The following error codes are hard-coded and will always be ignored. 00084 */ 00085 switch (err_code) 00086 { 00087 case ER_DB_CREATE_EXISTS: 00088 case ER_DB_DROP_EXISTS: 00089 return 1; 00090 default: 00091 /* Nothing to do */ 00092 break; 00093 } 00094 #endif 00095 return ((err_code == ER_SLAVE_IGNORED_TABLE) || 00096 (use_slave_mask && bitmap_is_set(&slave_error_mask, err_code))); 00097 } 00098 #endif 00099 00100 00101 /* 00102 pretty_print_str() 00103 */ 00104 00105 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 00106 static char *pretty_print_str(char *packet, char *str, int len) 00107 { 00108 char *end= str + len; 00109 char *pos= packet; 00110 *pos++= '\''; 00111 while (str < end) 00112 { 00113 char c; 00114 switch ((c=*str++)) { 00115 case '\n': *pos++= '\\'; *pos++= 'n'; break; 00116 case '\r': *pos++= '\\'; *pos++= 'r'; break; 00117 case '\\': *pos++= '\\'; *pos++= '\\'; break; 00118 case '\b': *pos++= '\\'; *pos++= 'b'; break; 00119 case '\t': *pos++= '\\'; *pos++= 't'; break; 00120 case '\'': *pos++= '\\'; *pos++= '\''; break; 00121 case 0 : *pos++= '\\'; *pos++= '0'; break; 00122 default: 00123 *pos++= c; 00124 break; 00125 } 00126 } 00127 *pos++= '\''; 00128 return pos; 00129 } 00130 #endif /* !MYSQL_CLIENT */ 00131 00132 00133 /* 00134 Creates a temporary name for load data infile: 00135 00136 SYNOPSIS 00137 slave_load_file_stem() 00138 buf Store new filename here 00139 file_id File_id (part of file name) 00140 event_server_id Event_id (part of file name) 00141 ext Extension for file name 00142 00143 RETURN 00144 Pointer to start of extension 00145 */ 00146 00147 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 00148 static char *slave_load_file_stem(char *buf, uint file_id, 00149 int event_server_id, const char *ext) 00150 { 00151 char *res; 00152 fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME); 00153 to_unix_path(buf); 00154 00155 buf = strend(buf); 00156 buf = int10_to_str(::server_id, buf, 10); 00157 *buf++ = '-'; 00158 buf = int10_to_str(event_server_id, buf, 10); 00159 *buf++ = '-'; 00160 res= int10_to_str(file_id, buf, 10); 00161 strmov(res, ext); // Add extension last 00162 return res; // Pointer to extension 00163 } 00164 #endif 00165 00166 00167 /* 00168 Delete all temporary files used for SQL_LOAD. 00169 00170 SYNOPSIS 00171 cleanup_load_tmpdir() 00172 */ 00173 00174 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 00175 static void cleanup_load_tmpdir() 00176 { 00177 MY_DIR *dirp; 00178 FILEINFO *file; 00179 uint i; 00180 char fname[FN_REFLEN], prefbuf[31], *p; 00181 00182 if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME)))) 00183 return; 00184 00185 /* 00186 When we are deleting temporary files, we should only remove 00187 the files associated with the server id of our server. 00188 We don't use event_server_id here because since we've disabled 00189 direct binlogging of Create_file/Append_file/Exec_load events 00190 we cannot meet Start_log event in the middle of events from one 00191 LOAD DATA. 00192 */ 00193 p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-")); 00194 p= int10_to_str(::server_id, p, 10); 00195 *(p++)= '-'; 00196 *p= 0; 00197 00198 for (i=0 ; i < (uint)dirp->number_off_files; i++) 00199 { 00200 file=dirp->dir_entry+i; 00201 if (is_prefix(file->name, prefbuf)) 00202 { 00203 fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME); 00204 my_delete(fname, MYF(0)); 00205 } 00206 } 00207 00208 my_dirend(dirp); 00209 } 00210 #endif 00211 00212 00213 /* 00214 write_str() 00215 */ 00216 00217 static bool write_str(IO_CACHE *file, char *str, uint length) 00218 { 00219 byte tmp[1]; 00220 tmp[0]= (byte) length; 00221 return (my_b_safe_write(file, tmp, sizeof(tmp)) || 00222 my_b_safe_write(file, (byte*) str, length)); 00223 } 00224 00225 00226 /* 00227 read_str() 00228 */ 00229 00230 static inline int read_str(char **buf, char *buf_end, char **str, 00231 uint8 *len) 00232 { 00233 if (*buf + ((uint) (uchar) **buf) >= buf_end) 00234 return 1; 00235 *len= (uint8) **buf; 00236 *str= (*buf)+1; 00237 (*buf)+= (uint) *len+1; 00238 return 0; 00239 } 00240 00241 00242 /* 00243 Transforms a string into "" or its expression in 0x... form. 00244 */ 00245 00246 char *str_to_hex(char *to, const char *from, uint len) 00247 { 00248 if (len) 00249 { 00250 *to++= '0'; 00251 *to++= 'x'; 00252 to= octet2hex(to, from, len); 00253 } 00254 else 00255 to= strmov(to, "\"\""); 00256 return to; // pointer to end 0 of 'to' 00257 } 00258 00259 /* 00260 Append a version of the 'from' string suitable for use in a query to 00261 the 'to' string. To generate a correct escaping, the character set 00262 information in 'csinfo' is used. 00263 */ 00264 #ifndef MYSQL_CLIENT 00265 int 00266 append_query_string(CHARSET_INFO *csinfo, 00267 String const *from, String *to) 00268 { 00269 char *beg, *ptr; 00270 uint32 const orig_len= to->length(); 00271 if (to->reserve(orig_len + from->length()*2+3)) 00272 return 1; 00273 00274 beg= to->c_ptr_quick() + to->length(); 00275 ptr= beg; 00276 if (csinfo->escape_with_backslash_is_dangerous) 00277 ptr= str_to_hex(ptr, from->ptr(), from->length()); 00278 else 00279 { 00280 *ptr++= '\''; 00281 ptr+= escape_string_for_mysql(from->charset(), ptr, 0, 00282 from->ptr(), from->length()); 00283 *ptr++='\''; 00284 } 00285 to->length(orig_len + ptr - beg); 00286 return 0; 00287 } 00288 #endif 00289 00290 /* 00291 Prints a "session_var=value" string. Used by mysqlbinlog to print some SET 00292 commands just before it prints a query. 00293 */ 00294 00295 #ifdef MYSQL_CLIENT 00296 static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, 00297 uint32 flags, const char* name, bool* need_comma) 00298 { 00299 if (bits_changed & option) 00300 { 00301 if (*need_comma) 00302 fprintf(file,", "); 00303 fprintf(file,"%s=%d", name, test(flags & option)); 00304 *need_comma= 1; 00305 } 00306 } 00307 #endif 00308 00309 /************************************************************************** 00310 Log_event methods (= the parent class of all events) 00311 **************************************************************************/ 00312 00313 /* 00314 Log_event::get_type_str() 00315 */ 00316 00317 const char* Log_event::get_type_str() 00318 { 00319 switch(get_type_code()) { 00320 case START_EVENT_V3: return "Start_v3"; 00321 case STOP_EVENT: return "Stop"; 00322 case QUERY_EVENT: return "Query"; 00323 case ROTATE_EVENT: return "Rotate"; 00324 case INTVAR_EVENT: return "Intvar"; 00325 case LOAD_EVENT: return "Load"; 00326 case NEW_LOAD_EVENT: return "New_load"; 00327 case SLAVE_EVENT: return "Slave"; 00328 case CREATE_FILE_EVENT: return "Create_file"; 00329 case APPEND_BLOCK_EVENT: return "Append_block"; 00330 case DELETE_FILE_EVENT: return "Delete_file"; 00331 case EXEC_LOAD_EVENT: return "Exec_load"; 00332 case RAND_EVENT: return "RAND"; 00333 case XID_EVENT: return "Xid"; 00334 case USER_VAR_EVENT: return "User var"; 00335 case FORMAT_DESCRIPTION_EVENT: return "Format_desc"; 00336 case TABLE_MAP_EVENT: return "Table_map"; 00337 case WRITE_ROWS_EVENT: return "Write_rows"; 00338 case UPDATE_ROWS_EVENT: return "Update_rows"; 00339 case DELETE_ROWS_EVENT: return "Delete_rows"; 00340 case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; 00341 case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; 00342 default: return "Unknown"; /* impossible */ 00343 } 00344 } 00345 00346 00347 /* 00348 Log_event::Log_event() 00349 */ 00350 00351 #ifndef MYSQL_CLIENT 00352 Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) 00353 :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg) 00354 { 00355 server_id= thd->server_id; 00356 when= thd->start_time; 00357 cache_stmt= using_trans; 00358 } 00359 00360 00361 /* 00362 This minimal constructor is for when you are not even sure that there 00363 is a valid THD. For example in the server when we are shutting down or 00364 flushing logs after receiving a SIGHUP (then we must write a Rotate to 00365 the binlog but we have no THD, so we need this minimal constructor). 00366 */ 00367 00368 Log_event::Log_event() 00369 :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), 00370 thd(0) 00371 { 00372 server_id= ::server_id; 00373 when= time(NULL); 00374 log_pos= 0; 00375 } 00376 #endif /* !MYSQL_CLIENT */ 00377 00378 00379 /* 00380 Log_event::Log_event() 00381 */ 00382 00383 Log_event::Log_event(const char* buf, 00384 const Format_description_log_event* description_event) 00385 :temp_buf(0), cache_stmt(0) 00386 { 00387 #ifndef MYSQL_CLIENT 00388 thd = 0; 00389 #endif 00390 when = uint4korr(buf); 00391 server_id = uint4korr(buf + SERVER_ID_OFFSET); 00392 if (description_event->binlog_version==1) 00393 { 00394 log_pos= 0; 00395 flags= 0; 00396 return; 00397 } 00398 /* 4.0 or newer */ 00399 log_pos= uint4korr(buf + LOG_POS_OFFSET); 00400 /* 00401 If the log is 4.0 (so here it can only be a 4.0 relay log read by 00402 the SQL thread or a 4.0 master binlog read by the I/O thread), 00403 log_pos is the beginning of the event: we transform it into the end 00404 of the event, which is more useful. 00405 But how do you know that the log is 4.0: you know it if 00406 description_event is version 3 *and* you are not reading a 00407 Format_desc (remember that mysqlbinlog starts by assuming that 5.0 00408 logs are in 4.0 format, until it finds a Format_desc). 00409 */ 00410 if (description_event->binlog_version==3 && 00411 buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) 00412 { 00413 /* 00414 If log_pos=0, don't change it. log_pos==0 is a marker to mean 00415 "don't change rli->group_master_log_pos" (see 00416 inc_group_relay_log_pos()). As it is unreal log_pos, adding the 00417 event len's is nonsense. For example, a fake Rotate event should 00418 not have its log_pos (which is 0) changed or it will modify 00419 Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense 00420 value of (a non-zero offset which does not exist in the master's 00421 binlog, so which will cause problems if the user uses this value 00422 in CHANGE MASTER). 00423 */ 00424 log_pos+= uint4korr(buf + EVENT_LEN_OFFSET); 00425 } 00426 DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); 00427 00428 flags= uint2korr(buf + FLAGS_OFFSET); 00429 if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || 00430 (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) 00431 { 00432 /* 00433 These events always have a header which stops here (i.e. their 00434 header is FROZEN). 00435 */ 00436 /* 00437 Initialization to zero of all other Log_event members as they're 00438 not specified. Currently there are no such members; in the future 00439 there will be an event UID (but Format_description and Rotate 00440 don't need this UID, as they are not propagated through 00441 --log-slave-updates (remember the UID is used to not play a query 00442 twice when you have two masters which are slaves of a 3rd master). 00443 Then we are done. 00444 */ 00445 return; 00446 } 00447 /* otherwise, go on with reading the header from buf (nothing now) */ 00448 } 00449 00450 #ifndef MYSQL_CLIENT 00451 #ifdef HAVE_REPLICATION 00452 00453 /* 00454 Log_event::exec_event() 00455 */ 00456 00457 int Log_event::exec_event(struct st_relay_log_info* rli) 00458 { 00459 DBUG_ENTER("Log_event::exec_event"); 00460 00461 /* 00462 rli is null when (as far as I (Guilhem) know) 00463 the caller is 00464 Load_log_event::exec_event *and* that one is called from 00465 Execute_load_log_event::exec_event. 00466 In this case, we don't do anything here ; 00467 Execute_load_log_event::exec_event will call Log_event::exec_event 00468 again later with the proper rli. 00469 Strictly speaking, if we were sure that rli is null 00470 only in the case discussed above, 'if (rli)' is useless here. 00471 But as we are not 100% sure, keep it for now. 00472 */ 00473 if (rli) 00474 { 00475 /* 00476 If in a transaction, and if the slave supports transactions, just 00477 inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN 00478 (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with 00479 BEGIN/COMMIT, not with SET AUTOCOMMIT= . 00480 00481 CAUTION: opt_using_transactions means 00482 innodb || bdb ; suppose the master supports InnoDB and BDB, 00483 but the slave supports only BDB, problems 00484 will arise: 00485 - suppose an InnoDB table is created on the master, 00486 - then it will be MyISAM on the slave 00487 - but as opt_using_transactions is true, the slave will believe he 00488 is transactional with the MyISAM table. And problems will come 00489 when one does START SLAVE; STOP SLAVE; START SLAVE; (the slave 00490 will resume at BEGIN whereas there has not been any rollback). 00491 This is the problem of using opt_using_transactions instead of a 00492 finer "does the slave support 00493 _the_transactional_handler_used_on_the_master_". 00494 00495 More generally, we'll have problems when a query mixes a 00496 transactional handler and MyISAM and STOP SLAVE is issued in the 00497 middle of the "transaction". START SLAVE will resume at BEGIN 00498 while the MyISAM table has already been updated. 00499 */ 00500 if ((thd->options & OPTION_BEGIN) && opt_using_transactions) 00501 rli->inc_event_relay_log_pos(); 00502 else 00503 { 00504 rli->inc_group_relay_log_pos(log_pos); 00505 flush_relay_log_info(rli); 00506 /* 00507 Note that Rotate_log_event::exec_event() does not call this 00508 function, so there is no chance that a fake rotate event resets 00509 last_master_timestamp. 00510 Note that we update without mutex (probably ok - except in some very 00511 rare cases, only consequence is that value may take some time to 00512 display in Seconds_Behind_Master - not critical). 00513 */ 00514 rli->last_master_timestamp= when; 00515 } 00516 } 00517 DBUG_RETURN(0); 00518 } 00519 00520 00521 /* 00522 Log_event::pack_info() 00523 */ 00524 00525 void Log_event::pack_info(Protocol *protocol) 00526 { 00527 protocol->store("", &my_charset_bin); 00528 } 00529 00530 00531 /* 00532 Log_event::net_send() 00533 00534 Only called by SHOW BINLOG EVENTS 00535 */ 00536 00537 int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos) 00538 { 00539 const char *p= strrchr(log_name, FN_LIBCHAR); 00540 const char *event_type; 00541 if (p) 00542 log_name = p + 1; 00543 00544 protocol->prepare_for_resend(); 00545 protocol->store(log_name, &my_charset_bin); 00546 protocol->store((ulonglong) pos); 00547 event_type = get_type_str(); 00548 protocol->store(event_type, strlen(event_type), &my_charset_bin); 00549 protocol->store((uint32) server_id); 00550 protocol->store((ulonglong) log_pos); 00551 pack_info(protocol); 00552 return protocol->write(); 00553 } 00554 #endif /* HAVE_REPLICATION */ 00555 00556 00557 /* 00558 Log_event::init_show_field_list() 00559 */ 00560 00561 void Log_event::init_show_field_list(List<Item>* field_list) 00562 { 00563 field_list->push_back(new Item_empty_string("Log_name", 20)); 00564 field_list->push_back(new Item_return_int("Pos", 11, 00565 MYSQL_TYPE_LONGLONG)); 00566 field_list->push_back(new Item_empty_string("Event_type", 20)); 00567 field_list->push_back(new Item_return_int("Server_id", 10, 00568 MYSQL_TYPE_LONG)); 00569 field_list->push_back(new Item_return_int("End_log_pos", 11, 00570 MYSQL_TYPE_LONGLONG)); 00571 field_list->push_back(new Item_empty_string("Info", 20)); 00572 } 00573 00574 00575 /* 00576 Log_event::write() 00577 */ 00578 00579 bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) 00580 { 00581 byte header[LOG_EVENT_HEADER_LEN]; 00582 DBUG_ENTER("Log_event::write_header"); 00583 00584 /* Store number of bytes that will be written by this event */ 00585 data_written= event_data_length + sizeof(header); 00586 00587 /* 00588 log_pos != 0 if this is relay-log event. In this case we should not 00589 change the position 00590 */ 00591 00592 if (is_artificial_event()) 00593 { 00594 /* 00595 We should not do any cleanup on slave when reading this. We 00596 mark this by setting log_pos to 0. Start_log_event_v3() will 00597 detect this on reading and set artificial_event=1 for the event. 00598 */ 00599 log_pos= 0; 00600 } 00601 else if (!log_pos) 00602 { 00603 /* 00604 Calculate position of end of event 00605 00606 Note that with a SEQ_READ_APPEND cache, my_b_tell() does not 00607 work well. So this will give slightly wrong positions for the 00608 Format_desc/Rotate/Stop events which the slave writes to its 00609 relay log. For example, the initial Format_desc will have 00610 end_log_pos=91 instead of 95. Because after writing the first 4 00611 bytes of the relay log, my_b_tell() still reports 0. Because 00612 my_b_append() does not update the counter which my_b_tell() 00613 later uses (one should probably use my_b_append_tell() to work 00614 around this). To get right positions even when writing to the 00615 relay log, we use the (new) my_b_safe_tell(). 00616 00617 Note that this raises a question on the correctness of all these 00618 DBUG_ASSERT(my_b_tell()=rli->event_relay_log_pos). 00619 00620 If in a transaction, the log_pos which we calculate below is not 00621 very good (because then my_b_safe_tell() returns start position 00622 of the BEGIN, so it's like the statement was at the BEGIN's 00623 place), but it's not a very serious problem (as the slave, when 00624 it is in a transaction, does not take those end_log_pos into 00625 account (as it calls inc_event_relay_log_pos()). To be fixed 00626 later, so that it looks less strange. But not bug. 00627 */ 00628 00629 log_pos= my_b_safe_tell(file)+data_written; 00630 } 00631 00632 /* 00633 Header will be of size LOG_EVENT_HEADER_LEN for all events, except for 00634 FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be 00635 LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header, 00636 because we read them before knowing the format). 00637 */ 00638 00639 int4store(header, (ulong) when); // timestamp 00640 header[EVENT_TYPE_OFFSET]= get_type_code(); 00641 int4store(header+ SERVER_ID_OFFSET, server_id); 00642 int4store(header+ EVENT_LEN_OFFSET, data_written); 00643 int4store(header+ LOG_POS_OFFSET, log_pos); 00644 int2store(header+ FLAGS_OFFSET, flags); 00645 00646 DBUG_RETURN(my_b_safe_write(file, header, sizeof(header)) != 0); 00647 } 00648 00649 00650 /* 00651 Log_event::read_log_event() 00652 00653 This needn't be format-tolerant, because we only read 00654 LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length). 00655 00656 */ 00657 00658 int Log_event::read_log_event(IO_CACHE* file, String* packet, 00659 pthread_mutex_t* log_lock) 00660 { 00661 ulong data_len; 00662 int result=0; 00663 char buf[LOG_EVENT_MINIMAL_HEADER_LEN]; 00664 DBUG_ENTER("read_log_event"); 00665 00666 if (log_lock) 00667 pthread_mutex_lock(log_lock); 00668 if (my_b_read(file, (byte*) buf, sizeof(buf))) 00669 { 00670 /* 00671 If the read hits eof, we must report it as eof so the caller 00672 will know it can go into cond_wait to be woken up on the next 00673 update to the log. 00674 */ 00675 DBUG_PRINT("error",("file->error: %d", file->error)); 00676 if (!file->error) 00677 result= LOG_READ_EOF; 00678 else 00679 result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO); 00680 goto end; 00681 } 00682 data_len= uint4korr(buf + EVENT_LEN_OFFSET); 00683 if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN || 00684 data_len > current_thd->variables.max_allowed_packet) 00685 { 00686 DBUG_PRINT("error",("data_len: %ld", data_len)); 00687 result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS : 00688 LOG_READ_TOO_LARGE); 00689 goto end; 00690 } 00691 packet->append(buf, sizeof(buf)); 00692 data_len-= LOG_EVENT_MINIMAL_HEADER_LEN; 00693 if (data_len) 00694 { 00695 if (packet->append(file, data_len)) 00696 { 00697 /* 00698 Here if we hit EOF it's really an error: as data_len is >=0 00699 there's supposed to be more bytes available. 00700 EOF means we are reading the event partially, which should 00701 never happen: either we read badly or the binlog is truncated. 00702 */ 00703 result= file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO; 00704 /* Implicit goto end; */ 00705 } 00706 } 00707 00708 end: 00709 if (log_lock) 00710 pthread_mutex_unlock(log_lock); 00711 DBUG_RETURN(result); 00712 } 00713 #endif /* !MYSQL_CLIENT */ 00714 00715 #ifndef MYSQL_CLIENT 00716 #define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock); 00717 #define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock); 00718 #else 00719 #define UNLOCK_MUTEX 00720 #define LOCK_MUTEX 00721 #endif 00722 00723 /* 00724 Log_event::read_log_event() 00725 00726 NOTE: 00727 Allocates memory; The caller is responsible for clean-up. 00728 */ 00729 00730 #ifndef MYSQL_CLIENT 00731 Log_event* Log_event::read_log_event(IO_CACHE* file, 00732 pthread_mutex_t* log_lock, 00733 const Format_description_log_event *description_event) 00734 #else 00735 Log_event* Log_event::read_log_event(IO_CACHE* file, 00736 const Format_description_log_event *description_event) 00737 #endif 00738 { 00739 DBUG_ENTER("Log_event::read_log_event(IO_CACHE *, Format_description_log_event *"); 00740 DBUG_ASSERT(description_event != 0); 00741 char head[LOG_EVENT_MINIMAL_HEADER_LEN]; 00742 /* 00743 First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to 00744 check the event for sanity and to know its length; no need to really parse 00745 it. We say "at most" because this could be a 3.23 master, which has header 00746 of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's 00747 "minimal" over the set {MySQL >=4.0}). 00748 */ 00749 uint header_size= min(description_event->common_header_len, 00750 LOG_EVENT_MINIMAL_HEADER_LEN); 00751 00752 LOCK_MUTEX; 00753 DBUG_PRINT("info", ("my_b_tell=%lu", my_b_tell(file))); 00754 if (my_b_read(file, (byte *) head, header_size)) 00755 { 00756 DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \ 00757 failed my_b_read")); 00758 UNLOCK_MUTEX; 00759 /* 00760 No error here; it could be that we are at the file's end. However 00761 if the next my_b_read() fails (below), it will be an error as we 00762 were able to read the first bytes. 00763 */ 00764 DBUG_RETURN(0); 00765 } 00766 uint data_len = uint4korr(head + EVENT_LEN_OFFSET); 00767 char *buf= 0; 00768 const char *error= 0; 00769 Log_event *res= 0; 00770 #ifndef max_allowed_packet 00771 THD *thd=current_thd; 00772 uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0; 00773 #endif 00774 00775 if (data_len > max_allowed_packet) 00776 { 00777 error = "Event too big"; 00778 goto err; 00779 } 00780 00781 if (data_len < header_size) 00782 { 00783 error = "Event too small"; 00784 goto err; 00785 } 00786 00787 // some events use the extra byte to null-terminate strings 00788 if (!(buf = my_malloc(data_len+1, MYF(MY_WME)))) 00789 { 00790 error = "Out of memory"; 00791 goto err; 00792 } 00793 buf[data_len] = 0; 00794 memcpy(buf, head, header_size); 00795 if (my_b_read(file, (byte*) buf + header_size, data_len - header_size)) 00796 { 00797 error = "read error"; 00798 goto err; 00799 } 00800 if ((res= read_log_event(buf, data_len, &error, description_event))) 00801 res->register_temp_buf(buf); 00802 00803 err: 00804 UNLOCK_MUTEX; 00805 if (!res) 00806 { 00807 DBUG_ASSERT(error != 0); 00808 sql_print_error("Error in Log_event::read_log_event(): " 00809 "'%s', data_len: %d, event_type: %d", 00810 error,data_len,head[EVENT_TYPE_OFFSET]); 00811 my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); 00812 /* 00813 The SQL slave thread will check if file->error<0 to know 00814 if there was an I/O error. Even if there is no "low-level" I/O errors 00815 with 'file', any of the high-level above errors is worrying 00816 enough to stop the SQL thread now ; as we are skipping the current event, 00817 going on with reading and successfully executing other events can 00818 only corrupt the slave's databases. So stop. 00819 */ 00820 file->error= -1; 00821 } 00822 DBUG_RETURN(res); 00823 } 00824 00825 00826 /* 00827 Log_event::read_log_event() 00828 Binlog format tolerance is in (buf, event_len, description_event) 00829 constructors. 00830 */ 00831 00832 Log_event* Log_event::read_log_event(const char* buf, uint event_len, 00833 const char **error, 00834 const Format_description_log_event *description_event) 00835 { 00836 Log_event* ev; 00837 DBUG_ENTER("Log_event::read_log_event(char*,...)"); 00838 DBUG_ASSERT(description_event != 0); 00839 DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version)); 00840 if (event_len < EVENT_LEN_OFFSET || 00841 (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET)) 00842 { 00843 *error="Sanity check failed"; // Needed to free buffer 00844 DBUG_RETURN(NULL); // general sanity check - will fail on a partial read 00845 } 00846 00847 /* To check the integrity of the Log_event_type enumeration */ 00848 DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] < ENUM_END_EVENT); 00849 00850 switch(buf[EVENT_TYPE_OFFSET]) { 00851 case QUERY_EVENT: 00852 ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); 00853 break; 00854 case LOAD_EVENT: 00855 ev = new Load_log_event(buf, event_len, description_event); 00856 break; 00857 case NEW_LOAD_EVENT: 00858 ev = new Load_log_event(buf, event_len, description_event); 00859 break; 00860 case ROTATE_EVENT: 00861 ev = new Rotate_log_event(buf, event_len, description_event); 00862 break; 00863 #ifdef HAVE_REPLICATION 00864 case SLAVE_EVENT: /* can never happen (unused event) */ 00865 ev = new Slave_log_event(buf, event_len); 00866 break; 00867 #endif /* HAVE_REPLICATION */ 00868 case CREATE_FILE_EVENT: 00869 ev = new Create_file_log_event(buf, event_len, description_event); 00870 break; 00871 case APPEND_BLOCK_EVENT: 00872 ev = new Append_block_log_event(buf, event_len, description_event); 00873 break; 00874 case DELETE_FILE_EVENT: 00875 ev = new Delete_file_log_event(buf, event_len, description_event); 00876 break; 00877 case EXEC_LOAD_EVENT: 00878 ev = new Execute_load_log_event(buf, event_len, description_event); 00879 break; 00880 case START_EVENT_V3: /* this is sent only by MySQL <=4.x */ 00881 ev = new Start_log_event_v3(buf, description_event); 00882 break; 00883 case STOP_EVENT: 00884 ev = new Stop_log_event(buf, description_event); 00885 break; 00886 case INTVAR_EVENT: 00887 ev = new Intvar_log_event(buf, description_event); 00888 break; 00889 case XID_EVENT: 00890 ev = new Xid_log_event(buf, description_event); 00891 break; 00892 case RAND_EVENT: 00893 ev = new Rand_log_event(buf, description_event); 00894 break; 00895 case USER_VAR_EVENT: 00896 ev = new User_var_log_event(buf, description_event); 00897 break; 00898 case FORMAT_DESCRIPTION_EVENT: 00899 ev = new Format_description_log_event(buf, event_len, description_event); 00900 break; 00901 #if defined(HAVE_REPLICATION) && defined(HAVE_ROW_BASED_REPLICATION) 00902 case WRITE_ROWS_EVENT: 00903 ev = new Write_rows_log_event(buf, event_len, description_event); 00904 break; 00905 case UPDATE_ROWS_EVENT: 00906 ev = new Update_rows_log_event(buf, event_len, description_event); 00907 break; 00908 case DELETE_ROWS_EVENT: 00909 ev = new Delete_rows_log_event(buf, event_len, description_event); 00910 break; 00911 case TABLE_MAP_EVENT: 00912 ev = new Table_map_log_event(buf, event_len, description_event); 00913 break; 00914 #endif 00915 case BEGIN_LOAD_QUERY_EVENT: 00916 ev = new Begin_load_query_log_event(buf, event_len, description_event); 00917 break; 00918 case EXECUTE_LOAD_QUERY_EVENT: 00919 ev = new Execute_load_query_log_event(buf, event_len, description_event); 00920 break; 00921 default: 00922 DBUG_PRINT("error",("Unknown evernt code: %d",(int) buf[EVENT_TYPE_OFFSET])); 00923 ev= NULL; 00924 break; 00925 } 00926 00927 /* 00928 is_valid() are small event-specific sanity tests which are 00929 important; for example there are some my_malloc() in constructors 00930 (e.g. Query_log_event::Query_log_event(char*...)); when these 00931 my_malloc() fail we can't return an error out of the constructor 00932 (because constructor is "void") ; so instead we leave the pointer we 00933 wanted to allocate (e.g. 'query') to 0 and we test it in is_valid(). 00934 Same for Format_description_log_event, member 'post_header_len'. 00935 */ 00936 if (!ev || !ev->is_valid()) 00937 { 00938 DBUG_PRINT("error",("Found invalid event in binary log")); 00939 00940 delete ev; 00941 #ifdef MYSQL_CLIENT 00942 if (!force_opt) /* then mysqlbinlog dies */ 00943 { 00944 *error= "Found invalid event in binary log"; 00945 DBUG_RETURN(0); 00946 } 00947 ev= new Unknown_log_event(buf, description_event); 00948 #else 00949 *error= "Found invalid event in binary log"; 00950 DBUG_RETURN(0); 00951 #endif 00952 } 00953 DBUG_RETURN(ev); 00954 } 00955 00956 #ifdef MYSQL_CLIENT 00957 00958 /* 00959 Log_event::print_header() 00960 */ 00961 00962 void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) 00963 { 00964 char llbuff[22]; 00965 my_off_t hexdump_from= print_event_info->hexdump_from; 00966 00967 fputc('#', file); 00968 print_timestamp(file); 00969 fprintf(file, " server id %d end_log_pos %s ", server_id, 00970 llstr(log_pos,llbuff)); 00971 00972 /* mysqlbinlog --hexdump */ 00973 if (print_event_info->hexdump_from) 00974 { 00975 fprintf(file, "\n"); 00976 uchar *ptr= (uchar*)temp_buf; 00977 my_off_t size= 00978 uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN; 00979 my_off_t i; 00980 00981 /* Header len * 4 >= header len * (2 chars + space + extra space) */ 00982 char *h, hex_string[LOG_EVENT_MINIMAL_HEADER_LEN*4]= {0}; 00983 char *c, char_string[16+1]= {0}; 00984 00985 /* Pretty-print event common header if header is exactly 19 bytes */ 00986 if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN) 00987 { 00988 fprintf(file, "# Position Timestamp Type Master ID " 00989 "Size Master Pos Flags \n"); 00990 fprintf(file, "# %8.8lx %02x %02x %02x %02x %02x " 00991 "%02x %02x %02x %02x %02x %02x %02x %02x " 00992 "%02x %02x %02x %02x %02x %02x\n", 00993 (unsigned long) hexdump_from, 00994 ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], 00995 ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], 00996 ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); 00997 ptr += LOG_EVENT_MINIMAL_HEADER_LEN; 00998 hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN; 00999 } 01000 01001 /* Rest of event (without common header) */ 01002 for (i= 0, c= char_string, h=hex_string; 01003 i < size; 01004 i++, ptr++) 01005 { 01006 my_snprintf(h, 4, "%02x ", *ptr); 01007 h += 3; 01008 01009 *c++= my_isalnum(&my_charset_bin, *ptr) ? *ptr : '.'; 01010 01011 if (i % 16 == 15) 01012 { 01013 fprintf(file, "# %8.8lx %-48.48s |%16s|\n", 01014 (unsigned long) (hexdump_from + (i & 0xfffffff0)), 01015 hex_string, char_string); 01016 hex_string[0]= 0; 01017 char_string[0]= 0; 01018 c= char_string; 01019 h= hex_string; 01020 } 01021 else if (i % 8 == 7) *h++ = ' '; 01022 } 01023 *c= '\0'; 01024 01025 /* Non-full last line */ 01026 if (hex_string[0]) 01027 fprintf(file, "# %8.8lx %-48.48s |%s|\n# ", 01028 (unsigned long) (hexdump_from + (i & 0xfffffff0)), 01029 hex_string, char_string); 01030 } 01031 } 01032 01033 01034 void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) 01035 { 01036 uchar *ptr= (uchar*)temp_buf; 01037 my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET); 01038 01039 char *tmp_str= 01040 (char *) my_malloc(base64_needed_encoded_length(size), MYF(MY_WME)); 01041 if (!tmp_str) { 01042 fprintf(stderr, "\nError: Out of memory. " 01043 "Could not print correct binlog event.\n"); 01044 return; 01045 } 01046 int res= base64_encode(ptr, size, tmp_str); 01047 fprintf(file, "\nBINLOG '\n%s\n';\n", tmp_str); 01048 my_free(tmp_str, MYF(0)); 01049 } 01050 01051 01052 /* 01053 Log_event::print_timestamp() 01054 */ 01055 01056 void Log_event::print_timestamp(FILE* file, time_t* ts) 01057 { 01058 struct tm *res; 01059 if (!ts) 01060 ts = &when; 01061 #ifdef MYSQL_SERVER // This is always false 01062 struct tm tm_tmp; 01063 localtime_r(ts,(res= &tm_tmp)); 01064 #else 01065 res=localtime(ts); 01066 #endif 01067 01068 fprintf(file,"%02d%02d%02d %2d:%02d:%02d", 01069 res->tm_year % 100, 01070 res->tm_mon+1, 01071 res->tm_mday, 01072 res->tm_hour, 01073 res->tm_min, 01074 res->tm_sec); 01075 } 01076 01077 #endif /* MYSQL_CLIENT */ 01078 01079 01080 /************************************************************************** 01081 Query_log_event methods 01082 **************************************************************************/ 01083 01084 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 01085 01086 /* 01087 Query_log_event::pack_info() 01088 This (which is used only for SHOW BINLOG EVENTS) could be updated to 01089 print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is 01090 only an information, it does not produce suitable queries to replay (for 01091 example it does not print LOAD DATA INFILE). 01092 */ 01093 01094 void Query_log_event::pack_info(Protocol *protocol) 01095 { 01096 // TODO: show the catalog ?? 01097 char *buf, *pos; 01098 if (!(buf= my_malloc(9 + db_len + q_len, MYF(MY_WME)))) 01099 return; 01100 pos= buf; 01101 if (!(flags & LOG_EVENT_SUPPRESS_USE_F) 01102 && db && db_len) 01103 { 01104 pos= strmov(buf, "use `"); 01105 memcpy(pos, db, db_len); 01106 pos= strmov(pos+db_len, "`; "); 01107 } 01108 if (query && q_len) 01109 { 01110 memcpy(pos, query, q_len); 01111 pos+= q_len; 01112 } 01113 protocol->store(buf, pos-buf, &my_charset_bin); 01114 my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); 01115 } 01116 #endif 01117 01118 #ifndef MYSQL_CLIENT 01119 01120 /* Utility function for the next method */ 01121 static void write_str_with_code_and_len(char **dst, const char *src, 01122 int len, uint code) 01123 { 01124 DBUG_ASSERT(src); 01125 *((*dst)++)= code; 01126 *((*dst)++)= (uchar) len; 01127 bmove(*dst, src, len); 01128 (*dst)+= len; 01129 } 01130 01131 01132 /* 01133 Query_log_event::write() 01134 01135 NOTES: 01136 In this event we have to modify the header to have the correct 01137 EVENT_LEN_OFFSET as we don't yet know how many status variables we 01138 will print! 01139 */ 01140 01141 bool Query_log_event::write(IO_CACHE* file) 01142 { 01143 uchar buf[QUERY_HEADER_LEN+ 01144 1+4+ // code of flags2 and flags2 01145 1+8+ // code of sql_mode and sql_mode 01146 1+1+FN_REFLEN+ // code of catalog and catalog length and catalog 01147 1+4+ // code of autoinc and the 2 autoinc variables 01148 1+6+ // code of charset and charset 01149 1+1+MAX_TIME_ZONE_NAME_LENGTH // code of tz and tz length and tz name 01150 ], *start, *start_of_status; 01151 ulong event_length; 01152 01153 if (!query) 01154 return 1; // Something wrong with event 01155 01156 /* 01157 We want to store the thread id: 01158 (- as an information for the user when he reads the binlog) 01159 - if the query uses temporary table: for the slave SQL thread to know to 01160 which master connection the temp table belongs. 01161 Now imagine we (write()) are called by the slave SQL thread (we are 01162 logging a query executed by this thread; the slave runs with 01163 --log-slave-updates). Then this query will be logged with 01164 thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of 01165 the same name were created simultaneously on the master (in the master 01166 binlog you have 01167 CREATE TEMPORARY TABLE t; (thread 1) 01168 CREATE TEMPORARY TABLE t; (thread 2) 01169 ...) 01170 then in the slave's binlog there will be 01171 CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread) 01172 CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread) 01173 which is bad (same thread id!). 01174 01175 To avoid this, we log the thread's thread id EXCEPT for the SQL 01176 slave thread for which we log the original (master's) thread id. 01177 Now this moves the bug: what happens if the thread id on the 01178 master was 10 and when the slave replicates the query, a 01179 connection number 10 is opened by a normal client on the slave, 01180 and updates a temp table of the same name? We get a problem 01181 again. To avoid this, in the handling of temp tables (sql_base.cc) 01182 we use thread_id AND server_id. TODO when this is merged into 01183 4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id 01184 and is a session variable: that's to make mysqlbinlog work with 01185 temp tables. We probably need to introduce 01186 01187 SET PSEUDO_SERVER_ID 01188 for mysqlbinlog in 4.1. mysqlbinlog would print: 01189 SET PSEUDO_SERVER_ID= 01190 SET PSEUDO_THREAD_ID= 01191 for each query using temp tables. 01192 */ 01193 int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id); 01194 int4store(buf + Q_EXEC_TIME_OFFSET, exec_time); 01195 buf[Q_DB_LEN_OFFSET] = (char) db_len; 01196 int2store(buf + Q_ERR_CODE_OFFSET, error_code); 01197 01198 /* 01199 You MUST always write status vars in increasing order of code. This 01200 guarantees that a slightly older slave will be able to parse those he 01201 knows. 01202 */ 01203 start_of_status= start= buf+QUERY_HEADER_LEN; 01204 if (flags2_inited) 01205 { 01206 *start++= Q_FLAGS2_CODE; 01207 int4store(start, flags2); 01208 start+= 4; 01209 } 01210 if (sql_mode_inited) 01211 { 01212 *start++= Q_SQL_MODE_CODE; 01213 int8store(start, (ulonglong)sql_mode); 01214 start+= 8; 01215 } 01216 if (catalog_len) // i.e. this var is inited (false for 4.0 events) 01217 { 01218 write_str_with_code_and_len((char **)(&start), 01219 catalog, catalog_len, Q_CATALOG_NZ_CODE); 01220 /* 01221 In 5.0.x where x<4 masters we used to store the end zero here. This was 01222 a waste of one byte so we don't do it in x>=4 masters. We change code to 01223 Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves 01224 of this x>=4 master segfault (expecting a zero when there is 01225 none). Remaining compatibility problems are: the older slave will not 01226 find the catalog; but it is will not crash, and it's not an issue 01227 that it does not find the catalog as catalogs were not used in these 01228 older MySQL versions (we store it in binlog and read it from relay log 01229 but do nothing useful with it). What is an issue is that the older slave 01230 will stop processing the Q_* blocks (and jumps to the db/query) as soon 01231 as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read 01232 Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in 01233 various ways. Documented that you should not mix alpha/beta versions if 01234 they are not exactly the same version, with example of 5.0.3->5.0.2 and 01235 5.0.4->5.0.3. If replication is from older to new, the new will 01236 recognize Q_CATALOG_CODE and have no problem. 01237 */ 01238 } 01239 if (auto_increment_increment != 1) 01240 { 01241 *start++= Q_AUTO_INCREMENT; 01242 int2store(start, auto_increment_increment); 01243 int2store(start+2, auto_increment_offset); 01244 start+= 4; 01245 } 01246 if (charset_inited) 01247 { 01248 *start++= Q_CHARSET_CODE; 01249 memcpy(start, charset, 6); 01250 start+= 6; 01251 } 01252 if (time_zone_len) 01253 { 01254 /* In the TZ sys table, column Name is of length 64 so this should be ok */ 01255 DBUG_ASSERT(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH); 01256 *start++= Q_TIME_ZONE_CODE; 01257 *start++= time_zone_len; 01258 memcpy(start, time_zone_str, time_zone_len); 01259 start+= time_zone_len; 01260 } 01261 /* 01262 Here there could be code like 01263 if (command-line-option-which-says-"log_this_variable" && inited) 01264 { 01265 *start++= Q_THIS_VARIABLE_CODE; 01266 int4store(start, this_variable); 01267 start+= 4; 01268 } 01269 */ 01270 01271 /* Store length of status variables */ 01272 status_vars_len= (uint) (start-start_of_status); 01273 int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len); 01274 01275 /* 01276 Calculate length of whole event 01277 The "1" below is the \0 in the db's length 01278 */ 01279 event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len; 01280 01281 return (write_header(file, event_length) || 01282 my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) || 01283 write_post_header_for_derived(file) || 01284 my_b_safe_write(file, (byte*) start_of_status, 01285 (uint) (start-start_of_status)) || 01286 my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || 01287 my_b_safe_write(file, (byte*) query, q_len)) ? 1 : 0; 01288 } 01289 01290 /* 01291 Query_log_event::Query_log_event() 01292 01293 The simplest constructor that could possibly work. This is used for 01294 creating static objects that have a special meaning and are invisible 01295 to the log. 01296 */ 01297 Query_log_event::Query_log_event() 01298 :Log_event(), data_buf(0) 01299 { 01300 } 01301 01302 01303 /* 01304 Query_log_event::Query_log_event() 01305 */ 01306 Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, 01307 ulong query_length, bool using_trans, 01308 bool suppress_use) 01309 :Log_event(thd_arg, 01310 ((thd_arg->tmp_table_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) 01311 | (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0)), 01312 using_trans), 01313 data_buf(0), query(query_arg), catalog(thd_arg->catalog), 01314 db(thd_arg->db), q_len((uint32) query_length), 01315 error_code((thd_arg->killed != THD::NOT_KILLED) ? 01316 ((thd_arg->system_thread & SYSTEM_THREAD_DELAYED_INSERT) ? 01317 0 : thd->killed_errno()) : thd_arg->net.last_errno), 01318 thread_id(thd_arg->thread_id), 01319 /* save the original thread id; we already know the server id */ 01320 slave_proxy_id(thd_arg->variables.pseudo_thread_id), 01321 flags2_inited(1), sql_mode_inited(1), charset_inited(1), 01322 sql_mode(thd_arg->variables.sql_mode), 01323 auto_increment_increment(thd_arg->variables.auto_increment_increment), 01324 auto_increment_offset(thd_arg->variables.auto_increment_offset) 01325 { 01326 time_t end_time; 01327 time(&end_time); 01328 exec_time = (ulong) (end_time - thd->start_time); 01329 catalog_len = (catalog) ? (uint32) strlen(catalog) : 0; 01330 /* status_vars_len is set just before writing the event */ 01331 db_len = (db) ? (uint32) strlen(db) : 0; 01332 /* 01333 If we don't use flags2 for anything else than options contained in 01334 thd->options, it would be more efficient to flags2=thd_arg->options 01335 (OPTIONS_WRITTEN_TO_BINLOG would be used only at reading time). 01336 But it's likely that we don't want to use 32 bits for 3 bits; in the future 01337 we will probably want to reclaim the 29 bits. So we need the &. 01338 */ 01339 flags2= (uint32) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG); 01340 DBUG_ASSERT(thd->variables.character_set_client->number < 256*256); 01341 DBUG_ASSERT(thd->variables.collation_connection->number < 256*256); 01342 DBUG_ASSERT(thd->variables.collation_server->number < 256*256); 01343 int2store(charset, thd_arg->variables.character_set_client->number); 01344 int2store(charset+2, thd_arg->variables.collation_connection->number); 01345 int2store(charset+4, thd_arg->variables.collation_server->number); 01346 if (thd_arg->time_zone_used) 01347 { 01348 /* 01349 Note that our event becomes dependent on the Time_zone object 01350 representing the time zone. Fortunately such objects are never deleted 01351 or changed during mysqld's lifetime. 01352 */ 01353 time_zone_len= thd_arg->variables.time_zone->get_name()->length(); 01354 time_zone_str= thd_arg->variables.time_zone->get_name()->ptr(); 01355 } 01356 else 01357 time_zone_len= 0; 01358 DBUG_PRINT("info",("Query_log_event has flags2=%lu sql_mode=%lu",flags2,sql_mode)); 01359 } 01360 #endif /* MYSQL_CLIENT */ 01361 01362 01363 /* 2 utility functions for the next method */ 01364 01365 static void get_str_len_and_pointer(const char **dst, const char **src, uint *len) 01366 { 01367 if ((*len= **src)) 01368 *dst= *src + 1; // Will be copied later 01369 (*src)+= *len+1; 01370 } 01371 01372 01373 static void copy_str_and_move(char **dst, const char **src, uint len) 01374 { 01375 memcpy(*dst, *src, len); 01376 *src= *dst; 01377 (*dst)+= len; 01378 *(*dst)++= 0; 01379 } 01380 01381 01382 /* 01383 Query_log_event::Query_log_event() 01384 This is used by the SQL slave thread to prepare the event before execution. 01385 */ 01386 01387 Query_log_event::Query_log_event(const char* buf, uint event_len, 01388 const Format_description_log_event *description_event, 01389 Log_event_type event_type) 01390 :Log_event(buf, description_event), data_buf(0), query(NullS), 01391 db(NullS), catalog_len(0), status_vars_len(0), 01392 flags2_inited(0), sql_mode_inited(0), charset_inited(0), 01393 auto_increment_increment(1), auto_increment_offset(1), 01394 time_zone_len(0) 01395 { 01396 ulong data_len; 01397 uint32 tmp; 01398 uint8 common_header_len, post_header_len; 01399 char *start; 01400 const char *end; 01401 bool catalog_nz= 1; 01402 DBUG_ENTER("Query_log_event::Query_log_event(char*,...)"); 01403 01404 common_header_len= description_event->common_header_len; 01405 post_header_len= description_event->post_header_len[event_type-1]; 01406 DBUG_PRINT("info",("event_len=%ld, common_header_len=%d, post_header_len=%d", 01407 event_len, common_header_len, post_header_len)); 01408 01409 /* 01410 We test if the event's length is sensible, and if so we compute data_len. 01411 We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant. 01412 We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0. 01413 */ 01414 if (event_len < (uint)(common_header_len + post_header_len)) 01415 DBUG_VOID_RETURN; 01416 data_len = event_len - (common_header_len + post_header_len); 01417 buf+= common_header_len; 01418 01419 slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET); 01420 exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET); 01421 db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars 01422 error_code = uint2korr(buf + Q_ERR_CODE_OFFSET); 01423 01424 /* 01425 5.0 format starts here. 01426 Depending on the format, we may or not have affected/warnings etc 01427 The remnent post-header to be parsed has length: 01428 */ 01429 tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN; 01430 if (tmp) 01431 { 01432 status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET); 01433 data_len-= status_vars_len; 01434 DBUG_PRINT("info", ("Query_log_event has status_vars_len: %u", 01435 (uint) status_vars_len)); 01436 tmp-= 2; 01437 } 01438 /* 01439 We have parsed everything we know in the post header for QUERY_EVENT, 01440 the rest of post header is either comes from older version MySQL or 01441 dedicated to derived events (e.g. Execute_load_query...) 01442 */ 01443 01444 /* variable-part: the status vars; only in MySQL 5.0 */ 01445 01446 start= (char*) (buf+post_header_len); 01447 end= (const char*) (start+status_vars_len); 01448 for (const uchar* pos= (const uchar*) start; pos < (const uchar*) end;) 01449 { 01450 switch (*pos++) { 01451 case Q_FLAGS2_CODE: 01452 flags2_inited= 1; 01453 flags2= uint4korr(pos); 01454 DBUG_PRINT("info",("In Query_log_event, read flags2: %lu", flags2)); 01455 pos+= 4; 01456 break; 01457 case Q_SQL_MODE_CODE: 01458 { 01459 #ifndef DBUG_OFF 01460 char buff[22]; 01461 #endif 01462 sql_mode_inited= 1; 01463 sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is ulonglong 01464 DBUG_PRINT("info",("In Query_log_event, read sql_mode: %s", 01465 llstr(sql_mode, buff))); 01466 pos+= 8; 01467 break; 01468 } 01469 case Q_CATALOG_NZ_CODE: 01470 get_str_len_and_pointer(&catalog, (const char **)(&pos), &catalog_len); 01471 break; 01472 case Q_AUTO_INCREMENT: 01473 auto_increment_increment= uint2korr(pos); 01474 auto_increment_offset= uint2korr(pos+2); 01475 pos+= 4; 01476 break; 01477 case Q_CHARSET_CODE: 01478 { 01479 charset_inited= 1; 01480 memcpy(charset, pos, 6); 01481 pos+= 6; 01482 break; 01483 } 01484 case Q_TIME_ZONE_CODE: 01485 { 01486 get_str_len_and_pointer(&time_zone_str, (const char **)(&pos), &time_zone_len); 01487 break; 01488 } 01489 case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */ 01490 if ((catalog_len= *pos)) 01491 catalog= (char*) pos+1; // Will be copied later 01492 pos+= catalog_len+2; // leap over end 0 01493 catalog_nz= 0; // catalog has end 0 in event 01494 break; 01495 default: 01496 /* That's why you must write status vars in growing order of code */ 01497 DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\ 01498 code: %u), skipping the rest of them", (uint) *(pos-1))); 01499 pos= (const uchar*) end; // Break loop 01500 } 01501 } 01502 01503 #if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE) 01504 if (!(start= data_buf = (char*) my_malloc(catalog_len + 1 + 01505 time_zone_len + 1 + 01506 data_len + 1 + 01507 QUERY_CACHE_FLAGS_SIZE + 01508 db_len + 1, 01509 MYF(MY_WME)))) 01510 #else 01511 if (!(start= data_buf = (char*) my_malloc(catalog_len + 1 + 01512 time_zone_len + 1 + 01513 data_len + 1, 01514 MYF(MY_WME)))) 01515 #endif 01516 DBUG_VOID_RETURN; 01517 if (catalog_len) // If catalog is given 01518 { 01519 if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3. 01520 copy_str_and_move(&start, &catalog, catalog_len); 01521 else 01522 { 01523 memcpy(start, catalog, catalog_len+1); // copy end 0 01524 catalog= start; 01525 start+= catalog_len+1; 01526 } 01527 } 01528 if (time_zone_len) 01529 copy_str_and_move(&start, &time_zone_str, time_zone_len); 01530 01531 /* A 2nd variable part; this is common to all versions */ 01532 memcpy((char*) start, end, data_len); // Copy db and query 01533 start[data_len]= '\0'; // End query with \0 (For safetly) 01534 db= start; 01535 query= start + db_len + 1; 01536 q_len= data_len - db_len -1; 01537 DBUG_VOID_RETURN; 01538 } 01539 01540 01541 /* 01542 Query_log_event::print() 01543 */ 01544 01545 #ifdef MYSQL_CLIENT 01546 void Query_log_event::print_query_header(FILE* file, 01547 PRINT_EVENT_INFO* print_event_info) 01548 { 01549 // TODO: print the catalog ?? 01550 char buff[40],*end; // Enough for SET TIMESTAMP 01551 bool different_db= 1; 01552 uint32 tmp; 01553 01554 if (!print_event_info->short_form) 01555 { 01556 print_header(file, print_event_info); 01557 fprintf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", 01558 get_type_str(), (ulong) thread_id, (ulong) exec_time, error_code); 01559 } 01560 01561 if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db) 01562 { 01563 if (different_db= memcmp(print_event_info->db, db, db_len + 1)) 01564 memcpy(print_event_info->db, db, db_len + 1); 01565 if (db[0] && different_db) 01566 fprintf(file, "use %s;\n", db); 01567 } 01568 01569 end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); 01570 *end++=';'; 01571 *end++='\n'; 01572 my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME)); 01573 if (flags & LOG_EVENT_THREAD_SPECIFIC_F) 01574 fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); 01575 01576 /* 01577 If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to 01578 print (remember we don't produce mixed relay logs so there cannot be 01579 5.0 events before that one so there is nothing to reset). 01580 */ 01581 if (likely(flags2_inited)) /* likely as this will mainly read 5.0 logs */ 01582 { 01583 /* tmp is a bitmask of bits which have changed. */ 01584 if (likely(print_event_info->flags2_inited)) 01585 /* All bits which have changed */ 01586 tmp= (print_event_info->flags2) ^ flags2; 01587 else /* that's the first Query event we read */ 01588 { 01589 print_event_info->flags2_inited= 1; 01590 tmp= ~((uint32)0); /* all bits have changed */ 01591 } 01592 01593 if (unlikely(tmp)) /* some bits have changed */ 01594 { 01595 bool need_comma= 0; 01596 fprintf(file, "SET "); 01597 print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2, 01598 "@@session.foreign_key_checks", &need_comma); 01599 print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2, 01600 "@@session.sql_auto_is_null", &need_comma); 01601 print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2, 01602 "@@session.unique_checks", &need_comma); 01603 fprintf(file,";\n"); 01604 print_event_info->flags2= flags2; 01605 } 01606 } 01607 01608 /* 01609 Now the session variables; 01610 it's more efficient to pass SQL_MODE as a number instead of a 01611 comma-separated list. 01612 FOREIGN_KEY_CHECKS, SQL_AUTO_IS_NULL, UNIQUE_CHECKS are session-only 01613 variables (they have no global version; they're not listed in 01614 sql_class.h), The tests below work for pure binlogs or pure relay 01615 logs. Won't work for mixed relay logs but we don't create mixed 01616 relay logs (that is, there is no relay log with a format change 01617 except within the 3 first events, which mysqlbinlog handles 01618 gracefully). So this code should always be good. 01619 */ 01620 01621 if (likely(sql_mode_inited)) 01622 { 01623 if (unlikely(!print_event_info->sql_mode_inited)) /* first Query event */ 01624 { 01625 print_event_info->sql_mode_inited= 1; 01626 /* force a difference to force write */ 01627 print_event_info->sql_mode= ~sql_mode; 01628 } 01629 if (unlikely(print_event_info->sql_mode != sql_mode)) 01630 { 01631 fprintf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); 01632 print_event_info->sql_mode= sql_mode; 01633 } 01634 } 01635 if (print_event_info->auto_increment_increment != auto_increment_increment || 01636 print_event_info->auto_increment_offset != auto_increment_offset) 01637 { 01638 fprintf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", 01639 auto_increment_increment,auto_increment_offset); 01640 print_event_info->auto_increment_increment= auto_increment_increment; 01641 print_event_info->auto_increment_offset= auto_increment_offset; 01642 } 01643 01644 /* TODO: print the catalog when we feature SET CATALOG */ 01645 01646 if (likely(charset_inited)) 01647 { 01648 if (unlikely(!print_event_info->charset_inited)) /* first Query event */ 01649 { 01650 print_event_info->charset_inited= 1; 01651 print_event_info->charset[0]= ~charset[0]; // force a difference to force write 01652 } 01653 if (unlikely(bcmp(print_event_info->charset, charset, 6))) 01654 { 01655 CHARSET_INFO *cs_info= get_charset(uint2korr(charset), MYF(MY_WME)); 01656 if (cs_info) 01657 { 01658 fprintf(file, "/*!\\C %s */;\n", cs_info->csname); /* for mysql client */ 01659 } 01660 fprintf(file,"SET " 01661 "@@session.character_set_client=%d," 01662 "@@session.collation_connection=%d," 01663 "@@session.collation_server=%d" 01664 ";\n", 01665 uint2korr(charset), 01666 uint2korr(charset+2), 01667 uint2korr(charset+4)); 01668 memcpy(print_event_info->charset, charset, 6); 01669 } 01670 } 01671 if (time_zone_len) 01672 { 01673 if (bcmp(print_event_info->time_zone_str, time_zone_str, time_zone_len+1)) 01674 { 01675 fprintf(file,"SET @@session.time_zone='%s';\n", time_zone_str); 01676 memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1); 01677 } 01678 } 01679 } 01680 01681 01682 void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 01683 { 01684 print_query_header(file, print_event_info); 01685 my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); 01686 fputs(";\n", file); 01687 } 01688 #endif /* MYSQL_CLIENT */ 01689 01690 01691 /* 01692 Query_log_event::exec_event() 01693 */ 01694 01695 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 01696 01697 int Query_log_event::exec_event(struct st_relay_log_info* rli) 01698 { 01699 return exec_event(rli, query, q_len); 01700 } 01701 01702 01703 int Query_log_event::exec_event(struct st_relay_log_info* rli, 01704 const char *query_arg, uint32 q_len_arg) 01705 { 01706 LEX_STRING new_db; 01707 int expected_error,actual_error= 0; 01708 /* 01709 Colleagues: please never free(thd->catalog) in MySQL. This would lead to 01710 bugs as here thd->catalog is a part of an alloced block, not an entire 01711 alloced block (see Query_log_event::exec_event()). Same for thd->db. 01712 Thank you. 01713 */ 01714 thd->catalog= catalog_len ? (char *) catalog : (char *)""; 01715 new_db.length= db_len; 01716 new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length); 01717 thd->set_db(new_db.str, new_db.length); /* allocates a copy of 'db' */ 01718 thd->variables.auto_increment_increment= auto_increment_increment; 01719 thd->variables.auto_increment_offset= auto_increment_offset; 01720 01721 /* 01722 InnoDB internally stores the master log position it has executed so far, 01723 i.e. the position just after the COMMIT event. 01724 When InnoDB will want to store, the positions in rli won't have 01725 been updated yet, so group_master_log_* will point to old BEGIN 01726 and event_master_log* will point to the beginning of current COMMIT. 01727 But log_pos of the COMMIT Query event is what we want, i.e. the pos of the 01728 END of the current log event (COMMIT). We save it in rli so that InnoDB can 01729 access it. 01730 */ 01731 rli->future_group_master_log_pos= log_pos; 01732 DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); 01733 01734 clear_all_errors(thd, rli); 01735 rli->clear_tables_to_lock(); 01736 01737 /* 01738 Note: We do not need to execute reset_one_shot_variables() if this 01739 db_ok() test fails. 01740 Reason: The db stored in binlog events is the same for SET and for 01741 its companion query. If the SET is ignored because of 01742 db_ok(), the companion query will also be ignored, and if 01743 the companion query is ignored in the db_ok() test of 01744 ::exec_event(), then the companion SET also have so we 01745 don't need to reset_one_shot_variables(). 01746 */ 01747 if (rpl_filter->db_ok(thd->db)) 01748 { 01749 thd->set_time((time_t)when); 01750 thd->query_length= q_len_arg; 01751 thd->query= (char*)query_arg; 01752 VOID(pthread_mutex_lock(&LOCK_thread_count)); 01753 thd->query_id = next_query_id(); 01754 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 01755 thd->variables.pseudo_thread_id= thread_id; // for temp tables 01756 DBUG_PRINT("query",("%s",thd->query)); 01757 01758 if (ignored_error_code((expected_error= error_code)) || 01759 !check_expected_error(thd,rli,expected_error)) 01760 { 01761 if (flags2_inited) 01762 /* 01763 all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG 01764 must take their value from flags2. 01765 */ 01766 thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG); 01767 /* 01768 else, we are in a 3.23/4.0 binlog; we previously received a 01769 Rotate_log_event which reset thd->options and sql_mode etc, so 01770 nothing to do. 01771 */ 01772 /* 01773 We do not replicate IGNORE_DIR_IN_CREATE. That is, if the master is a 01774 slave which runs with SQL_MODE=IGNORE_DIR_IN_CREATE, this should not 01775 force us to ignore the dir too. Imagine you are a ring of machines, and 01776 one has a disk problem so that you temporarily need 01777 IGNORE_DIR_IN_CREATE on this machine; you don't want it to propagate 01778 elsewhere (you don't want all slaves to start ignoring the dirs). 01779 */ 01780 if (sql_mode_inited) 01781 thd->variables.sql_mode= 01782 (ulong) ((thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE) | 01783 (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE)); 01784 if (charset_inited) 01785 { 01786 if (rli->cached_charset_compare(charset)) 01787 { 01788 /* Verify that we support the charsets found in the event. */ 01789 if (!(thd->variables.character_set_client= 01790 get_charset(uint2korr(charset), MYF(MY_WME))) || 01791 !(thd->variables.collation_connection= 01792 get_charset(uint2korr(charset+2), MYF(MY_WME))) || 01793 !(thd->variables.collation_server= 01794 get_charset(uint2korr(charset+4), MYF(MY_WME)))) 01795 { 01796 /* 01797 We updated the thd->variables with nonsensical values (0). Let's 01798 set them to something safe (i.e. which avoids crash), and we'll 01799 stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to 01800 ignore this error). 01801 */ 01802 set_slave_thread_default_charset(thd, rli); 01803 goto compare_errors; 01804 } 01805 thd->update_charset(); // for the charset change to take effect 01806 } 01807 } 01808 if (time_zone_len) 01809 { 01810 String tmp(time_zone_str, time_zone_len, &my_charset_bin); 01811 if (!(thd->variables.time_zone= 01812 my_tz_find_with_opening_tz_tables(thd, &tmp))) 01813 { 01814 my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr()); 01815 thd->variables.time_zone= global_system_variables.time_zone; 01816 goto compare_errors; 01817 } 01818 } 01819 01820 /* Execute the query (note that we bypass dispatch_command()) */ 01821 mysql_parse(thd, thd->query, thd->query_length); 01822 01823 } 01824 else 01825 { 01826 /* 01827 The query got a really bad error on the master (thread killed etc), 01828 which could be inconsistent. Parse it to test the table names: if the 01829 replicate-*-do|ignore-table rules say "this query must be ignored" then 01830 we exit gracefully; otherwise we warn about the bad error and tell DBA 01831 to check/fix it. 01832 */ 01833 if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length)) 01834 clear_all_errors(thd, rli); /* Can ignore query */ 01835 else 01836 { 01837 slave_print_msg(ERROR_LEVEL, rli, expected_error, 01838 "\ 01839 Query partially completed on the master (error on master: %d) \ 01840 and was aborted. There is a chance that your master is inconsistent at this \ 01841 point. If you are sure that your master is ok, run this query manually on the \ 01842 slave and then restart the slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; \ 01843 START SLAVE; . Query: '%s'", expected_error, thd->query); 01844 thd->query_error= 1; 01845 } 01846 goto end; 01847 } 01848 01849 /* If the query was not ignored, it is printed to the general log */ 01850 if (thd->net.last_errno != ER_SLAVE_IGNORED_TABLE) 01851 general_log_print(thd, COM_QUERY, "%s", thd->query); 01852 01853 compare_errors: 01854 01855 /* 01856 If we expected a non-zero error code, and we don't get the same error 01857 code, and none of them should be ignored. 01858 */ 01859 DBUG_PRINT("info",("expected_error: %d last_errno: %d", 01860 expected_error, thd->net.last_errno)); 01861 if ((expected_error != (actual_error= thd->net.last_errno)) && 01862 expected_error && 01863 !ignored_error_code(actual_error) && 01864 !ignored_error_code(expected_error)) 01865 { 01866 slave_print_msg(ERROR_LEVEL, rli, 0, 01867 "\ 01868 Query caused different errors on master and slave. \ 01869 Error on master: '%s' (%d), Error on slave: '%s' (%d). \ 01870 Default database: '%s'. Query: '%s'", 01871 ER_SAFE(expected_error), 01872 expected_error, 01873 actual_error ? thd->net.last_error: "no error", 01874 actual_error, 01875 print_slave_db_safe(db), query_arg); 01876 thd->query_error= 1; 01877 } 01878 /* 01879 If we get the same error code as expected, or they should be ignored. 01880 */ 01881 else if (expected_error == actual_error || 01882 ignored_error_code(actual_error)) 01883 { 01884 DBUG_PRINT("info",("error ignored")); 01885 clear_all_errors(thd, rli); 01886 } 01887 /* 01888 Other cases: mostly we expected no error and get one. 01889 */ 01890 else if (thd->query_error || thd->is_fatal_error) 01891 { 01892 slave_print_msg(ERROR_LEVEL, rli, actual_error, 01893 "Error '%s' on query. Default database: '%s'. Query: '%s'", 01894 (actual_error ? thd->net.last_error : 01895 "unexpected success or fatal error"), 01896 print_slave_db_safe(thd->db), query_arg); 01897 thd->query_error= 1; 01898 } 01899 01900 /* 01901 TODO: compare the values of "affected rows" around here. Something 01902 like: 01903 if ((uint32) affected_in_event != (uint32) affected_on_slave) 01904 { 01905 sql_print_error("Slave: did not get the expected number of affected \ 01906 rows running query from master - expected %d, got %d (this numbers \ 01907 should have matched modulo 4294967296).", 0, ...); 01908 thd->query_error = 1; 01909 } 01910 We may also want an option to tell the slave to ignore "affected" 01911 mismatch. This mismatch could be implemented with a new ER_ code, and 01912 to ignore it you would use --slave-skip-errors... 01913 01914 To do the comparison we need to know the value of "affected" which the 01915 above mysql_parse() computed. And we need to know the value of 01916 "affected" in the master's binlog. Both will be implemented later. The 01917 important thing is that we now have the format ready to log the values 01918 of "affected" in the binlog. So we can release 5.0.0 before effectively 01919 logging "affected" and effectively comparing it. 01920 */ 01921 } /* End of if (db_ok(... */ 01922 01923 end: 01924 VOID(pthread_mutex_lock(&LOCK_thread_count)); 01925 /* 01926 Probably we have set thd->query, thd->db, thd->catalog to point to places 01927 in the data_buf of this event. Now the event is going to be deleted 01928 probably, so data_buf will be freed, so the thd->... listed above will be 01929 pointers to freed memory. 01930 So we must set them to 0, so that those bad pointers values are not later 01931 used. Note that "cleanup" queries like automatic DROP TEMPORARY TABLE 01932 don't suffer from these assignments to 0 as DROP TEMPORARY 01933 TABLE uses the db.table syntax. 01934 */ 01935 thd->catalog= 0; 01936 thd->set_db(NULL, 0); /* will free the current database */ 01937 thd->query= 0; // just to be sure 01938 thd->query_length= 0; 01939 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 01940 close_thread_tables(thd); 01941 /* 01942 As a disk space optimization, future masters will not log an event for 01943 LAST_INSERT_ID() if that function returned 0 (and thus they will be able 01944 to replace the THD::stmt_depends_on_first_successful_insert_id_in_prev_stmt 01945 variable by (THD->first_successful_insert_id_in_prev_stmt > 0) ; with the 01946 resetting below we are ready to support that. 01947 */ 01948 thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0; 01949 thd->first_successful_insert_id_in_prev_stmt= 0; 01950 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0; 01951 free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); 01952 /* 01953 If there was an error we stop. Otherwise we increment positions. Note that 01954 we will not increment group* positions if we are just after a SET 01955 ONE_SHOT, because SET ONE_SHOT should not be separated from its following 01956 updating query. 01957 */ 01958 return (thd->query_error ? thd->query_error : 01959 (thd->one_shot_set ? (rli->inc_event_relay_log_pos(),0) : 01960 Log_event::exec_event(rli))); 01961 } 01962 #endif 01963 01964 01965 /************************************************************************** 01966 Muted_query_log_event methods 01967 **************************************************************************/ 01968 01969 #ifndef MYSQL_CLIENT 01970 /* 01971 Muted_query_log_event::Muted_query_log_event() 01972 */ 01973 Muted_query_log_event::Muted_query_log_event() 01974 :Query_log_event() 01975 { 01976 } 01977 #endif 01978 01979 01980 /************************************************************************** 01981 Start_log_event_v3 methods 01982 **************************************************************************/ 01983 01984 #ifndef MYSQL_CLIENT 01985 Start_log_event_v3::Start_log_event_v3() :Log_event(), binlog_version(BINLOG_VERSION), artificial_event(0) 01986 { 01987 created= when; 01988 memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); 01989 } 01990 #endif 01991 01992 /* 01993 Start_log_event_v3::pack_info() 01994 */ 01995 01996 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 01997 void Start_log_event_v3::pack_info(Protocol *protocol) 01998 { 01999 char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos; 02000 pos= strmov(buf, "Server ver: "); 02001 pos= strmov(pos, server_version); 02002 pos= strmov(pos, ", Binlog ver: "); 02003 pos= int10_to_str(binlog_version, pos, 10); 02004 protocol->store(buf, (uint) (pos-buf), &my_charset_bin); 02005 } 02006 #endif 02007 02008 02009 /* 02010 Start_log_event_v3::print() 02011 */ 02012 02013 #ifdef MYSQL_CLIENT 02014 void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 02015 { 02016 if (!print_event_info->short_form) 02017 { 02018 print_header(file, print_event_info); 02019 fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, 02020 server_version); 02021 print_timestamp(file); 02022 if (created) 02023 fprintf(file," at startup"); 02024 fputc('\n', file); 02025 if (flags & LOG_EVENT_BINLOG_IN_USE_F) 02026 fprintf(file, "# Warning: this binlog was not closed properly. " 02027 "Most probably mysqld crashed writing it.\n"); 02028 } 02029 if (!artificial_event && created) 02030 { 02031 #ifdef WHEN_WE_HAVE_THE_RESET_CONNECTION_SQL_COMMAND 02032 /* 02033 This is for mysqlbinlog: like in replication, we want to delete the stale 02034 tmp files left by an unclean shutdown of mysqld (temporary tables) 02035 and rollback unfinished transaction. 02036 Probably this can be done with RESET CONNECTION (syntax to be defined). 02037 */ 02038 fprintf(file,"RESET CONNECTION;\n"); 02039 #else 02040 fprintf(file,"ROLLBACK;\n"); 02041 #endif 02042 } 02043 fflush(file); 02044 } 02045 #endif /* MYSQL_CLIENT */ 02046 02047 /* 02048 Start_log_event_v3::Start_log_event_v3() 02049 */ 02050 02051 Start_log_event_v3::Start_log_event_v3(const char* buf, 02052 const Format_description_log_event* description_event) 02053 :Log_event(buf, description_event) 02054 { 02055 buf+= description_event->common_header_len; 02056 binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET); 02057 memcpy(server_version, buf+ST_SERVER_VER_OFFSET, 02058 ST_SERVER_VER_LEN); 02059 created= uint4korr(buf+ST_CREATED_OFFSET); 02060 /* We use log_pos to mark if this was an artificial event or not */ 02061 artificial_event= (log_pos == 0); 02062 } 02063 02064 02065 /* 02066 Start_log_event_v3::write() 02067 */ 02068 02069 #ifndef MYSQL_CLIENT 02070 bool Start_log_event_v3::write(IO_CACHE* file) 02071 { 02072 char buff[START_V3_HEADER_LEN]; 02073 int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); 02074 memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); 02075 int4store(buff + ST_CREATED_OFFSET,created); 02076 return (write_header(file, sizeof(buff)) || 02077 my_b_safe_write(file, (byte*) buff, sizeof(buff))); 02078 } 02079 #endif 02080 02081 02082 /* 02083 Start_log_event_v3::exec_event() 02084 02085 The master started 02086 02087 IMPLEMENTATION 02088 - To handle the case where the master died without having time to write 02089 DROP TEMPORARY TABLE, DO RELEASE_LOCK (prepared statements' deletion is 02090 TODO), we clean up all temporary tables that we got, if we are sure we 02091 can (see below). 02092 02093 TODO 02094 - Remove all active user locks. 02095 Guilhem 2003-06: this is true but not urgent: the worst it can cause is 02096 the use of a bit of memory for a user lock which will not be used 02097 anymore. If the user lock is later used, the old one will be released. In 02098 other words, no deadlock problem. 02099 */ 02100 02101 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 02102 int Start_log_event_v3::exec_event(struct st_relay_log_info* rli) 02103 { 02104 DBUG_ENTER("Start_log_event_v3::exec_event"); 02105 switch (binlog_version) 02106 { 02107 case 3: 02108 case 4: 02109 /* 02110 This can either be 4.x (then a Start_log_event_v3 is only at master 02111 startup so we are sure the master has restarted and cleared his temp 02112 tables; the event always has 'created'>0) or 5.0 (then we have to test 02113 'created'). 02114 */ 02115 if (created) 02116 { 02117 close_temporary_tables(thd); 02118 cleanup_load_tmpdir(); 02119 } 02120 break; 02121 02122 /* 02123 Now the older formats; in that case load_tmpdir is cleaned up by the I/O 02124 thread. 02125 */ 02126 case 1: 02127 if (strncmp(rli->relay_log.description_event_for_exec->server_version, 02128 "3.23.57",7) >= 0 && created) 02129 { 02130 /* 02131 Can distinguish, based on the value of 'created': this event was 02132 generated at master startup. 02133 */ 02134 close_temporary_tables(thd); 02135 } 02136 /* 02137 Otherwise, can't distinguish a Start_log_event generated at 02138 master startup and one generated by master FLUSH LOGS, so cannot 02139 be sure temp tables have to be dropped. So do nothing. 02140 */ 02141 break; 02142 default: 02143 /* this case is impossible */ 02144 DBUG_RETURN(1); 02145 } 02146 DBUG_RETURN(Log_event::exec_event(rli)); 02147 } 02148 #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ 02149 02150 /*************************************************************************** 02151 Format_description_log_event methods 02152 ****************************************************************************/ 02153 02154 /* 02155 Format_description_log_event 1st ctor. 02156 02157 SYNOPSIS 02158 Format_description_log_event::Format_description_log_event 02159 binlog_version the binlog version for which we want to build 02160 an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x 02161 x>=2 and 4.1) or 4 (MySQL 5.0). Note that the 02162 old 4.0 (binlog version 2) is not supported; 02163 it should not be used for replication with 02164 5.0. 02165 02166 DESCRIPTION 02167 Ctor. Can be used to create the event to write to the binary log (when the 02168 server starts or when FLUSH LOGS), or to create artificial events to parse 02169 binlogs from MySQL 3.23 or 4.x. 02170 When in a client, only the 2nd use is possible. 02171 */ 02172 02173 Format_description_log_event:: 02174 Format_description_log_event(uint8 binlog_ver, const char* server_ver) 02175 :Start_log_event_v3() 02176 { 02177 created= when; 02178 binlog_version= binlog_ver; 02179 switch (binlog_ver) { 02180 case 4: /* MySQL 5.0 */ 02181 memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); 02182 common_header_len= LOG_EVENT_HEADER_LEN; 02183 number_of_event_types= LOG_EVENT_TYPES; 02184 /* we'll catch my_malloc() error in is_valid() */ 02185 post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8), 02186 MYF(MY_ZEROFILL)); 02187 /* 02188 This long list of assignments is not beautiful, but I see no way to 02189 make it nicer, as the right members are #defines, not array members, so 02190 it's impossible to write a loop. 02191 */ 02192 if (post_header_len) 02193 { 02194 post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN; 02195 post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN; 02196 post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN; 02197 post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN; 02198 post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN; 02199 post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN; 02200 post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN; 02201 post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN; 02202 post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1]; 02203 post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN; 02204 post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN; 02205 post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN; 02206 post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN; 02207 post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN; 02208 /* 02209 We here have the possibility to simulate a master of before we changed 02210 the table map id to be stored in 6 bytes: when it was stored in 4 02211 bytes (=> post_header_len was 6). This is used to test backward 02212 compatibility. 02213 This code can be removed after a few months (today is Dec 21st 2005), 02214 when we know that the 4-byte masters are not deployed anymore (check 02215 with Tomas Ulin first!), and the accompanying test (rpl_row_4_bytes) 02216 too. 02217 */ 02218 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", 02219 post_header_len[TABLE_MAP_EVENT-1]= 02220 post_header_len[WRITE_ROWS_EVENT-1]= 02221 post_header_len[UPDATE_ROWS_EVENT-1]= 02222 post_header_len[DELETE_ROWS_EVENT-1]= 6;); 02223 post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1]; 02224 post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN; 02225 } 02226 break; 02227 02228 case 1: /* 3.23 */ 02229 case 3: /* 4.0.x x>=2 */ 02230 /* 02231 We build an artificial (i.e. not sent by the master) event, which 02232 describes what those old master versions send. 02233 */ 02234 if (binlog_ver==1) 02235 strmov(server_version, server_ver ? server_ver : "3.23"); 02236 else 02237 strmov(server_version, server_ver ? server_ver : "4.0"); 02238 common_header_len= binlog_ver==1 ? OLD_HEADER_LEN : 02239 LOG_EVENT_MINIMAL_HEADER_LEN; 02240 /* 02241 The first new event in binlog version 4 is Format_desc. So any event type 02242 after that does not exist in older versions. We use the events known by 02243 version 3, even if version 1 had only a subset of them (this is not a 02244 problem: it uses a few bytes for nothing but unifies code; it does not 02245 make the slave detect less corruptions). 02246 */ 02247 number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1; 02248 post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8), 02249 MYF(0)); 02250 if (post_header_len) 02251 { 02252 post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN; 02253 post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN; 02254 post_header_len[STOP_EVENT-1]= 0; 02255 post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN; 02256 post_header_len[INTVAR_EVENT-1]= 0; 02257 post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN; 02258 post_header_len[SLAVE_EVENT-1]= 0; 02259 post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN; 02260 post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN; 02261 post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN; 02262 post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN; 02263 post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1]; 02264 post_header_len[RAND_EVENT-1]= 0; 02265 post_header_len[USER_VAR_EVENT-1]= 0; 02266 } 02267 break; 02268 default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */ 02269 post_header_len= 0; /* will make is_valid() fail */ 02270 break; 02271 } 02272 } 02273 02274 02275 /* 02276 The problem with this constructor is that the fixed header may have a 02277 length different from this version, but we don't know this length as we 02278 have not read the Format_description_log_event which says it, yet. This 02279 length is in the post-header of the event, but we don't know where the 02280 post-header starts. 02281 So this type of event HAS to: 02282 - either have the header's length at the beginning (in the header, at a 02283 fixed position which will never be changed), not in the post-header. That 02284 would make the header be "shifted" compared to other events. 02285 - or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future 02286 versions, so that we know for sure. 02287 I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because 02288 it is sent before Format_description_log_event). 02289 */ 02290 02291 Format_description_log_event:: 02292 Format_description_log_event(const char* buf, 02293 uint event_len, 02294 const 02295 Format_description_log_event* 02296 description_event) 02297 :Start_log_event_v3(buf, description_event) 02298 { 02299 DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)"); 02300 buf+= LOG_EVENT_MINIMAL_HEADER_LEN; 02301 if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN) 02302 DBUG_VOID_RETURN; /* sanity check */ 02303 number_of_event_types= 02304 event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1); 02305 DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d", 02306 common_header_len, number_of_event_types)); 02307 /* If alloc fails, we'll detect it in is_valid() */ 02308 post_header_len= (uint8*) my_memdup((byte*)buf+ST_COMMON_HEADER_LEN_OFFSET+1, 02309 number_of_event_types* 02310 sizeof(*post_header_len), MYF(0)); 02311 DBUG_VOID_RETURN; 02312 } 02313 02314 #ifndef MYSQL_CLIENT 02315 bool Format_description_log_event::write(IO_CACHE* file) 02316 { 02317 /* 02318 We don't call Start_log_event_v3::write() because this would make 2 02319 my_b_safe_write(). 02320 */ 02321 byte buff[FORMAT_DESCRIPTION_HEADER_LEN]; 02322 int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); 02323 memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); 02324 int4store(buff + ST_CREATED_OFFSET,created); 02325 buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN; 02326 memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (byte*) post_header_len, 02327 LOG_EVENT_TYPES); 02328 return (write_header(file, sizeof(buff)) || 02329 my_b_safe_write(file, buff, sizeof(buff))); 02330 } 02331 #endif 02332 02333 /* 02334 SYNOPSIS 02335 Format_description_log_event::exec_event() 02336 02337 IMPLEMENTATION 02338 Save the information which describes the binlog's format, to be able to 02339 read all coming events. 02340 Call Start_log_event_v3::exec_event(). 02341 */ 02342 02343 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 02344 int Format_description_log_event::exec_event(struct st_relay_log_info* rli) 02345 { 02346 DBUG_ENTER("Format_description_log_event::exec_event"); 02347 02348 /* save the information describing this binlog */ 02349 delete rli->relay_log.description_event_for_exec; 02350 rli->relay_log.description_event_for_exec= this; 02351 02352 #ifdef USING_TRANSACTIONS 02353 /* 02354 As a transaction NEVER spans on 2 or more binlogs: 02355 if we have an active transaction at this point, the master died 02356 while writing the transaction to the binary log, i.e. while 02357 flushing the binlog cache to the binlog. XA guarantees that master has 02358 rolled back. So we roll back. 02359 Note: this event could be sent by the master to inform us of the 02360 format of its binlog; in other words maybe it is not at its 02361 original place when it comes to us; we'll know this by checking 02362 log_pos ("artificial" events have log_pos == 0). 02363 */ 02364 if (!artificial_event && created && thd->transaction.all.nht) 02365 { 02366 /* This is not an error (XA is safe), just an information */ 02367 slave_print_msg(INFORMATION_LEVEL, rli, 0, 02368 "Rolling back unfinished transaction (no COMMIT " 02369 "or ROLLBACK in relay log). A probable cause is that " 02370 "the master died while writing the transaction to " 02371 "its binary log, thus rolled back too."); 02372 rli->cleanup_context(thd, 1); 02373 } 02374 #endif 02375 /* 02376 If this event comes from ourselves, there is no cleaning task to perform, 02377 we don't call Start_log_event_v3::exec_event() (this was just to update the 02378 log's description event). 02379 */ 02380 if (server_id == (uint32) ::server_id) 02381 { 02382 /* 02383 Do not modify rli->group_master_log_pos, as this event did not exist on 02384 the master. That is, just update the *relay log* coordinates; this is 02385 done by passing log_pos=0 to inc_group_relay_log_pos, like we do in 02386 Stop_log_event::exec_event(). 02387 If in a transaction, don't touch group_* coordinates. 02388 */ 02389 if (thd->options & OPTION_BEGIN) 02390 rli->inc_event_relay_log_pos(); 02391 else 02392 { 02393 rli->inc_group_relay_log_pos(0); 02394 flush_relay_log_info(rli); 02395 } 02396 DBUG_RETURN(0); 02397 } 02398 02399 /* 02400 If the event was not requested by the slave i.e. the master sent it while 02401 the slave asked for a position >4, the event will make 02402 rli->group_master_log_pos advance. Say that the slave asked for position 02403 1000, and the Format_desc event's end is 96. Then in the beginning of 02404 replication rli->group_master_log_pos will be 0, then 96, then jump to 02405 first really asked event (which is >96). So this is ok. 02406 */ 02407 DBUG_RETURN(Start_log_event_v3::exec_event(rli)); 02408 } 02409 #endif 02410 02411 /************************************************************************** 02412 Load_log_event methods 02413 General note about Load_log_event: the binlogging of LOAD DATA INFILE is 02414 going to be changed in 5.0 (or maybe in 5.1; not decided yet). 02415 However, the 5.0 slave could still have to read such events (from a 4.x 02416 master), convert them (which just means maybe expand the header, when 5.0 02417 servers have a UID in events) (remember that whatever is after the header 02418 will be like in 4.x, as this event's format is not modified in 5.0 as we 02419 will use new types of events to log the new LOAD DATA INFILE features). 02420 To be able to read/convert, we just need to not assume that the common 02421 header is of length LOG_EVENT_HEADER_LEN (we must use the description 02422 event). 02423 Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE 02424 between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the 02425 positions displayed in SHOW SLAVE STATUS then are fine too). 02426 **************************************************************************/ 02427 02428 /* 02429 Load_log_event::pack_info() 02430 */ 02431 02432 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 02433 uint Load_log_event::get_query_buffer_length() 02434 { 02435 return 02436 5 + db_len + 3 + // "use DB; " 02437 18 + fname_len + 2 + // "LOAD DATA INFILE 'file''" 02438 7 + // LOCAL 02439 9 + // " REPLACE or IGNORE " 02440 13 + table_name_len*2 + // "INTO TABLE `table`" 02441 21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'" 02442 23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'" 02443 12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'" 02444 21 + sql_ex.line_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'" 02445 19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'" 02446 15 + 22 + // " IGNORE xxx LINES" 02447 3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)" 02448 } 02449 02450 02451 void Load_log_event::print_query(bool need_db, char *buf, 02452 char **end, char **fn_start, char **fn_end) 02453 { 02454 char *pos= buf; 02455 02456 if (need_db && db && db_len) 02457 { 02458 pos= strmov(pos, "use `"); 02459 memcpy(pos, db, db_len); 02460 pos= strmov(pos+db_len, "`; "); 02461 } 02462 02463 pos= strmov(pos, "LOAD DATA "); 02464 02465 if (fn_start) 02466 *fn_start= pos; 02467 02468 if (check_fname_outside_temp_buf()) 02469 pos= strmov(pos, "LOCAL "); 02470 pos= strmov(pos, "INFILE '"); 02471 memcpy(pos, fname, fname_len); 02472 pos= strmov(pos+fname_len, "' "); 02473 02474 if (sql_ex.opt_flags & REPLACE_FLAG) 02475 pos= strmov(pos, " REPLACE "); 02476 else if (sql_ex.opt_flags & IGNORE_FLAG) 02477 pos= strmov(pos, " IGNORE "); 02478 02479 pos= strmov(pos ,"INTO"); 02480 02481 if (fn_end) 02482 *fn_end= pos; 02483 02484 pos= strmov(pos ," TABLE `"); 02485 memcpy(pos, table_name, table_name_len); 02486 pos+= table_name_len; 02487 02488 /* We have to create all optinal fields as the default is not empty */ 02489 pos= strmov(pos, "` FIELDS TERMINATED BY "); 02490 pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len); 02491 if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) 02492 pos= strmov(pos, " OPTIONALLY "); 02493 pos= strmov(pos, " ENCLOSED BY "); 02494 pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len); 02495 02496 pos= strmov(pos, " ESCAPED BY "); 02497 pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len); 02498 02499 pos= strmov(pos, " LINES TERMINATED BY "); 02500 pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len); 02501 if (sql_ex.line_start_len) 02502 { 02503 pos= strmov(pos, " STARTING BY "); 02504 pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len); 02505 } 02506 02507 if ((long) skip_lines > 0) 02508 { 02509 pos= strmov(pos, " IGNORE "); 02510 pos= longlong10_to_str((longlong) skip_lines, pos, 10); 02511 pos= strmov(pos," LINES "); 02512 } 02513 02514 if (num_fields) 02515 { 02516 uint i; 02517 const char *field= fields; 02518 pos= strmov(pos, " ("); 02519 for (i = 0; i < num_fields; i++) 02520 { 02521 if (i) 02522 { 02523 *pos++= ' '; 02524 *pos++= ','; 02525 } 02526 memcpy(pos, field, field_lens[i]); 02527 pos+= field_lens[i]; 02528 field+= field_lens[i] + 1; 02529 } 02530 *pos++= ')'; 02531 } 02532 02533 *end= pos; 02534 } 02535 02536 02537 void Load_log_event::pack_info(Protocol *protocol) 02538 { 02539 char *buf, *end; 02540 02541 if (!(buf= my_malloc(get_query_buffer_length(), MYF(MY_WME)))) 02542 return; 02543 print_query(TRUE, buf, &end, 0, 0); 02544 protocol->store(buf, end-buf, &my_charset_bin); 02545 my_free(buf, MYF(0)); 02546 } 02547 #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ 02548 02549 02550 #ifndef MYSQL_CLIENT 02551 02552 /* 02553 Load_log_event::write_data_header() 02554 */ 02555 02556 bool Load_log_event::write_data_header(IO_CACHE* file) 02557 { 02558 char buf[LOAD_HEADER_LEN]; 02559 int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id); 02560 int4store(buf + L_EXEC_TIME_OFFSET, exec_time); 02561 int4store(buf + L_SKIP_LINES_OFFSET, skip_lines); 02562 buf[L_TBL_LEN_OFFSET] = (char)table_name_len; 02563 buf[L_DB_LEN_OFFSET] = (char)db_len; 02564 int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); 02565 return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN) != 0; 02566 } 02567 02568 02569 /* 02570 Load_log_event::write_data_body() 02571 */ 02572 02573 bool Load_log_event::write_data_body(IO_CACHE* file) 02574 { 02575 if (sql_ex.write_data(file)) 02576 return 1; 02577 if (num_fields && fields && field_lens) 02578 { 02579 if (my_b_safe_write(file, (byte*)field_lens, num_fields) || 02580 my_b_safe_write(file, (byte*)fields, field_block_len)) 02581 return 1; 02582 } 02583 return (my_b_safe_write(file, (byte*)table_name, table_name_len + 1) || 02584 my_b_safe_write(file, (byte*)db, db_len + 1) || 02585 my_b_safe_write(file, (byte*)fname, fname_len)); 02586 } 02587 02588 02589 /* 02590 Load_log_event::Load_log_event() 02591 */ 02592 02593 Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex, 02594 const char *db_arg, const char *table_name_arg, 02595 List<Item> &fields_arg, 02596 enum enum_duplicates handle_dup, 02597 bool ignore, bool using_trans) 02598 :Log_event(thd_arg, !thd_arg->tmp_table_used ? 02599 0 : LOG_EVENT_THREAD_SPECIFIC_F, using_trans), 02600 thread_id(thd_arg->thread_id), 02601 slave_proxy_id(thd_arg->variables.pseudo_thread_id), 02602 num_fields(0),fields(0), 02603 field_lens(0),field_block_len(0), 02604 table_name(table_name_arg ? table_name_arg : ""), 02605 db(db_arg), fname(ex->file_name), local_fname(FALSE) 02606 { 02607 time_t end_time; 02608 time(&end_time); 02609 exec_time = (ulong) (end_time - thd_arg->start_time); 02610 /* db can never be a zero pointer in 4.0 */ 02611 db_len = (uint32) strlen(db); 02612 table_name_len = (uint32) strlen(table_name); 02613 fname_len = (fname) ? (uint) strlen(fname) : 0; 02614 sql_ex.field_term = (char*) ex->field_term->ptr(); 02615 sql_ex.field_term_len = (uint8) ex->field_term->length(); 02616 sql_ex.enclosed = (char*) ex->enclosed->ptr(); 02617 sql_ex.enclosed_len = (uint8) ex->enclosed->length(); 02618 sql_ex.line_term = (char*) ex->line_term->ptr(); 02619 sql_ex.line_term_len = (uint8) ex->line_term->length(); 02620 sql_ex.line_start = (char*) ex->line_start->ptr(); 02621 sql_ex.line_start_len = (uint8) ex->line_start->length(); 02622 sql_ex.escaped = (char*) ex->escaped->ptr(); 02623 sql_ex.escaped_len = (uint8) ex->escaped->length(); 02624 sql_ex.opt_flags = 0; 02625 sql_ex.cached_new_format = -1; 02626 02627 if (ex->dumpfile) 02628 sql_ex.opt_flags|= DUMPFILE_FLAG; 02629 if (ex->opt_enclosed) 02630 sql_ex.opt_flags|= OPT_ENCLOSED_FLAG; 02631 02632 sql_ex.empty_flags= 0; 02633 02634 switch (handle_dup) { 02635 case DUP_REPLACE: 02636 sql_ex.opt_flags|= REPLACE_FLAG; 02637 break; 02638 case DUP_UPDATE: // Impossible here 02639 case DUP_ERROR: 02640 break; 02641 } 02642 if (ignore) 02643 sql_ex.opt_flags|= IGNORE_FLAG; 02644 02645 if (!ex->field_term->length()) 02646 sql_ex.empty_flags |= FIELD_TERM_EMPTY; 02647 if (!ex->enclosed->length()) 02648 sql_ex.empty_flags |= ENCLOSED_EMPTY; 02649 if (!ex->line_term->length()) 02650 sql_ex.empty_flags |= LINE_TERM_EMPTY; 02651 if (!ex->line_start->length()) 02652 sql_ex.empty_flags |= LINE_START_EMPTY; 02653 if (!ex->escaped->length()) 02654 sql_ex.empty_flags |= ESCAPED_EMPTY; 02655 02656 skip_lines = ex->skip_lines; 02657 02658 List_iterator<Item> li(fields_arg); 02659 field_lens_buf.length(0); 02660 fields_buf.length(0); 02661 Item* item; 02662 while ((item = li++)) 02663 { 02664 num_fields++; 02665 uchar len = (uchar) strlen(item->name); 02666 field_block_len += len + 1; 02667 fields_buf.append(item->name, len + 1); 02668 field_lens_buf.append((char*)&len, 1); 02669 } 02670 02671 field_lens = (const uchar*)field_lens_buf.ptr(); 02672 fields = fields_buf.ptr(); 02673 } 02674 #endif /* !MYSQL_CLIENT */ 02675 02676 02677 /* 02678 Load_log_event::Load_log_event() 02679 02680 NOTE 02681 The caller must do buf[event_len] = 0 before he starts using the 02682 constructed event. 02683 */ 02684 02685 Load_log_event::Load_log_event(const char *buf, uint event_len, 02686 const Format_description_log_event *description_event) 02687 :Log_event(buf, description_event), num_fields(0), fields(0), 02688 field_lens(0),field_block_len(0), 02689 table_name(0), db(0), fname(0), local_fname(FALSE) 02690 { 02691 DBUG_ENTER("Load_log_event"); 02692 /* 02693 I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0, 02694 4.0->5.0 and 5.0->5.0 and it works. 02695 */ 02696 if (event_len) 02697 copy_log_event(buf, event_len, 02698 ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? 02699 LOAD_HEADER_LEN + 02700 description_event->common_header_len : 02701 LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN), 02702 description_event); 02703 /* otherwise it's a derived class, will call copy_log_event() itself */ 02704 DBUG_VOID_RETURN; 02705 } 02706 02707 02708 /* 02709 Load_log_event::copy_log_event() 02710 */ 02711 02712 int Load_log_event::copy_log_event(const char *buf, ulong event_len, 02713 int body_offset, 02714 const Format_description_log_event *description_event) 02715 { 02716 DBUG_ENTER("Load_log_event::copy_log_event"); 02717 uint data_len; 02718 char* buf_end = (char*)buf + event_len; 02719 /* this is the beginning of the post-header */ 02720 const char* data_head = buf + description_event->common_header_len; 02721 slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET); 02722 exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET); 02723 skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET); 02724 table_name_len = (uint)data_head[L_TBL_LEN_OFFSET]; 02725 db_len = (uint)data_head[L_DB_LEN_OFFSET]; 02726 num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET); 02727 02728 if ((int) event_len < body_offset) 02729 DBUG_RETURN(1); 02730 /* 02731 Sql_ex.init() on success returns the pointer to the first byte after 02732 the sql_ex structure, which is the start of field lengths array. 02733 */ 02734 if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset, 02735 buf_end, 02736 buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) 02737 DBUG_RETURN(1); 02738 02739 data_len = event_len - body_offset; 02740 if (num_fields > data_len) // simple sanity check against corruption 02741 DBUG_RETURN(1); 02742 for (uint i = 0; i < num_fields; i++) 02743 field_block_len += (uint)field_lens[i] + 1; 02744 02745 fields = (char*)field_lens + num_fields; 02746 table_name = fields + field_block_len; 02747 db = table_name + table_name_len + 1; 02748 fname = db + db_len + 1; 02749 fname_len = strlen(fname); 02750 // null termination is accomplished by the caller doing buf[event_len]=0 02751 02752 DBUG_RETURN(0); 02753 } 02754 02755 02756 /* 02757 Load_log_event::print() 02758 */ 02759 02760 #ifdef MYSQL_CLIENT 02761 void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 02762 { 02763 print(file, print_event_info, 0); 02764 } 02765 02766 02767 void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, 02768 bool commented) 02769 { 02770 DBUG_ENTER("Load_log_event::print"); 02771 if (!print_event_info->short_form) 02772 { 02773 print_header(file, print_event_info); 02774 fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n", 02775 thread_id, exec_time); 02776 } 02777 02778 bool different_db= 1; 02779 if (db) 02780 { 02781 /* 02782 If the database is different from the one of the previous statement, we 02783 need to print the "use" command, and we update the last_db. 02784 But if commented, the "use" is going to be commented so we should not 02785 update the last_db. 02786 */ 02787 if ((different_db= memcmp(print_event_info->db, db, db_len + 1)) && 02788 !commented) 02789 memcpy(print_event_info->db, db, db_len + 1); 02790 } 02791 02792 if (db && db[0] && different_db) 02793 fprintf(file, "%suse %s;\n", 02794 commented ? "# " : "", 02795 db); 02796 02797 if (flags & LOG_EVENT_THREAD_SPECIFIC_F) 02798 fprintf(file,"%sSET @@session.pseudo_thread_id=%lu;\n", 02799 commented ? "# " : "", (ulong)thread_id); 02800 fprintf(file, "%sLOAD DATA ", 02801 commented ? "# " : ""); 02802 if (check_fname_outside_temp_buf()) 02803 fprintf(file, "LOCAL "); 02804 fprintf(file, "INFILE '%-*s' ", fname_len, fname); 02805 02806 if (sql_ex.opt_flags & REPLACE_FLAG) 02807 fprintf(file," REPLACE "); 02808 else if (sql_ex.opt_flags & IGNORE_FLAG) 02809 fprintf(file," IGNORE "); 02810 02811 fprintf(file, "INTO TABLE `%s`", table_name); 02812 fprintf(file, " FIELDS TERMINATED BY "); 02813 pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); 02814 02815 if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) 02816 fprintf(file," OPTIONALLY "); 02817 fprintf(file, " ENCLOSED BY "); 02818 pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); 02819 02820 fprintf(file, " ESCAPED BY "); 02821 pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len); 02822 02823 fprintf(file," LINES TERMINATED BY "); 02824 pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len); 02825 02826 02827 if (sql_ex.line_start) 02828 { 02829 fprintf(file," STARTING BY "); 02830 pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); 02831 } 02832 if ((long) skip_lines > 0) 02833 fprintf(file, " IGNORE %ld LINES", (long) skip_lines); 02834 02835 if (num_fields) 02836 { 02837 uint i; 02838 const char* field = fields; 02839 fprintf(file, " ("); 02840 for (i = 0; i < num_fields; i++) 02841 { 02842 if (i) 02843 fputc(',', file); 02844 fprintf(file, field); 02845 02846 field += field_lens[i] + 1; 02847 } 02848 fputc(')', file); 02849 } 02850 02851 fprintf(file, ";\n"); 02852 DBUG_VOID_RETURN; 02853 } 02854 #endif /* MYSQL_CLIENT */ 02855 02856 02857 /* 02858 Load_log_event::set_fields() 02859 02860 Note that this function can not use the member variable 02861 for the database, since LOAD DATA INFILE on the slave 02862 can be for a different database than the current one. 02863 This is the reason for the affected_db argument to this method. 02864 */ 02865 02866 #ifndef MYSQL_CLIENT 02867 void Load_log_event::set_fields(const char* affected_db, 02868 List<Item> &field_list, 02869 Name_resolution_context *context) 02870 { 02871 uint i; 02872 const char* field = fields; 02873 for (i= 0; i < num_fields; i++) 02874 { 02875 field_list.push_back(new Item_field(context, 02876 affected_db, table_name, field)); 02877 field+= field_lens[i] + 1; 02878 } 02879 } 02880 #endif /* !MYSQL_CLIENT */ 02881 02882 02883 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 02884 /* 02885 Does the data loading job when executing a LOAD DATA on the slave 02886 02887 SYNOPSIS 02888 Load_log_event::exec_event 02889 net 02890 rli 02891 use_rli_only_for_errors - if set to 1, rli is provided to 02892 Load_log_event::exec_event only for this 02893 function to have RPL_LOG_NAME and 02894 rli->last_slave_error, both being used by 02895 error reports. rli's position advancing 02896 is skipped (done by the caller which is 02897 Execute_load_log_event::exec_event). 02898 - if set to 0, rli is provided for full use, 02899 i.e. for error reports and position 02900 advancing. 02901 02902 DESCRIPTION 02903 Does the data loading job when executing a LOAD DATA on the slave 02904 02905 RETURN VALUE 02906 0 Success 02907 1 Failure 02908 */ 02909 02910 int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, 02911 bool use_rli_only_for_errors) 02912 { 02913 LEX_STRING new_db; 02914 new_db.length= db_len; 02915 new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length); 02916 thd->set_db(new_db.str, new_db.length); 02917 DBUG_ASSERT(thd->query == 0); 02918 thd->query_length= 0; // Should not be needed 02919 thd->query_error= 0; 02920 clear_all_errors(thd, rli); 02921 02922 /* see Query_log_event::exec_event() and BUG#13360 */ 02923 DBUG_ASSERT(!rli->m_table_map.count()); 02924 /* 02925 Usually mysql_init_query() is called by mysql_parse(), but we need it here 02926 as the present method does not call mysql_parse(). 02927 */ 02928 mysql_init_query(thd, 0, 0); 02929 if (!use_rli_only_for_errors) 02930 { 02931 /* Saved for InnoDB, see comment in Query_log_event::exec_event() */ 02932 rli->future_group_master_log_pos= log_pos; 02933 DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); 02934 } 02935 02936 /* 02937 We test replicate_*_db rules. Note that we have already prepared the file 02938 to load, even if we are going to ignore and delete it now. So it is 02939 possible that we did a lot of disk writes for nothing. In other words, a 02940 big LOAD DATA INFILE on the master will still consume a lot of space on 02941 the slave (space in the relay log + space of temp files: twice the space 02942 of the file to load...) even if it will finally be ignored. 02943 TODO: fix this; this can be done by testing rules in 02944 Create_file_log_event::exec_event() and then discarding Append_block and 02945 al. Another way is do the filtering in the I/O thread (more efficient: no 02946 disk writes at all). 02947 02948 02949 Note: We do not need to execute reset_one_shot_variables() if this 02950 db_ok() test fails. 02951 Reason: The db stored in binlog events is the same for SET and for 02952 its companion query. If the SET is ignored because of 02953 db_ok(), the companion query will also be ignored, and if 02954 the companion query is ignored in the db_ok() test of 02955 ::exec_event(), then the companion SET also have so we 02956 don't need to reset_one_shot_variables(). 02957 */ 02958 if (rpl_filter->db_ok(thd->db)) 02959 { 02960 thd->set_time((time_t)when); 02961 VOID(pthread_mutex_lock(&LOCK_thread_count)); 02962 thd->query_id = next_query_id(); 02963 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 02964 /* 02965 Initing thd->row_count is not necessary in theory as this variable has no 02966 influence in the case of the slave SQL thread (it is used to generate a 02967 "data truncated" warning but which is absorbed and never gets to the 02968 error log); still we init it to avoid a Valgrind message. 02969 */ 02970 mysql_reset_errors(thd, 0); 02971 02972 TABLE_LIST tables; 02973 bzero((char*) &tables,sizeof(tables)); 02974 tables.db= thd->strmake(thd->db, thd->db_length); 02975 tables.alias = tables.table_name = (char*) table_name; 02976 tables.lock_type = TL_WRITE; 02977 tables.updating= 1; 02978 02979 // the table will be opened in mysql_load 02980 if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables)) 02981 { 02982 // TODO: this is a bug - this needs to be moved to the I/O thread 02983 if (net) 02984 skip_load_data_infile(net); 02985 } 02986 else 02987 { 02988 char llbuff[22]; 02989 char *end; 02990 enum enum_duplicates handle_dup; 02991 bool ignore= 0; 02992 char *load_data_query; 02993 02994 /* 02995 Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST 02996 and written to slave's binlog if binlogging is on. 02997 */ 02998 if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1))) 02999 { 03000 /* 03001 This will set thd->fatal_error in case of OOM. So we surely will notice 03002 that something is wrong. 03003 */ 03004 goto error; 03005 } 03006 03007 print_query(FALSE, load_data_query, &end, (char **)&thd->lex->fname_start, 03008 (char **)&thd->lex->fname_end); 03009 *end= 0; 03010 thd->query_length= end - load_data_query; 03011 thd->query= load_data_query; 03012 03013 if (sql_ex.opt_flags & REPLACE_FLAG) 03014 { 03015 handle_dup= DUP_REPLACE; 03016 } 03017 else if (sql_ex.opt_flags & IGNORE_FLAG) 03018 { 03019 ignore= 1; 03020 handle_dup= DUP_ERROR; 03021 } 03022 else 03023 { 03024 /* 03025 When replication is running fine, if it was DUP_ERROR on the 03026 master then we could choose IGNORE here, because if DUP_ERROR 03027 suceeded on master, and data is identical on the master and slave, 03028 then there should be no uniqueness errors on slave, so IGNORE is 03029 the same as DUP_ERROR. But in the unlikely case of uniqueness errors 03030 (because the data on the master and slave happen to be different 03031 (user error or bug), we want LOAD DATA to print an error message on 03032 the slave to discover the problem. 03033 03034 If reading from net (a 3.23 master), mysql_load() will change this 03035 to IGNORE. 03036 */ 03037 handle_dup= DUP_ERROR; 03038 } 03039 /* 03040 We need to set thd->lex->sql_command and thd->lex->duplicates 03041 since InnoDB tests these variables to decide if this is a LOAD 03042 DATA ... REPLACE INTO ... statement even though mysql_parse() 03043 is not called. This is not needed in 5.0 since there the LOAD 03044 DATA ... statement is replicated using mysql_parse(), which 03045 sets the thd->lex fields correctly. 03046 */ 03047 thd->lex->sql_command= SQLCOM_LOAD; 03048 thd->lex->duplicates= handle_dup; 03049 03050 sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG); 03051 String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs); 03052 String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs); 03053 String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs); 03054 String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs); 03055 String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs); 03056 ex.field_term= &field_term; 03057 ex.enclosed= &enclosed; 03058 ex.line_term= &line_term; 03059 ex.line_start= &line_start; 03060 ex.escaped= &escaped; 03061 03062 ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); 03063 if (sql_ex.empty_flags & FIELD_TERM_EMPTY) 03064 ex.field_term->length(0); 03065 03066 ex.skip_lines = skip_lines; 03067 List<Item> field_list; 03068 thd->main_lex.select_lex.context.resolve_in_table_list_only(&tables); 03069 set_fields(tables.db, field_list, &thd->main_lex.select_lex.context); 03070 thd->variables.pseudo_thread_id= thread_id; 03071 List<Item> set_fields; 03072 if (net) 03073 { 03074 // mysql_load will use thd->net to read the file 03075 thd->net.vio = net->vio; 03076 /* 03077 Make sure the client does not get confused about the packet sequence 03078 */ 03079 thd->net.pkt_nr = net->pkt_nr; 03080 } 03081 /* 03082 It is safe to use set_fields twice because we are not going to 03083 update it inside mysql_load(). 03084 */ 03085 if (mysql_load(thd, &ex, &tables, field_list, set_fields, set_fields, 03086 handle_dup, ignore, net != 0)) 03087 thd->query_error= 1; 03088 if (thd->cuted_fields) 03089 { 03090 /* log_pos is the position of the LOAD event in the master log */ 03091 sql_print_warning("Slave: load data infile on table '%s' at " 03092 "log position %s in log '%s' produced %ld " 03093 "warning(s). Default database: '%s'", 03094 (char*) table_name, 03095 llstr(log_pos,llbuff), RPL_LOG_NAME, 03096 (ulong) thd->cuted_fields, 03097 print_slave_db_safe(thd->db)); 03098 } 03099 if (net) 03100 net->pkt_nr= thd->net.pkt_nr; 03101 } 03102 } 03103 else 03104 { 03105 /* 03106 We will just ask the master to send us /dev/null if we do not 03107 want to load the data. 03108 TODO: this a bug - needs to be done in I/O thread 03109 */ 03110 if (net) 03111 skip_load_data_infile(net); 03112 } 03113 03114 error: 03115 thd->net.vio = 0; 03116 const char *remember_db= thd->db; 03117 VOID(pthread_mutex_lock(&LOCK_thread_count)); 03118 thd->catalog= 0; 03119 thd->set_db(NULL, 0); /* will free the current database */ 03120 thd->query= 0; 03121 thd->query_length= 0; 03122 VOID(pthread_mutex_unlock(&LOCK_thread_count)); 03123 close_thread_tables(thd); 03124 if (thd->query_error) 03125 { 03126 /* this err/sql_errno code is copy-paste from net_send_error() */ 03127 const char *err; 03128 int sql_errno; 03129 if ((err=thd->net.last_error)[0]) 03130 sql_errno=thd->net.last_errno; 03131 else 03132 { 03133 sql_errno=ER_UNKNOWN_ERROR; 03134 err=ER(sql_errno); 03135 } 03136 slave_print_msg(ERROR_LEVEL, rli, sql_errno,"\ 03137 Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", 03138 err, (char*)table_name, print_slave_db_safe(remember_db)); 03139 free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); 03140 return 1; 03141 } 03142 free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); 03143 03144 if (thd->is_fatal_error) 03145 { 03146 slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR, "\ 03147 Fatal error running LOAD DATA INFILE on table '%s'. Default database: '%s'", 03148 (char*)table_name, print_slave_db_safe(remember_db)); 03149 return 1; 03150 } 03151 03152 return ( use_rli_only_for_errors ? 0 : Log_event::exec_event(rli) ); 03153 } 03154 #endif 03155 03156 03157 /************************************************************************** 03158 Rotate_log_event methods 03159 **************************************************************************/ 03160 03161 /* 03162 Rotate_log_event::pack_info() 03163 */ 03164 03165 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03166 void Rotate_log_event::pack_info(Protocol *protocol) 03167 { 03168 char buf1[256], buf[22]; 03169 String tmp(buf1, sizeof(buf1), log_cs); 03170 tmp.length(0); 03171 tmp.append(new_log_ident, ident_len); 03172 tmp.append(STRING_WITH_LEN(";pos=")); 03173 tmp.append(llstr(pos,buf)); 03174 protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin); 03175 } 03176 #endif 03177 03178 03179 /* 03180 Rotate_log_event::print() 03181 */ 03182 03183 #ifdef MYSQL_CLIENT 03184 void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 03185 { 03186 char buf[22]; 03187 03188 if (print_event_info->short_form) 03189 return; 03190 print_header(file, print_event_info); 03191 fprintf(file, "\tRotate to "); 03192 if (new_log_ident) 03193 my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, 03194 MYF(MY_NABP | MY_WME)); 03195 fprintf(file, " pos: %s", llstr(pos, buf)); 03196 fputc('\n', file); 03197 fflush(file); 03198 } 03199 #endif /* MYSQL_CLIENT */ 03200 03201 03202 03203 /* 03204 Rotate_log_event::Rotate_log_event() (2 constructors) 03205 */ 03206 03207 03208 #ifndef MYSQL_CLIENT 03209 Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg, 03210 uint ident_len_arg, ulonglong pos_arg, 03211 uint flags_arg) 03212 :Log_event(), new_log_ident(new_log_ident_arg), 03213 pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg : 03214 (uint) strlen(new_log_ident_arg)), flags(flags_arg) 03215 { 03216 #ifndef DBUG_OFF 03217 char buff[22]; 03218 DBUG_ENTER("Rotate_log_event::Rotate_log_event(...,flags)"); 03219 DBUG_PRINT("enter",("new_log_ident %s pos %s flags %lu", new_log_ident_arg, 03220 llstr(pos_arg, buff), flags)); 03221 #endif 03222 if (flags & DUP_NAME) 03223 new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME)); 03224 DBUG_VOID_RETURN; 03225 } 03226 #endif 03227 03228 03229 Rotate_log_event::Rotate_log_event(const char* buf, uint event_len, 03230 const Format_description_log_event* description_event) 03231 :Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME) 03232 { 03233 DBUG_ENTER("Rotate_log_event::Rotate_log_event(char*,...)"); 03234 // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET 03235 uint8 header_size= description_event->common_header_len; 03236 uint8 post_header_len= description_event->post_header_len[ROTATE_EVENT-1]; 03237 uint ident_offset; 03238 if (event_len < header_size) 03239 DBUG_VOID_RETURN; 03240 buf += header_size; 03241 pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4; 03242 ident_len = (uint)(event_len - 03243 (header_size+post_header_len)); 03244 ident_offset = post_header_len; 03245 set_if_smaller(ident_len,FN_REFLEN-1); 03246 new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME)); 03247 DBUG_VOID_RETURN; 03248 } 03249 03250 03251 /* 03252 Rotate_log_event::write() 03253 */ 03254 03255 #ifndef MYSQL_CLIENT 03256 bool Rotate_log_event::write(IO_CACHE* file) 03257 { 03258 char buf[ROTATE_HEADER_LEN]; 03259 int8store(buf + R_POS_OFFSET, pos); 03260 return (write_header(file, ROTATE_HEADER_LEN + ident_len) || 03261 my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || 03262 my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len)); 03263 } 03264 #endif 03265 03266 /* 03267 Rotate_log_event::exec_event() 03268 03269 Got a rotate log event from the master 03270 03271 IMPLEMENTATION 03272 This is mainly used so that we can later figure out the logname and 03273 position for the master. 03274 03275 We can't rotate the slave's BINlog as this will cause infinitive rotations 03276 in a A -> B -> A setup. 03277 The NOTES below is a wrong comment which will disappear when 4.1 is merged. 03278 03279 RETURN VALUES 03280 0 ok 03281 */ 03282 03283 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03284 int Rotate_log_event::exec_event(struct st_relay_log_info* rli) 03285 { 03286 DBUG_ENTER("Rotate_log_event::exec_event"); 03287 03288 pthread_mutex_lock(&rli->data_lock); 03289 rli->event_relay_log_pos= my_b_tell(rli->cur_log); 03290 /* 03291 If we are in a transaction: the only normal case is when the I/O thread was 03292 copying a big transaction, then it was stopped and restarted: we have this 03293 in the relay log: 03294 BEGIN 03295 ... 03296 ROTATE (a fake one) 03297 ... 03298 COMMIT or ROLLBACK 03299 In that case, we don't want to touch the coordinates which correspond to 03300 the beginning of the transaction. 03301 Starting from 5.0.0, there also are some rotates from the slave itself, in 03302 the relay log. 03303 */ 03304 if (!(thd->options & OPTION_BEGIN)) 03305 { 03306 memcpy(rli->group_master_log_name, new_log_ident, ident_len+1); 03307 rli->notify_group_master_log_name_update(); 03308 rli->group_master_log_pos= pos; 03309 rli->group_relay_log_pos= rli->event_relay_log_pos; 03310 DBUG_PRINT("info", ("group_master_log_name: '%s' " 03311 "group_master_log_pos: %lu", 03312 rli->group_master_log_name, 03313 (ulong) rli->group_master_log_pos)); 03314 /* 03315 Reset thd->options and sql_mode etc, because this could be the signal of 03316 a master's downgrade from 5.0 to 4.0. 03317 However, no need to reset description_event_for_exec: indeed, if the next 03318 master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next 03319 master is 4.0 then the events are in the slave's format (conversion). 03320 */ 03321 set_slave_thread_options(thd); 03322 set_slave_thread_default_charset(thd, rli); 03323 thd->variables.sql_mode= global_system_variables.sql_mode; 03324 thd->variables.auto_increment_increment= 03325 thd->variables.auto_increment_offset= 1; 03326 } 03327 pthread_mutex_unlock(&rli->data_lock); 03328 pthread_cond_broadcast(&rli->data_cond); 03329 flush_relay_log_info(rli); 03330 DBUG_RETURN(0); 03331 } 03332 #endif 03333 03334 03335 /************************************************************************** 03336 Intvar_log_event methods 03337 **************************************************************************/ 03338 03339 /* 03340 Intvar_log_event::pack_info() 03341 */ 03342 03343 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03344 void Intvar_log_event::pack_info(Protocol *protocol) 03345 { 03346 char buf[256], *pos; 03347 pos= strmake(buf, get_var_type_name(), sizeof(buf)-23); 03348 *pos++= '='; 03349 pos= longlong10_to_str(val, pos, -10); 03350 protocol->store(buf, (uint) (pos-buf), &my_charset_bin); 03351 } 03352 #endif 03353 03354 03355 /* 03356 Intvar_log_event::Intvar_log_event() 03357 */ 03358 03359 Intvar_log_event::Intvar_log_event(const char* buf, 03360 const Format_description_log_event* description_event) 03361 :Log_event(buf, description_event) 03362 { 03363 buf+= description_event->common_header_len; 03364 type= buf[I_TYPE_OFFSET]; 03365 val= uint8korr(buf+I_VAL_OFFSET); 03366 } 03367 03368 03369 /* 03370 Intvar_log_event::get_var_type_name() 03371 */ 03372 03373 const char* Intvar_log_event::get_var_type_name() 03374 { 03375 switch(type) { 03376 case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID"; 03377 case INSERT_ID_EVENT: return "INSERT_ID"; 03378 default: /* impossible */ return "UNKNOWN"; 03379 } 03380 } 03381 03382 03383 /* 03384 Intvar_log_event::write() 03385 */ 03386 03387 #ifndef MYSQL_CLIENT 03388 bool Intvar_log_event::write(IO_CACHE* file) 03389 { 03390 byte buf[9]; 03391 buf[I_TYPE_OFFSET]= (byte) type; 03392 int8store(buf + I_VAL_OFFSET, val); 03393 return (write_header(file, sizeof(buf)) || 03394 my_b_safe_write(file, buf, sizeof(buf))); 03395 } 03396 #endif 03397 03398 03399 /* 03400 Intvar_log_event::print() 03401 */ 03402 03403 #ifdef MYSQL_CLIENT 03404 void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 03405 { 03406 char llbuff[22]; 03407 const char *msg; 03408 LINT_INIT(msg); 03409 03410 if (!print_event_info->short_form) 03411 { 03412 print_header(file, print_event_info); 03413 fprintf(file, "\tIntvar\n"); 03414 } 03415 03416 fprintf(file, "SET "); 03417 switch (type) { 03418 case LAST_INSERT_ID_EVENT: 03419 msg="LAST_INSERT_ID"; 03420 break; 03421 case INSERT_ID_EVENT: 03422 msg="INSERT_ID"; 03423 break; 03424 case INVALID_INT_EVENT: 03425 default: // cannot happen 03426 msg="INVALID_INT"; 03427 break; 03428 } 03429 fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); 03430 fflush(file); 03431 } 03432 #endif 03433 03434 03435 /* 03436 Intvar_log_event::exec_event() 03437 */ 03438 03439 #if defined(HAVE_REPLICATION)&& !defined(MYSQL_CLIENT) 03440 int Intvar_log_event::exec_event(struct st_relay_log_info* rli) 03441 { 03442 switch (type) { 03443 case LAST_INSERT_ID_EVENT: 03444 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1; 03445 thd->first_successful_insert_id_in_prev_stmt= val; 03446 break; 03447 case INSERT_ID_EVENT: 03448 thd->force_one_auto_inc_interval(val); 03449 break; 03450 } 03451 rli->inc_event_relay_log_pos(); 03452 return 0; 03453 } 03454 #endif 03455 03456 03457 /************************************************************************** 03458 Rand_log_event methods 03459 **************************************************************************/ 03460 03461 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03462 void Rand_log_event::pack_info(Protocol *protocol) 03463 { 03464 char buf1[256], *pos; 03465 pos= strmov(buf1,"rand_seed1="); 03466 pos= int10_to_str((long) seed1, pos, 10); 03467 pos= strmov(pos, ",rand_seed2="); 03468 pos= int10_to_str((long) seed2, pos, 10); 03469 protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin); 03470 } 03471 #endif 03472 03473 03474 Rand_log_event::Rand_log_event(const char* buf, 03475 const Format_description_log_event* description_event) 03476 :Log_event(buf, description_event) 03477 { 03478 buf+= description_event->common_header_len; 03479 seed1= uint8korr(buf+RAND_SEED1_OFFSET); 03480 seed2= uint8korr(buf+RAND_SEED2_OFFSET); 03481 } 03482 03483 03484 #ifndef MYSQL_CLIENT 03485 bool Rand_log_event::write(IO_CACHE* file) 03486 { 03487 byte buf[16]; 03488 int8store(buf + RAND_SEED1_OFFSET, seed1); 03489 int8store(buf + RAND_SEED2_OFFSET, seed2); 03490 return (write_header(file, sizeof(buf)) || 03491 my_b_safe_write(file, buf, sizeof(buf))); 03492 } 03493 #endif 03494 03495 03496 #ifdef MYSQL_CLIENT 03497 void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 03498 { 03499 char llbuff[22],llbuff2[22]; 03500 if (!print_event_info->short_form) 03501 { 03502 print_header(file, print_event_info); 03503 fprintf(file, "\tRand\n"); 03504 } 03505 fprintf(file, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", 03506 llstr(seed1, llbuff),llstr(seed2, llbuff2)); 03507 fflush(file); 03508 } 03509 #endif /* MYSQL_CLIENT */ 03510 03511 03512 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03513 int Rand_log_event::exec_event(struct st_relay_log_info* rli) 03514 { 03515 thd->rand.seed1= (ulong) seed1; 03516 thd->rand.seed2= (ulong) seed2; 03517 rli->inc_event_relay_log_pos(); 03518 return 0; 03519 } 03520 #endif /* !MYSQL_CLIENT */ 03521 03522 03523 /************************************************************************** 03524 Xid_log_event methods 03525 **************************************************************************/ 03526 03527 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03528 void Xid_log_event::pack_info(Protocol *protocol) 03529 { 03530 char buf[128], *pos; 03531 pos= strmov(buf, "COMMIT /* xid="); 03532 pos= longlong10_to_str(xid, pos, 10); 03533 pos= strmov(pos, " */"); 03534 protocol->store(buf, (uint) (pos-buf), &my_charset_bin); 03535 } 03536 #endif 03537 03538 /* 03539 NOTE it's ok not to use int8store here, 03540 as long as xid_t::set(ulonglong) and 03541 xid_t::get_my_xid doesn't do it either 03542 03543 we don't care about actual values of xids as long as 03544 identical numbers compare identically 03545 */ 03546 03547 Xid_log_event:: 03548 Xid_log_event(const char* buf, 03549 const Format_description_log_event *description_event) 03550 :Log_event(buf, description_event) 03551 { 03552 buf+= description_event->common_header_len; 03553 memcpy((char*) &xid, buf, sizeof(xid)); 03554 } 03555 03556 03557 #ifndef MYSQL_CLIENT 03558 bool Xid_log_event::write(IO_CACHE* file) 03559 { 03560 return write_header(file, sizeof(xid)) || 03561 my_b_safe_write(file, (byte*) &xid, sizeof(xid)); 03562 } 03563 #endif 03564 03565 03566 #ifdef MYSQL_CLIENT 03567 void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 03568 { 03569 if (!print_event_info->short_form) 03570 { 03571 char buf[64]; 03572 longlong10_to_str(xid, buf, 10); 03573 03574 print_header(file, print_event_info); 03575 fprintf(file, "\tXid = %s\n", buf); 03576 fflush(file); 03577 } 03578 fprintf(file, "COMMIT;\n"); 03579 } 03580 #endif /* MYSQL_CLIENT */ 03581 03582 03583 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03584 int Xid_log_event::exec_event(struct st_relay_log_info* rli) 03585 { 03586 /* For a slave Xid_log_event is COMMIT */ 03587 general_log_print(thd, COM_QUERY, 03588 "COMMIT /* implicit, from Xid_log_event */"); 03589 return end_trans(thd, COMMIT) || Log_event::exec_event(rli); 03590 } 03591 #endif /* !MYSQL_CLIENT */ 03592 03593 03594 /************************************************************************** 03595 User_var_log_event methods 03596 **************************************************************************/ 03597 03598 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03599 void User_var_log_event::pack_info(Protocol* protocol) 03600 { 03601 char *buf= 0; 03602 uint val_offset= 4 + name_len; 03603 uint event_len= val_offset; 03604 03605 if (is_null) 03606 { 03607 buf= my_malloc(val_offset + 5, MYF(MY_WME)); 03608 strmov(buf + val_offset, "NULL"); 03609 event_len= val_offset + 4; 03610 } 03611 else 03612 { 03613 switch (type) { 03614 case REAL_RESULT: 03615 double real_val; 03616 float8get(real_val, val); 03617 buf= my_malloc(val_offset + FLOATING_POINT_BUFFER, MYF(MY_WME)); 03618 event_len+= my_sprintf(buf + val_offset, 03619 (buf + val_offset, "%.14g", real_val)); 03620 break; 03621 case INT_RESULT: 03622 buf= my_malloc(val_offset + 22, MYF(MY_WME)); 03623 event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf; 03624 break; 03625 case DECIMAL_RESULT: 03626 { 03627 buf= my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH, MYF(MY_WME)); 03628 String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin); 03629 my_decimal dec; 03630 binary2my_decimal(E_DEC_FATAL_ERROR, val+2, &dec, val[0], val[1]); 03631 my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str); 03632 event_len= str.length() + val_offset; 03633 break; 03634 } 03635 case STRING_RESULT: 03636 /* 15 is for 'COLLATE' and other chars */ 03637 buf= my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15, MYF(MY_WME)); 03638 CHARSET_INFO *cs; 03639 if (!(cs= get_charset(charset_number, MYF(0)))) 03640 { 03641 strmov(buf+val_offset, "???"); 03642 event_len+= 3; 03643 } 03644 else 03645 { 03646 char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NullS); 03647 p= str_to_hex(p, val, val_len); 03648 p= strxmov(p, " COLLATE ", cs->name, NullS); 03649 event_len= p-buf; 03650 } 03651 break; 03652 case ROW_RESULT: 03653 default: 03654 DBUG_ASSERT(1); 03655 return; 03656 } 03657 } 03658 buf[0]= '@'; 03659 buf[1]= '`'; 03660 buf[2+name_len]= '`'; 03661 buf[3+name_len]= '='; 03662 memcpy(buf+2, name, name_len); 03663 protocol->store(buf, event_len, &my_charset_bin); 03664 my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); 03665 } 03666 #endif /* !MYSQL_CLIENT */ 03667 03668 03669 User_var_log_event:: 03670 User_var_log_event(const char* buf, 03671 const Format_description_log_event* description_event) 03672 :Log_event(buf, description_event) 03673 { 03674 buf+= description_event->common_header_len; 03675 name_len= uint4korr(buf); 03676 name= (char *) buf + UV_NAME_LEN_SIZE; 03677 buf+= UV_NAME_LEN_SIZE + name_len; 03678 is_null= (bool) *buf; 03679 if (is_null) 03680 { 03681 type= STRING_RESULT; 03682 charset_number= my_charset_bin.number; 03683 val_len= 0; 03684 val= 0; 03685 } 03686 else 03687 { 03688 type= (Item_result) buf[UV_VAL_IS_NULL]; 03689 charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE); 03690 val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + 03691 UV_CHARSET_NUMBER_SIZE); 03692 val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + 03693 UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE); 03694 } 03695 } 03696 03697 03698 #ifndef MYSQL_CLIENT 03699 bool User_var_log_event::write(IO_CACHE* file) 03700 { 03701 char buf[UV_NAME_LEN_SIZE]; 03702 char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + 03703 UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE]; 03704 char buf2[max(8, DECIMAL_MAX_FIELD_SIZE + 2)], *pos= buf2; 03705 uint buf1_length; 03706 ulong event_length; 03707 03708 int4store(buf, name_len); 03709 03710 if ((buf1[0]= is_null)) 03711 { 03712 buf1_length= 1; 03713 val_len= 0; 03714 } 03715 else 03716 { 03717 buf1[1]= type; 03718 int4store(buf1 + 2, charset_number); 03719 03720 switch (type) { 03721 case REAL_RESULT: 03722 float8store(buf2, *(double*) val); 03723 break; 03724 case INT_RESULT: 03725 int8store(buf2, *(longlong*) val); 03726 break; 03727 case DECIMAL_RESULT: 03728 { 03729 my_decimal *dec= (my_decimal *)val; 03730 dec->fix_buffer_pointer(); 03731 buf2[0]= (char)(dec->intg + dec->frac); 03732 buf2[1]= (char)dec->frac; 03733 decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]); 03734 val_len= decimal_bin_size(buf2[0], buf2[1]) + 2; 03735 break; 03736 } 03737 case STRING_RESULT: 03738 pos= val; 03739 break; 03740 case ROW_RESULT: 03741 default: 03742 DBUG_ASSERT(1); 03743 return 0; 03744 } 03745 int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len); 03746 buf1_length= 10; 03747 } 03748 03749 /* Length of the whole event */ 03750 event_length= sizeof(buf)+ name_len + buf1_length + val_len; 03751 03752 return (write_header(file, event_length) || 03753 my_b_safe_write(file, (byte*) buf, sizeof(buf)) || 03754 my_b_safe_write(file, (byte*) name, name_len) || 03755 my_b_safe_write(file, (byte*) buf1, buf1_length) || 03756 my_b_safe_write(file, (byte*) pos, val_len)); 03757 } 03758 #endif 03759 03760 03761 /* 03762 User_var_log_event::print() 03763 */ 03764 03765 #ifdef MYSQL_CLIENT 03766 void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 03767 { 03768 if (!print_event_info->short_form) 03769 { 03770 print_header(file, print_event_info); 03771 fprintf(file, "\tUser_var\n"); 03772 } 03773 03774 fprintf(file, "SET @`"); 03775 my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME)); 03776 fprintf(file, "`"); 03777 03778 if (is_null) 03779 { 03780 fprintf(file, ":=NULL;\n"); 03781 } 03782 else 03783 { 03784 switch (type) { 03785 case REAL_RESULT: 03786 double real_val; 03787 float8get(real_val, val); 03788 fprintf(file, ":=%.14g;\n", real_val); 03789 break; 03790 case INT_RESULT: 03791 char int_buf[22]; 03792 longlong10_to_str(uint8korr(val), int_buf, -10); 03793 fprintf(file, ":=%s;\n", int_buf); 03794 break; 03795 case DECIMAL_RESULT: 03796 { 03797 char str_buf[200]; 03798 int str_len= sizeof(str_buf) - 1; 03799 int precision= (int)val[0]; 03800 int scale= (int)val[1]; 03801 decimal_digit_t dec_buf[10]; 03802 decimal_t dec; 03803 dec.len= 10; 03804 dec.buf= dec_buf; 03805 03806 bin2decimal(val+2, &dec, precision, scale); 03807 decimal2string(&dec, str_buf, &str_len, 0, 0, 0); 03808 str_buf[str_len]= 0; 03809 fprintf(file, ":=%s;\n",str_buf); 03810 break; 03811 } 03812 case STRING_RESULT: 03813 { 03814 /* 03815 Let's express the string in hex. That's the most robust way. If we 03816 print it in character form instead, we need to escape it with 03817 character_set_client which we don't know (we will know it in 5.0, but 03818 in 4.1 we don't know it easily when we are printing 03819 User_var_log_event). Explanation why we would need to bother with 03820 character_set_client (quoting Bar): 03821 > Note, the parser doesn't switch to another unescaping mode after 03822 > it has met a character set introducer. 03823 > For example, if an SJIS client says something like: 03824 > SET @a= _ucs2 \0a\0b' 03825 > the string constant is still unescaped according to SJIS, not 03826 > according to UCS2. 03827 */ 03828 char *hex_str; 03829 CHARSET_INFO *cs; 03830 03831 if (!(hex_str= (char *)my_alloca(2*val_len+1+2))) // 2 hex digits / byte 03832 break; // no error, as we are 'void' 03833 str_to_hex(hex_str, val, val_len); 03834 /* 03835 For proper behaviour when mysqlbinlog|mysql, we need to explicitely 03836 specify the variable's collation. It will however cause problems when 03837 people want to mysqlbinlog|mysql into another server not supporting the 03838 character set. But there's not much to do about this and it's unlikely. 03839 */ 03840 if (!(cs= get_charset(charset_number, MYF(0)))) 03841 /* 03842 Generate an unusable command (=> syntax error) is probably the best 03843 thing we can do here. 03844 */ 03845 fprintf(file, ":=???;\n"); 03846 else 03847 fprintf(file, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); 03848 my_afree(hex_str); 03849 } 03850 break; 03851 case ROW_RESULT: 03852 default: 03853 DBUG_ASSERT(1); 03854 return; 03855 } 03856 } 03857 fflush(file); 03858 } 03859 #endif 03860 03861 03862 /* 03863 User_var_log_event::exec_event() 03864 */ 03865 03866 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 03867 int User_var_log_event::exec_event(struct st_relay_log_info* rli) 03868 { 03869 Item *it= 0; 03870 CHARSET_INFO *charset; 03871 if (!(charset= get_charset(charset_number, MYF(MY_WME)))) 03872 return 1; 03873 LEX_STRING user_var_name; 03874 user_var_name.str= name; 03875 user_var_name.length= name_len; 03876 double real_val; 03877 longlong int_val; 03878 03879 if (is_null) 03880 { 03881 it= new Item_null(); 03882 } 03883 else 03884 { 03885 switch (type) { 03886 case REAL_RESULT: 03887 float8get(real_val, val); 03888 it= new Item_float(real_val); 03889 val= (char*) &real_val; // Pointer to value in native format 03890 val_len= 8; 03891 break; 03892 case INT_RESULT: 03893 int_val= (longlong) uint8korr(val); 03894 it= new Item_int(int_val); 03895 val= (char*) &int_val; // Pointer to value in native format 03896 val_len= 8; 03897 break; 03898 case DECIMAL_RESULT: 03899 { 03900 Item_decimal *dec= new Item_decimal(val+2, val[0], val[1]); 03901 it= dec; 03902 val= (char *)dec->val_decimal(NULL); 03903 val_len= sizeof(my_decimal); 03904 break; 03905 } 03906 case STRING_RESULT: 03907 it= new Item_string(val, val_len, charset); 03908 break; 03909 case ROW_RESULT: 03910 default: 03911 DBUG_ASSERT(1); 03912 return 0; 03913 } 03914 } 03915 Item_func_set_user_var e(user_var_name, it); 03916 /* 03917 Item_func_set_user_var can't substitute something else on its place => 03918 0 can be passed as last argument (reference on item) 03919 */ 03920 e.fix_fields(thd, 0); 03921 /* 03922 A variable can just be considered as a table with 03923 a single record and with a single column. Thus, like 03924 a column value, it could always have IMPLICIT derivation. 03925 */ 03926 e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT); 03927 free_root(thd->mem_root,0); 03928 03929 rli->inc_event_relay_log_pos(); 03930 return 0; 03931 } 03932 #endif /* !MYSQL_CLIENT */ 03933 03934 03935 /************************************************************************** 03936 Slave_log_event methods 03937 **************************************************************************/ 03938 03939 #ifdef HAVE_REPLICATION 03940 #ifdef MYSQL_CLIENT 03941 void Unknown_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 03942 { 03943 if (print_event_info->short_form) 03944 return; 03945 print_header(file, print_event_info); 03946 fputc('\n', file); 03947 fprintf(file, "# %s", "Unknown event\n"); 03948 } 03949 #endif 03950 03951 #ifndef MYSQL_CLIENT 03952 void Slave_log_event::pack_info(Protocol *protocol) 03953 { 03954 char buf[256+HOSTNAME_LENGTH], *pos; 03955 pos= strmov(buf, "host="); 03956 pos= strnmov(pos, master_host, HOSTNAME_LENGTH); 03957 pos= strmov(pos, ",port="); 03958 pos= int10_to_str((long) master_port, pos, 10); 03959 pos= strmov(pos, ",log="); 03960 pos= strmov(pos, master_log); 03961 pos= strmov(pos, ",pos="); 03962 pos= longlong10_to_str(master_pos, pos, 10); 03963 protocol->store(buf, pos-buf, &my_charset_bin); 03964 } 03965 #endif /* !MYSQL_CLIENT */ 03966 03967 03968 #ifndef MYSQL_CLIENT 03969 Slave_log_event::Slave_log_event(THD* thd_arg, 03970 struct st_relay_log_info* rli) 03971 :Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0) 03972 { 03973 DBUG_ENTER("Slave_log_event"); 03974 if (!rli->inited) // QQ When can this happen ? 03975 DBUG_VOID_RETURN; 03976 03977 MASTER_INFO* mi = rli->mi; 03978 // TODO: re-write this better without holding both locks at the same time 03979 pthread_mutex_lock(&mi->data_lock); 03980 pthread_mutex_lock(&rli->data_lock); 03981 master_host_len = strlen(mi->host); 03982 master_log_len = strlen(rli->group_master_log_name); 03983 // on OOM, just do not initialize the structure and print the error 03984 if ((mem_pool = (char*)my_malloc(get_data_size() + 1, 03985 MYF(MY_WME)))) 03986 { 03987 master_host = mem_pool + SL_MASTER_HOST_OFFSET ; 03988 memcpy(master_host, mi->host, master_host_len + 1); 03989 master_log = master_host + master_host_len + 1; 03990 memcpy(master_log, rli->group_master_log_name, master_log_len + 1); 03991 master_port = mi->port; 03992 master_pos = rli->group_master_log_pos; 03993 DBUG_PRINT("info", ("master_log: %s pos: %d", master_log, 03994 (ulong) master_pos)); 03995 } 03996 else 03997 sql_print_error("Out of memory while recording slave event"); 03998 pthread_mutex_unlock(&rli->data_lock); 03999 pthread_mutex_unlock(&mi->data_lock); 04000 DBUG_VOID_RETURN; 04001 } 04002 #endif /* !MYSQL_CLIENT */ 04003 04004 04005 Slave_log_event::~Slave_log_event() 04006 { 04007 my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR)); 04008 } 04009 04010 04011 #ifdef MYSQL_CLIENT 04012 void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 04013 { 04014 char llbuff[22]; 04015 if (print_event_info->short_form) 04016 return; 04017 print_header(file, print_event_info); 04018 fputc('\n', file); 04019 fprintf(file, "\ 04020 Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n", 04021 master_host, master_port, master_log, llstr(master_pos, llbuff)); 04022 } 04023 #endif /* MYSQL_CLIENT */ 04024 04025 04026 int Slave_log_event::get_data_size() 04027 { 04028 return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET; 04029 } 04030 04031 04032 #ifndef MYSQL_CLIENT 04033 bool Slave_log_event::write(IO_CACHE* file) 04034 { 04035 ulong event_length= get_data_size(); 04036 int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); 04037 int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port); 04038 // log and host are already there 04039 04040 return (write_header(file, event_length) || 04041 my_b_safe_write(file, (byte*) mem_pool, event_length)); 04042 } 04043 #endif 04044 04045 04046 void Slave_log_event::init_from_mem_pool(int data_size) 04047 { 04048 master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET); 04049 master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET); 04050 master_host = mem_pool + SL_MASTER_HOST_OFFSET; 04051 master_host_len = strlen(master_host); 04052 // safety 04053 master_log = master_host + master_host_len + 1; 04054 if (master_log > mem_pool + data_size) 04055 { 04056 master_host = 0; 04057 return; 04058 } 04059 master_log_len = strlen(master_log); 04060 } 04061 04062 04063 /* This code is not used, so has not been updated to be format-tolerant */ 04064 Slave_log_event::Slave_log_event(const char* buf, uint event_len) 04065 :Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0) 04066 { 04067 if (event_len < LOG_EVENT_HEADER_LEN) 04068 return; 04069 event_len -= LOG_EVENT_HEADER_LEN; 04070 if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME)))) 04071 return; 04072 memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len); 04073 mem_pool[event_len] = 0; 04074 init_from_mem_pool(event_len); 04075 } 04076 04077 04078 #ifndef MYSQL_CLIENT 04079 int Slave_log_event::exec_event(struct st_relay_log_info* rli) 04080 { 04081 if (mysql_bin_log.is_open()) 04082 mysql_bin_log.write(this); 04083 return Log_event::exec_event(rli); 04084 } 04085 #endif /* !MYSQL_CLIENT */ 04086 04087 04088 /************************************************************************** 04089 Stop_log_event methods 04090 **************************************************************************/ 04091 04092 /* 04093 Stop_log_event::print() 04094 */ 04095 04096 #ifdef MYSQL_CLIENT 04097 void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 04098 { 04099 if (print_event_info->short_form) 04100 return; 04101 04102 print_header(file, print_event_info); 04103 fprintf(file, "\tStop\n"); 04104 fflush(file); 04105 } 04106 #endif /* MYSQL_CLIENT */ 04107 04108 04109 /* 04110 Stop_log_event::exec_event() 04111 04112 The master stopped. 04113 We used to clean up all temporary tables but this is useless as, as the 04114 master has shut down properly, it has written all DROP TEMPORARY TABLE 04115 (prepared statements' deletion is TODO only when we binlog prep stmts). 04116 We used to clean up slave_load_tmpdir, but this is useless as it has been 04117 cleared at the end of LOAD DATA INFILE. 04118 So we have nothing to do here. 04119 The place were we must do this cleaning is in Start_log_event_v3::exec_event(), 04120 not here. Because if we come here, the master was sane. 04121 */ 04122 04123 #ifndef MYSQL_CLIENT 04124 int Stop_log_event::exec_event(struct st_relay_log_info* rli) 04125 { 04126 /* 04127 We do not want to update master_log pos because we get a rotate event 04128 before stop, so by now group_master_log_name is set to the next log. 04129 If we updated it, we will have incorrect master coordinates and this 04130 could give false triggers in MASTER_POS_WAIT() that we have reached 04131 the target position when in fact we have not. 04132 */ 04133 if (thd->options & OPTION_BEGIN) 04134 rli->inc_event_relay_log_pos(); 04135 else 04136 { 04137 rli->inc_group_relay_log_pos(0); 04138 flush_relay_log_info(rli); 04139 } 04140 return 0; 04141 } 04142 #endif /* !MYSQL_CLIENT */ 04143 #endif /* HAVE_REPLICATION */ 04144 04145 04146 /************************************************************************** 04147 Create_file_log_event methods 04148 **************************************************************************/ 04149 04150 /* 04151 Create_file_log_event ctor 04152 */ 04153 04154 #ifndef MYSQL_CLIENT 04155 Create_file_log_event:: 04156 Create_file_log_event(THD* thd_arg, sql_exchange* ex, 04157 const char* db_arg, const char* table_name_arg, 04158 List<Item>& fields_arg, enum enum_duplicates handle_dup, 04159 bool ignore, 04160 char* block_arg, uint block_len_arg, bool using_trans) 04161 :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore, 04162 using_trans), 04163 fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg), 04164 file_id(thd_arg->file_id = mysql_bin_log.next_file_id()) 04165 { 04166 DBUG_ENTER("Create_file_log_event"); 04167 sql_ex.force_new_format(); 04168 DBUG_VOID_RETURN; 04169 } 04170 04171 04172 /* 04173 Create_file_log_event::write_data_body() 04174 */ 04175 04176 bool Create_file_log_event::write_data_body(IO_CACHE* file) 04177 { 04178 bool res; 04179 if ((res= Load_log_event::write_data_body(file)) || fake_base) 04180 return res; 04181 return (my_b_safe_write(file, (byte*) "", 1) || 04182 my_b_safe_write(file, (byte*) block, block_len)); 04183 } 04184 04185 04186 /* 04187 Create_file_log_event::write_data_header() 04188 */ 04189 04190 bool Create_file_log_event::write_data_header(IO_CACHE* file) 04191 { 04192 bool res; 04193 byte buf[CREATE_FILE_HEADER_LEN]; 04194 if ((res= Load_log_event::write_data_header(file)) || fake_base) 04195 return res; 04196 int4store(buf + CF_FILE_ID_OFFSET, file_id); 04197 return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0; 04198 } 04199 04200 04201 /* 04202 Create_file_log_event::write_base() 04203 */ 04204 04205 bool Create_file_log_event::write_base(IO_CACHE* file) 04206 { 04207 bool res; 04208 fake_base= 1; // pretend we are Load event 04209 res= write(file); 04210 fake_base= 0; 04211 return res; 04212 } 04213 04214 #endif /* !MYSQL_CLIENT */ 04215 04216 /* 04217 Create_file_log_event ctor 04218 */ 04219 04220 Create_file_log_event::Create_file_log_event(const char* buf, uint len, 04221 const Format_description_log_event* description_event) 04222 :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0) 04223 { 04224 DBUG_ENTER("Create_file_log_event::Create_file_log_event(char*,...)"); 04225 uint block_offset; 04226 uint header_len= description_event->common_header_len; 04227 uint8 load_header_len= description_event->post_header_len[LOAD_EVENT-1]; 04228 uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1]; 04229 if (!(event_buf= my_memdup((byte*) buf, len, MYF(MY_WME))) || 04230 copy_log_event(event_buf,len, 04231 ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? 04232 load_header_len + header_len : 04233 (fake_base ? (header_len+load_header_len) : 04234 (header_len+load_header_len) + 04235 create_file_header_len)), 04236 description_event)) 04237 DBUG_VOID_RETURN; 04238 if (description_event->binlog_version!=1) 04239 { 04240 file_id= uint4korr(buf + 04241 header_len + 04242 load_header_len + CF_FILE_ID_OFFSET); 04243 /* 04244 Note that it's ok to use get_data_size() below, because it is computed 04245 with values we have already read from this event (because we called 04246 copy_log_event()); we are not using slave's format info to decode 04247 master's format, we are really using master's format info. 04248 Anyway, both formats should be identical (except the common_header_len) 04249 as these Load events are not changed between 4.0 and 5.0 (as logging of 04250 LOAD DATA INFILE does not use Load_log_event in 5.0). 04251 04252 The + 1 is for \0 terminating fname 04253 */ 04254 block_offset= (description_event->common_header_len + 04255 Load_log_event::get_data_size() + 04256 create_file_header_len + 1); 04257 if (len < block_offset) 04258 return; 04259 block = (char*)buf + block_offset; 04260 block_len = len - block_offset; 04261 } 04262 else 04263 { 04264 sql_ex.force_new_format(); 04265 inited_from_old = 1; 04266 } 04267 DBUG_VOID_RETURN; 04268 } 04269 04270 04271 /* 04272 Create_file_log_event::print() 04273 */ 04274 04275 #ifdef MYSQL_CLIENT 04276 void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, 04277 bool enable_local) 04278 { 04279 if (print_event_info->short_form) 04280 { 04281 if (enable_local && check_fname_outside_temp_buf()) 04282 Load_log_event::print(file, print_event_info); 04283 return; 04284 } 04285 04286 if (enable_local) 04287 { 04288 Load_log_event::print(file, print_event_info, 04289 !check_fname_outside_temp_buf()); 04290 /* 04291 That one is for "file_id: etc" below: in mysqlbinlog we want the #, in 04292 SHOW BINLOG EVENTS we don't. 04293 */ 04294 fprintf(file, "#"); 04295 } 04296 04297 fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len); 04298 } 04299 04300 04301 void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) 04302 { 04303 print(file, print_event_info, 0); 04304 } 04305 #endif /* MYSQL_CLIENT */ 04306 04307 04308 /* 04309 Create_file_log_event::pack_info() 04310 */ 04311 04312 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04313 void Create_file_log_event::pack_info(Protocol *protocol) 04314 { 04315 char buf[NAME_LEN*2 + 30 + 21*2], *pos; 04316 pos= strmov(buf, "db="); 04317 memcpy(pos, db, db_len); 04318 pos= strmov(pos + db_len, ";table="); 04319 memcpy(pos, table_name, table_name_len); 04320 pos= strmov(pos + table_name_len, ";file_id="); 04321 pos= int10_to_str((long) file_id, pos, 10); 04322 pos= strmov(pos, ";block_len="); 04323 pos= int10_to_str((long) block_len, pos, 10); 04324 protocol->store(buf, (uint) (pos-buf), &my_charset_bin); 04325 } 04326 #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ 04327 04328 04329 /* 04330 Create_file_log_event::exec_event() 04331 */ 04332 04333 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04334 int Create_file_log_event::exec_event(struct st_relay_log_info* rli) 04335 { 04336 char proc_info[17+FN_REFLEN+10], *fname_buf; 04337 char *ext; 04338 int fd = -1; 04339 IO_CACHE file; 04340 int error = 1; 04341 04342 bzero((char*)&file, sizeof(file)); 04343 fname_buf= strmov(proc_info, "Making temp file "); 04344 ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info"); 04345 thd->proc_info= proc_info; 04346 my_delete(fname_buf, MYF(0)); // old copy may exist already 04347 if ((fd= my_create(fname_buf, CREATE_MODE, 04348 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, 04349 MYF(MY_WME))) < 0 || 04350 init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, 04351 MYF(MY_WME|MY_NABP))) 04352 { 04353 slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: " 04354 "could not open file '%s'", fname_buf); 04355 goto err; 04356 } 04357 04358 // a trick to avoid allocating another buffer 04359 fname= fname_buf; 04360 fname_len= (uint) (strmov(ext, ".data") - fname); 04361 if (write_base(&file)) 04362 { 04363 strmov(ext, ".info"); // to have it right in the error message 04364 slave_print_msg(ERROR_LEVEL, rli, my_errno, 04365 "Error in Create_file event: could not write to file '%s'", 04366 fname_buf); 04367 goto err; 04368 } 04369 end_io_cache(&file); 04370 my_close(fd, MYF(0)); 04371 04372 // fname_buf now already has .data, not .info, because we did our trick 04373 my_delete(fname_buf, MYF(0)); // old copy may exist already 04374 if ((fd= my_create(fname_buf, CREATE_MODE, 04375 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, 04376 MYF(MY_WME))) < 0) 04377 { 04378 slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: " 04379 "could not open file '%s'", fname_buf); 04380 goto err; 04381 } 04382 if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) 04383 { 04384 slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: " 04385 "write to '%s' failed", fname_buf); 04386 goto err; 04387 } 04388 error=0; // Everything is ok 04389 04390 err: 04391 if (error) 04392 end_io_cache(&file); 04393 if (fd >= 0) 04394 my_close(fd, MYF(0)); 04395 thd->proc_info= 0; 04396 return error ? 1 : Log_event::exec_event(rli); 04397 } 04398 #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ 04399 04400 04401 /************************************************************************** 04402 Append_block_log_event methods 04403 **************************************************************************/ 04404 04405 /* 04406 Append_block_log_event ctor 04407 */ 04408 04409 #ifndef MYSQL_CLIENT 04410 Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg, 04411 char* block_arg, 04412 uint block_len_arg, 04413 bool using_trans) 04414 :Log_event(thd_arg,0, using_trans), block(block_arg), 04415 block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg) 04416 { 04417 } 04418 #endif 04419 04420 04421 /* 04422 Append_block_log_event ctor 04423 */ 04424 04425 Append_block_log_event::Append_block_log_event(const char* buf, uint len, 04426 const Format_description_log_event* description_event) 04427 :Log_event(buf, description_event),block(0) 04428 { 04429 DBUG_ENTER("Append_block_log_event::Append_block_log_event(char*,...)"); 04430 uint8 common_header_len= description_event->common_header_len; 04431 uint8 append_block_header_len= 04432 description_event->post_header_len[APPEND_BLOCK_EVENT-1]; 04433 uint total_header_len= common_header_len+append_block_header_len; 04434 if (len < total_header_len) 04435 DBUG_VOID_RETURN; 04436 file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET); 04437 block= (char*)buf + total_header_len; 04438 block_len= len - total_header_len; 04439 DBUG_VOID_RETURN; 04440 } 04441 04442 04443 /* 04444 Append_block_log_event::write() 04445 */ 04446 04447 #ifndef MYSQL_CLIENT 04448 bool Append_block_log_event::write(IO_CACHE* file) 04449 { 04450 byte buf[APPEND_BLOCK_HEADER_LEN]; 04451 int4store(buf + AB_FILE_ID_OFFSET, file_id); 04452 return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) || 04453 my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || 04454 my_b_safe_write(file, (byte*) block, block_len)); 04455 } 04456 #endif 04457 04458 04459 /* 04460 Append_block_log_event::print() 04461 */ 04462 04463 #ifdef MYSQL_CLIENT 04464 void Append_block_log_event::print(FILE* file, 04465 PRINT_EVENT_INFO* print_event_info) 04466 { 04467 if (print_event_info->short_form) 04468 return; 04469 print_header(file, print_event_info); 04470 fputc('\n', file); 04471 fprintf(file, "#%s: file_id: %d block_len: %d\n", 04472 get_type_str(), file_id, block_len); 04473 } 04474 #endif /* MYSQL_CLIENT */ 04475 04476 04477 /* 04478 Append_block_log_event::pack_info() 04479 */ 04480 04481 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04482 void Append_block_log_event::pack_info(Protocol *protocol) 04483 { 04484 char buf[256]; 04485 uint length; 04486 length= (uint) my_sprintf(buf, 04487 (buf, ";file_id=%u;block_len=%u", file_id, 04488 block_len)); 04489 protocol->store(buf, length, &my_charset_bin); 04490 } 04491 04492 04493 /* 04494 Append_block_log_event::get_create_or_append() 04495 */ 04496 04497 int Append_block_log_event::get_create_or_append() const 04498 { 04499 return 0; /* append to the file, fail if not exists */ 04500 } 04501 04502 /* 04503 Append_block_log_event::exec_event() 04504 */ 04505 04506 int Append_block_log_event::exec_event(struct st_relay_log_info* rli) 04507 { 04508 char proc_info[17+FN_REFLEN+10], *fname= proc_info+17; 04509 int fd; 04510 int error = 1; 04511 DBUG_ENTER("Append_block_log_event::exec_event"); 04512 04513 fname= strmov(proc_info, "Making temp file "); 04514 slave_load_file_stem(fname, file_id, server_id, ".data"); 04515 thd->proc_info= proc_info; 04516 if (get_create_or_append()) 04517 { 04518 my_delete(fname, MYF(0)); // old copy may exist already 04519 if ((fd= my_create(fname, CREATE_MODE, 04520 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, 04521 MYF(MY_WME))) < 0) 04522 { 04523 slave_print_msg(ERROR_LEVEL, rli, my_errno, 04524 "Error in %s event: could not create file '%s'", 04525 get_type_str(), fname); 04526 goto err; 04527 } 04528 } 04529 else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW, 04530 MYF(MY_WME))) < 0) 04531 { 04532 slave_print_msg(ERROR_LEVEL, rli, my_errno, 04533 "Error in %s event: could not open file '%s'", 04534 get_type_str(), fname); 04535 goto err; 04536 } 04537 if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) 04538 { 04539 slave_print_msg(ERROR_LEVEL, rli, my_errno, 04540 "Error in %s event: write to '%s' failed", 04541 get_type_str(), fname); 04542 goto err; 04543 } 04544 error=0; 04545 04546 err: 04547 if (fd >= 0) 04548 my_close(fd, MYF(0)); 04549 thd->proc_info= 0; 04550 DBUG_RETURN(error ? error : Log_event::exec_event(rli)); 04551 } 04552 #endif 04553 04554 04555 /************************************************************************** 04556 Delete_file_log_event methods 04557 **************************************************************************/ 04558 04559 /* 04560 Delete_file_log_event ctor 04561 */ 04562 04563 #ifndef MYSQL_CLIENT 04564 Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg, 04565 bool using_trans) 04566 :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg) 04567 { 04568 } 04569 #endif 04570 04571 /* 04572 Delete_file_log_event ctor 04573 */ 04574 04575 Delete_file_log_event::Delete_file_log_event(const char* buf, uint len, 04576 const Format_description_log_event* description_event) 04577 :Log_event(buf, description_event),file_id(0) 04578 { 04579 uint8 common_header_len= description_event->common_header_len; 04580 uint8 delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1]; 04581 if (len < (uint)(common_header_len + delete_file_header_len)) 04582 return; 04583 file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET); 04584 } 04585 04586 04587 /* 04588 Delete_file_log_event::write() 04589 */ 04590 04591 #ifndef MYSQL_CLIENT 04592 bool Delete_file_log_event::write(IO_CACHE* file) 04593 { 04594 byte buf[DELETE_FILE_HEADER_LEN]; 04595 int4store(buf + DF_FILE_ID_OFFSET, file_id); 04596 return (write_header(file, sizeof(buf)) || 04597 my_b_safe_write(file, buf, sizeof(buf))); 04598 } 04599 #endif 04600 04601 04602 /* 04603 Delete_file_log_event::print() 04604 */ 04605 04606 #ifdef MYSQL_CLIENT 04607 void Delete_file_log_event::print(FILE* file, 04608 PRINT_EVENT_INFO* print_event_info) 04609 { 04610 if (print_event_info->short_form) 04611 return; 04612 print_header(file, print_event_info); 04613 fputc('\n', file); 04614 fprintf(file, "#Delete_file: file_id=%u\n", file_id); 04615 } 04616 #endif /* MYSQL_CLIENT */ 04617 04618 /* 04619 Delete_file_log_event::pack_info() 04620 */ 04621 04622 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04623 void Delete_file_log_event::pack_info(Protocol *protocol) 04624 { 04625 char buf[64]; 04626 uint length; 04627 length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id)); 04628 protocol->store(buf, (int32) length, &my_charset_bin); 04629 } 04630 #endif 04631 04632 /* 04633 Delete_file_log_event::exec_event() 04634 */ 04635 04636 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04637 int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) 04638 { 04639 char fname[FN_REFLEN+10]; 04640 char *ext= slave_load_file_stem(fname, file_id, server_id, ".data"); 04641 (void) my_delete(fname, MYF(MY_WME)); 04642 strmov(ext, ".info"); 04643 (void) my_delete(fname, MYF(MY_WME)); 04644 return Log_event::exec_event(rli); 04645 } 04646 #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ 04647 04648 04649 /************************************************************************** 04650 Execute_load_log_event methods 04651 **************************************************************************/ 04652 04653 /* 04654 Execute_load_log_event ctor 04655 */ 04656 04657 #ifndef MYSQL_CLIENT 04658 Execute_load_log_event::Execute_load_log_event(THD *thd_arg, const char* db_arg, 04659 bool using_trans) 04660 :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg) 04661 { 04662 } 04663 #endif 04664 04665 04666 /* 04667 Execute_load_log_event ctor 04668 */ 04669 04670 Execute_load_log_event::Execute_load_log_event(const char* buf, uint len, 04671 const Format_description_log_event* description_event) 04672 :Log_event(buf, description_event), file_id(0) 04673 { 04674 uint8 common_header_len= description_event->common_header_len; 04675 uint8 exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1]; 04676 if (len < (uint)(common_header_len+exec_load_header_len)) 04677 return; 04678 file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET); 04679 } 04680 04681 04682 /* 04683 Execute_load_log_event::write() 04684 */ 04685 04686 #ifndef MYSQL_CLIENT 04687 bool Execute_load_log_event::write(IO_CACHE* file) 04688 { 04689 byte buf[EXEC_LOAD_HEADER_LEN]; 04690 int4store(buf + EL_FILE_ID_OFFSET, file_id); 04691 return (write_header(file, sizeof(buf)) || 04692 my_b_safe_write(file, buf, sizeof(buf))); 04693 } 04694 #endif 04695 04696 04697 /* 04698 Execute_load_log_event::print() 04699 */ 04700 04701 #ifdef MYSQL_CLIENT 04702 void Execute_load_log_event::print(FILE* file, 04703 PRINT_EVENT_INFO* print_event_info) 04704 { 04705 if (print_event_info->short_form) 04706 return; 04707 print_header(file, print_event_info); 04708 fputc('\n', file); 04709 fprintf(file, "#Exec_load: file_id=%d\n", 04710 file_id); 04711 } 04712 #endif 04713 04714 /* 04715 Execute_load_log_event::pack_info() 04716 */ 04717 04718 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04719 void Execute_load_log_event::pack_info(Protocol *protocol) 04720 { 04721 char buf[64]; 04722 uint length; 04723 length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id)); 04724 protocol->store(buf, (int32) length, &my_charset_bin); 04725 } 04726 04727 04728 /* 04729 Execute_load_log_event::exec_event() 04730 */ 04731 04732 int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) 04733 { 04734 char fname[FN_REFLEN+10]; 04735 char *ext; 04736 int fd; 04737 int error= 1; 04738 IO_CACHE file; 04739 Load_log_event *lev= 0; 04740 04741 ext= slave_load_file_stem(fname, file_id, server_id, ".info"); 04742 if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW, 04743 MYF(MY_WME))) < 0 || 04744 init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, 04745 MYF(MY_WME|MY_NABP))) 04746 { 04747 slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Exec_load event: " 04748 "could not open file '%s'", fname); 04749 goto err; 04750 } 04751 if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, 04752 (pthread_mutex_t*)0, 04753 rli->relay_log.description_event_for_exec)) || 04754 lev->get_type_code() != NEW_LOAD_EVENT) 04755 { 04756 slave_print_msg(ERROR_LEVEL, rli, 0, "Error in Exec_load event: " 04757 "file '%s' appears corrupted", fname); 04758 goto err; 04759 } 04760 04761 lev->thd = thd; 04762 /* 04763 lev->exec_event should use rli only for errors 04764 i.e. should not advance rli's position. 04765 lev->exec_event is the place where the table is loaded (it calls 04766 mysql_load()). 04767 */ 04768 04769 rli->future_group_master_log_pos= log_pos; 04770 if (lev->exec_event(0,rli,1)) 04771 { 04772 /* 04773 We want to indicate the name of the file that could not be loaded 04774 (SQL_LOADxxx). 04775 But as we are here we are sure the error is in rli->last_slave_error and 04776 rli->last_slave_errno (example of error: duplicate entry for key), so we 04777 don't want to overwrite it with the filename. 04778 What we want instead is add the filename to the current error message. 04779 */ 04780 char *tmp= my_strdup(rli->last_slave_error,MYF(MY_WME)); 04781 if (tmp) 04782 { 04783 slave_print_msg(ERROR_LEVEL, rli, 04784 rli->last_slave_errno, /* ok to re-use error code */ 04785 "%s. Failed executing load from '%s'", 04786 tmp, fname); 04787 my_free(tmp,MYF(0)); 04788 } 04789 goto err; 04790 } 04791 /* 04792 We have an open file descriptor to the .info file; we need to close it 04793 or Windows will refuse to delete the file in my_delete(). 04794 */ 04795 if (fd >= 0) 04796 { 04797 my_close(fd, MYF(0)); 04798 end_io_cache(&file); 04799 fd= -1; 04800 } 04801 (void) my_delete(fname, MYF(MY_WME)); 04802 memcpy(ext, ".data", 6); 04803 (void) my_delete(fname, MYF(MY_WME)); 04804 error = 0; 04805 04806 err: 04807 delete lev; 04808 if (fd >= 0) 04809 { 04810 my_close(fd, MYF(0)); 04811 end_io_cache(&file); 04812 } 04813 return error ? error : Log_event::exec_event(rli); 04814 } 04815 04816 #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ 04817 04818 04819 /************************************************************************** 04820 Begin_load_query_log_event methods 04821 **************************************************************************/ 04822 04823 #ifndef MYSQL_CLIENT 04824 Begin_load_query_log_event:: 04825 Begin_load_query_log_event(THD* thd_arg, const char* db_arg, char* block_arg, 04826 uint block_len_arg, bool using_trans) 04827 :Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg, 04828 using_trans) 04829 { 04830 file_id= thd_arg->file_id= mysql_bin_log.next_file_id(); 04831 } 04832 #endif 04833 04834 04835 Begin_load_query_log_event:: 04836 Begin_load_query_log_event(const char* buf, uint len, 04837 const Format_description_log_event* desc_event) 04838 :Append_block_log_event(buf, len, desc_event) 04839 { 04840 } 04841 04842 04843 #if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04844 int Begin_load_query_log_event::get_create_or_append() const 04845 { 04846 return 1; /* create the file */ 04847 } 04848 #endif /* defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ 04849 04850 04851 /************************************************************************** 04852 Execute_load_query_log_event methods 04853 **************************************************************************/ 04854 04855 04856 #ifndef MYSQL_CLIENT 04857 Execute_load_query_log_event:: 04858 Execute_load_query_log_event(THD* thd_arg, const char* query_arg, 04859 ulong query_length_arg, uint fn_pos_start_arg, 04860 uint fn_pos_end_arg, 04861 enum_load_dup_handling dup_handling_arg, 04862 bool using_trans, bool suppress_use): 04863 Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, 04864 suppress_use), 04865 file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg), 04866 fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg) 04867 { 04868 } 04869 #endif /* !MYSQL_CLIENT */ 04870 04871 04872 Execute_load_query_log_event:: 04873 Execute_load_query_log_event(const char* buf, uint event_len, 04874 const Format_description_log_event* desc_event): 04875 Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT), 04876 file_id(0), fn_pos_start(0), fn_pos_end(0) 04877 { 04878 if (!Query_log_event::is_valid()) 04879 return; 04880 04881 buf+= desc_event->common_header_len; 04882 04883 fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET); 04884 fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET); 04885 dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET)); 04886 04887 if (fn_pos_start > q_len || fn_pos_end > q_len || 04888 dup_handling > LOAD_DUP_REPLACE) 04889 return; 04890 04891 file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET); 04892 } 04893 04894 04895 ulong Execute_load_query_log_event::get_post_header_size_for_derived() 04896 { 04897 return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN; 04898 } 04899 04900 04901 #ifndef MYSQL_CLIENT 04902 bool 04903 Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file) 04904 { 04905 char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN]; 04906 int4store(buf, file_id); 04907 int4store(buf + 4, fn_pos_start); 04908 int4store(buf + 4 + 4, fn_pos_end); 04909 *(buf + 4 + 4 + 4)= (char)dup_handling; 04910 return my_b_safe_write(file, (byte*) buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); 04911 } 04912 #endif 04913 04914 04915 #ifdef MYSQL_CLIENT 04916 void Execute_load_query_log_event::print(FILE* file, 04917 PRINT_EVENT_INFO* print_event_info) 04918 { 04919 print(file, print_event_info, 0); 04920 } 04921 04922 04923 void Execute_load_query_log_event::print(FILE* file, 04924 PRINT_EVENT_INFO* print_event_info, 04925 const char *local_fname) 04926 { 04927 print_query_header(file, print_event_info); 04928 04929 if (local_fname) 04930 { 04931 my_fwrite(file, (byte*) query, fn_pos_start, MYF(MY_NABP | MY_WME)); 04932 fprintf(file, " LOCAL INFILE \'"); 04933 fprintf(file, local_fname); 04934 fprintf(file, "\'"); 04935 if (dup_handling == LOAD_DUP_REPLACE) 04936 fprintf(file, " REPLACE"); 04937 fprintf(file, " INTO"); 04938 my_fwrite(file, (byte*) query + fn_pos_end, q_len-fn_pos_end, 04939 MYF(MY_NABP | MY_WME)); 04940 fprintf(file, ";\n"); 04941 } 04942 else 04943 { 04944 my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); 04945 fprintf(file, ";\n"); 04946 } 04947 04948 if (!print_event_info->short_form) 04949 fprintf(file, "# file_id: %d \n", file_id); 04950 } 04951 #endif 04952 04953 04954 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 04955 void Execute_load_query_log_event::pack_info(Protocol *protocol) 04956 { 04957 char *buf, *pos; 04958 if (!(buf= my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME)))) 04959 return; 04960 pos= buf; 04961 if (db && db_len) 04962 { 04963 pos= strmov(buf, "use `"); 04964 memcpy(pos, db, db_len); 04965 pos= strmov(pos+db_len, "`; "); 04966 } 04967 if (query && q_len) 04968 { 04969 memcpy(pos, query, q_len); 04970 pos+= q_len; 04971 } 04972 pos= strmov(pos, " ;file_id="); 04973 pos= int10_to_str((long) file_id, pos, 10); 04974 protocol->store(buf, pos-buf, &my_charset_bin); 04975 my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); 04976 } 04977 04978 04979 int 04980 Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli) 04981 { 04982 char *p; 04983 char *buf; 04984 char *fname; 04985 char *fname_end; 04986 int error; 04987 04988 /* Replace filename and LOCAL keyword in query before executing it */ 04989 if (!(buf = my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) + 04990 (FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME)))) 04991 { 04992 slave_print_msg(ERROR_LEVEL, rli, my_errno, "Not enough memory"); 04993 return 1; 04994 } 04995 04996 p= buf; 04997 memcpy(p, query, fn_pos_start); 04998 p+= fn_pos_start; 04999 fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'"))); 05000 p= slave_load_file_stem(p, file_id, server_id, ".data"); 05001 fname_end= p= strend(p); // Safer than p=p+5 05002 *(p++)='\''; 05003 switch (dup_handling) { 05004 case LOAD_DUP_IGNORE: 05005 p= strmake(p, STRING_WITH_LEN(" IGNORE")); 05006 break; 05007 case LOAD_DUP_REPLACE: 05008 p= strmake(p, STRING_WITH_LEN(" REPLACE")); 05009 break; 05010 default: 05011 /* Ordinary load data */ 05012 break; 05013 } 05014 p= strmake(p, STRING_WITH_LEN(" INTO")); 05015 p= strmake(p, query+fn_pos_end, q_len-fn_pos_end); 05016 05017 error= Query_log_event::exec_event(rli, buf, p-buf); 05018 05019 /* Forging file name for deletion in same buffer */ 05020 *fname_end= 0; 05021 05022 /* 05023 If there was an error the slave is going to stop, leave the 05024 file so that we can re-execute this event at START SLAVE. 05025 */ 05026 if (!error) 05027 (void) my_delete(fname, MYF(MY_WME)); 05028 05029 my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); 05030 return error; 05031 } 05032 #endif 05033 05034 05035 /************************************************************************** 05036 sql_ex_info methods 05037 **************************************************************************/ 05038 05039 /* 05040 sql_ex_info::write_data() 05041 */ 05042 05043 bool sql_ex_info::write_data(IO_CACHE* file) 05044 { 05045 if (new_format()) 05046 { 05047 return (write_str(file, field_term, (uint) field_term_len) || 05048 write_str(file, enclosed, (uint) enclosed_len) || 05049 write_str(file, line_term, (uint) line_term_len) || 05050 write_str(file, line_start, (uint) line_start_len) || 05051 write_str(file, escaped, (uint) escaped_len) || 05052 my_b_safe_write(file,(byte*) &opt_flags,1)); 05053 } 05054 else 05055 { 05056 old_sql_ex old_ex; 05057 old_ex.field_term= *field_term; 05058 old_ex.enclosed= *enclosed; 05059 old_ex.line_term= *line_term; 05060 old_ex.line_start= *line_start; 05061 old_ex.escaped= *escaped; 05062 old_ex.opt_flags= opt_flags; 05063 old_ex.empty_flags=empty_flags; 05064 return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex)) != 0; 05065 } 05066 } 05067 05068 05069 /* 05070 sql_ex_info::init() 05071 */ 05072 05073 char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) 05074 { 05075 cached_new_format = use_new_format; 05076 if (use_new_format) 05077 { 05078 empty_flags=0; 05079 /* 05080 The code below assumes that buf will not disappear from 05081 under our feet during the lifetime of the event. This assumption 05082 holds true in the slave thread if the log is in new format, but is not 05083 the case when we have old format because we will be reusing net buffer 05084 to read the actual file before we write out the Create_file event. 05085 */ 05086 if (read_str(&buf, buf_end, &field_term, &field_term_len) || 05087 read_str(&buf, buf_end, &enclosed, &enclosed_len) || 05088 read_str(&buf, buf_end, &line_term, &line_term_len) || 05089 read_str(&buf, buf_end, &line_start, &line_start_len) || 05090 read_str(&buf, buf_end, &escaped, &escaped_len)) 05091 return 0; 05092 opt_flags = *buf++; 05093 } 05094 else 05095 { 05096 field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1; 05097 field_term = buf++; // Use first byte in string 05098 enclosed= buf++; 05099 line_term= buf++; 05100 line_start= buf++; 05101 escaped= buf++; 05102 opt_flags = *buf++; 05103 empty_flags= *buf++; 05104 if (empty_flags & FIELD_TERM_EMPTY) 05105 field_term_len=0; 05106 if (empty_flags & ENCLOSED_EMPTY) 05107 enclosed_len=0; 05108 if (empty_flags & LINE_TERM_EMPTY) 05109 line_term_len=0; 05110 if (empty_flags & LINE_START_EMPTY) 05111 line_start_len=0; 05112 if (empty_flags & ESCAPED_EMPTY) 05113 escaped_len=0; 05114 } 05115 return buf; 05116 } 05117 05118 05119 #ifdef HAVE_ROW_BASED_REPLICATION 05120 05121 /************************************************************************** 05122 Rows_log_event member functions 05123 **************************************************************************/ 05124 05125 #ifndef MYSQL_CLIENT 05126 Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, 05127 MY_BITMAP const *cols, bool is_transactional) 05128 : Log_event(thd_arg, 0, is_transactional), 05129 m_row_count(0), 05130 m_table(tbl_arg), 05131 m_table_id(tid), 05132 m_width(tbl_arg ? tbl_arg->s->fields : 1), 05133 m_rows_buf(0), m_rows_cur(0), m_rows_end(0), 05134 m_flags(0) 05135 { 05136 /* 05137 We allow a special form of dummy event when the table, and cols 05138 are null and the table id is ~0UL. This is a temporary 05139 solution, to be able to terminate a started statement in the 05140 binary log: the extreneous events will be removed in the future. 05141 */ 05142 DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL || 05143 !tbl_arg && !cols && tid == ~0UL); 05144 05145 if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS) 05146 set_flags(NO_FOREIGN_KEY_CHECKS_F); 05147 if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS) 05148 set_flags(RELAXED_UNIQUE_CHECKS_F); 05149 /* if bitmap_init fails, catched in is_valid() */ 05150 if (likely(!bitmap_init(&m_cols, 05151 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, 05152 (m_width + 7) & ~7UL, 05153 false))) 05154 { 05155 /* Cols can be zero if this is a dummy binrows event */ 05156 if (likely(cols != NULL)) 05157 memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols)); 05158 } 05159 else 05160 m_cols.bitmap= 0; // to not free it 05161 } 05162 #endif 05163 05164 Rows_log_event::Rows_log_event(const char *buf, uint event_len, 05165 Log_event_type event_type, 05166 const Format_description_log_event 05167 *description_event) 05168 : Log_event(buf, description_event), 05169 m_row_count(0), 05170 m_rows_buf(0), m_rows_cur(0), m_rows_end(0) 05171 { 05172 DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); 05173 uint8 const common_header_len= description_event->common_header_len; 05174 uint8 const post_header_len= description_event->post_header_len[event_type-1]; 05175 05176 DBUG_PRINT("enter",("event_len=%ld, common_header_len=%d, " 05177 "post_header_len=%d", 05178 event_len, common_header_len, 05179 post_header_len)); 05180 05181 const char *post_start= buf + common_header_len; 05182 post_start+= RW_MAPID_OFFSET; 05183 if (post_header_len == 6) 05184 { 05185 /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */ 05186 m_table_id= uint4korr(post_start); 05187 post_start+= 4; 05188 } 05189 else 05190 { 05191 m_table_id= uint6korr(post_start); 05192 post_start+= RW_FLAGS_OFFSET; 05193 } 05194 05195 m_flags= uint2korr(post_start); 05196 05197 byte const *const var_start= (const byte *)buf + common_header_len + 05198 post_header_len; 05199 byte const *const ptr_width= var_start; 05200 uchar *ptr_after_width= (uchar*) ptr_width; 05201 m_width = net_field_length(&ptr_after_width); 05202 05203 const uint byte_count= (m_width + 7) / 8; 05204 const byte* const ptr_rows_data= var_start + byte_count + 1; 05205 05206 my_size_t const data_size= event_len - (ptr_rows_data - (const byte *) buf); 05207 DBUG_PRINT("info",("m_table_id=%lu, m_flags=%d, m_width=%u, data_size=%lu", 05208 m_table_id, m_flags, m_width, data_size)); 05209 05210 m_rows_buf= (byte*)my_malloc(data_size, MYF(MY_WME)); 05211 if (likely((bool)m_rows_buf)) 05212 { 05213 /* if bitmap_init fails, catched in is_valid() */ 05214 if (likely(!bitmap_init(&m_cols, 05215 m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, 05216 (m_width + 7) & ~7UL, 05217 false))) 05218 memcpy(m_cols.bitmap, ptr_after_width, byte_count); 05219 m_rows_end= m_rows_buf + data_size; 05220 m_rows_cur= m_rows_end; 05221 memcpy(m_rows_buf, ptr_rows_data, data_size); 05222 } 05223 else 05224 m_cols.bitmap= 0; // to not free it 05225 05226 DBUG_VOID_RETURN; 05227 } 05228 05229 Rows_log_event::~Rows_log_event() 05230 { 05231 if (m_cols.bitmap == m_bitbuf) // no my_malloc happened 05232 m_cols.bitmap= 0; // so no my_free in bitmap_free 05233 bitmap_free(&m_cols); // To pair with bitmap_init(). 05234 my_free((gptr)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR)); 05235 } 05236 05237 #ifndef MYSQL_CLIENT 05238 int Rows_log_event::do_add_row_data(byte *const row_data, 05239 my_size_t const length) 05240 { 05241 /* 05242 When the table has a primary key, we would probably want, by default, to 05243 log only the primary key value instead of the entire "before image". This 05244 would save binlog space. TODO 05245 */ 05246 DBUG_ENTER("Rows_log_event::do_add_row_data"); 05247 DBUG_PRINT("enter", ("row_data: 0x%lx length: %lu", (ulong) row_data, 05248 length)); 05249 DBUG_DUMP("row_data", (const char*)row_data, min(length, 32)); 05250 05251 DBUG_ASSERT(m_rows_buf <= m_rows_cur); 05252 DBUG_ASSERT(!m_rows_buf || m_rows_end && m_rows_buf < m_rows_end); 05253 DBUG_ASSERT(m_rows_cur <= m_rows_end); 05254 05255 /* The cast will always work since m_rows_cur <= m_rows_end */ 05256 if (static_cast<my_size_t>(m_rows_end - m_rows_cur) < length) 05257 { 05258 my_size_t const block_size= 1024; 05259 my_ptrdiff_t const old_alloc= m_rows_end - m_rows_buf; 05260 my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf; 05261 my_ptrdiff_t const new_alloc= 05262 block_size * ((cur_size + length) / block_size + block_size - 1); 05263 05264 byte* const new_buf= (byte*)my_realloc((gptr)m_rows_buf, new_alloc, 05265 MYF(MY_ALLOW_ZERO_PTR|MY_WME)); 05266 if (unlikely(!new_buf)) 05267 DBUG_RETURN(HA_ERR_OUT_OF_MEM); 05268 05269 /* If the memory moved, we need to move the pointers */ 05270 if (new_buf != m_rows_buf) 05271 { 05272 m_rows_buf= new_buf; 05273 m_rows_cur= m_rows_buf + cur_size; 05274 } 05275 05276 /* 05277 The end pointer should always be changed to point to the end of 05278 the allocated memory. 05279 */ 05280 m_rows_end= m_rows_buf + new_alloc; 05281 } 05282 05283 DBUG_ASSERT(m_rows_cur + length < m_rows_end); 05284 memcpy(m_rows_cur, row_data, length); 05285 m_rows_cur+= length; 05286 m_row_count++; 05287 DBUG_RETURN(0); 05288 } 05289 #endif 05290 05291 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) 05292 /* 05293 Unpack a row into a record. The row is assumed to only consist of the fields 05294 for which the bitset represented by 'arr' and 'bits'; the other parts of the 05295 record are left alone. 05296 */ 05297 static char const *unpack_row(TABLE *table, 05298 byte *record, char const *row, 05299 MY_BITMAP const *cols) 05300 { 05301 DBUG_ASSERT(record && row); 05302 05303 MY_BITMAP *write_set= table->write_set; 05304 my_size_t const n_null_bytes= table->s->null_bytes; 05305 my_ptrdiff_t const offset= record - (byte*) table->record[0]; 05306 05307 memcpy(record, row, n_null_bytes); 05308 char const *ptr= row + n_null_bytes; 05309 05310 bitmap_set_all(write_set); 05311 Field **const begin_ptr = table->field; 05312 for (Field **field_ptr= begin_ptr ; *field_ptr ; ++field_ptr) 05313 { 05314 Field *const f= *field_ptr; 05315 05316 if (bitmap_is_set(cols, (uint) (field_ptr - begin_ptr))) 05317 { 05318 /* Field...::unpack() cannot return 0 */ 05319 ptr= f->unpack(f->ptr + offset, ptr); 05320 } 05321 else 05322 bitmap_clear_bit(write_set, (uint) (field_ptr - begin_ptr)); 05323 } 05324 return ptr; 05325 } 05326 05327 int Rows_log_event::exec_event(st_relay_log_info *rli) 05328 { 05329 DBUG_ENTER("Rows_log_event::exec_event(st_relay_log_info*)"); 05330 int error= 0; 05331 char const *row_start= (char const *)m_rows_buf; 05332 05333 /* 05334 If m_table_id == ~0UL, then we have a dummy event that does 05335 not contain any data. In that case, we just remove all tables in 05336 the tables_to_lock list, close the thread tables, step the relay 05337 log position, and return with success. 05338 */ 05339 if (m_table_id == ~0UL) 05340 { 05341 /* 05342 This one is supposed to be set: just an extra check so that 05343 nothing strange has happened. 05344 */ 05345 DBUG_ASSERT(get_flags(STMT_END_F)); 05346 05347 rli->clear_tables_to_lock(); 05348 close_thread_tables(thd); 05349 thd->clear_error(); 05350 rli->inc_event_relay_log_pos(); 05351 DBUG_RETURN(0); 05352 } 05353 05354 /* 05355 'thd' has been set by exec_relay_log_event(), just before calling 05356 exec_event(). We still check here to prevent future coding errors. 05357 */ 05358 DBUG_ASSERT(rli->sql_thd == thd); 05359 05360 /* 05361 If there is no locks taken, this is the first binrow event seen 05362 after the table map events. We should then lock all the tables 05363 used in the transaction and proceed with execution of the actual 05364 event. 05365 */ 05366 if (!thd->lock) 05367 { 05368 bool need_reopen= 1; /* To execute the first lap of the loop below */ 05369 05370 /* 05371 lock_tables() reads the contents of thd->lex, so they must be 05372 initialized. Contrary to in Table_map_log_event::exec_event() we don't 05373 call mysql_init_query() as that may reset the binlog format. 05374 */ 05375 lex_start(thd, NULL, 0); 05376 05377 while ((error= lock_tables(thd, rli->tables_to_lock, 05378 rli->tables_to_lock_count, &need_reopen))) 05379 { 05380 if (!need_reopen) 05381 { 05382 slave_print_msg(ERROR_LEVEL, rli, error, 05383 "Error in %s event: when locking tables", 05384 get_type_str()); 05385 rli->clear_tables_to_lock(); 05386 DBUG_RETURN(error); 05387 } 05388 05389 /* 05390 So we need to reopen the tables. 05391 05392 We need to flush the pending RBR event, since it keeps a 05393 pointer to an open table. 05394 05395 ALTERNATIVE SOLUTION (not implemented): Extract a pointer to 05396 the pending RBR event and reset the table pointer after the 05397 tables has been reopened. 05398 05399 NOTE: For this new scheme there should be no pending event: 05400 need to add code to assert that is the case. 05401 */ 05402 thd->binlog_flush_pending_rows_event(false); 05403 close_tables_for_reopen(thd, &rli->tables_to_lock); 05404 05405 if ((error= open_tables(thd, &rli->tables_to_lock, 05406 &rli->tables_to_lock_count, 0))) 05407 { 05408 if (thd->query_error || thd->is_fatal_error) 05409 { 05410 /* 05411 Error reporting borrowed from Query_log_event with many excessive 05412 simplifications (we don't honour --slave-skip-errors) 05413 */ 05414 uint actual_error= thd->net.last_errno; 05415 slave_print_msg(ERROR_LEVEL, rli, actual_error, 05416 "Error '%s' on reopening tables", 05417 (actual_error ? thd->net.last_error : 05418 "unexpected success or fatal error")); 05419 thd->query_error= 1; 05420 } 05421 rli->clear_tables_to_lock(); 05422 DBUG_RETURN(error); 05423 } 05424 } 05425 /* 05426 When the open and locking succeeded, we add all the tables to 05427 the table map and remove them from tables to lock. 05428 */ 05429 05430 TABLE_LIST *ptr; 05431 for (ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) 05432 { 05433 rli->m_table_map.set_table(ptr->table_id, ptr->table); 05434 rli->touching_table(ptr->db, ptr->table_name, ptr->table_id); 05435 } 05436 rli->clear_tables_to_lock(); 05437 } 05438 05439 DBUG_ASSERT(rli->tables_to_lock == NULL && rli->tables_to_lock_count == 0); 05440 05441 TABLE* table= rli->m_table_map.get_table(m_table_id); 05442 05443 if (table) 05444 { 05445 /* 05446 table == NULL means that this table should not be replicated 05447 (this was set up by Table_map_log_event::exec_event() which 05448 tested replicate-* rules). 05449 */ 05450 05451 /* 05452 It's not needed to set_time() but 05453 1) it continues the property that "Time" in SHOW PROCESSLIST shows how 05454 much slave is behind 05455 2) it will be needed when we allow replication from a table with no 05456 TIMESTAMP column to a table with one. 05457 So we call set_time(), like in SBR. Presently it changes nothing. 05458 */ 05459 thd->set_time((time_t)when); 05460 /* 05461 There are a few flags that are replicated with each row event. 05462 Make sure to set/clear them before executing the main body of 05463 the event. 05464 */ 05465 if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) 05466 thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; 05467 else 05468 thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; 05469 05470 if (get_flags(RELAXED_UNIQUE_CHECKS_F)) 05471 thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; 05472 else 05473 thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; 05474 /* A small test to verify that objects have consistent types */ 05475 DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); 05476 05477 error= do_before_row_operations(table); 05478 while (error == 0 && row_start < (const char*) m_rows_end) 05479 { 05480 char const *row_end= do_prepare_row(thd, table, row_start); 05481 DBUG_ASSERT(row_end != NULL); // cannot happen 05482 DBUG_ASSERT(row_end <= (const char*)m_rows_end); 05483 05484 /* in_use can have been set to NULL in close_tables_for_reopen */ 05485 THD* old_thd= table->in_use; 05486 if (!table->in_use) 05487 table->in_use= thd; 05488 error= do_exec_row(table); 05489 table->in_use = old_thd; 05490 switch (error) 05491 { 05492 /* Some recoverable errors */ 05493 case HA_ERR_RECORD_CHANGED: 05494 case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if 05495 tuple does not exist */ 05496 error= 0; 05497 case 0: 05498 break; 05499 05500 default: 05501 slave_print_msg(ERROR_LEVEL, rli, error, 05502 "Error in %s event: row application failed", 05503 get_type_str()); 05504 thd->query_error= 1; 05505 break; 05506 } 05507 05508 row_start= row_end; 05509 } 05510 DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", 05511 rli->abort_slave=1;); 05512 error= do_after_row_operations(table, error); 05513 if (!cache_stmt) 05514 { 05515 DBUG_PRINT("info", ("Marked that we need to keep log")); 05516 thd->options|= OPTION_KEEP_LOG; 05517 } 05518 } 05519 05520 if (error) 05521 { /* error has occured during the transaction */ 05522 slave_print_msg(ERROR_LEVEL, rli, error, 05523 "Error in %s event: error during transaction execution " 05524 "on table %s.%s", 05525 get_type_str(), table->s->db.str, 05526 table->s->table_name.str); 05527 /* 05528 If one day we honour --skip-slave-errors in row-based replication, and 05529 the error should be skipped, then we would clear mappings, rollback, 05530 close tables, but the slave SQL thread would not stop and then may 05531 assume the mapping is still available, the tables are still open... 05532 So then we should clear mappings/rollback/close here only if this is a 05533 STMT_END_F. 05534 For now we code, knowing that error is not skippable and so slave SQL 05535 thread is certainly going to stop. 05536 */ 05537 thd->reset_current_stmt_binlog_row_based(); 05538 rli->cleanup_context(thd, 1); 05539 thd->query_error= 1; 05540 DBUG_RETURN(error); 05541 } 05542 05543 if (get_flags(STMT_END_F)) 05544 { 05545 /* 05546 This is the end of a statement or transaction, so close (and 05547 unlock) the tables we opened when processing the 05548 Table_map_log_event starting the statement. 05549 05550 OBSERVER. This will clear *all* mappings, not only those that 05551 are open for the table. There is not good handle for on-close 05552 actions for tables. 05553 05554 NOTE. Even if we have no table ('table' == 0) we still need to be 05555 here, so that we increase the group relay log position. If we didn't, we 05556 could have a group relay log position which lags behind "forever" 05557 (assume the last master's transaction is ignored by the slave because of 05558 replicate-ignore rules). 05559 */ 05560 thd->binlog_flush_pending_rows_event(true); 05561 /* 05562 If this event is not in a transaction, the call below will, if some 05563 transactional storage engines are involved, commit the statement into 05564 them and flush the pending event to binlog. 05565 If this event is in a transaction, the call will do nothing, but a 05566 Xid_log_event will come next which will, if some transactional engines 05567 are involved, commit the transaction and flush the pending event to the 05568 binlog. 05569 */ 05570 error= ha_autocommit_or_rollback(thd, 0); 05571 /* 05572 Now what if this is not a transactional engine? we still need to 05573 flush the pending event to the binlog; we did it with 05574 thd->binlog_flush_pending_rows_event(). Note that we imitate 05575 what is done for real queries: a call to 05576 ha_autocommit_or_rollback() (sometimes only if involves a 05577 transactional engine), and a call to be sure to have the pending 05578 event flushed. 05579 */ 05580 05581 thd->reset_current_stmt_binlog_row_based(); 05582 rli->cleanup_context(thd, 0); 05583 rli->transaction_end(thd); 05584 05585 if (error == 0) 05586 { 05587 /* 05588 Clear any errors pushed in thd->net.last_err* if for example "no key 05589 found" (as this is allowed). This is a safety measure; apparently 05590 those errors (e.g. when executing a Delete_rows_log_event of a 05591 non-existing row, like in rpl_row_mystery22.test, 05592 thd->net.last_error = "Can't find record in 't1'" and last_errno=1032) 05593 do not become visible. We still prefer to wipe them out. 05594 */ 05595 thd->clear_error(); 05596 error= Log_event::exec_event(rli); 05597 } 05598 else 05599 slave_print_msg(ERROR_LEVEL, rli, error, 05600 "Error in %s event: commit of row events failed, " 05601 "table `%s`.`%s`", 05602 get_type_str(), table->s->db.str, 05603 table->s->table_name.str); 05604 DBUG_RETURN(error); 05605 } 05606 05607 if (table && (table->s->primary_key == MAX_KEY) && !cache_stmt) 05608 { 05609 /* 05610 ------------ Temporary fix until WL#2975 is implemented --------- 05611 05612 This event is not the last one (no STMT_END_F). If we stop now 05613 (in case of terminate_slave_thread()), how will we restart? We 05614 have to restart from Table_map_log_event, but as this table is 05615 not transactional, the rows already inserted will still be 05616 present, and idempotency is not guaranteed (no PK) so we risk 05617 that repeating leads to double insert. So we desperately try to 05618 continue, hope we'll eventually leave this buggy situation (by 05619 executing the final Rows_log_event). If we are in a hopeless 05620 wait (reached end of last relay log and nothing gets appended 05621 there), we timeout after one minute, and notify DBA about the 05622 problem. When WL#2975 is implemented, just remove the member 05623 st_relay_log_info::unsafe_to_stop_at and all its occurences. 05624 */ 05625 rli->unsafe_to_stop_at= time(0); 05626 } 05627 05628 DBUG_ASSERT(error == 0); 05629 thd->clear_error(); 05630 rli->inc_event_relay_log_pos(); 05631 05632 DBUG_RETURN(0); 05633 } 05634 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ 05635 05636 #ifndef MYSQL_CLIENT 05637 bool Rows_log_event::write_data_header(IO_CACHE *file) 05638 { 05639 byte buf[ROWS_HEADER_LEN]; // No need to init the buffer 05640 DBUG_ASSERT(m_table_id != ~0UL); 05641 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", 05642 { 05643 int4store(buf + 0, m_table_id); 05644 int2store(buf + 4, m_flags); 05645 return (my_b_safe_write(file, buf, 6)); 05646 }); 05647 int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); 05648 int2store(buf + RW_FLAGS_OFFSET, m_flags); 05649 return (my_b_safe_write(file, buf, ROWS_HEADER_LEN)); 05650 } 05651 05652 bool Rows_log_event::write_data_body(IO_CACHE*file) 05653 { 05654 /* 05655 Note that this should be the number of *bits*, not the number of 05656 bytes. 05657 */ 05658 char sbuf[sizeof(m_width)]; 05659 my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf; 05660 05661 char *const sbuf_end= net_store_length((char*) sbuf, (uint) m_width); 05662 DBUG_ASSERT(static_cast<my_size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); 05663 05664 return (my_b_safe_write(file, reinterpret_cast<byte*>(sbuf), 05665 sbuf_end - sbuf) || 05666 my_b_safe_write(file, reinterpret_cast<byte*>(m_cols.bitmap), 05667 no_bytes_in_map(&m_cols)) || 05668 my_b_safe_write(file, m_rows_buf, data_size)); 05669 } 05670 #endif 05671 05672 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 05673 void Rows_log_event::pack_info(Protocol *protocol) 05674 { 05675 char buf[256]; 05676 char const *const flagstr= 05677 get_flags(STMT_END_F) ? " flags: STMT_END_F" : ""; 05678 my_size_t bytes= my_snprintf(buf, sizeof(buf), 05679 "table_id: %lu%s", m_table_id, flagstr); 05680 protocol->store(buf, bytes, &my_charset_bin); 05681 } 05682 #endif 05683 05684 /************************************************************************** 05685 Table_map_log_event member functions 05686 **************************************************************************/ 05687 05688 /* 05689 Constructor used to build an event for writing to the binary log. 05690 Mats says tbl->s lives longer than this event so it's ok to copy pointers 05691 (tbl->s->db etc) and not pointer content. 05692 */ 05693 #if !defined(MYSQL_CLIENT) 05694 Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, 05695 bool is_transactional, uint16 flags) 05696 : Log_event(thd, 0, is_transactional), 05697 m_table(tbl), 05698 m_dbnam(tbl->s->db.str), 05699 m_dblen(m_dbnam ? tbl->s->db.length : 0), 05700 m_tblnam(tbl->s->table_name.str), 05701 m_tbllen(tbl->s->table_name.length), 05702 m_colcnt(tbl->s->fields), m_coltype(0), 05703 m_table_id(tid), 05704 m_flags(flags) 05705 { 05706 DBUG_ASSERT(m_table_id != ~0UL); 05707 /* 05708 In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in 05709 table.cc / alloc_table_share(): 05710 Use the fact the key is db/0/table_name/0 05711 As we rely on this let's assert it. 05712 */ 05713 DBUG_ASSERT((tbl->s->db.str == 0) || 05714 (tbl->s->db.str[tbl->s->db.length] == 0)); 05715 DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0); 05716 05717 05718 m_data_size= TABLE_MAP_HEADER_LEN; 05719 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;); 05720 m_data_size+= m_dblen + 2; // Include length and terminating \0 05721 m_data_size+= m_tbllen + 2; // Include length and terminating \0 05722 m_data_size+= 1 + m_colcnt; // COLCNT and column types 05723 05724 /* If malloc fails, catched in is_valid() */ 05725 if ((m_memory= my_malloc(m_colcnt, MYF(MY_WME)))) 05726 { 05727 m_coltype= reinterpret_cast<uchar*>(m_memory); 05728 for (unsigned int i= 0 ; i < m_table->s->fields ; ++i) 05729 m_coltype[i]= m_table->field[i]->type(); 05730 } 05731 } 05732 #endif /* !defined(MYSQL_CLIENT) */ 05733 05734 /* 05735 Constructor used by slave to read the event from the binary log. 05736 */ 05737 #if defined(HAVE_REPLICATION) 05738 Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, 05739 const Format_description_log_event 05740 *description_event) 05741 05742 : Log_event(buf, description_event), 05743 #ifndef MYSQL_CLIENT 05744 m_table(NULL), 05745 #endif 05746 m_memory(NULL) 05747 { 05748 DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)"); 05749 05750 uint8 common_header_len= description_event->common_header_len; 05751 uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1]; 05752 DBUG_PRINT("info",("event_len=%ld, common_header_len=%d, post_header_len=%d", 05753 event_len, common_header_len, post_header_len)); 05754 05755 DBUG_DUMP("event buffer", buf, event_len); 05756 05757 /* Read the post-header */ 05758 const char *post_start= buf + common_header_len; 05759 05760 post_start+= TM_MAPID_OFFSET; 05761 if (post_header_len == 6) 05762 { 05763 /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */ 05764 m_table_id= uint4korr(post_start); 05765 post_start+= 4; 05766 } 05767 else 05768 { 05769 DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN); 05770 m_table_id= uint6korr(post_start); 05771 post_start+= TM_FLAGS_OFFSET; 05772 } 05773 05774 DBUG_ASSERT(m_table_id != ~0UL); 05775 05776 m_flags= uint2korr(post_start); 05777 05778 /* Read the variable part of the event */ 05779 const char *const vpart= buf + common_header_len + post_header_len; 05780 05781 /* Extract the length of the various parts from the buffer */ 05782 byte const* const ptr_dblen= (byte const*)vpart + 0; 05783 m_dblen= *(uchar*) ptr_dblen; 05784 05785 /* Length of database name + counter + terminating null */ 05786 byte const* const ptr_tbllen= ptr_dblen + m_dblen + 2; 05787 m_tbllen= *(uchar*) ptr_tbllen; 05788 05789 /* Length of table name + counter + terminating null */ 05790 byte const* const ptr_colcnt= ptr_tbllen + m_tbllen + 2; 05791 uchar *ptr_after_colcnt= (uchar*) ptr_colcnt; 05792 m_colcnt= net_field_length(&ptr_after_colcnt); 05793 05794 DBUG_PRINT("info",("m_dblen=%d off=%d m_tbllen=%d off=%d m_colcnt=%d off=%d", 05795 m_dblen, ptr_dblen-(const byte*)vpart, 05796 m_tbllen, ptr_tbllen-(const byte*)vpart, 05797 m_colcnt, ptr_colcnt-(const byte*)vpart)); 05798 05799 /* Allocate mem for all fields in one go. If fails, catched in is_valid() */ 05800 m_memory= my_multi_malloc(MYF(MY_WME), 05801 &m_dbnam, m_dblen + 1, 05802 &m_tblnam, m_tbllen + 1, 05803 &m_coltype, m_colcnt, 05804 NULL); 05805 05806 if (m_memory) 05807 { 05808 /* Copy the different parts into their memory */ 05809 strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1); 05810 strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1); 05811 memcpy(m_coltype, ptr_after_colcnt, m_colcnt); 05812 } 05813 05814 DBUG_VOID_RETURN; 05815 } 05816 #endif 05817 05818 Table_map_log_event::~Table_map_log_event() 05819 { 05820 my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); 05821 } 05822 05823 /* 05824 Return value is an error code, one of: 05825 05826 -1 Failure to open table [from open_tables()] 05827 0 Success 05828 1 No room for more tables [from set_table()] 05829 2 Out of memory [from set_table()] 05830 3 Wrong table definition 05831 4 Daisy-chaining RBR with SBR not possible 05832 */ 05833 05834 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) 05835 int Table_map_log_event::exec_event(st_relay_log_info *rli) 05836 { 05837 DBUG_ENTER("Table_map_log_event::exec_event(st_relay_log_info*)"); 05838 05839 DBUG_ASSERT(rli->sql_thd == thd); 05840 05841 /* Step the query id to mark what columns that are actually used. */ 05842 pthread_mutex_lock(&LOCK_thread_count); 05843 thd->query_id= next_query_id(); 05844 pthread_mutex_unlock(&LOCK_thread_count); 05845 05846 TABLE_LIST *table_list; 05847 char *db_mem, *tname_mem; 05848 void *const memory= 05849 my_multi_malloc(MYF(MY_WME), 05850 &table_list, sizeof(TABLE_LIST), 05851 &db_mem, NAME_LEN + 1, 05852 &tname_mem, NAME_LEN + 1, 05853 NULL); 05854 05855 if (memory == NULL) 05856 DBUG_RETURN(HA_ERR_OUT_OF_MEM); 05857 05858 uint dummy_len; 05859 bzero(table_list, sizeof(*table_list)); 05860 table_list->db = db_mem; 05861 table_list->alias= table_list->table_name = tname_mem; 05862 table_list->lock_type= TL_WRITE; 05863 table_list->next_global= table_list->next_local= 0; 05864 table_list->table_id= m_table_id; 05865 table_list->updating= 1; 05866 strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len)); 05867 strmov(table_list->table_name, m_tblnam); 05868 05869 int error= 0; 05870 05871 if (!rpl_filter->db_ok(table_list->db) || 05872 (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))) 05873 { 05874 my_free((gptr) memory, MYF(MY_WME)); 05875 } 05876 else 05877 { 05878 /* 05879 open_tables() reads the contents of thd->lex, so they must be 05880 initialized, so we should call lex_start(); to be even safer, we 05881 call mysql_init_query() which does a more complete set of inits. 05882 */ 05883 mysql_init_query(thd, NULL, 0); 05884 /* 05885 Check if the slave is set to use SBR. If so, it should switch 05886 to using RBR until the end of the "statement", i.e., next 05887 STMT_END_F or next error. 05888 */ 05889 if (!thd->current_stmt_binlog_row_based && 05890 mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG)) 05891 { 05892 thd->set_current_stmt_binlog_row_based(); 05893 } 05894 05895 /* 05896 Open the table if it is not already open and add the table to table map. 05897 Note that for any table that should not be replicated, a filter is needed. 05898 */ 05899 uint count; 05900 if ((error= open_tables(thd, &table_list, &count, 0))) 05901 { 05902 if (thd->query_error || thd->is_fatal_error) 05903 { 05904 /* 05905 Error reporting borrowed from Query_log_event with many excessive 05906 simplifications (we don't honour --slave-skip-errors) 05907 */ 05908 uint actual_error= thd->net.last_errno; 05909 slave_print_msg(ERROR_LEVEL, rli, actual_error, 05910 "Error '%s' on opening table `%s`.`%s`", 05911 (actual_error ? thd->net.last_error : 05912 "unexpected success or fatal error"), 05913 table_list->db, table_list->table_name); 05914 thd->query_error= 1; 05915 } 05916 goto err; 05917 } 05918 05919 m_table= table_list->table; 05920 05921 /* 05922 This will fail later otherwise, the 'in_use' field should be 05923 set to the current thread. 05924 */ 05925 DBUG_ASSERT(m_table->in_use); 05926 05927 /* 05928 Check that the number of columns and the field types in the 05929 event match the number of columns and field types in the opened 05930 table. 05931 */ 05932 uint col= m_table->s->fields; 05933 05934 if (col == m_colcnt) 05935 { 05936 while (col-- > 0) 05937 if (m_table->field[col]->type() != m_coltype[col]) 05938 break; 05939 } 05940 05941 TABLE_SHARE const *const tsh= m_table->s; 05942 05943 /* 05944 Check the following termination conditions: 05945 05946 (col == m_table->s->fields) 05947 ==> (m_table->s->fields != m_colcnt) 05948 (0 <= col < m_table->s->fields) 05949 ==> (m_table->field[col]->type() != m_coltype[col]) 05950 05951 Logically, A ==> B is equivalent to !A || B 05952 05953 Since col is unsigned, is suffices to check that col <= 05954 tsh->fields. If col wrapped (by decreasing col when it is 0), 05955 the number will be UINT_MAX, which is greater than tsh->fields. 05956 */ 05957 DBUG_ASSERT(!(col == tsh->fields) || tsh->fields != m_colcnt); 05958 DBUG_ASSERT(!(col < tsh->fields) || 05959 (m_table->field[col]->type() != m_coltype[col])); 05960 05961 if (col <= tsh->fields) 05962 { 05963 /* purecov: begin inspected */ 05964 /* 05965 If we get here, the number of columns in the event didn't 05966 match the number of columns in the table on the slave, *or* 05967 there were a column in the table on the slave that did not 05968 have the same type as given in the event. 05969 05970 If 'col' has the value that was assigned to it, it was a 05971 mismatch between the number of columns on the master and the 05972 slave. 05973 */ 05974 if (col == tsh->fields) 05975 { 05976 DBUG_ASSERT(tsh->db.str && tsh->table_name.str); 05977 slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF, 05978 "Table width mismatch - " 05979 "received %u columns, %s.%s has %u columns", 05980 m_colcnt, tsh->db.str, tsh->table_name.str, tsh->fields); 05981 } 05982 else 05983 { 05984 DBUG_ASSERT(col < m_colcnt && col < tsh->fields); 05985 DBUG_ASSERT(tsh->db.str && tsh->table_name.str); 05986 slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF, 05987 "Column %d type mismatch - " 05988 "received type %d, %s.%s has type %d", 05989 col, m_coltype[col], tsh->db.str, tsh->table_name.str, 05990 m_table->field[col]->type()); 05991 } 05992 05993 thd->query_error= 1; 05994 error= ERR_BAD_TABLE_DEF; 05995 goto err; 05996 /* purecov: end */ 05997 } 05998 05999 /* 06000 We record in the slave's information that the table should be 06001 locked by linking the table into the list of tables to lock, and 06002 tell the RLI that we are touching a table. 06003 */ 06004 table_list->next_global= table_list->next_local= rli->tables_to_lock; 06005 rli->tables_to_lock= table_list; 06006 rli->tables_to_lock_count++; 06007 /* 'memory' is freed in clear_tables_to_lock */ 06008 } 06009 06010 /* 06011 We explicitly do not call Log_event::exec_event() here since we do not 06012 want the relay log position to be flushed to disk. The flushing will be 06013 done by the last Rows_log_event that either ends a statement (outside a 06014 transaction) or a transaction. 06015 06016 A table map event can *never* end a transaction or a statement, so we 06017 just step the relay log position. 06018 */ 06019 06020 if (likely(!error)) 06021 rli->inc_event_relay_log_pos(); 06022 DBUG_RETURN(error); 06023 06024 err: 06025 my_free((gptr) memory, MYF(MY_WME)); 06026 DBUG_RETURN(error); 06027 } 06028 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ 06029 06030 #ifndef MYSQL_CLIENT 06031 bool Table_map_log_event::write_data_header(IO_CACHE *file) 06032 { 06033 DBUG_ASSERT(m_table_id != ~0UL); 06034 byte buf[TABLE_MAP_HEADER_LEN]; 06035 DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", 06036 { 06037 int4store(buf + 0, m_table_id); 06038 int2store(buf + 4, m_flags); 06039 return (my_b_safe_write(file, buf, 6)); 06040 }); 06041 int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id); 06042 int2store(buf + TM_FLAGS_OFFSET, m_flags); 06043 return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN)); 06044 } 06045 06046 bool Table_map_log_event::write_data_body(IO_CACHE *file) 06047 { 06048 DBUG_ASSERT(m_dbnam != NULL); 06049 DBUG_ASSERT(m_tblnam != NULL); 06050 /* We use only one byte per length for storage in event: */ 06051 DBUG_ASSERT(m_dblen < 128); 06052 DBUG_ASSERT(m_tbllen < 128); 06053 06054 byte const dbuf[]= { m_dblen }; 06055 byte const tbuf[]= { m_tbllen }; 06056 06057 char cbuf[sizeof(m_colcnt)]; 06058 char *const cbuf_end= net_store_length((char*) cbuf, (uint) m_colcnt); 06059 DBUG_ASSERT(static_cast<my_size_t>(cbuf_end - cbuf) <= sizeof(cbuf)); 06060 06061 return (my_b_safe_write(file, dbuf, sizeof(dbuf)) || 06062 my_b_safe_write(file, (const byte*)m_dbnam, m_dblen+1) || 06063 my_b_safe_write(file, tbuf, sizeof(tbuf)) || 06064 my_b_safe_write(file, (const byte*)m_tblnam, m_tbllen+1) || 06065 my_b_safe_write(file, reinterpret_cast<byte*>(cbuf), 06066 cbuf_end - (char*) cbuf) || 06067 my_b_safe_write(file, reinterpret_cast<byte*>(m_coltype), m_colcnt)); 06068 } 06069 #endif 06070 06071 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 06072 06073 /* 06074 Print some useful information for the SHOW BINARY LOG information 06075 field. 06076 */ 06077 06078 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) 06079 void Table_map_log_event::pack_info(Protocol *protocol) 06080 { 06081 char buf[256]; 06082 my_size_t bytes= my_snprintf(buf, sizeof(buf), 06083 "table_id: %lu (%s.%s)", 06084 m_table_id, m_dbnam, m_tblnam); 06085 protocol->store(buf, bytes, &my_charset_bin); 06086 } 06087 #endif 06088 06089 06090 #endif 06091 06092 06093 #ifdef MYSQL_CLIENT 06094 void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) 06095 { 06096 if (!print_event_info->short_form) 06097 { 06098 print_header(file, print_event_info); 06099 fprintf(file, "\tTable_map: `%s`.`%s` mapped to number %lu\n", 06100 m_dbnam, m_tblnam, m_table_id); 06101 print_base64(file, print_event_info); 06102 } 06103 } 06104 #endif 06105 06106 /************************************************************************** 06107 Write_rows_log_event member functions 06108 **************************************************************************/ 06109 06110 /* 06111 Constructor used to build an event for writing to the binary log. 06112 */ 06113 #if !defined(MYSQL_CLIENT) 06114 Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg, 06115 ulong tid_arg, 06116 MY_BITMAP const *cols, 06117 bool is_transactional) 06118 : Rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional) 06119 { 06120 } 06121 #endif 06122 06123 /* 06124 Constructor used by slave to read the event from the binary log. 06125 */ 06126 #ifdef HAVE_REPLICATION 06127 Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len, 06128 const Format_description_log_event 06129 *description_event) 06130 : Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event) 06131 { 06132 } 06133 #endif 06134 06135 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) 06136 int Write_rows_log_event::do_before_row_operations(TABLE *table) 06137 { 06138 int error= 0; 06139 06140 /* 06141 We are using REPLACE semantics and not INSERT IGNORE semantics 06142 when writing rows, that is: new rows replace old rows. We need to 06143 inform the storage engine that it should use this behaviour. 06144 */ 06145 06146 /* Tell the storage engine that we are using REPLACE semantics. */ 06147 thd->lex->duplicates= DUP_REPLACE; 06148 06149 /* 06150 Pretend we're executing a REPLACE command: this is needed for 06151 InnoDB and NDB Cluster since they are not (properly) checking the 06152 lex->duplicates flag. 06153 */ 06154 thd->lex->sql_command= SQLCOM_REPLACE; 06155 06156 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); // Needed for ndbcluster 06157 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); // Needed for ndbcluster 06158 table->file->extra(HA_EXTRA_IGNORE_NO_KEY); // Needed for ndbcluster 06159 /* 06160 TODO: the cluster team (Tomas?) says that it's better if the engine knows 06161 how many rows are going to be inserted, then it can allocate needed memory 06162 from the start. 06163 */ 06164 table->file->ha_start_bulk_insert(0); 06165 /* 06166 We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill 06167 any TIMESTAMP column with data from the row but instead will use 06168 the event's current time. 06169 As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra 06170 columns, we know that all TIMESTAMP columns on slave will receive explicit 06171 data from the row, so TIMESTAMP_NO_AUTO_SET is ok. 06172 When we allow a table without TIMESTAMP to be replicated to a table having 06173 more columns including a TIMESTAMP column, or when we allow a TIMESTAMP 06174 column to be replicated into a BIGINT column and the slave's table has a 06175 TIMESTAMP column, then the slave's TIMESTAMP column will take its value 06176 from set_time() which we called earlier (consistent with SBR). And then in 06177 some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to 06178 analyze if explicit data is provided for slave's TIMESTAMP columns). 06179 */ 06180 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; 06181 return error; 06182 } 06183 06184 int Write_rows_log_event::do_after_row_operations(TABLE *table, int error) 06185 { 06186 if (error == 0) 06187 error= table->file->ha_end_bulk_insert(); 06188 return error; 06189 } 06190 06191 char const *Write_rows_log_event::do_prepare_row(THD *thd, TABLE *table, 06192 char const *row_start) 06193 { 06194 char const *ptr= row_start; 06195 DBUG_ASSERT(table != NULL); 06196 /* 06197 This assertion actually checks that there is at least as many 06198 columns on the slave as on the master. 06199 */ 06200 DBUG_ASSERT(table->s->fields >= m_width); 06201 DBUG_ASSERT(ptr); 06202 ptr= unpack_row(table, (byte*)table->record[0], ptr, &m_cols); 06203 return ptr; 06204 } 06205 06206 /* 06207 Check if there are more UNIQUE keys after the given key. 06208 */ 06209 static int 06210 last_uniq_key(TABLE *table, uint keyno) 06211 { 06212 while (++keyno < table->s->keys) 06213 if (table->key_info[keyno].flags & HA_NOSAME) 06214 return 0; 06215 return 1; 06216 } 06217 06218 /* Anonymous namespace for template functions/classes */ 06219 namespace { 06220 06221 /* 06222 Smart pointer that will automatically call my_afree (a macro) when 06223 the pointer goes out of scope. This is used so that I do not have 06224 to remember to call my_afree() before each return. There is no 06225 overhead associated with this, since all functions are inline. 06226 06227 I (Matz) would prefer to use the free function as a template 06228 parameter, but that is not possible when the "function" is a 06229 macro. 06230 */ 06231 template <class Obj> 06232 class auto_afree_ptr 06233 { 06234 Obj* m_ptr; 06235 public: 06236 auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { } 06237 ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); } 06238 void assign(Obj* ptr) { 06239 /* Only to be called if it hasn't been given a value before. */ 06240 DBUG_ASSERT(m_ptr == NULL); 06241 m_ptr= ptr; 06242 } 06243 Obj* get() { return m_ptr; } 06244 }; 06245 06246 } 06247 06248 06249 /* 06250 Replace the provided record in the database. 06251 06252 Similar to how it is done in <code>mysql_insert()</code>, we first 06253 try to do a <code>ha_write_row()</code> and of that fails due to 06254 duplicated keys (or indices), we do an <code>ha_update_row()</code> 06255 or a <code>ha_delete_row()</code> instead. 06256 06257 @param thd Thread context for writing the record. 06258 @param table Table to which record should be written. 06259 06260 @return Error code on failure, 0 on success. 06261 */ 06262 static int 06263 replace_record(THD *thd, TABLE *table) 06264 { 06265 DBUG_ASSERT(table != NULL && thd != NULL); 06266 06267 int error; 06268 int keynum; 06269 auto_afree_ptr<char> key(NULL); 06270 06271 while ((error= table->file->ha_write_row(table->record[0]))) 06272 { 06273 if ((keynum= table->file->get_dup_key(error)) < 0) 06274 { 06275 /* We failed to retrieve the duplicate key */ 06276 return HA_ERR_FOUND_DUPP_KEY; 06277 } 06278 06279 /* 06280 We need to retrieve the old row into record[1] to be able to 06281 either update or delete the offending record. We either: 06282 06283 - use rnd_pos() with a row-id (available as dupp_row) to the 06284 offending row, if that is possible (MyISAM and Blackhole), or else 06285 06286 - use index_read_idx() with the key that is duplicated, to 06287 retrieve the offending row. 06288 */ 06289 if (table->file->ha_table_flags() & HA_DUPLICATE_POS) 06290 { 06291 error= table->file->rnd_pos(table->record[1], table->file->dup_ref); 06292 if (error) 06293 return error; 06294 } 06295 else 06296 { 06297 if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) 06298 { 06299 return my_errno; 06300 } 06301 06302 if (key.get() == NULL) 06303 { 06304 key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); 06305 if (key.get() == NULL) 06306 return ENOMEM; 06307 } 06308 06309 key_copy((byte*)key.get(), table->record[0], table->key_info + keynum, 0); 06310 error= table->file->index_read_idx(table->record[1], keynum, 06311 (const byte*)key.get(), 06312 table->key_info[keynum].key_length, 06313 HA_READ_KEY_EXACT); 06314 if (error) 06315 return error; 06316 } 06317 06318 /* 06319 Now, table->record[1] should contain the offending row. That 06320 will enable us to update it or, alternatively, delete it (so 06321 that we can insert the new row afterwards). 06322 06323 REPLACE is defined as either INSERT or DELETE + INSERT. If 06324 possible, we can replace it with an UPDATE, but that will not 06325 work on InnoDB if FOREIGN KEY checks are necessary. 06326 06327 I (Matz) am not sure of the reason for the last_uniq_key() 06328 check as, but I'm guessing that it's something along the 06329 following lines. 06330 06331 Suppose that we got the duplicate key to be a key that is not 06332 the last unique key for the table and we perform an update: 06333 then there might be another key for which the unique check will 06334 fail, so we're better off just deleting the row and inserting 06335 the correct row. 06336 */ 06337 if (last_uniq_key(table, keynum) && 06338 !table->file->referenced_by_foreign_key()) 06339 { 06340 error=table->file->ha_update_row(table->record[1], 06341 table->record[0]); 06342 return error; 06343 } 06344 else 06345 { 06346 if ((error= table->file->ha_delete_row(table->record[1]))) 06347 return error; 06348 /* Will retry ha_write_row() with the offending row removed. */ 06349 } 06350 } 06351 return error; 06352 } 06353 06354 int Write_rows_log_event::do_exec_row(TABLE *table) 06355 { 06356 DBUG_ASSERT(table != NULL); 06357 int error= replace_record(thd, table); 06358 return error; 06359 } 06360 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ 06361 06362 #ifdef MYSQL_CLIENT 06363 void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) 06364 { 06365 if (!print_event_info->short_form) 06366 { 06367 print_header(file, print_event_info); 06368 fprintf(file, "\tWrite_rows: table id %lu", m_table_id); 06369 print_base64(file, print_event_info); 06370 } 06371 } 06372 #endif 06373 06374 /************************************************************************** 06375 Delete_rows_log_event member functions 06376 **************************************************************************/ 06377 06378 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) 06379 /* 06380 Compares table->record[0] and table->record[1] 06381 06382 Returns TRUE if different. 06383 */ 06384 static bool record_compare(TABLE *table) 06385 { 06386 if (table->s->blob_fields + table->s->varchar_fields == 0) 06387 return cmp_record(table,record[1]); 06388 /* Compare null bits */ 06389 if (memcmp(table->null_flags, 06390 table->null_flags+table->s->rec_buff_length, 06391 table->s->null_bytes)) 06392 return TRUE; // Diff in NULL value 06393 /* Compare updated fields */ 06394 for (Field **ptr=table->field ; *ptr ; ptr++) 06395 { 06396 if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length)) 06397 return TRUE; 06398 } 06399 return FALSE; 06400 } 06401 06402 06403 /* 06404 Find the row given by 'key', if the table has keys, or else use a table scan 06405 to find (and fetch) the row. If the engine allows random access of the 06406 records, a combination of position() and rnd_pos() will be used. 06407 */ 06408 06409 static int find_and_fetch_row(TABLE *table, byte *key) 06410 { 06411 DBUG_ENTER("find_and_fetch_row(TABLE *table, byte *key, byte *record)"); 06412 DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx", 06413 (long) table, (long) key, (long) table->record[1])); 06414 06415 DBUG_ASSERT(table->in_use != NULL); 06416 06417 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && 06418 table->s->primary_key < MAX_KEY) 06419 { 06420 /* 06421 Use a more efficient method to fetch the record given by 06422 table->record[0] if the engine allows it. We first compute a 06423 row reference using the position() member function (it will be 06424 stored in table->file->ref) and the use rnd_pos() to position 06425 the "cursor" (i.e., record[0] in this case) at the correct row. 06426 */ 06427 table->file->position(table->record[0]); 06428 DBUG_RETURN(table->file->rnd_pos(table->record[0], table->file->ref)); 06429 } 06430 06431 DBUG_ASSERT(table->record[1]); 06432 06433 /* We need to retrieve all fields */ 06434 /* TODO: Move this out from this function to main loop */ 06435 table->use_all_columns(); 06436 06437 if (table->s->keys > 0) 06438 { 06439 int error; 06440 /* We have a key: search the table using the index */ 06441 if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) 06442 return error; 06443 06444 /* 06445 We need to set the null bytes to ensure that the filler bit are 06446 all set when returning. There are storage engines that just set 06447 the necessary bits on the bytes and don't set the filler bits 06448 correctly. 06449 */ 06450 my_ptrdiff_t const pos= 06451 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0; 06452 table->record[1][pos]= 0xFF; 06453 if ((error= table->file->index_read(table->record[1], key, 06454 table->key_info->key_length, 06455 HA_READ_KEY_EXACT))) 06456 { 06457 table->file->print_error(error, MYF(0)); 06458 table->file->ha_index_end(); 06459 DBUG_RETURN(error); 06460 } 06461 06462 /* 06463 Below is a minor "optimization". If the key (i.e., key number 06464 0) has the HA_NOSAME flag set, we know that we have found the 06465 correct record (since there can be no duplicates); otherwise, we 06466 have to compare the record with the one found to see if it is 06467 the correct one. 06468 06469 CAVEAT! This behaviour is essential for the replication of, 06470 e.g., the mysql.proc table since the correct record *shall* be 06471 found using the primary key *only*. There shall be no 06472 comparison of non-PK columns to decide if the correct record is 06473 found. I can see no scenario where it would be incorrect to 06474 chose the row to change only using a PK or an UNNI. 06475 */ 06476 if (table->key_info->flags & HA_NOSAME) 06477 { 06478 table->file->ha_index_end(); 06479 DBUG_RETURN(0); 06480 } 06481 06482 while (record_compare(table)) 06483 { 06484 int error; 06485 /* 06486 We need to set the null bytes to ensure that the filler bit 06487 are all set when returning. There are storage engines that 06488 just set the necessary bits on the bytes and don't set the 06489 filler bits correctly. 06490 */ 06491 my_ptrdiff_t const pos= 06492 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0; 06493 table->record[1][pos]= 0xFF; 06494 if ((error= table->file->index_next(table->record[1]))) 06495 { 06496 table->file->print_error(error, MYF(0)); 06497 table->file->ha_index_end(); 06498 DBUG_RETURN(error); 06499 } 06500 } 06501 06502 /* 06503 Have to restart the scan to be able to fetch the next row. 06504 */ 06505 table->file->ha_index_end(); 06506 } 06507 else 06508 { 06509 int restart_count= 0; // Number of times scanning has restarted from top 06510 int error; 06511 06512 /* We don't have a key: search the table using rnd_next() */ 06513 if ((error= table->file->ha_rnd_init(1))) 06514 return error; 06515 06516 /* Continue until we find the right record or have made a full loop */ 06517 do 06518 { 06519 /* 06520 We need to set the null bytes to ensure that the filler bit 06521 are all set when returning. There are storage engines that 06522 just set the necessary bits on the bytes and don't set the 06523 filler bits correctly. 06524 */ 06525 my_ptrdiff_t const pos= 06526 table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0; 06527 table->record[1][pos]= 0xFF; 06528 error= table->file->rnd_next(table->record[1]); 06529 switch (error) 06530 { 06531 case 0: 06532 case HA_ERR_RECORD_DELETED: 06533 break; 06534 06535 case HA_ERR_END_OF_FILE: 06536 if (++restart_count < 2) 06537 table->file->ha_rnd_init(1); 06538 break; 06539 06540 default: 06541 table->file->print_error(error, MYF(0)); 06542 table->file->ha_rnd_end(); 06543 DBUG_RETURN(error); 06544 } 06545 } 06546 while (restart_count < 2 && record_compare(table)); 06547 06548 /* 06549 Have to restart the scan to be able to fetch the next row. 06550 */ 06551 table->file->ha_rnd_end(); 06552 06553 DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0); 06554 DBUG_RETURN(error); 06555 } 06556 06557 DBUG_RETURN(0); 06558 } 06559 #endif 06560 06561 /* 06562 Constructor used to build an event for writing to the binary log. 06563 */ 06564 06565 #ifndef MYSQL_CLIENT 06566 Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, 06567 ulong tid, MY_BITMAP const *cols, 06568 bool is_transactional) 06569 : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) 06570 #ifdef HAVE_REPLICATION 06571 ,m_memory(NULL), m_key(NULL), m_after_image(NULL) 06572 #endif 06573 { 06574 } 06575 #endif /* #if !defined(MYSQL_CLIENT) */ 06576 06577 /* 06578 Constructor used by slave to read the event from the binary log. 06579 */ 06580 #ifdef HAVE_REPLICATION 06581 Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len, 06582 const Format_description_log_event 06583 *description_event) 06584 #if defined(MYSQL_CLIENT) 06585 : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event) 06586 #else 06587 : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event), 06588 m_memory(NULL), m_key(NULL), m_after_image(NULL) 06589 #endif 06590 { 06591 } 06592 #endif 06593 06594 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) 06595 int Delete_rows_log_event::do_before_row_operations(TABLE *table) 06596 { 06597 DBUG_ASSERT(m_memory == NULL); 06598 06599 if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && 06600 table->s->primary_key < MAX_KEY) 06601 { 06602 /* 06603 We don't need to allocate any memory for m_after_image and 06604 m_key since they are not used. 06605 */ 06606 return 0; 06607 } 06608 06609 int error= 0; 06610 06611 if (table->s->keys > 0) 06612 { 06613 m_memory= 06614 my_multi_malloc(MYF(MY_WME), 06615 &m_after_image, table->s->reclength, 06616 &m_key, table->key_info->key_length, 06617 NULL); 06618 } 06619 else 06620 { 06621 m_after_image= (byte*)my_malloc(table->s->reclength, MYF(MY_WME)); 06622 m_memory= (gptr)m_after_image; 06623 m_key= NULL; 06624 } 06625 if (!m_memory) 06626 return HA_ERR_OUT_OF_MEM; 06627 06628 return error; 06629 } 06630 06631 int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error) 06632 { 06633 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ 06634 table->file->ha_index_or_rnd_end(); 06635 my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc 06636 m_memory= NULL; 06637 m_after_image= NULL; 06638 m_key= NULL; 06639 06640 return error; 06641 } 06642 06643 char const *Delete_rows_log_event::do_prepare_row(THD *thd, TABLE *table, 06644 char const *row_start) 06645 { 06646 char const *ptr= row_start; 06647 DBUG_ASSERT(ptr); 06648 /* 06649 This assertion actually checks that there is at least as many 06650 columns on the slave as on the master. 06651 */ 06652 DBUG_ASSERT(table->s->fields >= m_width); 06653 06654 DBUG_ASSERT(ptr != NULL); 06655 ptr= unpack_row(table, table->record[0], ptr, &m_cols); 06656 06657 /* 06658 If we will access rows using the random access method, m_key will 06659 be set to NULL, so we do not need to make a key copy in that case. 06660 */ 06661 if (m_key) 06662 { 06663 KEY *const key_info= table->key_info; 06664 06665 key_copy(m_key, table->record[0], key_info, 0); 06666 } 06667 06668 return ptr; 06669 } 06670 06671 int Delete_rows_log_event::do_exec_row(TABLE *table) 06672 { 06673 int error; 06674 DBUG_ASSERT(table != NULL); 06675 06676 if (!(error= find_and_fetch_row(table, m_key))) 06677 { 06678 /* 06679 Now we should have the right row to delete. We are using 06680 record[0] since it is guaranteed to point to a record with the 06681 correct value. 06682 */ 06683 error= table->file->ha_delete_row(table->record[0]); 06684 } 06685 return error; 06686 } 06687 06688 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ 06689 06690 #ifdef MYSQL_CLIENT 06691 void Delete_rows_log_event::print(FILE *file, 06692 PRINT_EVENT_INFO* print_event_info) 06693 { 06694 if (!print_event_info->short_form) 06695 { 06696 print_header(file, print_event_info); 06697 fprintf(file, "\tDelete_rows: table id %lu", m_table_id); 06698 print_base64(file, print_event_info); 06699 } 06700 } 06701 #endif 06702 06703 06704 /************************************************************************** 06705 Update_rows_log_event member functions 06706 **************************************************************************/ 06707 06708 /* 06709 Constructor used to build an event for writing to the binary log. 06710 */ 06711 #if !defined(MYSQL_CLIENT) 06712 Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, 06713 ulong tid, MY_BITMAP const *cols, 06714 bool is_transactional) 06715 : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) 06716 #ifdef HAVE_REPLICATION 06717 , m_memory(NULL), m_key(NULL) 06718 #endif 06719 { 06720 } 06721 #endif /* !defined(MYSQL_CLIENT) */ 06722 06723 /* 06724 Constructor used by slave to read the event from the binary log. 06725 */ 06726 #ifdef HAVE_REPLICATION 06727 Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, 06728 const 06729 Format_description_log_event 06730 *description_event) 06731 #if defined(MYSQL_CLIENT) 06732 : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event) 06733 #else 06734 : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event), 06735 m_memory(NULL), m_key(NULL) 06736 #endif 06737 { 06738 } 06739 #endif 06740 06741 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) 06742 int Update_rows_log_event::do_before_row_operations(TABLE *table) 06743 { 06744 DBUG_ASSERT(m_memory == NULL); 06745 06746 int error= 0; 06747 06748 if (table->s->keys > 0) 06749 { 06750 m_memory= 06751 my_multi_malloc(MYF(MY_WME), 06752 &m_after_image, table->s->reclength, 06753 &m_key, table->key_info->key_length, 06754 NULL); 06755 } 06756 else 06757 { 06758 m_after_image= (byte*)my_malloc(table->s->reclength, MYF(MY_WME)); 06759 m_memory= (gptr)m_after_image; 06760 m_key= NULL; 06761 } 06762 if (!m_memory) 06763 return HA_ERR_OUT_OF_MEM; 06764 06765 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; 06766 06767 return error; 06768 } 06769 06770 int Update_rows_log_event::do_after_row_operations(TABLE *table, int error) 06771 { 06772 /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ 06773 table->file->ha_index_or_rnd_end(); 06774 my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); 06775 m_memory= NULL; 06776 m_after_image= NULL; 06777 m_key= NULL; 06778 06779 return error; 06780 } 06781 06782 char const *Update_rows_log_event::do_prepare_row(THD *thd, TABLE *table, 06783 char const *row_start) 06784 { 06785 char const *ptr= row_start; 06786 DBUG_ASSERT(ptr); 06787 /* 06788 This assertion actually checks that there is at least as many 06789 columns on the slave as on the master. 06790 */ 06791 DBUG_ASSERT(table->s->fields >= m_width); 06792 06793 /* record[0] is the before image for the update */ 06794 ptr= unpack_row(table, table->record[0], ptr, &m_cols); 06795 DBUG_ASSERT(ptr != NULL); 06796 /* m_after_image is the after image for the update */ 06797 ptr= unpack_row(table, m_after_image, ptr, &m_cols); 06798 06799 /* 06800 If we will access rows using the random access method, m_key will 06801 be set to NULL, so we do not need to make a key copy in that case. 06802 */ 06803 if (m_key) 06804 { 06805 KEY *const key_info= table->key_info; 06806 06807 key_copy(m_key, table->record[0], key_info, 0); 06808 } 06809 06810 return ptr; 06811 } 06812 06813 int Update_rows_log_event::do_exec_row(TABLE *table) 06814 { 06815 DBUG_ASSERT(table != NULL); 06816 06817 int error= find_and_fetch_row(table, m_key); 06818 if (error) 06819 return error; 06820 06821 /* 06822 We have to ensure that the new record (i.e., the after image) is 06823 in record[0] and the old record (i.e., the before image) is in 06824 record[1]. This since some storage engines require this (for 06825 example, the partition engine). 06826 06827 Since find_and_fetch_row() puts the fetched record (i.e., the old 06828 record) in record[0], we have to move it out of the way and into 06829 record[1]. After that, we can put the new record (i.e., the after 06830 image) into record[0]. 06831 */ 06832 bmove_align(table->record[1], table->record[0], table->s->reclength); 06833 bmove_align(table->record[0], m_after_image, table->s->reclength); 06834 06835 /* 06836 Now we should have the right row to update. The old row (the one 06837 we're looking for) has to be in record[1] and the new row has to 06838 be in record[0] for all storage engines to work correctly. 06839 */ 06840 error= table->file->ha_update_row(table->record[1], table->record[0]); 06841 06842 return error; 06843 } 06844 #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ 06845 06846 #ifdef MYSQL_CLIENT 06847 void Update_rows_log_event::print(FILE *file, 06848 PRINT_EVENT_INFO* print_event_info) 06849 { 06850 if (!print_event_info->short_form) 06851 { 06852 print_header(file, print_event_info); 06853 fprintf(file, "\tUpdate_rows: table id %lu", m_table_id); 06854 print_base64(file, print_event_info); 06855 } 06856 } 06857 #endif 06858 06859 #endif /* defined(HAVE_ROW_BASED_REPLICATION) */
1.4.7

