#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h>
#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
#include <IPCConfig.hpp>
#include <TransporterCallback.hpp>
#include <TransporterRegistry.hpp>
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include <NdbEnv.h>
#include <NdbSleep.h>
#include "API.hpp"
#include <ConfigRetriever.hpp>
#include <mgmapi_config_parameters.h>
#include <mgmapi_configuration.hpp>
#include <NdbConfig.h>
#include <ndb_version.h>
#include <SignalLoggerManager.hpp>
#include <signaldata/AlterTable.hpp>
#include <signaldata/SumaImpl.hpp>

Include dependency graph for TransporterFacade.cpp:

Go to the source code of this file.
Defines | |
| #define | TRP_DEBUG(t) |
| #define | CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 |
Functions | |
| static int | numberToIndex (int number) |
| static int | indexToNumber (int index) |
| void | reportError (void *callbackObj, NodeId nodeId, TransporterError errorCode, const char *info) |
| void | reportSendLen (void *callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes) |
| void | reportReceiveLen (void *callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes) |
| void | reportConnect (void *callbackObj, NodeId nodeId) |
| void | reportDisconnect (void *callbackObj, NodeId nodeId, Uint32 error) |
| void | transporter_recv_from (void *callbackObj, NodeId nodeId) |
| int | checkJobBuffer () |
| void | execute (void *callbackObj, SignalHeader *const header, Uint8 prio, Uint32 *const theData, LinearSectionPtr ptr[3]) |
| void | copy (Uint32 *&insertPtr, class SectionSegmentPool &thePool, const SegmentedSectionPtr &_ptr) |
| void * | runSendRequest_C (void *me) |
| void * | runReceiveResponse_C (void *me) |
| #define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 |
Definition at line 1046 of file TransporterFacade.cpp.
Referenced by TransporterFacade::sendFragmentedSignal().
| #define TRP_DEBUG | ( | t | ) |
Definition at line 57 of file TransporterFacade.cpp.
Referenced by execute(), and TransporterFacade::init().
| int checkJobBuffer | ( | ) |
Report connection broken
Check whether the job buffers are starting to get full and, if so, call doJob
Definition at line 142 of file TransporterFacade.cpp.
Referenced by TransporterRegistry::performReceive().
Here is the caller graph for this function:

| void copy | ( | Uint32 *& | insertPtr, | |
| class SectionSegmentPool & | thePool, | |||
| const SegmentedSectionPtr & | _ptr | |||
| ) |
Definition at line 375 of file TransporterFacade.cpp.
References abort().
00376 { 00377 abort(); 00378 }
Here is the call graph for this function:

| void execute | ( | void * | callbackObj, | |
| SignalHeader *const | header, | |||
| Uint8 | prio, | |||
| Uint32 *const | theData, | |||
| LinearSectionPtr | ptr[3] | |||
| ) |
The execute function : Handle received signal
The received signal is handled immediately to avoid any unnecessary copying of data, allocation of memory and similar overhead. Copying the data would only be of interest in order to support several priority levels or a special memory structure when executing the signals, and neither of those is needed when receiving data in the NDBAPI. The NDBAPI therefore reads the signal data directly as it was written by the sender (for SCI the sender is the other node, for shared memory it is the other process, and for TCP/IP it is the OS, which writes the TCP/IP message into a message buffer).
Block number == 2047 marks a packed signal, i.e. a signal that consists of multiple instances of the same signal. Packaging the signals this way is an effort to avoid unnecessary communication overhead, since TCP/IP communication has a great performance impact.
Since it contains at least two data packets, we will first copy the signal data to a safe place.
Set the data length of the signal and the receiver's block reference, and then call the API.
The signal was aimed at the Cluster Manager. We handle it immediately here.
Report
Reply
Definition at line 190 of file TransporterFacade.cpp.
References abort(), API_CLUSTERMGR, API_PACKED, AlterTableRep::changeType, AlterTableRep::CT_ALTERED, ArbitMgr::doChoose(), ArbitMgr::doStart(), ArbitMgr::doStop(), ClusterMgr::execAPI_REGCONF(), ClusterMgr::execAPI_REGREF(), ClusterMgr::execAPI_REGREQ(), ClusterMgr::execNF_COMPLETEREP(), ClusterMgr::execNODE_FAILREP(), SignalLoggerManager::executeSignal(), SignalLoggerManager::flushSignalLog(), TransporterFacade::for_each(), TransporterFacade::ThreadData::get(), NdbApiSignal::getDataPtrSend(), NdbApiSignal::getLength(), GSN_ALTER_TABLE_REP, GSN_API_REGCONF, GSN_API_REGREF, GSN_API_REGREQ, GSN_ARBIT_CHOOSEREQ, GSN_ARBIT_STARTREQ, GSN_ARBIT_STOPORD, GSN_NF_COMPLETEREP, GSN_NODE_FAILREP, GSN_SUB_GCP_COMPLETE_ACK, GSN_SUB_GCP_COMPLETE_REP, TransporterFacade::ThreadData::Object_Execute::m_executeFunction, TransporterFacade::m_globalDictCache, SignalHeader::m_noOfSections, TransporterFacade::ThreadData::Object_Execute::m_object, TransporterFacade::m_threads, memcpy, MIN_API_BLOCK_NO, ndbout(), NULL, numberToRef(), TransporterFacade::ownId(), p, refToBlock(), refToNode(), TransporterFacade::sendSignalUnCond(), NdbApiSignal::setDataPtr(), AlterTableRep::tableId, AlterTableRep::tableVersion, TransporterFacade::theArbitMgr, TransporterFacade::theClusterMgr, SignalHeader::theLength, TransporterFacade::theOwnId, SignalHeader::theReceiversBlockNumber, SignalHeader::theSendersBlockRef, SignalHeader::theVerId_signalNumber, and TRP_DEBUG.
00192 { 00193 00194 TransporterFacade * theFacade = (TransporterFacade*)callbackObj; 00195 TransporterFacade::ThreadData::Object_Execute oe; 00196 Uint32 tRecBlockNo = header->theReceiversBlockNumber; 00197 00198 #ifdef API_TRACE 00199 if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){ 00200 signalLogger.executeSignal(* header, 00201 prio, 00202 theData, 00203 theFacade->ownId(), 00204 ptr, header->m_noOfSections); 00205 signalLogger.flushSignalLog(); 00206 } 00207 #endif 00208 00209 if (tRecBlockNo >= MIN_API_BLOCK_NO) { 00210 oe = theFacade->m_threads.get(tRecBlockNo); 00211 if (oe.m_object != 0 && oe.m_executeFunction != 0) { 00223 NdbApiSignal tmpSignal(*header); 00224 NdbApiSignal * tSignal = &tmpSignal; 00225 tSignal->setDataPtr(theData); 00226 (* oe.m_executeFunction) (oe.m_object, tSignal, ptr); 00227 }//if 00228 } else if (tRecBlockNo == API_PACKED) { 00235 Uint32 Tlength = header->theLength; 00236 Uint32 Tsent = 0; 00241 while (Tsent < Tlength) { 00242 Uint32 Theader = theData[Tsent]; 00243 Tsent++; 00244 Uint32 TpacketLen = (Theader & 0x1F) + 3; 00245 tRecBlockNo = Theader >> 16; 00246 if (TpacketLen <= 25) { 00247 if ((TpacketLen + Tsent) <= Tlength) { 00252 header->theLength = TpacketLen; 00253 header->theReceiversBlockNumber = tRecBlockNo; 00254 Uint32* tDataPtr = &theData[Tsent]; 00255 Tsent += TpacketLen; 00256 if (tRecBlockNo >= MIN_API_BLOCK_NO) { 00257 oe = theFacade->m_threads.get(tRecBlockNo); 00258 if(oe.m_object != 0 && oe.m_executeFunction != 0){ 00259 NdbApiSignal tmpSignal(*header); 00260 NdbApiSignal * tSignal = &tmpSignal; 00261 tSignal->setDataPtr(tDataPtr); 00262 (*oe.m_executeFunction)(oe.m_object, tSignal, 0); 00263 } 00264 } 00265 } 00266 } 00267 } 00268 return; 00269 } else if (tRecBlockNo == API_CLUSTERMGR) { 00274 ClusterMgr * clusterMgr = theFacade->theClusterMgr; 00275 const Uint32 gsn = header->theVerId_signalNumber; 00276 00277 switch (gsn){ 00278 case GSN_API_REGREQ: 00279 clusterMgr->execAPI_REGREQ(theData); 
00280 break; 00281 00282 case GSN_API_REGCONF: 00283 clusterMgr->execAPI_REGCONF(theData); 00284 break; 00285 00286 case GSN_API_REGREF: 00287 clusterMgr->execAPI_REGREF(theData); 00288 break; 00289 00290 case GSN_NODE_FAILREP: 00291 clusterMgr->execNODE_FAILREP(theData); 00292 break; 00293 00294 case GSN_NF_COMPLETEREP: 00295 clusterMgr->execNF_COMPLETEREP(theData); 00296 break; 00297 00298 case GSN_ARBIT_STARTREQ: 00299 if (theFacade->theArbitMgr != NULL) 00300 theFacade->theArbitMgr->doStart(theData); 00301 break; 00302 00303 case GSN_ARBIT_CHOOSEREQ: 00304 if (theFacade->theArbitMgr != NULL) 00305 theFacade->theArbitMgr->doChoose(theData); 00306 break; 00307 00308 case GSN_ARBIT_STOPORD: 00309 if(theFacade->theArbitMgr != NULL) 00310 theFacade->theArbitMgr->doStop(theData); 00311 break; 00312 00313 case GSN_ALTER_TABLE_REP: 00314 { 00315 const AlterTableRep* rep = (const AlterTableRep*)theData; 00316 theFacade->m_globalDictCache.lock(); 00317 theFacade->m_globalDictCache. 00318 alter_table_rep((const char*)ptr[0].p, 00319 rep->tableId, 00320 rep->tableVersion, 00321 rep->changeType == AlterTableRep::CT_ALTERED); 00322 theFacade->m_globalDictCache.unlock(); 00323 break; 00324 } 00325 case GSN_SUB_GCP_COMPLETE_REP: 00326 { 00330 NdbApiSignal tSignal(* header); 00331 tSignal.setDataPtr(theData); 00332 theFacade->for_each(&tSignal, ptr); 00333 00337 { 00338 Uint32* send= tSignal.getDataPtrSend(); 00339 memcpy(send, theData, tSignal.getLength() << 2); 00340 ((SubGcpCompleteAck*)send)->rep.senderRef = 00341 numberToRef(API_CLUSTERMGR, theFacade->theOwnId); 00342 Uint32 ref= header->theSendersBlockRef; 00343 Uint32 aNodeId= refToNode(ref); 00344 tSignal.theReceiversBlockNumber= refToBlock(ref); 00345 tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK; 00346 theFacade->sendSignalUnCond(&tSignal, aNodeId); 00347 } 00348 break; 00349 } 00350 default: 00351 break; 00352 00353 } 00354 return; 00355 } else { 00356 ; // Ignore all other block numbers. 
00357 if(header->theVerId_signalNumber!=3) { 00358 TRP_DEBUG( "TransporterFacade received signal to unknown block no." ); 00359 ndbout << "BLOCK NO: " << tRecBlockNo << " sig " 00360 << header->theVerId_signalNumber << endl; 00361 abort(); 00362 } 00363 } 00364 }
Here is the call graph for this function:

| static int indexToNumber | ( | int | index | ) | [static] |
Definition at line 49 of file TransporterFacade.cpp.
References MIN_API_BLOCK_NO.
Referenced by TransporterFacade::connected(), and TransporterFacade::ThreadData::open().
00050 { 00051 return index + MIN_API_BLOCK_NO; 00052 }
Here is the caller graph for this function:

| static int numberToIndex | ( | int | number | ) | [static] |
Definition at line 44 of file TransporterFacade.cpp.
References MIN_API_BLOCK_NO.
Referenced by TransporterFacade::checkForceSend(), TransporterFacade::ThreadData::close(), and TransporterFacade::forceSend().
00045 { 00046 return number - MIN_API_BLOCK_NO; 00047 }
Here is the caller graph for this function:

| void reportConnect | ( | void * | callbackObj, | |
| NodeId | nodeId | |||
| ) |
Report connection established
Definition at line 112 of file TransporterFacade.cpp.
References ndbout_c().
Referenced by TransporterRegistry::report_connect().
00112 { 00113 #ifdef REPORT_TRANSPORTER 00114 ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId); 00115 #endif 00116 ((TransporterFacade*)(callbackObj))->reportConnected(nodeId); 00117 }
Here is the call graph for this function:

Here is the caller graph for this function:

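| void reportDisconnect | ( | void * | callbackObj, | |
| NodeId | nodeId, | |||
| Uint32 | error | |||
| ) |
Report connection broken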
Definition at line 123 of file TransporterFacade.cpp.
References ndbout_c().
Referenced by OSE_Transporter::disconnectOrd(), TransporterRegistry::report_disconnect(), and reportError().
00123 { 00124 #ifdef REPORT_TRANSPORTER 00125 ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId); 00126 #endif 00127 ((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId); 00128 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void reportError | ( | void * | callbackObj, | |
| NodeId | nodeId, | |||
| TransporterError | errorCode, | |||
| const char * | info = 0 | |||
| ) |
Report error
Definition at line 65 of file TransporterFacade.cpp.
References ndbout_c(), and TE_DO_DISCONNECT.
Referenced by OSE_Receiver::checkWaitStack(), OSE_Receiver::doReceive(), OSE_Receiver::insertWaitStack(), TransporterRegistry::prepareSend(), Transporter::report_error(), and TransporterRegistry::unpack().
00067 { 00068 #ifdef REPORT_TRANSPORTER 00069 ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s", 00070 (int)nodeId, (int)errorCode, info ? info : ""); 00071 #endif 00072 if(errorCode & TE_DO_DISCONNECT) { 00073 ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode, 00074 info ? info : ""); 00075 ((TransporterFacade*)(callbackObj))->doDisconnect(nodeId); 00076 } 00077 }
Here is the call graph for this function:

Here is the caller graph for this function:

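| void reportReceiveLen | ( | void * | callbackObj, | |
| NodeId | nodeId, | |||
| Uint32 | count, | |||
| Uint64 | bytes | |||
| ) |
Report average receive length in bytes (4096 last receives)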
Definition at line 97 of file TransporterFacade.cpp.
References ndbout_c().
Referenced by TCP_Transporter::doReceive().
00098 { 00099 #ifdef REPORT_TRANSPORTER 00100 ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)", 00101 (int)nodeId, (Uint32)(bytes/count)); 00102 #endif 00103 (void)nodeId; 00104 (void)count; 00105 (void)bytes; 00106 }
Here is the call graph for this function:

Here is the caller graph for this function:

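| void reportSendLen | ( | void * | callbackObj, | |
| NodeId | nodeId, | |||
| Uint32 | count, | |||
| Uint64 | bytes | |||
| ) |
Report average send length in bytes (4096 last sends)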
Definition at line 83 of file TransporterFacade.cpp.
References ndbout_c().
Referenced by TCP_Transporter::doSend().
00083 { 00084 #ifdef REPORT_TRANSPORTER 00085 ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)", 00086 (int)nodeId, (Uint32)(bytes/count)); 00087 #endif 00088 (void)nodeId; 00089 (void)count; 00090 (void)bytes; 00091 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void* runReceiveResponse_C | ( | void * | me | ) |
Definition at line 484 of file TransporterFacade.cpp.
00485 { 00486 ((TransporterFacade*) me)->threadMainReceive(); 00487 return 0; 00488 }
| void* runSendRequest_C | ( | void * | me | ) |
Definition at line 449 of file TransporterFacade.cpp.
00450 { 00451 ((TransporterFacade*) me)->threadMainSend(); 00452 return 0; 00453 }
| void transporter_recv_from | ( | void * | callbackObj, | |
| NodeId | nodeId | |||
| ) |
Definition at line 131 of file TransporterFacade.cpp.
Referenced by TransporterRegistry::performReceive().
00131 { 00132 ((TransporterFacade*)(callbackObj))->hb_received(nodeId); 00133 }
Here is the caller graph for this function:

1.4.7

