00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 //**************************************************************************** 00018 // 00019 // NAME 00020 // TransporterRegistry 00021 // 00022 // DESCRIPTION 00023 // TransporterRegistry (singelton) is the interface to the 00024 // transporter layer. It handles transporter states and 00025 // holds the transporter arrays. 00026 // 00027 //***************************************************************************/ 00028 #ifndef TransporterRegistry_H 00029 #define TransporterRegistry_H 00030 00031 #include "TransporterDefinitions.hpp" 00032 #include <SocketServer.hpp> 00033 #include <SocketClient.hpp> 00034 00035 #include <NdbTCP.h> 00036 00037 #include <mgmapi/mgmapi.h> 00038 00039 // A transporter is always in an IOState. 00040 // NoHalt is used initially and as long as it is no restrictions on 00041 // sending or receiving. 00042 enum IOState { 00043 NoHalt = 0, 00044 HaltInput = 1, 00045 HaltOutput = 2, 00046 HaltIO = 3 00047 }; 00048 00049 enum TransporterType { 00050 tt_TCP_TRANSPORTER = 1, 00051 tt_SCI_TRANSPORTER = 2, 00052 tt_SHM_TRANSPORTER = 3, 00053 tt_OSE_TRANSPORTER = 4 00054 }; 00055 00056 static const char *performStateString[] = 00057 { "is connected", 00058 "is trying to connect", 00059 "does nothing", 00060 "is trying to disconnect" }; 00061 00062 class Transporter; 00063 class TCP_Transporter; 00064 class SCI_Transporter; 00065 class SHM_Transporter; 00066 class OSE_Transporter; 00067 00068 class TransporterRegistry; 00069 class SocketAuthenticator; 00070 00071 class TransporterService : public SocketServer::Service { 00072 SocketAuthenticator * m_auth; 00073 TransporterRegistry * m_transporter_registry; 00074 public: 00075 TransporterService(SocketAuthenticator *auth= 0) 00076 { 00077 m_auth= auth; 00078 m_transporter_registry= 0; 00079 } 00080 void setTransporterRegistry(TransporterRegistry *t) 00081 { 00082 m_transporter_registry= t; 00083 } 00084 SocketServer::Session * newSession(NDB_SOCKET_TYPE socket); 00085 }; 00086 00091 class TransporterRegistry { 00092 friend class OSE_Receiver; 00093 friend class SHM_Transporter; 00094 friend class Transporter; 00095 friend class TransporterService; 00096 public: 00100 TransporterRegistry(void * callback = 0 , 00101 unsigned maxTransporters = MAX_NTRANSPORTERS, 00102 unsigned sizeOfLongSignalMemory = 100); 00103 00109 void set_mgm_handle(NdbMgmHandle h); 00110 NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; }; 00111 00112 bool init(NodeId localNodeId); 00113 00117 bool connect_server(NDB_SOCKET_TYPE sockfd); 00118 00119 bool connect_client(NdbMgmHandle *h); 00120 00125 NDB_SOCKET_TYPE connect_ndb_mgmd(SocketClient *sc); 00126 00131 NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h); 00132 00136 void removeAll(); 00137 00141 void disconnectAll(); 00142 00147 ~TransporterRegistry(); 00148 00149 bool start_service(SocketServer& server); 00150 bool start_clients(); 00151 bool stop_clients(); 00152 void start_clients_thread(); 00153 void update_connections(); 00154 00158 void startReceiving(); 00159 void stopReceiving(); 00160 00164 void startSending(); 00165 void stopSending(); 00166 00167 // A transporter is always in a PerformState. 00168 // PerformIO is used initially and as long as any of the events 00169 // PerformConnect, ... 00170 enum PerformState { 00171 CONNECTED = 0, 00172 CONNECTING = 1, 00173 DISCONNECTED = 2, 00174 DISCONNECTING = 3 00175 }; 00176 const char *getPerformStateString(NodeId nodeId) const 00177 { return performStateString[(unsigned)performStates[nodeId]]; }; 00178 00182 void do_connect(NodeId node_id); 00183 void do_disconnect(NodeId node_id); 00184 bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; }; 00185 void report_connect(NodeId node_id); 00186 void report_disconnect(NodeId node_id, int errnum); 00187 00191 IOState ioState(NodeId nodeId); 00192 void setIOState(NodeId nodeId, IOState state); 00193 00202 bool createTCPTransporter(struct TransporterConfiguration * config); 00203 bool createSCITransporter(struct TransporterConfiguration * config); 00204 bool createSHMTransporter(struct TransporterConfiguration * config); 00205 bool createOSETransporter(struct TransporterConfiguration * config); 00206 00212 Uint32 get_free_buffer(Uint32 node) const ; 00213 00225 SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio, 00226 const Uint32 * const signalData, 00227 NodeId nodeId, 00228 const LinearSectionPtr ptr[3]); 00229 00230 SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio, 00231 const Uint32 * const signalData, 00232 NodeId nodeId, 00233 class SectionSegmentPool & pool, 00234 const SegmentedSectionPtr ptr[3]); 00235 00242 void external_IO(Uint32 timeOutMillis); 00243 00244 Uint32 pollReceive(Uint32 timeOutMillis); 00245 void performReceive(); 00246 void performSend(); 00247 00253 int forceSendCheck(int sendLimit); 00254 00255 #ifdef DEBUG_TRANSPORTER 00256 void printState(); 00257 #endif 00258 00259 class Transporter_interface { 00260 public: 00261 NodeId m_remote_nodeId; 00262 int m_s_service_port; // signed port number 00263 const char *m_interface; 00264 }; 00265 Vector<Transporter_interface> m_transporter_interface; 00266 void add_transporter_interface(NodeId remoteNodeId, const char *interf, 00267 int s_port); // signed port. <0 is dynamic 00268 Transporter* get_transporter(NodeId nodeId); 00269 NodeId get_localNodeId() { return localNodeId; }; 00270 00271 00272 struct in_addr get_connect_address(NodeId node_id) const; 00273 protected: 00274 00275 private: 00276 void * callbackObj; 00277 00278 NdbMgmHandle m_mgm_handle; 00279 00280 struct NdbThread *m_start_clients_thread; 00281 bool m_run_start_clients_thread; 00282 00283 int sendCounter; 00284 NodeId localNodeId; 00285 bool nodeIdSpecified; 00286 unsigned maxTransporters; 00287 int nTransporters; 00288 int nTCPTransporters; 00289 int nSCITransporters; 00290 int nSHMTransporters; 00291 int nOSETransporters; 00292 00296 TCP_Transporter** theTCPTransporters; 00297 SCI_Transporter** theSCITransporters; 00298 SHM_Transporter** theSHMTransporters; 00299 OSE_Transporter** theOSETransporters; 00300 00304 TransporterType* theTransporterTypes; 00305 Transporter** theTransporters; 00306 00310 class OSE_Receiver * theOSEReceiver; 00311 00319 int theOSEJunkSocketSend; 00320 int theOSEJunkSocketRecv; 00321 #if defined NDB_OSE || defined NDB_SOFTOSE 00322 PROCESS theReceiverPid; 00323 #endif 00324 00328 PerformState* performStates; 00329 IOState* ioStates; 00330 00334 Uint32 unpack(Uint32 * readPtr, 00335 Uint32 bufferSize, 00336 NodeId remoteNodeId, 00337 IOState state); 00338 00339 Uint32 * unpack(Uint32 * readPtr, 00340 Uint32 * eodPtr, 00341 NodeId remoteNodeId, 00342 IOState state); 00343 00350 void removeTransporter(NodeId nodeId); 00351 00355 int tcpReadSelectReply; 00356 fd_set tcpReadset; 00357 00358 Uint32 poll_OSE(Uint32 timeOutMillis); 00359 Uint32 poll_TCP(Uint32 timeOutMillis); 00360 Uint32 poll_SCI(Uint32 timeOutMillis); 00361 Uint32 poll_SHM(Uint32 timeOutMillis); 00362 00363 int m_shm_own_pid; 00364 }; 00365 00366 #endif // Define of TransporterRegistry_H
1.4.7

