00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 #include <NDBT_ReturnCodes.h> 00018 #include "consumer_restore.hpp" 00019 #include <my_sys.h> 00020 #include <NdbSleep.h> 00021 00022 extern my_bool opt_core; 00023 00024 extern FilteredNdbOut err; 00025 extern FilteredNdbOut info; 00026 extern FilteredNdbOut debug; 00027 00028 static void callback(int, NdbTransaction*, void*); 00029 static Uint32 get_part_id(const NdbDictionary::Table *table, 00030 Uint32 hash_value); 00031 00032 extern const char * g_connect_string; 00033 extern BaseString g_options; 00034 00035 bool 00036 BackupRestore::init() 00037 { 00038 release(); 00039 00040 if (!m_restore && !m_restore_meta && !m_restore_epoch) 00041 return true; 00042 00043 m_cluster_connection = new Ndb_cluster_connection(g_connect_string); 00044 m_cluster_connection->set_name(g_options.c_str()); 00045 if(m_cluster_connection->connect(12, 5, 1) != 0) 00046 { 00047 return false; 00048 } 00049 00050 m_ndb = new Ndb(m_cluster_connection); 00051 00052 if (m_ndb == NULL) 00053 return false; 00054 00055 m_ndb->init(1024); 00056 if (m_ndb->waitUntilReady(30) != 0) 00057 { 00058 err << "Failed to connect to ndb!!" << endl; 00059 return false; 00060 } 00061 info << "Connected to ndb!!" << endl; 00062 00063 m_callback = new restore_callback_t[m_parallelism]; 00064 00065 if (m_callback == 0) 00066 { 00067 err << "Failed to allocate callback structs" << endl; 00068 return false; 00069 } 00070 00071 m_free_callback= m_callback; 00072 for (Uint32 i= 0; i < m_parallelism; i++) { 00073 m_callback[i].restore= this; 00074 m_callback[i].connection= 0; 00075 if (i > 0) 00076 m_callback[i-1].next= &(m_callback[i]); 00077 } 00078 m_callback[m_parallelism-1].next = 0; 00079 00080 return true; 00081 } 00082 00083 void BackupRestore::release() 00084 { 00085 if (m_ndb) 00086 { 00087 delete m_ndb; 00088 m_ndb= 0; 00089 } 00090 00091 if (m_callback) 00092 { 00093 delete [] m_callback; 00094 m_callback= 0; 00095 } 00096 00097 if (m_cluster_connection) 00098 { 00099 delete m_cluster_connection; 00100 m_cluster_connection= 0; 00101 } 00102 } 00103 00104 BackupRestore::~BackupRestore() 00105 { 00106 release(); 00107 } 00108 00109 static 00110 int 00111 match_blob(const char * name){ 00112 int cnt, id1, id2; 00113 char buf[256]; 00114 if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){ 00115 return id1; 00116 } 00117 00118 return -1; 00119 } 00120 00121 const NdbDictionary::Table* 00122 BackupRestore::get_table(const NdbDictionary::Table* tab){ 00123 if(m_cache.m_old_table == tab) 00124 return m_cache.m_new_table; 00125 m_cache.m_old_table = tab; 00126 00127 int cnt, id1, id2; 00128 char db[256], schema[256]; 00129 if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d", 00130 db, schema, &id1, &id2)) == 4){ 00131 m_ndb->setDatabaseName(db); 00132 m_ndb->setSchemaName(schema); 00133 00134 BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d", 00135 m_new_tables[id1]->getTableId(), id2); 00136 00137 m_cache.m_new_table = m_ndb->getDictionary()->getTable(db); 00138 00139 } else { 00140 m_cache.m_new_table = m_new_tables[tab->getTableId()]; 00141 } 00142 assert(m_cache.m_new_table); 00143 return m_cache.m_new_table; 00144 } 00145 00146 bool 00147 BackupRestore::finalize_table(const TableS & table){ 00148 bool ret= true; 00149 if (!m_restore && !m_restore_meta) 00150 return ret; 00151 if (!table.have_auto_inc()) 00152 return ret; 00153 00154 Uint64 max_val= table.get_max_auto_val(); 00155 do 00156 { 00157 Uint64 auto_val = ~(Uint64)0; 00158 int r= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable), auto_val); 00159 if (r == -1 && m_ndb->getNdbError().status == NdbError::TemporaryError) 00160 { 00161 NdbSleep_MilliSleep(50); 00162 continue; // retry 00163 } 00164 else if (r == -1 && m_ndb->getNdbError().code != 626) 00165 { 00166 ret= false; 00167 } 00168 else if ((r == -1 && m_ndb->getNdbError().code == 626) || 00169 max_val+1 > auto_val || auto_val == ~(Uint64)0) 00170 { 00171 r= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), 00172 max_val+1, false); 00173 if (r == -1 && 00174 m_ndb->getNdbError().status == NdbError::TemporaryError) 00175 { 00176 NdbSleep_MilliSleep(50); 00177 continue; // retry 00178 } 00179 ret = (r == 0); 00180 } 00181 return (ret); 00182 } while (1); 00183 } 00184 00185 00186 static bool default_nodegroups(NdbDictionary::Table *table) 00187 { 00188 Uint16 *node_groups = (Uint16*)table->getFragmentData(); 00189 Uint32 no_parts = table->getFragmentDataLen() >> 1; 00190 Uint32 i; 00191 00192 if (node_groups[0] != 0) 00193 return false; 00194 for (i = 1; i < no_parts; i++) 00195 { 00196 if (node_groups[i] != UNDEF_NODEGROUP) 00197 return false; 00198 } 00199 return true; 00200 } 00201 00202 00203 static Uint32 get_no_fragments(Uint64 max_rows, Uint32 no_nodes) 00204 { 00205 Uint32 i = 0; 00206 Uint32 acc_row_size = 27; 00207 Uint32 acc_fragment_size = 512*1024*1024; 00208 Uint32 no_parts= (max_rows*acc_row_size)/acc_fragment_size + 1; 00209 Uint32 reported_parts = no_nodes; 00210 while (reported_parts < no_parts && ++i < 4 && 00211 (reported_parts + no_parts) < MAX_NDB_PARTITIONS) 00212 reported_parts+= no_nodes; 00213 if (reported_parts < no_parts) 00214 { 00215 err << "Table will be restored but will not be able to handle the maximum"; 00216 err << " amount of rows as requested" << endl; 00217 } 00218 return reported_parts; 00219 } 00220 00221 00222 static void set_default_nodegroups(NdbDictionary::Table *table) 00223 { 00224 Uint32 no_parts = table->getFragmentCount(); 00225 Uint16 node_group[MAX_NDB_PARTITIONS]; 00226 Uint32 i; 00227 00228 node_group[0] = 0; 00229 for (i = 1; i < no_parts; i++) 00230 { 00231 node_group[i] = UNDEF_NODEGROUP; 00232 } 00233 table->setFragmentData((const void*)node_group, 2 * no_parts); 00234 } 00235 00236 Uint32 BackupRestore::map_ng(Uint32 ng) 00237 { 00238 NODE_GROUP_MAP *ng_map = m_nodegroup_map; 00239 00240 if (ng == UNDEF_NODEGROUP || 00241 ng_map[ng].map_array[0] == UNDEF_NODEGROUP) 00242 { 00243 return ng; 00244 } 00245 else 00246 { 00247 Uint32 new_ng; 00248 Uint32 curr_inx = ng_map[ng].curr_index; 00249 Uint32 new_curr_inx = curr_inx + 1; 00250 00251 assert(ng < MAX_NDB_PARTITIONS); 00252 assert(curr_inx < MAX_MAPS_PER_NODE_GROUP); 00253 assert(new_curr_inx < MAX_MAPS_PER_NODE_GROUP); 00254 00255 if (new_curr_inx >= MAX_MAPS_PER_NODE_GROUP) 00256 new_curr_inx = 0; 00257 else if (ng_map[ng].map_array[new_curr_inx] == UNDEF_NODEGROUP) 00258 new_curr_inx = 0; 00259 new_ng = ng_map[ng].map_array[curr_inx]; 00260 ng_map[ng].curr_index = new_curr_inx; 00261 return new_ng; 00262 } 00263 } 00264 00265 00266 bool BackupRestore::map_nodegroups(Uint16 *ng_array, Uint32 no_parts) 00267 { 00268 Uint32 i; 00269 bool mapped = FALSE; 00270 DBUG_ENTER("map_nodegroups"); 00271 00272 assert(no_parts < MAX_NDB_PARTITIONS); 00273 for (i = 0; i < no_parts; i++) 00274 { 00275 Uint32 ng; 00276 ng = map_ng((Uint32)ng_array[i]); 00277 if (ng != ng_array[i]) 00278 mapped = TRUE; 00279 ng_array[i] = ng; 00280 } 00281 DBUG_RETURN(mapped); 00282 } 00283 00284 00285 static void copy_byte(const char **data, char **new_data, uint *len) 00286 { 00287 **new_data = **data; 00288 (*data)++; 00289 (*new_data)++; 00290 (*len)++; 00291 } 00292 00293 00294 bool BackupRestore::search_replace(char *search_str, char **new_data, 00295 const char **data, const char *end_data, 00296 uint *new_data_len) 00297 { 00298 uint search_str_len = strlen(search_str); 00299 uint inx = 0; 00300 bool in_delimiters = FALSE; 00301 bool escape_char = FALSE; 00302 char start_delimiter = 0; 00303 DBUG_ENTER("search_replace"); 00304 00305 do 00306 { 00307 char c = **data; 00308 copy_byte(data, new_data, new_data_len); 00309 if (escape_char) 00310 { 00311 escape_char = FALSE; 00312 } 00313 else if (in_delimiters) 00314 { 00315 if (c == start_delimiter) 00316 in_delimiters = FALSE; 00317 } 00318 else if (c == '\'' || c == '\"') 00319 { 00320 in_delimiters = TRUE; 00321 start_delimiter = c; 00322 } 00323 else if (c == '\\') 00324 { 00325 escape_char = TRUE; 00326 } 00327 else if (c == search_str[inx]) 00328 { 00329 inx++; 00330 if (inx == search_str_len) 00331 { 00332 bool found = FALSE; 00333 uint number = 0; 00334 while (*data != end_data) 00335 { 00336 if (isdigit(**data)) 00337 { 00338 found = TRUE; 00339 number = (10 * number) + (**data); 00340 if (number > MAX_NDB_NODES) 00341 break; 00342 } 00343 else if (found) 00344 { 00345 /* 00346 After long and tedious preparations we have actually found 00347 a node group identifier to convert. We'll use the mapping 00348 table created for node groups and then insert the new number 00349 instead of the old number. 00350 */ 00351 uint temp = map_ng(number); 00352 int no_digits = 0; 00353 char digits[10]; 00354 while (temp != 0) 00355 { 00356 digits[no_digits] = temp % 10; 00357 no_digits++; 00358 temp/=10; 00359 } 00360 for (no_digits--; no_digits >= 0; no_digits--) 00361 { 00362 **new_data = digits[no_digits]; 00363 *new_data_len+=1; 00364 } 00365 DBUG_RETURN(FALSE); 00366 } 00367 else 00368 break; 00369 (*data)++; 00370 } 00371 DBUG_RETURN(TRUE); 00372 } 00373 } 00374 else 00375 inx = 0; 00376 } while (*data < end_data); 00377 DBUG_RETURN(FALSE); 00378 } 00379 00380 bool BackupRestore::map_in_frm(char *new_data, const char *data, 00381 uint data_len, uint *new_data_len) 00382 { 00383 const char *end_data= data + data_len; 00384 const char *end_part_data; 00385 const char *part_data; 00386 char *extra_ptr; 00387 uint start_key_definition_len = uint2korr(data + 6); 00388 uint key_definition_len = uint4korr(data + 47); 00389 uint part_info_len; 00390 DBUG_ENTER("map_in_frm"); 00391 00392 if (data_len < 4096) goto error; 00393 extra_ptr = (char*)data + start_key_definition_len + key_definition_len; 00394 if ((int)data_len < ((extra_ptr - data) + 2)) goto error; 00395 extra_ptr = extra_ptr + 2 + uint2korr(extra_ptr); 00396 if ((int)data_len < ((extra_ptr - data) + 2)) goto error; 00397 extra_ptr = extra_ptr + 2 + uint2korr(extra_ptr); 00398 if ((int)data_len < ((extra_ptr - data) + 4)) goto error; 00399 part_info_len = uint4korr(extra_ptr); 00400 part_data = extra_ptr + 4; 00401 if ((int)data_len < ((part_data + part_info_len) - data)) goto error; 00402 00403 do 00404 { 00405 copy_byte(&data, &new_data, new_data_len); 00406 } while (data < part_data); 00407 end_part_data = part_data + part_info_len; 00408 do 00409 { 00410 if (search_replace((char*)" NODEGROUP = ", &new_data, &data, 00411 end_part_data, new_data_len)) 00412 goto error; 00413 } while (data != end_part_data); 00414 do 00415 { 00416 copy_byte(&data, &new_data, new_data_len); 00417 } while (data < end_data); 00418 DBUG_RETURN(FALSE); 00419 error: 00420 DBUG_RETURN(TRUE); 00421 } 00422 00423 00424 bool BackupRestore::translate_frm(NdbDictionary::Table *table) 00425 { 00426 const void *pack_data, *data, *new_pack_data; 00427 char *new_data; 00428 uint data_len, pack_len, new_data_len, new_pack_len; 00429 uint no_parts, extra_growth; 00430 DBUG_ENTER("translate_frm"); 00431 00432 pack_data = table->getFrmData(); 00433 no_parts = table->getFragmentCount(); 00434 /* 00435 Add max 4 characters per partition to handle worst case 00436 of mapping from single digit to 5-digit number. 00437 Fairly future-proof, ok up to 99999 node groups. 00438 */ 00439 extra_growth = no_parts * 4; 00440 if (unpackfrm(&data, &data_len, pack_data)) 00441 { 00442 DBUG_RETURN(TRUE); 00443 } 00444 if ((new_data = my_malloc(data_len + extra_growth, MYF(0)))) 00445 { 00446 DBUG_RETURN(TRUE); 00447 } 00448 if (map_in_frm(new_data, (const char*)data, data_len, &new_data_len)) 00449 { 00450 my_free(new_data, MYF(0)); 00451 DBUG_RETURN(TRUE); 00452 } 00453 if (packfrm((const void*)new_data, new_data_len, 00454 &new_pack_data, &new_pack_len)) 00455 { 00456 my_free(new_data, MYF(0)); 00457 DBUG_RETURN(TRUE); 00458 } 00459 table->setFrm(new_pack_data, new_pack_len); 00460 DBUG_RETURN(FALSE); 00461 } 00462 00463 #include <signaldata/DictTabInfo.hpp> 00464 00465 bool 00466 BackupRestore::object(Uint32 type, const void * ptr) 00467 { 00468 if (!m_restore_meta) 00469 return true; 00470 00471 NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); 00472 switch(type){ 00473 case DictTabInfo::Tablespace: 00474 { 00475 NdbDictionary::Tablespace old(*(NdbDictionary::Tablespace*)ptr); 00476 00477 Uint32 id = old.getObjectId(); 00478 00479 if (!m_no_restore_disk) 00480 { 00481 NdbDictionary::LogfileGroup * lg = m_logfilegroups[old.getDefaultLogfileGroupId()]; 00482 old.setDefaultLogfileGroup(* lg); 00483 info << "Creating tablespace: " << old.getName() << "..." << flush; 00484 int ret = dict->createTablespace(old); 00485 if (ret) 00486 { 00487 NdbError errobj= dict->getNdbError(); 00488 info << "FAILED" << endl; 00489 err << "Create tablespace failed: " << old.getName() << ": " << errobj << endl; 00490 return false; 00491 } 00492 info << "done" << endl; 00493 } 00494 00495 NdbDictionary::Tablespace curr = dict->getTablespace(old.getName()); 00496 NdbError errobj = dict->getNdbError(); 00497 if(errobj.classification == ndberror_cl_none) 00498 { 00499 NdbDictionary::Tablespace* currptr = new NdbDictionary::Tablespace(curr); 00500 NdbDictionary::Tablespace * null = 0; 00501 m_tablespaces.set(currptr, id, null); 00502 debug << "Retreived tablespace: " << currptr->getName() 00503 << " oldid: " << id << " newid: " << currptr->getObjectId() 00504 << " " << (void*)currptr << endl; 00505 return true; 00506 } 00507 00508 err << "Failed to retrieve tablespace \"" << old.getName() << "\": " 00509 << errobj << endl; 00510 00511 return false; 00512 break; 00513 } 00514 case DictTabInfo::LogfileGroup: 00515 { 00516 NdbDictionary::LogfileGroup old(*(NdbDictionary::LogfileGroup*)ptr); 00517 00518 Uint32 id = old.getObjectId(); 00519 00520 if (!m_no_restore_disk) 00521 { 00522 info << "Creating logfile group: " << old.getName() << "..." << flush; 00523 int ret = dict->createLogfileGroup(old); 00524 if (ret) 00525 { 00526 NdbError errobj= dict->getNdbError(); 00527 info << "FAILED" << endl; 00528 err << "Create logfile group failed: " << old.getName() << ": " << errobj << endl; 00529 return false; 00530 } 00531 info << "done" << endl; 00532 } 00533 00534 NdbDictionary::LogfileGroup curr = dict->getLogfileGroup(old.getName()); 00535 NdbError errobj = dict->getNdbError(); 00536 if(errobj.classification == ndberror_cl_none) 00537 { 00538 NdbDictionary::LogfileGroup* currptr = 00539 new NdbDictionary::LogfileGroup(curr); 00540 NdbDictionary::LogfileGroup * null = 0; 00541 m_logfilegroups.set(currptr, id, null); 00542 debug << "Retreived logfile group: " << currptr->getName() 00543 << " oldid: " << id << " newid: " << currptr->getObjectId() 00544 << " " << (void*)currptr << endl; 00545 return true; 00546 } 00547 00548 err << "Failed to retrieve logfile group \"" << old.getName() << "\": " 00549 << errobj << endl; 00550 00551 return false; 00552 break; 00553 } 00554 case DictTabInfo::Datafile: 00555 { 00556 if (!m_no_restore_disk) 00557 { 00558 NdbDictionary::Datafile old(*(NdbDictionary::Datafile*)ptr); 00559 NdbDictionary::ObjectId objid; 00560 old.getTablespaceId(&objid); 00561 NdbDictionary::Tablespace * ts = m_tablespaces[objid.getObjectId()]; 00562 debug << "Connecting datafile " << old.getPath() 00563 << " to tablespace: oldid: " << objid.getObjectId() 00564 << " newid: " << ts->getObjectId() << endl; 00565 old.setTablespace(* ts); 00566 info << "Creating datafile \"" << old.getPath() << "\"..." << flush; 00567 if (dict->createDatafile(old)) 00568 { 00569 NdbError errobj= dict->getNdbError(); 00570 info << "FAILED" << endl; 00571 err << "Create datafile failed: " << old.getPath() << ": " << errobj << endl; 00572 return false; 00573 } 00574 info << "done" << endl; 00575 } 00576 return true; 00577 break; 00578 } 00579 case DictTabInfo::Undofile: 00580 { 00581 if (!m_no_restore_disk) 00582 { 00583 NdbDictionary::Undofile old(*(NdbDictionary::Undofile*)ptr); 00584 NdbDictionary::ObjectId objid; 00585 old.getLogfileGroupId(&objid); 00586 NdbDictionary::LogfileGroup * lg = m_logfilegroups[objid.getObjectId()]; 00587 debug << "Connecting undofile " << old.getPath() 00588 << " to logfile group: oldid: " << objid.getObjectId() 00589 << " newid: " << lg->getObjectId() 00590 << " " << (void*)lg << endl; 00591 old.setLogfileGroup(* lg); 00592 info << "Creating undofile \"" << old.getPath() << "\"..." << flush; 00593 if (dict->createUndofile(old)) 00594 { 00595 NdbError errobj= dict->getNdbError(); 00596 info << "FAILED" << endl; 00597 err << "Create undofile failed: " << old.getPath() << ": " << errobj << endl; 00598 return false; 00599 } 00600 info << "done" << endl; 00601 } 00602 return true; 00603 break; 00604 } 00605 } 00606 return true; 00607 } 00608 00609 bool 00610 BackupRestore::update_apply_status(const RestoreMetaData &metaData) 00611 { 00612 if (!m_restore_epoch) 00613 return true; 00614 00615 bool result= false; 00616 00617 m_ndb->setDatabaseName(NDB_REP_DB); 00618 m_ndb->setSchemaName("def"); 00619 00620 NdbDictionary::Dictionary *dict= m_ndb->getDictionary(); 00621 const NdbDictionary::Table *ndbtab= dict->getTable(Ndb_apply_table); 00622 if (!ndbtab) 00623 { 00624 err << Ndb_apply_table << ": " 00625 << dict->getNdbError() << endl; 00626 return false; 00627 } 00628 Uint32 server_id= 0; 00629 Uint64 epoch= metaData.getStopGCP(); 00630 NdbTransaction * trans= m_ndb->startTransaction(); 00631 if (!trans) 00632 { 00633 err << Ndb_apply_table << ": " 00634 << m_ndb->getNdbError() << endl; 00635 return false; 00636 } 00637 NdbOperation * op= trans->getNdbOperation(ndbtab); 00638 if (!op) 00639 { 00640 err << Ndb_apply_table << ": " 00641 << trans->getNdbError() << endl; 00642 goto err; 00643 } 00644 if (op->writeTuple() || 00645 op->equal(0u, (const char *)&server_id, sizeof(server_id)) || 00646 op->setValue(1u, (const char *)&epoch, sizeof(epoch))) 00647 { 00648 err << Ndb_apply_table << ": " 00649 << op->getNdbError() << endl; 00650 goto err; 00651 } 00652 if (trans->execute(NdbTransaction::Commit)) 00653 { 00654 err << Ndb_apply_table << ": " 00655 << trans->getNdbError() << endl; 00656 goto err; 00657 } 00658 result= true; 00659 err: 00660 m_ndb->closeTransaction(trans); 00661 return result; 00662 } 00663 00664 bool 00665 BackupRestore::table(const TableS & table){ 00666 if (!m_restore && !m_restore_meta) 00667 return true; 00668 00669 const char * name = table.getTableName(); 00670 00674 if(match_blob(name) >= 0) 00675 return true; 00676 00677 const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable); 00678 if(tmptab.m_indexType != NdbDictionary::Index::Undefined){ 00679 m_indexes.push_back(table.m_dictTable); 00680 return true; 00681 } 00682 00683 BaseString tmp(name); 00684 Vector<BaseString> split; 00685 if(tmp.split(split, "/") != 3){ 00686 err << "Invalid table name format " << name << endl; 00687 return false; 00688 } 00689 00690 m_ndb->setDatabaseName(split[0].c_str()); 00691 m_ndb->setSchemaName(split[1].c_str()); 00692 00693 NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); 00694 if(m_restore_meta) 00695 { 00696 NdbDictionary::Table copy(*table.m_dictTable); 00697 00698 copy.setName(split[2].c_str()); 00699 Uint32 id; 00700 if (copy.getTablespace(&id)) 00701 { 00702 debug << "Connecting " << name << " to tablespace oldid: " << id << flush; 00703 NdbDictionary::Tablespace* ts = m_tablespaces[id]; 00704 debug << " newid: " << ts->getObjectId() << endl; 00705 copy.setTablespace(* ts); 00706 } 00707 00708 if (copy.getDefaultNoPartitionsFlag()) 00709 { 00710 /* 00711 Table was defined with default number of partitions. We can restore 00712 it with whatever is the default in this cluster. 00713 We use the max_rows parameter in calculating the default number. 00714 */ 00715 Uint32 no_nodes = m_cluster_connection->no_db_nodes(); 00716 copy.setFragmentCount(get_no_fragments(copy.getMaxRows(), 00717 no_nodes)); 00718 set_default_nodegroups(©); 00719 } 00720 else 00721 { 00722 /* 00723 Table was defined with specific number of partitions. It should be 00724 restored with the same number of partitions. It will either be 00725 restored in the same node groups as when backup was taken or by 00726 using a node group map supplied to the ndb_restore program. 00727 */ 00728 Uint16 *ng_array = (Uint16*)copy.getFragmentData(); 00729 Uint16 no_parts = copy.getFragmentCount(); 00730 if (map_nodegroups(ng_array, no_parts)) 00731 { 00732 if (translate_frm(©)) 00733 { 00734 err << "Create table " << table.getTableName() << " failed: "; 00735 err << "Translate frm error" << endl; 00736 return false; 00737 } 00738 } 00739 copy.setFragmentData((const void *)ng_array, no_parts << 1); 00740 } 00741 00742 /* 00743 update min and max rows to reflect the table, this to 00744 ensure that memory is allocated properly in the ndb kernel 00745 */ 00746 copy.setMinRows(table.getNoOfRecords()); 00747 if (table.getNoOfRecords() > copy.getMaxRows()) 00748 { 00749 copy.setMaxRows(table.getNoOfRecords()); 00750 } 00751 00752 if (dict->createTable(copy) == -1) 00753 { 00754 err << "Create table " << table.getTableName() << " failed: " 00755 << dict->getNdbError() << endl; 00756 if (dict->getNdbError().code == 771) 00757 { 00758 /* 00759 The user on the cluster where the backup was created had specified 00760 specific node groups for partitions. Some of these node groups 00761 didn't exist on this cluster. We will warn the user of this and 00762 inform him of his option. 00763 */ 00764 err << "The node groups defined in the table didn't exist in this"; 00765 err << " cluster." << endl << "There is an option to use the"; 00766 err << " the parameter ndb-nodegroup-map to define a mapping from"; 00767 err << endl << "the old nodegroups to new nodegroups" << endl; 00768 } 00769 return false; 00770 } 00771 info << "Successfully restored table " << table.getTableName()<< endl ; 00772 } 00773 00774 const NdbDictionary::Table* tab = dict->getTable(split[2].c_str()); 00775 if(tab == 0){ 00776 err << "Unable to find table: " << split[2].c_str() << endl; 00777 return false; 00778 } 00779 if(m_restore_meta) 00780 { 00781 if (tab->getFrmData()) 00782 { 00783 // a MySQL Server table is restored, thus an event should be created 00784 BaseString event_name("REPL$"); 00785 event_name.append(split[0].c_str()); 00786 event_name.append("/"); 00787 event_name.append(split[2].c_str()); 00788 00789 NdbDictionary::Event my_event(event_name.c_str()); 00790 my_event.setTable(*tab); 00791 my_event.addTableEvent(NdbDictionary::Event::TE_ALL); 00792 00793 // add all columns to the event 00794 bool has_blobs = false; 00795 for(int a= 0; a < tab->getNoOfColumns(); a++) 00796 { 00797 my_event.addEventColumn(a); 00798 NdbDictionary::Column::Type t = tab->getColumn(a)->getType(); 00799 if (t == NdbDictionary::Column::Blob || 00800 t == NdbDictionary::Column::Text) 00801 has_blobs = true; 00802 } 00803 if (has_blobs) 00804 my_event.mergeEvents(true); 00805 00806 while ( dict->createEvent(my_event) ) // Add event to database 00807 { 00808 if (dict->getNdbError().classification == NdbError::SchemaObjectExists) 00809 { 00810 info << "Event for table " << table.getTableName() 00811 << " already exists, removing.\n"; 00812 if (!dict->dropEvent(my_event.getName())) 00813 continue; 00814 } 00815 err << "Create table event for " << table.getTableName() << " failed: " 00816 << dict->getNdbError() << endl; 00817 dict->dropTable(split[2].c_str()); 00818 return false; 00819 } 00820 info << "Successfully restored table event " << event_name << endl ; 00821 } 00822 } 00823 const NdbDictionary::Table* null = 0; 00824 m_new_tables.fill(table.m_dictTable->getTableId(), null); 00825 m_new_tables[table.m_dictTable->getTableId()] = tab; 00826 return true; 00827 } 00828 00829 bool 00830 BackupRestore::endOfTables(){ 00831 if(!m_restore_meta) 00832 return true; 00833 00834 NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); 00835 for(size_t i = 0; i<m_indexes.size(); i++){ 00836 NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]); 00837 00838 BaseString tmp(indtab.m_primaryTable.c_str()); 00839 Vector<BaseString> split; 00840 if(tmp.split(split, "/") != 3){ 00841 err << "Invalid table name format " << indtab.m_primaryTable.c_str() 00842 << endl; 00843 return false; 00844 } 00845 00846 m_ndb->setDatabaseName(split[0].c_str()); 00847 m_ndb->setSchemaName(split[1].c_str()); 00848 00849 const NdbDictionary::Table * prim = dict->getTable(split[2].c_str()); 00850 if(prim == 0){ 00851 err << "Unable to find base table \"" << split[2].c_str() 00852 << "\" for index " 00853 << indtab.getName() << endl; 00854 return false; 00855 } 00856 NdbTableImpl& base = NdbTableImpl::getImpl(*prim); 00857 NdbIndexImpl* idx; 00858 int id; 00859 char idxName[255], buf[255]; 00860 if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s", 00861 buf, buf, &id, idxName) != 4){ 00862 err << "Invalid index name format " << indtab.getName() << endl; 00863 return false; 00864 } 00865 if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base)) 00866 { 00867 err << "Failed to create index " << idxName 00868 << " on " << split[2].c_str() << endl; 00869 return false; 00870 } 00871 idx->setName(idxName); 00872 if(dict->createIndex(* idx) != 0) 00873 { 00874 delete idx; 00875 err << "Failed to create index " << idxName 00876 << " on " << split[2].c_str() << endl 00877 << dict->getNdbError() << endl; 00878 00879 return false; 00880 } 00881 delete idx; 00882 info << "Successfully created index " << idxName 00883 << " on " << split[2].c_str() << endl; 00884 } 00885 return true; 00886 } 00887 00888 void BackupRestore::tuple(const TupleS & tup, Uint32 fragmentId) 00889 { 00890 if (!m_restore) 00891 return; 00892 00893 while (m_free_callback == 0) 00894 { 00895 assert(m_transactions == m_parallelism); 00896 // send-poll all transactions 00897 // close transaction is done in callback 00898 m_ndb->sendPollNdb(3000, 1); 00899 } 00900 00901 restore_callback_t * cb = m_free_callback; 00902 00903 if (cb == 0) 00904 assert(false); 00905 00906 m_free_callback = cb->next; 00907 cb->retries = 0; 00908 cb->fragId = fragmentId; 00909 cb->tup = tup; // must do copy! 00910 tuple_a(cb); 00911 00912 } 00913 00914 void BackupRestore::tuple_a(restore_callback_t *cb) 00915 { 00916 Uint32 partition_id = cb->fragId; 00917 while (cb->retries < 10) 00918 { 00922 cb->connection = m_ndb->startTransaction(); 00923 if (cb->connection == NULL) 00924 { 00925 if (errorHandler(cb)) 00926 { 00927 m_ndb->sendPollNdb(3000, 1); 00928 continue; 00929 } 00930 err << "Cannot start transaction" << endl; 00931 exitHandler(); 00932 } // if 00933 00934 const TupleS &tup = cb->tup; 00935 const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable); 00936 00937 NdbOperation * op = cb->connection->getNdbOperation(table); 00938 00939 if (op == NULL) 00940 { 00941 if (errorHandler(cb)) 00942 continue; 00943 err << "Cannot get operation: " << cb->connection->getNdbError() << endl; 00944 exitHandler(); 00945 } // if 00946 00947 if (op->writeTuple() == -1) 00948 { 00949 if (errorHandler(cb)) 00950 continue; 00951 err << "Error defining op: " << cb->connection->getNdbError() << endl; 00952 exitHandler(); 00953 } // if 00954 00955 if (table->getFragmentType() == NdbDictionary::Object::UserDefined) 00956 { 00957 if (table->getDefaultNoPartitionsFlag()) 00958 { 00959 /* 00960 This can only happen for HASH partitioning with 00961 user defined hash function where user hasn't 00962 specified the number of partitions and we 00963 have to calculate it. We use the hash value 00964 stored in the record to calculate the partition 00965 to use. 00966 */ 00967 int i = tup.getNoOfAttributes() - 1; 00968 const AttributeData *attr_data = tup.getData(i); 00969 Uint32 hash_value = *attr_data->u_int32_value; 00970 op->setPartitionId(get_part_id(table, hash_value)); 00971 } 00972 else 00973 { 00974 /* 00975 Either RANGE or LIST (with or without subparts) 00976 OR HASH partitioning with user defined hash 00977 function but with fixed set of partitions. 00978 */ 00979 op->setPartitionId(partition_id); 00980 } 00981 } 00982 int ret = 0; 00983 for (int j = 0; j < 2; j++) 00984 { 00985 for (int i = 0; i < tup.getNoOfAttributes(); i++) 00986 { 00987 const AttributeDesc * attr_desc = tup.getDesc(i); 00988 const AttributeData * attr_data = tup.getData(i); 00989 int size = attr_desc->size; 00990 int arraySize = attr_desc->arraySize; 00991 char * dataPtr = attr_data->string_value; 00992 Uint32 length = attr_data->size; 00993 00994 if (j == 0 && tup.getTable()->have_auto_inc(i)) 00995 tup.getTable()->update_max_auto_val(dataPtr,size); 00996 00997 if (attr_desc->m_column->getPrimaryKey()) 00998 { 00999 if (j == 1) continue; 01000 ret = op->equal(i, dataPtr, length); 01001 } 01002 else 01003 { 01004 if (j == 0) continue; 01005 if (attr_data->null) 01006 ret = op->setValue(i, NULL, 0); 01007 else 01008 ret = op->setValue(i, dataPtr, length); 01009 } 01010 if (ret < 0) { 01011 ndbout_c("Column: %d type %d %d %d %d",i, 01012 attr_desc->m_column->getType(), 01013 size, arraySize, attr_data->size); 01014 break; 01015 } 01016 } 01017 if (ret < 0) 01018 break; 01019 } 01020 if (ret < 0) 01021 { 01022 if (errorHandler(cb)) 01023 continue; 01024 err << "Error defining op: " << cb->connection->getNdbError() << endl; 01025 exitHandler(); 01026 } 01027 01028 // Prepare transaction (the transaction is NOT yet sent to NDB) 01029 cb->connection->executeAsynchPrepare(NdbTransaction::Commit, 01030 &callback, cb); 01031 m_transactions++; 01032 return; 01033 } 01034 err << "Retried transaction " << cb->retries << " times.\nLast error" 01035 << m_ndb->getNdbError(cb->error_code) << endl 01036 << "...Unable to recover from errors. Exiting..." << endl; 01037 exitHandler(); 01038 } 01039 01040 void BackupRestore::cback(int result, restore_callback_t *cb) 01041 { 01042 m_transactions--; 01043 01044 if (result < 0) 01045 { 01049 if (errorHandler(cb)) 01050 tuple_a(cb); // retry 01051 else 01052 { 01053 err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl; 01054 exitHandler(); 01055 } 01056 } 01057 else 01058 { 01062 m_ndb->closeTransaction(cb->connection); 01063 cb->connection= 0; 01064 cb->next= m_free_callback; 01065 m_free_callback= cb; 01066 m_dataCount++; 01067 } 01068 } 01069 01075 bool BackupRestore::errorHandler(restore_callback_t *cb) 01076 { 01077 NdbError error; 01078 if(cb->connection) 01079 { 01080 error= cb->connection->getNdbError(); 01081 m_ndb->closeTransaction(cb->connection); 01082 cb->connection= 0; 01083 } 01084 else 01085 { 01086 error= m_ndb->getNdbError(); 01087 } 01088 01089 Uint32 sleepTime = 100 + cb->retries * 300; 01090 01091 cb->retries++; 01092 cb->error_code = error.code; 01093 01094 switch(error.status) 01095 { 01096 case NdbError::Success: 01097 err << "Success error: " << error << endl; 01098 return false; 01099 // ERROR! 01100 01101 case NdbError::TemporaryError: 01102 err << "Temporary error: " << error << endl; 01103 NdbSleep_MilliSleep(sleepTime); 01104 return true; 01105 // RETRY 01106 01107 case NdbError::UnknownResult: 01108 err << "Unknown: " << error << endl; 01109 return false; 01110 // ERROR! 01111 01112 default: 01113 case NdbError::PermanentError: 01114 //ERROR 01115 err << "Permanent: " << error << endl; 01116 return false; 01117 } 01118 err << "No error status" << endl; 01119 return false; 01120 } 01121 01122 void BackupRestore::exitHandler() 01123 { 01124 release(); 01125 NDBT_ProgramExit(NDBT_FAILED); 01126 if (opt_core) 01127 abort(); 01128 else 01129 exit(NDBT_FAILED); 01130 } 01131 01132 01133 void 01134 BackupRestore::tuple_free() 01135 { 01136 if (!m_restore) 01137 return; 01138 01139 // Poll all transactions 01140 while (m_transactions) 01141 { 01142 m_ndb->sendPollNdb(3000); 01143 } 01144 } 01145 01146 void 01147 BackupRestore::endOfTuples() 01148 { 01149 tuple_free(); 01150 } 01151 01152 static bool use_part_id(const NdbDictionary::Table *table) 01153 { 01154 if (table->getDefaultNoPartitionsFlag() && 01155 (table->getFragmentType() == NdbDictionary::Object::UserDefined)) 01156 return false; 01157 else 01158 return true; 01159 } 01160 01161 static Uint32 get_part_id(const NdbDictionary::Table *table, 01162 Uint32 hash_value) 01163 { 01164 Uint32 no_frags = table->getFragmentCount(); 01165 01166 if (table->getLinearFlag()) 01167 { 01168 Uint32 part_id; 01169 Uint32 mask = 1; 01170 while (no_frags > mask) mask <<= 1; 01171 mask--; 01172 part_id = hash_value & mask; 01173 if (part_id >= no_frags) 01174 part_id = hash_value & (mask >> 1); 01175 return part_id; 01176 } 01177 else 01178 return (hash_value % no_frags); 01179 } 01180 01181 void 01182 BackupRestore::logEntry(const LogEntry & tup) 01183 { 01184 if (!m_restore) 01185 return; 01186 01187 NdbTransaction * trans = m_ndb->startTransaction(); 01188 if (trans == NULL) 01189 { 01190 // Deep shit, TODO: handle the error 01191 err << "Cannot start transaction" << endl; 01192 exitHandler(); 01193 } // if 01194 01195 const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable); 01196 NdbOperation * op = trans->getNdbOperation(table); 01197 if (op == NULL) 01198 { 01199 err << "Cannot get operation: " << trans->getNdbError() << endl; 01200 exitHandler(); 01201 } // if 01202 01203 int check = 0; 01204 switch(tup.m_type) 01205 { 01206 case LogEntry::LE_INSERT: 01207 check = op->insertTuple(); 01208 break; 01209 case LogEntry::LE_UPDATE: 01210 check = op->updateTuple(); 01211 break; 01212 case LogEntry::LE_DELETE: 01213 check = op->deleteTuple(); 01214 break; 01215 default: 01216 err << "Log entry has wrong operation type." 01217 << " Exiting..."; 01218 exitHandler(); 01219 } 01220 01221 if (check != 0) 01222 { 01223 err << "Error defining op: " << trans->getNdbError() << endl; 01224 exitHandler(); 01225 } // if 01226 01227 if (table->getFragmentType() == NdbDictionary::Object::UserDefined) 01228 { 01229 if (table->getDefaultNoPartitionsFlag()) 01230 { 01231 const AttributeS * attr = tup[tup.size()-1]; 01232 Uint32 hash_value = *(Uint32*)attr->Data.string_value; 01233 op->setPartitionId(get_part_id(table, hash_value)); 01234 } 01235 else 01236 op->setPartitionId(tup.m_frag_id); 01237 } 01238 01239 Bitmask<4096> keys; 01240 for (Uint32 i= 0; i < tup.size(); i++) 01241 { 01242 const AttributeS * attr = tup[i]; 01243 int size = attr->Desc->size; 01244 int arraySize = attr->Desc->arraySize; 01245 const char * dataPtr = attr->Data.string_value; 01246 01247 if (tup.m_table->have_auto_inc(attr->Desc->attrId)) 01248 tup.m_table->update_max_auto_val(dataPtr,size); 01249 01250 const Uint32 length = (size / 8) * arraySize; 01251 if (attr->Desc->m_column->getPrimaryKey()) 01252 { 01253 if(!keys.get(attr->Desc->attrId)) 01254 { 01255 keys.set(attr->Desc->attrId); 01256 check= op->equal(attr->Desc->attrId, dataPtr, length); 01257 } 01258 } 01259 else 01260 check= op->setValue(attr->Desc->attrId, dataPtr, length); 01261 01262 if (check != 0) 01263 { 01264 err << "Error defining op: " << trans->getNdbError() << endl; 01265 exitHandler(); 01266 } // if 01267 } 01268 01269 const int ret = trans->execute(NdbTransaction::Commit); 01270 if (ret != 0) 01271 { 01272 // Both insert update and delete can fail during log running 01273 // and it's ok 01274 // TODO: check that the error is either tuple exists or tuple does not exist? 01275 bool ok= false; 01276 NdbError errobj= trans->getNdbError(); 01277 switch(tup.m_type) 01278 { 01279 case LogEntry::LE_INSERT: 01280 if(errobj.status == NdbError::PermanentError && 01281 errobj.classification == NdbError::ConstraintViolation) 01282 ok= true; 01283 break; 01284 case LogEntry::LE_UPDATE: 01285 case LogEntry::LE_DELETE: 01286 if(errobj.status == NdbError::PermanentError && 01287 errobj.classification == NdbError::NoDataFound) 01288 ok= true; 01289 break; 01290 } 01291 if (!ok) 01292 { 01293 err << "execute failed: " << errobj << endl; 01294 exitHandler(); 01295 } 01296 } 01297 01298 m_ndb->closeTransaction(trans); 01299 m_logCount++; 01300 } 01301 01302 void 01303 BackupRestore::endOfLogEntrys() 01304 { 01305 if (!m_restore) 01306 return; 01307 01308 info << "Restored " << m_dataCount << " tuples and " 01309 << m_logCount << " log entries" << endl; 01310 } 01311 01312 /* 01313 * callback : This is called when the transaction is polled 01314 * 01315 * (This function must have three arguments: 01316 * - The result of the transaction, 01317 * - The NdbTransaction object, and 01318 * - A pointer to an arbitrary object.) 01319 */ 01320 01321 static void 01322 callback(int result, NdbTransaction* trans, void* aObject) 01323 { 01324 restore_callback_t *cb = (restore_callback_t *)aObject; 01325 (cb->restore)->cback(result, cb); 01326 } 01327 01328 #if 0 // old tuple impl 01329 void 01330 BackupRestore::tuple(const TupleS & tup) 01331 { 01332 if (!m_restore) 01333 return; 01334 while (1) 01335 { 01336 NdbTransaction * trans = m_ndb->startTransaction(); 01337 if (trans == NULL) 01338 { 01339 // Deep shit, TODO: handle the error 01340 ndbout << "Cannot start transaction" << endl; 01341 exitHandler(); 01342 } // if 01343 01344 const TableS * table = tup.getTable(); 01345 NdbOperation * op = trans->getNdbOperation(table->getTableName()); 01346 if (op == NULL) 01347 { 01348 ndbout << "Cannot get operation: "; 01349 ndbout << trans->getNdbError() << endl; 01350 exitHandler(); 01351 } // if 01352 01353 // TODO: check return value and handle error 01354 if (op->writeTuple() == -1) 01355 { 01356 ndbout << "writeTuple call failed: "; 01357 ndbout << trans->getNdbError() << endl; 01358 exitHandler(); 01359 } // if 01360 01361 for (int i = 0; i < tup.getNoOfAttributes(); i++) 01362 { 01363 const AttributeS * attr = tup[i]; 01364 int size = attr->Desc->size; 01365 int arraySize = attr->Desc->arraySize; 01366 const char * dataPtr = attr->Data.string_value; 01367 01368 const Uint32 length = (size * arraySize) / 8; 01369 if (attr->Desc->m_column->getPrimaryKey()) 01370 op->equal(i, dataPtr, length); 01371 } 01372 01373 for (int i = 0; i < tup.getNoOfAttributes(); i++) 01374 { 01375 const AttributeS * attr = tup[i]; 01376 int size = attr->Desc->size; 01377 int arraySize = attr->Desc->arraySize; 01378 const char * dataPtr = attr->Data.string_value; 01379 01380 const Uint32 length = (size * arraySize) / 8; 01381 if (!attr->Desc->m_column->getPrimaryKey()) 01382 if (attr->Data.null) 01383 op->setValue(i, NULL, 0); 01384 else 01385 op->setValue(i, dataPtr, length); 01386 } 01387 int ret = trans->execute(NdbTransaction::Commit); 01388 if (ret != 0) 01389 { 01390 ndbout << "execute failed: "; 01391 ndbout << trans->getNdbError() << endl; 01392 exitHandler(); 01393 } 01394 m_ndb->closeTransaction(trans); 01395 if (ret == 0) 01396 break; 01397 } 01398 m_dataCount++; 01399 } 01400 #endif 01401 01402 template class Vector<NdbDictionary::Table*>; 01403 template class Vector<const NdbDictionary::Table*>; 01404 template class Vector<NdbDictionary::Tablespace*>; 01405 template class Vector<NdbDictionary::LogfileGroup*>;
1.4.7

