00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 #include <my_config.h> 00018 #include "Suma.hpp" 00019 00020 #include <ndb_version.h> 00021 00022 #include <NdbTCP.h> 00023 #include <Bitmask.hpp> 00024 #include <SimpleProperties.hpp> 00025 00026 #include <signaldata/NodeFailRep.hpp> 00027 #include <signaldata/ReadNodesConf.hpp> 00028 00029 #include <signaldata/ListTables.hpp> 00030 #include <signaldata/GetTabInfo.hpp> 00031 #include <signaldata/GetTableId.hpp> 00032 #include <signaldata/DictTabInfo.hpp> 00033 #include <signaldata/SumaImpl.hpp> 00034 #include <signaldata/ScanFrag.hpp> 00035 #include <signaldata/TransIdAI.hpp> 00036 #include <signaldata/CreateTrig.hpp> 00037 #include <signaldata/AlterTrig.hpp> 00038 #include <signaldata/DropTrig.hpp> 00039 #include <signaldata/FireTrigOrd.hpp> 00040 #include <signaldata/TrigAttrInfo.hpp> 00041 #include <signaldata/CheckNodeGroups.hpp> 00042 #include <signaldata/GCPSave.hpp> 00043 #include <signaldata/CreateTab.hpp> 00044 #include <signaldata/DropTab.hpp> 00045 #include <signaldata/AlterTable.hpp> 00046 #include <signaldata/AlterTab.hpp> 00047 #include <signaldata/DihFragCount.hpp> 00048 #include <signaldata/SystemError.hpp> 00049 00050 #include <ndbapi/NdbDictionary.hpp> 00051 00052 #include <DebuggerNames.hpp> 00053 #include <../dbtup/Dbtup.hpp> 00054 #include <../dbdih/Dbdih.hpp> 00055 00056 //#define HANDOVER_DEBUG 00057 //#define NODEFAIL_DEBUG 00058 //#define NODEFAIL_DEBUG2 00059 //#define DEBUG_SUMA_SEQUENCE 00060 //#define EVENT_DEBUG 00061 //#define EVENT_PH3_DEBUG 00062 //#define EVENT_DEBUG2 00063 #if 0 00064 #undef DBUG_ENTER 00065 #undef DBUG_PRINT 00066 #undef DBUG_RETURN 00067 #undef DBUG_VOID_RETURN 00068 00069 #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);} 00070 #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;} 00071 #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); } 00072 #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; } 00073 #endif 00074 00082 Uint32 g_subPtrI = RNIL; 00083 static const Uint32 SUMA_SEQUENCE = 0xBABEBABE; 00084 00085 static const Uint32 MAX_CONCURRENT_GCP = 2; 00086 00087 /************************************************************** 00088 * 00089 * Start of suma 00090 * 00091 */ 00092 00093 #define PRINT_ONLY 0 00094 00095 void 00096 Suma::getNodeGroupMembers(Signal* signal) 00097 { 00098 jam(); 00099 DBUG_ENTER("Suma::getNodeGroupMembers"); 00103 CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend(); 00104 sd->blockRef = reference(); 00105 sd->requestType = 00106 CheckNodeGroups::Direct | 00107 CheckNodeGroups::GetNodeGroupMembers; 00108 sd->nodeId = getOwnNodeId(); 00109 EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, 00110 CheckNodeGroups::SignalLength); 00111 jamEntry(); 00112 00113 c_nodeGroup = sd->output; 00114 c_nodes_in_nodegroup_mask.assign(sd->mask); 00115 c_noNodesInGroup = c_nodes_in_nodegroup_mask.count(); 00116 Uint32 i, pos= 0; 00117 00118 for (i = 0; i < MAX_NDB_NODES; i++) { 00119 if (sd->mask.get(i)) 00120 { 00121 c_nodesInGroup[pos++] = i; 00122 } 00123 } 00124 00125 const Uint32 replicas= c_noNodesInGroup; 00126 00127 Uint32 buckets= 1; 00128 for(i = 1; i <= replicas; i++) 00129 buckets *= i; 00130 00131 for(i = 0; i<buckets; i++) 00132 { 00133 Bucket* ptr= c_buckets+i; 00134 for(Uint32 j= 0; j< replicas; j++) 00135 { 00136 ptr->m_nodes[j] = c_nodesInGroup[(i + j) % replicas]; 00137 } 00138 } 00139 00140 c_no_of_buckets= buckets; 00141 ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup 00142 00143 #ifndef DBUG_OFF 00144 for (Uint32 i = 0; i < c_noNodesInGroup; i++) { 00145 DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, " 00146 "member[%u] %u", 00147 c_nodeGroup, getOwnNodeId(), 00148 i, c_nodesInGroup[i])); 00149 } 00150 #endif 00151 00152 DBUG_VOID_RETURN; 00153 } 00154 00155 void 00156 Suma::execREAD_CONFIG_REQ(Signal* signal) 00157 { 00158 jamEntry(); 00159 00160 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); 00161 00162 Uint32 ref = req->senderRef; 00163 Uint32 senderData = req->senderData; 00164 00165 const ndb_mgm_configuration_iterator * p = 00166 m_ctx.m_config.getOwnConfigIterator(); 00167 ndbrequire(p != 0); 00168 00169 // SumaParticipant 00170 Uint32 noTables, noAttrs; 00171 ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, 00172 &noTables); 00173 ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES, 00174 &noAttrs); 00175 00176 c_tablePool.setSize(noTables); 00177 c_tables.setSize(noTables); 00178 00179 c_subscriptions.setSize(noTables); 00180 c_subscriberPool.setSize(2*noTables); 00181 00182 c_subscriptionPool.setSize(noTables); 00183 c_syncPool.setSize(2); 00184 c_dataBufferPool.setSize(noAttrs); 00185 00186 // Calculate needed gcp pool as 10 records + the ones needed 00187 // during a possible api timeout 00188 Uint32 dbApiHbInterval, gcpInterval; 00189 ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL, 00190 &dbApiHbInterval); 00191 ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL, 00192 &gcpInterval); 00193 c_gcp_pool.setSize(10 + (4*dbApiHbInterval)/gcpInterval); 00194 00195 c_page_chunk_pool.setSize(50); 00196 00197 { 00198 SLList<SyncRecord> tmp(c_syncPool); 00199 Ptr<SyncRecord> ptr; 00200 while(tmp.seize(ptr)) 00201 new (ptr.p) SyncRecord(* this, c_dataBufferPool); 00202 tmp.release(); 00203 } 00204 00205 // Suma 00206 c_masterNodeId = getOwnNodeId(); 00207 00208 c_nodeGroup = c_noNodesInGroup = 0; 00209 for (int i = 0; i < MAX_REPLICAS; i++) { 00210 c_nodesInGroup[i] = 0; 00211 } 00212 00213 m_first_free_page= RNIL; 00214 00215 c_no_of_buckets = 0; 00216 memset(c_buckets, 0, sizeof(c_buckets)); 00217 for(Uint32 i = 0; i<NO_OF_BUCKETS; i++) 00218 { 00219 Bucket* bucket= c_buckets+i; 00220 bucket->m_buffer_tail = RNIL; 00221 bucket->m_buffer_head.m_page_id = RNIL; 00222 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS; 00223 } 00224 00225 m_max_seen_gci = 0; // FIRE_TRIG_ORD 00226 m_max_sent_gci = 0; // FIRE_TRIG_ORD -> send 00227 m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP 00228 m_gcp_complete_rep_count = 0; 00229 m_out_of_buffer_gci = 0; 00230 00231 c_startup.m_wait_handover= false; 00232 c_failedApiNodes.clear(); 00233 c_startup.m_restart_server_node_id = 0; // Server for my NR 00234 00235 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); 00236 conf->senderRef = reference(); 00237 conf->senderData = senderData; 00238 sendSignal(ref, GSN_READ_CONFIG_CONF, signal, 00239 ReadConfigConf::SignalLength, JBB); 00240 } 00241 00242 void 00243 Suma::execSTTOR(Signal* signal) { 00244 jamEntry(); 00245 00246 DBUG_ENTER("Suma::execSTTOR"); 00247 const Uint32 startphase = signal->theData[1]; 00248 const Uint32 typeOfStart = signal->theData[7]; 00249 00250 DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", 00251 startphase, typeOfStart)); 00252 00253 if(startphase == 3) 00254 { 00255 jam(); 00256 ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0); 00257 signal->theData[0] = reference(); 00258 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB); 00259 DBUG_VOID_RETURN; 00260 } 00261 00262 if(startphase == 5) 00263 { 00264 getNodeGroupMembers(signal); 00265 if (typeOfStart == NodeState::ST_NODE_RESTART || 00266 typeOfStart == NodeState::ST_INITIAL_NODE_RESTART) 00267 { 00268 jam(); 00269 00270 send_start_me_req(signal); 00271 return; 00272 } 00273 } 00274 00275 if(startphase == 7) 00276 { 00277 if (typeOfStart != NodeState::ST_NODE_RESTART && 00278 typeOfStart != NodeState::ST_INITIAL_NODE_RESTART) 00279 { 00280 for( Uint32 i = 0; i < c_no_of_buckets; i++) 00281 { 00282 if (get_responsible_node(i) == getOwnNodeId()) 00283 { 00284 // I'm running this bucket 00285 DBUG_PRINT("info",("bucket %u set to true", i)); 00286 m_active_buckets.set(i); 00287 ndbout_c("m_active_buckets.set(%d)", i); 00288 } 00289 } 00290 } 00291 00292 if(!m_active_buckets.isclear()) 00293 { 00294 NdbNodeBitmask tmp; 00295 Uint32 bucket = 0; 00296 while ((bucket = m_active_buckets.find(bucket)) != Bucket_mask::NotFound) 00297 { 00298 tmp.set(get_responsible_node(bucket, c_nodes_in_nodegroup_mask)); 00299 bucket++; 00300 } 00301 00302 ndbassert(tmp.get(getOwnNodeId())); 00303 m_gcp_complete_rep_count = tmp.count();// I contribute 1 gcp complete rep 00304 } 00305 else 00306 m_gcp_complete_rep_count = 0; // I contribute 1 gcp complete rep 00307 00308 if(typeOfStart == NodeState::ST_INITIAL_START && 00309 c_masterNodeId == getOwnNodeId()) 00310 { 00311 jam(); 00312 createSequence(signal); 00313 DBUG_VOID_RETURN; 00314 }//if 00315 }//if 00316 00317 if(startphase == 100) 00318 { 00322 sendSTTORRY(signal); 00323 return; 00324 } 00325 00326 if(startphase == 101) 00327 { 00328 if (typeOfStart == NodeState::ST_NODE_RESTART || 00329 typeOfStart == NodeState::ST_INITIAL_NODE_RESTART) 00330 { 00334 c_startup.m_wait_handover= true; 00335 check_start_handover(signal); 00336 return; 00337 } 00338 } 00339 sendSTTORRY(signal); 00340 00341 DBUG_VOID_RETURN; 00342 } 00343 00344 void 00345 Suma::send_start_me_req(Signal* signal) 00346 { 00347 Uint32 nodeId= c_startup.m_restart_server_node_id; 00348 do { 00349 nodeId = c_alive_nodes.find(nodeId + 1); 00350 00351 if(nodeId == getOwnNodeId()) 00352 continue; 00353 if(nodeId == NdbNodeBitmask::NotFound) 00354 { 00355 nodeId = 0; 00356 continue; 00357 } 00358 break; 00359 } while(true); 00360 00361 00362 infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId); 00363 c_startup.m_restart_server_node_id= nodeId; 00364 sendSignal(calcSumaBlockRef(nodeId), 00365 GSN_SUMA_START_ME_REQ, signal, 1, JBB); 00366 } 00367 00368 void 00369 Suma::execSUMA_START_ME_REF(Signal* signal) 00370 { 00371 const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr(); 00372 ndbrequire(ref->errorCode == SumaStartMeRef::Busy); 00373 00374 infoEvent("Suma: node %d refused %d", 00375 c_startup.m_restart_server_node_id, ref->errorCode); 00376 send_start_me_req(signal); 00377 } 00378 00379 void 00380 Suma::execSUMA_START_ME_CONF(Signal* signal) 00381 { 00382 infoEvent("Suma: node %d has completed restoring me", 00383 c_startup.m_restart_server_node_id); 00384 sendSTTORRY(signal); 00385 c_startup.m_restart_server_node_id= 0; 00386 } 00387 00388 void 00389 Suma::createSequence(Signal* signal) 00390 { 00391 jam(); 00392 DBUG_ENTER("Suma::createSequence"); 00393 00394 UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend(); 00395 00396 req->senderData = RNIL; 00397 req->sequenceId = SUMA_SEQUENCE; 00398 req->requestType = UtilSequenceReq::Create; 00399 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 00400 signal, UtilSequenceReq::SignalLength, JBB); 00401 // execUTIL_SEQUENCE_CONF will call createSequenceReply() 00402 DBUG_VOID_RETURN; 00403 } 00404 00405 void 00406 Suma::createSequenceReply(Signal* signal, 00407 UtilSequenceConf * conf, 00408 UtilSequenceRef * ref) 00409 { 00410 jam(); 00411 00412 if (ref != NULL) 00413 { 00414 switch ((UtilSequenceRef::ErrorCode)ref->errorCode) 00415 { 00416 case UtilSequenceRef::NoSuchSequence: 00417 ndbrequire(false); 00418 case UtilSequenceRef::TCError: 00419 { 00420 char buf[128]; 00421 snprintf(buf, sizeof(buf), 00422 "Startup failed during sequence creation. TC error %d", 00423 ref->TCErrorCode); 00424 progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf); 00425 } 00426 } 00427 ndbrequire(false); 00428 } 00429 00430 sendSTTORRY(signal); 00431 } 00432 00433 void 00434 Suma::execREAD_NODESCONF(Signal* signal){ 00435 jamEntry(); 00436 ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr(); 00437 00438 if(getNodeState().getNodeRestartInProgress()) 00439 { 00440 c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes); 00441 c_alive_nodes.set(getOwnNodeId()); 00442 } 00443 else 00444 { 00445 c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes); 00446 NdbNodeBitmask tmp; 00447 tmp.assign(NdbNodeBitmask::Size, conf->startedNodes); 00448 ndbrequire(tmp.isclear()); // No nodes can be started during SR 00449 } 00450 00451 c_masterNodeId = conf->masterNodeId; 00452 00453 sendSTTORRY(signal); 00454 } 00455 00456 void 00457 Suma::execAPI_START_REP(Signal* signal) 00458 { 00459 Uint32 nodeId = signal->theData[0]; 00460 c_connected_nodes.set(nodeId); 00461 00462 check_start_handover(signal); 00463 } 00464 00465 void 00466 Suma::check_start_handover(Signal* signal) 00467 { 00468 if(c_startup.m_wait_handover) 00469 { 00470 NodeBitmask tmp; 00471 tmp.assign(c_connected_nodes); 00472 tmp.bitAND(c_subscriber_nodes); 00473 if(!c_subscriber_nodes.equal(tmp)) 00474 { 00475 return; 00476 } 00477 00478 c_startup.m_wait_handover= false; 00479 send_handover_req(signal); 00480 } 00481 } 00482 00483 void 00484 Suma::send_handover_req(Signal* signal) 00485 { 00486 c_startup.m_handover_nodes.assign(c_alive_nodes); 00487 c_startup.m_handover_nodes.bitAND(c_nodes_in_nodegroup_mask); 00488 c_startup.m_handover_nodes.clear(getOwnNodeId()); 00489 Uint32 gci= m_last_complete_gci + 3; 00490 00491 SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend(); 00492 char buf[255]; 00493 c_startup.m_handover_nodes.getText(buf); 00494 infoEvent("Suma: initiate handover with nodes %s GCI: %d", 00495 buf, gci); 00496 00497 req->gci = gci; 00498 req->nodeId = getOwnNodeId(); 00499 00500 NodeReceiverGroup rg(SUMA, c_startup.m_handover_nodes); 00501 sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal, 00502 SumaHandoverReq::SignalLength, JBB); 00503 } 00504 00505 void 00506 Suma::sendSTTORRY(Signal* signal){ 00507 signal->theData[0] = 0; 00508 signal->theData[3] = 1; 00509 signal->theData[4] = 3; 00510 signal->theData[5] = 5; 00511 signal->theData[6] = 7; 00512 signal->theData[7] = 100; 00513 signal->theData[8] = 101; 00514 signal->theData[9] = 255; // No more start phases from missra 00515 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB); 00516 } 00517 00518 void 00519 Suma::execNDB_STTOR(Signal* signal) 00520 { 00521 jamEntry(); 00522 } 00523 00524 void 00525 Suma::execCONTINUEB(Signal* signal){ 00526 jamEntry(); 00527 Uint32 type= signal->theData[0]; 00528 switch(type){ 00529 case SumaContinueB::RELEASE_GCI: 00530 release_gci(signal, signal->theData[1], signal->theData[2]); 00531 return; 00532 case SumaContinueB::RESEND_BUCKET: 00533 resend_bucket(signal, 00534 signal->theData[1], 00535 signal->theData[2], 00536 signal->theData[3], 00537 signal->theData[4]); 00538 return; 00539 case SumaContinueB::OUT_OF_BUFFER_RELEASE: 00540 out_of_buffer_release(signal, signal->theData[1]); 00541 return; 00542 } 00543 } 00544 00545 /***************************************************************************** 00546 * 00547 * Node state handling 00548 * 00549 *****************************************************************************/ 00550 00551 void Suma::execAPI_FAILREQ(Signal* signal) 00552 { 00553 jamEntry(); 00554 DBUG_ENTER("Suma::execAPI_FAILREQ"); 00555 Uint32 failedApiNode = signal->theData[0]; 00556 //BlockReference retRef = signal->theData[1]; 00557 00558 c_failedApiNodes.set(failedApiNode); 00559 c_connected_nodes.clear(failedApiNode); 00560 bool found = removeSubscribersOnNode(signal, failedApiNode); 00561 00562 if(!found){ 00563 jam(); 00564 c_failedApiNodes.clear(failedApiNode); 00565 } 00566 00567 SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr(); 00568 Ptr<Gcp_record> gcp; 00569 for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp)) 00570 { 00571 ack->rep.gci = gcp.p->m_gci; 00572 if(gcp.p->m_subscribers.get(failedApiNode)) 00573 { 00574 ack->rep.senderRef = numberToRef(0, failedApiNode); 00575 sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_ACK, signal, 00576 SubGcpCompleteAck::SignalLength, JBB); 00577 } 00578 } 00579 00580 c_subscriber_nodes.clear(failedApiNode); 00581 00582 check_start_handover(signal); 00583 00584 DBUG_VOID_RETURN; 00585 }//execAPI_FAILREQ() 00586 00587 bool 00588 Suma::removeSubscribersOnNode(Signal *signal, Uint32 nodeId) 00589 { 00590 DBUG_ENTER("Suma::removeSubscribersOnNode"); 00591 bool found = false; 00592 00593 KeyTable<Table>::Iterator it; 00594 for(c_tables.first(it);!it.isNull();c_tables.next(it)) 00595 { 00596 LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers); 00597 SubscriberPtr i_subbPtr; 00598 for(subbs.first(i_subbPtr);!i_subbPtr.isNull();) 00599 { 00600 SubscriberPtr subbPtr = i_subbPtr; 00601 subbs.next(i_subbPtr); 00602 jam(); 00603 if (refToNode(subbPtr.p->m_senderRef) == nodeId) { 00604 jam(); 00605 subbs.remove(subbPtr); 00606 c_removeDataSubscribers.add(subbPtr); 00607 found = true; 00608 } 00609 } 00610 if (subbs.isEmpty()) 00611 { 00612 // ToDo handle this 00613 } 00614 } 00615 if(found){ 00616 jam(); 00617 sendSubStopReq(signal); 00618 } 00619 DBUG_RETURN(found); 00620 } 00621 00622 void 00623 Suma::sendSubStopReq(Signal *signal, bool unlock){ 00624 static bool remove_lock = false; 00625 jam(); 00626 DBUG_ENTER("Suma::sendSubStopReq"); 00627 00628 SubscriberPtr subbPtr; 00629 c_removeDataSubscribers.first(subbPtr); 00630 if (subbPtr.isNull()){ 00631 jam(); 00632 #if 0 00633 signal->theData[0] = failedApiNode; 00634 signal->theData[1] = reference(); 00635 sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB); 00636 #endif 00637 c_failedApiNodes.clear(); 00638 00639 remove_lock = false; 00640 DBUG_VOID_RETURN; 00641 } 00642 00643 if(remove_lock && !unlock) { 00644 jam(); 00645 DBUG_VOID_RETURN; 00646 } 00647 remove_lock = true; 00648 00649 SubscriptionPtr subPtr; 00650 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 00651 00652 SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend(); 00653 req->senderRef = reference(); 00654 req->senderData = subbPtr.i; 00655 req->subscriberRef = subbPtr.p->m_senderRef; 00656 req->subscriberData = subbPtr.p->m_senderData; 00657 req->subscriptionId = subPtr.p->m_subscriptionId; 00658 req->subscriptionKey = subPtr.p->m_subscriptionKey; 00659 req->part = SubscriptionData::TableData; 00660 00661 sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB); 00662 DBUG_VOID_RETURN; 00663 } 00664 00665 void 00666 Suma::execSUB_STOP_CONF(Signal* signal){ 00667 jamEntry(); 00668 DBUG_ENTER("Suma::execSUB_STOP_CONF"); 00669 ndbassert(signal->getNoOfSections() == 0); 00670 sendSubStopReq(signal,true); 00671 DBUG_VOID_RETURN; 00672 } 00673 00674 void 00675 Suma::execSUB_STOP_REF(Signal* signal){ 00676 jamEntry(); 00677 DBUG_ENTER("Suma::execSUB_STOP_REF"); 00678 ndbassert(signal->getNoOfSections() == 0); 00679 00680 SubStopRef * const ref = (SubStopRef*)signal->getDataPtr(); 00681 00682 Uint32 senderData = ref->senderData; 00683 Uint32 subscriptionId = ref->subscriptionId; 00684 Uint32 subscriptionKey = ref->subscriptionKey; 00685 Uint32 part = ref->part; 00686 Uint32 subscriberData = ref->subscriberData; 00687 Uint32 subscriberRef = ref->subscriberRef; 00688 00689 if(ref->errorCode != 1411){ 00690 ndbrequire(false); 00691 } 00692 00693 SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend(); 00694 req->senderRef = reference(); 00695 req->senderData = senderData; 00696 req->subscriberRef = subscriberRef; 00697 req->subscriberData = subscriberData; 00698 req->subscriptionId = subscriptionId; 00699 req->subscriptionKey = subscriptionKey; 00700 req->part = part; 00701 00702 sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB); 00703 00704 DBUG_VOID_RETURN; 00705 } 00706 00707 void 00708 Suma::execNODE_FAILREP(Signal* signal){ 00709 jamEntry(); 00710 DBUG_ENTER("Suma::execNODE_FAILREP"); 00711 ndbassert(signal->getNoOfSections() == 0); 00712 00713 const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr(); 00714 NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes); 00715 00716 if(failed.get(Restart.nodeId)) 00717 { 00718 Restart.resetRestart(signal); 00719 } 00720 00721 signal->theData[0] = SumaContinueB::RESEND_BUCKET; 00722 00723 NdbNodeBitmask tmp; 00724 tmp.assign(c_alive_nodes); 00725 tmp.bitANDC(failed); 00726 00727 NdbNodeBitmask takeover_nodes; 00728 00729 if(c_nodes_in_nodegroup_mask.overlaps(failed)) 00730 { 00731 for( Uint32 i = 0; i < c_no_of_buckets; i++) 00732 { 00733 if(m_active_buckets.get(i)) 00734 continue; 00735 else if(m_switchover_buckets.get(i)) 00736 { 00737 Uint32 state= c_buckets[i].m_state; 00738 if((state & Bucket::BUCKET_HANDOVER) && 00739 failed.get(get_responsible_node(i))) 00740 { 00741 m_active_buckets.set(i); 00742 m_switchover_buckets.clear(i); 00743 ndbout_c("aborting handover"); 00744 } 00745 else if(state & Bucket::BUCKET_STARTING) 00746 { 00747 progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, 00748 "Nodefailure during SUMA takeover"); 00749 } 00750 } 00751 else if(get_responsible_node(i, tmp) == getOwnNodeId()) 00752 { 00753 start_resend(signal, i); 00754 } 00755 } 00756 } 00757 00758 c_alive_nodes.assign(tmp); 00759 00760 DBUG_VOID_RETURN; 00761 } 00762 00763 void 00764 Suma::execINCL_NODEREQ(Signal* signal){ 00765 jamEntry(); 00766 00767 //const Uint32 senderRef = signal->theData[0]; 00768 const Uint32 nodeId = signal->theData[1]; 00769 00770 ndbrequire(!c_alive_nodes.get(nodeId)); 00771 c_alive_nodes.set(nodeId); 00772 00773 #if 0 // if we include this DIH's got to be prepared, later if needed... 00774 signal->theData[0] = reference(); 00775 00776 sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB); 00777 #endif 00778 } 00779 00780 void 00781 Suma::execSIGNAL_DROPPED_REP(Signal* signal){ 00782 jamEntry(); 00783 ndbrequire(false); 00784 } 00785 00786 /******************************************************************** 00787 * 00788 * Dump state 00789 * 00790 */ 00791 00792 static unsigned 00793 count_subscribers(const DLList<Suma::Subscriber> &subs) 00794 { 00795 unsigned n= 0; 00796 Suma::SubscriberPtr i_subbPtr; 00797 subs.first(i_subbPtr); 00798 while(!i_subbPtr.isNull()){ 00799 n++; 00800 subs.next(i_subbPtr); 00801 } 00802 return n; 00803 } 00804 00805 void 00806 Suma::execDUMP_STATE_ORD(Signal* signal){ 00807 jamEntry(); 00808 00809 Uint32 tCase = signal->theData[0]; 00810 #if 0 00811 if(tCase >= 8000 && tCase <= 8003){ 00812 SubscriptionPtr subPtr; 00813 c_subscriptions.getPtr(subPtr, g_subPtrI); 00814 00815 Ptr<SyncRecord> syncPtr; 00816 c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); 00817 00818 if(tCase == 8000){ 00819 syncPtr.p->startMeta(signal); 00820 } 00821 00822 if(tCase == 8001){ 00823 syncPtr.p->startScan(signal); 00824 } 00825 00826 if(tCase == 8002){ 00827 syncPtr.p->startTrigger(signal); 00828 } 00829 00830 if(tCase == 8003){ 00831 subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan; 00832 LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList); 00833 Uint32 tab = 0; 00834 Uint32 att[] = { 0, 1, 1 }; 00835 syncPtr.p->m_tableList.append(&tab, 1); 00836 attrs.append(att, 3); 00837 } 00838 } 00839 #endif 00840 if(tCase == 8004){ 00841 infoEvent("Suma: c_subscriberPool size: %d free: %d", 00842 c_subscriberPool.getSize(), 00843 c_subscriberPool.getNoOfFree()); 00844 00845 infoEvent("Suma: c_tablePool size: %d free: %d", 00846 c_tablePool.getSize(), 00847 c_tablePool.getNoOfFree()); 00848 00849 infoEvent("Suma: c_subscriptionPool size: %d free: %d", 00850 c_subscriptionPool.getSize(), 00851 c_subscriptionPool.getNoOfFree()); 00852 00853 infoEvent("Suma: c_syncPool size: %d free: %d", 00854 c_syncPool.getSize(), 00855 c_syncPool.getNoOfFree()); 00856 00857 infoEvent("Suma: c_dataBufferPool size: %d free: %d", 00858 c_dataBufferPool.getSize(), 00859 c_dataBufferPool.getNoOfFree()); 00860 00861 infoEvent("Suma: c_metaSubscribers count: %d", 00862 count_subscribers(c_metaSubscribers)); 00863 #if 0 00864 infoEvent("Suma: c_dataSubscribers count: %d", 00865 count_subscribers(c_dataSubscribers)); 00866 infoEvent("Suma: c_prepDataSubscribers count: %d", 00867 count_subscribers(c_prepDataSubscribers)); 00868 #endif 00869 infoEvent("Suma: c_removeDataSubscribers count: %d", 00870 count_subscribers(c_removeDataSubscribers)); 00871 } 00872 00873 if(tCase == 8005) 00874 { 00875 for(Uint32 i = 0; i<c_no_of_buckets; i++) 00876 { 00877 Bucket* ptr= c_buckets + i; 00878 infoEvent("Bucket %d %d%d-%x switch gci: %d max_acked_gci: %d max_gci: %d tail: %d head: %d", 00879 i, 00880 m_active_buckets.get(i), 00881 m_switchover_buckets.get(i), 00882 ptr->m_state, 00883 ptr->m_switchover_gci, 00884 ptr->m_max_acked_gci, 00885 ptr->m_buffer_head.m_max_gci, 00886 ptr->m_buffer_tail, 00887 ptr->m_buffer_head.m_page_id); 00888 } 00889 } 00890 } 00891 00892 /************************************************************* 00893 * 00894 * Creation of subscription id's 00895 * 00896 ************************************************************/ 00897 00898 void 00899 Suma::execCREATE_SUBID_REQ(Signal* signal) 00900 { 00901 jamEntry(); 00902 DBUG_ENTER("Suma::execCREATE_SUBID_REQ"); 00903 ndbassert(signal->getNoOfSections() == 0); 00904 CRASH_INSERTION(13001); 00905 00906 CreateSubscriptionIdReq const * req = 00907 (CreateSubscriptionIdReq*)signal->getDataPtr(); 00908 SubscriberPtr subbPtr; 00909 if(!c_subscriberPool.seize(subbPtr)){ 00910 jam(); 00911 sendSubIdRef(signal, req->senderRef, req->senderData, 1412); 00912 DBUG_VOID_RETURN; 00913 } 00914 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", 00915 c_subscriberPool.getSize(), 00916 c_subscriberPool.getNoOfFree())); 00917 00918 subbPtr.p->m_senderRef = req->senderRef; 00919 subbPtr.p->m_senderData = req->senderData; 00920 00921 UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend(); 00922 utilReq->senderData = subbPtr.i; 00923 utilReq->sequenceId = SUMA_SEQUENCE; 00924 utilReq->requestType = UtilSequenceReq::NextVal; 00925 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 00926 signal, UtilSequenceReq::SignalLength, JBB); 00927 00928 DBUG_VOID_RETURN; 00929 } 00930 00931 void 00932 Suma::execUTIL_SEQUENCE_CONF(Signal* signal) 00933 { 00934 jamEntry(); 00935 DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF"); 00936 ndbassert(signal->getNoOfSections() == 0); 00937 CRASH_INSERTION(13002); 00938 00939 UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); 00940 if(conf->requestType == UtilSequenceReq::Create) { 00941 jam(); 00942 createSequenceReply(signal, conf, NULL); 00943 DBUG_VOID_RETURN; 00944 } 00945 00946 Uint64 subId; 00947 memcpy(&subId,conf->sequenceValue,8); 00948 SubscriberPtr subbPtr; 00949 c_subscriberPool.getPtr(subbPtr,conf->senderData); 00950 00951 CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf; 00952 subconf->senderRef = reference(); 00953 subconf->senderData = subbPtr.p->m_senderData; 00954 subconf->subscriptionId = (Uint32)subId; 00955 subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF); 00956 00957 sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal, 00958 CreateSubscriptionIdConf::SignalLength, JBB); 00959 00960 c_subscriberPool.release(subbPtr); 00961 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", 00962 c_subscriberPool.getSize(), 00963 c_subscriberPool.getNoOfFree())); 00964 DBUG_VOID_RETURN; 00965 } 00966 00967 void 00968 Suma::execUTIL_SEQUENCE_REF(Signal* signal) 00969 { 00970 jamEntry(); 00971 DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF"); 00972 ndbassert(signal->getNoOfSections() == 0); 00973 UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); 00974 Uint32 err= ref->errorCode; 00975 00976 if(ref->requestType == UtilSequenceReq::Create) { 00977 jam(); 00978 createSequenceReply(signal, NULL, ref); 00979 DBUG_VOID_RETURN; 00980 } 00981 00982 Uint32 subData = ref->senderData; 00983 00984 SubscriberPtr subbPtr; 00985 c_subscriberPool.getPtr(subbPtr,subData); 00986 sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err); 00987 c_subscriberPool.release(subbPtr); 00988 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", 00989 c_subscriberPool.getSize(), 00990 c_subscriberPool.getNoOfFree())); 00991 DBUG_VOID_RETURN; 00992 }//execUTIL_SEQUENCE_REF() 00993 00994 00995 void 00996 Suma::sendSubIdRef(Signal* signal, 00997 Uint32 senderRef, Uint32 senderData, Uint32 errCode) 00998 { 00999 jam(); 01000 DBUG_ENTER("Suma::sendSubIdRef"); 01001 CreateSubscriptionIdRef * ref = 01002 (CreateSubscriptionIdRef *)signal->getDataPtrSend(); 01003 01004 ref->senderRef = reference(); 01005 ref->senderData = senderData; 01006 ref->errorCode = errCode; 01007 sendSignal(senderRef, 01008 GSN_CREATE_SUBID_REF, 01009 signal, 01010 CreateSubscriptionIdRef::SignalLength, 01011 JBB); 01012 01013 releaseSections(signal); 01014 DBUG_VOID_RETURN; 01015 } 01016 01017 /********************************************************** 01018 * Suma participant interface 01019 * 01020 * Creation of subscriptions 01021 */ 01022 01023 void 01024 Suma::addTableId(Uint32 tableId, 01025 SubscriptionPtr subPtr, SyncRecord *psyncRec) 01026 { 01027 DBUG_ENTER("Suma::addTableId"); 01028 DBUG_PRINT("enter",("tableId: %u subPtr.i: %u", tableId, subPtr.i)); 01029 subPtr.p->m_tableId= tableId; 01030 if(psyncRec != NULL) 01031 psyncRec->m_tableList.append(&tableId, 1); 01032 DBUG_VOID_RETURN; 01033 } 01034 01035 void 01036 Suma::execSUB_CREATE_REQ(Signal* signal) 01037 { 01038 jamEntry(); 01039 DBUG_ENTER("Suma::execSUB_CREATE_REQ"); 01040 ndbassert(signal->getNoOfSections() == 0); 01041 CRASH_INSERTION(13003); 01042 01043 const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr(); 01044 01045 const Uint32 subRef = req.senderRef; 01046 const Uint32 subData = req.senderData; 01047 const Uint32 subId = req.subscriptionId; 01048 const Uint32 subKey = req.subscriptionKey; 01049 const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags; 01050 const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags; 01051 const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0; 01052 const bool restartFlag = (flags & SubCreateReq::RestartFlag) != 0; 01053 const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ? 01054 Subscription::REPORT_ALL : 0; 01055 const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ? 01056 Subscription::REPORT_SUBSCRIBE : 0; 01057 const Uint32 tableId = req.tableId; 01058 Subscription::State state = (Subscription::State) req.state; 01059 if (signal->getLength() != SubCreateReq::SignalLength2) 01060 { 01061 /* 01062 api or restarted by older version 01063 if restarted by old version, do the best we can 01064 */ 01065 state = Subscription::DEFINED; 01066 } 01067 01068 Subscription key; 01069 key.m_subscriptionId = subId; 01070 key.m_subscriptionKey = subKey; 01071 01072 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u", 01073 key.m_subscriptionId, key.m_subscriptionKey)); 01074 01075 SubscriptionPtr subPtr; 01076 01077 if (addTableFlag) { 01078 ndbrequire(restartFlag); //TODO remove this 01079 01080 if(!c_subscriptions.find(subPtr, key)) { 01081 jam(); 01082 sendSubCreateRef(signal, 1407); 01083 DBUG_VOID_RETURN; 01084 } 01085 jam(); 01086 if (restartFlag) 01087 { 01088 ndbrequire(type != SubCreateReq::SingleTableScan); 01089 ndbrequire(req.tableId != subPtr.p->m_tableId); 01090 ndbrequire(type != SubCreateReq::TableEvent); 01091 addTableId(req.tableId, subPtr, 0); 01092 } 01093 } else { 01094 if (c_startup.m_restart_server_node_id && 01095 refToNode(subRef) != c_startup.m_restart_server_node_id) 01096 { 01101 jam(); 01102 sendSubStartRef(signal, 1405); 01103 DBUG_VOID_RETURN; 01104 } 01105 // Check that id/key is unique 01106 if(c_subscriptions.find(subPtr, key)) { 01107 jam(); 01108 sendSubCreateRef(signal, 1415); 01109 DBUG_VOID_RETURN; 01110 } 01111 if(!c_subscriptions.seize(subPtr)) { 01112 jam(); 01113 sendSubCreateRef(signal, 1412); 01114 DBUG_VOID_RETURN; 01115 } 01116 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d", 01117 c_subscriptionPool.getSize(), 01118 c_subscriptionPool.getNoOfFree())); 01119 jam(); 01120 subPtr.p->m_senderRef = subRef; 01121 subPtr.p->m_senderData = subData; 01122 subPtr.p->m_subscriptionId = subId; 01123 subPtr.p->m_subscriptionKey = subKey; 01124 subPtr.p->m_subscriptionType = type; 01125 subPtr.p->m_options = reportSubscribe | reportAll; 01126 subPtr.p->m_tableId = tableId; 01127 subPtr.p->m_table_ptrI = RNIL; 01128 subPtr.p->m_state = state; 01129 subPtr.p->n_subscribers = 0; 01130 subPtr.p->m_current_sync_ptrI = RNIL; 01131 01132 fprintf(stderr, "table %d options %x\n", subPtr.p->m_tableId, subPtr.p->m_options); 01133 DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u", 01134 key.m_subscriptionId, key.m_subscriptionKey)); 01135 01136 c_subscriptions.add(subPtr); 01137 } 01138 01139 SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend(); 01140 conf->senderRef = reference(); 01141 conf->senderData = subPtr.p->m_senderData; 01142 sendSignal(subRef, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB); 01143 DBUG_VOID_RETURN; 01144 } 01145 01146 void 01147 Suma::sendSubCreateRef(Signal* signal, Uint32 errCode) 01148 { 01149 jam(); 01150 SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend(); 01151 ref->errorCode = errCode; 01152 sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, 01153 SubCreateRef::SignalLength, JBB); 01154 return; 01155 } 01156 01157 /********************************************************** 01158 * 01159 * Setting upp trigger for subscription 01160 * 01161 */ 01162 01163 void 01164 Suma::execSUB_SYNC_REQ(Signal* signal) 01165 { 01166 jamEntry(); 01167 DBUG_ENTER("Suma::execSUB_SYNC_REQ"); 01168 ndbassert(signal->getNoOfSections() <= 1); 01169 CRASH_INSERTION(13004); 01170 01171 SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr(); 01172 01173 SubscriptionPtr subPtr; 01174 Subscription key; 01175 key.m_subscriptionId = req->subscriptionId; 01176 key.m_subscriptionKey = req->subscriptionKey; 01177 01178 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u", 01179 key.m_subscriptionId, key.m_subscriptionKey)); 01180 01181 if(!c_subscriptions.find(subPtr, key)) 01182 { 01183 jam(); 01184 DBUG_PRINT("info",("Not found")); 01185 sendSubSyncRef(signal, 1407); 01186 DBUG_VOID_RETURN; 01187 } 01188 01189 bool ok = false; 01190 SubscriptionData::Part part = (SubscriptionData::Part)req->part; 01191 01192 Ptr<SyncRecord> syncPtr; 01193 if(!c_syncPool.seize(syncPtr)) 01194 { 01195 jam(); 01196 sendSubSyncRef(signal, 1416); 01197 DBUG_VOID_RETURN; 01198 } 01199 DBUG_PRINT("info",("c_syncPool size: %d free: %d", 01200 c_syncPool.getSize(), 01201 c_syncPool.getNoOfFree())); 01202 01203 syncPtr.p->m_senderRef = req->senderRef; 01204 syncPtr.p->m_senderData = req->senderData; 01205 syncPtr.p->m_subscriptionPtrI = subPtr.i; 01206 syncPtr.p->ptrI = syncPtr.i; 01207 syncPtr.p->m_error = 0; 01208 01209 subPtr.p->m_current_sync_ptrI = syncPtr.i; 01210 01211 { 01212 jam(); 01213 syncPtr.p->m_tableList.append(&subPtr.p->m_tableId, 1); 01214 if(signal->getNoOfSections() > 0){ 01215 SegmentedSectionPtr ptr; 01216 signal->getSection(ptr, SubSyncReq::ATTRIBUTE_LIST); 01217 LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList); 01218 append(attrBuf, ptr, getSectionSegmentPool()); 01219 releaseSections(signal); 01220 } 01221 } 01222 01223 TablePtr tabPtr; 01224 initTable(signal,subPtr.p->m_tableId,tabPtr,syncPtr); 01225 tabPtr.p->n_subscribers++; 01226 if (subPtr.p->m_options & Subscription::REPORT_ALL) 01227 tabPtr.p->m_reportAll = true; 01228 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u", 01229 tabPtr.p->m_tableId, tabPtr.p->n_subscribers)); 01230 DBUG_VOID_RETURN; 01231 01232 switch(part){ 01233 case SubscriptionData::MetaData: 01234 ndbrequire(false); 01235 #if 0 01236 ok = true; 01237 jam(); 01238 if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) { 01239 TableList::DataBufferIterator it; 01240 syncPtr.p->m_tableList.first(it); 01241 if(it.isNull()) { 01245 ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend(); 01246 req->senderRef = reference(); 01247 req->senderData = syncPtr.i; 01248 req->requestData = 0; 01252 req->setTableType(DictTabInfo::UserTable); 01253 01254 sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal, 01255 ListTablesReq::SignalLength, JBB); 01256 break; 01257 } 01258 } 01259 01260 syncPtr.p->startMeta(signal); 01261 #endif 01262 break; 01263 case SubscriptionData::TableData: { 01264 ok = true; 01265 jam(); 01266 syncPtr.p->startScan(signal); 01267 break; 01268 } 01269 } 01270 ndbrequire(ok); 01271 DBUG_VOID_RETURN; 01272 } 01273 01274 void 01275 Suma::sendSubSyncRef(Signal* signal, Uint32 errCode){ 01276 jam(); 01277 SubSyncRef * ref= (SubSyncRef *)signal->getDataPtrSend(); 01278 ref->errorCode = errCode; 01279 releaseSections(signal); 01280 sendSignal(signal->getSendersBlockRef(), 01281 GSN_SUB_SYNC_REF, 01282 signal, 01283 SubSyncRef::SignalLength, 01284 JBB); 01285 return; 01286 } 01287 01288 /********************************************************** 01289 * Dict interface 01290 */ 01291 01292 #if 0 01293 void 01294 Suma::execLIST_TABLES_CONF(Signal* signal){ 01295 jamEntry(); 01296 CRASH_INSERTION(13005); 01297 ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr(); 01298 SyncRecord* tmp = c_syncPool.getPtr(conf->senderData); 01299 tmp->runLIST_TABLES_CONF(signal); 01300 } 01301 #endif 01302 01303 01304 /************************************************************************* 01305 * 01306 * 01307 */ 01308 #if 0 01309 void 01310 Suma::Table::runLIST_TABLES_CONF(Signal* signal){ 01311 jam(); 01312 01313 ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr(); 01314 const Uint32 len = signal->length() - ListTablesConf::HeaderLength; 01315 01316 SubscriptionPtr subPtr; 01317 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); 01318 01319 for (unsigned i = 0; i < len; i++) { 01320 subPtr.p->m_maxTables++; 01321 suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this); 01322 } 01323 01324 // for (unsigned i = 0; i < len; i++) 01325 // conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]); 01326 // m_tableList.append(&conf->tableData[0], len); 01327 01328 #if 0 01329 TableList::DataBufferIterator it; 01330 int i = 0; 01331 for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) { 01332 ndbout_c("%u listtableconf tableid %d", i++, *it.data); 01333 } 01334 #endif 01335 01336 if(len == ListTablesConf::DataLength){ 01337 jam(); 01338 // we expect more LIST_TABLE_CONF 01339 return; 01340 } 01341 01342 #if 0 01343 subPtr.p->m_currentTable = 0; 01344 subPtr.p->m_maxTables = 0; 01345 01346 TableList::DataBufferIterator it; 01347 for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) { 01348 subPtr.p->m_maxTables++; 01349 suma.addTableId(*it.data, subPtr, NULL); 01350 #ifdef NODEFAIL_DEBUG 01351 ndbout_c(" listtableconf tableid %d",*it.data); 01352 #endif 01353 } 01354 #endif 01355 01356 startMeta(signal); 01357 } 01358 #endif 01359 01360 01361 int 01362 Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr, 01363 SubscriberPtr subbPtr) 01364 { 01365 DBUG_ENTER("Suma::initTable SubscriberPtr"); 01366 DBUG_PRINT("enter",("tableId: %d", tableId)); 01367 01368 int r= initTable(signal,tableId,tabPtr); 01369 01370 { 01371 LocalDLList<Subscriber> subscribers(c_subscriberPool, 01372 tabPtr.p->c_subscribers); 01373 subscribers.add(subbPtr); 01374 } 01375 01376 DBUG_PRINT("info",("added subscriber: %i", subbPtr.i)); 01377 01378 if (r) 01379 { 01380 // we have to wait getting tab info 01381 DBUG_RETURN(1); 01382 } 01383 01384 if (tabPtr.p->setupTrigger(signal, *this)) 01385 { 01386 // we have to wait for triggers to be setup 01387 DBUG_RETURN(1); 01388 } 01389 01390 completeOneSubscriber(signal, tabPtr, subbPtr); 01391 completeInitTable(signal, tabPtr); 01392 DBUG_RETURN(0); 01393 } 01394 01395 int 01396 Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr, 01397 Ptr<SyncRecord> syncPtr) 01398 { 01399 jam(); 01400 DBUG_ENTER("Suma::initTable Ptr<SyncRecord>"); 01401 DBUG_PRINT("enter",("tableId: %d", tableId)); 01402 01403 int r= initTable(signal,tableId,tabPtr); 01404 01405 { 01406 LocalDLList<SyncRecord> syncRecords(c_syncPool,tabPtr.p->c_syncRecords); 01407 syncRecords.add(syncPtr); 01408 } 01409 01410 if (r) 01411 { 01412 // we have to wait getting tab info 01413 DBUG_RETURN(1); 01414 } 01415 completeInitTable(signal, tabPtr); 01416 DBUG_RETURN(0); 01417 } 01418 01419 int 01420 Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr) 01421 { 01422 jam(); 01423 DBUG_ENTER("Suma::initTable"); 01424 01425 if (!c_tables.find(tabPtr, tableId) || 01426 tabPtr.p->m_state == Table::DROPPED || 01427 tabPtr.p->m_state == Table::ALTERED) 01428 { 01429 // table not being prepared 01430 // seize a new table, initialize and add to c_tables 01431 ndbrequire(c_tablePool.seize(tabPtr)); 01432 DBUG_PRINT("info",("c_tablePool size: %d free: %d", 01433 c_tablePool.getSize(), 01434 c_tablePool.getNoOfFree())); 01435 new (tabPtr.p) Table; 01436 01437 tabPtr.p->m_tableId= tableId; 01438 tabPtr.p->m_ptrI= tabPtr.i; 01439 tabPtr.p->n_subscribers = 0; 01440 DBUG_PRINT("info",("Suma::Table[%u,i=%u]::n_subscribers: %u", 01441 tabPtr.p->m_tableId, tabPtr.i, tabPtr.p->n_subscribers)); 01442 01443 tabPtr.p->m_reportAll = false; 01444 01445 tabPtr.p->m_error = 0; 01446 tabPtr.p->m_schemaVersion = RNIL; 01447 tabPtr.p->m_state = Table::DEFINING; 01448 tabPtr.p->m_drop_subbPtr.p = 0; 01449 for (int j= 0; j < 3; j++) 01450 { 01451 tabPtr.p->m_hasTriggerDefined[j] = 0; 01452 tabPtr.p->m_hasOutstandingTriggerReq[j] = 0; 01453 tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID; 01454 } 01455 01456 c_tables.add(tabPtr); 01457 01458 GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend(); 01459 req->senderRef = reference(); 01460 req->senderData = tabPtr.i; 01461 req->requestType = 01462 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf; 01463 req->tableId = tableId; 01464 01465 DBUG_PRINT("info",("GET_TABINFOREQ id %d", req->tableId)); 01466 sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, 01467 GetTabInfoReq::SignalLength, JBB); 01468 DBUG_RETURN(1); 01469 } 01470 if (tabPtr.p->m_state == Table::DEFINING) 01471 { 01472 DBUG_RETURN(1); 01473 } 01474 // ToDo should be a ref signal instead 01475 ndbrequire(tabPtr.p->m_state == Table::DEFINED); 01476 DBUG_RETURN(0); 01477 } 01478 01479 void 01480 Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbPtr) 01481 { 01482 jam(); 01483 DBUG_ENTER("Suma::completeOneSubscriber"); 01484 01485 if (tabPtr.p->m_error && 01486 (c_startup.m_restart_server_node_id == 0 || 01487 tabPtr.p->m_state != Table::DROPPED)) 01488 { 01489 sendSubStartRef(signal,subbPtr,tabPtr.p->m_error, 01490 SubscriptionData::TableData); 01491 tabPtr.p->n_subscribers--; 01492 } 01493 else 01494 { 01495 SubscriptionPtr subPtr; 01496 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 01497 subPtr.p->m_table_ptrI= tabPtr.i; 01498 sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3, 01499 SubscriptionData::TableData); 01500 } 01501 DBUG_VOID_RETURN; 01502 } 01503 01504 void 01505 Suma::completeAllSubscribers(Signal *signal, TablePtr tabPtr) 01506 { 01507 jam(); 01508 DBUG_ENTER("Suma::completeAllSubscribers"); 01509 // handle all subscribers 01510 { 01511 LocalDLList<Subscriber> subscribers(c_subscriberPool, 01512 tabPtr.p->c_subscribers); 01513 SubscriberPtr subbPtr; 01514 for(subscribers.first(subbPtr); 01515 !subbPtr.isNull(); 01516 subscribers.next(subbPtr)) 01517 { 01518 completeOneSubscriber(signal, tabPtr, subbPtr); 01519 } 01520 } 01521 DBUG_VOID_RETURN; 01522 } 01523 01524 void 01525 Suma::completeInitTable(Signal *signal, TablePtr tabPtr) 01526 { 01527 jam(); 01528 DBUG_ENTER("Suma::completeInitTable"); 01529 01530 // handle all syncRecords 01531 while (!tabPtr.p->c_syncRecords.isEmpty()) 01532 { 01533 Ptr<SyncRecord> syncPtr; 01534 { 01535 LocalDLList<SyncRecord> syncRecords(c_syncPool, 01536 tabPtr.p->c_syncRecords); 01537 syncRecords.first(syncPtr); 01538 syncRecords.remove(syncPtr); 01539 } 01540 syncPtr.p->ptrI = syncPtr.i; 01541 if (tabPtr.p->m_error == 0) 01542 { 01543 jam(); 01544 syncPtr.p->startScan(signal); 01545 } 01546 else 01547 { 01548 jam(); 01549 syncPtr.p->completeScan(signal, tabPtr.p->m_error); 01550 tabPtr.p->n_subscribers--; 01551 } 01552 } 01553 01554 if (tabPtr.p->m_error) 01555 { 01556 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u", 01557 tabPtr.p->m_tableId, tabPtr.p->n_subscribers)); 01558 tabPtr.p->checkRelease(*this); 01559 } 01560 else 01561 { 01562 tabPtr.p->m_state = Table::DEFINED; 01563 } 01564 01565 DBUG_VOID_RETURN; 01566 } 01567 01568 01569 void 01570 Suma::execGET_TABINFOREF(Signal* signal){ 01571 jamEntry(); 01572 GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr(); 01573 Uint32 tableId = ref->tableId; 01574 Uint32 senderData = ref->senderData; 01575 GetTabInfoRef::ErrorCode errorCode = 01576 (GetTabInfoRef::ErrorCode) ref->errorCode; 01577 int do_resend_request = 0; 01578 TablePtr tabPtr; 01579 c_tablePool.getPtr(tabPtr, senderData); 01580 switch (errorCode) 01581 { 01582 case GetTabInfoRef::TableNotDefined: 01583 // wrong state 01584 break; 01585 case GetTabInfoRef::InvalidTableId: 01586 // no such table 01587 break; 01588 case GetTabInfoRef::Busy: 01589 do_resend_request = 1; 01590 break; 01591 case GetTabInfoRef::TableNameTooLong: 01592 ndbrequire(false); 01593 } 01594 if (do_resend_request) 01595 { 01596 GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend(); 01597 req->senderRef = reference(); 01598 req->senderData = senderData; 01599 req->requestType = 01600 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf; 01601 req->tableId = tableId; 01602 sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal, 01603 30, GetTabInfoReq::SignalLength); 01604 return; 01605 } 01606 tabPtr.p->m_state = Table::DROPPED; 01607 tabPtr.p->m_error = errorCode; 01608 completeAllSubscribers(signal, tabPtr); 01609 completeInitTable(signal, tabPtr); 01610 } 01611 01612 void 01613 Suma::execGET_TABINFO_CONF(Signal* signal){ 01614 jamEntry(); 01615 01616 CRASH_INSERTION(13006); 01617 01618 if(!assembleFragments(signal)){ 01619 return; 01620 } 01621 01622 GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr(); 01623 Uint32 tableId = conf->tableId; 01624 TablePtr tabPtr; 01625 c_tablePool.getPtr(tabPtr, conf->senderData); 01626 SegmentedSectionPtr ptr; 01627 signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO); 01628 ndbrequire(tabPtr.p->parseTable(ptr, *this)); 01629 releaseSections(signal); 01633 jam(); 01634 DihFragCountReq* req = (DihFragCountReq*)signal->getDataPtrSend(); 01635 req->m_connectionData = RNIL; 01636 req->m_tableRef = tableId; 01637 req->m_senderData = tabPtr.i; 01638 sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 01639 DihFragCountReq::SignalLength, JBB); 01640 } 01641 01642 bool 01643 Suma::Table::parseTable(SegmentedSectionPtr ptr, 01644 Suma &suma) 01645 { 01646 DBUG_ENTER("Suma::Table::parseTable"); 01647 01648 SimplePropertiesSectionReader it(ptr, suma.getSectionSegmentPool()); 01649 01650 SimpleProperties::UnpackStatus s; 01651 DictTabInfo::Table tableDesc; tableDesc.init(); 01652 s = SimpleProperties::unpack(it, &tableDesc, 01653 DictTabInfo::TableMapping, 01654 DictTabInfo::TableMappingSize, 01655 true, true); 01656 01657 jam(); 01658 suma.suma_ndbrequire(s == SimpleProperties::Break); 01659 01660 #if 0 01661 ToDo handle this 01662 if(m_schemaVersion != tableDesc.TableVersion){ 01663 jam(); 01664 01665 release(* this); 01666 01667 // oops wrong schema version in stored tabledesc 01668 // we need to find all subscriptions with old table desc 01669 // and all subscribers to this 01670 // hopefully none 01671 c_tables.release(tabPtr); 01672 DBUG_PRINT("info",("c_tablePool size: %d free: %d", 01673 suma.c_tablePool.getSize(), 01674 suma.c_tablePool.getNoOfFree())); 01675 tabPtr.setNull(); 01676 DLHashTable<Suma::Subscription>::Iterator i_subPtr; 01677 c_subscriptions.first(i_subPtr); 01678 SubscriptionPtr subPtr; 01679 for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){ 01680 jam(); 01681 c_subscriptions.getPtr(subPtr, i_subPtr.curr.i); 01682 SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI); 01683 if (tmp == syncPtr_p) { 01684 jam(); 01685 continue; 01686 } 01687 if (subPtr.p->m_tables.get(tableId)) { 01688 jam(); 01689 subPtr.p->m_tables.clear(tableId); // remove this old table reference 01690 TableList::DataBufferIterator it; 01691 for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) { 01692 jam(); 01693 if (*it.data == tableId){ 01694 jam(); 01695 Uint32 *pdata = it.data; 01696 tmp->m_tableList.next(it); 01697 for(;!it.isNull();tmp->m_tableList.next(it)) { 01698 jam(); 01699 *pdata = *it.data; 01700 pdata = it.data; 01701 } 01702 *pdata = RNIL; // todo remove this last item... 01703 break; 01704 } 01705 } 01706 } 01707 } 01708 } 01709 #endif 01710 01711 if(m_attributes.getSize() != 0){ 01712 jam(); 01713 DBUG_RETURN(true); 01714 } 01715 01719 Uint32 noAttribs = tableDesc.NoOfAttributes; 01720 Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable); 01721 m_schemaVersion = tableDesc.TableVersion; 01722 01723 // The attribute buffer 01724 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes); 01725 01726 // Temporary buffer 01727 DataBuffer<15> theRest(suma.c_dataBufferPool); 01728 01729 if(!attrBuf.seize(noAttribs)){ 01730 jam(); 01731 suma.suma_ndbrequire(false); 01732 DBUG_RETURN(false); 01733 } 01734 01735 if(!theRest.seize(notFixed)){ 01736 jam(); 01737 suma.suma_ndbrequire(false); 01738 DBUG_RETURN(false); 01739 } 01740 01741 DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable 01742 DataBuffer<15>::DataBufferIterator restIt; // variable + nullable 01743 attrBuf.first(attrIt); 01744 theRest.first(restIt); 01745 01746 for(Uint32 i = 0; i < noAttribs; i++) { 01747 DictTabInfo::Attribute attrDesc; attrDesc.init(); 01748 s = SimpleProperties::unpack(it, &attrDesc, 01749 DictTabInfo::AttributeMapping, 01750 DictTabInfo::AttributeMappingSize, 01751 true, true); 01752 jam(); 01753 suma.suma_ndbrequire(s == SimpleProperties::Break); 01754 01755 if (!attrDesc.AttributeNullableFlag 01756 /* && !attrDesc.AttributeVariableFlag */) { 01757 jam(); 01758 * attrIt.data = attrDesc.AttributeId; 01759 attrBuf.next(attrIt); 01760 } else { 01761 jam(); 01762 * restIt.data = attrDesc.AttributeId; 01763 theRest.next(restIt); 01764 } 01765 01766 // Move to next attribute 01767 it.next(); 01768 } 01769 01773 theRest.first(restIt); 01774 for(; !restIt.isNull(); theRest.next(restIt)){ 01775 * attrIt.data = * restIt.data; 01776 attrBuf.next(attrIt); 01777 } 01778 01779 theRest.release(); 01780 01781 DBUG_RETURN(true); 01782 } 01783 01784 void 01785 Suma::execDI_FCOUNTREF(Signal* signal) 01786 { 01787 jamEntry(); 01788 DBUG_ENTER("Suma::execDI_FCOUNTREF"); 01789 DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr(); 01790 switch ((DihFragCountRef::ErrorCode) ref->m_error) 01791 { 01792 case DihFragCountRef::ErroneousTableState: 01793 jam(); 01794 if (ref->m_tableStatus == Dbdih::TabRecord::TS_CREATING) 01795 { 01796 const Uint32 tableId = ref->m_senderData; 01797 const Uint32 tabPtr_i = ref->m_tableRef; 01798 DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend(); 01799 01800 req->m_connectionData = RNIL; 01801 req->m_tableRef = tabPtr_i; 01802 req->m_senderData = tableId; 01803 sendSignalWithDelay(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 01804 DihFragCountReq::SignalLength, 01805 DihFragCountReq::RetryInterval); 01806 DBUG_VOID_RETURN; 01807 } 01808 ndbrequire(false); 01809 default: 01810 ndbrequire(false); 01811 } 01812 01813 DBUG_VOID_RETURN; 01814 } 01815 01816 void 01817 Suma::execDI_FCOUNTCONF(Signal* signal) 01818 { 01819 jamEntry(); 01820 DBUG_ENTER("Suma::execDI_FCOUNTCONF"); 01821 ndbassert(signal->getNoOfSections() == 0); 01822 DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr(); 01823 const Uint32 userPtr = conf->m_connectionData; 01824 const Uint32 fragCount = conf->m_fragmentCount; 01825 const Uint32 tableId = conf->m_tableRef; 01826 01827 ndbrequire(userPtr == RNIL && signal->length() == 5); 01828 01829 TablePtr tabPtr; 01830 tabPtr.i= conf->m_senderData; 01831 ndbrequire((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) != 0); 01832 ndbrequire(tabPtr.p->m_tableId == tableId); 01833 01834 LocalDataBuffer<15> fragBuf(c_dataBufferPool, tabPtr.p->m_fragments); 01835 ndbrequire(fragBuf.getSize() == 0); 01836 01837 tabPtr.p->m_fragCount = fragCount; 01838 01839 signal->theData[0] = RNIL; 01840 signal->theData[1] = tabPtr.i; 01841 signal->theData[2] = tableId; 01842 signal->theData[3] = 0; // Frag no 01843 sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB); 01844 01845 DBUG_VOID_RETURN; 01846 } 01847 01848 void 01849 Suma::execDIGETPRIMCONF(Signal* signal){ 01850 jamEntry(); 01851 DBUG_ENTER("Suma::execDIGETPRIMCONF"); 01852 ndbassert(signal->getNoOfSections() == 0); 01853 01854 const Uint32 userPtr = signal->theData[0]; 01855 const Uint32 nodeCount = signal->theData[6]; 01856 const Uint32 tableId = signal->theData[7]; 01857 const Uint32 fragNo = signal->theData[8]; 01858 01859 ndbrequire(userPtr == RNIL && signal->length() == 9); 01860 ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS); 01861 01862 TablePtr tabPtr; 01863 tabPtr.i= signal->theData[1]; 01864 ndbrequire((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) != 0); 01865 ndbrequire(tabPtr.p->m_tableId == tableId); 01866 01867 { 01868 LocalDataBuffer<15> fragBuf(c_dataBufferPool,tabPtr.p->m_fragments); 01869 01873 FragmentDescriptor fd; 01874 fd.m_fragDesc.m_nodeId = signal->theData[2]; 01875 fd.m_fragDesc.m_fragmentNo = fragNo; 01876 signal->theData[2] = fd.m_dummy; 01877 fragBuf.append(&signal->theData[2], 1); 01878 } 01879 01880 const Uint32 nextFrag = fragNo + 1; 01881 if(nextFrag == tabPtr.p->m_fragCount) 01882 { 01888 if (tabPtr.p->c_subscribers.isEmpty()) 01889 { 01890 completeInitTable(signal,tabPtr); 01891 DBUG_VOID_RETURN; 01892 } 01893 tabPtr.p->setupTrigger(signal, *this); 01894 DBUG_VOID_RETURN; 01895 } 01896 signal->theData[0] = RNIL; 01897 signal->theData[1] = tabPtr.i; 01898 signal->theData[2] = tableId; 01899 signal->theData[3] = nextFrag; // Frag no 01900 sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB); 01901 01902 DBUG_VOID_RETURN; 01903 } 01904 01905 #if 0 01906 void 01907 Suma::SyncRecord::completeTableInit(Signal* signal) 01908 { 01909 jam(); 01910 SubscriptionPtr subPtr; 01911 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); 01912 01913 #if PRINT_ONLY 01914 ndbout_c("GSN_SUB_SYNC_CONF (meta)"); 01915 #else 01916 01917 suma.releaseSections(signal); 01918 01919 if (m_error) { 01920 SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend(); 01921 ref->senderRef = suma.reference(); 01922 ref->senderData = subPtr.p->m_senderData; 01923 ref->errorCode = SubSyncRef::Undefined; 01924 suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_REF, signal, 01925 SubSyncRef::SignalLength, JBB); 01926 } else { 01927 SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend(); 01928 conf->senderRef = suma.reference(); 01929 conf->senderData = subPtr.p->m_senderData; 01930 suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_CONF, signal, 01931 SubSyncConf::SignalLength, JBB); 01932 } 01933 #endif 01934 } 01935 #endif 01936 01937 /********************************************************** 01938 * 01939 * Scan interface 01940 * 01941 */ 01942 01943 void 01944 Suma::SyncRecord::startScan(Signal* signal) 01945 { 01946 jam(); 01947 DBUG_ENTER("Suma::SyncRecord::startScan"); 01948 01952 m_currentTable = 0; 01953 m_currentFragment = 0; 01954 nextScan(signal); 01955 DBUG_VOID_RETURN; 01956 } 01957 01958 bool 01959 Suma::SyncRecord::getNextFragment(TablePtr * tab, 01960 FragmentDescriptor * fd) 01961 { 01962 jam(); 01963 SubscriptionPtr subPtr; 01964 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); 01965 TableList::DataBufferIterator tabIt; 01966 DataBuffer<15>::DataBufferIterator fragIt; 01967 01968 m_tableList.position(tabIt, m_currentTable); 01969 for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++) 01970 { 01971 TablePtr tabPtr; 01972 ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data)); 01973 LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); 01974 01975 fragBuf.position(fragIt, m_currentFragment); 01976 for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++) 01977 { 01978 FragmentDescriptor tmp; 01979 tmp.m_dummy = * fragIt.data; 01980 if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){ 01981 * fd = tmp; 01982 * tab = tabPtr; 01983 return true; 01984 } 01985 } 01986 m_currentFragment = 0; 01987 01988 tabPtr.p->n_subscribers--; 01989 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u", 01990 tabPtr.p->m_tableId, tabPtr.p->n_subscribers)); 01991 tabPtr.p->checkRelease(suma); 01992 } 01993 return false; 01994 } 01995 01996 void 01997 Suma::SyncRecord::nextScan(Signal* signal) 01998 { 01999 jam(); 02000 DBUG_ENTER("Suma::SyncRecord::nextScan"); 02001 TablePtr tabPtr; 02002 FragmentDescriptor fd; 02003 SubscriptionPtr subPtr; 02004 if(!getNextFragment(&tabPtr, &fd)){ 02005 jam(); 02006 completeScan(signal); 02007 DBUG_VOID_RETURN; 02008 } 02009 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); 02010 02011 DataBuffer<15>::Head head = m_attributeList; 02012 if(head.getSize() == 0){ 02013 head = tabPtr.p->m_attributes; 02014 } 02015 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head); 02016 02017 ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend(); 02018 const Uint32 parallelism = 16; 02019 const Uint32 attrLen = 5 + attrBuf.getSize(); 02020 02021 req->senderData = ptrI; 02022 req->resultRef = suma.reference(); 02023 req->tableId = tabPtr.p->m_tableId; 02024 req->requestInfo = 0; 02025 req->savePointId = 0; 02026 ScanFragReq::setLockMode(req->requestInfo, 0); 02027 ScanFragReq::setHoldLockFlag(req->requestInfo, 1); 02028 ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); 02029 ScanFragReq::setAttrLen(req->requestInfo, attrLen); 02030 req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; 02031 req->schemaVersion = tabPtr.p->m_schemaVersion; 02032 req->transId1 = 0; 02033 req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); 02034 req->clientOpPtr = (ptrI << 16); 02035 req->batch_size_rows= parallelism; 02036 req->batch_size_bytes= 0; 02037 suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 02038 ScanFragReq::SignalLength, JBB); 02039 02040 signal->theData[0] = ptrI; 02041 signal->theData[1] = 0; 02042 signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8); 02043 02044 // Return all 02045 signal->theData[3] = attrBuf.getSize(); 02046 signal->theData[4] = 0; 02047 signal->theData[5] = 0; 02048 signal->theData[6] = 0; 02049 signal->theData[7] = 0; 02050 02051 Uint32 dataPos = 8; 02052 DataBuffer<15>::DataBufferIterator it; 02053 for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){ 02054 AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0); 02055 if(dataPos == 25){ 02056 suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB); 02057 dataPos = 3; 02058 } 02059 } 02060 if(dataPos != 3){ 02061 suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB); 02062 } 02063 02064 m_currentTableId = tabPtr.p->m_tableId; 02065 m_currentNoOfAttributes = attrBuf.getSize(); 02066 02067 DBUG_VOID_RETURN; 02068 } 02069 02070 02071 void 02072 Suma::execSCAN_FRAGREF(Signal* signal){ 02073 jamEntry(); 02074 02075 // ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr(); 02076 ndbrequire(false); 02077 } 02078 02079 void 02080 Suma::execSCAN_FRAGCONF(Signal* signal){ 02081 jamEntry(); 02082 DBUG_ENTER("Suma::execSCAN_FRAGCONF"); 02083 ndbassert(signal->getNoOfSections() == 0); 02084 CRASH_INSERTION(13011); 02085 02086 ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr(); 02087 02088 const Uint32 completed = conf->fragmentCompleted; 02089 const Uint32 senderData = conf->senderData; 02090 const Uint32 completedOps = conf->completedOps; 02091 02092 Ptr<SyncRecord> syncPtr; 02093 c_syncPool.getPtr(syncPtr, senderData); 02094 02095 if(completed != 2){ 02096 jam(); 02097 02098 #if PRINT_ONLY 02099 SubSyncContinueConf * const conf = 02100 (SubSyncContinueConf*)signal->getDataPtrSend(); 02101 conf->subscriptionId = subPtr.p->m_subscriptionId; 02102 conf->subscriptionKey = subPtr.p->m_subscriptionKey; 02103 execSUB_SYNC_CONTINUE_CONF(signal); 02104 #else 02105 SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend(); 02106 req->subscriberData = syncPtr.p->m_senderData; 02107 req->noOfRowsSent = completedOps; 02108 sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal, 02109 SubSyncContinueReq::SignalLength, JBB); 02110 #endif 02111 DBUG_VOID_RETURN; 02112 } 02113 02114 ndbrequire(completedOps == 0); 02115 02116 syncPtr.p->m_currentFragment++; 02117 syncPtr.p->nextScan(signal); 02118 DBUG_VOID_RETURN; 02119 } 02120 02121 void 02122 Suma::execSUB_SYNC_CONTINUE_CONF(Signal* signal){ 02123 jamEntry(); 02124 ndbassert(signal->getNoOfSections() == 0); 02125 02126 CRASH_INSERTION(13012); 02127 02128 SubSyncContinueConf * const conf = 02129 (SubSyncContinueConf*)signal->getDataPtr(); 02130 02131 SubscriptionPtr subPtr; 02132 Subscription key; 02133 key.m_subscriptionId = conf->subscriptionId; 02134 key.m_subscriptionKey = conf->subscriptionKey; 02135 02136 ndbrequire(c_subscriptions.find(subPtr, key)); 02137 02138 ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend(); 02139 req->senderData = subPtr.p->m_current_sync_ptrI; 02140 req->closeFlag = 0; 02141 req->transId1 = 0; 02142 req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8); 02143 req->batch_size_rows = 16; 02144 req->batch_size_bytes = 0; 02145 sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, 02146 ScanFragNextReq::SignalLength, JBB); 02147 } 02148 02149 void 02150 Suma::SyncRecord::completeScan(Signal* signal, int error) 02151 { 02152 jam(); 02153 DBUG_ENTER("Suma::SyncRecord::completeScan"); 02154 // m_tableList.release(); 02155 02156 #if PRINT_ONLY 02157 ndbout_c("GSN_SUB_SYNC_CONF (data)"); 02158 #else 02159 if (error == 0) 02160 { 02161 SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend(); 02162 conf->senderRef = suma.reference(); 02163 conf->senderData = m_senderData; 02164 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal, 02165 SubSyncConf::SignalLength, JBB); 02166 } 02167 else 02168 { 02169 SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend(); 02170 ref->senderRef = suma.reference(); 02171 ref->senderData = m_senderData; 02172 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal, 02173 SubSyncRef::SignalLength, JBB); 02174 } 02175 #endif 02176 02177 release(); 02178 02179 Ptr<Subscription> subPtr; 02180 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); 02181 ndbrequire(subPtr.p->m_current_sync_ptrI == ptrI); 02182 subPtr.p->m_current_sync_ptrI = RNIL; 02183 02184 suma.c_syncPool.release(ptrI); 02185 DBUG_PRINT("info",("c_syncPool size: %d free: %d", 02186 suma.c_syncPool.getSize(), 02187 suma.c_syncPool.getNoOfFree())); 02188 DBUG_VOID_RETURN; 02189 } 02190 02191 void 02192 Suma::execSCAN_HBREP(Signal* signal){ 02193 jamEntry(); 02194 #if 0 02195 ndbout << "execSCAN_HBREP" << endl << hex; 02196 for(int i = 0; i<signal->length(); i++){ 02197 ndbout << signal->theData[i] << " "; 02198 if(((i + 1) % 8) == 0) 02199 ndbout << endl << hex; 02200 } 02201 ndbout << endl; 02202 #endif 02203 } 02204 02205 /********************************************************** 02206 * 02207 * Suma participant interface 02208 * 02209 * Creation of subscriber 02210 * 02211 */ 02212 02213 void 02214 Suma::execSUB_START_REQ(Signal* signal){ 02215 jamEntry(); 02216 ndbassert(signal->getNoOfSections() == 0); 02217 DBUG_ENTER("Suma::execSUB_START_REQ"); 02218 SubStartReq * const req = (SubStartReq*)signal->getDataPtr(); 02219 02220 CRASH_INSERTION(13013); 02221 Uint32 senderRef = req->senderRef; 02222 Uint32 senderData = req->senderData; 02223 Uint32 subscriberData = req->subscriberData; 02224 Uint32 subscriberRef = req->subscriberRef; 02225 SubscriptionData::Part part = (SubscriptionData::Part)req->part; 02226 02227 Subscription key; 02228 key.m_subscriptionId = req->subscriptionId; 02229 key.m_subscriptionKey = req->subscriptionKey; 02230 02231 if (c_startup.m_restart_server_node_id && 02232 refToNode(senderRef) != c_startup.m_restart_server_node_id) 02233 { 02238 jam(); 02239 sendSubStartRef(signal, 1405); 02240 DBUG_VOID_RETURN; 02241 } 02242 02243 SubscriptionPtr subPtr; 02244 if(!c_subscriptions.find(subPtr, key)){ 02245 jam(); 02246 sendSubStartRef(signal, 1407); 02247 DBUG_VOID_RETURN; 02248 } 02249 02250 if (subPtr.p->m_state == Subscription::LOCKED) { 02251 jam(); 02252 DBUG_PRINT("info",("Locked")); 02253 sendSubStartRef(signal, 1411); 02254 DBUG_VOID_RETURN; 02255 } 02256 02257 if (subPtr.p->m_state == Subscription::DROPPED && 02258 c_startup.m_restart_server_node_id == 0) { 02259 jam(); 02260 DBUG_PRINT("info",("Dropped")); 02261 sendSubStartRef(signal, 1418); 02262 DBUG_VOID_RETURN; 02263 } 02264 02265 ndbrequire(subPtr.p->m_state == Subscription::DEFINED || 02266 c_startup.m_restart_server_node_id); 02267 02268 SubscriberPtr subbPtr; 02269 if(!c_subscriberPool.seize(subbPtr)){ 02270 jam(); 02271 sendSubStartRef(signal, 1412); 02272 DBUG_VOID_RETURN; 02273 } 02274 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", 02275 c_subscriberPool.getSize(), 02276 c_subscriberPool.getNoOfFree())); 02277 02278 c_subscriber_nodes.set(refToNode(subscriberRef)); 02279 02280 // setup subscription record 02281 if (subPtr.p->m_state == Subscription::DEFINED) 02282 subPtr.p->m_state = Subscription::LOCKED; 02283 // store these here for later use 02284 subPtr.p->m_senderRef = senderRef; 02285 subPtr.p->m_senderData = senderData; 02286 02287 // setup subscriber record 02288 subbPtr.p->m_senderRef = subscriberRef; 02289 subbPtr.p->m_senderData = subscriberData; 02290 subbPtr.p->m_subPtrI= subPtr.i; 02291 02292 DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] " 02293 "tableId: %u id: %u key: %u", 02294 subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, 02295 subPtr.i, subPtr.p->m_senderRef, subPtr.p->m_senderData, 02296 subPtr.p->m_tableId, 02297 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey)); 02298 02299 TablePtr tabPtr; 02300 switch(part){ 02301 case SubscriptionData::MetaData: 02302 jam(); 02303 c_metaSubscribers.add(subbPtr); 02304 sendSubStartComplete(signal, subbPtr, 0, part); 02305 DBUG_VOID_RETURN; 02306 case SubscriptionData::TableData: 02307 jam(); 02308 initTable(signal,subPtr.p->m_tableId,tabPtr,subbPtr); 02309 tabPtr.p->n_subscribers++; 02310 if (subPtr.p->m_options & Subscription::REPORT_ALL) 02311 tabPtr.p->m_reportAll = true; 02312 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u", 02313 tabPtr.p->m_tableId, tabPtr.p->n_subscribers)); 02314 DBUG_VOID_RETURN; 02315 } 02316 ndbrequire(false); 02317 } 02318 02319 void 02320 Suma::sendSubStartComplete(Signal* signal, 02321 SubscriberPtr subbPtr, 02322 Uint32 firstGCI, 02323 SubscriptionData::Part part) 02324 { 02325 jam(); 02326 DBUG_ENTER("Suma::sendSubStartComplete"); 02327 02328 SubscriptionPtr subPtr; 02329 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 02330 ndbrequire(subPtr.p->m_state == Subscription::LOCKED || 02331 (subPtr.p->m_state == Subscription::DROPPED && 02332 c_startup.m_restart_server_node_id)); 02333 if (subPtr.p->m_state == Subscription::LOCKED) 02334 { 02335 jam(); 02336 subPtr.p->m_state = Subscription::DEFINED; 02337 } 02338 subPtr.p->n_subscribers++; 02339 02340 DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] " 02341 "tableId: %u[i=%u] id: %u key: %u", 02342 subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, 02343 subPtr.i, subPtr.p->m_senderRef, subPtr.p->m_senderData, 02344 subPtr.p->m_tableId, subPtr.p->m_table_ptrI, 02345 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey)); 02346 02347 SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend(); 02348 02349 conf->senderRef = reference(); 02350 conf->senderData = subPtr.p->m_senderData; 02351 conf->subscriptionId = subPtr.p->m_subscriptionId; 02352 conf->subscriptionKey = subPtr.p->m_subscriptionKey; 02353 conf->firstGCI = firstGCI; 02354 conf->part = (Uint32) part; 02355 02356 DBUG_PRINT("info",("subscriber: %u id: %u key: %u", subbPtr.i, 02357 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey)); 02358 sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_CONF, signal, 02359 SubStartConf::SignalLength, JBB); 02360 02361 reportAllSubscribers(signal, NdbDictionary::Event::_TE_SUBSCRIBE, 02362 subPtr, subbPtr); 02363 02364 DBUG_VOID_RETURN; 02365 } 02366 02367 void 02368 Suma::sendSubStartRef(Signal* signal, Uint32 errCode) 02369 { 02370 jam(); 02371 SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend(); 02372 ref->senderRef = reference(); 02373 ref->errorCode = errCode; 02374 releaseSections(signal); 02375 sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal, 02376 SubStartRef::SignalLength, JBB); 02377 } 02378 void 02379 Suma::sendSubStartRef(Signal* signal, 02380 SubscriberPtr subbPtr, Uint32 error, 02381 SubscriptionData::Part part) 02382 { 02383 jam(); 02384 02385 SubscriptionPtr subPtr; 02386 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 02387 02388 ndbrequire(subPtr.p->m_state == Subscription::LOCKED || 02389 (subPtr.p->m_state == Subscription::DROPPED && 02390 c_startup.m_restart_server_node_id)); 02391 if (subPtr.p->m_state == Subscription::LOCKED) 02392 { 02393 jam(); 02394 subPtr.p->m_state = Subscription::DEFINED; 02395 } 02396 02397 SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend(); 02398 ref->senderRef = reference(); 02399 ref->senderData = subPtr.p->m_senderData; 02400 ref->subscriptionId = subPtr.p->m_subscriptionId; 02401 ref->subscriptionKey = subPtr.p->m_subscriptionKey; 02402 ref->part = (Uint32) part; 02403 ref->errorCode = error; 02404 02405 sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_REF, signal, 02406 SubStartRef::SignalLength, JBB); 02407 } 02408 02409 /********************************************************** 02410 * Suma participant interface 02411 * 02412 * Stopping and removing of subscriber 02413 * 02414 */ 02415 02416 void 02417 Suma::execSUB_STOP_REQ(Signal* signal){ 02418 jamEntry(); 02419 ndbassert(signal->getNoOfSections() == 0); 02420 DBUG_ENTER("Suma::execSUB_STOP_REQ"); 02421 02422 CRASH_INSERTION(13019); 02423 02424 SubStopReq * const req = (SubStopReq*)signal->getDataPtr(); 02425 Uint32 senderRef = req->senderRef; 02426 Uint32 senderData = req->senderData; 02427 Uint32 subscriberRef = req->subscriberRef; 02428 Uint32 subscriberData = req->subscriberData; 02429 SubscriptionPtr subPtr; 02430 Subscription key; 02431 key.m_subscriptionId = req->subscriptionId; 02432 key.m_subscriptionKey = req->subscriptionKey; 02433 Uint32 part = req->part; 02434 02435 if (key.m_subscriptionKey == 0 && 02436 key.m_subscriptionId == 0 && 02437 subscriberData == 0) 02438 { 02439 SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend(); 02440 02441 conf->senderRef = reference(); 02442 conf->senderData = senderData; 02443 conf->subscriptionId = key.m_subscriptionId; 02444 conf->subscriptionKey = key.m_subscriptionKey; 02445 conf->subscriberData = subscriberData; 02446 02447 sendSignal(senderRef, GSN_SUB_STOP_CONF, signal, 02448 SubStopConf::SignalLength, JBB); 02449 02450 removeSubscribersOnNode(signal, refToNode(senderRef)); 02451 DBUG_VOID_RETURN; 02452 } 02453 02454 if(!c_subscriptions.find(subPtr, key)){ 02455 jam(); 02456 DBUG_PRINT("error", ("not found")); 02457 sendSubStopRef(signal, 1407); 02458 DBUG_VOID_RETURN; 02459 } 02460 02461 if (c_startup.m_restart_server_node_id && 02462 refToNode(senderRef) != c_startup.m_restart_server_node_id) 02463 { 02468 jam(); 02469 sendSubStopRef(signal, 1405); 02470 DBUG_VOID_RETURN; 02471 } 02472 02473 if (subPtr.p->m_state == Subscription::LOCKED) { 02474 jam(); 02475 DBUG_PRINT("error", ("locked")); 02476 sendSubStopRef(signal, 1411); 02477 DBUG_VOID_RETURN; 02478 } 02479 02480 ndbrequire(part == SubscriptionData::TableData); 02481 02482 TablePtr tabPtr; 02483 tabPtr.i = subPtr.p->m_table_ptrI; 02484 if (tabPtr.i == RNIL || 02485 !(tabPtr.p = c_tables.getPtr(tabPtr.i)) || 02486 tabPtr.p->m_tableId != subPtr.p->m_tableId) 02487 { 02488 jam(); 02489 DBUG_PRINT("error", ("no such table id %u[i=%u]", 02490 subPtr.p->m_tableId, subPtr.p->m_table_ptrI)); 02491 sendSubStopRef(signal, 1417); 02492 DBUG_VOID_RETURN; 02493 } 02494 02495 if (tabPtr.p->m_drop_subbPtr.p != 0) { 02496 jam(); 02497 DBUG_PRINT("error", ("table locked")); 02498 sendSubStopRef(signal, 1420); 02499 DBUG_VOID_RETURN; 02500 } 02501 02502 DBUG_PRINT("info",("subscription: %u tableId: %u[i=%u] id: %u key: %u", 02503 subPtr.i, subPtr.p->m_tableId, tabPtr.i, 02504 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey)); 02505 02506 SubscriberPtr subbPtr; 02507 if (senderRef == reference()){ 02508 jam(); 02509 c_subscriberPool.getPtr(subbPtr, senderData); 02510 ndbrequire(subbPtr.p->m_subPtrI == subPtr.i && 02511 subbPtr.p->m_senderRef == subscriberRef && 02512 subbPtr.p->m_senderData == subscriberData); 02513 c_removeDataSubscribers.remove(subbPtr); 02514 } 02515 else 02516 { 02517 jam(); 02518 LocalDLList<Subscriber> 02519 subscribers(c_subscriberPool,tabPtr.p->c_subscribers); 02520 02521 DBUG_PRINT("info",("search: subscription: %u, ref: %u, data: %d", 02522 subPtr.i, subscriberRef, subscriberData)); 02523 for (subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr)) 02524 { 02525 jam(); 02526 DBUG_PRINT("info", 02527 ("search: subscription: %u, ref: %u, data: %u, subscriber %u", 02528 subbPtr.p->m_subPtrI, subbPtr.p->m_senderRef, 02529 subbPtr.p->m_senderData, subbPtr.i)); 02530 if (subbPtr.p->m_subPtrI == subPtr.i && 02531 subbPtr.p->m_senderRef == subscriberRef && 02532 subbPtr.p->m_senderData == subscriberData) 02533 { 02534 jam(); 02535 DBUG_PRINT("info",("found")); 02536 break; 02537 } 02538 } 02542 if (subbPtr.isNull()) { 02543 jam(); 02544 DBUG_PRINT("error", ("subscriber not found")); 02545 sendSubStopRef(signal, 1407); 02546 DBUG_VOID_RETURN; 02547 } 02548 subscribers.remove(subbPtr); 02549 } 02550 02551 subPtr.p->m_senderRef = senderRef; // store ref to requestor 02552 subPtr.p->m_senderData = senderData; // store ref to requestor 02553 02554 tabPtr.p->m_drop_subbPtr = subbPtr; 02555 02556 if (subPtr.p->m_state == Subscription::DEFINED) 02557 { 02558 jam(); 02559 subPtr.p->m_state = Subscription::LOCKED; 02560 } 02561 02562 if (tabPtr.p->m_state == Table::DROPPED) 02563 // not ALTERED here since trigger must be removed 02564 { 02565 jam(); 02566 tabPtr.p->n_subscribers--; 02567 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u", 02568 tabPtr.p->m_tableId, tabPtr.p->n_subscribers)); 02569 tabPtr.p->checkRelease(*this); 02570 sendSubStopComplete(signal, tabPtr.p->m_drop_subbPtr); 02571 tabPtr.p->m_drop_subbPtr.p = 0; 02572 } 02573 else 02574 { 02575 jam(); 02576 tabPtr.p->dropTrigger(signal,*this); 02577 } 02578 DBUG_VOID_RETURN; 02579 } 02580 02581 void 02582 Suma::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr) 02583 { 02584 jam(); 02585 DBUG_ENTER("Suma::sendSubStopComplete"); 02586 CRASH_INSERTION(13020); 02587 02588 DBUG_PRINT("info",("removed subscriber: %i", subbPtr.i)); 02589 02590 SubscriptionPtr subPtr; 02591 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 02592 02593 Uint32 senderRef= subPtr.p->m_senderRef; 02594 Uint32 senderData= subPtr.p->m_senderData; 02595 02596 subPtr.p->n_subscribers--; 02597 ndbassert( subPtr.p->m_state == Subscription::LOCKED || 02598 subPtr.p->m_state == Subscription::DROPPED ); 02599 if ( subPtr.p->m_state == Subscription::LOCKED ) 02600 { 02601 jam(); 02602 subPtr.p->m_state = Subscription::DEFINED; 02603 if (subPtr.p->n_subscribers == 0) 02604 { 02605 jam(); 02606 #if 1 02607 subPtr.p->m_table_ptrI = RNIL; 02608 #else 02609 TablePtr tabPtr; 02610 tabPtr.i = subPtr.p->m_table_ptrI; 02611 if ((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) && 02612 (tabPtr.p->m_state == Table::DROPPED || 02613 tabPtr.p->m_state == Table::ALTERED) && 02614 false) 02615 { 02616 // last subscriber, and table is dropped 02617 // safe to drop subscription 02618 c_subscriptions.release(subPtr); 02619 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d", 02620 c_subscriptionPool.getSize(), 02621 c_subscriptionPool.getNoOfFree())); 02622 } 02623 else 02624 { 02625 subPtr.p->m_table_ptrI = RNIL; 02626 } 02627 ndbassert(tabPtr.p != 0); 02628 #endif 02629 } 02630 } 02631 else if ( subPtr.p->n_subscribers == 0 ) 02632 { 02633 // subscription is marked to be removed 02634 // and there are no subscribers left 02635 jam(); 02636 ndbassert(subPtr.p->m_state == Subscription::DROPPED); 02637 completeSubRemove(subPtr); 02638 } 02639 02640 // let subscriber know that subscrber is stopped 02641 { 02642 SubTableData * data = (SubTableData*)signal->getDataPtrSend(); 02643 data->gci = m_last_complete_gci + 1; // XXX ??? 02644 data->tableId = 0; 02645 data->operation = NdbDictionary::Event::_TE_STOP; 02646 data->senderData = subbPtr.p->m_senderData; 02647 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 02648 SubTableData::SignalLength, JBB); 02649 } 02650 02651 SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend(); 02652 02653 conf->senderRef= reference(); 02654 conf->senderData= senderData; 02655 02656 sendSignal(senderRef, GSN_SUB_STOP_CONF, signal, 02657 SubStopConf::SignalLength, JBB); 02658 02659 c_subscriberPool.release(subbPtr); 02660 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d", 02661 c_subscriberPool.getSize(), 02662 c_subscriberPool.getNoOfFree())); 02663 02664 reportAllSubscribers(signal, NdbDictionary::Event::_TE_UNSUBSCRIBE, 02665 subPtr, subbPtr); 02666 02667 DBUG_VOID_RETURN; 02668 } 02669 02670 // report new started subscriber to all other subscribers 02671 void 02672 Suma::reportAllSubscribers(Signal *signal, 02673 NdbDictionary::Event::_TableEvent table_event, 02674 SubscriptionPtr subPtr, 02675 SubscriberPtr subbPtr) 02676 { 02677 SubTableData * data = (SubTableData*)signal->getDataPtrSend(); 02678 02679 if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE && 02680 !c_startup.m_restart_server_node_id) 02681 { 02682 data->gci = m_last_complete_gci + 1; 02683 data->tableId = subPtr.p->m_tableId; 02684 data->operation = NdbDictionary::Event::_TE_ACTIVE; 02685 data->ndbd_nodeid = refToNode(reference()); 02686 data->changeMask = 0; 02687 data->totalLen = 0; 02688 data->req_nodeid = refToNode(subbPtr.p->m_senderRef); 02689 data->senderData = subbPtr.p->m_senderData; 02690 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 02691 SubTableData::SignalLength, JBB); 02692 } 02693 02694 if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)) 02695 { 02696 return; 02697 } 02698 if (subPtr.p->n_subscribers == 0) 02699 { 02700 ndbrequire(table_event != NdbDictionary::Event::_TE_SUBSCRIBE); 02701 return; 02702 } 02703 02704 //#ifdef VM_TRACE 02705 ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d", 02706 subPtr.i, subPtr.p->n_subscribers); 02707 //#endif 02708 data->gci = m_last_complete_gci + 1; 02709 data->tableId = subPtr.p->m_tableId; 02710 data->operation = table_event; 02711 data->ndbd_nodeid = refToNode(reference()); 02712 data->changeMask = 0; 02713 data->totalLen = 0; 02714 02715 TablePtr tabPtr; 02716 c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI); 02717 LocalDLList<Subscriber> subbs(c_subscriberPool, tabPtr.p->c_subscribers); 02718 SubscriberPtr i_subbPtr; 02719 for(subbs.first(i_subbPtr); !i_subbPtr.isNull(); subbs.next(i_subbPtr)) 02720 { 02721 if (i_subbPtr.p->m_subPtrI == subPtr.i) 02722 { 02723 data->req_nodeid = refToNode(subbPtr.p->m_senderRef); 02724 data->senderData = i_subbPtr.p->m_senderData; 02725 sendSignal(i_subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 02726 SubTableData::SignalLength, JBB); 02727 //#ifdef VM_TRACE 02728 ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d", 02729 table_event == NdbDictionary::Event::_TE_SUBSCRIBE ? 02730 "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event, 02731 refToNode(i_subbPtr.p->m_senderRef), 02732 data->req_nodeid, data->senderData 02733 ); 02734 //#endif 02735 if (i_subbPtr.i != subbPtr.i) 02736 { 02737 data->req_nodeid = refToNode(i_subbPtr.p->m_senderRef); 02738 data->senderData = subbPtr.p->m_senderData; 02739 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 02740 SubTableData::SignalLength, JBB); 02741 //#ifdef VM_TRACE 02742 ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d", 02743 table_event == NdbDictionary::Event::_TE_SUBSCRIBE ? 02744 "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event, 02745 refToNode(subbPtr.p->m_senderRef), 02746 data->req_nodeid, data->senderData 02747 ); 02748 //#endif 02749 } 02750 } 02751 } 02752 } 02753 02754 void 02755 Suma::sendSubStopRef(Signal* signal, Uint32 errCode) 02756 { 02757 jam(); 02758 DBUG_ENTER("Suma::sendSubStopRef"); 02759 SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend(); 02760 ref->senderRef = reference(); 02761 ref->errorCode = errCode; 02762 sendSignal(signal->getSendersBlockRef(), 02763 GSN_SUB_STOP_REF, 02764 signal, 02765 SubStopRef::SignalLength, 02766 JBB); 02767 DBUG_VOID_RETURN; 02768 } 02769 02770 /********************************************************** 02771 * 02772 * Trigger admin interface 02773 * 02774 */ 02775 02776 int 02777 Suma::Table::setupTrigger(Signal* signal, 02778 Suma &suma) 02779 { 02780 jam(); 02781 DBUG_ENTER("Suma::Table::setupTrigger"); 02782 02783 int ret= 0; 02784 02785 AttributeMask attrMask; 02786 createAttributeMask(attrMask, suma); 02787 02788 for(Uint32 j = 0; j<3; j++) 02789 { 02790 Uint32 triggerId = (m_schemaVersion << 18) | (j << 16) | m_ptrI; 02791 if(m_hasTriggerDefined[j] == 0) 02792 { 02793 suma.suma_ndbrequire(m_triggerIds[j] == ILLEGAL_TRIGGER_ID); 02794 DBUG_PRINT("info",("DEFINING trigger on table %u[%u]", m_tableId, j)); 02795 CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend(); 02796 req->setUserRef(SUMA_REF); 02797 req->setConnectionPtr(m_ptrI); 02798 req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE); 02799 req->setTriggerActionTime(TriggerActionTime::TA_DETACHED); 02800 req->setMonitorReplicas(true); 02801 req->setMonitorAllAttributes(j == TriggerEvent::TE_DELETE); 02802 req->setReceiverRef(SUMA_REF); 02803 req->setTriggerId(triggerId); 02804 req->setTriggerEvent((TriggerEvent::Value)j); 02805 req->setTableId(m_tableId); 02806 req->setAttributeMask(attrMask); 02807 req->setReportAllMonitoredAttributes(m_reportAll); 02808 suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, 02809 signal, CreateTrigReq::SignalLength, JBB); 02810 ret= 1; 02811 } 02812 else 02813 { 02814 m_hasTriggerDefined[j]++; 02815 DBUG_PRINT("info",("REFCOUNT trigger on table %u[%u] %u", 02816 m_tableId, j, m_hasTriggerDefined[j])); 02817 } 02818 } 02819 DBUG_RETURN(ret); 02820 } 02821 02822 void 02823 Suma::Table::createAttributeMask(AttributeMask& mask, 02824 Suma &suma) 02825 { 02826 jam(); 02827 mask.clear(); 02828 DataBuffer<15>::DataBufferIterator it; 02829 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes); 02830 for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){ 02831 mask.set(* it.data); 02832 } 02833 } 02834 02835 void 02836 Suma::execCREATE_TRIG_CONF(Signal* signal){ 02837 jamEntry(); 02838 DBUG_ENTER("Suma::execCREATE_TRIG_CONF"); 02839 ndbassert(signal->getNoOfSections() == 0); 02840 CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr(); 02841 const Uint32 triggerId = conf->getTriggerId(); 02842 Uint32 type = (triggerId >> 16) & 0x3; 02843 Uint32 tableId = conf->getTableId(); 02844 02845 02846 DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]", 02847 type, tableId,conf->getConnectionPtr(),triggerId & 0xFFFF)); 02848 02849 TablePtr tabPtr; 02850 c_tables.getPtr(tabPtr, conf->getConnectionPtr()); 02851 ndbrequire(tabPtr.p->m_tableId == tableId); 02852 ndbrequire(tabPtr.p->m_state == Table::DEFINING); 02853 02854 ndbrequire(type < 3); 02855 tabPtr.p->m_triggerIds[type] = triggerId; 02856 ndbrequire(tabPtr.p->m_hasTriggerDefined[type] == 0); 02857 tabPtr.p->m_hasTriggerDefined[type] = 1; 02858 02859 if (type == 2) 02860 { 02861 completeAllSubscribers(signal, tabPtr); 02862 completeInitTable(signal,tabPtr); 02863 DBUG_VOID_RETURN; 02864 } 02865 DBUG_VOID_RETURN; 02866 } 02867 02868 void 02869 Suma::execCREATE_TRIG_REF(Signal* signal){ 02870 jamEntry(); 02871 DBUG_ENTER("Suma::execCREATE_TRIG_REF"); 02872 ndbassert(signal->getNoOfSections() == 0); 02873 CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr(); 02874 const Uint32 triggerId = ref->getTriggerId(); 02875 Uint32 type = (triggerId >> 16) & 0x3; 02876 Uint32 tableId = ref->getTableId(); 02877 02878 DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]", 02879 type, tableId,ref->getConnectionPtr(),triggerId & 0xFFFF)); 02880 02881 TablePtr tabPtr; 02882 c_tables.getPtr(tabPtr, ref->getConnectionPtr()); 02883 ndbrequire(tabPtr.p->m_tableId == tableId); 02884 ndbrequire(tabPtr.p->m_state == Table::DEFINING); 02885 02886 tabPtr.p->m_error= ref->getErrorCode(); 02887 02888 ndbrequire(type < 3); 02889 02890 if (type == 2) 02891 { 02892 completeAllSubscribers(signal, tabPtr); 02893 completeInitTable(signal,tabPtr); 02894 DBUG_VOID_RETURN; 02895 } 02896 02897 DBUG_VOID_RETURN; 02898 } 02899 02900 void 02901 Suma::Table::dropTrigger(Signal* signal,Suma& suma) 02902 { 02903 jam(); 02904 DBUG_ENTER("Suma::dropTrigger"); 02905 02906 m_hasOutstandingTriggerReq[0] = 02907 m_hasOutstandingTriggerReq[1] = 02908 m_hasOutstandingTriggerReq[2] = 1; 02909 for(Uint32 j = 0; j<3; j++){ 02910 jam(); 02911 suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID); 02912 if(m_hasTriggerDefined[j] == 1) { 02913 jam(); 02914 02915 DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend(); 02916 req->setConnectionPtr(m_ptrI); 02917 req->setUserRef(SUMA_REF); // Sending to myself 02918 req->setRequestType(DropTrigReq::RT_USER); 02919 req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE); 02920 req->setTriggerActionTime(TriggerActionTime::TA_DETACHED); 02921 req->setIndexId(RNIL); 02922 02923 req->setTableId(m_tableId); 02924 req->setTriggerId(m_triggerIds[j]); 02925 req->setTriggerEvent((TriggerEvent::Value)j); 02926 02927 DBUG_PRINT("info",("DROPPING trigger %u = %u %u %u on table %u[%u]", 02928 m_triggerIds[j], 02929 TriggerType::SUBSCRIPTION_BEFORE, 02930 TriggerActionTime::TA_DETACHED, 02931 j, 02932 m_tableId, j)); 02933 suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ, 02934 signal, DropTrigReq::SignalLength, JBB); 02935 } else { 02936 jam(); 02937 suma.suma_ndbrequire(m_hasTriggerDefined[j] > 1); 02938 runDropTrigger(signal,m_triggerIds[j],suma); 02939 } 02940 } 02941 DBUG_VOID_RETURN; 02942 } 02943 02944 void 02945 Suma::execDROP_TRIG_REF(Signal* signal){ 02946 jamEntry(); 02947 DBUG_ENTER("Suma::execDROP_TRIG_REF"); 02948 ndbassert(signal->getNoOfSections() == 0); 02949 DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr(); 02950 if (ref->getErrorCode() != DropTrigRef::TriggerNotFound) 02951 { 02952 ndbrequire(false); 02953 } 02954 TablePtr tabPtr; 02955 c_tables.getPtr(tabPtr, ref->getConnectionPtr()); 02956 ndbrequire(ref->getTableId() == tabPtr.p->m_tableId); 02957 02958 tabPtr.p->runDropTrigger(signal, ref->getTriggerId(), *this); 02959 DBUG_VOID_RETURN; 02960 } 02961 02962 void 02963 Suma::execDROP_TRIG_CONF(Signal* signal){ 02964 jamEntry(); 02965 DBUG_ENTER("Suma::execDROP_TRIG_CONF"); 02966 ndbassert(signal->getNoOfSections() == 0); 02967 02968 DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr(); 02969 TablePtr tabPtr; 02970 c_tables.getPtr(tabPtr, conf->getConnectionPtr()); 02971 ndbrequire(conf->getTableId() == tabPtr.p->m_tableId); 02972 02973 tabPtr.p->runDropTrigger(signal, conf->getTriggerId(),*this); 02974 DBUG_VOID_RETURN; 02975 } 02976 02977 void 02978 Suma::Table::runDropTrigger(Signal* signal, 02979 Uint32 triggerId, 02980 Suma &suma) 02981 { 02982 jam(); 02983 Uint32 type = (triggerId >> 16) & 0x3; 02984 02985 suma.suma_ndbrequire(type < 3); 02986 suma.suma_ndbrequire(m_triggerIds[type] == triggerId); 02987 suma.suma_ndbrequire(m_hasTriggerDefined[type] > 0); 02988 suma.suma_ndbrequire(m_hasOutstandingTriggerReq[type] == 1); 02989 m_hasTriggerDefined[type]--; 02990 m_hasOutstandingTriggerReq[type] = 0; 02991 if (m_hasTriggerDefined[type] == 0) 02992 { 02993 jam(); 02994 m_triggerIds[type] = ILLEGAL_TRIGGER_ID; 02995 } 02996 if( m_hasOutstandingTriggerReq[0] || 02997 m_hasOutstandingTriggerReq[1] || 02998 m_hasOutstandingTriggerReq[2]) 02999 { 03000 // more to come 03001 jam(); 03002 return; 03003 } 03004 03005 #if 0 03006 ndbout_c("trigger completed"); 03007 #endif 03008 03009 03010 n_subscribers--; 03011 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u", 03012 m_tableId, n_subscribers)); 03013 checkRelease(suma); 03014 03015 suma.sendSubStopComplete(signal, m_drop_subbPtr); 03016 m_drop_subbPtr.p = 0; 03017 } 03018 03019 void Suma::suma_ndbrequire(bool v) { ndbrequire(v); } 03020 03021 void 03022 Suma::Table::checkRelease(Suma &suma) 03023 { 03024 jam(); 03025 DBUG_ENTER("Suma::Table::checkRelease"); 03026 if (n_subscribers == 0) 03027 { 03028 jam(); 03029 suma.suma_ndbrequire(m_hasTriggerDefined[0] == 0); 03030 suma.suma_ndbrequire(m_hasTriggerDefined[1] == 0); 03031 suma.suma_ndbrequire(m_hasTriggerDefined[2] == 0); 03032 if (!c_subscribers.isEmpty()) 03033 { 03034 LocalDLList<Subscriber> 03035 subscribers(suma.c_subscriberPool,c_subscribers); 03036 SubscriberPtr subbPtr; 03037 for (subscribers.first(subbPtr);!subbPtr.isNull(); 03038 subscribers.next(subbPtr)) 03039 { 03040 jam(); 03041 DBUG_PRINT("info",("subscriber: %u", subbPtr.i)); 03042 } 03043 suma.suma_ndbrequire(false); 03044 } 03045 if (!c_syncRecords.isEmpty()) 03046 { 03047 LocalDLList<SyncRecord> 03048 syncRecords(suma.c_syncPool,c_syncRecords); 03049 Ptr<SyncRecord> syncPtr; 03050 for (syncRecords.first(syncPtr);!syncPtr.isNull(); 03051 syncRecords.next(syncPtr)) 03052 { 03053 jam(); 03054 DBUG_PRINT("info",("syncRecord: %u", syncPtr.i)); 03055 } 03056 suma.suma_ndbrequire(false); 03057 } 03058 release(suma); 03059 suma.c_tables.remove(m_ptrI); 03060 suma.c_tablePool.release(m_ptrI); 03061 DBUG_PRINT("info",("c_tablePool size: %d free: %d", 03062 suma.c_tablePool.getSize(), 03063 suma.c_tablePool.getNoOfFree())); 03064 } 03065 else 03066 { 03067 DBUG_PRINT("info",("n_subscribers: %d", n_subscribers)); 03068 } 03069 DBUG_VOID_RETURN; 03070 } 03071 03072 /********************************************************** 03073 * Scan data interface 03074 * 03075 * Assumption: one execTRANSID_AI contains all attr info 03076 * 03077 */ 03078 03079 #define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS 03080 #define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1 03081 03082 static Uint32 f_bufferLock = 0; 03083 static Uint32 f_buffer[SUMA_BUF_SZ]; 03084 static Uint32 f_trigBufferSize = 0; 03085 static Uint32 b_bufferLock = 0; 03086 static Uint32 b_buffer[SUMA_BUF_SZ]; 03087 static Uint32 b_trigBufferSize = 0; 03088 03089 void 03090 Suma::execTRANSID_AI(Signal* signal) 03091 { 03092 jamEntry(); 03093 DBUG_ENTER("Suma::execTRANSID_AI"); 03094 03095 CRASH_INSERTION(13015); 03096 TransIdAI * const data = (TransIdAI*)signal->getDataPtr(); 03097 const Uint32 opPtrI = data->connectPtr; 03098 const Uint32 length = signal->length() - 3; 03099 03100 if(f_bufferLock == 0){ 03101 f_bufferLock = opPtrI; 03102 } else { 03103 ndbrequire(f_bufferLock == opPtrI); 03104 } 03105 03106 Ptr<SyncRecord> syncPtr; 03107 c_syncPool.getPtr(syncPtr, (opPtrI >> 16)); 03108 03109 Uint32 sum = 0; 03110 Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE; 03111 Uint32 * headers = f_buffer; 03112 const Uint32 * src = &data->attrData[0]; 03113 const Uint32 * const end = &src[length]; 03114 03115 const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes; 03116 for(Uint32 i = 0; i<attribs; i++){ 03117 Uint32 tmp = * src++; 03118 * headers++ = tmp; 03119 Uint32 len = AttributeHeader::getDataSize(tmp); 03120 03121 memcpy(dst, src, 4 * len); 03122 dst += len; 03123 src += len; 03124 sum += len; 03125 } 03126 03127 ndbrequire(src == end); 03128 03132 LinearSectionPtr ptr[3]; 03133 ptr[0].p = f_buffer; 03134 ptr[0].sz = attribs; 03135 03136 ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE; 03137 ptr[1].sz = sum; 03138 03139 SubscriptionPtr subPtr; 03140 c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI); 03141 03145 SubTableData * sdata = (SubTableData*)signal->getDataPtrSend(); 03146 Uint32 ref = subPtr.p->m_senderRef; 03147 sdata->tableId = syncPtr.p->m_currentTableId; 03148 sdata->senderData = subPtr.p->m_senderData; 03149 sdata->operation = NdbDictionary::Event::_TE_SCAN; // Scan 03150 sdata->gci = 0; // Undefined 03151 #if PRINT_ONLY 03152 ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum); 03153 #else 03154 sendSignal(ref, 03155 GSN_SUB_TABLE_DATA, 03156 signal, 03157 SubTableData::SignalLength, JBB, 03158 ptr, 2); 03159 #endif 03160 03164 f_bufferLock = 0; 03165 03166 DBUG_VOID_RETURN; 03167 } 03168 03169 /********************************************************** 03170 * 03171 * Trigger data interface 03172 * 03173 */ 03174 03175 void 03176 Suma::execTRIG_ATTRINFO(Signal* signal) 03177 { 03178 jamEntry(); 03179 DBUG_ENTER("Suma::execTRIG_ATTRINFO"); 03180 03181 CRASH_INSERTION(13016); 03182 TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr(); 03183 const Uint32 trigId = trg->getTriggerId(); 03184 03185 const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength; 03186 03187 if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){ 03188 jam(); 03189 03190 ndbrequire(b_bufferLock == trigId); 03191 03192 memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen); 03193 b_trigBufferSize += dataLen; 03194 03195 // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize); 03196 } else { 03197 jam(); 03198 03199 if(f_bufferLock == 0){ 03200 f_bufferLock = trigId; 03201 f_trigBufferSize = 0; 03202 b_bufferLock = trigId; 03203 b_trigBufferSize = 0; 03204 } else { 03205 ndbrequire(f_bufferLock == trigId); 03206 } 03207 03208 memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen); 03209 f_trigBufferSize += dataLen; 03210 } 03211 03212 03213 DBUG_VOID_RETURN; 03214 } 03215 03216 #ifdef NODEFAIL_DEBUG2 03217 static int theCounts[64] = {0}; 03218 #endif 03219 03220 Uint32 03221 Suma::get_responsible_node(Uint32 bucket) const 03222 { 03223 // id will contain id to responsible suma or 03224 // RNIL if we don't have nodegroup info yet 03225 03226 jam(); 03227 Uint32 node; 03228 const Bucket* ptr= c_buckets + bucket; 03229 for(Uint32 i = 0; i<MAX_REPLICAS; i++) 03230 { 03231 node= ptr->m_nodes[i]; 03232 if(c_alive_nodes.get(node)) 03233 { 03234 break; 03235 } 03236 } 03237 03238 03239 #ifdef NODEFAIL_DEBUG2 03240 if(node != 0) 03241 { 03242 theCounts[node]++; 03243 ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u", 03244 n,D, id, theCounts[node]); 03245 } 03246 #endif 03247 return node; 03248 } 03249 03250 Uint32 03251 Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const 03252 { 03253 jam(); 03254 Uint32 node; 03255 const Bucket* ptr= c_buckets + bucket; 03256 for(Uint32 i = 0; i<MAX_REPLICAS; i++) 03257 { 03258 node= ptr->m_nodes[i]; 03259 if(mask.get(node)) 03260 { 03261 return node; 03262 } 03263 } 03264 03265 return 0; 03266 } 03267 03268 bool 03269 Suma::check_switchover(Uint32 bucket, Uint32 gci) 03270 { 03271 const Uint32 send_mask = (Bucket::BUCKET_STARTING | Bucket::BUCKET_TAKEOVER); 03272 bool send = c_buckets[bucket].m_state & send_mask; 03273 ndbassert(m_switchover_buckets.get(bucket)); 03274 if(unlikely(gci >= c_buckets[bucket].m_switchover_gci)) 03275 { 03276 return send; 03277 } 03278 return !send; 03279 } 03280 03281 static 03282 Uint32 03283 reformat(Signal* signal, LinearSectionPtr ptr[3], 03284 Uint32 * src_1, Uint32 sz_1, 03285 Uint32 * src_2, Uint32 sz_2) 03286 { 03287 Uint32 noOfAttrs = 0, dataLen = 0; 03288 Uint32 * headers = signal->theData + 25; 03289 Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE; 03290 03291 ptr[0].p = headers; 03292 ptr[1].p = dst; 03293 03294 while(sz_1 > 0){ 03295 jam(); 03296 Uint32 tmp = * src_1 ++; 03297 * headers ++ = tmp; 03298 Uint32 len = AttributeHeader::getDataSize(tmp); 03299 memcpy(dst, src_1, 4 * len); 03300 dst += len; 03301 src_1 += len; 03302 03303 noOfAttrs++; 03304 dataLen += len; 03305 sz_1 -= (1 + len); 03306 } 03307 assert(sz_1 == 0); 03308 03309 ptr[0].sz = noOfAttrs; 03310 ptr[1].sz = dataLen; 03311 03312 ptr[2].p = src_2; 03313 ptr[2].sz = sz_2; 03314 03315 return sz_2 > 0 ? 3 : 2; 03316 } 03317 03318 void 03319 Suma::execFIRE_TRIG_ORD(Signal* signal) 03320 { 03321 jamEntry(); 03322 DBUG_ENTER("Suma::execFIRE_TRIG_ORD"); 03323 ndbassert(signal->getNoOfSections() == 0); 03324 03325 CRASH_INSERTION(13016); 03326 FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr(); 03327 const Uint32 trigId = trg->getTriggerId(); 03328 const Uint32 hashValue = trg->getHashValue(); 03329 const Uint32 gci = trg->getGCI(); 03330 const Uint32 event = trg->getTriggerEvent(); 03331 TablePtr tabPtr; 03332 tabPtr.i = trigId & 0xFFFF; 03333 03334 DBUG_PRINT("enter",("tabPtr.i=%u", tabPtr.i)); 03335 ndbrequire(f_bufferLock == trigId); 03339 f_bufferLock = 0; 03340 b_bufferLock = 0; 03341 03342 Uint32 bucket= hashValue % c_no_of_buckets; 03343 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); 03344 if(m_active_buckets.get(bucket) || 03345 (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci)))) 03346 { 03347 m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci); 03348 Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords(); 03349 ndbrequire(sz == f_trigBufferSize); 03350 03351 LinearSectionPtr ptr[3]; 03352 const Uint32 nptr= reformat(signal, ptr, 03353 f_buffer, sz, b_buffer, b_trigBufferSize); 03354 Uint32 ptrLen= 0; 03355 for(Uint32 i =0; i < nptr; i++) 03356 ptrLen+= ptr[i].sz; 03360 ndbrequire((tabPtr.p = c_tablePool.getPtr(tabPtr.i)) != 0); 03361 03362 SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; 03363 data->gci = gci; 03364 data->tableId = tabPtr.p->m_tableId; 03365 data->operation = event; 03366 data->logType = 0; 03367 data->changeMask = 0; 03368 data->totalLen = ptrLen; 03369 03370 { 03371 LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers); 03372 SubscriberPtr subbPtr; 03373 for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr)) 03374 { 03375 DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", 03376 refToNode(subbPtr.p->m_senderRef))); 03377 data->senderData = subbPtr.p->m_senderData; 03378 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 03379 SubTableData::SignalLength, JBB, ptr, nptr); 03380 } 03381 } 03382 } 03383 else 03384 { 03385 Uint32* dst; 03386 Uint32 sz = f_trigBufferSize + b_trigBufferSize + 2; 03387 if((dst = get_buffer_ptr(signal, bucket, gci, sz))) 03388 { 03389 * dst++ = tabPtr.i; 03390 * dst++ = (event << 16) | f_trigBufferSize; 03391 memcpy(dst, f_buffer, f_trigBufferSize << 2); 03392 dst += f_trigBufferSize; 03393 memcpy(dst, b_buffer, b_trigBufferSize << 2); 03394 } 03395 } 03396 03397 DBUG_VOID_RETURN; 03398 } 03399 03400 void 03401 Suma::execSUB_GCP_COMPLETE_REP(Signal* signal) 03402 { 03403 jamEntry(); 03404 ndbassert(signal->getNoOfSections() == 0); 03405 03406 SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend(); 03407 Uint32 gci = m_last_complete_gci = rep->gci; 03408 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); 03409 03413 if(!m_switchover_buckets.isclear()) 03414 { 03415 NdbNodeBitmask takeover_nodes; 03416 NdbNodeBitmask handover_nodes; 03417 Uint32 i = m_switchover_buckets.find(0); 03418 for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1)) 03419 { 03420 if(c_buckets[i].m_switchover_gci == gci) 03421 { 03422 Uint32 state = c_buckets[i].m_state; 03423 m_switchover_buckets.clear(i); 03424 printf("switchover complete bucket %d state: %x", i, state); 03425 if(state & Bucket::BUCKET_STARTING) 03426 { 03430 m_active_buckets.set(i); 03431 c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING; 03432 ndbout_c("starting"); 03433 m_gcp_complete_rep_count = 1; 03434 } 03435 else if(state & Bucket::BUCKET_TAKEOVER) 03436 { 03440 Bucket* bucket= c_buckets + i; 03441 Page_pos pos= bucket->m_buffer_head; 03442 ndbrequire(pos.m_max_gci < gci); 03443 03444 Buffer_page* page= (Buffer_page*) 03445 m_tup->c_page_pool.getPtr(pos.m_page_id); 03446 ndbout_c("takeover %d", pos.m_page_id); 03447 page->m_max_gci = pos.m_max_gci; 03448 page->m_words_used = pos.m_page_pos; 03449 page->m_next_page = RNIL; 03450 memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head)); 03451 bucket->m_buffer_head.m_page_id = RNIL; 03452 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1; 03453 03454 m_active_buckets.set(i); 03455 c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER; 03456 takeover_nodes.set(c_buckets[i].m_switchover_node); 03457 } 03458 else 03459 { 03463 ndbrequire(state & Bucket::BUCKET_HANDOVER); 03464 c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER; 03465 handover_nodes.set(c_buckets[i].m_switchover_node); 03466 ndbout_c("handover"); 03467 } 03468 } 03469 } 03470 ndbassert(handover_nodes.count() == 0 || 03471 m_gcp_complete_rep_count > handover_nodes.count()); 03472 m_gcp_complete_rep_count -= handover_nodes.count(); 03473 m_gcp_complete_rep_count += takeover_nodes.count(); 03474 03475 if(getNodeState().startLevel == NodeState::SL_STARTING && 03476 m_switchover_buckets.isclear() && 03477 c_startup.m_handover_nodes.isclear()) 03478 { 03479 sendSTTORRY(signal); 03480 } 03481 } 03482 03483 if(ERROR_INSERTED(13010)) 03484 { 03485 CLEAR_ERROR_INSERT_VALUE; 03486 ndbout_c("Don't send GCP_COMPLETE_REP(%d)", gci); 03487 return; 03488 } 03489 03493 rep->gci = gci; 03494 rep->senderRef = reference(); 03495 rep->gcp_complete_rep_count = m_gcp_complete_rep_count; 03496 03497 if(m_gcp_complete_rep_count && !c_subscriber_nodes.isclear()) 03498 { 03499 NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes); 03500 sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal, 03501 SubGcpCompleteRep::SignalLength, JBB); 03502 03503 Ptr<Gcp_record> gcp; 03504 if(c_gcp_list.seize(gcp)) 03505 { 03506 gcp.p->m_gci = gci; 03507 gcp.p->m_subscribers = c_subscriber_nodes; 03508 } 03509 } 03510 03514 for(Uint32 i = 0; i<c_no_of_buckets; i++) 03515 { 03516 if(m_active_buckets.get(i)) 03517 continue; 03518 03519 if(c_buckets[i].m_buffer_tail != RNIL) 03520 { 03521 Uint32* dst; 03522 get_buffer_ptr(signal, i, gci, 0); 03523 } 03524 } 03525 03526 if(gci == m_out_of_buffer_gci) 03527 { 03528 infoEvent("Reenable event buffer"); 03529 m_out_of_buffer_gci = 0; 03530 } 03531 } 03532 03533 void 03534 Suma::execCREATE_TAB_CONF(Signal *signal) 03535 { 03536 jamEntry(); 03537 DBUG_ENTER("Suma::execCREATE_TAB_CONF"); 03538 03539 #if 0 03540 CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr(); 03541 Uint32 tableId = conf->senderData; 03542 03543 TablePtr tabPtr; 03544 initTable(signal,tableId,tabPtr); 03545 #endif 03546 DBUG_VOID_RETURN; 03547 } 03548 03549 void 03550 Suma::execDROP_TAB_CONF(Signal *signal) 03551 { 03552 jamEntry(); 03553 DBUG_ENTER("Suma::execDROP_TAB_CONF"); 03554 ndbassert(signal->getNoOfSections() == 0); 03555 03556 DropTabConf * const conf = (DropTabConf*)signal->getDataPtr(); 03557 Uint32 senderRef= conf->senderRef; 03558 Uint32 tableId= conf->tableId; 03559 03560 TablePtr tabPtr; 03561 if (!c_tables.find(tabPtr, tableId) || 03562 tabPtr.p->m_state == Table::DROPPED || 03563 tabPtr.p->m_state == Table::ALTERED) 03564 { 03565 DBUG_VOID_RETURN; 03566 } 03567 03568 DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i)); 03569 03570 tabPtr.p->m_state = Table::DROPPED; 03571 for (int j= 0; j < 3; j++) 03572 { 03573 if (!tabPtr.p->m_hasOutstandingTriggerReq[j]) 03574 { 03575 tabPtr.p->m_hasTriggerDefined[j] = 0; 03576 tabPtr.p->m_hasOutstandingTriggerReq[j] = 0; 03577 tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID; 03578 } 03579 else 03580 tabPtr.p->m_hasTriggerDefined[j] = 1; 03581 } 03582 if (senderRef == 0) 03583 { 03584 DBUG_VOID_RETURN; 03585 } 03586 // dict coordinator sends info to API 03587 03588 SubTableData * data = (SubTableData*)signal->getDataPtrSend(); 03589 data->gci = m_last_complete_gci+1; 03590 data->tableId = tableId; 03591 data->operation = NdbDictionary::Event::_TE_DROP; 03592 data->req_nodeid = refToNode(senderRef); 03593 03594 { 03595 LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers); 03596 SubscriberPtr subbPtr; 03597 for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr)) 03598 { 03599 jam(); 03600 /* 03601 * get subscription ptr for this subscriber 03602 */ 03603 SubscriptionPtr subPtr; 03604 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 03605 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) { 03606 jam(); 03607 continue; 03608 //continue in for-loop if the table is not part of 03609 //the subscription. Otherwise, send data to subscriber. 03610 } 03611 data->senderData= subbPtr.p->m_senderData; 03612 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 03613 SubTableData::SignalLength, JBB); 03614 DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i)); 03615 } 03616 } 03617 DBUG_VOID_RETURN; 03618 } 03619 03620 static Uint32 b_dti_buf[MAX_WORDS_META_FILE]; 03621 03622 void 03623 Suma::execALTER_TAB_REQ(Signal *signal) 03624 { 03625 jamEntry(); 03626 DBUG_ENTER("Suma::execALTER_TAB_REQ"); 03627 ndbassert(signal->getNoOfSections() == 1); 03628 03629 AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr(); 03630 Uint32 senderRef= req->senderRef; 03631 Uint32 tableId= req->tableId; 03632 Uint32 changeMask= req->changeMask; 03633 TablePtr tabPtr; 03634 if (!c_tables.find(tabPtr, tableId) || 03635 tabPtr.p->m_state == Table::DROPPED || 03636 tabPtr.p->m_state == Table::ALTERED) 03637 { 03638 DBUG_VOID_RETURN; 03639 } 03640 03641 DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i)); 03642 Table::State old_state = tabPtr.p->m_state; 03643 tabPtr.p->m_state = Table::ALTERED; 03644 // triggers must be removed, waiting for sub stop req for that 03645 03646 if (senderRef == 0) 03647 { 03648 DBUG_VOID_RETURN; 03649 } 03650 // dict coordinator sends info to API 03651 03652 // Copy DICT_TAB_INFO to local buffer 03653 SegmentedSectionPtr tabInfoPtr; 03654 signal->getSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO); 03655 #ifndef DBUG_OFF 03656 ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz); 03657 SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool()); 03658 reader.printAll(ndbout); 03659 #endif 03660 copy(b_dti_buf, tabInfoPtr); 03661 LinearSectionPtr ptr[3]; 03662 ptr[0].p = b_dti_buf; 03663 ptr[0].sz = tabInfoPtr.sz; 03664 03665 releaseSections(signal); 03666 03667 SubTableData * data = (SubTableData*)signal->getDataPtrSend(); 03668 data->gci = m_last_complete_gci+1; 03669 data->tableId = tableId; 03670 data->operation = NdbDictionary::Event::_TE_ALTER; 03671 data->req_nodeid = refToNode(senderRef); 03672 data->logType = 0; 03673 data->changeMask = changeMask; 03674 data->totalLen = tabInfoPtr.sz; 03675 { 03676 LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers); 03677 SubscriberPtr subbPtr; 03678 for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr)) 03679 { 03680 jam(); 03681 /* 03682 * get subscription ptr for this subscriber 03683 */ 03684 SubscriptionPtr subPtr; 03685 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 03686 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) { 03687 jam(); 03688 continue; 03689 //continue in for-loop if the table is not part of 03690 //the subscription. Otherwise, send data to subscriber. 03691 } 03692 03693 data->senderData= subbPtr.p->m_senderData; 03694 Callback c = { 0, 0 }; 03695 sendFragmentedSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 03696 SubTableData::SignalLength, JBB, ptr, 1, c); 03697 DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i)); 03698 } 03699 } 03700 if (AlterTableReq::getFrmFlag(changeMask)) 03701 { 03702 // Frm changes only are handled on-line 03703 tabPtr.p->m_state = old_state; 03704 } 03705 DBUG_VOID_RETURN; 03706 } 03707 03708 void 03709 Suma::execSUB_GCP_COMPLETE_ACK(Signal* signal) 03710 { 03711 jamEntry(); 03712 ndbassert(signal->getNoOfSections() == 0); 03713 03714 SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr(); 03715 Uint32 gci = ack->rep.gci; 03716 Uint32 senderRef = ack->rep.senderRef; 03717 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); 03718 03719 if (refToBlock(senderRef) == SUMA) { 03720 jam(); 03721 // Ack from other SUMA 03722 Uint32 nodeId= refToNode(senderRef); 03723 for(Uint32 i = 0; i<c_no_of_buckets; i++) 03724 { 03725 if(m_active_buckets.get(i) || 03726 (m_switchover_buckets.get(i) && (check_switchover(i, gci))) || 03727 (!m_switchover_buckets.get(i) && get_responsible_node(i) == nodeId)) 03728 { 03729 release_gci(signal, i, gci); 03730 } 03731 } 03732 return; 03733 } 03734 03735 // Ack from User and not an ack from other SUMA, redistribute in nodegroup 03736 03737 Uint32 nodeId = refToNode(senderRef); 03738 03739 jam(); 03740 Ptr<Gcp_record> gcp; 03741 for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp)) 03742 { 03743 if(gcp.p->m_gci == gci) 03744 { 03745 gcp.p->m_subscribers.clear(nodeId); 03746 if(!gcp.p->m_subscribers.isclear()) 03747 { 03748 jam(); 03749 return; 03750 } 03751 break; 03752 } 03753 } 03754 03755 if(gcp.isNull()) 03756 { 03757 ndbout_c("ACK wo/ gcp record (gci: %d)", gci); 03758 } 03759 else 03760 { 03761 c_gcp_list.release(gcp); 03762 } 03763 03764 CRASH_INSERTION(13011); 03765 if(ERROR_INSERTED(13012)) 03766 { 03767 CLEAR_ERROR_INSERT_VALUE; 03768 ndbout_c("Don't redistribute SUB_GCP_COMPLETE_ACK"); 03769 return; 03770 } 03771 03772 ack->rep.senderRef = reference(); 03773 NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask); 03774 sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal, 03775 SubGcpCompleteAck::SignalLength, JBB); 03776 } 03777 03778 /************************************************************** 03779 * 03780 * Removing subscription 03781 * 03782 */ 03783 03784 void 03785 Suma::execSUB_REMOVE_REQ(Signal* signal) 03786 { 03787 jamEntry(); 03788 DBUG_ENTER("Suma::execSUB_REMOVE_REQ"); 03789 ndbassert(signal->getNoOfSections() == 0); 03790 03791 CRASH_INSERTION(13021); 03792 03793 const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr(); 03794 SubscriptionPtr subPtr; 03795 Subscription key; 03796 key.m_subscriptionId = req.subscriptionId; 03797 key.m_subscriptionKey = req.subscriptionKey; 03798 03799 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u", 03800 key.m_subscriptionId, key.m_subscriptionKey)); 03801 03802 if(!c_subscriptions.find(subPtr, key)) 03803 { 03804 jam(); 03805 DBUG_PRINT("info",("Not found")); 03806 sendSubRemoveRef(signal, req, 1407); 03807 DBUG_VOID_RETURN; 03808 } 03809 if (subPtr.p->m_state == Subscription::LOCKED) 03810 { 03814 jam(); 03815 sendSubRemoveRef(signal, req, 1413); 03816 DBUG_VOID_RETURN; 03817 } 03818 if (subPtr.p->m_state == Subscription::DROPPED) 03819 { 03823 jam(); 03824 sendSubRemoveRef(signal, req, 1419); 03825 DBUG_VOID_RETURN; 03826 } 03827 03828 ndbrequire(subPtr.p->m_state == Subscription::DEFINED); 03829 DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers)); 03830 03831 if (subPtr.p->n_subscribers == 0) 03832 { 03833 // no subscribers on the subscription 03834 // remove it 03835 jam(); 03836 completeSubRemove(subPtr); 03837 } 03838 else 03839 { 03840 // subscribers left on the subscription 03841 // mark it to be removed once all subscribers 03842 // are removed 03843 jam(); 03844 subPtr.p->m_state = Subscription::DROPPED; 03845 } 03846 03847 SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend(); 03848 conf->senderRef = reference(); 03849 conf->senderData = req.senderData; 03850 conf->subscriptionId = req.subscriptionId; 03851 conf->subscriptionKey = req.subscriptionKey; 03852 03853 sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal, 03854 SubRemoveConf::SignalLength, JBB); 03855 03856 DBUG_VOID_RETURN; 03857 } 03858 03859 void 03860 Suma::completeSubRemove(SubscriptionPtr subPtr) 03861 { 03862 DBUG_ENTER("Suma::completeSubRemove"); 03863 Uint32 subscriptionId = subPtr.p->m_subscriptionId; 03864 Uint32 subscriptionKey = subPtr.p->m_subscriptionKey; 03865 03866 c_subscriptions.release(subPtr); 03867 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d", 03868 c_subscriptionPool.getSize(), 03869 c_subscriptionPool.getNoOfFree())); 03870 03874 #if 0 03875 ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d", 03876 c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree()); 03877 #endif 03878 03879 if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) { 03880 jam(); 03881 #if 0 03882 ndbout_c("SUB_REMOVE_REQ:Clearing c_tables"); 03883 #endif 03884 int count= 0; 03885 KeyTable<Table>::Iterator it; 03886 for(c_tables.first(it); !it.isNull(); ) 03887 { 03888 // ndbrequire(false); 03889 03890 DBUG_PRINT("error",("trailing table id: %d[i=%d] n_subscribers: %d m_state: %d", 03891 it.curr.p->m_tableId, 03892 it.curr.p->m_ptrI, 03893 it.curr.p->n_subscribers, 03894 it.curr.p->m_state)); 03895 03896 LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers); 03897 SubscriberPtr subbPtr; 03898 for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr)) 03899 { 03900 DBUG_PRINT("error",("subscriber %d, m_subPtrI: %d", subbPtr.i, subbPtr.p->m_subPtrI)); 03901 } 03902 03903 it.curr.p->release(* this); 03904 TablePtr tabPtr = it.curr; 03905 c_tables.next(it); 03906 c_tables.remove(tabPtr); 03907 c_tablePool.release(tabPtr); 03908 DBUG_PRINT("info",("c_tablePool size: %d free: %d", 03909 c_tablePool.getSize(), 03910 c_tablePool.getNoOfFree())); 03911 count++; 03912 } 03913 DBUG_ASSERT(count == 0); 03914 } 03915 DBUG_VOID_RETURN; 03916 } 03917 03918 void 03919 Suma::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req, 03920 Uint32 errCode) 03921 { 03922 jam(); 03923 DBUG_ENTER("Suma::sendSubRemoveRef"); 03924 SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend(); 03925 ref->senderRef = reference(); 03926 ref->senderData = req.senderData; 03927 ref->subscriptionId = req.subscriptionId; 03928 ref->subscriptionKey = req.subscriptionKey; 03929 ref->errorCode = errCode; 03930 releaseSections(signal); 03931 sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF, 03932 signal, SubRemoveRef::SignalLength, JBB); 03933 DBUG_VOID_RETURN; 03934 } 03935 03936 void 03937 Suma::Table::release(Suma & suma){ 03938 jam(); 03939 03940 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes); 03941 attrBuf.release(); 03942 03943 LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments); 03944 fragBuf.release(); 03945 03946 m_state = UNDEFINED; 03947 #ifndef DBUG_OFF 03948 if (n_subscribers != 0) 03949 abort(); 03950 #endif 03951 } 03952 03953 void 03954 Suma::SyncRecord::release(){ 03955 jam(); 03956 m_tableList.release(); 03957 03958 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList); 03959 attrBuf.release(); 03960 } 03961 03962 03963 /************************************************************** 03964 * 03965 * Restarting remote node functions, master functionality 03966 * (slave does nothing special) 03967 * - triggered on INCL_NODEREQ calling startNode 03968 * - included node will issue START_ME when it's ready to start 03969 * the subscribers 03970 * 03971 */ 03972 03973 void 03974 Suma::execSUMA_START_ME_REQ(Signal* signal) { 03975 jamEntry(); 03976 DBUG_ENTER("Suma::execSUMA_START_ME"); 03977 ndbassert(signal->getNoOfSections() == 0); 03978 Restart.runSUMA_START_ME_REQ(signal, signal->getSendersBlockRef()); 03979 DBUG_VOID_RETURN; 03980 } 03981 03982 void 03983 Suma::execSUB_CREATE_REF(Signal* signal) { 03984 jamEntry(); 03985 DBUG_ENTER("Suma::execSUB_CREATE_REF"); 03986 ndbassert(signal->getNoOfSections() == 0); 03987 SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr(); 03988 Uint32 error= ref->errorCode; 03989 if (error != 1415) 03990 { 03991 /* 03992 * This will happen if an api node connects during while other node 03993 * is restarting, and in this case the subscription will already 03994 * have been created. 03995 * ToDo: more complete handling of api nodes joining during 03996 * node restart 03997 */ 03998 Uint32 senderRef = signal->getSendersBlockRef(); 03999 BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef)); 04000 // for some reason we did not manage to create a subscription 04001 // on the starting node 04002 SystemError * const sysErr = (SystemError*)&signal->theData[0]; 04003 sysErr->errorCode = SystemError::CopySubscriptionRef; 04004 sysErr->errorRef = reference(); 04005 sysErr->data1 = error; 04006 sysErr->data2 = 0; 04007 sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal, 04008 SystemError::SignalLength, JBB); 04009 Restart.resetRestart(signal); 04010 DBUG_VOID_RETURN; 04011 } 04012 // SubCreateConf has same signaldata as SubCreateRef 04013 Restart.runSUB_CREATE_CONF(signal); 04014 DBUG_VOID_RETURN; 04015 } 04016 04017 void 04018 Suma::execSUB_CREATE_CONF(Signal* signal) 04019 { 04020 jamEntry(); 04021 DBUG_ENTER("Suma::execSUB_CREATE_CONF"); 04022 ndbassert(signal->getNoOfSections() == 0); 04023 Restart.runSUB_CREATE_CONF(signal); 04024 DBUG_VOID_RETURN; 04025 } 04026 04027 void 04028 Suma::execSUB_START_CONF(Signal* signal) 04029 { 04030 jamEntry(); 04031 DBUG_ENTER("Suma::execSUB_START_CONF"); 04032 ndbassert(signal->getNoOfSections() == 0); 04033 Restart.runSUB_START_CONF(signal); 04034 DBUG_VOID_RETURN; 04035 } 04036 04037 void 04038 Suma::execSUB_START_REF(Signal* signal) { 04039 jamEntry(); 04040 DBUG_ENTER("Suma::execSUB_START_REF"); 04041 ndbassert(signal->getNoOfSections() == 0); 04042 SubStartRef *const ref= (SubStartRef *)signal->getDataPtr(); 04043 Uint32 error= ref->errorCode; 04044 { 04045 Uint32 senderRef = signal->getSendersBlockRef(); 04046 BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef)); 04047 // for some reason we did not manage to start a subscriber 04048 // on the starting node 04049 SystemError * const sysErr = (SystemError*)&signal->theData[0]; 04050 sysErr->errorCode = SystemError::CopySubscriberRef; 04051 sysErr->errorRef = reference(); 04052 sysErr->data1 = error; 04053 sysErr->data2 = 0; 04054 sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal, 04055 SystemError::SignalLength, JBB); 04056 Restart.resetRestart(signal); 04057 } 04058 DBUG_VOID_RETURN; 04059 } 04060 04061 Suma::Restart::Restart(Suma& s) : suma(s) 04062 { 04063 nodeId = 0; 04064 } 04065 04066 void 04067 Suma::Restart::runSUMA_START_ME_REQ(Signal* signal, Uint32 sumaRef) 04068 { 04069 jam(); 04070 DBUG_ENTER("Suma::Restart::runSUMA_START_ME"); 04071 04072 if(nodeId != 0) 04073 { 04074 SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend(); 04075 ref->errorCode = SumaStartMeRef::Busy; 04076 suma.sendSignal(sumaRef, GSN_SUMA_START_ME_REF, signal, 04077 SumaStartMeRef::SignalLength, JBB); 04078 return; 04079 } 04080 04081 nodeId = refToNode(sumaRef); 04082 startNode(signal, sumaRef); 04083 04084 DBUG_VOID_RETURN; 04085 } 04086 04087 void 04088 Suma::Restart::startNode(Signal* signal, Uint32 sumaRef) 04089 { 04090 jam(); 04091 DBUG_ENTER("Suma::Restart::startNode"); 04092 04093 // right now we can only handle restarting one node 04094 // at a time in a node group 04095 04096 createSubscription(signal, sumaRef); 04097 DBUG_VOID_RETURN; 04098 } 04099 04100 void 04101 Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef) 04102 { 04103 jam(); 04104 DBUG_ENTER("Suma::Restart::createSubscription"); 04105 suma.c_subscriptions.first(c_subIt); 04106 nextSubscription(signal, sumaRef); 04107 DBUG_VOID_RETURN; 04108 } 04109 04110 void 04111 Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef) 04112 { 04113 jam(); 04114 DBUG_ENTER("Suma::Restart::nextSubscription"); 04115 04116 if (c_subIt.isNull()) 04117 { 04118 jam(); 04119 completeSubscription(signal, sumaRef); 04120 DBUG_VOID_RETURN; 04121 } 04122 SubscriptionPtr subPtr; 04123 subPtr.i = c_subIt.curr.i; 04124 subPtr.p = suma.c_subscriptions.getPtr(subPtr.i); 04125 04126 suma.c_subscriptions.next(c_subIt); 04127 04128 SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend(); 04129 04130 req->senderRef = suma.reference(); 04131 req->senderData = subPtr.i; 04132 req->subscriptionId = subPtr.p->m_subscriptionId; 04133 req->subscriptionKey = subPtr.p->m_subscriptionKey; 04134 req->subscriptionType = subPtr.p->m_subscriptionType | 04135 SubCreateReq::RestartFlag; 04136 04137 switch (subPtr.p->m_subscriptionType) { 04138 case SubCreateReq::TableEvent: 04139 jam(); 04140 req->tableId = subPtr.p->m_tableId; 04141 req->state = subPtr.p->m_state; 04142 suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal, 04143 SubCreateReq::SignalLength2, JBB); 04144 DBUG_VOID_RETURN; 04145 case SubCreateReq::SingleTableScan: 04146 jam(); 04147 nextSubscription(signal, sumaRef); 04148 DBUG_VOID_RETURN; 04149 case SubCreateReq::SelectiveTableSnapshot: 04150 case SubCreateReq::DatabaseSnapshot: 04151 ndbrequire(false); 04152 } 04153 ndbrequire(false); 04154 } 04155 04156 void 04157 Suma::Restart::runSUB_CREATE_CONF(Signal* signal) 04158 { 04159 jam(); 04160 DBUG_ENTER("Suma::Restart::runSUB_CREATE_CONF"); 04161 04162 const Uint32 senderRef = signal->senderBlockRef(); 04163 Uint32 sumaRef = signal->getSendersBlockRef(); 04164 04165 SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr(); 04166 04167 SubscriptionPtr subPtr; 04168 suma.c_subscriptions.getPtr(subPtr,conf->senderData); 04169 04170 switch(subPtr.p->m_subscriptionType) { 04171 case SubCreateReq::TableEvent: 04172 if (1) 04173 { 04174 jam(); 04175 nextSubscription(signal, sumaRef); 04176 } else { 04177 jam(); 04178 SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend(); 04179 04180 req->senderRef = suma.reference(); 04181 req->senderData = subPtr.i; 04182 req->subscriptionId = subPtr.p->m_subscriptionId; 04183 req->subscriptionKey = subPtr.p->m_subscriptionKey; 04184 req->subscriptionType = subPtr.p->m_subscriptionType | 04185 SubCreateReq::RestartFlag | 04186 SubCreateReq::AddTableFlag; 04187 04188 req->tableId = 0; 04189 04190 suma.sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal, 04191 SubCreateReq::SignalLength, JBB); 04192 } 04193 DBUG_VOID_RETURN; 04194 case SubCreateReq::SingleTableScan: 04195 case SubCreateReq::SelectiveTableSnapshot: 04196 case SubCreateReq::DatabaseSnapshot: 04197 ndbrequire(false); 04198 } 04199 ndbrequire(false); 04200 } 04201 04202 void 04203 Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef) 04204 { 04205 jam(); 04206 DBUG_ENTER("Suma::Restart::completeSubscription"); 04207 startSubscriber(signal, sumaRef); 04208 DBUG_VOID_RETURN; 04209 } 04210 04211 void 04212 Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef) 04213 { 04214 jam(); 04215 DBUG_ENTER("Suma::Restart::startSubscriber"); 04216 suma.c_tables.first(c_tabIt); 04217 if (c_tabIt.isNull()) 04218 { 04219 completeSubscriber(signal, sumaRef); 04220 DBUG_VOID_RETURN; 04221 } 04222 SubscriberPtr subbPtr; 04223 { 04224 LocalDLList<Subscriber> 04225 subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers); 04226 subbs.first(subbPtr); 04227 } 04228 nextSubscriber(signal, sumaRef, subbPtr); 04229 DBUG_VOID_RETURN; 04230 } 04231 04232 void 04233 Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef, 04234 SubscriberPtr subbPtr) 04235 { 04236 jam(); 04237 DBUG_ENTER("Suma::Restart::nextSubscriber"); 04238 while (subbPtr.isNull()) 04239 { 04240 jam(); 04241 DBUG_PRINT("info",("prev tableId %u",c_tabIt.curr.p->m_tableId)); 04242 suma.c_tables.next(c_tabIt); 04243 if (c_tabIt.isNull()) 04244 { 04245 completeSubscriber(signal, sumaRef); 04246 DBUG_VOID_RETURN; 04247 } 04248 DBUG_PRINT("info",("next tableId %u",c_tabIt.curr.p->m_tableId)); 04249 04250 LocalDLList<Subscriber> 04251 subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers); 04252 subbs.first(subbPtr); 04253 } 04254 04255 /* 04256 * get subscription ptr for this subscriber 04257 */ 04258 04259 SubscriptionPtr subPtr; 04260 suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); 04261 switch (subPtr.p->m_subscriptionType) { 04262 case SubCreateReq::TableEvent: 04263 jam(); 04264 sendSubStartReq(subPtr, subbPtr, signal, sumaRef); 04265 DBUG_VOID_RETURN; 04266 case SubCreateReq::SelectiveTableSnapshot: 04267 case SubCreateReq::DatabaseSnapshot: 04268 case SubCreateReq::SingleTableScan: 04269 ndbrequire(false); 04270 } 04271 ndbrequire(false); 04272 } 04273 04274 void 04275 Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr, 04276 Signal* signal, Uint32 sumaRef) 04277 { 04278 jam(); 04279 DBUG_ENTER("Suma::Restart::sendSubStartReq"); 04280 SubStartReq * req = (SubStartReq *)signal->getDataPtrSend(); 04281 04282 req->senderRef = suma.reference(); 04283 req->senderData = subbPtr.i; 04284 req->subscriptionId = subPtr.p->m_subscriptionId; 04285 req->subscriptionKey = subPtr.p->m_subscriptionKey; 04286 req->part = SubscriptionData::TableData; 04287 req->subscriberData = subbPtr.p->m_senderData; 04288 req->subscriberRef = subbPtr.p->m_senderRef; 04289 04290 // restarting suma will not respond to this until startphase 5 04291 // since it is not until then data copying has been completed 04292 DBUG_PRINT("info",("Restarting subscriber: %u on key: [%u,%u]", 04293 subbPtr.i, 04294 subPtr.p->m_subscriptionId, 04295 subPtr.p->m_subscriptionKey, 04296 subPtr.p->m_tableId)); 04297 04298 suma.sendSignal(sumaRef, GSN_SUB_START_REQ, 04299 signal, SubStartReq::SignalLength2, JBB); 04300 DBUG_VOID_RETURN; 04301 } 04302 04303 void 04304 Suma::Restart::runSUB_START_CONF(Signal* signal) 04305 { 04306 jam(); 04307 DBUG_ENTER("Suma::Restart::runSUB_START_CONF"); 04308 04309 SubStartConf * const conf = (SubStartConf*)signal->getDataPtr(); 04310 04311 Subscription key; 04312 SubscriptionPtr subPtr; 04313 key.m_subscriptionId = conf->subscriptionId; 04314 key.m_subscriptionKey = conf->subscriptionKey; 04315 ndbrequire(suma.c_subscriptions.find(subPtr, key)); 04316 04317 TablePtr tabPtr; 04318 ndbrequire(suma.c_tables.find(tabPtr, subPtr.p->m_tableId)); 04319 04320 SubscriberPtr subbPtr; 04321 { 04322 LocalDLList<Subscriber> 04323 subbs(suma.c_subscriberPool,tabPtr.p->c_subscribers); 04324 subbs.getPtr(subbPtr, conf->senderData); 04325 DBUG_PRINT("info",("Restarted subscriber: %u on key: [%u,%u] table: %u", 04326 subbPtr.i,key.m_subscriptionId,key.m_subscriptionKey, 04327 subPtr.p->m_tableId)); 04328 subbs.next(subbPtr); 04329 } 04330 04331 Uint32 sumaRef = signal->getSendersBlockRef(); 04332 nextSubscriber(signal, sumaRef, subbPtr); 04333 04334 DBUG_VOID_RETURN; 04335 } 04336 04337 void 04338 Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef) 04339 { 04340 DBUG_ENTER("Suma::Restart::completeSubscriber"); 04341 completeRestartingNode(signal, sumaRef); 04342 DBUG_VOID_RETURN; 04343 } 04344 04345 void 04346 Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef) 04347 { 04348 jam(); 04349 DBUG_ENTER("Suma::Restart::completeRestartingNode"); 04350 //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend(); 04351 suma.sendSignal(sumaRef, GSN_SUMA_START_ME_CONF, signal, 04352 SumaStartMeConf::SignalLength, JBB); 04353 resetRestart(signal); 04354 DBUG_VOID_RETURN; 04355 } 04356 04357 void 04358 Suma::Restart::resetRestart(Signal* signal) 04359 { 04360 jam(); 04361 DBUG_ENTER("Suma::Restart::resetRestart"); 04362 nodeId = 0; 04363 DBUG_VOID_RETURN; 04364 } 04365 04366 // only run on restarting suma 04367 04368 void 04369 Suma::execSUMA_HANDOVER_REQ(Signal* signal) 04370 { 04371 jamEntry(); 04372 DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ"); 04373 // Uint32 sumaRef = signal->getSendersBlockRef(); 04374 SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr(); 04375 04376 Uint32 gci = req->gci; 04377 Uint32 nodeId = req->nodeId; 04378 Uint32 new_gci = m_last_complete_gci + MAX_CONCURRENT_GCP + 1; 04379 04380 Uint32 start_gci = (gci > new_gci ? gci : new_gci); 04381 // mark all active buckets really belonging to restarting SUMA 04382 04383 Bucket_mask tmp; 04384 for( Uint32 i = 0; i < c_no_of_buckets; i++) 04385 { 04386 if(get_responsible_node(i) == nodeId) 04387 { 04388 if (m_active_buckets.get(i)) 04389 { 04390 // I'm running this bucket but it should really be the restarted node 04391 tmp.set(i); 04392 m_active_buckets.clear(i); 04393 m_switchover_buckets.set(i); 04394 c_buckets[i].m_switchover_gci = start_gci; 04395 c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER; 04396 c_buckets[i].m_switchover_node = nodeId; 04397 ndbout_c("prepare to handover bucket: %d", i); 04398 } 04399 else if(m_switchover_buckets.get(i)) 04400 { 04401 ndbout_c("dont handover bucket: %d %d", i, nodeId); 04402 } 04403 } 04404 } 04405 04406 SumaHandoverConf* conf= (SumaHandoverConf*)signal->getDataPtrSend(); 04407 tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask); 04408 conf->gci = start_gci; 04409 conf->nodeId = getOwnNodeId(); 04410 sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal, 04411 SumaHandoverConf::SignalLength, JBB); 04412 04413 DBUG_VOID_RETURN; 04414 } 04415 04416 // only run on all but restarting suma 04417 void 04418 Suma::execSUMA_HANDOVER_REF(Signal* signal) 04419 { 04420 ndbrequire(false); 04421 } 04422 04423 void 04424 Suma::execSUMA_HANDOVER_CONF(Signal* signal) { 04425 jamEntry(); 04426 DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF"); 04427 04428 SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr(); 04429 04430 Uint32 gci = conf->gci; 04431 Uint32 nodeId = conf->nodeId; 04432 Bucket_mask tmp; 04433 tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask); 04434 #ifdef HANDOVER_DEBUG 04435 ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci); 04436 #endif 04437 04438 for( Uint32 i = 0; i < c_no_of_buckets; i++) 04439 { 04440 if (tmp.get(i)) 04441 { 04442 ndbrequire(get_responsible_node(i) == getOwnNodeId()); 04443 // We should run this bucket, but _nodeId_ is 04444 c_buckets[i].m_switchover_gci = gci; 04445 c_buckets[i].m_state |= Bucket::BUCKET_STARTING; 04446 } 04447 } 04448 04449 char buf[255]; 04450 tmp.getText(buf); 04451 infoEvent("Suma: handover from node %d gci: %d buckets: %s (%d)", 04452 nodeId, gci, buf, c_no_of_buckets); 04453 m_switchover_buckets.bitOR(tmp); 04454 c_startup.m_handover_nodes.clear(nodeId); 04455 DBUG_VOID_RETURN; 04456 } 04457 04458 static 04459 NdbOut& 04460 operator<<(NdbOut & out, const Suma::Page_pos & pos) 04461 { 04462 out << "[ Page_pos:" 04463 << " m_page_id: " << pos.m_page_id 04464 << " m_page_pos: " << pos.m_page_pos 04465 << " m_max_gci: " << pos.m_max_gci 04466 << " ]"; 04467 return out; 04468 } 04469 04470 Uint32* 04471 Suma::get_buffer_ptr(Signal* signal, Uint32 buck, Uint32 gci, Uint32 sz) 04472 { 04473 sz += 1; // len 04474 Bucket* bucket= c_buckets+buck; 04475 Page_pos pos= bucket->m_buffer_head; 04476 04477 Buffer_page* page = 0; 04478 Uint32 *ptr = 0; 04479 04480 if (likely(pos.m_page_id != RNIL)) 04481 { 04482 page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id); 04483 ptr= page->m_data + pos.m_page_pos; 04484 } 04485 04486 const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022)); 04487 04488 pos.m_page_pos += sz; 04489 pos.m_last_gci = gci; 04490 Uint32 max = pos.m_max_gci > gci ? pos.m_max_gci : gci; 04491 04492 if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS)) 04493 { 04494 pos.m_max_gci = max; 04495 bucket->m_buffer_head = pos; 04496 * ptr++ = (0x8000 << 16) | sz; // Same gci 04497 return ptr; 04498 } 04499 else if(pos.m_page_pos + 1 <= Buffer_page::DATA_WORDS) 04500 { 04501 loop: 04502 pos.m_max_gci = max; 04503 pos.m_page_pos += 1; 04504 bucket->m_buffer_head = pos; 04505 * ptr++ = (sz + 1); 04506 * ptr++ = gci; 04507 return ptr; 04508 } 04509 else 04510 { 04516 Uint32 next; 04517 if(unlikely((next= seize_page()) == RNIL)) 04518 { 04522 out_of_buffer(signal); 04523 return 0; 04524 } 04525 04526 if(likely(pos.m_page_id != RNIL)) 04527 { 04528 page->m_max_gci = pos.m_max_gci; 04529 page->m_words_used = pos.m_page_pos - sz; 04530 page->m_next_page= next; 04531 } 04532 else 04533 { 04534 bucket->m_buffer_tail = next; 04535 } 04536 04537 memset(&pos, 0, sizeof(pos)); 04538 pos.m_page_id = next; 04539 pos.m_page_pos = sz; 04540 pos.m_last_gci = gci; 04541 04542 page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id); 04543 page->m_next_page= RNIL; 04544 ptr= page->m_data; 04545 goto loop; // 04546 } 04547 } 04548 04549 void 04550 Suma::out_of_buffer(Signal* signal) 04551 { 04552 if(m_out_of_buffer_gci) 04553 { 04554 return; 04555 } 04556 04557 m_out_of_buffer_gci = m_last_complete_gci - 1; 04558 infoEvent("Out of event buffer: nodefailure will cause event failures"); 04559 04560 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE; 04561 signal->theData[1] = 0; 04562 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB); 04563 } 04564 04565 void 04566 Suma::out_of_buffer_release(Signal* signal, Uint32 buck) 04567 { 04568 Bucket* bucket= c_buckets+buck; 04569 Uint32 tail= bucket->m_buffer_tail; 04570 04571 if(tail != RNIL) 04572 { 04573 Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail); 04574 bucket->m_buffer_tail = page->m_next_page; 04575 free_page(tail, page); 04576 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE; 04577 signal->theData[1] = buck; 04578 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB); 04579 return; 04580 } 04581 04585 bucket->m_buffer_head.m_page_id = RNIL; 04586 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1; 04587 04588 buck++; 04589 if(buck != c_no_of_buckets) 04590 { 04591 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE; 04592 signal->theData[1] = buck; 04593 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB); 04594 return; 04595 } 04596 04601 m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci 04602 ? m_max_seen_gci + 1 : m_last_complete_gci + 1; 04603 } 04604 04605 Uint32 04606 Suma::seize_page() 04607 { 04608 if(unlikely(m_out_of_buffer_gci)) 04609 { 04610 return RNIL; 04611 } 04612 loop: 04613 Ptr<Page_chunk> ptr; 04614 Uint32 ref= m_first_free_page; 04615 if(likely(ref != RNIL)) 04616 { 04617 m_first_free_page = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_next_page; 04618 Uint32 chunk = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_page_chunk_ptr_i; 04619 c_page_chunk_pool.getPtr(ptr, chunk); 04620 ndbassert(ptr.p->m_free); 04621 ptr.p->m_free--; 04622 return ref; 04623 } 04624 04625 if(!c_page_chunk_pool.seize(ptr)) 04626 return RNIL; 04627 04628 Uint32 count; 04629 m_tup->allocConsPages(16, count, ref); 04630 ndbrequire(count > 0); 04631 04632 ndbout_c("alloc_chunk(%d %d) - ", ref, count); 04633 04634 m_first_free_page = ptr.p->m_page_id = ref; 04635 ptr.p->m_size = count; 04636 ptr.p->m_free = count; 04637 04638 Buffer_page* page; 04639 for(Uint32 i = 0; i<count; i++) 04640 { 04641 page = (Buffer_page*)m_tup->c_page_pool.getPtr(ref); 04642 page->m_page_state= SUMA_SEQUENCE; 04643 page->m_page_chunk_ptr_i = ptr.i; 04644 page->m_next_page = ++ref; 04645 } 04646 page->m_next_page = RNIL; 04647 04648 goto loop; 04649 } 04650 04651 void 04652 Suma::free_page(Uint32 page_id, Buffer_page* page) 04653 { 04654 Ptr<Page_chunk> ptr; 04655 ndbrequire(page->m_page_state == SUMA_SEQUENCE); 04656 04657 Uint32 chunk= page->m_page_chunk_ptr_i; 04658 04659 c_page_chunk_pool.getPtr(ptr, chunk); 04660 04661 ptr.p->m_free ++; 04662 page->m_next_page = m_first_free_page; 04663 ndbrequire(ptr.p->m_free <= ptr.p->m_size); 04664 04665 m_first_free_page = page_id; 04666 } 04667 04668 void 04669 Suma::release_gci(Signal* signal, Uint32 buck, Uint32 gci) 04670 { 04671 Bucket* bucket= c_buckets+buck; 04672 Uint32 tail= bucket->m_buffer_tail; 04673 Page_pos head= bucket->m_buffer_head; 04674 Uint32 max_acked = bucket->m_max_acked_gci; 04675 04676 const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND; 04677 if(unlikely(bucket->m_state & mask)) 04678 { 04679 jam(); 04680 ndbout_c("release_gci(%d, %d) -> node failure -> abort", buck, gci); 04681 return; 04682 } 04683 04684 bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci); 04685 if(unlikely(tail == RNIL)) 04686 { 04687 return; 04688 } 04689 04690 if(tail == head.m_page_id) 04691 { 04692 if(gci >= head.m_max_gci) 04693 { 04694 jam(); 04695 head.m_page_pos = 0; 04696 head.m_max_gci = gci; 04697 head.m_last_gci = 0; 04698 bucket->m_buffer_head = head; 04699 } 04700 return; 04701 } 04702 else 04703 { 04704 jam(); 04705 Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail); 04706 Uint32 max_gci = page->m_max_gci; 04707 Uint32 next_page = page->m_next_page; 04708 04709 ndbassert(max_gci); 04710 04711 if(gci >= max_gci) 04712 { 04713 jam(); 04714 free_page(tail, page); 04715 04716 bucket->m_buffer_tail = next_page; 04717 signal->theData[0] = SumaContinueB::RELEASE_GCI; 04718 signal->theData[1] = buck; 04719 signal->theData[2] = gci; 04720 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB); 04721 return; 04722 } 04723 else 04724 { 04725 //ndbout_c("do nothing..."); 04726 } 04727 } 04728 } 04729 04730 static Uint32 g_cnt = 0; 04731 04732 void 04733 Suma::start_resend(Signal* signal, Uint32 buck) 04734 { 04735 printf("start_resend(%d, ", buck); 04736 04737 if(m_out_of_buffer_gci) 04738 { 04739 progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR, 04740 "Nodefailure while out of event buffer"); 04741 return; 04742 } 04743 04747 Bucket* bucket= c_buckets + buck; 04748 Page_pos pos= bucket->m_buffer_head; 04749 04750 if(pos.m_page_id == RNIL) 04751 { 04752 jam(); 04753 m_active_buckets.set(buck); 04754 m_gcp_complete_rep_count ++; 04755 ndbout_c("empty bucket(RNIL) -> active"); 04756 return; 04757 } 04758 04759 Uint32 min= bucket->m_max_acked_gci + 1; 04760 Uint32 max = pos.m_max_gci; 04761 04762 ndbrequire(max <= m_max_seen_gci); 04763 04764 if(min > max) 04765 { 04766 ndbrequire(pos.m_page_pos <= 2); 04767 ndbrequire(pos.m_page_id == bucket->m_buffer_tail); 04768 m_active_buckets.set(buck); 04769 m_gcp_complete_rep_count ++; 04770 ndbout_c("empty bucket -> active"); 04771 return; 04772 } 04773 04774 g_cnt = 0; 04775 bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND); 04776 bucket->m_switchover_node = get_responsible_node(buck); 04777 bucket->m_switchover_gci = max + 1; 04778 04779 m_switchover_buckets.set(buck); 04780 04781 signal->theData[1] = buck; 04782 signal->theData[2] = min; 04783 signal->theData[3] = 0; 04784 signal->theData[4] = 0; 04785 sendSignal(reference(), GSN_CONTINUEB, signal, 5, JBB); 04786 04787 ndbout_c("min: %d - max: %d) page: %d", min, max, bucket->m_buffer_tail); 04788 ndbrequire(max >= min); 04789 } 04790 04791 void 04792 Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, 04793 Uint32 pos, Uint32 last_gci) 04794 { 04795 Bucket* bucket= c_buckets+buck; 04796 Uint32 tail= bucket->m_buffer_tail; 04797 04798 Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail); 04799 Uint32 max_gci = page->m_max_gci; 04800 Uint32 next_page = page->m_next_page; 04801 Uint32 *ptr = page->m_data + pos; 04802 Uint32 *end = page->m_data + page->m_words_used; 04803 bool delay = false; 04804 04805 ndbrequire(tail != RNIL); 04806 04807 if(tail == bucket->m_buffer_head.m_page_id) 04808 { 04809 max_gci= bucket->m_buffer_head.m_max_gci; 04810 end= page->m_data + bucket->m_buffer_head.m_page_pos; 04811 next_page= RNIL; 04812 04813 if(ptr == end) 04814 { 04815 delay = true; 04816 goto next; 04817 } 04818 } 04819 else if(pos == 0 && min_gci > max_gci) 04820 { 04821 free_page(tail, page); 04822 tail = bucket->m_buffer_tail = next_page; 04823 ndbout_c("pos==0 && min_gci(%d) > max_gci(%d) resend switching page to %d", min_gci, max_gci, tail); 04824 goto next; 04825 } 04826 04827 #if 0 04828 for(Uint32 i = 0; i<page->m_words_used; i++) 04829 { 04830 printf("%.8x ", page->m_data[i]); 04831 if(((i + 1) % 8) == 0) 04832 printf("\n"); 04833 } 04834 printf("\n"); 04835 #endif 04836 04837 while(ptr < end) 04838 { 04839 Uint32 *src = ptr; 04840 Uint32 tmp = * src++; 04841 Uint32 sz = tmp & 0xFFFF; 04842 04843 ptr += sz; 04844 04845 if(! (tmp & (0x8000 << 16))) 04846 { 04847 sz--; 04848 last_gci = * src ++; 04849 } 04850 else 04851 { 04852 ndbrequire(ptr - sz > page->m_data); 04853 } 04854 04855 if(last_gci < min_gci) 04856 { 04857 continue; 04858 } 04859 04860 if(sz == 1) 04861 { 04862 SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend(); 04863 rep->gci = last_gci; 04864 rep->senderRef = reference(); 04865 rep->gcp_complete_rep_count = 1; 04866 04867 char buf[255]; 04868 c_subscriber_nodes.getText(buf); 04869 ndbout_c("resending GCI: %d rows: %d -> %s", last_gci, g_cnt, buf); 04870 g_cnt = 0; 04871 04872 NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes); 04873 sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal, 04874 SubGcpCompleteRep::SignalLength, JBB); 04875 } 04876 else 04877 { 04878 g_cnt++; 04879 Uint32 table = * src++ ; 04880 Uint32 event = * src >> 16; 04881 Uint32 sz_1 = (* src ++) & 0xFFFF; 04882 04883 ndbassert(sz - 2 >= sz_1); 04884 04885 LinearSectionPtr ptr[3]; 04886 const Uint32 nptr= reformat(signal, ptr, 04887 src, sz_1, 04888 src + sz_1, sz - 2 - sz_1); 04889 Uint32 ptrLen= 0; 04890 for(Uint32 i =0; i < nptr; i++) 04891 ptrLen+= ptr[i].sz; 04895 Ptr<Table> tabPtr; 04896 ndbrequire((tabPtr.p = c_tablePool.getPtr(table)) != 0); 04897 04898 SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; 04899 data->gci = last_gci; 04900 data->tableId = tabPtr.p->m_tableId; 04901 data->operation = event; 04902 data->logType = 0; 04903 data->changeMask = 0; 04904 data->totalLen = ptrLen; 04905 04906 { 04907 LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers); 04908 SubscriberPtr subbPtr; 04909 for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr)) 04910 { 04911 DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", 04912 refToNode(subbPtr.p->m_senderRef))); 04913 data->senderData = subbPtr.p->m_senderData; 04914 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal, 04915 SubTableData::SignalLength, JBB, ptr, nptr); 04916 } 04917 } 04918 } 04919 04920 break; 04921 } 04922 04923 if(ptr == end && (tail != bucket->m_buffer_head.m_page_id)) 04924 { 04928 free_page(tail, page); 04929 tail = bucket->m_buffer_tail = next_page; 04930 pos = 0; 04931 last_gci = 0; 04932 ndbout_c("ptr == end -> resend switching page to %d", tail); 04933 } 04934 else 04935 { 04936 pos = (ptr - page->m_data); 04937 } 04938 04939 next: 04940 if(tail == RNIL) 04941 { 04942 bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND; 04943 ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER)); 04944 ndbout_c("resend done..."); 04945 return; 04946 } 04947 04948 signal->theData[0] = SumaContinueB::RESEND_BUCKET; 04949 signal->theData[1] = buck; 04950 signal->theData[2] = min_gci; 04951 signal->theData[3] = pos; 04952 signal->theData[4] = last_gci; 04953 if(!delay) 04954 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 5, JBB); 04955 else 04956 sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 10, 5); 04957 } 04958 04959 template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&); 04960
1.4.7

