#include <OSE_Receiver.hpp>
Collaboration diagram for OSE_Receiver:

Public Member Functions | |
| OSE_Receiver (class TransporterRegistry *, int recBufSize, NodeId localNodeId) | |
| ~OSE_Receiver () | |
| bool | hasData () const |
| bool | isFull () const |
| Uint32 | getReceiveData (NodeId *remoteNodeId, Uint32 **readPtr) |
| void | updateReceiveDataPtr (Uint32 szRead) |
| bool | doReceive (Uint32 timeOutMillis) |
| PROCESS | createPhantom () |
| void | destroyPhantom () |
Private Member Functions | |
| OSE_Transporter * | getTransporter (NodeId nodeId) |
| void | insertReceiveBuffer (union SIGNAL *_sig) |
| void | clearRecvBuffer (NodeId _nodeId) |
| bool | checkWaitStack (NodeId _nodeId) |
| void | clearWaitStack (NodeId _nodeId) |
| void | insertWaitStack (union SIGNAL *_sig) |
Private Attributes | |
| TransporterRegistry * | theTransporterRegistry |
| NodeId | localNodeId |
| char | localHostName [255] |
| bool | phantomCreated |
| PROCESS | phantomPid |
| OS_redir_entry | redir |
| int | recBufReadIndex |
| int | recBufWriteIndex |
| int | recBufSize |
| SIGNAL ** | receiveBuffer |
| int | waitStackCount |
| int | waitStackSize |
| SIGNAL ** | waitStack |
| Uint32 * | nextSigId |
Definition at line 24 of file OSE_Receiver.hpp.
| OSE_Receiver::OSE_Receiver | ( | class TransporterRegistry * | , | |
| int | recBufSize, | |||
| NodeId | localNodeId | |||
| ) |
Definition at line 24 of file OSE_Receiver.cpp.
References DEBUG, localHostName, localNodeId, MAX_NTRANSPORTERS, nextSigId, phantomCreated, recBufReadIndex, recBufSize, recBufWriteIndex, receiveBuffer, BaseString::snprintf(), theTransporterRegistry, waitStack, waitStackCount, and waitStackSize.
00026 { 00027 theTransporterRegistry = tr; 00028 00029 recBufSize = _recBufSize; 00030 recBufReadIndex = 0; 00031 recBufWriteIndex = 0; 00032 receiveBuffer = new union SIGNAL * [recBufSize]; 00033 00034 waitStackCount = 0; 00035 waitStackSize = _recBufSize; 00036 waitStack = new union SIGNAL * [waitStackSize]; 00037 00038 nextSigId = new Uint32[MAX_NTRANSPORTERS]; 00039 for (int i = 0; i < MAX_NTRANSPORTERS; i++) 00040 nextSigId[i] = 0; 00041 00042 phantomCreated = false; 00043 localNodeId = _localNodeId; 00044 BaseString::snprintf(localHostName, sizeof(localHostName), 00045 "ndb_node%d", localNodeId); 00046 00047 DEBUG("localNodeId = " << localNodeId << " -> localHostName = " 00048 << localHostName); 00049 }
Here is the call graph for this function:

| OSE_Receiver::~OSE_Receiver | ( | ) |
Definition at line 51 of file OSE_Receiver.cpp.
References destroyPhantom(), recBufReadIndex, recBufSize, recBufWriteIndex, and receiveBuffer.
00051 { 00052 while(recBufReadIndex != recBufWriteIndex){ 00053 free_buf(&receiveBuffer[recBufReadIndex]); 00054 recBufReadIndex = (recBufReadIndex + 1) % recBufSize; 00055 } 00056 delete [] receiveBuffer; 00057 destroyPhantom(); 00058 }
Here is the call graph for this function:

| bool OSE_Receiver::checkWaitStack | ( | NodeId | _nodeId | ) | [private] |
Check the wait stack for signals that are next in sequence. Put any found signal in the receive buffer. Returns true if one signal is found.
Definition at line 111 of file OSE_Receiver.cpp.
References SIGNAL::dataSignal, insertReceiveBuffer(), isFull(), ndbout_c(), nextSigId, NULL, reportError(), NdbTransporterData::senderNodeId, NdbTransporterData::sigId, TE_RECEIVE_BUFFER_FULL, waitStack, and waitStackCount.
Referenced by doReceive().
00111 { 00112 00113 for(int i = 0; i < waitStackCount; i++){ 00114 if (waitStack[i]->dataSignal.senderNodeId == _nodeId && 00115 waitStack[i]->dataSignal.sigId == nextSigId[_nodeId]){ 00116 00117 ndbout_c("INFO: signal popped from waitStack, sigId = %d", 00118 waitStack[i]->dataSignal.sigId); 00119 00120 if(isFull()){ 00121 ndbout_c("ERROR: receiveBuffer is full"); 00122 reportError(callbackObj, _nodeId, TE_RECEIVE_BUFFER_FULL); 00123 return false; 00124 } 00125 00126 // The next signal was found, put it in the receive buffer 00127 insertReceiveBuffer(waitStack[i]); 00128 00129 // Increase sequence id, set it to the next expected id 00130 nextSigId[_nodeId]++; 00131 00132 // Move signals below up one step 00133 for(int j = i; j < waitStackCount-1; j++) 00134 waitStack[j] = waitStack[j+1]; 00135 waitStack[waitStackCount] = NULL; 00136 waitStackCount--; 00137 00138 // return true since signal was found 00139 return true; 00140 } 00141 } 00142 return false; 00143 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void OSE_Receiver::clearRecvBuffer | ( | NodeId | _nodeId | ) | [private] |
Definition at line 335 of file OSE_Receiver.cpp.
References SIGNAL::dataSignal, insertReceiveBuffer(), recBufReadIndex, recBufSize, recBufWriteIndex, receiveBuffer, and NdbTransporterData::senderNodeId.
Referenced by doReceive().
00335 { 00336 int tmpIndex = 0; 00337 union SIGNAL** tmp = new union SIGNAL * [recBufSize]; 00338 00342 while(recBufReadIndex != recBufWriteIndex){ 00343 if(receiveBuffer[recBufReadIndex]->dataSignal.senderNodeId != nodeId){ 00344 tmp[tmpIndex] = receiveBuffer[recBufReadIndex]; 00345 tmpIndex++; 00346 } else { 00347 free_buf(&receiveBuffer[recBufReadIndex]); 00348 } 00349 recBufReadIndex = (recBufReadIndex + 1) % recBufSize; 00350 } 00351 00355 for(int i = 0; i<tmpIndex; i++) 00356 insertReceiveBuffer(tmp[i]); 00357 00358 delete [] tmp; 00359 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void OSE_Receiver::clearWaitStack | ( | NodeId | _nodeId | ) | [private] |
Clear the wait stack of signals sent by the node identified by _nodeId.
Definition at line 149 of file OSE_Receiver.cpp.
References SIGNAL::dataSignal, NULL, NdbTransporterData::senderNodeId, waitStack, and waitStackCount.
Referenced by doReceive().
00149 { 00150 00151 for(int i = 0; i < waitStackCount; i++){ 00152 if (waitStack[i]->dataSignal.senderNodeId == _nodeId){ 00153 00154 // Free signal buffer 00155 free_buf(&waitStack[i]); 00156 00157 // Move signals below up one step 00158 for(int j = i; j < waitStackCount-1; j++) 00159 waitStack[j] = waitStack[j+1]; 00160 waitStack[waitStackCount] = NULL; 00161 waitStackCount--; 00162 } 00163 } 00164 nextSigId[_nodeId] = 0; 00165 }
Here is the caller graph for this function:

| PROCESS OSE_Receiver::createPhantom | ( | ) |
Definition at line 61 of file OSE_Receiver.cpp.
References DEBUG, localHostName, NULL, phantomCreated, phantomPid, and redir.
Referenced by TransporterRegistry::startReceiving().
00061 { 00062 redir.sig = 1; 00063 redir.pid = current_process(); 00064 00065 if(!phantomCreated){ 00066 phantomPid = create_process 00067 (OS_PHANTOM, // Type 00068 localHostName, // Name 00069 NULL, // Entry point 00070 0, // Stack size 00071 0, // Prio - Not used 00072 (OSTIME)0, // Timeslice - Not used 00073 0, // Block - current block 00074 &redir, 00075 (OSVECTOR)0, // vector 00076 (OSUSER)0); // user 00077 phantomCreated = true; 00078 DEBUG("Created phantom pid: " << hex << phantomPid); 00079 } 00080 return phantomPid; 00081 }
Here is the caller graph for this function:

| void OSE_Receiver::destroyPhantom | ( | ) |
Definition at line 84 of file OSE_Receiver.cpp.
References DEBUG, phantomCreated, and phantomPid.
Referenced by TransporterRegistry::stopReceiving(), ~OSE_Receiver(), and TransporterRegistry::~TransporterRegistry().
00084 { 00085 if(phantomCreated){ 00086 DEBUG("Destroying phantom pid: " << hex << phantomPid); 00087 kill_proc(phantomPid); 00088 phantomCreated = false; 00089 } 00090 }
Here is the caller graph for this function:

| bool OSE_Receiver::doReceive | ( | Uint32 | timeOutMillis | ) |
Definition at line 181 of file OSE_Receiver.cpp.
References checkWaitStack(), clearRecvBuffer(), clearWaitStack(), OSE_Transporter::connectConf(), OSE_Transporter::connectRef(), OSE_Transporter::connectReq(), DEBUG, OSE_Transporter::disconnectOrd(), getTransporter(), OSE_Transporter::huntReceived(), insertReceiveBuffer(), insertWaitStack(), Transporter::isConnected(), isFull(), NDB_TRANSPORTER_CONNECT_CONF, NDB_TRANSPORTER_CONNECT_REF, NDB_TRANSPORTER_CONNECT_REQ, NDB_TRANSPORTER_DATA, NDB_TRANSPORTER_DISCONNECT_ORD, NDB_TRANSPORTER_HUNT, NDB_TRANSPORTER_PRIO_A, ndbout_c(), nextSigId, NIL, PRIO_A_SIGNALS, PRIO_B_SIGNALS, NdbTransporterHunt::remoteNodeId, reportError(), NdbTransporterDisconnectOrd::senderNodeId, NdbTransporterConnectConf::senderNodeId, NdbTransporterConnectRef::senderNodeId, NdbTransporterConnectReq::senderNodeId, sig(), sigNo2String(), TE_TOO_LARGE_SIGID, TE_TOO_SMALL_SIGID, waitStackCount, and waitStackSize.
Referenced by TransporterRegistry::performReceive().
00181 { 00182 if(isFull()) 00183 return false; 00184 00185 union SIGNAL * sig = receive_w_tmo(0, 00186 PRIO_A_SIGNALS); 00187 if(sig == NIL){ 00188 sig = receive_w_tmo(timeOutMillis, 00189 PRIO_B_SIGNALS); 00190 if(sig == NIL) 00191 return false; 00192 } 00193 00194 DEBUG("Received signal: " << sig->sigNo << " " 00195 << sigNo2String(sig->sigNo)); 00196 00197 switch(sig->sigNo){ 00198 case NDB_TRANSPORTER_PRIO_A: 00199 { 00200 OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId); 00201 if (t != 0 && t->isConnected()){ 00202 insertReceiveBuffer(sig); 00203 } else { 00204 free_buf(&sig); 00205 } 00206 } 00207 break; 00208 case NDB_TRANSPORTER_DATA: 00209 { 00210 OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId); 00211 if (t != 0 && t->isConnected()){ 00212 int nodeId = sig->dataSignal.senderNodeId; 00213 Uint32 currSigId = sig->dataSignal.sigId; 00214 00219 if (nextSigId[nodeId] == currSigId){ 00220 00221 // Insert in receive buffer 00222 insertReceiveBuffer(sig); 00223 00224 // Increase sequence id, set it to the next expected id 00225 nextSigId[nodeId]++; 00226 00227 // Check if there are any signal in the wait stack 00228 if (waitStackCount > 0){ 00229 while(checkWaitStack(nodeId)); 00230 } 00231 } else { 00232 // Signal was not received in correct order 00233 // Check values and put it in the waitStack 00234 ndbout_c("WARNING: sigId out of order," 00235 " currSigId = %d, nextSigId = %d", 00236 currSigId, nextSigId[nodeId]); 00237 00238 if (currSigId < nextSigId[nodeId]){ 00239 // Current recieved sigId was smaller than nextSigId 00240 // There is no use to put it in the waitStack 00241 ndbout_c("ERROR: recieved sigId was smaller than nextSigId"); 00242 reportError(callbackObj, nodeId, TE_TOO_SMALL_SIGID); 00243 return false; 00244 } 00245 00246 if (currSigId > (nextSigId[nodeId] + waitStackSize)){ 00247 // Current sigId was larger than nextSigId + size of waitStack 00248 // we can never "save" so many signal's on the stack 00249 
ndbout_c("ERROR: currSigId > (nextSigId + size of waitStack)"); 00250 reportError(callbackObj, nodeId, TE_TOO_LARGE_SIGID); 00251 return false; 00252 } 00253 00254 // Insert in wait stack 00255 insertWaitStack(sig); 00256 } 00257 } else { 00258 free_buf(&sig); 00259 } 00260 } 00261 break; 00262 case NDB_TRANSPORTER_HUNT: 00263 { 00264 NdbTransporterHunt * s = (NdbTransporterHunt*)sig; 00265 OSE_Transporter * t = getTransporter(s->remoteNodeId); 00266 if(t != 0) 00267 t->huntReceived(s); 00268 free_buf(&sig); 00269 } 00270 break; 00271 case NDB_TRANSPORTER_CONNECT_REQ: 00272 { 00273 NdbTransporterConnectReq * s = (NdbTransporterConnectReq*)sig; 00274 OSE_Transporter * t = getTransporter(s->senderNodeId); 00275 if(t != 0){ 00276 if(t->connectReq(s)){ 00277 clearWaitStack(s->senderNodeId); 00278 clearRecvBuffer(s->senderNodeId); 00279 } 00280 } 00281 free_buf(&sig); 00282 } 00283 break; 00284 case NDB_TRANSPORTER_CONNECT_REF: 00285 { 00286 NdbTransporterConnectRef * s = (NdbTransporterConnectRef*)sig; 00287 OSE_Transporter * t = getTransporter(s->senderNodeId); 00288 if(t != 0){ 00289 if(t->connectRef(s)){ 00290 clearWaitStack(s->senderNodeId); 00291 clearRecvBuffer(s->senderNodeId); 00292 } 00293 } 00294 free_buf(&sig); 00295 } 00296 break; 00297 case NDB_TRANSPORTER_CONNECT_CONF: 00298 { 00299 NdbTransporterConnectConf * s = (NdbTransporterConnectConf*)sig; 00300 OSE_Transporter * t = getTransporter(s->senderNodeId); 00301 if(t != 0){ 00302 if(t->connectConf(s)){ 00303 clearWaitStack(s->senderNodeId); 00304 clearRecvBuffer(s->senderNodeId); 00305 } 00306 } 00307 free_buf(&sig); 00308 } 00309 break; 00310 case NDB_TRANSPORTER_DISCONNECT_ORD: 00311 { 00312 NdbTransporterDisconnectOrd * s = (NdbTransporterDisconnectOrd*)sig; 00313 OSE_Transporter * t = getTransporter(s->senderNodeId); 00314 if(t != 0){ 00315 if(t->disconnectOrd(s)){ 00316 clearWaitStack(s->senderNodeId); 00317 clearRecvBuffer(s->senderNodeId); 00318 } 00319 } 00320 free_buf(&sig); 00321 } 00322 } 00323 
return true; 00324 }
Here is the call graph for this function:

Here is the caller graph for this function:

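| Uint32 OSE_Receiver::getReceiveData | ( | NodeId * | remoteNodeId, | |
| Uint32 ** | readPtr | |||
| ) |
Definition at line 91 of file OSE_Receiver.hpp.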
References NdbTransporterData::data, NdbTransporterData::length, recBufReadIndex, recBufWriteIndex, receiveBuffer, and NdbTransporterData::senderNodeId.
Referenced by TransporterRegistry::performReceive().
00092 { 00093 NdbTransporterData *s = (NdbTransporterData *)receiveBuffer[recBufReadIndex]; 00094 if(recBufReadIndex != recBufWriteIndex){ 00095 * remoteNodeId = s->senderNodeId; 00096 * readPtr = &s->data[0]; 00097 return s->length; 00098 } 00099 return 0; 00100 }
Here is the caller graph for this function:

| OSE_Transporter * OSE_Receiver::getTransporter | ( | NodeId | nodeId | ) | [private] |
Definition at line 327 of file OSE_Receiver.cpp.
References theTransporterRegistry, TransporterRegistry::theTransporters, TransporterRegistry::theTransporterTypes, and tt_OSE_TRANSPORTER.
Referenced by doReceive().
00327 { 00328 if(theTransporterRegistry->theTransporterTypes[nodeId] != tt_OSE_TRANSPORTER) 00329 return 0; 00330 return (OSE_Transporter *) 00331 theTransporterRegistry->theTransporters[nodeId]; 00332 }
Here is the caller graph for this function:

| bool OSE_Receiver::hasData | ( | ) | const [inline] |
Definition at line 79 of file OSE_Receiver.hpp.
References recBufReadIndex, and recBufWriteIndex.
Referenced by TransporterRegistry::performReceive().
00079 { 00080 return recBufReadIndex != recBufWriteIndex; 00081 }
Here is the caller graph for this function:

| void OSE_Receiver::insertReceiveBuffer | ( | union SIGNAL * | _sig | ) | [inline, private] |
Definition at line 113 of file OSE_Receiver.hpp.
References recBufSize, recBufWriteIndex, and receiveBuffer.
Referenced by checkWaitStack(), clearRecvBuffer(), and doReceive().
00113 { 00114 receiveBuffer[recBufWriteIndex] = _sig; 00115 recBufWriteIndex = (recBufWriteIndex + 1) % recBufSize; 00116 }
Here is the caller graph for this function:

| void OSE_Receiver::insertWaitStack | ( | union SIGNAL * | _sig | ) | [inline, private] |
Definition at line 170 of file OSE_Receiver.cpp.
References localNodeId, ndbout_c(), reportError(), TE_WAIT_STACK_FULL, waitStack, waitStackCount, and waitStackSize.
Referenced by doReceive().
00170 { 00171 if (waitStackCount <= waitStackSize){ 00172 waitStack[waitStackCount] = _sig; 00173 waitStackCount++; 00174 } else { 00175 ndbout_c("ERROR: waitStack is full"); 00176 reportError(callbackObj, localNodeId, TE_WAIT_STACK_FULL); 00177 } 00178 }
Here is the call graph for this function:

Here is the caller graph for this function:

| bool OSE_Receiver::isFull | ( | ) | const [inline] |
Definition at line 85 of file OSE_Receiver.hpp.
References recBufSize, and recBufWriteIndex.
Referenced by checkWaitStack(), and doReceive().
00085 { 00086 return ((recBufWriteIndex + 1) % recBufSize) == recBufWriteIndex; 00087 }
Here is the caller graph for this function:

| void OSE_Receiver::updateReceiveDataPtr | ( | Uint32 | szRead | ) | [inline] |
Definition at line 104 of file OSE_Receiver.hpp.
References recBufReadIndex, recBufSize, and receiveBuffer.
Referenced by TransporterRegistry::performReceive().
00104 { 00105 if(bytesRead != 0){ 00106 free_buf(&receiveBuffer[recBufReadIndex]); 00107 recBufReadIndex = (recBufReadIndex + 1) % recBufSize; 00108 } 00109 }
Here is the caller graph for this function:

char OSE_Receiver::localHostName[255] [private] |
NodeId OSE_Receiver::localNodeId [private] |
Definition at line 48 of file OSE_Receiver.hpp.
Referenced by insertWaitStack(), and OSE_Receiver().
Uint32* OSE_Receiver::nextSigId [private] |
Definition at line 66 of file OSE_Receiver.hpp.
Referenced by checkWaitStack(), doReceive(), and OSE_Receiver().
bool OSE_Receiver::phantomCreated [private] |
Definition at line 51 of file OSE_Receiver.hpp.
Referenced by createPhantom(), destroyPhantom(), and OSE_Receiver().
PROCESS OSE_Receiver::phantomPid [private] |
Definition at line 52 of file OSE_Receiver.hpp.
Referenced by createPhantom(), and destroyPhantom().
int OSE_Receiver::recBufReadIndex [private] |
Definition at line 55 of file OSE_Receiver.hpp.
Referenced by clearRecvBuffer(), getReceiveData(), hasData(), OSE_Receiver(), updateReceiveDataPtr(), and ~OSE_Receiver().
int OSE_Receiver::recBufSize [private] |
Definition at line 57 of file OSE_Receiver.hpp.
Referenced by clearRecvBuffer(), insertReceiveBuffer(), isFull(), OSE_Receiver(), updateReceiveDataPtr(), and ~OSE_Receiver().
int OSE_Receiver::recBufWriteIndex [private] |
Definition at line 56 of file OSE_Receiver.hpp.
Referenced by clearRecvBuffer(), getReceiveData(), hasData(), insertReceiveBuffer(), isFull(), OSE_Receiver(), and ~OSE_Receiver().
union SIGNAL** OSE_Receiver::receiveBuffer [private] |
Definition at line 58 of file OSE_Receiver.hpp.
Referenced by clearRecvBuffer(), getReceiveData(), insertReceiveBuffer(), OSE_Receiver(), updateReceiveDataPtr(), and ~OSE_Receiver().
struct OS_redir_entry OSE_Receiver::redir [private] |
class TransporterRegistry* OSE_Receiver::theTransporterRegistry [private] |
union SIGNAL** OSE_Receiver::waitStack [private] |
Definition at line 63 of file OSE_Receiver.hpp.
Referenced by checkWaitStack(), clearWaitStack(), insertWaitStack(), and OSE_Receiver().
int OSE_Receiver::waitStackCount [private] |
Definition at line 61 of file OSE_Receiver.hpp.
Referenced by checkWaitStack(), clearWaitStack(), doReceive(), insertWaitStack(), and OSE_Receiver().
int OSE_Receiver::waitStackSize [private] |
Definition at line 62 of file OSE_Receiver.hpp.
Referenced by doReceive(), insertWaitStack(), and OSE_Receiver().
1.4.7

