#include <TransporterRegistry.hpp>
Collaboration diagram for TransporterRegistry:

Definition at line 91 of file TransporterRegistry.hpp.
Definition at line 170 of file TransporterRegistry.hpp.
00170 { 00171 CONNECTED = 0, 00172 CONNECTING = 1, 00173 DISCONNECTED = 2, 00174 DISCONNECTING = 3 00175 };
| TransporterRegistry::TransporterRegistry | ( | void * | callback = 0, |
|
| unsigned | maxTransporters = MAX_NTRANSPORTERS, |
|||
| unsigned | sizeOfLongSignalMemory = 100 | |||
| ) |
Constructor
Definition at line 81 of file TransporterRegistry.cpp.
References callbackObj, DBUG_ENTER, DBUG_VOID_RETURN, DISCONNECTED, ioStates, m_mgm_handle, maxTransporters, nodeIdSpecified, NoHalt, nOSETransporters, nSCITransporters, nSHMTransporters, nTCPTransporters, nTransporters, NULL, performStates, sendCounter, theOSEJunkSocketRecv, theOSEJunkSocketSend, theOSEReceiver, theOSETransporters, theSCITransporters, theSHMTransporters, theTCPTransporters, theTransporters, and theTransporterTypes.
00084 { 00085 DBUG_ENTER("TransporterRegistry::TransporterRegistry"); 00086 00087 nodeIdSpecified = false; 00088 maxTransporters = _maxTransporters; 00089 sendCounter = 1; 00090 m_mgm_handle= 0; 00091 00092 callbackObj=callback; 00093 00094 theTCPTransporters = new TCP_Transporter * [maxTransporters]; 00095 theSCITransporters = new SCI_Transporter * [maxTransporters]; 00096 theSHMTransporters = new SHM_Transporter * [maxTransporters]; 00097 theOSETransporters = new OSE_Transporter * [maxTransporters]; 00098 theTransporterTypes = new TransporterType [maxTransporters]; 00099 theTransporters = new Transporter * [maxTransporters]; 00100 performStates = new PerformState [maxTransporters]; 00101 ioStates = new IOState [maxTransporters]; 00102 00103 // Initialize member variables 00104 nTransporters = 0; 00105 nTCPTransporters = 0; 00106 nSCITransporters = 0; 00107 nSHMTransporters = 0; 00108 nOSETransporters = 0; 00109 00110 // Initialize the transporter arrays 00111 for (unsigned i=0; i<maxTransporters; i++) { 00112 theTCPTransporters[i] = NULL; 00113 theSCITransporters[i] = NULL; 00114 theSHMTransporters[i] = NULL; 00115 theOSETransporters[i] = NULL; 00116 theTransporters[i] = NULL; 00117 performStates[i] = DISCONNECTED; 00118 ioStates[i] = NoHalt; 00119 } 00120 theOSEReceiver = 0; 00121 theOSEJunkSocketSend = 0; 00122 theOSEJunkSocketRecv = 0; 00123 00124 DBUG_VOID_RETURN; 00125 }
| TransporterRegistry::~TransporterRegistry | ( | ) |
Stops the server, disconnects all the transporters, deletes them, and removes them from the transporter arrays.
Definition at line 148 of file TransporterRegistry.cpp.
References DBUG_ENTER, DBUG_VOID_RETURN, OSE_Receiver::destroyPhantom(), ioStates, m_mgm_handle, ndb_mgm_destroy_handle(), NULL, performStates, removeAll(), theOSEReceiver, theOSETransporters, theSCITransporters, theSHMTransporters, theTCPTransporters, theTransporters, and theTransporterTypes.
00149 { 00150 DBUG_ENTER("TransporterRegistry::~TransporterRegistry"); 00151 00152 removeAll(); 00153 00154 delete[] theTCPTransporters; 00155 delete[] theSCITransporters; 00156 delete[] theSHMTransporters; 00157 delete[] theOSETransporters; 00158 delete[] theTransporterTypes; 00159 delete[] theTransporters; 00160 delete[] performStates; 00161 delete[] ioStates; 00162 00163 #ifdef NDB_OSE_TRANSPORTER 00164 if(theOSEReceiver != NULL){ 00165 theOSEReceiver->destroyPhantom(); 00166 delete theOSEReceiver; 00167 theOSEReceiver = 0; 00168 } 00169 #endif 00170 if (m_mgm_handle) 00171 ndb_mgm_destroy_handle(&m_mgm_handle); 00172 00173 DBUG_VOID_RETURN; 00174 }
Here is the call graph for this function:

| void TransporterRegistry::add_transporter_interface | ( | NodeId | remoteNodeId, | |
| const char * | interf, | |||
| int | s_port | |||
| ) |
Definition at line 1378 of file TransporterRegistry.cpp.
References DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, TransporterRegistry::Transporter_interface::m_interface, TransporterRegistry::Transporter_interface::m_remote_nodeId, TransporterRegistry::Transporter_interface::m_s_service_port, m_transporter_interface, strcmp(), and strlen().
01381 { 01382 DBUG_ENTER("TransporterRegistry::add_transporter_interface"); 01383 DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port)); 01384 if (interf && strlen(interf) == 0) 01385 interf= 0; 01386 01387 for (unsigned i= 0; i < m_transporter_interface.size(); i++) 01388 { 01389 Transporter_interface &tmp= m_transporter_interface[i]; 01390 if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0) 01391 continue; 01392 if (interf != 0 && tmp.m_interface != 0 && 01393 strcmp(interf, tmp.m_interface) == 0) 01394 { 01395 DBUG_VOID_RETURN; // found match, no need to insert 01396 } 01397 if (interf == 0 && tmp.m_interface == 0) 01398 { 01399 DBUG_VOID_RETURN; // found match, no need to insert 01400 } 01401 } 01402 Transporter_interface t; 01403 t.m_remote_nodeId= remoteNodeId; 01404 t.m_s_service_port= s_port; 01405 t.m_interface= interf; 01406 m_transporter_interface.push_back(t); 01407 DBUG_PRINT("exit",("interface and port added")); 01408 DBUG_VOID_RETURN; 01409 }
Here is the call graph for this function:

| bool TransporterRegistry::connect_client | ( | NdbMgmHandle * | h | ) |
Definition at line 1570 of file TransporterRegistry.cpp.
References Transporter::connect_client(), connect_ndb_mgmd(), DBUG_ENTER, DBUG_RETURN, h, ndb_mgm_get_mgmd_nodeid(), ndbout_c(), and theTransporters.
Referenced by main().
01571 { 01572 DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)"); 01573 01574 Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h); 01575 01576 if(!mgm_nodeid) 01577 { 01578 ndbout_c("%s: %d", __FILE__, __LINE__); 01579 return false; 01580 } 01581 Transporter * t = theTransporters[mgm_nodeid]; 01582 if (!t) 01583 { 01584 ndbout_c("%s: %d", __FILE__, __LINE__); 01585 return false; 01586 } 01587 DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h))); 01588 }
Here is the call graph for this function:

Here is the caller graph for this function:

| NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd | ( | NdbMgmHandle * | h | ) |
Given a connected NdbMgmHandle, turns it into a transporter and returns the socket.
convert_to_transporter also disposes of the handle (i.e. we don't leak memory here).
Definition at line 1594 of file TransporterRegistry.cpp.
References CFG_CONNECTION_SERVER_PORT, get_localNodeId(), h, m_transporter_interface, NDB_INVALID_SOCKET, ndb_mgm_convert_to_transporter(), ndb_mgm_destroy_handle(), ndb_mgm_get_latest_error(), ndb_mgm_get_latest_error_desc(), ndb_mgm_set_connection_int_parameter(), NDB_SOCKET_TYPE, ndbout_c(), and NULL.
01595 { 01596 struct ndb_mgm_reply mgm_reply; 01597 01598 if ( h==NULL || *h == NULL ) 01599 { 01600 ndbout_c("%s: %d", __FILE__, __LINE__); 01601 return NDB_INVALID_SOCKET; 01602 } 01603 01604 for(unsigned int i=0;i < m_transporter_interface.size();i++) 01605 if (m_transporter_interface[i].m_s_service_port < 0 01606 && ndb_mgm_set_connection_int_parameter(*h, 01607 get_localNodeId(), 01608 m_transporter_interface[i].m_remote_nodeId, 01609 CFG_CONNECTION_SERVER_PORT, 01610 m_transporter_interface[i].m_s_service_port, 01611 &mgm_reply) < 0) 01612 { 01613 ndbout_c("Error: %s: %d", 01614 ndb_mgm_get_latest_error_desc(*h), 01615 ndb_mgm_get_latest_error(*h)); 01616 ndbout_c("%s: %d", __FILE__, __LINE__); 01617 ndb_mgm_destroy_handle(h); 01618 return NDB_INVALID_SOCKET; 01619 } 01620 01625 NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h); 01626 if ( sockfd == NDB_INVALID_SOCKET) 01627 { 01628 ndbout_c("Error: %s: %d", 01629 ndb_mgm_get_latest_error_desc(*h), 01630 ndb_mgm_get_latest_error(*h)); 01631 ndbout_c("%s: %d", __FILE__, __LINE__); 01632 ndb_mgm_destroy_handle(h); 01633 } 01634 return sockfd; 01635 }
Here is the call graph for this function:

| NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd | ( | SocketClient * | sc | ) |
Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter and returns the socket.
Set connectstring
Definition at line 1641 of file TransporterRegistry.cpp.
References BaseString::assfmt(), BaseString::c_str(), SocketClient::get_port(), SocketClient::get_server_name(), h, NDB_INVALID_SOCKET, ndb_mgm_connect(), ndb_mgm_create_handle(), ndb_mgm_destroy_handle(), ndb_mgm_set_connectstring(), and NULL.
Referenced by connect_client(), and Transporter::connect_client().
01642 { 01643 NdbMgmHandle h= ndb_mgm_create_handle(); 01644 01645 if ( h == NULL ) 01646 { 01647 return NDB_INVALID_SOCKET; 01648 } 01649 01653 { 01654 BaseString cs; 01655 cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port()); 01656 ndb_mgm_set_connectstring(h, cs.c_str()); 01657 } 01658 01659 if(ndb_mgm_connect(h, 0, 0, 0)<0) 01660 { 01661 ndb_mgm_destroy_handle(&h); 01662 return NDB_INVALID_SOCKET; 01663 } 01664 01665 return connect_ndb_mgmd(&h); 01666 }
Here is the call graph for this function:

Here is the caller graph for this function:

| bool TransporterRegistry::connect_server | ( | NDB_SOCKET_TYPE | sockfd | ) |
After a connect from a client, perform the connection using the correct transporter.
Definition at line 204 of file TransporterRegistry.cpp.
References buf, Transporter::connect_server(), CONNECTING, DBUG_ENTER, DBUG_PRINT, DBUG_RETURN, Logger::error(), g_eventLogger, Transporter::getLocalNodeId(), SocketInputStream::gets(), Transporter::m_type, maxTransporters, performStates, SocketOutputStream::println(), theTransporters, tt_SHM_TRANSPORTER, and Logger::warning().
Referenced by TransporterService::newSession(), and MgmtSrvr::transporter_connect().
00205 { 00206 DBUG_ENTER("TransporterRegistry::connect_server"); 00207 00208 // read node id from client 00209 // read transporter type 00210 int nodeId, remote_transporter_type= -1; 00211 SocketInputStream s_input(sockfd); 00212 char buf[256]; 00213 if (s_input.gets(buf, 256) == 0) { 00214 DBUG_PRINT("error", ("Could not get node id from client")); 00215 DBUG_RETURN(false); 00216 } 00217 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type); 00218 switch (r) { 00219 case 2: 00220 break; 00221 case 1: 00222 // we're running version prior to 4.1.9 00223 // ok, but with no checks on transporter configuration compatability 00224 break; 00225 default: 00226 DBUG_PRINT("error", ("Error in node id from client")); 00227 DBUG_RETURN(false); 00228 } 00229 00230 DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d", 00231 nodeId,remote_transporter_type)); 00232 00233 //check that nodeid is valid and that there is an allocated transporter 00234 if ( nodeId < 0 || nodeId >= (int)maxTransporters) { 00235 DBUG_PRINT("error", ("Node id out of range from client")); 00236 DBUG_RETURN(false); 00237 } 00238 if (theTransporters[nodeId] == 0) { 00239 DBUG_PRINT("error", ("No transporter for this node id from client")); 00240 DBUG_RETURN(false); 00241 } 00242 00243 //check that the transporter should be connected 00244 if (performStates[nodeId] != TransporterRegistry::CONNECTING) { 00245 DBUG_PRINT("error", ("Transporter in wrong state for this node id from client")); 00246 DBUG_RETURN(false); 00247 } 00248 00249 Transporter *t= theTransporters[nodeId]; 00250 00251 // send info about own id (just as response to acknowledge connection) 00252 // send info on own transporter type 00253 SocketOutputStream s_output(sockfd); 00254 s_output.println("%d %d", t->getLocalNodeId(), t->m_type); 00255 00256 if (remote_transporter_type != -1) 00257 { 00258 if (remote_transporter_type != t->m_type) 00259 { 00260 DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d", 00261 
t->m_type, remote_transporter_type)); 00262 g_eventLogger.error("Incompatible configuration: Transporter type " 00263 "mismatch with node %d", nodeId); 00264 00265 // wait for socket close for 1 second to let message arrive at client 00266 { 00267 fd_set a_set; 00268 FD_ZERO(&a_set); 00269 FD_SET(sockfd, &a_set); 00270 struct timeval timeout; 00271 timeout.tv_sec = 1; timeout.tv_usec = 0; 00272 select(sockfd+1, &a_set, 0, 0, &timeout); 00273 } 00274 DBUG_RETURN(false); 00275 } 00276 } 00277 else if (t->m_type == tt_SHM_TRANSPORTER) 00278 { 00279 g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId); 00280 } 00281 00282 // setup transporter (transporter responsible for closing sockfd) 00283 t->connect_server(sockfd); 00284 00285 DBUG_RETURN(true); 00286 }
Here is the call graph for this function:

