00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 #include <ndb_global.h> 00018 #include <ndb_opt_defaults.h> 00019 #include <IPCConfig.hpp> 00020 #include <NdbOut.hpp> 00021 #include <NdbHost.h> 00022 00023 #include <TransporterDefinitions.hpp> 00024 #include <TransporterRegistry.hpp> 00025 #include <Properties.hpp> 00026 00027 #include <mgmapi_configuration.hpp> 00028 #include <mgmapi_config_parameters.h> 00029 00030 #if defined DEBUG_TRANSPORTER 00031 #define DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl; 00032 #else 00033 #define DEBUG(t) 00034 #endif 00035 00036 IPCConfig::IPCConfig(Properties * p) 00037 { 00038 theNoOfRemoteNodes = 0; 00039 the_ownId = 0; 00040 if(p != 0) 00041 props = new Properties(* p); 00042 else 00043 props = 0; 00044 } 00045 00046 00047 IPCConfig::~IPCConfig() 00048 { 00049 if(props != 0){ 00050 delete props; 00051 } 00052 } 00053 00054 int 00055 IPCConfig::init(){ 00056 Uint32 nodeId; 00057 00058 if(props == 0) return -1; 00059 if(!props->get("LocalNodeId", &nodeId)) { 00060 DEBUG( "Did not find local node id." ); 00061 return -1; 00062 } 00063 the_ownId = nodeId; 00064 00065 Uint32 noOfConnections; 00066 if(!props->get("NoOfConnections", &noOfConnections)) { 00067 DEBUG( "Did not find noOfConnections." ); 00068 return -1; 00069 } 00070 00071 for(Uint32 i = 0; i<noOfConnections; i++){ 00072 const Properties * tmp; 00073 Uint32 node1, node2; 00074 00075 if(!props->get("Connection", i, &tmp)) { 00076 DEBUG( "Did not find Connection." ); 00077 return -1; 00078 } 00079 if(!tmp->get("NodeId1", &node1)) { 00080 DEBUG( "Did not find NodeId1." ); 00081 return -1; 00082 } 00083 if(!tmp->get("NodeId2", &node2)) { 00084 DEBUG( "Did not find NodeId2." ); 00085 return -1; 00086 } 00087 00088 if(node1 == the_ownId && node2 != the_ownId) 00089 if(!addRemoteNodeId(node2)) { 00090 DEBUG( "addRemoteNodeId(node2) failed." ); 00091 return -1; 00092 } 00093 00094 if(node1 != the_ownId && node2 == the_ownId) 00095 if(!addRemoteNodeId(node1)) { 00096 DEBUG( "addRemoteNodeId(node2) failed." ); 00097 return -1; 00098 } 00099 } 00100 return 0; 00101 } 00102 00103 bool 00104 IPCConfig::addRemoteNodeId(NodeId nodeId){ 00105 for(int i = 0; i<theNoOfRemoteNodes; i++) 00106 if(theRemoteNodeIds[i] == nodeId) 00107 return false; 00108 theRemoteNodeIds[theNoOfRemoteNodes] = nodeId; 00109 theNoOfRemoteNodes++; 00110 return true; 00111 } 00112 00118 bool 00119 IPCConfig::getNextRemoteNodeId(NodeId & nodeId) const { 00120 NodeId returnNode = MAX_NODES + 1; 00121 for(int i = 0; i<theNoOfRemoteNodes; i++) 00122 if(theRemoteNodeIds[i] > nodeId){ 00123 if(theRemoteNodeIds[i] < returnNode){ 00124 returnNode = theRemoteNodeIds[i]; 00125 } 00126 } 00127 if(returnNode == (MAX_NODES + 1)) 00128 return false; 00129 nodeId = returnNode; 00130 return true; 00131 } 00132 00133 00134 Uint32 00135 IPCConfig::getREPHBFrequency(NodeId id) const { 00136 const Properties * tmp; 00137 Uint32 out; 00138 00142 if (!props->get("Node", id, &tmp) || 00143 !tmp->get("HeartbeatIntervalRepRep", &out)) { 00144 DEBUG("Illegal Node or HeartbeatIntervalRepRep in config."); 00145 out = 10000; 00146 } 00147 00148 return out; 00149 } 00150 00151 const char* 00152 IPCConfig::getNodeType(NodeId id) const { 00153 const char * out; 00154 const Properties * tmp; 00155 00156 if (!props->get("Node", id, &tmp) || !tmp->get("Type", &out)) { 00157 DEBUG("Illegal Node or NodeType in config."); 00158 out = "Unknown"; 00159 } 00160 00161 return out; 00162 } 00163 00164 #include <mgmapi.h> 00165 Uint32 00166 IPCConfig::configureTransporters(Uint32 nodeId, 00167 const class ndb_mgm_configuration & config, 00168 class TransporterRegistry & tr){ 00169 TransporterConfiguration conf; 00170 00171 DBUG_ENTER("IPCConfig::configureTransporters"); 00172 00177 { 00178 const char *separator= ""; 00179 BaseString connect_string; 00180 ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE); 00181 for(iter.first(); iter.valid(); iter.next()) 00182 { 00183 Uint32 type; 00184 if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; 00185 if(type != NODE_TYPE_MGM) continue; 00186 const char* hostname; 00187 Uint32 port; 00188 if(iter.get(CFG_NODE_HOST, &hostname)) continue; 00189 if( strlen(hostname) == 0 ) continue; 00190 if(iter.get(CFG_MGM_PORT, &port)) continue; 00191 connect_string.appfmt("%s%s:%u",separator,hostname,port); 00192 separator= ","; 00193 } 00194 NdbMgmHandle h= ndb_mgm_create_handle(); 00195 if ( h && connect_string.length() > 0 ) 00196 { 00197 ndb_mgm_set_connectstring(h,connect_string.c_str()); 00198 tr.set_mgm_handle(h); 00199 } 00200 } 00201 00202 Uint32 noOfTransportersCreated= 0; 00203 ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION); 00204 00205 for(iter.first(); iter.valid(); iter.next()){ 00206 00207 Uint32 nodeId1, nodeId2, remoteNodeId; 00208 const char * remoteHostName= 0, * localHostName= 0; 00209 if(iter.get(CFG_CONNECTION_NODE_1, &nodeId1)) continue; 00210 if(iter.get(CFG_CONNECTION_NODE_2, &nodeId2)) continue; 00211 00212 if(nodeId1 != nodeId && nodeId2 != nodeId) continue; 00213 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1); 00214 00215 { 00216 const char * host1= 0, * host2= 0; 00217 iter.get(CFG_CONNECTION_HOSTNAME_1, &host1); 00218 iter.get(CFG_CONNECTION_HOSTNAME_2, &host2); 00219 localHostName = (nodeId == nodeId1 ? host1 : host2); 00220 remoteHostName = (nodeId == nodeId1 ? host2 : host1); 00221 } 00222 00223 Uint32 sendSignalId = 1; 00224 Uint32 checksum = 1; 00225 if(iter.get(CFG_CONNECTION_SEND_SIGNAL_ID, &sendSignalId)) continue; 00226 if(iter.get(CFG_CONNECTION_CHECKSUM, &checksum)) continue; 00227 00228 Uint32 type = ~0; 00229 if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; 00230 00231 Uint32 server_port= 0; 00232 if(iter.get(CFG_CONNECTION_SERVER_PORT, &server_port)) break; 00233 00234 Uint32 nodeIdServer= 0; 00235 if(iter.get(CFG_CONNECTION_NODE_ID_SERVER, &nodeIdServer)) break; 00236 00237 /* 00238 We check the node type. 00239 */ 00240 Uint32 node1type, node2type; 00241 ndb_mgm_configuration_iterator node1iter(config, CFG_SECTION_NODE); 00242 ndb_mgm_configuration_iterator node2iter(config, CFG_SECTION_NODE); 00243 node1iter.find(CFG_NODE_ID,nodeId1); 00244 node2iter.find(CFG_NODE_ID,nodeId2); 00245 node1iter.get(CFG_TYPE_OF_SECTION,&node1type); 00246 node2iter.get(CFG_TYPE_OF_SECTION,&node2type); 00247 00248 if(node1type==NODE_TYPE_MGM || node2type==NODE_TYPE_MGM) 00249 conf.isMgmConnection= true; 00250 else 00251 conf.isMgmConnection= false; 00252 00253 if (nodeId == nodeIdServer && !conf.isMgmConnection) { 00254 tr.add_transporter_interface(remoteNodeId, localHostName, server_port); 00255 } 00256 00257 DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d", 00258 nodeId, remoteNodeId, server_port, sendSignalId, checksum)); 00259 /* 00260 This may be a dynamic port. It depends on when we're getting 00261 our configuration. If we've been restarted, we'll be getting 00262 a configuration with our old dynamic port in it, hence the number 00263 here is negative (and we try the old port number first). 00264 00265 On a first-run, server_port will be zero (with dynamic ports) 00266 00267 If we're not using dynamic ports, we don't do anything. 00268 */ 00269 00270 conf.localNodeId = nodeId; 00271 conf.remoteNodeId = remoteNodeId; 00272 conf.checksum = checksum; 00273 conf.signalId = sendSignalId; 00274 conf.s_port = server_port; 00275 conf.localHostName = localHostName; 00276 conf.remoteHostName = remoteHostName; 00277 conf.serverNodeId = nodeIdServer; 00278 00279 switch(type){ 00280 case CONNECTION_TYPE_SHM: 00281 if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break; 00282 if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break; 00283 00284 Uint32 tmp; 00285 if(iter.get(CFG_SHM_SIGNUM, &tmp)) break; 00286 conf.shm.signum= tmp; 00287 00288 if(!tr.createSHMTransporter(&conf)){ 00289 DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d", 00290 conf.localNodeId, conf.remoteNodeId)); 00291 ndbout << "Failed to create SHM Transporter from: " 00292 << conf.localNodeId << " to: " << conf.remoteNodeId << endl; 00293 } else { 00294 noOfTransportersCreated++; 00295 } 00296 DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, " 00297 "buf size = %d", conf.shm.shmKey, conf.shm.shmSize)); 00298 00299 break; 00300 00301 case CONNECTION_TYPE_SCI: 00302 if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sci.sendLimit)) break; 00303 if(iter.get(CFG_SCI_BUFFER_MEM, &conf.sci.bufferSize)) break; 00304 if (nodeId == nodeId1) { 00305 if(iter.get(CFG_SCI_HOST2_ID_0, &conf.sci.remoteSciNodeId0)) break; 00306 if(iter.get(CFG_SCI_HOST2_ID_1, &conf.sci.remoteSciNodeId1)) break; 00307 } else { 00308 if(iter.get(CFG_SCI_HOST1_ID_0, &conf.sci.remoteSciNodeId0)) break; 00309 if(iter.get(CFG_SCI_HOST1_ID_1, &conf.sci.remoteSciNodeId1)) break; 00310 } 00311 if (conf.sci.remoteSciNodeId1 == 0) { 00312 conf.sci.nLocalAdapters = 1; 00313 } else { 00314 conf.sci.nLocalAdapters = 2; 00315 } 00316 if(!tr.createSCITransporter(&conf)){ 00317 DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", 00318 conf.localNodeId, conf.remoteNodeId)); 00319 ndbout << "Failed to create SCI Transporter from: " 00320 << conf.localNodeId << " to: " << conf.remoteNodeId << endl; 00321 } else { 00322 DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, " 00323 "remote SCI node id %d", 00324 conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0)); 00325 DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, " 00326 "buf size = %d", conf.localHostName, 00327 conf.remoteHostName, conf.sci.sendLimit, 00328 conf.sci.bufferSize)); 00329 if (conf.sci.nLocalAdapters > 1) { 00330 DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, " 00331 "second remote SCI node id = %d", 00332 conf.sci.remoteSciNodeId1)); 00333 } 00334 noOfTransportersCreated++; 00335 continue; 00336 } 00337 break; 00338 00339 case CONNECTION_TYPE_TCP: 00340 if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.tcp.sendBufferSize)) break; 00341 if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.tcp.maxReceiveSize)) break; 00342 00343 const char * proxy; 00344 if (!iter.get(CFG_TCP_PROXY, &proxy)) { 00345 if (strlen(proxy) > 0 && nodeId2 == nodeId) { 00346 // TODO handle host:port 00347 conf.s_port = atoi(proxy); 00348 } 00349 } 00350 00351 if(!tr.createTCPTransporter(&conf)){ 00352 ndbout << "Failed to create TCP Transporter from: " 00353 << nodeId << " to: " << remoteNodeId << endl; 00354 } else { 00355 noOfTransportersCreated++; 00356 } 00357 DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, " 00358 "maxReceiveSize = %d", conf.tcp.sendBufferSize, 00359 conf.tcp.maxReceiveSize)); 00360 break; 00361 case CONNECTION_TYPE_OSE: 00362 if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.ose.prioASignalSize)) break; 00363 if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.ose.prioBSignalSize)) break; 00364 00365 if(!tr.createOSETransporter(&conf)){ 00366 ndbout << "Failed to create OSE Transporter from: " 00367 << nodeId << " to: " << remoteNodeId << endl; 00368 } else { 00369 noOfTransportersCreated++; 00370 } 00371 break; 00372 00373 default: 00374 ndbout << "Unknown transporter type from: " << nodeId << 00375 " to: " << remoteNodeId << endl; 00376 break; 00377 } // switch 00378 } // for 00379 00380 DBUG_RETURN(noOfTransportersCreated); 00381 } 00382
1.4.7

