00001 /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB & Sasha 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 #include "mysql_priv.h" 00018 #ifdef HAVE_REPLICATION 00019 00020 #include "repl_failsafe.h" 00021 #include "sql_repl.h" 00022 #include "slave.h" 00023 #include "rpl_filter.h" 00024 #include "log_event.h" 00025 #include <mysql.h> 00026 00027 #define SLAVE_LIST_CHUNK 128 00028 #define SLAVE_ERRMSG_SIZE (FN_REFLEN+64) 00029 00030 00031 RPL_STATUS rpl_status=RPL_NULL; 00032 pthread_mutex_t LOCK_rpl_status; 00033 pthread_cond_t COND_rpl_status; 00034 HASH slave_list; 00035 00036 const char *rpl_role_type[] = {"MASTER","SLAVE",NullS}; 00037 TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"", 00038 rpl_role_type, NULL}; 00039 00040 const char* rpl_status_type[]= 00041 { 00042 "AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", "LOST_SOLDIER","TROOP_SOLDIER", 00043 "RECOVERY_CAPTAIN","NULL",NullS 00044 }; 00045 TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"", 00046 rpl_status_type, NULL}; 00047 00048 00049 static Slave_log_event* find_slave_event(IO_CACHE* log, 00050 const char* log_file_name, 00051 char* errmsg); 00052 00053 /* 00054 All of the functions defined in this file which are not used (the ones to 00055 handle failsafe) are not used; their code has not been updated for more than 00056 one year now so should be considered as BADLY BROKEN. Do not enable it. 00057 The used functions (to handle LOAD DATA FROM MASTER, plus some small 00058 functions like register_slave()) are working. 00059 */ 00060 00061 static int init_failsafe_rpl_thread(THD* thd) 00062 { 00063 DBUG_ENTER("init_failsafe_rpl_thread"); 00064 thd->system_thread = SYSTEM_THREAD_DELAYED_INSERT; 00065 /* 00066 thd->bootstrap is to report errors barely to stderr; if this code is 00067 enable again one day, one should check if bootstrap is still needed (maybe 00068 this thread has no other error reporting method). 00069 */ 00070 thd->bootstrap = 1; 00071 thd->security_ctx->skip_grants(); 00072 my_net_init(&thd->net, 0); 00073 thd->net.read_timeout = slave_net_timeout; 00074 thd->max_client_packet_length=thd->net.max_packet; 00075 pthread_mutex_lock(&LOCK_thread_count); 00076 thd->thread_id = thread_id++; 00077 pthread_mutex_unlock(&LOCK_thread_count); 00078 00079 if (init_thr_lock() || thd->store_globals()) 00080 { 00081 close_connection(thd, ER_OUT_OF_RESOURCES, 1); // is this needed? 00082 statistic_increment(aborted_connects,&LOCK_status); 00083 end_thread(thd,0); 00084 DBUG_RETURN(-1); 00085 } 00086 00087 #if !defined(__WIN__) && !defined(__NETWARE__) 00088 sigset_t set; 00089 VOID(sigemptyset(&set)); // Get mask in use 00090 VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); 00091 #endif 00092 00093 thd->mem_root->free= thd->mem_root->used= 0; 00094 if (thd->variables.max_join_size == HA_POS_ERROR) 00095 thd->options|= OPTION_BIG_SELECTS; 00096 00097 thd->proc_info="Thread initialized"; 00098 thd->version=refresh_version; 00099 thd->set_time(); 00100 DBUG_RETURN(0); 00101 } 00102 00103 00104 void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status) 00105 { 00106 pthread_mutex_lock(&LOCK_rpl_status); 00107 if (rpl_status == from_status || rpl_status == RPL_ANY) 00108 rpl_status = to_status; 00109 pthread_cond_signal(&COND_rpl_status); 00110 pthread_mutex_unlock(&LOCK_rpl_status); 00111 } 00112 00113 00114 #define get_object(p, obj) \ 00115 {\ 00116 uint len = (uint)*p++; \ 00117 if (p + len > p_end || len >= sizeof(obj)) \ 00118 goto err; \ 00119 strmake(obj,(char*) p,len); \ 00120 p+= len; \ 00121 }\ 00122 00123 00124 static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi) 00125 { 00126 return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name, 00127 mi->pos); 00128 } 00129 00130 00131 void unregister_slave(THD* thd, bool only_mine, bool need_mutex) 00132 { 00133 if (thd->server_id) 00134 { 00135 if (need_mutex) 00136 pthread_mutex_lock(&LOCK_slave_list); 00137 00138 SLAVE_INFO* old_si; 00139 if ((old_si = (SLAVE_INFO*)hash_search(&slave_list, 00140 (byte*)&thd->server_id, 4)) && 00141 (!only_mine || old_si->thd == thd)) 00142 hash_delete(&slave_list, (byte*)old_si); 00143 00144 if (need_mutex) 00145 pthread_mutex_unlock(&LOCK_slave_list); 00146 } 00147 } 00148 00149 00150 /* 00151 Register slave in 'slave_list' hash table 00152 00153 RETURN VALUES 00154 0 ok 00155 1 Error. Error message sent to client 00156 */ 00157 00158 int register_slave(THD* thd, uchar* packet, uint packet_length) 00159 { 00160 int res; 00161 SLAVE_INFO *si; 00162 uchar *p= packet, *p_end= packet + packet_length; 00163 00164 if (check_access(thd, REPL_SLAVE_ACL, any_db,0,0,0,0)) 00165 return 1; 00166 if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) 00167 goto err2; 00168 00169 thd->server_id= si->server_id= uint4korr(p); 00170 p+= 4; 00171 get_object(p,si->host); 00172 get_object(p,si->user); 00173 get_object(p,si->password); 00174 if (p+10 > p_end) 00175 goto err; 00176 si->port= uint2korr(p); 00177 p += 2; 00178 si->rpl_recovery_rank= uint4korr(p); 00179 p += 4; 00180 if (!(si->master_id= uint4korr(p))) 00181 si->master_id= server_id; 00182 si->thd= thd; 00183 00184 pthread_mutex_lock(&LOCK_slave_list); 00185 unregister_slave(thd,0,0); 00186 res= my_hash_insert(&slave_list, (byte*) si); 00187 pthread_mutex_unlock(&LOCK_slave_list); 00188 return res; 00189 00190 err: 00191 my_free((gptr) si, MYF(MY_WME)); 00192 my_message(ER_UNKNOWN_ERROR, "Wrong parameters to function register_slave", 00193 MYF(0)); 00194 err2: 00195 return 1; 00196 } 00197 00198 extern "C" uint32 00199 *slave_list_key(SLAVE_INFO* si, uint* len, 00200 my_bool not_used __attribute__((unused))) 00201 { 00202 *len = 4; 00203 return &si->server_id; 00204 } 00205 00206 extern "C" void slave_info_free(void *s) 00207 { 00208 my_free((gptr) s, MYF(MY_WME)); 00209 } 00210 00211 void init_slave_list() 00212 { 00213 hash_init(&slave_list, system_charset_info, SLAVE_LIST_CHUNK, 0, 0, 00214 (hash_get_key) slave_list_key, (hash_free_key) slave_info_free, 0); 00215 pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST); 00216 } 00217 00218 void end_slave_list() 00219 { 00220 /* No protection by a mutex needed as we are only called at shutdown */ 00221 if (hash_inited(&slave_list)) 00222 { 00223 hash_free(&slave_list); 00224 pthread_mutex_destroy(&LOCK_slave_list); 00225 } 00226 } 00227 00228 static int find_target_pos(LEX_MASTER_INFO *mi, IO_CACHE *log, char *errmsg) 00229 { 00230 my_off_t log_pos = (my_off_t) mi->pos; 00231 uint32 target_server_id = mi->server_id; 00232 00233 for (;;) 00234 { 00235 Log_event* ev; 00236 if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0))) 00237 { 00238 if (log->error > 0) 00239 strmov(errmsg, "Binary log truncated in the middle of event"); 00240 else if (log->error < 0) 00241 strmov(errmsg, "I/O error reading binary log"); 00242 else 00243 strmov(errmsg, "Could not find target event in the binary log"); 00244 return 1; 00245 } 00246 00247 if (ev->log_pos >= log_pos && ev->server_id == target_server_id) 00248 { 00249 delete ev; 00250 mi->pos = my_b_tell(log); 00251 return 0; 00252 } 00253 delete ev; 00254 } 00255 /* Impossible */ 00256 } 00257 00258 /* 00259 Before 4.0.15 we had a member of THD called log_pos, it was meant for 00260 failsafe replication code in repl_failsafe.cc which is disabled until 00261 it is reworked. Event's log_pos used to be preserved through 00262 log-slave-updates to make code in repl_failsafe.cc work (this 00263 function, SHOW NEW MASTER); but on the other side it caused unexpected 00264 values in Exec_Master_Log_Pos in A->B->C replication setup, 00265 synchronization problems in master_pos_wait(), ... So we 00266 (Dmitri & Guilhem) removed it. 00267 00268 So for now this function is broken. 00269 */ 00270 00271 int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg) 00272 { 00273 LOG_INFO linfo; 00274 char last_log_name[FN_REFLEN]; 00275 IO_CACHE log; 00276 File file = -1, last_file = -1; 00277 pthread_mutex_t *log_lock; 00278 const char* errmsg_p; 00279 Slave_log_event* sev = 0; 00280 my_off_t last_pos = 0; 00281 int error = 1; 00282 int cmp_res; 00283 LINT_INIT(cmp_res); 00284 DBUG_ENTER("translate_master"); 00285 00286 if (!mysql_bin_log.is_open()) 00287 { 00288 strmov(errmsg,"Binary log is not open"); 00289 DBUG_RETURN(1); 00290 } 00291 00292 if (!server_id_supplied) 00293 { 00294 strmov(errmsg, "Misconfigured master - server id was not set"); 00295 DBUG_RETURN(1); 00296 } 00297 00298 if (mysql_bin_log.find_log_pos(&linfo, NullS, 1)) 00299 { 00300 strmov(errmsg,"Could not find first log"); 00301 DBUG_RETURN(1); 00302 } 00303 thd->current_linfo = &linfo; 00304 00305 bzero((char*) &log,sizeof(log)); 00306 log_lock = mysql_bin_log.get_log_lock(); 00307 pthread_mutex_lock(log_lock); 00308 00309 for (;;) 00310 { 00311 if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0) 00312 { 00313 strmov(errmsg, errmsg_p); 00314 goto err; 00315 } 00316 00317 if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg))) 00318 goto err; 00319 00320 cmp_res = cmp_master_pos(sev, mi); 00321 delete sev; 00322 00323 if (!cmp_res) 00324 { 00325 /* Copy basename */ 00326 fn_format(mi->log_file_name, linfo.log_file_name, "","",1); 00327 mi->pos = my_b_tell(&log); 00328 goto mi_inited; 00329 } 00330 else if (cmp_res > 0) 00331 { 00332 if (!last_pos) 00333 { 00334 strmov(errmsg, 00335 "Slave event in first log points past the target position"); 00336 goto err; 00337 } 00338 end_io_cache(&log); 00339 (void) my_close(file, MYF(MY_WME)); 00340 if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0, 00341 MYF(MY_WME))) 00342 { 00343 errmsg[0] = 0; 00344 goto err; 00345 } 00346 break; 00347 } 00348 00349 strmov(last_log_name, linfo.log_file_name); 00350 last_pos = my_b_tell(&log); 00351 00352 switch (mysql_bin_log.find_next_log(&linfo, 1)) { 00353 case LOG_INFO_EOF: 00354 if (last_file >= 0) 00355 (void)my_close(last_file, MYF(MY_WME)); 00356 last_file = -1; 00357 goto found_log; 00358 case 0: 00359 break; 00360 default: 00361 strmov(errmsg, "Error reading log index"); 00362 goto err; 00363 } 00364 00365 end_io_cache(&log); 00366 if (last_file >= 0) 00367 (void) my_close(last_file, MYF(MY_WME)); 00368 last_file = file; 00369 } 00370 00371 found_log: 00372 my_b_seek(&log, last_pos); 00373 if (find_target_pos(mi,&log,errmsg)) 00374 goto err; 00375 fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */ 00376 00377 mi_inited: 00378 error = 0; 00379 err: 00380 pthread_mutex_unlock(log_lock); 00381 end_io_cache(&log); 00382 pthread_mutex_lock(&LOCK_thread_count); 00383 thd->current_linfo = 0; 00384 pthread_mutex_unlock(&LOCK_thread_count); 00385 if (file >= 0) 00386 (void) my_close(file, MYF(MY_WME)); 00387 if (last_file >= 0 && last_file != file) 00388 (void) my_close(last_file, MYF(MY_WME)); 00389 00390 DBUG_RETURN(error); 00391 } 00392 00393 00394 /* 00395 Caller must delete result when done 00396 */ 00397 00398 static Slave_log_event* find_slave_event(IO_CACHE* log, 00399 const char* log_file_name, 00400 char* errmsg) 00401 { 00402 Log_event* ev; 00403 int i; 00404 bool slave_event_found = 0; 00405 LINT_INIT(ev); 00406 00407 for (i = 0; i < 2; i++) 00408 { 00409 if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0))) 00410 { 00411 my_snprintf(errmsg, SLAVE_ERRMSG_SIZE, 00412 "Error reading event in log '%s'", 00413 (char*)log_file_name); 00414 return 0; 00415 } 00416 if (ev->get_type_code() == SLAVE_EVENT) 00417 { 00418 slave_event_found = 1; 00419 break; 00420 } 00421 delete ev; 00422 } 00423 if (!slave_event_found) 00424 { 00425 my_snprintf(errmsg, SLAVE_ERRMSG_SIZE, 00426 "Could not find slave event in log '%s'", 00427 (char*)log_file_name); 00428 return 0; 00429 } 00430 00431 return (Slave_log_event*)ev; 00432 } 00433 00434 /* 00435 This function is broken now. See comment for translate_master(). 00436 */ 00437 00438 bool show_new_master(THD* thd) 00439 { 00440 Protocol *protocol= thd->protocol; 00441 DBUG_ENTER("show_new_master"); 00442 List<Item> field_list; 00443 char errmsg[SLAVE_ERRMSG_SIZE]; 00444 LEX_MASTER_INFO* lex_mi= &thd->lex->mi; 00445 00446 errmsg[0]=0; // Safety 00447 if (translate_master(thd, lex_mi, errmsg)) 00448 { 00449 if (errmsg[0]) 00450 my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0), 00451 "SHOW NEW MASTER", errmsg); 00452 DBUG_RETURN(TRUE); 00453 } 00454 else 00455 { 00456 field_list.push_back(new Item_empty_string("Log_name", 20)); 00457 field_list.push_back(new Item_return_int("Log_pos", 10, 00458 MYSQL_TYPE_LONGLONG)); 00459 if (protocol->send_fields(&field_list, 00460 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) 00461 DBUG_RETURN(TRUE); 00462 protocol->prepare_for_resend(); 00463 protocol->store(lex_mi->log_file_name, &my_charset_bin); 00464 protocol->store((ulonglong) lex_mi->pos); 00465 if (protocol->write()) 00466 DBUG_RETURN(TRUE); 00467 send_eof(thd); 00468 DBUG_RETURN(FALSE); 00469 } 00470 } 00471 00472 /* 00473 Asks the master for the list of its other connected slaves. 00474 This is for failsafe replication: 00475 in order for failsafe replication to work, the servers involved in 00476 replication must know of each other. We accomplish this by having each 00477 slave report to the master how to reach it, and on connection, each 00478 slave receives information about where the other slaves are. 00479 00480 SYNOPSIS 00481 update_slave_list() 00482 mysql pre-existing connection to the master 00483 mi master info 00484 00485 NOTES 00486 mi is used only to give detailed error messages which include the 00487 hostname/port of the master, the username used by the slave to connect to 00488 the master. 00489 If the user used by the slave to connect to the master does not have the 00490 REPLICATION SLAVE privilege, it will pop in this function because 00491 SHOW SLAVE HOSTS will fail on the master. 00492 00493 RETURN VALUES 00494 1 error 00495 0 success 00496 */ 00497 00498 int update_slave_list(MYSQL* mysql, MASTER_INFO* mi) 00499 { 00500 MYSQL_RES* res=0; 00501 MYSQL_ROW row; 00502 const char* error=0; 00503 bool have_auth_info; 00504 int port_ind; 00505 DBUG_ENTER("update_slave_list"); 00506 00507 if (mysql_real_query(mysql, STRING_WITH_LEN("SHOW SLAVE HOSTS")) || 00508 !(res = mysql_store_result(mysql))) 00509 { 00510 error= mysql_error(mysql); 00511 goto err; 00512 } 00513 00514 switch (mysql_num_fields(res)) { 00515 case 5: 00516 have_auth_info = 0; 00517 port_ind=2; 00518 break; 00519 case 7: 00520 have_auth_info = 1; 00521 port_ind=4; 00522 break; 00523 default: 00524 error= "the master returned an invalid number of fields for SHOW SLAVE \ 00525 HOSTS"; 00526 goto err; 00527 } 00528 00529 pthread_mutex_lock(&LOCK_slave_list); 00530 00531 while ((row= mysql_fetch_row(res))) 00532 { 00533 uint32 server_id; 00534 SLAVE_INFO* si, *old_si; 00535 server_id = atoi(row[0]); 00536 if ((old_si= (SLAVE_INFO*)hash_search(&slave_list, 00537 (byte*)&server_id,4))) 00538 si = old_si; 00539 else 00540 { 00541 if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) 00542 { 00543 error= "the slave is out of memory"; 00544 pthread_mutex_unlock(&LOCK_slave_list); 00545 goto err; 00546 } 00547 si->server_id = server_id; 00548 my_hash_insert(&slave_list, (byte*)si); 00549 } 00550 strmake(si->host, row[1], sizeof(si->host)-1); 00551 si->port = atoi(row[port_ind]); 00552 si->rpl_recovery_rank = atoi(row[port_ind+1]); 00553 si->master_id = atoi(row[port_ind+2]); 00554 if (have_auth_info) 00555 { 00556 strmake(si->user, row[2], sizeof(si->user)-1); 00557 strmake(si->password, row[3], sizeof(si->password)-1); 00558 } 00559 } 00560 pthread_mutex_unlock(&LOCK_slave_list); 00561 00562 err: 00563 if (res) 00564 mysql_free_result(res); 00565 if (error) 00566 { 00567 sql_print_error("While trying to obtain the list of slaves from the master \ 00568 '%s:%d', user '%s' got the following error: '%s'", 00569 mi->host, mi->port, mi->user, error); 00570 DBUG_RETURN(1); 00571 } 00572 DBUG_RETURN(0); 00573 } 00574 00575 00576 int find_recovery_captain(THD* thd, MYSQL* mysql) 00577 { 00578 return 0; 00579 } 00580 00581 00582 pthread_handler_t handle_failsafe_rpl(void *arg) 00583 { 00584 DBUG_ENTER("handle_failsafe_rpl"); 00585 THD *thd = new THD; 00586 thd->thread_stack = (char*)&thd; 00587 MYSQL* recovery_captain = 0; 00588 const char* msg; 00589 00590 pthread_detach_this_thread(); 00591 if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mysql_init(0))) 00592 { 00593 sql_print_error("Could not initialize failsafe replication thread"); 00594 goto err; 00595 } 00596 pthread_mutex_lock(&LOCK_rpl_status); 00597 msg= thd->enter_cond(&COND_rpl_status, 00598 &LOCK_rpl_status, "Waiting for request"); 00599 while (!thd->killed && !abort_loop) 00600 { 00601 bool break_req_chain = 0; 00602 pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status); 00603 thd->proc_info="Processing request"; 00604 while (!break_req_chain) 00605 { 00606 switch (rpl_status) { 00607 case RPL_LOST_SOLDIER: 00608 if (find_recovery_captain(thd, recovery_captain)) 00609 rpl_status=RPL_TROOP_SOLDIER; 00610 else 00611 rpl_status=RPL_RECOVERY_CAPTAIN; 00612 break_req_chain=1; /* for now until other states are implemented */ 00613 break; 00614 default: 00615 break_req_chain=1; 00616 break; 00617 } 00618 } 00619 } 00620 thd->exit_cond(msg); 00621 err: 00622 if (recovery_captain) 00623 mysql_close(recovery_captain); 00624 delete thd; 00625 my_thread_end(); 00626 pthread_exit(0); 00627 DBUG_RETURN(0); 00628 } 00629 00630 00631 bool show_slave_hosts(THD* thd) 00632 { 00633 List<Item> field_list; 00634 Protocol *protocol= thd->protocol; 00635 DBUG_ENTER("show_slave_hosts"); 00636 00637 field_list.push_back(new Item_return_int("Server_id", 10, 00638 MYSQL_TYPE_LONG)); 00639 field_list.push_back(new Item_empty_string("Host", 20)); 00640 if (opt_show_slave_auth_info) 00641 { 00642 field_list.push_back(new Item_empty_string("User",20)); 00643 field_list.push_back(new Item_empty_string("Password",20)); 00644 } 00645 field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG)); 00646 field_list.push_back(new Item_return_int("Rpl_recovery_rank", 7, 00647 MYSQL_TYPE_LONG)); 00648 field_list.push_back(new Item_return_int("Master_id", 10, 00649 MYSQL_TYPE_LONG)); 00650 00651 if (protocol->send_fields(&field_list, 00652 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) 00653 DBUG_RETURN(TRUE); 00654 00655 pthread_mutex_lock(&LOCK_slave_list); 00656 00657 for (uint i = 0; i < slave_list.records; ++i) 00658 { 00659 SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i); 00660 protocol->prepare_for_resend(); 00661 protocol->store((uint32) si->server_id); 00662 protocol->store(si->host, &my_charset_bin); 00663 if (opt_show_slave_auth_info) 00664 { 00665 protocol->store(si->user, &my_charset_bin); 00666 protocol->store(si->password, &my_charset_bin); 00667 } 00668 protocol->store((uint32) si->port); 00669 protocol->store((uint32) si->rpl_recovery_rank); 00670 protocol->store((uint32) si->master_id); 00671 if (protocol->write()) 00672 { 00673 pthread_mutex_unlock(&LOCK_slave_list); 00674 DBUG_RETURN(TRUE); 00675 } 00676 } 00677 pthread_mutex_unlock(&LOCK_slave_list); 00678 send_eof(thd); 00679 DBUG_RETURN(FALSE); 00680 } 00681 00682 00683 int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi) 00684 { 00685 DBUG_ENTER("connect_to_master"); 00686 00687 if (!mi->host || !*mi->host) /* empty host */ 00688 { 00689 strmov(mysql->net.last_error, "Master is not configured"); 00690 DBUG_RETURN(1); 00691 } 00692 mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); 00693 mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); 00694 00695 #ifdef HAVE_OPENSSL 00696 if (mi->ssl) 00697 mysql_ssl_set(mysql, 00698 mi->ssl_key[0]?mi->ssl_key:0, 00699 mi->ssl_cert[0]?mi->ssl_cert:0, 00700 mi->ssl_ca[0]?mi->ssl_ca:0, 00701 mi->ssl_capath[0]?mi->ssl_capath:0, 00702 mi->ssl_cipher[0]?mi->ssl_cipher:0); 00703 #endif 00704 00705 mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); 00706 mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); 00707 if (!mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, 00708 mi->port, 0, 0)) 00709 DBUG_RETURN(1); 00710 mysql->reconnect= 1; 00711 DBUG_RETURN(0); 00712 } 00713 00714 00715 static inline void cleanup_mysql_results(MYSQL_RES* db_res, 00716 MYSQL_RES** cur, MYSQL_RES** start) 00717 { 00718 for (; cur >= start; --cur) 00719 { 00720 if (*cur) 00721 mysql_free_result(*cur); 00722 } 00723 mysql_free_result(db_res); 00724 } 00725 00726 00727 static int fetch_db_tables(THD *thd, MYSQL *mysql, const char *db, 00728 MYSQL_RES *table_res, MASTER_INFO *mi) 00729 { 00730 MYSQL_ROW row; 00731 for (row = mysql_fetch_row(table_res); row; 00732 row = mysql_fetch_row(table_res)) 00733 { 00734 TABLE_LIST table; 00735 const char* table_name= row[0]; 00736 int error; 00737 if (rpl_filter->is_on()) 00738 { 00739 bzero((char*) &table, sizeof(table)); //just for safe 00740 table.db= (char*) db; 00741 table.table_name= (char*) table_name; 00742 table.updating= 1; 00743 00744 if (!rpl_filter->tables_ok(thd->db, &table)) 00745 continue; 00746 } 00747 /* download master's table and overwrite slave's table */ 00748 if ((error= fetch_master_table(thd, db, table_name, mi, mysql, 1))) 00749 return error; 00750 } 00751 return 0; 00752 } 00753 00754 /* 00755 Load all MyISAM tables from master to this slave. 00756 00757 REQUIREMENTS 00758 - No active transaction (flush_relay_log_info would not work in this case) 00759 */ 00760 00761 bool load_master_data(THD* thd) 00762 { 00763 MYSQL mysql; 00764 MYSQL_RES* master_status_res = 0; 00765 int error = 0; 00766 const char* errmsg=0; 00767 int restart_thread_mask; 00768 HA_CREATE_INFO create_info; 00769 00770 mysql_init(&mysql); 00771 00772 /* 00773 We do not want anyone messing with the slave at all for the entire 00774 duration of the data load. 00775 */ 00776 pthread_mutex_lock(&LOCK_active_mi); 00777 lock_slave_threads(active_mi); 00778 init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/); 00779 if (restart_thread_mask && 00780 (error=terminate_slave_threads(active_mi,restart_thread_mask, 00781 1 /*skip lock*/))) 00782 { 00783 my_message(error, ER(error), MYF(0)); 00784 unlock_slave_threads(active_mi); 00785 pthread_mutex_unlock(&LOCK_active_mi); 00786 return TRUE; 00787 } 00788 00789 if (connect_to_master(thd, &mysql, active_mi)) 00790 { 00791 my_error(error= ER_CONNECT_TO_MASTER, MYF(0), mysql_error(&mysql)); 00792 goto err; 00793 } 00794 00795 // now that we are connected, get all database and tables in each 00796 { 00797 MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res; 00798 uint num_dbs; 00799 00800 if (mysql_real_query(&mysql, STRING_WITH_LEN("SHOW DATABASES")) || 00801 !(db_res = mysql_store_result(&mysql))) 00802 { 00803 my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql)); 00804 goto err; 00805 } 00806 00807 if (!(num_dbs = (uint) mysql_num_rows(db_res))) 00808 goto err; 00809 /* 00810 In theory, the master could have no databases at all 00811 and run with skip-grant 00812 */ 00813 00814 if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*)))) 00815 { 00816 my_message(error = ER_OUTOFMEMORY, ER(ER_OUTOFMEMORY), MYF(0)); 00817 goto err; 00818 } 00819 00820 /* 00821 This is a temporary solution until we have online backup 00822 capabilities - to be replaced once online backup is working 00823 we wait to issue FLUSH TABLES WITH READ LOCK for as long as we 00824 can to minimize the lock time. 00825 */ 00826 if (mysql_real_query(&mysql, 00827 STRING_WITH_LEN("FLUSH TABLES WITH READ LOCK")) || 00828 mysql_real_query(&mysql, STRING_WITH_LEN("SHOW MASTER STATUS")) || 00829 !(master_status_res = mysql_store_result(&mysql))) 00830 { 00831 my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql)); 00832 goto err; 00833 } 00834 00835 /* 00836 Go through every table in every database, and if the replication 00837 rules allow replicating it, get it 00838 */ 00839 00840 table_res_end = table_res + num_dbs; 00841 00842 for (cur_table_res = table_res; cur_table_res < table_res_end; 00843 cur_table_res++) 00844 { 00845 // since we know how many rows we have, this can never be NULL 00846 MYSQL_ROW row = mysql_fetch_row(db_res); 00847 char* db = row[0]; 00848 00849 /* 00850 Do not replicate databases excluded by rules. We also test 00851 replicate_wild_*_table rules (replicate_wild_ignore_table='db1.%' will 00852 be considered as "ignore the 'db1' database as a whole, as it already 00853 works for CREATE DATABASE and DROP DATABASE). 00854 Also skip 'mysql' database - in most cases the user will 00855 mess up and not exclude mysql database with the rules when 00856 he actually means to - in this case, he is up for a surprise if 00857 his priv tables get dropped and downloaded from master 00858 TODO - add special option, not enabled 00859 by default, to allow inclusion of mysql database into load 00860 data from master 00861 */ 00862 00863 if (!rpl_filter->db_ok(db) || 00864 !rpl_filter->db_ok_with_wild_table(db) || 00865 !strcmp(db,"mysql") || 00866 is_schema_db(db)) 00867 { 00868 *cur_table_res = 0; 00869 continue; 00870 } 00871 00872 bzero((char*) &create_info, sizeof(create_info)); 00873 create_info.options= HA_LEX_CREATE_IF_NOT_EXISTS; 00874 00875 if (mysql_create_db(thd, db, &create_info, 1)) 00876 { 00877 cleanup_mysql_results(db_res, cur_table_res - 1, table_res); 00878 goto err; 00879 } 00880 00881 if (mysql_select_db(&mysql, db) || 00882 mysql_real_query(&mysql, STRING_WITH_LEN("SHOW TABLES")) || 00883 !(*cur_table_res = mysql_store_result(&mysql))) 00884 { 00885 my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql)); 00886 cleanup_mysql_results(db_res, cur_table_res - 1, table_res); 00887 goto err; 00888 } 00889 00890 if ((error = fetch_db_tables(thd,&mysql,db,*cur_table_res,active_mi))) 00891 { 00892 // we do not report the error - fetch_db_tables handles it 00893 cleanup_mysql_results(db_res, cur_table_res, table_res); 00894 goto err; 00895 } 00896 } 00897 00898 cleanup_mysql_results(db_res, cur_table_res - 1, table_res); 00899 00900 // adjust replication coordinates from the master 00901 if (master_status_res) 00902 { 00903 MYSQL_ROW row = mysql_fetch_row(master_status_res); 00904 00905 /* 00906 We need this check because the master may not be running with 00907 log-bin, but it will still allow us to do all the steps 00908 of LOAD DATA FROM MASTER - no reason to forbid it, really, 00909 although it does not make much sense for the user to do it 00910 */ 00911 if (row && row[0] && row[1]) 00912 { 00913 /* 00914 If the slave's master info is not inited, we init it, then we write 00915 the new coordinates to it. Must call init_master_info() *before* 00916 setting active_mi, because init_master_info() sets active_mi with 00917 defaults. 00918 */ 00919 int error; 00920 00921 if (init_master_info(active_mi, master_info_file, relay_log_info_file, 00922 0, (SLAVE_IO | SLAVE_SQL))) 00923 my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0)); 00924 strmake(active_mi->master_log_name, row[0], 00925 sizeof(active_mi->master_log_name)); 00926 active_mi->master_log_pos= my_strtoll10(row[1], (char**) 0, &error); 00927 /* at least in recent versions, the condition below should be false */ 00928 if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE) 00929 active_mi->master_log_pos = BIN_LOG_HEADER_SIZE; 00930 /* 00931 Relay log's IO_CACHE may not be inited (even if we are sure that some 00932 host was specified; there could have been a problem when replication 00933 started, which led to relay log's IO_CACHE to not be inited. 00934 */ 00935 if (flush_master_info(active_mi, 0)) 00936 sql_print_error("Failed to flush master info file"); 00937 } 00938 mysql_free_result(master_status_res); 00939 } 00940 00941 if (mysql_real_query(&mysql, STRING_WITH_LEN("UNLOCK TABLES"))) 00942 { 00943 my_error(error= ER_QUERY_ON_MASTER, MYF(0), mysql_error(&mysql)); 00944 goto err; 00945 } 00946 } 00947 thd->proc_info="purging old relay logs"; 00948 if (purge_relay_logs(&active_mi->rli,thd, 00949 0 /* not only reset, but also reinit */, 00950 &errmsg)) 00951 { 00952 my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg); 00953 unlock_slave_threads(active_mi); 00954 pthread_mutex_unlock(&LOCK_active_mi); 00955 return TRUE; 00956 } 00957 pthread_mutex_lock(&active_mi->rli.data_lock); 00958 active_mi->rli.group_master_log_pos = active_mi->master_log_pos; 00959 strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name, 00960 sizeof(active_mi->rli.group_master_log_name)-1); 00961 /* 00962 Cancel the previous START SLAVE UNTIL, as the fact to download 00963 a new copy logically makes UNTIL irrelevant. 00964 */ 00965 clear_until_condition(&active_mi->rli); 00966 00967 /* 00968 No need to update rli.event* coordinates, they will be when the slave 00969 threads start ; only rli.group* coordinates are necessary here. 00970 */ 00971 flush_relay_log_info(&active_mi->rli); 00972 pthread_cond_broadcast(&active_mi->rli.data_cond); 00973 pthread_mutex_unlock(&active_mi->rli.data_lock); 00974 thd->proc_info = "starting slave"; 00975 if (restart_thread_mask) 00976 { 00977 error=start_slave_threads(0 /* mutex not needed */, 00978 1 /* wait for start */, 00979 active_mi,master_info_file,relay_log_info_file, 00980 restart_thread_mask); 00981 } 00982 00983 err: 00984 unlock_slave_threads(active_mi); 00985 pthread_mutex_unlock(&LOCK_active_mi); 00986 thd->proc_info = 0; 00987 00988 mysql_close(&mysql); // safe to call since we always do mysql_init() 00989 if (!error) 00990 send_ok(thd); 00991 00992 return error; 00993 } 00994 00995 #endif /* HAVE_REPLICATION */ 00996
1.4.7