Here is the caller graph for this function:

| bool TransporterRegistry::createOSETransporter | ( | struct TransporterConfiguration * | config | ) |
Definition at line 340 of file TransporterRegistry.cpp.
References TransporterConfiguration::checksum, DISCONNECTED, Transporter::getRemoteNodeId(), init(), OSE_Transporter::initTransporter(), TransporterConfiguration::localHostName, localNodeId, TransporterConfiguration::localNodeId, nodeIdSpecified, nOSETransporters, nTransporters, NULL, TransporterConfiguration::ose, OSE_Receiver, performStates, TransporterConfiguration::remoteHostName, TransporterConfiguration::remoteNodeId, TransporterConfiguration::serverNodeId, TransporterConfiguration::signalId, theOSEReceiver, theOSETransporters, theTransporters, theTransporterTypes, and tt_OSE_TRANSPORTER.
00340 { 00341 #ifdef NDB_OSE_TRANSPORTER 00342 00343 if(!nodeIdSpecified){ 00344 init(conf->localNodeId); 00345 } 00346 00347 if(conf->localNodeId != localNodeId) 00348 return false; 00349 00350 if(theTransporters[conf->remoteNodeId] != NULL) 00351 return false; 00352 00353 if(theOSEReceiver == NULL){ 00354 theOSEReceiver = new OSE_Receiver(this, 00355 10, 00356 localNodeId); 00357 } 00358 00359 OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize, 00360 conf->ose.prioBSignalSize, 00361 localNodeId, 00362 conf->localHostName, 00363 conf->remoteNodeId, 00364 conf->serverNodeId, 00365 conf->remoteHostName, 00366 conf->checksum, 00367 conf->signalId); 00368 if (t == NULL) 00369 return false; 00370 else if (!t->initTransporter()) { 00371 delete t; 00372 return false; 00373 } 00374 // Put the transporter in the transporter arrays 00375 theOSETransporters[nOSETransporters] = t; 00376 theTransporters[t->getRemoteNodeId()] = t; 00377 theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER; 00378 performStates[t->getRemoteNodeId()] = DISCONNECTED; 00379 00380 nTransporters++; 00381 nOSETransporters++; 00382 00383 return true; 00384 #else 00385 return false; 00386 #endif 00387 }
Here is the call graph for this function:

| bool TransporterRegistry::createSCITransporter | ( | struct TransporterConfiguration * | config | ) |
Definition at line 390 of file TransporterRegistry.cpp.
References abort(), DISCONNECTED, Transporter::getRemoteNodeId(), init(), SCI_Transporter::initSCI(), SCI_Transporter::initTransporter(), localNodeId, nodeIdSpecified, nSCITransporters, nTransporters, NULL, performStates, theSCITransporters, theTransporters, theTransporterTypes, and tt_SCI_TRANSPORTER.
00390 { 00391 #ifdef NDB_SCI_TRANSPORTER 00392 00393 if(!SCI_Transporter::initSCI()) 00394 abort(); 00395 00396 if(!nodeIdSpecified){ 00397 init(config->localNodeId); 00398 } 00399 00400 if(config->localNodeId != localNodeId) 00401 return false; 00402 00403 if(theTransporters[config->remoteNodeId] != NULL) 00404 return false; 00405 00406 SCI_Transporter * t = new SCI_Transporter(*this, 00407 config->localHostName, 00408 config->remoteHostName, 00409 config->s_port, 00410 config->isMgmConnection, 00411 config->sci.sendLimit, 00412 config->sci.bufferSize, 00413 config->sci.nLocalAdapters, 00414 config->sci.remoteSciNodeId0, 00415 config->sci.remoteSciNodeId1, 00416 localNodeId, 00417 config->remoteNodeId, 00418 config->serverNodeId, 00419 config->checksum, 00420 config->signalId); 00421 00422 if (t == NULL) 00423 return false; 00424 else if (!t->initTransporter()) { 00425 delete t; 00426 return false; 00427 } 00428 // Put the transporter in the transporter arrays 00429 theSCITransporters[nSCITransporters] = t; 00430 theTransporters[t->getRemoteNodeId()] = t; 00431 theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER; 00432 performStates[t->getRemoteNodeId()] = DISCONNECTED; 00433 nTransporters++; 00434 nSCITransporters++; 00435 00436 return true; 00437 #else 00438 return false; 00439 #endif 00440 }
Here is the call graph for this function:

| bool TransporterRegistry::createSHMTransporter | ( | struct TransporterConfiguration * | config | ) |
Definition at line 443 of file TransporterRegistry.cpp.
References DBUG_ENTER, DBUG_PRINT, DBUG_RETURN, DISCONNECTED, g_ndb_shm_signum, Transporter::getRemoteNodeId(), init(), SHM_Transporter::initTransporter(), localNodeId, nodeIdSpecified, nSHMTransporters, nTransporters, NULL, performStates, SHM_Transporter, theSHMTransporters, theTransporters, theTransporterTypes, TRUE, and tt_SHM_TRANSPORTER.
00443 { 00444 DBUG_ENTER("TransporterRegistry::createTransporter SHM"); 00445 #ifdef NDB_SHM_TRANSPORTER 00446 if(!nodeIdSpecified){ 00447 init(config->localNodeId); 00448 } 00449 00450 if(config->localNodeId != localNodeId) 00451 return false; 00452 00453 if (!g_ndb_shm_signum) { 00454 g_ndb_shm_signum= config->shm.signum; 00455 DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum)); 00460 NdbThread_set_shm_sigmask(TRUE); 00461 } 00462 00463 if(config->shm.signum != g_ndb_shm_signum) 00464 return false; 00465 00466 if(theTransporters[config->remoteNodeId] != NULL) 00467 return false; 00468 00469 SHM_Transporter * t = new SHM_Transporter(*this, 00470 config->localHostName, 00471 config->remoteHostName, 00472 config->s_port, 00473 config->isMgmConnection, 00474 localNodeId, 00475 config->remoteNodeId, 00476 config->serverNodeId, 00477 config->checksum, 00478 config->signalId, 00479 config->shm.shmKey, 00480 config->shm.shmSize 00481 ); 00482 if (t == NULL) 00483 return false; 00484 else if (!t->initTransporter()) { 00485 delete t; 00486 return false; 00487 } 00488 // Put the transporter in the transporter arrays 00489 theSHMTransporters[nSHMTransporters] = t; 00490 theTransporters[t->getRemoteNodeId()] = t; 00491 theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER; 00492 performStates[t->getRemoteNodeId()] = DISCONNECTED; 00493 00494 nTransporters++; 00495 nSHMTransporters++; 00496 00497 DBUG_RETURN(true); 00498 #else 00499 DBUG_RETURN(false); 00500 #endif 00501 }
Here is the call graph for this function:

| bool TransporterRegistry::createTCPTransporter | ( | struct TransporterConfiguration * | config | ) |
createTransporter
If the config object indicates that the transporter to be created will act as a server and no server is started, startServer is called. A transporter of the selected kind is created and it is put in the transporter arrays.
Definition at line 289 of file TransporterRegistry.cpp.
References DISCONNECTED, Transporter::getRemoteNodeId(), init(), TCP_Transporter::initTransporter(), localNodeId, nodeIdSpecified, nTCPTransporters, nTransporters, NULL, performStates, theTCPTransporters, theTransporters, theTransporterTypes, and tt_TCP_TRANSPORTER.
00289 { 00290 #ifdef NDB_TCP_TRANSPORTER 00291 00292 if(!nodeIdSpecified){ 00293 init(config->localNodeId); 00294 } 00295 00296 if(config->localNodeId != localNodeId) 00297 return false; 00298 00299 if(theTransporters[config->remoteNodeId] != NULL) 00300 return false; 00301 00302 TCP_Transporter * t = new TCP_Transporter(*this, 00303 config->tcp.sendBufferSize, 00304 config->tcp.maxReceiveSize, 00305 config->localHostName, 00306 config->remoteHostName, 00307 config->s_port, 00308 config->isMgmConnection, 00309 localNodeId, 00310 config->remoteNodeId, 00311 config->serverNodeId, 00312 config->checksum, 00313 config->signalId); 00314 if (t == NULL) 00315 return false; 00316 else if (!t->initTransporter()) { 00317 delete t; 00318 return false; 00319 } 00320 00321 // Put the transporter in the transporter arrays 00322 theTCPTransporters[nTCPTransporters] = t; 00323 theTransporters[t->getRemoteNodeId()] = t; 00324 theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER; 00325 performStates[t->getRemoteNodeId()] = DISCONNECTED; 00326 nTransporters++; 00327 nTCPTransporters++; 00328 00329 #if defined NDB_OSE || defined NDB_SOFTOSE 00330 t->theReceiverPid = theReceiverPid; 00331 #endif 00332 00333 return true; 00334 #else 00335 return false; 00336 #endif 00337 }
Here is the call graph for this function:

| void TransporterRegistry::disconnectAll | ( | ) |
Disconnect all transporters
Definition at line 185 of file TransporterRegistry.cpp.
References maxTransporters, NULL, and theTransporters.
Referenced by stopReceiving().
00185 { 00186 for(unsigned i = 0; i<maxTransporters; i++){ 00187 if(theTransporters[i] != NULL) 00188 theTransporters[i]->doDisconnect(); 00189 } 00190 }
Here is the caller graph for this function:

| void TransporterRegistry::do_connect | ( | NodeId | node_id | ) |
Get and set methods for PerformState
Definition at line 1175 of file TransporterRegistry.cpp.
References CONNECTED, CONNECTING, DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, DISCONNECTED, DISCONNECTING, and performStates.
Referenced by TransporterFacade::doConnect(), Cmvmi::execDISCONNECT_REP(), Cmvmi::execOPEN_COMREQ(), and Cmvmi::execSTART_ORD().
01176 { 01177 PerformState &curr_state = performStates[node_id]; 01178 switch(curr_state){ 01179 case DISCONNECTED: 01180 break; 01181 case CONNECTED: 01182 return; 01183 case CONNECTING: 01184 return; 01185 case DISCONNECTING: 01186 break; 01187 } 01188 DBUG_ENTER("TransporterRegistry::do_connect"); 01189 DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id)); 01190 curr_state= CONNECTING; 01191 DBUG_VOID_RETURN; 01192 }
Here is the caller graph for this function:

| void TransporterRegistry::do_disconnect | ( | NodeId | node_id | ) |
Definition at line 1194 of file TransporterRegistry.cpp.
References CONNECTED, CONNECTING, DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, DISCONNECTED, DISCONNECTING, and performStates.
Referenced by TransporterFacade::doDisconnect(), Cmvmi::execCLOSE_COMREQ(), Cmvmi::execCONNECT_REP(), and Cmvmi::execSTART_ORD().
01195 { 01196 PerformState &curr_state = performStates[node_id]; 01197 switch(curr_state){ 01198 case DISCONNECTED: 01199 return; 01200 case CONNECTED: 01201 break; 01202 case CONNECTING: 01203 break; 01204 case DISCONNECTING: 01205 return; 01206 } 01207 DBUG_ENTER("TransporterRegistry::do_disconnect"); 01208 DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id)); 01209 curr_state= DISCONNECTING; 01210 DBUG_VOID_RETURN; 01211 }
Here is the caller graph for this function:

