00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 00018 #include <ndb_global.h> 00019 #include <kernel_types.h> 00020 00021 #include "NdbDictionaryImpl.hpp" 00022 #include "API.hpp" 00023 #include <NdbOut.hpp> 00024 #include "NdbApiSignal.hpp" 00025 #include "TransporterFacade.hpp" 00026 #include <signaldata/CreateEvnt.hpp> 00027 #include <signaldata/SumaImpl.hpp> 00028 #include <SimpleProperties.hpp> 00029 #include <Bitmask.hpp> 00030 #include <AttributeHeader.hpp> 00031 #include <AttributeList.hpp> 00032 #include <NdbError.hpp> 00033 #include <BaseString.hpp> 00034 #include <UtilBuffer.hpp> 00035 #include <NdbDictionary.hpp> 00036 #include <Ndb.hpp> 00037 #include "NdbImpl.hpp" 00038 #include "DictCache.hpp" 00039 #include <portlib/NdbMem.h> 00040 #include <NdbRecAttr.hpp> 00041 #include <NdbBlob.hpp> 00042 #include <NdbEventOperation.hpp> 00043 #include "NdbEventOperationImpl.hpp" 00044 #include <signaldata/AlterTable.hpp> 00045 00046 #include <EventLogger.hpp> 00047 extern EventLogger g_eventLogger; 00048 00049 static Gci_container_pod g_empty_gci_container; 00050 static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4; 00051 static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1; 00052 00053 #ifdef VM_TRACE 00054 static void 00055 print_std(const SubTableData * sdata, LinearSectionPtr ptr[3]) 00056 { 00057 printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation); 00058 for (int i = 0; i <= 2; i++) { 00059 printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz); 00060 for (int j = 0; j < ptr[i].sz; j++) 00061 printf("%08x ", ptr[i].p[j]); 00062 printf("\n"); 00063 } 00064 } 00065 #endif 00066 00067 /* 00068 * Class NdbEventOperationImpl 00069 * 00070 * 00071 */ 00072 00073 // todo handle several ndb objects 00074 // todo free allocated data when closing NdbEventBuffer 00075 00076 NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &f, 00077 Ndb *theNdb, 00078 const char* eventName) : 00079 NdbEventOperation(*this), 00080 m_facade(&f), 00081 m_ndb(theNdb), 00082 m_state(EO_ERROR), 00083 m_oid(~(Uint32)0) 00084 { 00085 DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl"); 00086 00087 assert(m_ndb != NULL); 00088 NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); 00089 assert(myDict != NULL); 00090 00091 const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName); 00092 if (!myEvnt) 00093 { 00094 m_error.code= myDict->getNdbError().code; 00095 DBUG_VOID_RETURN; 00096 } 00097 00098 init(myEvnt->m_impl); 00099 DBUG_VOID_RETURN; 00100 } 00101 00102 NdbEventOperationImpl::NdbEventOperationImpl(Ndb *theNdb, 00103 NdbEventImpl& evnt) : 00104 NdbEventOperation(*this), 00105 m_facade(this), 00106 m_ndb(theNdb), 00107 m_state(EO_ERROR), 00108 m_oid(~(Uint32)0) 00109 { 00110 DBUG_ENTER("NdbEventOperationImpl::NdbEventOperationImpl [evnt]"); 00111 init(evnt); 00112 DBUG_VOID_RETURN; 00113 } 00114 00115 void 00116 NdbEventOperationImpl::init(NdbEventImpl& evnt) 00117 { 00118 DBUG_ENTER("NdbEventOperationImpl::init"); 00119 00120 m_magic_number = 0; 00121 mi_type = 0; 00122 m_change_mask = 0; 00123 #ifdef VM_TRACE 00124 m_data_done_count = 0; 00125 m_data_count = 0; 00126 #endif 00127 m_next = 0; 00128 m_prev = 0; 00129 00130 m_eventId = 0; 00131 theFirstPkAttrs[0] = NULL; 00132 theCurrentPkAttrs[0] = NULL; 00133 theFirstPkAttrs[1] = NULL; 00134 theCurrentPkAttrs[1] = NULL; 00135 theFirstDataAttrs[0] = NULL; 00136 theCurrentDataAttrs[0] = NULL; 00137 theFirstDataAttrs[1] = NULL; 00138 theCurrentDataAttrs[1] = NULL; 00139 00140 theBlobList = NULL; 00141 theBlobOpList = NULL; 00142 theMainOp = NULL; 00143 00144 m_data_item= NULL; 00145 m_eventImpl = NULL; 00146 00147 m_custom_data= 0; 00148 m_has_error= 1; 00149 00150 // we should lookup id in Dictionary, TODO 00151 // also make sure we only have one listener on each event 00152 00153 m_eventImpl = &evnt; 00154 00155 m_eventId = m_eventImpl->m_eventId; 00156 00157 m_oid= m_ndb->theImpl->theNdbObjectIdMap.map(this); 00158 00159 m_state= EO_CREATED; 00160 00161 m_node_bit_mask.clear(); 00162 #ifdef ndb_event_stores_merge_events_flag 00163 m_mergeEvents = m_eventImpl->m_mergeEvents; 00164 #else 00165 m_mergeEvents = false; 00166 #endif 00167 m_ref_count = 0; 00168 DBUG_PRINT("info", ("m_ref_count = 0 for op: %p", this)); 00169 00170 m_has_error= 0; 00171 00172 DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid)); 00173 DBUG_VOID_RETURN; 00174 } 00175 00176 NdbEventOperationImpl::~NdbEventOperationImpl() 00177 { 00178 DBUG_ENTER("NdbEventOperationImpl::~NdbEventOperationImpl"); 00179 m_magic_number= 0; 00180 00181 if (m_oid == ~(Uint32)0) 00182 DBUG_VOID_RETURN; 00183 00184 stop(); 00185 // m_bufferHandle->dropSubscribeEvent(m_bufferId); 00186 ; // ToDo? We should send stop signal here 00187 00188 if (theMainOp == NULL) 00189 { 00190 NdbEventOperationImpl* tBlobOp = theBlobOpList; 00191 while (tBlobOp != NULL) 00192 { 00193 NdbEventOperationImpl *op = tBlobOp; 00194 tBlobOp = tBlobOp->m_next; 00195 delete op; 00196 } 00197 } 00198 00199 m_ndb->theImpl->theNdbObjectIdMap.unmap(m_oid, this); 00200 DBUG_PRINT("exit",("this: %p/%p oid: %u main: %p", 00201 this, m_facade, m_oid, theMainOp)); 00202 00203 if (m_eventImpl) 00204 { 00205 delete m_eventImpl->m_facade; 00206 m_eventImpl= 0; 00207 } 00208 00209 DBUG_VOID_RETURN; 00210 } 00211 00212 NdbEventOperation::State 00213 NdbEventOperationImpl::getState() 00214 { 00215 return m_state; 00216 } 00217 00218 NdbRecAttr* 00219 NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n) 00220 { 00221 DBUG_ENTER("NdbEventOperationImpl::getValue"); 00222 if (m_state != EO_CREATED) { 00223 ndbout_c("NdbEventOperationImpl::getValue may only be called between " 00224 "instantiation and execute()"); 00225 DBUG_RETURN(NULL); 00226 } 00227 00228 NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName); 00229 00230 if (tAttrInfo == NULL) { 00231 ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName); 00232 DBUG_RETURN(NULL); 00233 } 00234 00235 DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n)); 00236 } 00237 00238 NdbRecAttr* 00239 NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n) 00240 { 00241 DBUG_ENTER("NdbEventOperationImpl::getValue"); 00242 // Insert Attribute Id into ATTRINFO part. 00243 00244 NdbRecAttr **theFirstAttr; 00245 NdbRecAttr **theCurrentAttr; 00246 00247 if (tAttrInfo->getPrimaryKey()) 00248 { 00249 theFirstAttr = &theFirstPkAttrs[n]; 00250 theCurrentAttr = &theCurrentPkAttrs[n]; 00251 } 00252 else 00253 { 00254 theFirstAttr = &theFirstDataAttrs[n]; 00255 theCurrentAttr = &theCurrentDataAttrs[n]; 00256 } 00257 00258 /************************************************************************ 00259 * Get a Receive Attribute object and link it into the operation object. 00260 ************************************************************************/ 00261 NdbRecAttr *tAttr = m_ndb->getRecAttr(); 00262 if (tAttr == NULL) { 00263 exit(-1); 00264 //setErrorCodeAbort(4000); 00265 DBUG_RETURN(NULL); 00266 } 00267 00268 /********************************************************************** 00269 * Now set the attribute identity and the pointer to the data in 00270 * the RecAttr object 00271 * Also set attribute size, array size and attribute type 00272 ********************************************************************/ 00273 if (tAttr->setup(tAttrInfo, aValue)) { 00274 //setErrorCodeAbort(4000); 00275 m_ndb->releaseRecAttr(tAttr); 00276 exit(-1); 00277 DBUG_RETURN(NULL); 00278 } 00279 //theErrorLine++; 00280 00281 tAttr->setUNDEFINED(); 00282 00283 // We want to keep the list sorted to make data insertion easier later 00284 00285 if (*theFirstAttr == NULL) { 00286 *theFirstAttr = tAttr; 00287 *theCurrentAttr = tAttr; 00288 tAttr->next(NULL); 00289 } else { 00290 Uint32 tAttrId = tAttrInfo->m_attrId; 00291 if (tAttrId > (*theCurrentAttr)->attrId()) { // right order 00292 (*theCurrentAttr)->next(tAttr); 00293 tAttr->next(NULL); 00294 *theCurrentAttr = tAttr; 00295 } else if ((*theFirstAttr)->next() == NULL || // only one in list 00296 (*theFirstAttr)->attrId() > tAttrId) {// or first 00297 tAttr->next(*theFirstAttr); 00298 *theFirstAttr = tAttr; 00299 } else { // at least 2 in list and not first and not last 00300 NdbRecAttr *p = *theFirstAttr; 00301 NdbRecAttr *p_next = p->next(); 00302 while (tAttrId > p_next->attrId()) { 00303 p = p_next; 00304 p_next = p->next(); 00305 } 00306 if (tAttrId == p_next->attrId()) { // Using same attribute twice 00307 tAttr->release(); // do I need to do this? 00308 m_ndb->releaseRecAttr(tAttr); 00309 exit(-1); 00310 DBUG_RETURN(NULL); 00311 } 00312 // this is it, between p and p_next 00313 p->next(tAttr); 00314 tAttr->next(p_next); 00315 } 00316 } 00317 DBUG_RETURN(tAttr); 00318 } 00319 00320 NdbBlob* 00321 NdbEventOperationImpl::getBlobHandle(const char *colName, int n) 00322 { 00323 DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)"); 00324 00325 assert(m_mergeEvents); 00326 00327 if (m_state != EO_CREATED) { 00328 ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between " 00329 "instantiation and execute()"); 00330 DBUG_RETURN(NULL); 00331 } 00332 00333 NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName); 00334 00335 if (tAttrInfo == NULL) { 00336 ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName); 00337 DBUG_RETURN(NULL); 00338 } 00339 00340 NdbBlob* bh = getBlobHandle(tAttrInfo, n); 00341 DBUG_RETURN(bh); 00342 } 00343 00344 NdbBlob* 00345 NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n) 00346 { 00347 DBUG_ENTER("NdbEventOperationImpl::getBlobHandle"); 00348 DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n)); 00349 00350 // as in NdbOperation, create only one instance 00351 NdbBlob* tBlob = theBlobList; 00352 NdbBlob* tLastBlob = NULL; 00353 while (tBlob != NULL) { 00354 if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n) 00355 DBUG_RETURN(tBlob); 00356 tLastBlob = tBlob; 00357 tBlob = tBlob->theNext; 00358 } 00359 00360 NdbEventOperationImpl* tBlobOp = NULL; 00361 00362 const bool is_tinyblob = (tAttrInfo->getPartSize() == 0); 00363 assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL)); 00364 00365 if (! is_tinyblob) { 00366 // blob event name 00367 char bename[MAX_TAB_NAME_SIZE]; 00368 NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo); 00369 00370 // find blob event op if any (it serves both post and pre handles) 00371 tBlobOp = theBlobOpList; 00372 NdbEventOperationImpl* tLastBlopOp = NULL; 00373 while (tBlobOp != NULL) { 00374 if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) { 00375 break; 00376 } 00377 tLastBlopOp = tBlobOp; 00378 tBlobOp = tBlobOp->m_next; 00379 } 00380 00381 DBUG_PRINT("info", ("%s blob event op for %s", 00382 tBlobOp ? " reuse" : " create", bename)); 00383 00384 // create blob event op if not found 00385 if (tBlobOp == NULL) { 00386 // get blob event 00387 NdbDictionaryImpl& dict = 00388 NdbDictionaryImpl::getImpl(*m_ndb->getDictionary()); 00389 NdbEventImpl* blobEvnt = 00390 dict.getBlobEvent(*this->m_eventImpl, tAttrInfo->m_column_no); 00391 if (blobEvnt == NULL) { 00392 m_error.code = dict.m_error.code; 00393 DBUG_RETURN(NULL); 00394 } 00395 00396 // create blob event operation 00397 tBlobOp = 00398 m_ndb->theEventBuffer->createEventOperation(*blobEvnt, m_error); 00399 if (tBlobOp == NULL) 00400 DBUG_RETURN(NULL); 00401 00402 // pointer to main table op 00403 tBlobOp->theMainOp = this; 00404 tBlobOp->m_mergeEvents = m_mergeEvents; 00405 00406 // to hide blob op it is linked under main op, not under m_ndb 00407 if (tLastBlopOp == NULL) 00408 theBlobOpList = tBlobOp; 00409 else 00410 tLastBlopOp->m_next = tBlobOp; 00411 tBlobOp->m_next = NULL; 00412 } 00413 } 00414 00415 tBlob = m_ndb->getNdbBlob(); 00416 if (tBlob == NULL) { 00417 m_error.code = m_ndb->getNdbError().code; 00418 DBUG_RETURN(NULL); 00419 } 00420 00421 // calls getValue on inline and blob part 00422 if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) { 00423 m_error.code = tBlob->getNdbError().code; 00424 m_ndb->releaseNdbBlob(tBlob); 00425 DBUG_RETURN(NULL); 00426 } 00427 00428 // add to list end 00429 if (tLastBlob == NULL) 00430 theBlobList = tBlob; 00431 else 00432 tLastBlob->theNext = tBlob; 00433 tBlob->theNext = NULL; 00434 DBUG_RETURN(tBlob); 00435 } 00436 00437 int 00438 NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob, 00439 Uint32 part, Uint32 count) 00440 { 00441 DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts"); 00442 DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d", 00443 part, count, blob->theEventBlobVersion)); 00444 00445 NdbEventOperationImpl* blob_op = blob->theBlobEventOp; 00446 00447 EventBufData* main_data = m_data_item; 00448 DBUG_PRINT_EVENT("info", ("main_data=%p", main_data)); 00449 assert(main_data != NULL); 00450 00451 // search for blob parts list head 00452 EventBufData* head; 00453 assert(m_data_item != NULL); 00454 head = m_data_item->m_next_blob; 00455 while (head != NULL) 00456 { 00457 if (head->m_event_op == blob_op) 00458 { 00459 DBUG_PRINT_EVENT("info", ("found blob parts head %p", head)); 00460 break; 00461 } 00462 head = head->m_next_blob; 00463 } 00464 00465 Uint32 nparts = 0; 00466 EventBufData* data = head; 00467 // XXX optimize using part no ordering 00468 while (data != NULL) 00469 { 00470 /* 00471 * Hack part no directly out of buffer since it is not returned 00472 * in pre data (PK buglet). For part data use receive_event(). 00473 * This means extra copy. 00474 */ 00475 blob_op->m_data_item = data; 00476 int r = blob_op->receive_event(); 00477 assert(r > 0); 00478 Uint32 no = data->get_blob_part_no(); 00479 Uint32 sz = blob->thePartSize; 00480 const char* src = blob->theBlobEventDataBuf.data; 00481 00482 DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part sz=%u", data, no, sz)); 00483 00484 if (part <= no && no < part + count) 00485 { 00486 DBUG_PRINT_EVENT("info", ("part within read range")); 00487 memcpy(buf + (no - part) * sz, src, sz); 00488 nparts++; 00489 } 00490 else 00491 { 00492 DBUG_PRINT_EVENT("info", ("part outside read range")); 00493 } 00494 data = data->m_next; 00495 } 00496 assert(nparts == count); 00497 00498 DBUG_RETURN_EVENT(0); 00499 } 00500 00501 int 00502 NdbEventOperationImpl::execute() 00503 { 00504 DBUG_ENTER("NdbEventOperationImpl::execute"); 00505 m_ndb->theEventBuffer->add_drop_lock(); 00506 int r = execute_nolock(); 00507 m_ndb->theEventBuffer->add_drop_unlock(); 00508 DBUG_RETURN(r); 00509 } 00510 00511 int 00512 NdbEventOperationImpl::execute_nolock() 00513 { 00514 DBUG_ENTER("NdbEventOperationImpl::execute_nolock"); 00515 DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob")); 00516 00517 NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); 00518 if (!myDict) { 00519 m_error.code= m_ndb->getNdbError().code; 00520 DBUG_RETURN(-1); 00521 } 00522 00523 if (theFirstPkAttrs[0] == NULL && 00524 theFirstDataAttrs[0] == NULL) { // defaults to get all 00525 } 00526 00527 m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER; 00528 m_state= EO_EXECUTING; 00529 mi_type= m_eventImpl->mi_type; 00530 m_ndb->theEventBuffer->add_op(); 00531 int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this); 00532 if (r == 0) { 00533 if (theMainOp == NULL) { 00534 DBUG_PRINT("info", ("execute blob ops")); 00535 NdbEventOperationImpl* blob_op = theBlobOpList; 00536 while (blob_op != NULL) { 00537 r = blob_op->execute_nolock(); 00538 if (r != 0) 00539 break; 00540 blob_op = blob_op->m_next; 00541 } 00542 } 00543 if (r == 0) 00544 { 00545 m_ref_count++; 00546 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this)); 00547 DBUG_RETURN(0); 00548 } 00549 } 00550 //Error 00551 m_state= EO_ERROR; 00552 mi_type= 0; 00553 m_magic_number= 0; 00554 m_error.code= myDict->getNdbError().code; 00555 m_ndb->theEventBuffer->remove_op(); 00556 DBUG_RETURN(r); 00557 } 00558 00559 int 00560 NdbEventOperationImpl::stop() 00561 { 00562 DBUG_ENTER("NdbEventOperationImpl::stop"); 00563 int i; 00564 00565 for (i=0 ; i<2; i++) { 00566 NdbRecAttr *p = theFirstPkAttrs[i]; 00567 while (p) { 00568 NdbRecAttr *p_next = p->next(); 00569 m_ndb->releaseRecAttr(p); 00570 p = p_next; 00571 } 00572 theFirstPkAttrs[i]= 0; 00573 } 00574 for (i=0 ; i<2; i++) { 00575 NdbRecAttr *p = theFirstDataAttrs[i]; 00576 while (p) { 00577 NdbRecAttr *p_next = p->next(); 00578 m_ndb->releaseRecAttr(p); 00579 p = p_next; 00580 } 00581 theFirstDataAttrs[i]= 0; 00582 } 00583 00584 if (m_state != EO_EXECUTING) 00585 { 00586 DBUG_RETURN(-1); 00587 } 00588 00589 NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); 00590 if (!myDict) { 00591 m_error.code= m_ndb->getNdbError().code; 00592 DBUG_RETURN(-1); 00593 } 00594 00595 m_ndb->theEventBuffer->add_drop_lock(); 00596 int r= NdbDictionaryImpl::getImpl(*myDict).stopSubscribeEvent(*this); 00597 m_ndb->theEventBuffer->remove_op(); 00598 m_state= EO_DROPPED; 00599 mi_type= 0; 00600 if (r == 0) { 00601 m_ndb->theEventBuffer->add_drop_unlock(); 00602 DBUG_RETURN(0); 00603 } 00604 //Error 00605 m_error.code= NdbDictionaryImpl::getImpl(*myDict).m_error.code; 00606 m_state= EO_ERROR; 00607 m_ndb->theEventBuffer->add_drop_unlock(); 00608 DBUG_RETURN(r); 00609 } 00610 00611 const bool NdbEventOperationImpl::tableNameChanged() const 00612 { 00613 return (bool)AlterTableReq::getNameFlag(m_change_mask); 00614 } 00615 00616 const bool NdbEventOperationImpl::tableFrmChanged() const 00617 { 00618 return (bool)AlterTableReq::getFrmFlag(m_change_mask); 00619 } 00620 00621 const bool NdbEventOperationImpl::tableFragmentationChanged() const 00622 { 00623 return (bool)AlterTableReq::getFragDataFlag(m_change_mask); 00624 } 00625 00626 const bool NdbEventOperationImpl::tableRangeListChanged() const 00627 { 00628 return (bool)AlterTableReq::getRangeListFlag(m_change_mask); 00629 } 00630 00631 Uint64 00632 NdbEventOperationImpl::getGCI() 00633 { 00634 return m_data_item->sdata->gci; 00635 } 00636 00637 Uint64 00638 NdbEventOperationImpl::getLatestGCI() 00639 { 00640 return m_ndb->theEventBuffer->getLatestGCI(); 00641 } 00642 00643 bool 00644 NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal, 00645 LinearSectionPtr ptr[3]) 00646 { 00647 DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA"); 00648 const SubTableData * const sdata= 00649 CAST_CONSTPTR(SubTableData, signal->getDataPtr()); 00650 00651 if(signal->isFirstFragment()){ 00652 m_fragmentId = signal->getFragmentId(); 00653 m_buffer.grow(4 * sdata->totalLen); 00654 } else { 00655 if(m_fragmentId != signal->getFragmentId()){ 00656 abort(); 00657 } 00658 } 00659 const Uint32 i = SubTableData::DICT_TAB_INFO; 00660 DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u", 00661 4 * ptr[i].sz, m_fragmentId)); 00662 m_buffer.append(ptr[i].p, 4 * ptr[i].sz); 00663 00664 if(!signal->isLastFragment()){ 00665 DBUG_RETURN(FALSE); 00666 } 00667 00668 DBUG_RETURN(TRUE); 00669 } 00670 00671 00672 int 00673 NdbEventOperationImpl::receive_event() 00674 { 00675 Uint32 operation= (Uint32)m_data_item->sdata->operation; 00676 if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT)) 00677 { 00678 DBUG_ENTER("NdbEventOperationImpl::receive_event"); 00679 DBUG_PRINT("info",("sdata->operation %u this: %p", operation, this)); 00680 if (operation == NdbDictionary::Event::_TE_ALTER) 00681 { 00682 // Parse the new table definition and 00683 // create a table object 00684 NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); 00685 NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict); 00686 NdbError error; 00687 NdbDictInterface dif(error); 00688 NdbTableImpl *at; 00689 m_change_mask = m_data_item->sdata->changeMask; 00690 error.code = dif.parseTableInfo(&at, 00691 (Uint32*)m_buffer.get_data(), 00692 m_buffer.length() / 4, 00693 true); 00694 m_buffer.clear(); 00695 if (unlikely(!at)) 00696 { 00697 DBUG_PRINT("info", ("Failed to parse DictTabInfo error %u", 00698 error.code)); 00699 ndbout_c("Failed to parse DictTabInfo error %u", error.code); 00700 DBUG_RETURN(1); 00701 } 00702 at->buildColumnHash(); 00703 00704 NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl; 00705 m_eventImpl->m_tableImpl = at; 00706 00707 DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x", 00708 tmp_table_impl, at)); 00709 00710 // change the rec attrs to refer to the new table object 00711 int i; 00712 for (i = 0; i < 2; i++) 00713 { 00714 NdbRecAttr *p = theFirstPkAttrs[i]; 00715 while (p) 00716 { 00717 int no = p->getColumn()->getColumnNo(); 00718 NdbColumnImpl *tAttrInfo = at->getColumn(no); 00719 DBUG_PRINT("info", ("rec_attr: 0x%x " 00720 "switching column impl 0x%x -> 0x%x", 00721 p, p->m_column, tAttrInfo)); 00722 p->m_column = tAttrInfo; 00723 p = p->next(); 00724 } 00725 } 00726 for (i = 0; i < 2; i++) 00727 { 00728 NdbRecAttr *p = theFirstDataAttrs[i]; 00729 while (p) 00730 { 00731 int no = p->getColumn()->getColumnNo(); 00732 NdbColumnImpl *tAttrInfo = at->getColumn(no); 00733 DBUG_PRINT("info", ("rec_attr: 0x%x " 00734 "switching column impl 0x%x -> 0x%x", 00735 p, p->m_column, tAttrInfo)); 00736 p->m_column = tAttrInfo; 00737 p = p->next(); 00738 } 00739 } 00740 if (tmp_table_impl) 00741 delete tmp_table_impl; 00742 } 00743 DBUG_RETURN(1); 00744 } 00745 00746 DBUG_ENTER_EVENT("NdbEventOperationImpl::receive_event"); 00747 DBUG_PRINT_EVENT("info",("sdata->operation %u this: %p", operation, this)); 00748 // now move the data into the RecAttrs 00749 00750 int is_update= operation == NdbDictionary::Event::_TE_UPDATE; 00751 00752 Uint32 *aAttrPtr = m_data_item->ptr[0].p; 00753 Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz; 00754 Uint32 *aDataPtr = m_data_item->ptr[1].p; 00755 00756 DBUG_DUMP_EVENT("after",(char*)m_data_item->ptr[1].p, m_data_item->ptr[1].sz*4); 00757 DBUG_DUMP_EVENT("before",(char*)m_data_item->ptr[2].p, m_data_item->ptr[2].sz*4); 00758 00759 // copy data into the RecAttr's 00760 // we assume that the respective attribute lists are sorted 00761 00762 // first the pk's 00763 { 00764 NdbRecAttr *tAttr= theFirstPkAttrs[0]; 00765 NdbRecAttr *tAttr1= theFirstPkAttrs[1]; 00766 while(tAttr) 00767 { 00768 assert(aAttrPtr < aAttrEndPtr); 00769 unsigned tDataSz= AttributeHeader(*aAttrPtr).getByteSize(); 00770 assert(tAttr->attrId() == 00771 AttributeHeader(*aAttrPtr).getAttributeId()); 00772 receive_data(tAttr, aDataPtr, tDataSz); 00773 if (is_update) 00774 receive_data(tAttr1, aDataPtr, tDataSz); 00775 else 00776 tAttr1->setUNDEFINED(); // do not leave unspecified 00777 tAttr1= tAttr1->next(); 00778 // next 00779 aAttrPtr++; 00780 aDataPtr+= (tDataSz + 3) >> 2; 00781 tAttr= tAttr->next(); 00782 } 00783 } 00784 00785 NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0]; 00786 00787 Uint32 tRecAttrId; 00788 Uint32 tAttrId; 00789 Uint32 tDataSz; 00790 int hasSomeData=0; 00791 while ((aAttrPtr < aAttrEndPtr) && (tWorkingRecAttr != NULL)) { 00792 tRecAttrId = tWorkingRecAttr->attrId(); 00793 tAttrId = AttributeHeader(*aAttrPtr).getAttributeId(); 00794 tDataSz = AttributeHeader(*aAttrPtr).getByteSize(); 00795 00796 while (tAttrId > tRecAttrId) { 00797 DBUG_PRINT_EVENT("info",("undef [%u] %u 0x%x [%u] 0x%x", 00798 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr)); 00799 tWorkingRecAttr->setUNDEFINED(); 00800 tWorkingRecAttr = tWorkingRecAttr->next(); 00801 if (tWorkingRecAttr == NULL) 00802 break; 00803 tRecAttrId = tWorkingRecAttr->attrId(); 00804 } 00805 if (tWorkingRecAttr == NULL) 00806 break; 00807 00808 if (tAttrId == tRecAttrId) { 00809 hasSomeData++; 00810 00811 DBUG_PRINT_EVENT("info",("set [%u] %u 0x%x [%u] 0x%x", 00812 tAttrId, tDataSz, *aDataPtr, tRecAttrId, aDataPtr)); 00813 00814 receive_data(tWorkingRecAttr, aDataPtr, tDataSz); 00815 tWorkingRecAttr = tWorkingRecAttr->next(); 00816 } 00817 aAttrPtr++; 00818 aDataPtr += (tDataSz + 3) >> 2; 00819 } 00820 00821 while (tWorkingRecAttr != NULL) { 00822 tRecAttrId = tWorkingRecAttr->attrId(); 00823 //printf("set undefined [%u] %u %u [%u]\n", 00824 // tAttrId, tDataSz, *aDataPtr, tRecAttrId); 00825 tWorkingRecAttr->setUNDEFINED(); 00826 tWorkingRecAttr = tWorkingRecAttr->next(); 00827 } 00828 00829 tWorkingRecAttr = theFirstDataAttrs[1]; 00830 aDataPtr = m_data_item->ptr[2].p; 00831 Uint32 *aDataEndPtr = aDataPtr + m_data_item->ptr[2].sz; 00832 while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) { 00833 tRecAttrId = tWorkingRecAttr->attrId(); 00834 tAttrId = AttributeHeader(*aDataPtr).getAttributeId(); 00835 tDataSz = AttributeHeader(*aDataPtr).getByteSize(); 00836 aDataPtr++; 00837 while (tAttrId > tRecAttrId) { 00838 tWorkingRecAttr->setUNDEFINED(); 00839 tWorkingRecAttr = tWorkingRecAttr->next(); 00840 if (tWorkingRecAttr == NULL) 00841 break; 00842 tRecAttrId = tWorkingRecAttr->attrId(); 00843 } 00844 if (tWorkingRecAttr == NULL) 00845 break; 00846 if (tAttrId == tRecAttrId) { 00847 assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()); 00848 hasSomeData++; 00849 00850 receive_data(tWorkingRecAttr, aDataPtr, tDataSz); 00851 tWorkingRecAttr = tWorkingRecAttr->next(); 00852 } 00853 aDataPtr += (tDataSz + 3) >> 2; 00854 } 00855 while (tWorkingRecAttr != NULL) { 00856 tWorkingRecAttr->setUNDEFINED(); 00857 tWorkingRecAttr = tWorkingRecAttr->next(); 00858 } 00859 00860 if (hasSomeData || !is_update) 00861 { 00862 DBUG_RETURN_EVENT(1); 00863 } 00864 00865 DBUG_RETURN_EVENT(0); 00866 } 00867 00868 NdbDictionary::Event::TableEvent 00869 NdbEventOperationImpl::getEventType() 00870 { 00871 return (NdbDictionary::Event::TableEvent) 00872 (1 << (unsigned)m_data_item->sdata->operation); 00873 } 00874 00875 00876 00877 void 00878 NdbEventOperationImpl::print() 00879 { 00880 int i; 00881 ndbout << "EventId " << m_eventId << "\n"; 00882 00883 for (i = 0; i < 2; i++) { 00884 NdbRecAttr *p = theFirstPkAttrs[i]; 00885 ndbout << " %u " << i; 00886 while (p) { 00887 ndbout << " : " << p->attrId() << " = " << *p; 00888 p = p->next(); 00889 } 00890 ndbout << "\n"; 00891 } 00892 for (i = 0; i < 2; i++) { 00893 NdbRecAttr *p = theFirstDataAttrs[i]; 00894 ndbout << " %u " << i; 00895 while (p) { 00896 ndbout << " : " << p->attrId() << " = " << *p; 00897 p = p->next(); 00898 } 00899 ndbout << "\n"; 00900 } 00901 } 00902 00903 void 00904 NdbEventOperationImpl::printAll() 00905 { 00906 Uint32 *aAttrPtr = m_data_item->ptr[0].p; 00907 Uint32 *aAttrEndPtr = aAttrPtr + m_data_item->ptr[0].sz; 00908 Uint32 *aDataPtr = m_data_item->ptr[1].p; 00909 00910 //tRecAttr->setup(tAttrInfo, aValue)) { 00911 00912 Uint32 tAttrId; 00913 Uint32 tDataSz; 00914 for (; aAttrPtr < aAttrEndPtr; ) { 00915 tAttrId = AttributeHeader(*aAttrPtr).getAttributeId(); 00916 tDataSz = AttributeHeader(*aAttrPtr).getDataSize(); 00917 00918 aAttrPtr++; 00919 aDataPtr += tDataSz; 00920 } 00921 } 00922 00923 /* 00924 * Class NdbEventBuffer 00925 * Each Ndb object has a Object. 00926 */ 00927 00928 // ToDo ref count this so it get's destroyed 00929 NdbMutex *NdbEventBuffer::p_add_drop_mutex= 0; 00930 00931 NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : 00932 m_system_nodes(ndb->theImpl->theNoOfDBnodes), 00933 m_ndb(ndb), 00934 m_latestGCI(0), 00935 m_total_alloc(0), 00936 m_free_thresh(10), 00937 m_min_free_thresh(10), 00938 m_max_free_thresh(100), 00939 m_gci_slip_thresh(3), 00940 m_dropped_ev_op(0), 00941 m_active_op_count(0) 00942 { 00943 #ifdef VM_TRACE 00944 m_latest_command= "NdbEventBuffer::NdbEventBuffer"; 00945 m_flush_gci = 0; 00946 #endif 00947 00948 if ((p_cond = NdbCondition_Create()) == NULL) { 00949 ndbout_c("NdbEventHandle: NdbCondition_Create() failed"); 00950 exit(-1); 00951 } 00952 m_mutex= ndb->theImpl->theWaiter.m_mutex; 00953 lock(); 00954 if (p_add_drop_mutex == 0) 00955 { 00956 if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) { 00957 ndbout_c("NdbEventBuffer: NdbMutex_Create() failed"); 00958 exit(-1); 00959 } 00960 } 00961 unlock(); 00962 00963 // ToDo set event buffer size 00964 // pre allocate event data array 00965 m_sz= 0; 00966 #ifdef VM_TRACE 00967 m_free_data_count= 0; 00968 #endif 00969 m_free_data= 0; 00970 m_free_data_sz= 0; 00971 00972 // initialize lists 00973 bzero(&g_empty_gci_container, sizeof(Gci_container)); 00974 init_gci_containers(); 00975 } 00976 00977 NdbEventBuffer::~NdbEventBuffer() 00978 { 00979 // todo lock? what if receive thread writes here? 00980 NdbEventOperationImpl* op= m_dropped_ev_op; 00981 while ((op = m_dropped_ev_op)) 00982 { 00983 m_dropped_ev_op = m_dropped_ev_op->m_next; 00984 delete op->m_facade; 00985 } 00986 00987 unsigned j; 00988 Uint32 sz= m_active_gci.size(); 00989 Gci_container* array = (Gci_container*)m_active_gci.getBase(); 00990 for(j = 0; j < sz; j++) 00991 { 00992 array[j].~Gci_container(); 00993 } 00994 00995 for (j= 0; j < m_allocated_data.size(); j++) 00996 { 00997 unsigned sz= m_allocated_data[j]->sz; 00998 EventBufData *data= m_allocated_data[j]->data; 00999 EventBufData *end_data= data+sz; 01000 for (; data < end_data; data++) 01001 { 01002 if (data->sdata) 01003 NdbMem_Free(data->sdata); 01004 } 01005 NdbMem_Free((char*)m_allocated_data[j]); 01006 } 01007 01008 NdbCondition_Destroy(p_cond); 01009 01010 lock(); 01011 if (p_add_drop_mutex) 01012 { 01013 NdbMutex_Destroy(p_add_drop_mutex); 01014 p_add_drop_mutex = 0; 01015 } 01016 unlock(); 01017 } 01018 01019 void 01020 NdbEventBuffer::add_op() 01021 { 01022 if(m_active_op_count == 0) 01023 { 01024 init_gci_containers(); 01025 } 01026 m_active_op_count++; 01027 } 01028 01029 void 01030 NdbEventBuffer::remove_op() 01031 { 01032 m_active_op_count--; 01033 } 01034 01035 void 01036 NdbEventBuffer::init_gci_containers() 01037 { 01038 bzero(&m_complete_data, sizeof(m_complete_data)); 01039 m_latest_complete_GCI = m_latestGCI = 0; 01040 m_active_gci.clear(); 01041 m_active_gci.fill(2 * ACTIVE_GCI_DIRECTORY_SIZE - 1, g_empty_gci_container); 01042 } 01043 01044 int NdbEventBuffer::expand(unsigned sz) 01045 { 01046 unsigned alloc_size= 01047 sizeof(EventBufData_chunk) +(sz-1)*sizeof(EventBufData); 01048 EventBufData_chunk *chunk_data= 01049 (EventBufData_chunk *)NdbMem_Allocate(alloc_size); 01050 01051 chunk_data->sz= sz; 01052 m_allocated_data.push_back(chunk_data); 01053 01054 EventBufData *data= chunk_data->data; 01055 EventBufData *end_data= data+sz; 01056 EventBufData *last_data= m_free_data; 01057 01058 bzero((void*)data, sz*sizeof(EventBufData)); 01059 for (; data < end_data; data++) 01060 { 01061 data->m_next= last_data; 01062 last_data= data; 01063 } 01064 m_free_data= last_data; 01065 01066 m_sz+= sz; 01067 #ifdef VM_TRACE 01068 m_free_data_count+= sz; 01069 #endif 01070 return 0; 01071 } 01072 01073 int 01074 NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI) 01075 { 01076 int ret= 1; 01077 #ifdef VM_TRACE 01078 const char *m_latest_command_save= m_latest_command; 01079 m_latest_command= "NdbEventBuffer::pollEvents"; 01080 #endif 01081 01082 NdbMutex_Lock(m_mutex); 01083 NdbEventOperationImpl *ev_op= move_data(); 01084 if (unlikely(ev_op == 0 && aMillisecondNumber)) 01085 { 01086 NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber); 01087 ev_op= move_data(); 01088 if (unlikely(ev_op == 0)) 01089 ret= 0; 01090 } 01091 if (latestGCI) 01092 *latestGCI= m_latestGCI; 01093 #ifdef VM_TRACE 01094 if (ev_op) 01095 { 01096 // m_mutex is locked 01097 // update event ops data counters 01098 ev_op->m_data_count-= ev_op->m_data_done_count; 01099 ev_op->m_data_done_count= 0; 01100 } 01101 m_latest_command= m_latest_command_save; 01102 #endif 01103 NdbMutex_Unlock(m_mutex); // we have moved the data 01104 return ret; 01105 } 01106 01107 int 01108 NdbEventBuffer::flushIncompleteEvents(Uint64 gci) 01109 { 01113 // called by user thread, so we need to lock the data 01114 lock(); 01115 Uint32 i; 01116 Uint32 sz= m_active_gci.size(); 01117 Gci_container* array = (Gci_container*)m_active_gci.getBase(); 01118 for(i = 0; i < sz; i++) 01119 { 01120 Gci_container* tmp = array + i; 01121 if (tmp->m_gci && tmp->m_gci < gci) 01122 { 01123 // we have found an old not-completed gci, remove it 01124 ndbout_c("ndb: flushing incomplete epoch %lld (<%lld)", tmp->m_gci, gci); 01125 if(!tmp->m_data.is_empty()) 01126 { 01127 free_list(tmp->m_data); 01128 } 01129 tmp->~Gci_container(); 01130 bzero(tmp, sizeof(Gci_container)); 01131 } 01132 } 01133 #ifdef VM_TRACE 01134 m_flush_gci = gci; 01135 #endif 01136 unlock(); 01137 return 0; 01138 } 01139 01140 NdbEventOperation * 01141 NdbEventBuffer::nextEvent() 01142 { 01143 DBUG_ENTER_EVENT("NdbEventBuffer::nextEvent"); 01144 #ifdef VM_TRACE 01145 const char *m_latest_command_save= m_latest_command; 01146 #endif 01147 01148 if (m_used_data.m_count > 1024) 01149 { 01150 #ifdef VM_TRACE 01151 m_latest_command= "NdbEventBuffer::nextEvent (lock)"; 01152 #endif 01153 NdbMutex_Lock(m_mutex); 01154 // return m_used_data to m_free_data 01155 free_list(m_used_data); 01156 01157 NdbMutex_Unlock(m_mutex); 01158 } 01159 #ifdef VM_TRACE 01160 m_latest_command= "NdbEventBuffer::nextEvent"; 01161 #endif 01162 01163 EventBufData *data; 01164 while ((data= m_available_data.m_head)) 01165 { 01166 NdbEventOperationImpl *op= data->m_event_op; 01167 DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op)); 01168 01169 /* 01170 * If merge is on, blob part sub-events must not be seen on this level. 01171 * If merge is not on, there are no blob part sub-events. 01172 */ 01173 assert(op->theMainOp == NULL); 01174 01175 // set NdbEventOperation data 01176 op->m_data_item= data; 01177 01178 // remove item from m_available_data 01179 m_available_data.remove_first(); 01180 01181 // add it to used list 01182 m_used_data.append_used_data(data); 01183 01184 #ifdef VM_TRACE 01185 op->m_data_done_count++; 01186 #endif 01187 01188 int r= op->receive_event(); 01189 if (r > 0) 01190 { 01191 if (op->m_state == NdbEventOperation::EO_EXECUTING) 01192 { 01193 #ifdef VM_TRACE 01194 m_latest_command= m_latest_command_save; 01195 #endif 01196 NdbBlob* tBlob = op->theBlobList; 01197 while (tBlob != NULL) 01198 { 01199 (void)tBlob->atNextEvent(); 01200 tBlob = tBlob->theNext; 01201 } 01202 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); 01203 while (gci_ops && op->getGCI() > gci_ops->m_gci) 01204 { 01205 deleteUsedEventOperations(); 01206 gci_ops = m_available_data.next_gci_ops(); 01207 } 01208 assert(gci_ops && (op->getGCI() == gci_ops->m_gci)); 01209 // to return TE_NUL it should be made into data event 01210 if (data->sdata->operation == NdbDictionary::Event::_TE_NUL) 01211 { 01212 DBUG_PRINT_EVENT("info", ("skip _TE_NUL")); 01213 continue; 01214 } 01215 DBUG_RETURN_EVENT(op->m_facade); 01216 } 01217 // the next event belonged to an event op that is no 01218 // longer valid, skip to next 01219 continue; 01220 } 01221 #ifdef VM_TRACE 01222 m_latest_command= m_latest_command_save; 01223 #endif 01224 } 01225 m_error.code= 0; 01226 #ifdef VM_TRACE 01227 m_latest_command= m_latest_command_save; 01228 #endif 01229 01230 // free all "per gci unique" collected operations 01231 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); 01232 while (gci_ops) 01233 { 01234 deleteUsedEventOperations(); 01235 gci_ops = m_available_data.next_gci_ops(); 01236 } 01237 DBUG_RETURN_EVENT(0); 01238 } 01239 01240 NdbEventOperationImpl* 01241 NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types) 01242 { 01243 DBUG_ENTER("NdbEventBuffer::getGCIEventOperations"); 01244 EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); 01245 if (*iter < gci_ops->m_gci_op_count) 01246 { 01247 EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++]; 01248 if (event_types != NULL) 01249 *event_types = g.event_types; 01250 DBUG_PRINT("info", ("gci: %d g.op: %x g.event_types: %x", 01251 (unsigned)gci_ops->m_gci, g.op, g.event_types)); 01252 DBUG_RETURN(g.op); 01253 } 01254 DBUG_RETURN(NULL); 01255 } 01256 01257 void 01258 NdbEventBuffer::deleteUsedEventOperations() 01259 { 01260 Uint32 iter= 0; 01261 const NdbEventOperation *op_f; 01262 while ((op_f= getGCIEventOperations(&iter, NULL)) != NULL) 01263 { 01264 NdbEventOperationImpl *op = &op_f->m_impl; 01265 DBUG_ASSERT(op->m_ref_count > 0); 01266 op->m_ref_count--; 01267 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); 01268 if (op->m_ref_count == 0) 01269 { 01270 DBUG_PRINT("info", ("deleting op: %p", op)); 01271 DBUG_ASSERT(op->m_node_bit_mask.isclear()); 01272 if (op->m_next) 01273 op->m_next->m_prev = op->m_prev; 01274 if (op->m_prev) 01275 op->m_prev->m_next = op->m_next; 01276 else 01277 m_dropped_ev_op = op->m_next; 01278 delete op->m_facade; 01279 } 01280 } 01281 } 01282 01283 static 01284 NdbOut& 01285 operator<<(NdbOut& out, const Gci_container& gci) 01286 { 01287 out << "[ GCI: " << gci.m_gci 01288 << " state: " << hex << gci.m_state 01289 << " head: " << hex << gci.m_data.m_head 01290 << " tail: " << hex << gci.m_data.m_tail 01291 #ifdef VM_TRACE 01292 << " cnt: " << dec << gci.m_data.m_count 01293 #endif 01294 << " gcp: " << dec << gci.m_gcp_complete_rep_count 01295 << "]"; 01296 return out; 01297 } 01298 01299 static 01300 NdbOut& 01301 operator<<(NdbOut& out, const Gci_container_pod& gci) 01302 { 01303 Gci_container* ptr = (Gci_container*)&gci; 01304 out << *ptr; 01305 return out; 01306 } 01307 01308 01309 static 01310 Gci_container* 01311 find_bucket_chained(Vector<Gci_container_pod> * active, Uint64 gci 01312 #ifdef VM_TRACE 01313 ,Uint64 flush_gci 01314 #endif 01315 ) 01316 { 01317 Uint32 pos = (gci & ACTIVE_GCI_MASK); 01318 Gci_container *bucket= ((Gci_container*)active->getBase()) + pos; 01319 01320 if(gci > bucket->m_gci) 01321 { 01322 Gci_container* move; 01323 Uint32 move_pos = pos + ACTIVE_GCI_DIRECTORY_SIZE; 01324 do 01325 { 01326 active->fill(move_pos, g_empty_gci_container); 01327 // Needs to recomputed after fill 01328 bucket = ((Gci_container*)active->getBase()) + pos; 01329 move = ((Gci_container*)active->getBase()) + move_pos; 01330 if(move->m_gcp_complete_rep_count == 0) 01331 { 01332 memcpy(move, bucket, sizeof(Gci_container)); 01333 bzero(bucket, sizeof(Gci_container)); 01334 bucket->m_gci = gci; 01335 bucket->m_gcp_complete_rep_count = ~(Uint32)0; 01336 #ifdef VM_TRACE 01337 if (gci < flush_gci) 01338 { 01339 ndbout_c("received old gci %llu < flush gci %llu", gci, flush_gci); 01340 assert(false); 01341 } 01342 #endif 01343 return bucket; 01344 } 01345 move_pos += ACTIVE_GCI_DIRECTORY_SIZE; 01346 } while(true); 01347 } 01348 else 01349 { 01350 Uint32 size = active->size() - ACTIVE_GCI_DIRECTORY_SIZE; 01351 do 01352 { 01353 pos += ACTIVE_GCI_DIRECTORY_SIZE; 01354 bucket += ACTIVE_GCI_DIRECTORY_SIZE; 01355 01356 if(bucket->m_gci == gci) 01357 { 01358 #ifdef VM_TRACE 01359 if (gci < flush_gci) 01360 { 01361 ndbout_c("received old gci %llu < flush gci %llu", gci, flush_gci); 01362 assert(false); 01363 } 01364 #endif 01365 return bucket; 01366 } 01367 01368 } while(pos < size); 01369 01370 return 0; 01371 } 01372 } 01373 01374 inline 01375 Gci_container* 01376 find_bucket(Vector<Gci_container_pod> * active, Uint64 gci 01377 #ifdef VM_TRACE 01378 ,Uint64 flush_gci 01379 #endif 01380 ) 01381 { 01382 Uint32 pos = (gci & ACTIVE_GCI_MASK); 01383 Gci_container *bucket= ((Gci_container*)active->getBase()) + pos; 01384 if(likely(gci == bucket->m_gci)) 01385 return bucket; 01386 01387 return find_bucket_chained(active,gci 01388 #ifdef VM_TRACE 01389 , flush_gci 01390 #endif 01391 ); 01392 } 01393 01394 static 01395 void 01396 crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket, 01397 const SubGcpCompleteRep * const rep, 01398 Uint32 nodes) 01399 { 01400 Uint32 old_cnt = bucket->m_gcp_complete_rep_count; 01401 01402 ndbout_c("INVALID SUB_GCP_COMPLETE_REP"); 01403 ndbout_c("gci: %d", rep->gci); 01404 ndbout_c("sender: %x", rep->senderRef); 01405 ndbout_c("count: %d", rep->gcp_complete_rep_count); 01406 ndbout_c("bucket count: %u", old_cnt); 01407 ndbout_c("nodes: %u", nodes); 01408 abort(); 01409 } 01410 01411 void 01412 NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep) 01413 { 01414 if (unlikely(m_active_op_count == 0)) 01415 { 01416 return; 01417 } 01418 01419 DBUG_ENTER_EVENT("NdbEventBuffer::execSUB_GCP_COMPLETE_REP"); 01420 01421 const Uint64 gci= rep->gci; 01422 const Uint32 cnt= rep->gcp_complete_rep_count; 01423 01424 Gci_container *bucket = find_bucket(&m_active_gci, gci 01425 #ifdef VM_TRACE 01426 , m_flush_gci 01427 #endif 01428 ); 01429 01430 if (unlikely(bucket == 0)) 01431 { 01436 #ifdef VM_TRACE 01437 ndbout << "bucket == 0, gci:" << gci 01438 << " complete: " << m_complete_data << endl; 01439 for(Uint32 i = 0; i<m_active_gci.size(); i++) 01440 { 01441 ndbout << i << " - " << m_active_gci[i] << endl; 01442 } 01443 #endif 01444 DBUG_VOID_RETURN_EVENT; 01445 } 01446 01447 Uint32 old_cnt = bucket->m_gcp_complete_rep_count; 01448 if(unlikely(old_cnt == ~(Uint32)0)) 01449 { 01450 old_cnt = m_system_nodes; 01451 } 01452 01453 //assert(old_cnt >= cnt); 01454 if (unlikely(! (old_cnt >= cnt))) 01455 { 01456 crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, m_system_nodes); 01457 } 01458 bucket->m_gcp_complete_rep_count = old_cnt - cnt; 01459 01460 if(old_cnt == cnt) 01461 { 01462 if(likely(gci == m_latestGCI + 1 || m_latestGCI == 0)) 01463 { 01464 m_latestGCI = m_complete_data.m_gci = gci; // before reportStatus 01465 if(!bucket->m_data.is_empty()) 01466 { 01467 #ifdef VM_TRACE 01468 assert(bucket->m_data.m_count); 01469 #endif 01470 m_complete_data.m_data.append_list(&bucket->m_data, gci); 01471 } 01472 reportStatus(); 01473 bzero(bucket, sizeof(Gci_container)); 01474 bucket->m_gci = gci + ACTIVE_GCI_DIRECTORY_SIZE; 01475 bucket->m_gcp_complete_rep_count = m_system_nodes; 01476 if(unlikely(m_latest_complete_GCI > gci)) 01477 { 01478 complete_outof_order_gcis(); 01479 } 01480 01481 // signal that somethings happened 01482 01483 NdbCondition_Signal(p_cond); 01484 } 01485 else 01486 { 01488 ndbout_c("out of order bucket: %d gci: %lld m_latestGCI: %lld", 01489 bucket-(Gci_container*)m_active_gci.getBase(), 01490 gci, m_latestGCI); 01491 bucket->m_state = Gci_container::GC_COMPLETE; 01492 bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused 01493 m_latest_complete_GCI = gci; 01494 } 01495 } 01496 01497 DBUG_VOID_RETURN_EVENT; 01498 } 01499 01500 void 01501 NdbEventBuffer::complete_outof_order_gcis() 01502 { 01503 Uint64 start_gci = m_latestGCI + 1; 01504 Uint64 stop_gci = m_latest_complete_GCI; 01505 01506 const Uint32 size = m_active_gci.size(); 01507 Gci_container* array= (Gci_container*)m_active_gci.getBase(); 01508 01509 ndbout_c("complete_outof_order_gcis"); 01510 for(Uint32 i = 0; i<size; i++) 01511 { 01512 ndbout << i << " - " << array[i] << endl; 01513 } 01514 01515 for(; start_gci <= stop_gci; start_gci++) 01516 { 01520 Uint32 i; 01521 Gci_container* bucket= 0; 01522 for(i = 0; i<size; i++) 01523 { 01524 Gci_container* tmp = array + i; 01525 if(tmp->m_gci == start_gci && tmp->m_state == Gci_container::GC_COMPLETE) 01526 { 01527 bucket= tmp; 01528 break; 01529 } 01530 } 01531 if(bucket == 0) 01532 { 01533 break; 01534 } 01535 01536 printf("complete_outof_order_gcis - completing %lld", start_gci); 01537 if(!bucket->m_data.is_empty()) 01538 { 01539 #ifdef VM_TRACE 01540 assert(bucket->m_data.m_count); 01541 #endif 01542 m_complete_data.m_data.append_list(&bucket->m_data, start_gci); 01543 #ifdef VM_TRACE 01544 ndbout_c(" moved %lld rows -> %lld", bucket->m_data.m_count, 01545 m_complete_data.m_data.m_count); 01546 #else 01547 ndbout_c(""); 01548 #endif 01549 } 01550 bzero(bucket, sizeof(Gci_container)); 01551 if(i < ACTIVE_GCI_DIRECTORY_SIZE) 01552 { 01553 bucket->m_gci = start_gci + ACTIVE_GCI_DIRECTORY_SIZE; 01554 bucket->m_gcp_complete_rep_count = m_system_nodes; 01555 } 01556 01557 m_latestGCI = m_complete_data.m_gci = start_gci; 01558 } 01559 01560 ndbout_c("complete_outof_order_gcis: m_latestGCI: %lld", m_latestGCI); 01561 } 01562 01563 void 01564 NdbEventBuffer::report_node_connected(Uint32 node_id) 01565 { 01566 NdbEventOperation* op= m_ndb->getEventOperation(0); 01567 if (op == 0) 01568 return; 01569 01570 DBUG_ENTER("NdbEventBuffer::report_node_connected"); 01571 SubTableData data; 01572 LinearSectionPtr ptr[3]; 01573 bzero(&data, sizeof(data)); 01574 bzero(ptr, sizeof(ptr)); 01575 01576 data.tableId = ~0; 01577 data.operation = NdbDictionary::Event::_TE_ACTIVE; 01578 data.req_nodeid = (Uint8)node_id; 01579 data.ndbd_nodeid = (Uint8)node_id; 01580 data.logType = SubTableData::LOG; 01581 data.gci = m_latestGCI + 1; 01585 { 01586 // no need to lock()/unlock(), receive thread calls this 01587 NdbEventOperationImpl* impl = &op->m_impl; 01588 do if (!impl->m_node_bit_mask.isclear()) 01589 { 01590 data.senderData = impl->m_oid; 01591 insertDataL(impl, &data, ptr); 01592 } while((impl = impl->m_next)); 01593 for (impl = m_dropped_ev_op; impl; impl = impl->m_next) 01594 if (!impl->m_node_bit_mask.isclear()) 01595 { 01596 data.senderData = impl->m_oid; 01597 insertDataL(impl, &data, ptr); 01598 } 01599 } 01600 DBUG_VOID_RETURN; 01601 } 01602 01603 void 01604 NdbEventBuffer::report_node_failure(Uint32 node_id) 01605 { 01606 NdbEventOperation* op= m_ndb->getEventOperation(0); 01607 if (op == 0) 01608 return; 01609 01610 DBUG_ENTER("NdbEventBuffer::report_node_failure"); 01611 SubTableData data; 01612 LinearSectionPtr ptr[3]; 01613 bzero(&data, sizeof(data)); 01614 bzero(ptr, sizeof(ptr)); 01615 01616 data.tableId = ~0; 01617 data.operation = NdbDictionary::Event::_TE_NODE_FAILURE; 01618 data.req_nodeid = (Uint8)node_id; 01619 data.ndbd_nodeid = (Uint8)node_id; 01620 data.logType = SubTableData::LOG; 01621 data.gci = m_latestGCI + 1; 01625 { 01626 // no need to lock()/unlock(), receive thread calls this 01627 NdbEventOperationImpl* impl = &op->m_impl; 01628 do if (!impl->m_node_bit_mask.isclear()) 01629 { 01630 data.senderData = impl->m_oid; 01631 insertDataL(impl, &data, ptr); 01632 } while((impl = impl->m_next)); 01633 for (impl = m_dropped_ev_op; impl; impl = impl->m_next) 01634 if (!impl->m_node_bit_mask.isclear()) 01635 { 01636 data.senderData = impl->m_oid; 01637 insertDataL(impl, &data, ptr); 01638 } 01639 } 01640 DBUG_VOID_RETURN; 01641 } 01642 01643 void 01644 NdbEventBuffer::completeClusterFailed() 01645 { 01646 NdbEventOperation* op= m_ndb->getEventOperation(0); 01647 if (op == 0) 01648 return; 01649 01650 DBUG_ENTER("NdbEventBuffer::completeClusterFailed"); 01651 SubTableData data; 01652 LinearSectionPtr ptr[3]; 01653 bzero(&data, sizeof(data)); 01654 bzero(ptr, sizeof(ptr)); 01655 01656 data.tableId = ~0; 01657 data.operation = NdbDictionary::Event::_TE_CLUSTER_FAILURE; 01658 data.logType = SubTableData::LOG; 01659 data.gci = m_latestGCI + 1; 01660 01661 #ifdef VM_TRACE 01662 m_flush_gci = 0; 01663 #endif 01664 01668 { 01669 // no need to lock()/unlock(), receive thread calls this 01670 NdbEventOperationImpl* impl = &op->m_impl; 01671 do if (!impl->m_node_bit_mask.isclear()) 01672 { 01673 data.senderData = impl->m_oid; 01674 insertDataL(impl, &data, ptr); 01675 } while((impl = impl->m_next)); 01676 for (impl = m_dropped_ev_op; impl; impl = impl->m_next) 01677 if (!impl->m_node_bit_mask.isclear()) 01678 { 01679 data.senderData = impl->m_oid; 01680 insertDataL(impl, &data, ptr); 01681 } 01682 } 01683 01687 Uint32 i; 01688 Uint32 sz= m_active_gci.size(); 01689 Uint64 gci= data.gci; 01690 Gci_container* bucket = 0; 01691 Gci_container* array = (Gci_container*)m_active_gci.getBase(); 01692 for(i = 0; i < sz; i++) 01693 { 01694 Gci_container* tmp = array + i; 01695 if (tmp->m_gci > gci) 01696 { 01697 if(!tmp->m_data.is_empty()) 01698 { 01699 free_list(tmp->m_data); 01700 } 01701 tmp->~Gci_container(); 01702 bzero(tmp, sizeof(Gci_container)); 01703 } 01704 else if (tmp->m_gcp_complete_rep_count) 01705 { 01706 if (tmp->m_gci == gci) 01707 { 01708 bucket= tmp; 01709 continue; 01710 } 01711 // we have found an old not-completed gci 01712 // something is wrong, assert in debug, but try so salvage 01713 // in release 01714 ndbout_c("out of order bucket detected at cluster disconnect, " 01715 "data.gci: %u. tmp->m_gci: %u", 01716 (unsigned)data.gci, (unsigned)tmp->m_gci); 01717 assert(false); 01718 if(!tmp->m_data.is_empty()) 01719 { 01720 free_list(tmp->m_data); 01721 } 01722 tmp->~Gci_container(); 01723 bzero(tmp, sizeof(Gci_container)); 01724 } 01725 } 01726 01727 if (bucket == 0) 01728 { 01729 // no bucket to complete 01730 DBUG_VOID_RETURN; 01731 } 01732 01733 const Uint32 cnt= bucket->m_gcp_complete_rep_count = 1; 01734 bucket->m_gci = gci; 01735 bucket->m_gcp_complete_rep_count = cnt; 01736 01740 SubGcpCompleteRep rep; 01741 rep.gci= gci; 01742 rep.gcp_complete_rep_count= cnt; 01743 execSUB_GCP_COMPLETE_REP(&rep); 01744 01745 DBUG_VOID_RETURN; 01746 } 01747 01748 Uint64 01749 NdbEventBuffer::getLatestGCI() 01750 { 01751 return m_latestGCI; 01752 } 01753 01754 int 01755 NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, 01756 const SubTableData * const sdata, 01757 LinearSectionPtr ptr[3]) 01758 { 01759 DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL"); 01760 Uint64 gci= sdata->gci; 01761 const bool is_data_event = 01762 sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT; 01763 01764 if (!is_data_event) 01765 { 01766 switch (sdata->operation) 01767 { 01768 case NdbDictionary::Event::_TE_NODE_FAILURE: 01769 op->m_node_bit_mask.clear(sdata->ndbd_nodeid); 01770 break; 01771 case NdbDictionary::Event::_TE_ACTIVE: 01772 op->m_node_bit_mask.set(sdata->ndbd_nodeid); 01773 // internal event, do not relay to user 01774 DBUG_RETURN_EVENT(0); 01775 break; 01776 case NdbDictionary::Event::_TE_CLUSTER_FAILURE: 01777 op->m_node_bit_mask.clear(); 01778 DBUG_ASSERT(op->m_ref_count > 0); 01779 op->m_ref_count--; 01780 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); 01781 break; 01782 case NdbDictionary::Event::_TE_STOP: 01783 op->m_node_bit_mask.clear(sdata->ndbd_nodeid); 01784 if (op->m_node_bit_mask.isclear()) 01785 { 01786 DBUG_ASSERT(op->m_ref_count > 0); 01787 op->m_ref_count--; 01788 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); 01789 } 01790 break; 01791 default: 01792 break; 01793 } 01794 } 01795 01796 if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) ) 01797 { 01798 Gci_container* bucket= find_bucket(&m_active_gci, gci 01799 #ifdef VM_TRACE 01800 , m_flush_gci 01801 #endif 01802 ); 01803 01804 DBUG_PRINT_EVENT("info", ("data insertion in eventId %d", op->m_eventId)); 01805 DBUG_PRINT_EVENT("info", ("gci=%d tab=%d op=%d node=%d", 01806 sdata->gci, sdata->tableId, sdata->operation, 01807 sdata->req_nodeid)); 01808 01809 if (unlikely(bucket == 0)) 01810 { 01815 DBUG_RETURN_EVENT(0); 01816 } 01817 01818 const bool is_blob_event = (op->theMainOp != NULL); 01819 const bool use_hash = op->m_mergeEvents && is_data_event; 01820 01821 if (! is_data_event && is_blob_event) 01822 { 01823 // currently subscribed to but not used 01824 DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table")); 01825 DBUG_RETURN_EVENT(0); 01826 } 01827 01828 // find position in bucket hash table 01829 EventBufData* data = 0; 01830 EventBufData_hash::Pos hpos; 01831 if (use_hash) 01832 { 01833 bucket->m_data_hash.search(hpos, op, ptr); 01834 data = hpos.data; 01835 } 01836 01837 if (data == 0) 01838 { 01839 // allocate new result buffer 01840 data = alloc_data(); 01841 if (unlikely(data == 0)) 01842 { 01843 op->m_has_error = 2; 01844 DBUG_RETURN_EVENT(-1); 01845 } 01846 if (unlikely(copy_data(sdata, ptr, data))) 01847 { 01848 op->m_has_error = 3; 01849 DBUG_RETURN_EVENT(-1); 01850 } 01851 data->m_event_op = op; 01852 if (! is_blob_event || ! is_data_event) 01853 { 01854 bucket->m_data.append_data(data); 01855 } 01856 else 01857 { 01858 // find or create main event for this blob event 01859 EventBufData_hash::Pos main_hpos; 01860 int ret = get_main_data(bucket, main_hpos, data); 01861 if (ret == -1) 01862 { 01863 op->m_has_error = 4; 01864 DBUG_RETURN_EVENT(-1); 01865 } 01866 EventBufData* main_data = main_hpos.data; 01867 if (ret != 0) // main event was created 01868 { 01869 main_data->m_event_op = op->theMainOp; 01870 bucket->m_data.append_data(main_data); 01871 if (use_hash) 01872 { 01873 main_data->m_pkhash = main_hpos.pkhash; 01874 bucket->m_data_hash.append(main_hpos, main_data); 01875 } 01876 } 01877 // link blob event under main event 01878 add_blob_data(main_data, data); 01879 } 01880 if (use_hash) 01881 { 01882 data->m_pkhash = hpos.pkhash; 01883 bucket->m_data_hash.append(hpos, data); 01884 } 01885 #ifdef VM_TRACE 01886 op->m_data_count++; 01887 #endif 01888 } 01889 else 01890 { 01891 // event with same op, PK found, merge into old buffer 01892 if (unlikely(merge_data(sdata, ptr, data))) 01893 { 01894 op->m_has_error = 3; 01895 DBUG_RETURN_EVENT(-1); 01896 } 01897 // merge is on so we do not report blob part events 01898 if (! is_blob_event) { 01899 // report actual operation and the composite 01900 // there is no way to "fix" the flags for a composite op 01901 // since the flags represent multiple ops on multiple PKs 01902 // XXX fix by doing merge at end of epoch (extra mem cost) 01903 { 01904 EventBufData_list::Gci_op g = { op, (1 << sdata->operation) }; 01905 bucket->m_data.add_gci_op(g); 01906 } 01907 { 01908 EventBufData_list::Gci_op g = { op, (1 << data->sdata->operation) }; 01909 bucket->m_data.add_gci_op(g); 01910 } 01911 } 01912 } 01913 DBUG_RETURN_EVENT(0); 01914 } 01915 01916 #ifdef VM_TRACE 01917 if ((Uint32)op->m_eventImpl->mi_type & (1 << (Uint32)sdata->operation)) 01918 { 01919 DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId)); 01920 DBUG_RETURN_EVENT(0); 01921 } 01922 else { 01923 DBUG_PRINT_EVENT("info",("skipped")); 01924 DBUG_RETURN_EVENT(0); 01925 } 01926 #else 01927 DBUG_RETURN_EVENT(0); 01928 #endif 01929 } 01930 01931 // allocate EventBufData 01932 EventBufData* 01933 NdbEventBuffer::alloc_data() 01934 { 01935 DBUG_ENTER_EVENT("alloc_data"); 01936 EventBufData* data = m_free_data; 01937 01938 if (unlikely(data == 0)) 01939 { 01940 #ifdef VM_TRACE 01941 assert(m_free_data_count == 0); 01942 assert(m_free_data_sz == 0); 01943 #endif 01944 expand(4000); 01945 reportStatus(); 01946 01947 data = m_free_data; 01948 if (unlikely(data == 0)) 01949 { 01950 #ifdef VM_TRACE 01951 printf("m_latest_command: %s\n", m_latest_command); 01952 printf("no free data, m_latestGCI %lld\n", 01953 m_latestGCI); 01954 printf("m_free_data_count %d\n", m_free_data_count); 01955 printf("m_available_data_count %d first gci %d last gci %d\n", 01956 m_available_data.m_count, 01957 m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0, 01958 m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0); 01959 printf("m_used_data_count %d\n", m_used_data.m_count); 01960 #endif 01961 DBUG_RETURN_EVENT(0); // TODO handle this, overrun, or, skip? 01962 } 01963 } 01964 01965 // remove data from free list 01966 m_free_data = data->m_next; 01967 data->m_next = 0; 01968 #ifdef VM_TRACE 01969 m_free_data_count--; 01970 assert(m_free_data_sz >= data->sz); 01971 #endif 01972 m_free_data_sz -= data->sz; 01973 DBUG_RETURN_EVENT(data); 01974 } 01975 01976 // allocate initial or bigger memory area in EventBufData 01977 // takes sizes from given ptr and sets up data->ptr 01978 int 01979 NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) 01980 { 01981 DBUG_ENTER("NdbEventBuffer::alloc_mem"); 01982 DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz)); 01983 const Uint32 min_alloc_size = 128; 01984 01985 Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2; 01986 Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2; 01987 if (alloc_size < min_alloc_size) 01988 alloc_size = min_alloc_size; 01989 01990 if (data->sz < alloc_size) 01991 { 01992 NdbMem_Free((char*)data->memory); 01993 assert(m_total_alloc >= data->sz); 01994 m_total_alloc -= data->sz; 01995 data->memory = 0; 01996 data->sz = 0; 01997 01998 data->memory = (Uint32*)NdbMem_Allocate(alloc_size); 01999 if (data->memory == 0) 02000 DBUG_RETURN(-1); 02001 data->sz = alloc_size; 02002 m_total_alloc += data->sz; 02003 } 02004 02005 Uint32* memptr = data->memory; 02006 memptr += sz4; 02007 int i; 02008 for (i = 0; i <= 2; i++) 02009 { 02010 data->ptr[i].p = memptr; 02011 data->ptr[i].sz = ptr[i].sz; 02012 memptr += ptr[i].sz; 02013 } 02014 02015 DBUG_RETURN(0); 02016 } 02017 02018 int 02019 NdbEventBuffer::copy_data(const SubTableData * const sdata, 02020 LinearSectionPtr ptr[3], 02021 EventBufData* data) 02022 { 02023 DBUG_ENTER_EVENT("NdbEventBuffer::copy_data"); 02024 02025 if (alloc_mem(data, ptr) != 0) 02026 DBUG_RETURN_EVENT(-1); 02027 memcpy(data->sdata, sdata, sizeof(SubTableData)); 02028 int i; 02029 for (i = 0; i <= 2; i++) 02030 memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2); 02031 DBUG_RETURN_EVENT(0); 02032 } 02033 02034 static struct Ev_t { 02035 enum { 02036 enum_INS = NdbDictionary::Event::_TE_INSERT, 02037 enum_DEL = NdbDictionary::Event::_TE_DELETE, 02038 enum_UPD = NdbDictionary::Event::_TE_UPDATE, 02039 enum_NUL = NdbDictionary::Event::_TE_NUL, 02040 enum_IDM = 254, // idempotent op possibly allowed on NF 02041 enum_ERR = 255 // always impossible 02042 }; 02043 int t1, t2, t3; 02044 } ev_t[] = { 02045 { Ev_t::enum_INS, Ev_t::enum_INS, Ev_t::enum_IDM }, 02046 { Ev_t::enum_INS, Ev_t::enum_DEL, Ev_t::enum_NUL }, //ok 02047 { Ev_t::enum_INS, Ev_t::enum_UPD, Ev_t::enum_INS }, //ok 02048 { Ev_t::enum_DEL, Ev_t::enum_INS, Ev_t::enum_UPD }, //ok 02049 { Ev_t::enum_DEL, Ev_t::enum_DEL, Ev_t::enum_IDM }, 02050 { Ev_t::enum_DEL, Ev_t::enum_UPD, Ev_t::enum_ERR }, 02051 { Ev_t::enum_UPD, Ev_t::enum_INS, Ev_t::enum_ERR }, 02052 { Ev_t::enum_UPD, Ev_t::enum_DEL, Ev_t::enum_DEL }, //ok 02053 { Ev_t::enum_UPD, Ev_t::enum_UPD, Ev_t::enum_UPD } //ok 02054 }; 02055 02056 /* 02057 * | INS | DEL | UPD 02058 * 0 | pk ah + all ah | pk ah | pk ah + new ah 02059 * 1 | pk ad + all ad | old pk ad | new pk ad + new ad 02060 * 2 | empty | old non-pk ah+ad | old ah+ad 02061 */ 02062 02063 static AttributeHeader 02064 copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2, 02065 Uint32 flags) 02066 { 02067 AttributeHeader ah(p2[i2]); 02068 bool do_copy = (flags & 1); 02069 if (do_copy) 02070 p1[i1] = p2[i2]; 02071 i1++; 02072 i2++; 02073 return ah; 02074 } 02075 02076 static void 02077 copy_attr(AttributeHeader ah, 02078 Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2, 02079 Uint32 flags) 02080 { 02081 bool do_copy = (flags & 1); 02082 bool with_head = (flags & 2); 02083 Uint32 n = with_head + ah.getDataSize(); 02084 if (do_copy) 02085 { 02086 Uint32 k; 02087 for (k = 0; k < n; k++) 02088 p1[j1 + k] = p2[j2 + k]; 02089 } 02090 j1 += n; 02091 j2 += n; 02092 } 02093 02094 int 02095 NdbEventBuffer::merge_data(const SubTableData * const sdata, 02096 LinearSectionPtr ptr2[3], 02097 EventBufData* data) 02098 { 02099 DBUG_ENTER_EVENT("NdbEventBuffer::merge_data"); 02100 02101 Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys; 02102 02103 int t1 = data->sdata->operation; 02104 int t2 = sdata->operation; 02105 if (t1 == Ev_t::enum_NUL) 02106 DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data)); 02107 02108 Ev_t* tp = 0; 02109 int i; 02110 for (i = 0; i < sizeof(ev_t)/sizeof(ev_t[0]); i++) { 02111 if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) { 02112 tp = &ev_t[i]; 02113 break; 02114 } 02115 } 02116 assert(tp != 0 && tp->t3 != Ev_t::enum_ERR); 02117 02118 if (tp->t3 == Ev_t::enum_IDM) { 02119 LinearSectionPtr (&ptr1)[3] = data->ptr; 02120 02121 /* 02122 * TODO 02123 * - can get data in INS ptr2[2] which is supposed to be empty 02124 * - can get extra data in DEL ptr2[2] 02125 * - why does DBUG_PRINT not work in this file ??? 02126 * 02127 * replication + bug#19872 can ignore this since merge is on 02128 * only for tables with explicit PK and before data is not used 02129 */ 02130 const int maxsec = 1; // ignore section 2 02131 02132 int i; 02133 for (i = 0; i <= maxsec; i++) { 02134 if (ptr1[i].sz != ptr2[i].sz || 02135 memcmp(ptr1[i].p, ptr2[i].p, ptr1[i].sz << 2) != 0) { 02136 DBUG_PRINT("info", ("idempotent op %d*%d data differs in sec %d", 02137 tp->t1, tp->t2, i)); 02138 assert(false); 02139 DBUG_RETURN_EVENT(-1); 02140 } 02141 } 02142 DBUG_PRINT("info", ("idempotent op %d*%d data ok", tp->t1, tp->t2)); 02143 DBUG_RETURN_EVENT(0); 02144 } 02145 02146 // save old data 02147 EventBufData olddata = *data; 02148 data->memory = 0; 02149 data->sz = 0; 02150 02151 // compose ptr1 o ptr2 = ptr 02152 LinearSectionPtr (&ptr1)[3] = olddata.ptr; 02153 LinearSectionPtr (&ptr)[3] = data->ptr; 02154 02155 // loop twice where first loop only sets sizes 02156 int loop; 02157 int result = 0; 02158 for (loop = 0; loop <= 1; loop++) 02159 { 02160 if (loop == 1) 02161 { 02162 if (alloc_mem(data, ptr) != 0) 02163 { 02164 result = -1; 02165 goto end; 02166 } 02167 *data->sdata = *sdata; 02168 data->sdata->operation = tp->t3; 02169 } 02170 02171 ptr[0].sz = ptr[1].sz = ptr[2].sz = 0; 02172 02173 // copy pk from new version 02174 { 02175 AttributeHeader ah; 02176 Uint32 i = 0; 02177 Uint32 j = 0; 02178 Uint32 i2 = 0; 02179 Uint32 j2 = 0; 02180 while (i < nkey) 02181 { 02182 ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop); 02183 copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop); 02184 } 02185 ptr[0].sz = i; 02186 ptr[1].sz = j; 02187 } 02188 02189 // merge after values, new version overrides 02190 if (tp->t3 != Ev_t::enum_DEL) 02191 { 02192 AttributeHeader ah; 02193 Uint32 i = ptr[0].sz; 02194 Uint32 j = ptr[1].sz; 02195 Uint32 i1 = 0; 02196 Uint32 j1 = 0; 02197 Uint32 i2 = nkey; 02198 Uint32 j2 = ptr[1].sz; 02199 while (i1 < nkey) 02200 { 02201 j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize(); 02202 } 02203 while (1) 02204 { 02205 bool b1 = (i1 < ptr1[0].sz); 02206 bool b2 = (i2 < ptr2[0].sz); 02207 if (b1 && b2) 02208 { 02209 Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId(); 02210 Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId(); 02211 if (id1 < id2) 02212 b2 = false; 02213 else if (id1 > id2) 02214 b1 = false; 02215 else 02216 { 02217 j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize(); 02218 b1 = false; 02219 } 02220 } 02221 if (b1) 02222 { 02223 ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop); 02224 copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop); 02225 } 02226 else if (b2) 02227 { 02228 ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop); 02229 copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop); 02230 } 02231 else 02232 break; 02233 } 02234 ptr[0].sz = i; 02235 ptr[1].sz = j; 02236 } 02237 02238 // merge before values, old version overrides 02239 if (tp->t3 != Ev_t::enum_INS) 02240 { 02241 AttributeHeader ah; 02242 Uint32 k = 0; 02243 Uint32 k1 = 0; 02244 Uint32 k2 = 0; 02245 while (1) 02246 { 02247 bool b1 = (k1 < ptr1[2].sz); 02248 bool b2 = (k2 < ptr2[2].sz); 02249 if (b1 && b2) 02250 { 02251 Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId(); 02252 Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId(); 02253 if (id1 < id2) 02254 b2 = false; 02255 else if (id1 > id2) 02256 b1 = false; 02257 else 02258 { 02259 k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize(); 02260 b2 = false; 02261 } 02262 } 02263 if (b1) 02264 { 02265 ah = AttributeHeader(ptr1[2].p[k1]); 02266 copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2); 02267 } 02268 else if (b2) 02269 { 02270 ah = AttributeHeader(ptr2[2].p[k2]); 02271 copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2); 02272 } 02273 else 02274 break; 02275 } 02276 ptr[2].sz = k; 02277 } 02278 } 02279 02280 end: 02281 // free old data 02282 NdbMem_Free((char*)olddata.memory); 02283 assert(m_total_alloc >= olddata.sz); 02284 m_total_alloc -= olddata.sz; 02285 02286 DBUG_RETURN_EVENT(result); 02287 } 02288 02289 /* 02290 * Given blob part event, find main table event on inline part. It 02291 * should exist (force in TUP) but may arrive later. If so, create 02292 * NUL event on main table. The real event replaces it later. 02293 */ 02294 02295 // write attribute headers for concatened PK 02296 static void 02297 split_concatenated_pk(const NdbTableImpl* t, Uint32* ah_buffer, 02298 const Uint32* pk_buffer, Uint32 pk_sz) 02299 { 02300 Uint32 sz = 0; // words parsed so far 02301 Uint32 n; // pk attr count 02302 Uint32 i; 02303 for (i = n = 0; i < t->m_columns.size() && n < t->m_noOfKeys; i++) 02304 { 02305 const NdbColumnImpl* c = t->getColumn(i); 02306 assert(c != NULL); 02307 if (! c->m_pk) 02308 continue; 02309 02310 assert(sz < pk_sz); 02311 Uint32 bytesize = c->m_attrSize * c->m_arraySize; 02312 Uint32 lb, len; 02313 bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_buffer[sz], bytesize, 02314 lb, len); 02315 assert(ok); 02316 02317 AttributeHeader ah(i, lb + len); 02318 ah_buffer[n++] = ah.m_value; 02319 sz += ah.getDataSize(); 02320 } 02321 assert(n == t->m_noOfKeys && sz <= pk_sz); 02322 } 02323 02324 int 02325 NdbEventBuffer::get_main_data(Gci_container* bucket, 02326 EventBufData_hash::Pos& hpos, 02327 EventBufData* blob_data) 02328 { 02329 DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data"); 02330 02331 NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp; 02332 assert(main_op != NULL); 02333 const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl; 02334 02335 // create LinearSectionPtr for main table key 02336 LinearSectionPtr ptr[3]; 02337 Uint32 ah_buffer[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY]; 02338 ptr[0].sz = mainTable->m_noOfKeys; 02339 ptr[0].p = ah_buffer; 02340 ptr[1].sz = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize(); 02341 ptr[1].p = blob_data->ptr[1].p; 02342 ptr[2].sz = 0; 02343 ptr[2].p = 0; 02344 split_concatenated_pk(mainTable, ptr[0].p, ptr[1].p, ptr[1].sz); 02345 02346 DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2); 02347 DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2); 02348 02349 // search for main event buffer 02350 bucket->m_data_hash.search(hpos, main_op, ptr); 02351 if (hpos.data != NULL) 02352 DBUG_RETURN_EVENT(0); 02353 02354 // not found, create a place-holder 02355 EventBufData* main_data = alloc_data(); 02356 if (main_data == NULL) 02357 DBUG_RETURN_EVENT(-1); 02358 SubTableData sdata = *blob_data->sdata; 02359 sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id; 02360 sdata.operation = NdbDictionary::Event::_TE_NUL; 02361 if (copy_data(&sdata, ptr, main_data) != 0) 02362 DBUG_RETURN_EVENT(-1); 02363 hpos.data = main_data; 02364 02365 DBUG_RETURN_EVENT(1); 02366 } 02367 02368 void 02369 NdbEventBuffer::add_blob_data(EventBufData* main_data, 02370 EventBufData* blob_data) 02371 { 02372 DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data"); 02373 DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p", main_data, blob_data)); 02374 EventBufData* head; 02375 head = main_data->m_next_blob; 02376 while (head != NULL) 02377 { 02378 if (head->m_event_op == blob_data->m_event_op) 02379 break; 02380 head = head->m_next_blob; 02381 } 02382 if (head == NULL) 02383 { 02384 head = blob_data; 02385 head->m_next_blob = main_data->m_next_blob; 02386 main_data->m_next_blob = head; 02387 } 02388 else 02389 { 02390 blob_data->m_next = head->m_next; 02391 head->m_next = blob_data; 02392 } 02393 DBUG_VOID_RETURN_EVENT; 02394 } 02395 02396 NdbEventOperationImpl * 02397 NdbEventBuffer::move_data() 02398 { 02399 // handle received data 02400 if (!m_complete_data.m_data.is_empty()) 02401 { 02402 // move this list to last in m_available_data 02403 m_available_data.append_list(&m_complete_data.m_data, 0); 02404 02405 bzero(&m_complete_data, sizeof(m_complete_data)); 02406 } 02407 02408 // handle used data 02409 if (!m_used_data.is_empty()) 02410 { 02411 // return m_used_data to m_free_data 02412 free_list(m_used_data); 02413 } 02414 if (!m_available_data.is_empty()) 02415 { 02416 DBUG_ENTER_EVENT("NdbEventBuffer::move_data"); 02417 #ifdef VM_TRACE 02418 DBUG_PRINT_EVENT("exit",("m_available_data_count %u", m_available_data.m_count)); 02419 #endif 02420 DBUG_RETURN_EVENT(m_available_data.m_head->m_event_op); 02421 } 02422 return 0; 02423 } 02424 02425 void 02426 NdbEventBuffer::free_list(EventBufData_list &list) 02427 { 02428 // return list to m_free_data 02429 list.m_tail->m_next= m_free_data; 02430 m_free_data= list.m_head; 02431 #ifdef VM_TRACE 02432 m_free_data_count+= list.m_count; 02433 #endif 02434 m_free_data_sz+= list.m_sz; 02435 02436 // free blobs XXX unacceptable performance, fix later 02437 { 02438 EventBufData* data = list.m_head; 02439 while (1) { 02440 while (data->m_next_blob != NULL) { 02441 EventBufData* blob_head = data->m_next_blob; 02442 data->m_next_blob = blob_head->m_next_blob; 02443 blob_head->m_next_blob = NULL; 02444 while (blob_head != NULL) { 02445 EventBufData* blob_part = blob_head; 02446 blob_head = blob_head->m_next; 02447 blob_part->m_next = m_free_data; 02448 m_free_data = blob_part; 02449 #ifdef VM_TRACE 02450 m_free_data_count++; 02451 #endif 02452 m_free_data_sz += blob_part->sz; 02453 } 02454 } 02455 if (data == list.m_tail) 02456 break; 02457 data = data->m_next; 02458 } 02459 } 02460 02461 // list returned to m_free_data 02462 list.m_head = list.m_tail = NULL; 02463 list.m_count = list.m_sz = 0; 02464 } 02465 02466 void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci) 02467 { 02468 move_gci_ops(list, gci); 02469 02470 if (m_tail) 02471 m_tail->m_next= list->m_head; 02472 else 02473 m_head= list->m_head; 02474 m_tail= list->m_tail; 02475 m_count+= list->m_count; 02476 m_sz+= list->m_sz; 02477 } 02478 02479 void 02480 EventBufData_list::add_gci_op(Gci_op g) 02481 { 02482 DBUG_ENTER_EVENT("EventBufData_list::add_gci_op"); 02483 DBUG_PRINT_EVENT("info", ("p.op: %p g.event_types: %x", g.op, g.event_types)); 02484 assert(g.op != NULL && g.op->theMainOp == NULL); // as in nextEvent 02485 Uint32 i; 02486 for (i = 0; i < m_gci_op_count; i++) { 02487 if (m_gci_op_list[i].op == g.op) 02488 break; 02489 } 02490 if (i < m_gci_op_count) { 02491 m_gci_op_list[i].event_types |= g.event_types; 02492 } else { 02493 if (m_gci_op_count == m_gci_op_alloc) { 02494 Uint32 n = 1 + 2 * m_gci_op_alloc; 02495 Gci_op* old_list = m_gci_op_list; 02496 m_gci_op_list = new Gci_op [n]; 02497 if (m_gci_op_alloc != 0) { 02498 Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op); 02499 memcpy(m_gci_op_list, old_list, bytes); 02500 DBUG_PRINT_EVENT("info", ("this: %p delete m_gci_op_list: %p", 02501 this, old_list)); 02502 delete [] old_list; 02503 } 02504 DBUG_PRINT_EVENT("info", ("this: %p new m_gci_op_list: %p", 02505 this, m_gci_op_list)); 02506 m_gci_op_alloc = n; 02507 } 02508 assert(m_gci_op_count < m_gci_op_alloc); 02509 #ifndef DBUG_OFF 02510 i = m_gci_op_count; 02511 #endif 02512 g.op->m_ref_count++; 02513 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", g.op->m_ref_count, g.op)); 02514 m_gci_op_list[m_gci_op_count++] = g; 02515 } 02516 DBUG_PRINT_EVENT("exit", ("m_gci_op_list[%u].event_types: %x", i, m_gci_op_list[i].event_types)); 02517 DBUG_VOID_RETURN_EVENT; 02518 } 02519 02520 void 02521 EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci) 02522 { 02523 DBUG_ENTER_EVENT("EventBufData_list::move_gci_ops"); 02524 DBUG_PRINT_EVENT("info", ("this: %p list: %p gci: %llu", 02525 this, list, gci)); 02526 assert(!m_is_not_multi_list); 02527 if (!list->m_is_not_multi_list) 02528 { 02529 assert(gci == 0); 02530 if (m_gci_ops_list_tail) 02531 m_gci_ops_list_tail->m_next = list->m_gci_ops_list; 02532 else 02533 { 02534 m_gci_ops_list = list->m_gci_ops_list; 02535 } 02536 m_gci_ops_list_tail = list->m_gci_ops_list_tail; 02537 goto end; 02538 } 02539 { 02540 Gci_ops *new_gci_ops = new Gci_ops; 02541 DBUG_PRINT_EVENT("info", ("this: %p m_gci_op_list: %p", 02542 new_gci_ops, list->m_gci_op_list)); 02543 if (m_gci_ops_list_tail) 02544 m_gci_ops_list_tail->m_next = new_gci_ops; 02545 else 02546 { 02547 assert(m_gci_ops_list == 0); 02548 m_gci_ops_list = new_gci_ops; 02549 } 02550 m_gci_ops_list_tail = new_gci_ops; 02551 02552 new_gci_ops->m_gci_op_list = list->m_gci_op_list; 02553 new_gci_ops->m_gci_op_count = list->m_gci_op_count; 02554 new_gci_ops->m_gci = gci; 02555 new_gci_ops->m_next = 0; 02556 } 02557 end: 02558 list->m_gci_op_list = 0; 02559 list->m_gci_ops_list_tail = 0; 02560 list->m_gci_op_alloc = 0; 02561 DBUG_VOID_RETURN_EVENT; 02562 } 02563 02564 NdbEventOperation* 02565 NdbEventBuffer::createEventOperation(const char* eventName, 02566 NdbError &theError) 02567 { 02568 DBUG_ENTER("NdbEventBuffer::createEventOperation"); 02569 NdbEventOperation* tOp= new NdbEventOperation(m_ndb, eventName); 02570 if (tOp == 0) 02571 { 02572 theError.code= 4000; 02573 DBUG_RETURN(NULL); 02574 } 02575 if (tOp->getState() != NdbEventOperation::EO_CREATED) { 02576 theError.code= tOp->getNdbError().code; 02577 delete tOp; 02578 DBUG_RETURN(NULL); 02579 } 02580 getEventOperationImpl(tOp)->m_ref_count = 1; 02581 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", 02582 getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp))); 02583 DBUG_RETURN(tOp); 02584 } 02585 02586 NdbEventOperationImpl* 02587 NdbEventBuffer::createEventOperation(NdbEventImpl& evnt, 02588 NdbError &theError) 02589 { 02590 DBUG_ENTER("NdbEventBuffer::createEventOperation [evnt]"); 02591 NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt); 02592 if (tOp == 0) 02593 { 02594 theError.code= 4000; 02595 DBUG_RETURN(NULL); 02596 } 02597 if (tOp->getState() != NdbEventOperation::EO_CREATED) { 02598 theError.code= tOp->getNdbError().code; 02599 delete tOp; 02600 DBUG_RETURN(NULL); 02601 } 02602 DBUG_RETURN(tOp); 02603 } 02604 02605 void 02606 NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp) 02607 { 02608 DBUG_ENTER("NdbEventBuffer::dropEventOperation"); 02609 NdbEventOperationImpl* op= getEventOperationImpl(tOp); 02610 02611 op->stop(); 02612 // stop blob event ops 02613 if (op->theMainOp == NULL) 02614 { 02615 NdbEventOperationImpl* tBlobOp = op->theBlobOpList; 02616 while (tBlobOp != NULL) 02617 { 02618 tBlobOp->stop(); 02619 tBlobOp = tBlobOp->m_next; 02620 } 02621 02622 // release blob handles now, further access is user error 02623 while (op->theBlobList != NULL) 02624 { 02625 NdbBlob* tBlob = op->theBlobList; 02626 op->theBlobList = tBlob->theNext; 02627 m_ndb->releaseNdbBlob(tBlob); 02628 } 02629 } 02630 02631 DBUG_ASSERT(op->m_ref_count > 0); 02632 op->m_ref_count--; 02633 DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op)); 02634 if (op->m_ref_count == 0) 02635 { 02636 DBUG_PRINT("info", ("deleting op: %p", op)); 02637 DBUG_ASSERT(op->m_node_bit_mask.isclear()); 02638 delete op->m_facade; 02639 } 02640 else 02641 { 02642 op->m_next= m_dropped_ev_op; 02643 op->m_prev= 0; 02644 if (m_dropped_ev_op) 02645 m_dropped_ev_op->m_prev= op; 02646 m_dropped_ev_op= op; 02647 } 02648 DBUG_VOID_RETURN; 02649 } 02650 02651 void 02652 NdbEventBuffer::reportStatus() 02653 { 02654 EventBufData *apply_buf= m_available_data.m_head; 02655 Uint64 apply_gci, latest_gci= m_latestGCI; 02656 if (apply_buf == 0) 02657 apply_buf= m_complete_data.m_data.m_head; 02658 if (apply_buf) 02659 apply_gci= apply_buf->sdata->gci; 02660 else 02661 apply_gci= latest_gci; 02662 02663 if (100*m_free_data_sz < m_min_free_thresh*m_total_alloc && 02664 m_total_alloc > 1024*1024) 02665 { 02666 /* report less free buffer than m_free_thresh, 02667 next report when more free than 2 * m_free_thresh 02668 */ 02669 m_min_free_thresh= 0; 02670 m_max_free_thresh= 2 * m_free_thresh; 02671 goto send_report; 02672 } 02673 02674 if (100*m_free_data_sz > m_max_free_thresh*m_total_alloc && 02675 m_total_alloc > 1024*1024) 02676 { 02677 /* report more free than 2 * m_free_thresh 02678 next report when less free than m_free_thresh 02679 */ 02680 m_min_free_thresh= m_free_thresh; 02681 m_max_free_thresh= 100; 02682 goto send_report; 02683 } 02684 if (latest_gci-apply_gci >= m_gci_slip_thresh) 02685 { 02686 goto send_report; 02687 } 02688 return; 02689 02690 send_report: 02691 Uint32 data[8]; 02692 data[0]= NDB_LE_EventBufferStatus; 02693 data[1]= m_total_alloc-m_free_data_sz; 02694 data[2]= m_total_alloc; 02695 data[3]= 0; 02696 data[4]= apply_gci & ~(Uint32)0; 02697 data[5]= apply_gci >> 32; 02698 data[6]= latest_gci & ~(Uint32)0; 02699 data[7]= latest_gci >> 32; 02700 m_ndb->theImpl->send_event_report(data,8); 02701 #ifdef VM_TRACE 02702 assert(m_total_alloc >= m_free_data_sz); 02703 #endif 02704 } 02705 02706 // hash table routines 02707 02708 // could optimize the all-fixed case 02709 Uint32 02710 EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]) 02711 { 02712 DBUG_ENTER_EVENT("EventBufData_hash::getpkhash"); 02713 DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2); 02714 DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2); 02715 02716 const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl; 02717 02718 // in all cases ptr[0] = pk ah.. ptr[1] = pk ad.. 02719 // for pk update (to equivalent pk) post/pre values give same hash 02720 Uint32 nkey = tab->m_noOfKeys; 02721 assert(nkey != 0 && nkey <= ptr[0].sz); 02722 const Uint32* hptr = ptr[0].p; 02723 const uchar* dptr = (uchar*)ptr[1].p; 02724 02725 // hash registers 02726 ulong nr1 = 0; 02727 ulong nr2 = 0; 02728 while (nkey-- != 0) 02729 { 02730 AttributeHeader ah(*hptr++); 02731 Uint32 bytesize = ah.getByteSize(); 02732 assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz)); 02733 02734 Uint32 i = ah.getAttributeId(); 02735 const NdbColumnImpl* col = tab->getColumn(i); 02736 assert(col != 0); 02737 02738 Uint32 lb, len; 02739 bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len); 02740 assert(ok); 02741 02742 CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin; 02743 (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2); 02744 dptr += ((bytesize + 3) / 4) * 4; 02745 } 02746 DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1)); 02747 DBUG_RETURN_EVENT(nr1); 02748 } 02749 02750 bool 02751 EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]) 02752 { 02753 DBUG_ENTER_EVENT("EventBufData_hash::getpkequal"); 02754 DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2); 02755 DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2); 02756 DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2); 02757 DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2); 02758 02759 const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl; 02760 02761 Uint32 nkey = tab->m_noOfKeys; 02762 assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz); 02763 const Uint32* hptr1 = ptr1[0].p; 02764 const Uint32* hptr2 = ptr2[0].p; 02765 const uchar* dptr1 = (uchar*)ptr1[1].p; 02766 const uchar* dptr2 = (uchar*)ptr2[1].p; 02767 02768 bool equal = true; 02769 02770 while (nkey-- != 0) 02771 { 02772 AttributeHeader ah1(*hptr1++); 02773 AttributeHeader ah2(*hptr2++); 02774 // sizes can differ on update of varchar endspace 02775 Uint32 bytesize1 = ah1.getByteSize(); 02776 Uint32 bytesize2 = ah2.getByteSize(); 02777 assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz)); 02778 assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz)); 02779 02780 assert(ah1.getAttributeId() == ah2.getAttributeId()); 02781 Uint32 i = ah1.getAttributeId(); 02782 const NdbColumnImpl* col = tab->getColumn(i); 02783 assert(col != 0); 02784 02785 Uint32 lb1, len1; 02786 bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1); 02787 Uint32 lb2, len2; 02788 bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2); 02789 assert(ok1 && ok2 && lb1 == lb2); 02790 02791 CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin; 02792 int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false); 02793 if (res != 0) 02794 { 02795 equal = false; 02796 break; 02797 } 02798 dptr1 += ((bytesize1 + 3) / 4) * 4; 02799 dptr2 += ((bytesize2 + 3) / 4) * 4; 02800 } 02801 02802 DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false")); 02803 DBUG_RETURN_EVENT(equal); 02804 } 02805 02806 void 02807 EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]) 02808 { 02809 DBUG_ENTER_EVENT("EventBufData_hash::search"); 02810 Uint32 pkhash = getpkhash(op, ptr); 02811 Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE; 02812 EventBufData* data = m_hash[index]; 02813 while (data != 0) 02814 { 02815 if (data->m_event_op == op && 02816 data->m_pkhash == pkhash && 02817 getpkequal(op, data->ptr, ptr)) 02818 break; 02819 data = data->m_next_hash; 02820 } 02821 hpos.index = index; 02822 hpos.data = data; 02823 hpos.pkhash = pkhash; 02824 DBUG_PRINT_EVENT("info", ("search result=%p", data)); 02825 DBUG_VOID_RETURN_EVENT; 02826 } 02827 02828 template class Vector<Gci_container_pod>; 02829 template class Vector<NdbEventBuffer::EventBufData_chunk*>;
1.4.7

