00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 #include "Trix.hpp" 00018 00019 #include <string.h> 00020 #include <kernel_types.h> 00021 #include <NdbOut.hpp> 00022 00023 #include <signaldata/ReadNodesConf.hpp> 00024 #include <signaldata/NodeFailRep.hpp> 00025 #include <signaldata/DumpStateOrd.hpp> 00026 #include <signaldata/GetTabInfo.hpp> 00027 #include <signaldata/DictTabInfo.hpp> 00028 #include <signaldata/BuildIndx.hpp> 00029 #include <signaldata/SumaImpl.hpp> 00030 #include <signaldata/UtilPrepare.hpp> 00031 #include <signaldata/UtilExecute.hpp> 00032 #include <signaldata/UtilRelease.hpp> 00033 #include <SectionReader.hpp> 00034 #include <AttributeHeader.hpp> 00035 00036 #define CONSTRAINT_VIOLATION 893 00037 00038 #define DEBUG(x) { ndbout << "TRIX::" << x << endl; } 00039 00043 Trix::Trix(Block_context& ctx) : 00044 SimulatedBlock(TRIX, ctx), 00045 c_theNodes(c_theNodeRecPool), 00046 c_masterNodeId(0), 00047 c_masterTrixRef(0), 00048 c_noNodesFailed(0), 00049 c_noActiveNodes(0), 00050 c_theSubscriptions(c_theSubscriptionRecPool) 00051 { 00052 BLOCK_CONSTRUCTOR(Trix); 00053 00054 // Add received signals 00055 addRecSignal(GSN_READ_CONFIG_REQ, &Trix::execREAD_CONFIG_REQ); 00056 addRecSignal(GSN_STTOR, &Trix::execSTTOR); 00057 addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR); // Forwarded from DICT 00058 addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF); 00059 addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF); 00060 addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP); 00061 addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ); 00062 addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD); 00063 00064 // Index build 00065 addRecSignal(GSN_BUILDINDXREQ, &Trix::execBUILDINDXREQ); 00066 // Dump testing 00067 addRecSignal(GSN_BUILDINDXCONF, &Trix::execBUILDINDXCONF); 00068 addRecSignal(GSN_BUILDINDXREF, &Trix::execBUILDINDXREF); 00069 00070 00071 addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF); 00072 addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF); 00073 addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF); 00074 addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF); 00075 addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF); 00076 addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF); 00077 00078 00079 // Suma signals 00080 addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF); 00081 addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF); 00082 addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF); 00083 addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF); 00084 addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF); 00085 addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF); 00086 addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ); 00087 addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA); 00088 } 00089 00093 Trix::~Trix() 00094 { 00095 } 00096 00097 void 00098 Trix::execREAD_CONFIG_REQ(Signal* signal) 00099 { 00100 jamEntry(); 00101 00102 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); 00103 00104 Uint32 ref = req->senderRef; 00105 Uint32 senderData = req->senderData; 00106 00107 const ndb_mgm_configuration_iterator * p = 00108 m_ctx.m_config.getOwnConfigIterator(); 00109 ndbrequire(p != 0); 00110 00111 // Allocate pool sizes 00112 c_theAttrOrderBufferPool.setSize(100); 00113 c_theSubscriptionRecPool.setSize(100); 00114 00115 DLList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool); 00116 SubscriptionRecPtr subptr; 00117 while(subscriptions.seize(subptr) == true) { 00118 new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool); 00119 } 00120 subscriptions.release(); 00121 00122 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); 00123 conf->senderRef = reference(); 00124 conf->senderData = senderData; 00125 sendSignal(ref, GSN_READ_CONFIG_CONF, signal, 00126 ReadConfigConf::SignalLength, JBB); 00127 } 00128 00132 void Trix::execSTTOR(Signal* signal) 00133 { 00134 jamEntry(); 00135 00136 //const Uint32 startphase = signal->theData[1]; 00137 const Uint32 theSignalKey = signal->theData[6]; 00138 00139 signal->theData[0] = theSignalKey; 00140 signal->theData[3] = 1; 00141 signal->theData[4] = 255; // No more start phases from missra 00142 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB); 00143 return; 00144 }//Trix::execSTTOR() 00145 00149 void Trix::execNDB_STTOR(Signal* signal) 00150 { 00151 jamEntry(); 00152 BlockReference ndbcntrRef = signal->theData[0]; 00153 Uint16 startphase = signal->theData[2]; /* RESTART PHASE */ 00154 Uint16 mynode = signal->theData[1]; 00155 //Uint16 restarttype = signal->theData[3]; 00156 //UintR configInfo1 = signal->theData[6]; /* CONFIGRATION INFO PART 1 */ 00157 //UintR configInfo2 = signal->theData[7]; /* CONFIGRATION INFO PART 2 */ 00158 switch (startphase) { 00159 case 3: 00160 jam(); 00161 /* SYMBOLIC START PHASE 4 */ 00162 /* ABSOLUTE PHASE 5 */ 00163 /* REQUEST NODE IDENTITIES FROM DBDIH */ 00164 signal->theData[0] = calcTrixBlockRef(mynode); 00165 sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB); 00166 return; 00167 break; 00168 case 6: 00169 break; 00170 default: 00171 break; 00172 } 00173 } 00174 00178 void Trix::execREAD_NODESCONF(Signal* signal) 00179 { 00180 jamEntry(); 00181 00182 ReadNodesConf * const readNodes = (ReadNodesConf *)signal->getDataPtr(); 00183 //Uint32 noOfNodes = readNodes->noOfNodes; 00184 NodeRecPtr nodeRecPtr; 00185 00186 c_masterNodeId = readNodes->masterNodeId; 00187 c_masterTrixRef = RNIL; 00188 c_noNodesFailed = 0; 00189 00190 for(unsigned i = 0; i < MAX_NDB_NODES; i++) { 00191 jam(); 00192 if(NodeBitmask::get(readNodes->allNodes, i)) { 00193 // Node is defined 00194 jam(); 00195 ndbrequire(c_theNodes.seizeId(nodeRecPtr, i)); 00196 nodeRecPtr.p->trixRef = calcTrixBlockRef(i); 00197 if (i == c_masterNodeId) { 00198 c_masterTrixRef = nodeRecPtr.p->trixRef; 00199 } 00200 if(NodeBitmask::get(readNodes->inactiveNodes, i)){ 00201 // Node is not active 00202 jam(); 00209 arrGuard(c_noNodesFailed, MAX_NDB_NODES); 00210 nodeRecPtr.p->alive = false; 00211 c_noNodesFailed++; 00212 c_blockState = Trix::NODE_FAILURE; 00213 } 00214 else { 00215 // Node is active 00216 jam(); 00217 c_noActiveNodes++; 00218 nodeRecPtr.p->alive = true; 00219 } 00220 } 00221 } 00222 if (c_noNodesFailed == 0) { 00223 c_blockState = Trix::STARTED; 00224 } 00225 } 00226 00230 void Trix::execREAD_NODESREF(Signal* signal) 00231 { 00232 // NYI 00233 } 00234 00238 void Trix::execNODE_FAILREP(Signal* signal) 00239 { 00240 jamEntry(); 00241 NodeFailRep * const nodeFail = (NodeFailRep *) signal->getDataPtr(); 00242 00243 //Uint32 failureNr = nodeFail->failNo; 00244 //Uint32 numberNodes = nodeFail->noOfNodes; 00245 Uint32 masterNodeId = nodeFail->masterNodeId; 00246 00247 NodeRecPtr nodeRecPtr; 00248 00249 for(c_theNodes.first(nodeRecPtr); 00250 nodeRecPtr.i != RNIL; 00251 c_theNodes.next(nodeRecPtr)) { 00252 if(NodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) { 00253 nodeRecPtr.p->alive = false; 00254 c_noNodesFailed++; 00255 c_noActiveNodes--; 00256 } 00257 } 00258 if (c_masterNodeId != masterNodeId) { 00259 c_masterNodeId = masterNodeId; 00260 NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId); 00261 c_masterTrixRef = nodeRec->trixRef; 00262 } 00263 } 00264 00268 void Trix::execINCL_NODEREQ(Signal* signal) 00269 { 00270 jamEntry(); 00271 UintR node_id = signal->theData[1]; 00272 NodeRecord* nodeRec = c_theNodes.getPtr(node_id); 00273 nodeRec->alive = true; 00274 c_noNodesFailed--; 00275 c_noActiveNodes++; 00276 nodeRec->trixRef = calcTrixBlockRef(node_id); 00277 if (c_noNodesFailed == 0) { 00278 c_blockState = Trix::STARTED; 00279 } 00280 } 00281 00282 // Debugging 00283 void 00284 Trix::execDUMP_STATE_ORD(Signal* signal) 00285 { 00286 jamEntry(); 00287 00288 DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr(); 00289 00290 switch(dumpStateOrd->args[0]) { 00291 case(300): {// ok 00292 // index2 -T; index2 -I -n10000; index2 -c 00293 // all dump 300 0 0 0 0 0 4 2 00294 // select_count INDEX0000 00295 BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); 00296 00297 MEMCOPY_NO_WORDS(buildIndxReq, 00298 signal->theData + 1, 00299 BuildIndxReq::SignalLength); 00300 buildIndxReq->setUserRef(reference()); // return to me 00301 buildIndxReq->setParallelism(10); 00302 Uint32 indexColumns[1] = {1}; 00303 Uint32 keyColumns[1] = {0}; 00304 struct LinearSectionPtr orderPtr[2]; 00305 buildIndxReq->setColumnOrder(indexColumns, 1, keyColumns, 1, orderPtr); 00306 sendSignal(reference(), 00307 GSN_BUILDINDXREQ, 00308 signal, 00309 BuildIndxReq::SignalLength, 00310 JBB, 00311 orderPtr, 00312 BuildIndxReq::NoOfSections); 00313 break; 00314 } 00315 case(301): { // ok 00316 // index2 -T; index2 -I -n10000; index2 -c -p 00317 // all dump 301 0 0 0 0 0 4 2 00318 // select_count INDEX0000 00319 BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); 00320 00321 MEMCOPY_NO_WORDS(buildIndxReq, 00322 signal->theData + 1, 00323 BuildIndxReq::SignalLength); 00324 buildIndxReq->setUserRef(reference()); // return to me 00325 buildIndxReq->setParallelism(10); 00326 Uint32 indexColumns[2] = {0, 1}; 00327 Uint32 keyColumns[1] = {0}; 00328 struct LinearSectionPtr orderPtr[2]; 00329 buildIndxReq->setColumnOrder(indexColumns, 2, keyColumns, 1, orderPtr); 00330 sendSignal(reference(), 00331 GSN_BUILDINDXREQ, 00332 signal, 00333 BuildIndxReq::SignalLength, 00334 JBB, 00335 orderPtr, 00336 BuildIndxReq::NoOfSections); 00337 break; 00338 } 00339 case(302): { // ok 00340 // index -T; index -I -n1000; index -c -p 00341 // all dump 302 0 0 0 0 0 4 2 00342 // select_count PNUMINDEX0000 00343 BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); 00344 00345 MEMCOPY_NO_WORDS(buildIndxReq, 00346 signal->theData + 1, 00347 BuildIndxReq::SignalLength); 00348 buildIndxReq->setUserRef(reference()); // return to me 00349 buildIndxReq->setParallelism(10); 00350 Uint32 indexColumns[3] = {0, 3, 5}; 00351 Uint32 keyColumns[1] = {0}; 00352 struct LinearSectionPtr orderPtr[2]; 00353 buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr); 00354 sendSignal(reference(), 00355 GSN_BUILDINDXREQ, 00356 signal, 00357 BuildIndxReq::SignalLength, 00358 JBB, 00359 orderPtr, 00360 BuildIndxReq::NoOfSections); 00361 break; 00362 } 00363 case(303): { // ok 00364 // index -T -2; index -I -2 -n1000; index -c -p 00365 // all dump 303 0 0 0 0 0 4 2 00366 // select_count PNUMINDEX0000 00367 BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); 00368 00369 MEMCOPY_NO_WORDS(buildIndxReq, 00370 signal->theData + 1, 00371 BuildIndxReq::SignalLength); 00372 buildIndxReq->setUserRef(reference()); // return to me 00373 buildIndxReq->setParallelism(10); 00374 Uint32 indexColumns[3] = {0, 3, 5}; 00375 Uint32 keyColumns[2] = {0, 1}; 00376 struct LinearSectionPtr orderPtr[2]; 00377 buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr); 00378 sendSignal(reference(), 00379 GSN_BUILDINDXREQ, 00380 signal, 00381 BuildIndxReq::SignalLength, 00382 JBB, 00383 orderPtr, 00384 BuildIndxReq::NoOfSections); 00385 break; 00386 } 00387 case(304): { // ok 00388 // index -T -L; index -I -L -n1000; index -c -p 00389 // all dump 304 0 0 0 0 0 4 2 00390 // select_count PNUMINDEX0000 00391 BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); 00392 00393 MEMCOPY_NO_WORDS(buildIndxReq, 00394 signal->theData + 1, 00395 BuildIndxReq::SignalLength); 00396 buildIndxReq->setUserRef(reference()); // return to me 00397 buildIndxReq->setParallelism(10); 00398 Uint32 indexColumns[3] = {0, 3, 5}; 00399 Uint32 keyColumns[1] = {0}; 00400 struct LinearSectionPtr orderPtr[2]; 00401 buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 1, orderPtr); 00402 sendSignal(reference(), 00403 GSN_BUILDINDXREQ, 00404 signal, 00405 BuildIndxReq::SignalLength, 00406 JBB, 00407 orderPtr, 00408 BuildIndxReq::NoOfSections); 00409 break; 00410 } 00411 case(305): { // ok 00412 // index -T -2 -L; index -I -2 -L -n1000; index -c -p 00413 // all dump 305 0 0 0 0 0 4 2 00414 // select_count PNUMINDEX0000 00415 BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtrSend(); 00416 00417 MEMCOPY_NO_WORDS(buildIndxReq, 00418 signal->theData + 1, 00419 BuildIndxReq::SignalLength); 00420 buildIndxReq->setUserRef(reference()); // return to me 00421 buildIndxReq->setParallelism(10); 00422 Uint32 indexColumns[3] = {0, 3, 5}; 00423 Uint32 keyColumns[2] = {0, 1}; 00424 struct LinearSectionPtr orderPtr[2]; 00425 buildIndxReq->setColumnOrder(indexColumns, 3, keyColumns, 2, orderPtr); 00426 sendSignal(reference(), 00427 GSN_BUILDINDXREQ, 00428 signal, 00429 BuildIndxReq::SignalLength, 00430 JBB, 00431 orderPtr, 00432 BuildIndxReq::NoOfSections); 00433 break; 00434 } 00435 default: { 00436 // Ignore 00437 } 00438 } 00439 } 00440 00441 // Build index 00445 void Trix:: execBUILDINDXREQ(Signal* signal) 00446 { 00447 jamEntry(); 00448 DBUG_ENTER("Trix:: execBUILDINDXREQ"); 00449 00450 BuildIndxReq * buildIndxReq = (BuildIndxReq *)signal->getDataPtr(); 00451 00452 // Seize a subscription record 00453 SubscriptionRecPtr subRecPtr; 00454 SubscriptionRecord* subRec; 00455 00456 if (!c_theSubscriptions.seizeId(subRecPtr, buildIndxReq->getBuildId())) { 00457 // Failed to allocate subscription record 00458 BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend(); 00459 00460 buildIndxRef->setErrorCode(BuildIndxRef::AllocationFailure); 00461 releaseSections(signal); 00462 sendSignal(buildIndxReq->getUserRef(), 00463 GSN_BUILDINDXREF, signal, BuildIndxRef::SignalLength, JBB); 00464 DBUG_VOID_RETURN; 00465 } 00466 subRec = subRecPtr.p; 00467 subRec->errorCode = BuildIndxRef::NoError; 00468 subRec->userReference = buildIndxReq->getUserRef(); 00469 subRec->connectionPtr = buildIndxReq->getConnectionPtr(); 00470 subRec->subscriptionId = buildIndxReq->getBuildId(); 00471 subRec->subscriptionKey = buildIndxReq->getBuildKey(); 00472 subRec->indexType = buildIndxReq->getIndexType(); 00473 subRec->sourceTableId = buildIndxReq->getTableId(); 00474 subRec->targetTableId = buildIndxReq->getIndexId(); 00475 subRec->parallelism = buildIndxReq->getParallelism(); 00476 subRec->expectedConf = 0; 00477 subRec->subscriptionCreated = false; 00478 subRec->pendingSubSyncContinueConf = false; 00479 subRec->prepareId = RNIL; 00480 00481 // Get column order segments 00482 Uint32 noOfSections = signal->getNoOfSections(); 00483 if(noOfSections > 0) { 00484 SegmentedSectionPtr ptr; 00485 signal->getSection(ptr, BuildIndxReq::INDEX_COLUMNS); 00486 append(subRec->attributeOrder, ptr, getSectionSegmentPool()); 00487 subRec->noOfIndexColumns = ptr.sz; 00488 } 00489 if(noOfSections > 1) { 00490 SegmentedSectionPtr ptr; 00491 signal->getSection(ptr, BuildIndxReq::KEY_COLUMNS); 00492 append(subRec->attributeOrder, ptr, getSectionSegmentPool()); 00493 subRec->noOfKeyColumns = ptr.sz; 00494 } 00495 #if 0 00496 // Debugging 00497 printf("Trix:: execBUILDINDXREQ: Attribute order:\n"); 00498 subRec->attributeOrder.print(stdout); 00499 #endif 00500 releaseSections(signal); 00501 prepareInsertTransactions(signal, subRecPtr); 00502 DBUG_VOID_RETURN; 00503 } 00504 00505 void Trix:: execBUILDINDXCONF(Signal* signal) 00506 { 00507 printf("Trix:: execBUILDINDXCONF\n"); 00508 } 00509 00510 void Trix:: execBUILDINDXREF(Signal* signal) 00511 { 00512 printf("Trix:: execBUILDINDXREF\n"); 00513 } 00514 00515 void Trix::execUTIL_PREPARE_CONF(Signal* signal) 00516 { 00517 jamEntry(); 00518 UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr(); 00519 SubscriptionRecPtr subRecPtr; 00520 SubscriptionRecord* subRec; 00521 00522 subRecPtr.i = utilPrepareConf->senderData; 00523 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00524 printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i); 00525 return; 00526 } 00527 subRecPtr.p = subRec; 00528 subRec->prepareId = utilPrepareConf->prepareId; 00529 setupSubscription(signal, subRecPtr); 00530 } 00531 00532 void Trix::execUTIL_PREPARE_REF(Signal* signal) 00533 { 00534 jamEntry(); 00535 UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr(); 00536 SubscriptionRecPtr subRecPtr; 00537 SubscriptionRecord* subRec; 00538 00539 subRecPtr.i = utilPrepareRef->senderData; 00540 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00541 printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i); 00542 return; 00543 } 00544 subRecPtr.p = subRec; 00545 subRec->errorCode = BuildIndxRef::InternalError; 00546 } 00547 00548 void Trix::execUTIL_EXECUTE_CONF(Signal* signal) 00549 { 00550 jamEntry(); 00551 UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr(); 00552 SubscriptionRecPtr subRecPtr; 00553 SubscriptionRecord* subRec; 00554 00555 subRecPtr.i = utilExecuteConf->senderData; 00556 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00557 printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i); 00558 return; 00559 } 00560 subRecPtr.p = subRec; 00561 subRec->expectedConf--; 00562 checkParallelism(signal, subRec); 00563 if (subRec->expectedConf == 0) 00564 buildComplete(signal, subRecPtr); 00565 } 00566 00567 void Trix::execUTIL_EXECUTE_REF(Signal* signal) 00568 { 00569 jamEntry(); 00570 UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr(); 00571 SubscriptionRecPtr subRecPtr; 00572 SubscriptionRecord* subRec; 00573 00574 subRecPtr.i = utilExecuteRef->senderData; 00575 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00576 printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i); 00577 return; 00578 } 00579 subRecPtr.p = subRec; 00580 ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError); 00581 if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION) 00582 buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique); 00583 else 00584 buildFailed(signal, subRecPtr, BuildIndxRef::InternalError); 00585 } 00586 00587 void Trix::execSUB_CREATE_CONF(Signal* signal) 00588 { 00589 jamEntry(); 00590 DBUG_ENTER("Trix::execSUB_CREATE_CONF"); 00591 SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr(); 00592 SubscriptionRecPtr subRecPtr; 00593 SubscriptionRecord* subRec; 00594 00595 subRecPtr.i = subCreateConf->senderData; 00596 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00597 printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i); 00598 DBUG_VOID_RETURN; 00599 } 00600 subRec->subscriptionCreated = true; 00601 subRecPtr.p = subRec; 00602 00603 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u", 00604 subRecPtr.i, subRecPtr.p->subscriptionId, 00605 subRecPtr.p->subscriptionKey)); 00606 00607 startTableScan(signal, subRecPtr); 00608 DBUG_VOID_RETURN; 00609 } 00610 00611 void Trix::execSUB_CREATE_REF(Signal* signal) 00612 { 00613 jamEntry(); 00614 DBUG_ENTER("Trix::execSUB_CREATE_REF"); 00615 // THIS SIGNAL IS NEVER SENT FROM SUMA? 00616 /* 00617 SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr(); 00618 SubscriptionRecPtr subRecPtr; 00619 SubscriptionRecord* subRec; 00620 00621 subRecPtr.i = subCreateRef->subscriberData; 00622 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00623 printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i); 00624 return; 00625 } 00626 subRecPtr.p = subRec; 00627 buildFailed(signal, subRecPtr, BuildIndxRef::InternalError); 00628 */ 00629 DBUG_VOID_RETURN; 00630 } 00631 00632 void Trix::execSUB_SYNC_CONF(Signal* signal) 00633 { 00634 jamEntry(); 00635 DBUG_ENTER("Trix::execSUB_SYNC_CONF"); 00636 SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr(); 00637 SubscriptionRecPtr subRecPtr; 00638 SubscriptionRecord* subRec; 00639 00640 subRecPtr.i = subSyncConf->senderData; 00641 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00642 printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n", 00643 subRecPtr.i); 00644 DBUG_VOID_RETURN; 00645 } 00646 00647 subRecPtr.p = subRec; 00648 subRec->expectedConf--; 00649 checkParallelism(signal, subRec); 00650 if (subRec->expectedConf == 0) 00651 buildComplete(signal, subRecPtr); 00652 DBUG_VOID_RETURN; 00653 } 00654 00655 void Trix::execSUB_SYNC_REF(Signal* signal) 00656 { 00657 jamEntry(); 00658 DBUG_ENTER("Trix::execSUB_SYNC_REF"); 00659 SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr(); 00660 SubscriptionRecPtr subRecPtr; 00661 SubscriptionRecord* subRec; 00662 00663 subRecPtr.i = subSyncRef->senderData; 00664 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00665 printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i); 00666 DBUG_VOID_RETURN; 00667 } 00668 subRecPtr.p = subRec; 00669 buildFailed(signal, subRecPtr, BuildIndxRef::InternalError); 00670 DBUG_VOID_RETURN; 00671 } 00672 00673 void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal) 00674 { 00675 SubSyncContinueReq * subSyncContinueReq = 00676 (SubSyncContinueReq *) signal->getDataPtr(); 00677 00678 SubscriptionRecPtr subRecPtr; 00679 SubscriptionRecord* subRec; 00680 subRecPtr.i = subSyncContinueReq->subscriberData; 00681 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00682 printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i); 00683 return; 00684 } 00685 subRecPtr.p = subRec; 00686 subRec->pendingSubSyncContinueConf = true; 00687 checkParallelism(signal, subRec); 00688 } 00689 00690 void Trix::execSUB_TABLE_DATA(Signal* signal) 00691 { 00692 jamEntry(); 00693 DBUG_ENTER("Trix::execSUB_TABLE_DATA"); 00694 SubTableData * subTableData = (SubTableData *)signal->getDataPtr(); 00695 SubscriptionRecPtr subRecPtr; 00696 SubscriptionRecord* subRec; 00697 subRecPtr.i = subTableData->senderData; 00698 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) { 00699 printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i); 00700 DBUG_VOID_RETURN; 00701 } 00702 subRecPtr.p = subRec; 00703 SegmentedSectionPtr headerPtr, dataPtr; 00704 if (!signal->getSection(headerPtr, 0)) { 00705 printf("Trix::execSUB_TABLE_DATA: Failed to get header section\n"); 00706 } 00707 if (!signal->getSection(dataPtr, 1)) { 00708 printf("Trix::execSUB_TABLE_DATA: Failed to get data section\n"); 00709 } 00710 executeInsertTransaction(signal, subRecPtr, headerPtr, dataPtr); 00711 DBUG_VOID_RETURN; 00712 } 00713 00714 void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr) 00715 { 00716 jam(); 00717 DBUG_ENTER("Trix::setupSubscription"); 00718 SubscriptionRecord* subRec = subRecPtr.p; 00719 SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend(); 00720 // Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns; 00721 subCreateReq->senderRef = reference(); 00722 subCreateReq->senderData = subRecPtr.i; 00723 subCreateReq->subscriptionId = subRec->subscriptionId; 00724 subCreateReq->subscriptionKey = subRec->subscriptionKey; 00725 subCreateReq->tableId = subRec->sourceTableId; 00726 subCreateReq->subscriptionType = SubCreateReq::SingleTableScan; 00727 00728 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u", 00729 subRecPtr.i, subCreateReq->subscriptionId, 00730 subCreateReq->subscriptionKey)); 00731 00732 sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ, 00733 signal, SubCreateReq::SignalLength, JBB); 00734 DBUG_VOID_RETURN; 00735 } 00736 00737 void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr) 00738 { 00739 jam(); 00740 00741 Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2]; 00742 SubscriptionRecord* subRec = subRecPtr.p; 00743 AttrOrderBuffer::DataBufferIterator iter; 00744 Uint32 i = 0; 00745 00746 bool moreAttributes = subRec->attributeOrder.first(iter); 00747 while (moreAttributes) { 00748 attributeList[i++] = *iter.data; 00749 moreAttributes = subRec->attributeOrder.next(iter); 00750 } 00751 // Merge index and key column segments 00752 struct LinearSectionPtr orderPtr[3]; 00753 orderPtr[0].p = attributeList; 00754 orderPtr[0].sz = subRec->attributeOrder.getSize(); 00755 00756 SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend(); 00757 subSyncReq->senderRef = reference(); 00758 subSyncReq->senderData = subRecPtr.i; 00759 subSyncReq->subscriptionId = subRec->subscriptionId; 00760 subSyncReq->subscriptionKey = subRec->subscriptionKey; 00761 subSyncReq->part = SubscriptionData::TableData; 00762 00763 subRecPtr.p->expectedConf = 1; 00764 00765 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u", 00766 subRecPtr.i, subSyncReq->subscriptionId, 00767 subSyncReq->subscriptionKey)); 00768 00769 sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, 00770 signal, SubSyncReq::SignalLength, JBB, orderPtr, 1); 00771 } 00772 00773 void Trix::prepareInsertTransactions(Signal* signal, 00774 SubscriptionRecPtr subRecPtr) 00775 { 00776 SubscriptionRecord* subRec = subRecPtr.p; 00777 UtilPrepareReq * utilPrepareReq = 00778 (UtilPrepareReq *)signal->getDataPtrSend(); 00779 00780 jam(); 00781 utilPrepareReq->senderRef = reference(); 00782 utilPrepareReq->senderData = subRecPtr.i; 00783 00784 const Uint32 pageSizeInWords = 128; 00785 Uint32 propPage[pageSizeInWords]; 00786 LinearWriter w(&propPage[0],128); 00787 w.first(); 00788 w.add(UtilPrepareReq::NoOfOperations, 1); 00789 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write); 00790 w.add(UtilPrepareReq::TableId, subRec->targetTableId); 00791 // Add index attributes in increasing order and one PK attribute 00792 for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++) 00793 w.add(UtilPrepareReq::AttributeId, i); 00794 00795 #if 0 00796 // Debugging 00797 SimplePropertiesLinearReader reader(propPage, w.getWordsUsed()); 00798 printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n"); 00799 reader.printAll(ndbout); 00800 #endif 00801 00802 struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections]; 00803 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage; 00804 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed(); 00805 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal, 00806 UtilPrepareReq::SignalLength, JBB, 00807 sectionsPtr, UtilPrepareReq::NoOfSections); 00808 } 00809 00810 void Trix::executeInsertTransaction(Signal* signal, 00811 SubscriptionRecPtr subRecPtr, 00812 SegmentedSectionPtr headerPtr, 00813 SegmentedSectionPtr dataPtr) 00814 { 00815 jam(); 00816 SubscriptionRecord* subRec = subRecPtr.p; 00817 UtilExecuteReq * utilExecuteReq = 00818 (UtilExecuteReq *)signal->getDataPtrSend(); 00819 Uint32* headerBuffer = signal->theData + 25; 00820 Uint32* dataBuffer = headerBuffer + headerPtr.sz; 00821 00822 utilExecuteReq->senderRef = reference(); 00823 utilExecuteReq->senderData = subRecPtr.i; 00824 utilExecuteReq->prepareId = subRec->prepareId; 00825 #if 0 00826 printf("Header size %u\n", headerPtr.sz); 00827 for(int i = 0; i < headerPtr.sz; i++) 00828 printf("H'%.8x ", headerBuffer[i]); 00829 printf("\n"); 00830 00831 printf("Data size %u\n", dataPtr.sz); 00832 for(int i = 0; i < dataPtr.sz; i++) 00833 printf("H'%.8x ", dataBuffer[i]); 00834 printf("\n"); 00835 #endif 00836 // Save scan result in linear buffers 00837 copy(headerBuffer, headerPtr); 00838 copy(dataBuffer, dataPtr); 00839 00840 // Calculate packed key size 00841 Uint32 noOfKeyData = 0; 00842 for(Uint32 i = 0; i < headerPtr.sz; i++) { 00843 AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i; 00844 00845 // Filter out NULL attributes 00846 if (keyAttrHead->isNULL()) 00847 return; 00848 00849 if (i < subRec->noOfIndexColumns) 00850 // Renumber index attributes in consequtive order 00851 keyAttrHead->setAttributeId(i); 00852 else 00853 // Calculate total size of PK attribute 00854 noOfKeyData += keyAttrHead->getDataSize(); 00855 } 00856 // Increase expected CONF count 00857 subRec->expectedConf++; 00858 00859 // Pack key attributes 00860 AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns, 00861 subRec->noOfIndexColumns, 00862 noOfKeyData << 2); 00863 00864 struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections]; 00865 sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer; 00866 sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = 00867 subRec->noOfIndexColumns + 1; 00868 sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer; 00869 sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz; 00870 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal, 00871 UtilExecuteReq::SignalLength, JBB, 00872 sectionsPtr, UtilExecuteReq::NoOfSections); 00873 } 00874 00875 void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr) 00876 { 00877 SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend(); 00878 req->senderRef = reference(); 00879 req->senderData = subRecPtr.i; 00880 req->subscriptionId = subRecPtr.p->subscriptionId; 00881 req->subscriptionKey = subRecPtr.p->subscriptionKey; 00882 sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, 00883 SubRemoveReq::SignalLength, JBB); 00884 } 00885 00886 void Trix::buildFailed(Signal* signal, 00887 SubscriptionRecPtr subRecPtr, 00888 BuildIndxRef::ErrorCode errorCode) 00889 { 00890 SubscriptionRecord* subRec = subRecPtr.p; 00891 00892 subRec->errorCode = errorCode; 00893 // Continue accumulating since we currently cannot stop SUMA 00894 subRec->expectedConf--; 00895 checkParallelism(signal, subRec); 00896 if (subRec->expectedConf == 0) 00897 buildComplete(signal, subRecPtr); 00898 } 00899 00900 void 00901 Trix::execSUB_REMOVE_REF(Signal* signal){ 00902 jamEntry(); 00903 //@todo 00904 ndbrequire(false); 00905 } 00906 00907 void 00908 Trix::execSUB_REMOVE_CONF(Signal* signal){ 00909 jamEntry(); 00910 00911 SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend(); 00912 00913 SubscriptionRecPtr subRecPtr; 00914 c_theSubscriptions.getPtr(subRecPtr, conf->senderData); 00915 00916 if(subRecPtr.p->prepareId != RNIL){ 00917 jam(); 00918 00919 UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend(); 00920 req->prepareId = subRecPtr.p->prepareId; 00921 req->senderData = subRecPtr.i; 00922 00923 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal, 00924 UtilReleaseReq::SignalLength , JBB); 00925 return; 00926 } 00927 00928 { 00929 UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend(); 00930 conf->senderData = subRecPtr.i; 00931 execUTIL_RELEASE_CONF(signal); 00932 } 00933 } 00934 00935 void 00936 Trix::execUTIL_RELEASE_REF(Signal* signal){ 00937 jamEntry(); 00938 ndbrequire(false); 00939 } 00940 00941 void 00942 Trix::execUTIL_RELEASE_CONF(Signal* signal){ 00943 00944 UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend(); 00945 00946 SubscriptionRecPtr subRecPtr; 00947 c_theSubscriptions.getPtr(subRecPtr, conf->senderData); 00948 00949 if(subRecPtr.p->errorCode == BuildIndxRef::NoError){ 00950 // Build is complete, reply to original sender 00951 BuildIndxConf * buildIndxConf = (BuildIndxConf *)signal->getDataPtrSend(); 00952 buildIndxConf->setUserRef(subRecPtr.p->userReference); 00953 buildIndxConf->setConnectionPtr(subRecPtr.p->connectionPtr); 00954 buildIndxConf->setRequestType(BuildIndxReq::RT_TRIX); 00955 buildIndxConf->setIndexType(subRecPtr.p->indexType); 00956 buildIndxConf->setTableId(subRecPtr.p->sourceTableId); 00957 buildIndxConf->setIndexId(subRecPtr.p->targetTableId); 00958 00959 sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXCONF, signal, 00960 BuildIndxConf::SignalLength , JBB); 00961 } else { 00962 // Build failed, reply to original sender 00963 BuildIndxRef * buildIndxRef = (BuildIndxRef *)signal->getDataPtrSend(); 00964 buildIndxRef->setUserRef(subRecPtr.p->userReference); 00965 buildIndxRef->setConnectionPtr(subRecPtr.p->connectionPtr); 00966 buildIndxRef->setRequestType(BuildIndxReq::RT_TRIX); 00967 buildIndxRef->setIndexType(subRecPtr.p->indexType); 00968 buildIndxRef->setTableId(subRecPtr.p->sourceTableId); 00969 buildIndxRef->setIndexId(subRecPtr.p->targetTableId); 00970 buildIndxRef->setErrorCode(subRecPtr.p->errorCode); 00971 00972 sendSignal(subRecPtr.p->userReference, GSN_BUILDINDXREF, signal, 00973 BuildIndxRef::SignalLength , JBB); 00974 } 00975 00976 // Release subscription record 00977 subRecPtr.p->attributeOrder.release(); 00978 c_theSubscriptions.release(subRecPtr.i); 00979 } 00980 00981 void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec) 00982 { 00983 if ((subRec->pendingSubSyncContinueConf) && 00984 (subRec->expectedConf < subRec->parallelism)) { 00985 SubSyncContinueConf * subSyncContinueConf = 00986 (SubSyncContinueConf *) signal->getDataPtrSend(); 00987 subSyncContinueConf->subscriptionId = subRec->subscriptionId; 00988 subSyncContinueConf->subscriptionKey = subRec->subscriptionKey; 00989 sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, 00990 SubSyncContinueConf::SignalLength , JBB); 00991 subRec->pendingSubSyncContinueConf = false; 00992 } 00993 } 00994 00995 BLOCK_FUNCTIONS(Trix) 00996 00997 template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);
1.4.7