| void TransporterRegistry::external_IO | ( | Uint32 | timeOutMillis | ) |
external_IO
Equal to: poll(...); perform_IO()
Definition at line 728 of file TransporterRegistry.cpp.
References performReceive(), performSend(), and pollReceive().
Referenced by client(), main(), and server().
00728 { 00729 //----------------------------------------------------------- 00730 // Most of the time we will send the buffers here and then wait 00731 // for new signals. Thus we start by sending without timeout 00732 // followed by the receive part where we expect to sleep for 00733 // a while. 00734 //----------------------------------------------------------- 00735 if(pollReceive(timeOutMillis)){ 00736 performReceive(); 00737 } 00738 performSend(); 00739 }
Here is the call graph for this function:

Here is the caller graph for this function:

| int TransporterRegistry::forceSendCheck | ( | int | sendLimit | ) |
Force sending once the number of send requests has reached sendLimit. Returns 0 if not sending and 1 if sending.
Definition at line 1128 of file TransporterRegistry.cpp.
References performSend(), and sendCounter.
Referenced by TransporterFacade::checkForceSend(), and TransporterFacade::forceSend().
01128 { 01129 int tSendCounter = sendCounter; 01130 sendCounter = tSendCounter + 1; 01131 if (tSendCounter >= sendLimit) { 01132 performSend(); 01133 sendCounter = 1; 01134 return 1; 01135 }//if 01136 return 0; 01137 }//TransporterRegistry::forceSendCheck()
Here is the call graph for this function:

Here is the caller graph for this function:

| struct in_addr TransporterRegistry::get_connect_address | ( | NodeId | node_id | ) | const |
Definition at line 59 of file TransporterRegistry.cpp.
Referenced by Qmgr::execAPI_VERSION_REQ(), and MgmtSrvr::get_connect_address().
00060 { 00061 return theTransporters[node_id]->m_connect_address; 00062 }
Here is the caller graph for this function:

Get free buffer space
Get free bytes in send buffer for node.
Definition at line 573 of file TransporterRegistry.cpp.
References Transporter::get_free_buffer(), likely, and theTransporters.
00574 { 00575 Transporter *t; 00576 if(likely((t = theTransporters[node]) != 0)) 00577 { 00578 return t->get_free_buffer(); 00579 } 00580 return 0; 00581 }
Here is the call graph for this function:

| NodeId TransporterRegistry::get_localNodeId | ( | ) | [inline] |
Definition at line 269 of file TransporterRegistry.hpp.
References localNodeId.
Referenced by connect_ndb_mgmd().
00269 { return localNodeId; };
Here is the caller graph for this function:

| NdbMgmHandle TransporterRegistry::get_mgm_handle | ( | void | ) | [inline] |
Definition at line 110 of file TransporterRegistry.hpp.
References m_mgm_handle.
00110 { return m_mgm_handle; };
| Transporter * TransporterRegistry::get_transporter | ( | NodeId | nodeId | ) |
Definition at line 1566 of file TransporterRegistry.cpp.
References theTransporters.
01566 { 01567 return theTransporters[nodeId]; 01568 }
| const char* TransporterRegistry::getPerformStateString | ( | NodeId | nodeId | ) | const [inline] |
Definition at line 176 of file TransporterRegistry.hpp.
References performStates, and performStateString.
Referenced by Cmvmi::execDUMP_STATE_ORD().
00177 { return performStateString[(unsigned)performStates[nodeId]]; };
Here is the caller graph for this function:

Definition at line 193 of file TransporterRegistry.cpp.
References DBUG_ENTER, DBUG_RETURN, DEBUG, localNodeId, and nodeIdSpecified.
Referenced by createOSETransporter(), createSCITransporter(), createSHMTransporter(), createTCPTransporter(), main(), and prioTransporterTest().
00193 { 00194 DBUG_ENTER("TransporterRegistry::init"); 00195 nodeIdSpecified = true; 00196 localNodeId = nodeId; 00197 00198 DEBUG("TransporterRegistry started node: " << localNodeId); 00199 00200 DBUG_RETURN(true); 00201 }
Here is the caller graph for this function:

Get and set methods for IOState
Definition at line 1155 of file TransporterRegistry.cpp.
References ioStates.
01155 { 01156 return ioStates[nodeId]; 01157 }
Definition at line 184 of file TransporterRegistry.hpp.
References CONNECTED, and performStates.
Referenced by Cmvmi::execSTART_ORD(), TransporterFacade::isConnected(), performReceive(), and performSend().
00184 { return performStates[node_id] == CONNECTED; };
Here is the caller graph for this function:

| void TransporterRegistry::performReceive | ( | ) |
Definition at line 908 of file TransporterRegistry.cpp.
References assert, callbackObj, SHM_Transporter::checkConnected(), SCI_Transporter::checkConnected(), checkJobBuffer(), TCP_Transporter::doReceive(), OSE_Receiver::doReceive(), TCP_Transporter::getReceiveData(), OSE_Receiver::getReceiveData(), SHM_Transporter::getReceivePtr(), SCI_Transporter::getReceivePtr(), Transporter::getRemoteNodeId(), TCP_Transporter::getSocket(), OSE_Receiver::hasData(), ioStates, is_connected(), Transporter::isConnected(), NDB_SOCKET_TYPE, nTCPTransporters, tcpReadSelectReply, tcpReadset, theOSEReceiver, theSCITransporters, theSHMTransporters, theTCPTransporters, transporter_recv_from(), unpack(), TCP_Transporter::updateReceiveDataPtr(), OSE_Receiver::updateReceiveDataPtr(), SHM_Transporter::updateReceivePtr(), and SCI_Transporter::updateReceivePtr().
Referenced by external_IO(), TransporterFacade::external_poll(), ThreadConfig::ipControlLoop(), and TransporterFacade::threadMainReceive().
00909 { 00910 #ifdef NDB_OSE_TRANSPORTER 00911 if(theOSEReceiver != 0) 00912 { 00913 while(theOSEReceiver->hasData()) 00914 { 00915 NodeId remoteNodeId; 00916 Uint32 * readPtr; 00917 Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr); 00918 transporter_recv_from(callbackObj, remoteNodeId); 00919 Uint32 szUsed = unpack(readPtr, 00920 sz, 00921 remoteNodeId, 00922 ioStates[remoteNodeId]); 00923 #ifdef DEBUG_TRANSPORTER 00924 00928 assert(sz == szUsed); 00929 #endif 00930 theOSEReceiver->updateReceiveDataPtr(szUsed); 00931 theOSEReceiver->doReceive(0); 00932 // checkJobBuffer(); 00933 } 00934 } 00935 #endif 00936 00937 #ifdef NDB_TCP_TRANSPORTER 00938 if(tcpReadSelectReply > 0) 00939 { 00940 for (int i=0; i<nTCPTransporters; i++) 00941 { 00942 checkJobBuffer(); 00943 TCP_Transporter *t = theTCPTransporters[i]; 00944 const NodeId nodeId = t->getRemoteNodeId(); 00945 const NDB_SOCKET_TYPE socket = t->getSocket(); 00946 if(is_connected(nodeId)){ 00947 if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) 00948 { 00949 const int receiveSize = t->doReceive(); 00950 if(receiveSize > 0) 00951 { 00952 Uint32 * ptr; 00953 Uint32 sz = t->getReceiveData(&ptr); 00954 transporter_recv_from(callbackObj, nodeId); 00955 Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]); 00956 t->updateReceiveDataPtr(szUsed); 00957 } 00958 } 00959 } 00960 } 00961 } 00962 #endif 00963 00964 #ifdef NDB_SCI_TRANSPORTER 00965 //performReceive 00966 //do prepareReceive on the SCI transporters (prepareReceive(t,,,,)) 00967 for (int i=0; i<nSCITransporters; i++) 00968 { 00969 checkJobBuffer(); 00970 SCI_Transporter *t = theSCITransporters[i]; 00971 const NodeId nodeId = t->getRemoteNodeId(); 00972 if(is_connected(nodeId)) 00973 { 00974 if(t->isConnected() && t->checkConnected()) 00975 { 00976 Uint32 * readPtr, * eodPtr; 00977 t->getReceivePtr(&readPtr, &eodPtr); 00978 transporter_recv_from(callbackObj, nodeId); 00979 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); 
00980 t->updateReceivePtr(newPtr); 00981 } 00982 } 00983 } 00984 #endif 00985 #ifdef NDB_SHM_TRANSPORTER 00986 for (int i=0; i<nSHMTransporters; i++) 00987 { 00988 checkJobBuffer(); 00989 SHM_Transporter *t = theSHMTransporters[i]; 00990 const NodeId nodeId = t->getRemoteNodeId(); 00991 if(is_connected(nodeId)){ 00992 if(t->isConnected() && t->checkConnected()) 00993 { 00994 Uint32 * readPtr, * eodPtr; 00995 t->getReceivePtr(&readPtr, &eodPtr); 00996 transporter_recv_from(callbackObj, nodeId); 00997 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); 00998 t->updateReceivePtr(newPtr); 00999 } 01000 } 01001 } 01002 #endif 01003 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::performSend | ( | ) |
Definition at line 1007 of file TransporterRegistry.cpp.
References TCP_Transporter::doSend(), OSE_Transporter::doSend(), TCP_Transporter::getSocket(), is_connected(), nOSETransporters, nTCPTransporters, sendCounter, theOSETransporters, theSCITransporters, theSHMTransporters, and theTCPTransporters.
Referenced by external_IO(), forceSendCheck(), ThreadConfig::ipControlLoop(), and TransporterFacade::threadMainSend().
01008 { 01009 int i; 01010 sendCounter = 1; 01011 01012 #ifdef NDB_OSE_TRANSPORTER 01013 for (int i = 0; i < nOSETransporters; i++) 01014 { 01015 OSE_Transporter *t = theOSETransporters[i]; 01016 if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected())) 01017 { 01018 t->doSend(); 01019 }//if 01020 }//for 01021 #endif 01022 01023 #ifdef NDB_TCP_TRANSPORTER 01024 #ifdef NDB_OSE 01025 { 01026 int maxSocketValue = 0; 01027 01028 // Needed for TCP/IP connections 01029 // The writeset are used by select 01030 fd_set writeset; 01031 FD_ZERO(&writeset); 01032 01033 // Prepare for sending and receiving 01034 for (i = 0; i < nTCPTransporters; i++) { 01035 TCP_Transporter * t = theTCPTransporters[i]; 01036 01037 // If the transporter is connected 01038 if ((t->hasDataToSend()) && (t->isConnected())) { 01039 const int socket = t->getSocket(); 01040 // Find the highest socket value. It will be used by select 01041 if (socket > maxSocketValue) { 01042 maxSocketValue = socket; 01043 }//if 01044 FD_SET(socket, &writeset); 01045 }//if 01046 }//for 01047 01048 // The highest socket value plus one 01049 if(maxSocketValue == 0) 01050 return; 01051 01052 maxSocketValue++; 01053 struct timeval timeout = { 0, 1025 }; 01054 Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout); 01055 01056 if (tmp == 0) 01057 { 01058 return; 01059 }//if 01060 for (i = 0; i < nTCPTransporters; i++) { 01061 TCP_Transporter *t = theTCPTransporters[i]; 01062 const NodeId nodeId = t->getRemoteNodeId(); 01063 const int socket = t->getSocket(); 01064 if(is_connected(nodeId)){ 01065 if(t->isConnected() && FD_ISSET(socket, &writeset)) { 01066 t->doSend(); 01067 }//if 01068 }//if 01069 }//for 01070 } 01071 #endif 01072 #ifdef NDB_TCP_TRANSPORTER 01073 for (i = x; i < nTCPTransporters; i++) 01074 { 01075 TCP_Transporter *t = theTCPTransporters[i]; 01076 if (t && t->hasDataToSend() && t->isConnected() && 01077 is_connected(t->getRemoteNodeId())) 01078 { 01079 t->doSend(); 01080 } 01081 } 01082 for (i = 
0; i < x && i < nTCPTransporters; i++) 01083 { 01084 TCP_Transporter *t = theTCPTransporters[i]; 01085 if (t && t->hasDataToSend() && t->isConnected() && 01086 is_connected(t->getRemoteNodeId())) 01087 { 01088 t->doSend(); 01089 } 01090 } 01091 x++; 01092 if (x == nTCPTransporters) x = 0; 01093 #endif 01094 #endif 01095 #ifdef NDB_SCI_TRANSPORTER 01096 //scroll through the SCI transporters, 01097 // get each transporter, check if connected, send data 01098 for (i=0; i<nSCITransporters; i++) { 01099 SCI_Transporter *t = theSCITransporters[i]; 01100 const NodeId nodeId = t->getRemoteNodeId(); 01101 01102 if(is_connected(nodeId)) 01103 { 01104 if(t->isConnected() && t->hasDataToSend()) { 01105 t->doSend(); 01106 } //if 01107 } //if 01108 } 01109 #endif 01110 01111 #ifdef NDB_SHM_TRANSPORTER 01112 for (i=0; i<nSHMTransporters; i++) 01113 { 01114 SHM_Transporter *t = theSHMTransporters[i]; 01115 const NodeId nodeId = t->getRemoteNodeId(); 01116 if(is_connected(nodeId)) 01117 { 01118 if(t->isConnected()) 01119 { 01120 t->doSend(); 01121 } 01122 } 01123 } 01124 #endif 01125 }
Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 840 of file TransporterRegistry.cpp.
References errno, NDB_SOCKET_TYPE, ndbout_c(), NdbSleep_MilliSleep(), nTCPTransporters, SOCKET_ERROR, tcpReadSelectReply, tcpReadset, and theTCPTransporters.
Referenced by pollReceive().
00841 { 00842 if (false && nTCPTransporters == 0) 00843 { 00844 tcpReadSelectReply = 0; 00845 return 0; 00846 } 00847 00848 struct timeval timeout; 00849 #ifdef NDB_OSE 00850 // Return directly if there are no TCP transporters configured 00851 00852 if(timeOutMillis <= 1){ 00853 timeout.tv_sec = 0; 00854 timeout.tv_usec = 1025; 00855 } else { 00856 timeout.tv_sec = timeOutMillis / 1000; 00857 timeout.tv_usec = (timeOutMillis % 1000) * 1000; 00858 } 00859 #else 00860 timeout.tv_sec = timeOutMillis / 1000; 00861 timeout.tv_usec = (timeOutMillis % 1000) * 1000; 00862 #endif 00863 00864 NDB_SOCKET_TYPE maxSocketValue = -1; 00865 00866 // Needed for TCP/IP connections 00867 // The read- and writeset are used by select 00868 00869 FD_ZERO(&tcpReadset); 00870 00871 // Prepare for sending and receiving 00872 for (int i = 0; i < nTCPTransporters; i++) { 00873 TCP_Transporter * t = theTCPTransporters[i]; 00874 00875 // If the transporter is connected 00876 if (t->isConnected()) { 00877 00878 const NDB_SOCKET_TYPE socket = t->getSocket(); 00879 // Find the highest socket value. It will be used by select 00880 if (socket > maxSocketValue) 00881 maxSocketValue = socket; 00882 00883 // Put the connected transporters in the socket read-set 00884 FD_SET(socket, &tcpReadset); 00885 } 00886 } 00887 00888 // The highest socket value plus one 00889 maxSocketValue++; 00890 00891 tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout); 00892 if(false && tcpReadSelectReply == -1 && errno == EINTR) 00893 ndbout_c("woke-up by signal"); 00894 00895 #ifdef NDB_WIN32 00896 if(tcpReadSelectReply == SOCKET_ERROR) 00897 { 00898 NdbSleep_MilliSleep(timeOutMillis); 00899 } 00900 #endif 00901 00902 return tcpReadSelectReply; 00903 }
Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 742 of file TransporterRegistry.cpp.
References nSCITransporters, nSHMTransporters, nTCPTransporters, poll_OSE(), poll_SCI(), poll_SHM(), poll_TCP(), and tcpReadSelectReply.
Referenced by external_IO(), TransporterFacade::external_poll(), ThreadConfig::ipControlLoop(), and TransporterFacade::threadMainReceive().
00742 { 00743 Uint32 retVal = 0; 00744 #ifdef NDB_OSE_TRANSPORTER 00745 retVal |= poll_OSE(timeOutMillis); 00746 retVal |= poll_TCP(0); 00747 return retVal; 00748 #endif 00749 00750 if((nSCITransporters) > 0) 00751 { 00752 timeOutMillis=0; 00753 } 00754 00755 #ifdef NDB_SHM_TRANSPORTER 00756 if(nSHMTransporters > 0) 00757 { 00758 Uint32 res = poll_SHM(0); 00759 if(res) 00760 { 00761 retVal |= res; 00762 timeOutMillis = 0; 00763 } 00764 } 00765 #endif 00766 00767 #ifdef NDB_TCP_TRANSPORTER 00768 if(nTCPTransporters > 0 || retVal == 0) 00769 { 00770 retVal |= poll_TCP(timeOutMillis); 00771 } 00772 else 00773 tcpReadSelectReply = 0; 00774 #endif 00775 #ifdef NDB_SCI_TRANSPORTER 00776 if(nSCITransporters > 0) 00777 retVal |= poll_SCI(timeOutMillis); 00778 #endif 00779 #ifdef NDB_SHM_TRANSPORTER 00780 if(nSHMTransporters > 0 && retVal == 0) 00781 { 00782 int res = poll_SHM(0); 00783 retVal |= res; 00784 } 00785 #endif 00786 return retVal; 00787 }
Here is the call graph for this function:

Here is the caller graph for this function:

| SendStatus TransporterRegistry::prepareSend | ( | const SignalHeader *const | signalHeader, | |
| Uint8 | prio, | |||
| const Uint32 *const | signalData, | |||
| NodeId | nodeId, | |||
| class SectionSegmentPool & | pool, | |||
| const SegmentedSectionPtr | ptr[3] | |||
| ) |
Definition at line 656 of file TransporterRegistry.cpp.
References callbackObj, DEBUG, Packer::getMessageLength(), Transporter::getWritePtr(), HaltIO, HaltOutput, ioStates, Transporter::isConnected(), Transporter::m_packer, MAX_MESSAGE_SIZE, NdbSleep_MilliSleep(), nSCITransporters, nSHMTransporters, NULL, Packer::pack(), reportError(), SEND_BLOCKED, SEND_BUFFER_FULL, SEND_DISCONNECTED, SEND_MESSAGE_TOO_BIG, SEND_OK, SEND_UNKNOWN_NODE, TE_SEND_BUFFER_FULL, TE_SIGNAL_LOST_SEND_BUFFER_FULL, SignalHeader::theReceiversBlockNumber, theTransporters, Transporter::updateWritePtr(), and WARNING.
00661 { 00662 00663 00664 Transporter *t = theTransporters[nodeId]; 00665 if(t != NULL && 00666 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 00667 ((signalHeader->theReceiversBlockNumber == 252)|| 00668 (signalHeader->theReceiversBlockNumber == 4002)))) { 00669 00670 if(t->isConnected()){ 00671 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr); 00672 if(lenBytes <= MAX_MESSAGE_SIZE){ 00673 Uint32 * insertPtr = t->getWritePtr(lenBytes, prio); 00674 if(insertPtr != 0){ 00675 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr); 00676 t->updateWritePtr(lenBytes, prio); 00677 return SEND_OK; 00678 } 00679 00680 00685 int sleepTime = 2; 00686 for(int i = 0; i<50; i++){ 00687 if((nSHMTransporters+nSCITransporters) == 0) 00688 NdbSleep_MilliSleep(sleepTime); 00689 insertPtr = t->getWritePtr(lenBytes, prio); 00690 if(insertPtr != 0){ 00691 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr); 00692 t->updateWritePtr(lenBytes, prio); 00693 break; 00694 } 00695 } 00696 00697 if(insertPtr != 0){ 00701 reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL); 00702 return SEND_OK; 00703 } 00704 00705 WARNING("Signal to " << nodeId << " lost(buffer)"); 00706 reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL); 00707 return SEND_BUFFER_FULL; 00708 } else { 00709 return SEND_MESSAGE_TOO_BIG; 00710 } 00711 } else { 00712 DEBUG("Signal to " << nodeId << " lost(disconnect) "); 00713 return SEND_DISCONNECTED; 00714 } 00715 } else { 00716 DEBUG("Discarding message to block: " 00717 << signalHeader->theReceiversBlockNumber 00718 << " node: " << nodeId); 00719 00720 if(t == NULL) 00721 return SEND_UNKNOWN_NODE; 00722 00723 return SEND_BLOCKED; 00724 } 00725 }
Here is the call graph for this function:

| SendStatus TransporterRegistry::prepareSend | ( | const SignalHeader *const | signalHeader, | |
| Uint8 | prio, | |||
| const Uint32 *const | signalData, | |||
| NodeId | nodeId, | |||
| const LinearSectionPtr | ptr[3] | |||
| ) |
prepareSend
When IOState is HaltOutput or HaltIO, do not send or insert any signals in the SendBuffer unless the signal is intended for the remote CMVMI block (blockno 252; the implementation below also lets block number 4002 through). Perform prepareSend on the transporter.
NOTE: signalHeader->xxxBlockRef should contain block numbers, not block references.
Definition at line 585 of file TransporterRegistry.cpp.
References callbackObj, DEBUG, Packer::getMessageLength(), Transporter::getWritePtr(), HaltIO, HaltOutput, ioStates, Transporter::isConnected(), Transporter::m_packer, MAX_MESSAGE_SIZE, NdbSleep_MilliSleep(), nSCITransporters, nSHMTransporters, NULL, Packer::pack(), reportError(), SEND_BLOCKED, SEND_BUFFER_FULL, SEND_DISCONNECTED, SEND_MESSAGE_TOO_BIG, SEND_OK, SEND_UNKNOWN_NODE, TE_SEND_BUFFER_FULL, TE_SIGNAL_LOST_SEND_BUFFER_FULL, SignalHeader::theReceiversBlockNumber, theTransporters, Transporter::updateWritePtr(), and WARNING.
Referenced by execute(), TransporterFacade::sendFragmentedSignal(), TransporterFacade::sendSignal(), SignalSender::sendSignal(), SimulatedBlock::sendSignal(), sendSignalTo(), and TransporterFacade::sendSignalUnCond().
00589 { 00590 00591 00592 Transporter *t = theTransporters[nodeId]; 00593 if(t != NULL && 00594 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 00595 ((signalHeader->theReceiversBlockNumber == 252) || 00596 (signalHeader->theReceiversBlockNumber == 4002)))) { 00597 00598 if(t->isConnected()){ 00599 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr); 00600 if(lenBytes <= MAX_MESSAGE_SIZE){ 00601 Uint32 * insertPtr = t->getWritePtr(lenBytes, prio); 00602 if(insertPtr != 0){ 00603 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr); 00604 t->updateWritePtr(lenBytes, prio); 00605 return SEND_OK; 00606 } 00607 00608 int sleepTime = 2; 00609 00614 for(int i = 0; i<50; i++){ 00615 if((nSHMTransporters+nSCITransporters) == 0) 00616 NdbSleep_MilliSleep(sleepTime); 00617 insertPtr = t->getWritePtr(lenBytes, prio); 00618 if(insertPtr != 0){ 00619 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr); 00620 t->updateWritePtr(lenBytes, prio); 00621 break; 00622 } 00623 } 00624 00625 if(insertPtr != 0){ 00629 reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL); 00630 return SEND_OK; 00631 } 00632 00633 WARNING("Signal to " << nodeId << " lost(buffer)"); 00634 reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL); 00635 return SEND_BUFFER_FULL; 00636 } else { 00637 return SEND_MESSAGE_TOO_BIG; 00638 } 00639 } else { 00640 DEBUG("Signal to " << nodeId << " lost(disconnect) "); 00641 return SEND_DISCONNECTED; 00642 } 00643 } else { 00644 DEBUG("Discarding message to block: " 00645 << signalHeader->theReceiversBlockNumber 00646 << " node: " << nodeId); 00647 00648 if(t == NULL) 00649 return SEND_UNKNOWN_NODE; 00650 00651 return SEND_BLOCKED; 00652 } 00653 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::removeAll | ( | ) |
Remove all transporters
Definition at line 177 of file TransporterRegistry.cpp.
References maxTransporters, NULL, removeTransporter(), and theTransporters.
Referenced by NdbShutdown(), and ~TransporterRegistry().
00177 { 00178 for(unsigned i = 0; i<maxTransporters; i++){ 00179 if(theTransporters[i] != NULL) 00180 removeTransporter(theTransporters[i]->getRemoteNodeId()); 00181 } 00182 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::removeTransporter | ( | NodeId | nodeId | ) | [private] |
Disconnect the transporter and remove it from theTransporters array. Do not allow any holes in theTransporters. Delete the transporter and remove it from theIndexedTransporters array.
Definition at line 505 of file TransporterRegistry.cpp.
References DEBUG, Transporter::doDisconnect(), localNodeId, nOSETransporters, nSCITransporters, nSHMTransporters, nTCPTransporters, nTransporters, NULL, theOSETransporters, theSCITransporters, theSHMTransporters, theTCPTransporters, theTransporters, theTransporterTypes, tt_OSE_TRANSPORTER, tt_SCI_TRANSPORTER, tt_SHM_TRANSPORTER, and tt_TCP_TRANSPORTER.
Referenced by removeAll().
00505 { 00506 00507 DEBUG("Removing transporter from " << localNodeId 00508 << " to " << nodeId); 00509 00510 if(theTransporters[nodeId] == NULL) 00511 return; 00512 00513 theTransporters[nodeId]->doDisconnect(); 00514 00515 const TransporterType type = theTransporterTypes[nodeId]; 00516 00517 int ind = 0; 00518 switch(type){ 00519 case tt_TCP_TRANSPORTER: 00520 #ifdef NDB_TCP_TRANSPORTER 00521 for(; ind < nTCPTransporters; ind++) 00522 if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId) 00523 break; 00524 ind++; 00525 for(; ind<nTCPTransporters; ind++) 00526 theTCPTransporters[ind-1] = theTCPTransporters[ind]; 00527 nTCPTransporters --; 00528 #endif 00529 break; 00530 case tt_SCI_TRANSPORTER: 00531 #ifdef NDB_SCI_TRANSPORTER 00532 for(; ind < nSCITransporters; ind++) 00533 if(theSCITransporters[ind]->getRemoteNodeId() == nodeId) 00534 break; 00535 ind++; 00536 for(; ind<nSCITransporters; ind++) 00537 theSCITransporters[ind-1] = theSCITransporters[ind]; 00538 nSCITransporters --; 00539 #endif 00540 break; 00541 case tt_SHM_TRANSPORTER: 00542 #ifdef NDB_SHM_TRANSPORTER 00543 for(; ind < nSHMTransporters; ind++) 00544 if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId) 00545 break; 00546 ind++; 00547 for(; ind<nSHMTransporters; ind++) 00548 theSHMTransporters[ind-1] = theSHMTransporters[ind]; 00549 nSHMTransporters --; 00550 #endif 00551 break; 00552 case tt_OSE_TRANSPORTER: 00553 #ifdef NDB_OSE_TRANSPORTER 00554 for(; ind < nOSETransporters; ind++) 00555 if(theOSETransporters[ind]->getRemoteNodeId() == nodeId) 00556 break; 00557 ind++; 00558 for(; ind<nOSETransporters; ind++) 00559 theOSETransporters[ind-1] = theOSETransporters[ind]; 00560 nOSETransporters --; 00561 #endif 00562 break; 00563 } 00564 00565 nTransporters--; 00566 00567 // Delete the transporter and remove it from theTransporters array 00568 delete theTransporters[nodeId]; 00569 theTransporters[nodeId] = NULL; 00570 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::report_connect | ( | NodeId | node_id | ) |
Definition at line 1214 of file TransporterRegistry.cpp.
References callbackObj, CONNECTED, DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, performStates, and reportConnect().
Referenced by update_connections().
01215 { 01216 DBUG_ENTER("TransporterRegistry::report_connect"); 01217 DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id)); 01218 performStates[node_id] = CONNECTED; 01219 reportConnect(callbackObj, node_id); 01220 DBUG_VOID_RETURN; 01221 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::report_disconnect | ( | NodeId | node_id, | |
| int | errnum | |||
| ) |
Definition at line 1224 of file TransporterRegistry.cpp.
References callbackObj, DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, DISCONNECTED, performStates, and reportDisconnect().
Referenced by Transporter::report_disconnect(), and update_connections().
01225 { 01226 DBUG_ENTER("TransporterRegistry::report_disconnect"); 01227 DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id)); 01228 performStates[node_id] = DISCONNECTED; 01229 reportDisconnect(callbackObj, node_id, errnum); 01230 DBUG_VOID_RETURN; 01231 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::set_mgm_handle | ( | NdbMgmHandle | h | ) |
This handle will be used in the client connect thread to fetch information on dynamic ports. The old handle (if set) is destroyed, and the new handle is destroyed by the destructor.
Definition at line 127 of file TransporterRegistry.cpp.
References buf, DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, h, m_mgm_handle, ndb_mgm_destroy_handle(), and ndb_mgm_get_connectstring().
Referenced by MgmtSrvr::connect_to_self().
00128 { 00129 DBUG_ENTER("TransporterRegistry::set_mgm_handle"); 00130 if (m_mgm_handle) 00131 ndb_mgm_destroy_handle(&m_mgm_handle); 00132 m_mgm_handle= h; 00133 #ifndef DBUG_OFF 00134 if (h) 00135 { 00136 char buf[256]; 00137 DBUG_PRINT("info",("handle set with connectstring: %s", 00138 ndb_mgm_get_connectstring(h,buf, sizeof(buf)))); 00139 } 00140 else 00141 { 00142 DBUG_PRINT("info",("handle set to NULL")); 00143 } 00144 #endif 00145 DBUG_VOID_RETURN; 00146 }
Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 1160 of file TransporterRegistry.cpp.
References DEBUG, and ioStates.
Referenced by TransporterFacade::doConnect(), Cmvmi::execCLOSE_COMREQ(), Cmvmi::execCONNECT_REP(), Cmvmi::execENABLE_COMORD(), Cmvmi::execOPEN_COMREQ(), and Cmvmi::execSTART_ORD().
01160 { 01161 DEBUG("TransporterRegistry::setIOState(" 01162 << nodeId << ", " << state << ")"); 01163 ioStates[nodeId] = state; 01164 }
Here is the caller graph for this function:

| bool TransporterRegistry::start_clients | ( | ) |
Definition at line 1350 of file TransporterRegistry.cpp.
References m_run_start_clients_thread, m_start_clients_thread, NDB_THREAD_PRIO_LOW, NdbThread_Create(), and run_start_clients_C().
Referenced by main(), and TransporterFacade::threadMainSend().
01351 { 01352 m_run_start_clients_thread= true; 01353 m_start_clients_thread= NdbThread_Create(run_start_clients_C, 01354 (void**)this, 01355 32768, 01356 "ndb_start_clients", 01357 NDB_THREAD_PRIO_LOW); 01358 if (m_start_clients_thread == 0) { 01359 m_run_start_clients_thread= false; 01360 return false; 01361 } 01362 return true; 01363 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::start_clients_thread | ( | ) |
Definition at line 1261 of file TransporterRegistry.cpp.
References CFG_CONNECTION_SERVER_PORT, connected, CONNECTING, DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, DISCONNECTING, m_mgm_handle, m_run_start_clients_thread, n, ndb_mgm_connect(), ndb_mgm_disconnect(), ndb_mgm_get_connection_int_parameter(), ndb_mgm_is_connected(), ndbout_c(), NdbSleep_MilliSleep(), nTransporters, performStates, and theTransporters.
01262 { 01263 DBUG_ENTER("TransporterRegistry::start_clients_thread"); 01264 while (m_run_start_clients_thread) { 01265 NdbSleep_MilliSleep(100); 01266 for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){ 01267 Transporter * t = theTransporters[i]; 01268 if (!t) 01269 continue; 01270 n++; 01271 01272 const NodeId nodeId = t->getRemoteNodeId(); 01273 switch(performStates[nodeId]){ 01274 case CONNECTING: 01275 if(!t->isConnected() && !t->isServer) { 01276 bool connected= false; 01280 if (t->get_s_port()) 01281 connected= t->connect_client(); 01282 01286 if( !connected && t->get_s_port() <= 0) { // Port is dynamic 01287 int server_port= 0; 01288 struct ndb_mgm_reply mgm_reply; 01289 01290 if(!ndb_mgm_is_connected(m_mgm_handle)) 01291 ndb_mgm_connect(m_mgm_handle, 0, 0, 0); 01292 01293 if(ndb_mgm_is_connected(m_mgm_handle)) 01294 { 01295 int res= 01296 ndb_mgm_get_connection_int_parameter(m_mgm_handle, 01297 t->getRemoteNodeId(), 01298 t->getLocalNodeId(), 01299 CFG_CONNECTION_SERVER_PORT, 01300 &server_port, 01301 &mgm_reply); 01302 DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)", 01303 server_port,t->getRemoteNodeId(), 01304 t->getLocalNodeId(),res)); 01305 if( res >= 0 ) 01306 { 01311 if (server_port) 01312 t->set_s_port(server_port); 01313 } 01314 else if(ndb_mgm_is_connected(m_mgm_handle)) 01315 { 01316 ndbout_c("Failed to get dynamic port to connect to: %d", res); 01317 ndb_mgm_disconnect(m_mgm_handle); 01318 } 01319 else 01320 { 01321 ndbout_c("Management server closed connection early. " 01322 "It is probably being shut down (or has problems). " 01323 "We will retry the connection."); 01324 } 01325 } 01334 } 01335 } 01336 break; 01337 case DISCONNECTING: 01338 if(t->isConnected()) 01339 t->doDisconnect(); 01340 break; 01341 default: 01342 break; 01343 } 01344 } 01345 } 01346 DBUG_VOID_RETURN; 01347 }
Here is the call graph for this function:

| bool TransporterRegistry::start_service | ( | SocketServer & | server | ) |
Definition at line 1412 of file TransporterRegistry.cpp.
References DBUG_ENTER, DBUG_PRINT, DBUG_RETURN, m_transporter_interface, ndbout_c(), nodeIdSpecified, port, TransporterService::setTransporterRegistry(), SocketServer::setup(), and TransporterService.
Referenced by TransporterFacade::init(), and main().
01413 { 01414 struct ndb_mgm_reply mgm_reply; 01415 01416 DBUG_ENTER("TransporterRegistry::start_service"); 01417 if (m_transporter_interface.size() > 0 && !nodeIdSpecified) 01418 { 01419 ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified"); 01420 DBUG_RETURN(false); 01421 } 01422 01423 for (unsigned i= 0; i < m_transporter_interface.size(); i++) 01424 { 01425 Transporter_interface &t= m_transporter_interface[i]; 01426 01427 unsigned short port= (unsigned short)t.m_s_service_port; 01428 if(t.m_s_service_port<0) 01429 port= -t.m_s_service_port; // is a dynamic port 01430 TransporterService *transporter_service = 01431 new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd")); 01432 if(!socket_server.setup(transporter_service, 01433 &port, t.m_interface)) 01434 { 01435 DBUG_PRINT("info", ("Trying new port")); 01436 port= 0; 01437 if(t.m_s_service_port>0 01438 || !socket_server.setup(transporter_service, 01439 &port, t.m_interface)) 01440 { 01441 /* 01442 * If it wasn't a dynamically allocated port, or 01443 * our attempts at getting a new dynamic port failed 01444 */ 01445 ndbout_c("Unable to setup transporter service port: %s:%d!\n" 01446 "Please check if the port is already used,\n" 01447 "(perhaps the node is already running)", 01448 t.m_interface ? t.m_interface : "*", t.m_s_service_port); 01449 delete transporter_service; 01450 DBUG_RETURN(false); 01451 } 01452 } 01453 t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic 01454 DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port)); 01455 transporter_service->setTransporterRegistry(this); 01456 } 01457 DBUG_RETURN(true); 01458 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::startReceiving | ( | ) |
Start/Stop receiving
Definition at line 1470 of file TransporterRegistry.cpp.
References OSE_Receiver::createPhantom(), DBUG_ENTER, DBUG_PRINT, DBUG_VOID_RETURN, errno, Logger::error(), FALSE, g_eventLogger, g_ndb_shm_signum, m_shm_own_pid, nTCPTransporters, NULL, sigemptyset, theOSEJunkSocketRecv, theOSEReceiver, and theTCPTransporters.
Referenced by main(), prioTransporterTest(), and TransporterFacade::threadMainReceive().
01471 { 01472 DBUG_ENTER("TransporterRegistry::startReceiving"); 01473 #ifdef NDB_OSE_TRANSPORTER 01474 if(theOSEReceiver != NULL){ 01475 theOSEReceiver->createPhantom(); 01476 } 01477 #endif 01478 01479 #ifdef NDB_OSE 01480 theOSEJunkSocketRecv = socket(AF_INET, SOCK_STREAM, 0); 01481 #endif 01482 01483 #if defined NDB_OSE || defined NDB_SOFTOSE 01484 theReceiverPid = current_process(); 01485 for(int i = 0; i<nTCPTransporters; i++) 01486 theTCPTransporters[i]->theReceiverPid = theReceiverPid; 01487 #endif 01488 01489 #ifdef NDB_SHM_TRANSPORTER 01490 m_shm_own_pid = getpid(); 01491 if (g_ndb_shm_signum) 01492 { 01493 DBUG_PRINT("info",("Install signal handler for signum %d", 01494 g_ndb_shm_signum)); 01495 struct sigaction sa; 01496 NdbThread_set_shm_sigmask(FALSE); 01497 sigemptyset(&sa.sa_mask); 01498 sa.sa_handler = shm_sig_handler; 01499 sa.sa_flags = 0; 01500 int ret; 01501 while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR); 01502 if(ret != 0) 01503 { 01504 DBUG_PRINT("error",("Install failed")); 01505 g_eventLogger.error("Failed to install signal handler for" 01506 " SHM transporter, signum %d, errno: %d (%s)", 01507 g_ndb_shm_signum, errno, strerror(errno)); 01508 } 01509 } 01510 #endif // NDB_SHM_TRANSPORTER 01511 DBUG_VOID_RETURN; 01512 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::startSending | ( | ) |
Start/Stop sending
Definition at line 1538 of file TransporterRegistry.cpp.
References theOSEJunkSocketSend.
Referenced by main(), prioTransporterTest(), and TransporterFacade::threadMainSend().
01538 { 01539 #if defined NDB_OSE || defined NDB_SOFTOSE 01540 theOSEJunkSocketSend = socket(AF_INET, SOCK_STREAM, 0); 01541 #endif 01542 }
Here is the caller graph for this function:

| bool TransporterRegistry::stop_clients | ( | ) |
Definition at line 1366 of file TransporterRegistry.cpp.
References m_run_start_clients_thread, m_start_clients_thread, NdbThread_Destroy(), NdbThread_WaitFor(), and status.
Referenced by NdbShutdown(), and TransporterFacade::threadMainSend().
01367 { 01368 if (m_start_clients_thread) { 01369 m_run_start_clients_thread= false; 01370 void* status; 01371 NdbThread_WaitFor(m_start_clients_thread, &status); 01372 NdbThread_Destroy(&m_start_clients_thread); 01373 } 01374 return true; 01375 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::stopReceiving | ( | ) |
Definition at line 1515 of file TransporterRegistry.cpp.
References OSE_Receiver::destroyPhantom(), disconnectAll(), NULL, theOSEJunkSocketRecv, and theOSEReceiver.
Referenced by NdbShutdown(), and TransporterFacade::threadMainReceive().
01515 { 01516 #ifdef NDB_OSE_TRANSPORTER 01517 if(theOSEReceiver != NULL){ 01518 theOSEReceiver->destroyPhantom(); 01519 } 01520 #endif 01521 01527 disconnectAll(); 01528 01529 #if defined NDB_OSE || defined NDB_SOFTOSE 01530 if(theOSEJunkSocketRecv > 0) 01531 close(theOSEJunkSocketRecv); 01532 theOSEJunkSocketRecv = -1; 01533 #endif 01534 01535 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::stopSending | ( | ) |
Definition at line 1545 of file TransporterRegistry.cpp.
References theOSEJunkSocketSend.
Referenced by NdbShutdown(), and TransporterFacade::threadMainSend().
01545 { 01546 #if defined NDB_OSE || defined NDB_SOFTOSE 01547 if(theOSEJunkSocketSend > 0) 01548 close(theOSEJunkSocketSend); 01549 theOSEJunkSocketSend = -1; 01550 #endif 01551 }
Here is the caller graph for this function:

| Uint32 * TransporterRegistry::unpack | ( | Uint32 * | readPtr, | |
| Uint32 * | eodPtr, | |||
| NodeId | remoteNodeId, | |||
| IOState | state | |||
| ) | [private] |
Definition at line 212 of file Packer.cpp.
References callbackObj, computeChecksum(), Protocol6::createSignalHeader(), DEBUG, execute(), Protocol6::getByteOrder(), Protocol6::getCheckSumIncluded(), Protocol6::getCompressed(), Protocol6::getMessageLength(), Protocol6::getPrio(), Protocol6::getSignalIdIncluded(), HaltOutput, MAX_MESSAGE_SIZE, MAX_RECEIVED_SIGNALS, NoHalt, numberToRef(), LinearSectionPtr::p, reportError(), LinearSectionPtr::sz, TE_INVALID_CHECKSUM, and TE_INVALID_MESSAGE_LENGTH.
00215 { 00216 static SignalHeader signalHeader; 00217 static LinearSectionPtr ptr[3]; 00218 Uint32 loop_count = 0; 00219 if(state == NoHalt || state == HaltOutput){ 00220 while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { 00221 Uint32 word1 = readPtr[0]; 00222 Uint32 word2 = readPtr[1]; 00223 Uint32 word3 = readPtr[2]; 00224 loop_count++; 00225 #if 0 00226 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ 00227 //Do funky stuff 00228 } 00229 #endif 00230 00231 const Uint16 messageLen32 = Protocol6::getMessageLength(word1); 00232 00233 if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){ 00234 DEBUG("Message Size(words) = " << messageLen32); 00235 reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH); 00236 return readPtr; 00237 }//if 00238 00239 if(Protocol6::getCheckSumIncluded(word1)){ 00240 const Uint32 tmpLen = messageLen32 - 1; 00241 const Uint32 checkSumSent = readPtr[tmpLen]; 00242 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen); 00243 00244 if(checkSumComputed != checkSumSent){ 00245 reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM); 00246 return readPtr; 00247 }//if 00248 }//if 00249 00250 #if 0 00251 if(Protocol6::getCompressed(word1)){ 00252 //Do funky stuff 00253 }//if 00254 #endif 00255 00256 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3); 00257 00258 Uint32 sBlockNum = signalHeader.theSendersBlockRef; 00259 sBlockNum = numberToRef(sBlockNum, remoteNodeId); 00260 signalHeader.theSendersBlockRef = sBlockNum; 00261 00262 Uint8 prio = Protocol6::getPrio(word1); 00263 00264 Uint32 * signalData = &readPtr[3]; 00265 00266 if(Protocol6::getSignalIdIncluded(word1) == 0){ 00267 signalHeader.theSendersSignalId = ~0; 00268 } else { 00269 signalHeader.theSendersSignalId = * signalData; 00270 signalData ++; 00271 }//if 00272 00273 Uint32 * sectionPtr = signalData + signalHeader.theLength; 00274 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections; 00275 for(Uint32 
i = 0; i<signalHeader.m_noOfSections; i++){ 00276 Uint32 sz = * sectionPtr; 00277 ptr[i].sz = sz; 00278 ptr[i].p = sectionData; 00279 00280 sectionPtr ++; 00281 sectionData += sz; 00282 } 00283 00284 execute(callbackObj, &signalHeader, prio, signalData, ptr); 00285 00286 readPtr += messageLen32; 00287 }//while 00288 } else { 00291 while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { 00292 Uint32 word1 = readPtr[0]; 00293 Uint32 word2 = readPtr[1]; 00294 Uint32 word3 = readPtr[2]; 00295 loop_count++; 00296 #if 0 00297 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ 00298 //Do funky stuff 00299 }//if 00300 #endif 00301 00302 const Uint16 messageLen32 = Protocol6::getMessageLength(word1); 00303 if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){ 00304 DEBUG("Message Size(words) = " << messageLen32); 00305 reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH); 00306 return readPtr; 00307 }//if 00308 00309 if(Protocol6::getCheckSumIncluded(word1)){ 00310 const Uint32 tmpLen = messageLen32 - 1; 00311 const Uint32 checkSumSent = readPtr[tmpLen]; 00312 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen); 00313 00314 if(checkSumComputed != checkSumSent){ 00315 00316 //theTransporters[remoteNodeId]->disconnect(); 00317 reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM); 00318 return readPtr; 00319 }//if 00320 }//if 00321 00322 #if 0 00323 if(Protocol6::getCompressed(word1)){ 00324 //Do funky stuff 00325 }//if 00326 #endif 00327 00328 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3); 00329 00330 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber; 00331 00332 if(rBlockNum == 252){ 00333 Uint32 sBlockNum = signalHeader.theSendersBlockRef; 00334 sBlockNum = numberToRef(sBlockNum, remoteNodeId); 00335 signalHeader.theSendersBlockRef = sBlockNum; 00336 00337 Uint8 prio = Protocol6::getPrio(word1); 00338 00339 Uint32 * signalData = &readPtr[3]; 00340 00341 
if(Protocol6::getSignalIdIncluded(word1) == 0){ 00342 signalHeader.theSendersSignalId = ~0; 00343 } else { 00344 signalHeader.theSendersSignalId = * signalData; 00345 signalData ++; 00346 }//if 00347 00348 Uint32 * sectionPtr = signalData + signalHeader.theLength; 00349 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections; 00350 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){ 00351 Uint32 sz = * sectionPtr; 00352 ptr[i].sz = sz; 00353 ptr[i].p = sectionData; 00354 00355 sectionPtr ++; 00356 sectionData += sz; 00357 } 00358 00359 execute(callbackObj, &signalHeader, prio, signalData, ptr); 00360 } else { 00361 DEBUG("prepareReceive(...) - Discarding message to block: " 00362 << rBlockNum << " from Node: " << remoteNodeId); 00363 }//if 00364 00365 readPtr += messageLen32; 00366 }//while 00367 }//if 00368 return readPtr; 00369 }
Here is the call graph for this function:

| Uint32 TransporterRegistry::unpack | ( | Uint32 * | readPtr, | |
| Uint32 | bufferSize, | |||
| NodeId | remoteNodeId, | |||
| IOState | state | |||
| ) | [private] |
Unpack signal data
state == HaltIO || state == HaltInput
Definition at line 26 of file Packer.cpp.
References callbackObj, computeChecksum(), Protocol6::createSignalHeader(), DEBUG, execute(), Protocol6::getByteOrder(), Protocol6::getCheckSumIncluded(), Protocol6::getCompressed(), Protocol6::getMessageLength(), Protocol6::getPrio(), Protocol6::getSignalIdIncluded(), HaltOutput, MAX_MESSAGE_SIZE, MAX_RECEIVED_SIGNALS, NoHalt, numberToRef(), LinearSectionPtr::p, reportError(), LinearSectionPtr::sz, TE_INVALID_CHECKSUM, and TE_INVALID_MESSAGE_LENGTH.
Referenced by performReceive().
00029 { 00030 SignalHeader signalHeader; 00031 LinearSectionPtr ptr[3]; 00032 00033 Uint32 usedData = 0; 00034 Uint32 loop_count = 0; 00035 00036 if(state == NoHalt || state == HaltOutput){ 00037 while ((sizeOfData >= 4 + sizeof(Protocol6)) && 00038 (loop_count < MAX_RECEIVED_SIGNALS)) { 00039 Uint32 word1 = readPtr[0]; 00040 Uint32 word2 = readPtr[1]; 00041 Uint32 word3 = readPtr[2]; 00042 loop_count++; 00043 00044 #if 0 00045 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ 00046 //Do funky stuff 00047 } 00048 #endif 00049 00050 const Uint16 messageLen32 = Protocol6::getMessageLength(word1); 00051 const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2; 00052 00053 if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){ 00054 DEBUG("Message Size = " << messageLenBytes); 00055 reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH); 00056 return usedData; 00057 }//if 00058 00059 if (sizeOfData < messageLenBytes) { 00060 break; 00061 }//if 00062 00063 if(Protocol6::getCheckSumIncluded(word1)){ 00064 const Uint32 tmpLen = messageLen32 - 1; 00065 const Uint32 checkSumSent = readPtr[tmpLen]; 00066 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen); 00067 00068 if(checkSumComputed != checkSumSent){ 00069 reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM); 00070 return usedData; 00071 }//if 00072 }//if 00073 00074 #if 0 00075 if(Protocol6::getCompressed(word1)){ 00076 //Do funky stuff 00077 }//if 00078 #endif 00079 00080 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3); 00081 00082 Uint32 sBlockNum = signalHeader.theSendersBlockRef; 00083 sBlockNum = numberToRef(sBlockNum, remoteNodeId); 00084 signalHeader.theSendersBlockRef = sBlockNum; 00085 00086 Uint8 prio = Protocol6::getPrio(word1); 00087 00088 Uint32 * signalData = &readPtr[3]; 00089 00090 if(Protocol6::getSignalIdIncluded(word1) == 0){ 00091 signalHeader.theSendersSignalId = ~0; 00092 } else { 00093 signalHeader.theSendersSignalId = * signalData; 
00094 signalData ++; 00095 }//if 00096 signalHeader.theSignalId= ~0; 00097 00098 Uint32 * sectionPtr = signalData + signalHeader.theLength; 00099 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections; 00100 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){ 00101 Uint32 sz = * sectionPtr; 00102 ptr[i].sz = sz; 00103 ptr[i].p = sectionData; 00104 00105 sectionPtr ++; 00106 sectionData += sz; 00107 } 00108 00109 execute(callbackObj, &signalHeader, prio, signalData, ptr); 00110 00111 readPtr += messageLen32; 00112 sizeOfData -= messageLenBytes; 00113 usedData += messageLenBytes; 00114 }//while 00115 00116 return usedData; 00117 } else { 00120 while ((sizeOfData >= 4 + sizeof(Protocol6)) && 00121 (loop_count < MAX_RECEIVED_SIGNALS)) { 00122 Uint32 word1 = readPtr[0]; 00123 Uint32 word2 = readPtr[1]; 00124 Uint32 word3 = readPtr[2]; 00125 loop_count++; 00126 00127 #if 0 00128 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ 00129 //Do funky stuff 00130 }//if 00131 #endif 00132 00133 const Uint16 messageLen32 = Protocol6::getMessageLength(word1); 00134 const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2; 00135 if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){ 00136 DEBUG("Message Size = " << messageLenBytes); 00137 reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH); 00138 return usedData; 00139 }//if 00140 00141 if (sizeOfData < messageLenBytes) { 00142 break; 00143 }//if 00144 00145 if(Protocol6::getCheckSumIncluded(word1)){ 00146 const Uint32 tmpLen = messageLen32 - 1; 00147 const Uint32 checkSumSent = readPtr[tmpLen]; 00148 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen); 00149 00150 if(checkSumComputed != checkSumSent){ 00151 00152 //theTransporters[remoteNodeId]->disconnect(); 00153 reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM); 00154 return usedData; 00155 }//if 00156 }//if 00157 00158 #if 0 00159 if(Protocol6::getCompressed(word1)){ 00160 //Do funky stuff 00161 }//if 00162 
#endif 00163 00164 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3); 00165 00166 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber; 00167 00168 if(rBlockNum == 252){ 00169 Uint32 sBlockNum = signalHeader.theSendersBlockRef; 00170 sBlockNum = numberToRef(sBlockNum, remoteNodeId); 00171 signalHeader.theSendersBlockRef = sBlockNum; 00172 00173 Uint8 prio = Protocol6::getPrio(word1); 00174 00175 Uint32 * signalData = &readPtr[3]; 00176 00177 if(Protocol6::getSignalIdIncluded(word1) == 0){ 00178 signalHeader.theSendersSignalId = ~0; 00179 } else { 00180 signalHeader.theSendersSignalId = * signalData; 00181 signalData ++; 00182 }//if 00183 00184 Uint32 * sectionPtr = signalData + signalHeader.theLength; 00185 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections; 00186 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){ 00187 Uint32 sz = * sectionPtr; 00188 ptr[i].sz = sz; 00189 ptr[i].p = sectionData; 00190 00191 sectionPtr ++; 00192 sectionData += sz; 00193 } 00194 00195 execute(callbackObj, &signalHeader, prio, signalData, ptr); 00196 } else { 00197 DEBUG("prepareReceive(...) - Discarding message to block: " 00198 << rBlockNum << " from Node: " << remoteNodeId); 00199 }//if 00200 00201 readPtr += messageLen32; 00202 sizeOfData -= messageLenBytes; 00203 usedData += messageLenBytes; 00204 }//while 00205 00206 00207 return usedData; 00208 }//if 00209 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void TransporterRegistry::update_connections | ( | ) |
Definition at line 1234 of file TransporterRegistry.cpp.
References CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING, n, nTransporters, performStates, report_connect(), report_disconnect(), and theTransporters.
Referenced by ThreadConfig::ipControlLoop(), TransporterFacade::threadMainReceive(), and MgmtSrvr::transporter_connect().
01235 { 01236 for (int i= 0, n= 0; n < nTransporters; i++){ 01237 Transporter * t = theTransporters[i]; 01238 if (!t) 01239 continue; 01240 n++; 01241 01242 const NodeId nodeId = t->getRemoteNodeId(); 01243 switch(performStates[nodeId]){ 01244 case CONNECTED: 01245 case DISCONNECTED: 01246 break; 01247 case CONNECTING: 01248 if(t->isConnected()) 01249 report_connect(nodeId); 01250 break; 01251 case DISCONNECTING: 01252 if(!t->isConnected()) 01253 report_disconnect(nodeId, 0); 01254 break; 01255 } 01256 } 01257 }
Here is the call graph for this function:

Here is the caller graph for this function:

friend class OSE_Receiver [friend] |
friend class SHM_Transporter [friend] |
friend class Transporter [friend] |
Definition at line 94 of file TransporterRegistry.hpp.
friend class TransporterService [friend] |
void* TransporterRegistry::callbackObj [private] |
Definition at line 276 of file TransporterRegistry.hpp.
Referenced by Transporter::get_callback_obj(), performReceive(), prepareSend(), report_connect(), report_disconnect(), TransporterRegistry(), and unpack().
IOState* TransporterRegistry::ioStates [private] |
Definition at line 329 of file TransporterRegistry.hpp.
Referenced by ioState(), performReceive(), prepareSend(), setIOState(), TransporterRegistry(), and ~TransporterRegistry().
NodeId TransporterRegistry::localNodeId [private] |
Definition at line 284 of file TransporterRegistry.hpp.
Referenced by createOSETransporter(), createSCITransporter(), createSHMTransporter(), createTCPTransporter(), get_localNodeId(), init(), and removeTransporter().
Definition at line 278 of file TransporterRegistry.hpp.
Referenced by get_mgm_handle(), set_mgm_handle(), start_clients_thread(), TransporterRegistry(), and ~TransporterRegistry().
Definition at line 281 of file TransporterRegistry.hpp.
Referenced by start_clients(), start_clients_thread(), and stop_clients().
int TransporterRegistry::m_shm_own_pid [private] |
Definition at line 363 of file TransporterRegistry.hpp.
Referenced by SHM_Transporter::connect_client_impl(), SHM_Transporter::connect_server_impl(), and startReceiving().
struct NdbThread* TransporterRegistry::m_start_clients_thread [private] |
Definition at line 280 of file TransporterRegistry.hpp.
Referenced by start_clients(), and stop_clients().
Definition at line 265 of file TransporterRegistry.hpp.
Referenced by add_transporter_interface(), Ndb_cluster_connection::connect(), connect_ndb_mgmd(), and start_service().
unsigned TransporterRegistry::maxTransporters [private] |
Definition at line 286 of file TransporterRegistry.hpp.
Referenced by connect_server(), disconnectAll(), removeAll(), and TransporterRegistry().
bool TransporterRegistry::nodeIdSpecified [private] |
Definition at line 285 of file TransporterRegistry.hpp.
Referenced by createOSETransporter(), createSCITransporter(), createSHMTransporter(), createTCPTransporter(), init(), start_service(), and TransporterRegistry().
int TransporterRegistry::nOSETransporters [private] |
Definition at line 291 of file TransporterRegistry.hpp.
Referenced by createOSETransporter(), performSend(), removeTransporter(), and TransporterRegistry().
int TransporterRegistry::nSCITransporters [private] |
Definition at line 289 of file TransporterRegistry.hpp.
Referenced by createSCITransporter(), pollReceive(), prepareSend(), removeTransporter(), and TransporterRegistry().
int TransporterRegistry::nSHMTransporters [private] |
Definition at line 290 of file TransporterRegistry.hpp.
Referenced by createSHMTransporter(), pollReceive(), prepareSend(), removeTransporter(), and TransporterRegistry().
int TransporterRegistry::nTCPTransporters [private] |
Definition at line 288 of file TransporterRegistry.hpp.
Referenced by createTCPTransporter(), performReceive(), performSend(), poll_TCP(), pollReceive(), removeTransporter(), startReceiving(), and TransporterRegistry().
int TransporterRegistry::nTransporters [private] |
Definition at line 287 of file TransporterRegistry.hpp.
Referenced by createOSETransporter(), createSCITransporter(), createSHMTransporter(), createTCPTransporter(), removeTransporter(), start_clients_thread(), TransporterRegistry(), and update_connections().
PerformState* TransporterRegistry::performStates [private] |
State arrays, indexed by host id
Definition at line 328 of file TransporterRegistry.hpp.
Referenced by connect_server(), createOSETransporter(), createSCITransporter(), createSHMTransporter(), createTCPTransporter(), do_connect(), do_disconnect(), getPerformStateString(), is_connected(), report_connect(), report_disconnect(), start_clients_thread(), TransporterRegistry(), update_connections(), and ~TransporterRegistry().
int TransporterRegistry::sendCounter [private] |
Definition at line 283 of file TransporterRegistry.hpp.
Referenced by forceSendCheck(), performSend(), and TransporterRegistry().
int TransporterRegistry::tcpReadSelectReply [private] |
Used in polling if a TCP_Transporter exists
Definition at line 355 of file TransporterRegistry.hpp.
Referenced by performReceive(), poll_TCP(), and pollReceive().
fd_set TransporterRegistry::tcpReadset [private] |
Definition at line 356 of file TransporterRegistry.hpp.
Referenced by performReceive(), and poll_TCP().
int TransporterRegistry::theOSEJunkSocketRecv [private] |
Definition at line 320 of file TransporterRegistry.hpp.
Referenced by startReceiving(), stopReceiving(), and TransporterRegistry().
int TransporterRegistry::theOSEJunkSocketSend [private] |
In OSE, for some bizarre reason, you need to create a socket as the first thing you do when using inet functions.
Furthermore a process doing select has to "own" a socket
Definition at line 319 of file TransporterRegistry.hpp.
Referenced by startSending(), stopSending(), and TransporterRegistry().
class OSE_Receiver* TransporterRegistry::theOSEReceiver [private] |
OSE Receiver
Definition at line 310 of file TransporterRegistry.hpp.
Referenced by createOSETransporter(), performReceive(), startReceiving(), stopReceiving(), TransporterRegistry(), and ~TransporterRegistry().
Definition at line 299 of file TransporterRegistry.hpp.
Referenced by createOSETransporter(), performSend(), removeTransporter(), TransporterRegistry(), and ~TransporterRegistry().
Definition at line 297 of file TransporterRegistry.hpp.
Referenced by createSCITransporter(), performReceive(), performSend(), removeTransporter(), TransporterRegistry(), and ~TransporterRegistry().
Definition at line 298 of file TransporterRegistry.hpp.
Referenced by createSHMTransporter(), performReceive(), performSend(), removeTransporter(), TransporterRegistry(), and ~TransporterRegistry().
Arrays holding all transporters in the order they are created
Definition at line 296 of file TransporterRegistry.hpp.
Referenced by createTCPTransporter(), performReceive(), performSend(), poll_TCP(), removeTransporter(), startReceiving(), TransporterRegistry(), and ~TransporterRegistry().
Transporter** TransporterRegistry::theTransporters [private] |
Definition at line 305 of file TransporterRegistry.hpp.
Referenced by connect_client(), connect_server(), createOSETransporter(), createSCITransporter(), createSHMTransporter(), createTCPTransporter(), disconnectAll(), get_free_buffer(), get_transporter(), OSE_Receiver::getTransporter(), prepareSend(), removeAll(), removeTransporter(), start_clients_thread(), TransporterRegistry(), update_connections(), and ~TransporterRegistry().
Array, indexed by nodeId, holding all transporters
Definition at line 304 of file TransporterRegistry.hpp.
Referenced by createOSETransporter(), createSCITransporter(), createSHMTransporter(), createTCPTransporter(), OSE_Receiver::getTransporter(), removeTransporter(), TransporterRegistry(), and ~TransporterRegistry().
1.4.7

