00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 #include <ndb_global.h> 00018 00019 #include "Ndbfs.hpp" 00020 #include "AsyncFile.hpp" 00021 #include "Filename.hpp" 00022 00023 #include <signaldata/FsOpenReq.hpp> 00024 #include <signaldata/FsCloseReq.hpp> 00025 #include <signaldata/FsReadWriteReq.hpp> 00026 #include <signaldata/FsAppendReq.hpp> 00027 #include <signaldata/FsRemoveReq.hpp> 00028 #include <signaldata/FsConf.hpp> 00029 #include <signaldata/FsRef.hpp> 00030 #include <signaldata/NdbfsContinueB.hpp> 00031 #include <signaldata/DumpStateOrd.hpp> 00032 00033 #include <RefConvert.hpp> 00034 #include <NdbSleep.h> 00035 #include <NdbOut.hpp> 00036 #include <Configuration.hpp> 00037 00038 #define DEBUG(x) { ndbout << "FS::" << x << endl; } 00039 00040 inline 00041 int pageSize( const NewVARIABLE* baseAddrRef ) 00042 { 00043 int log_psize; 00044 int log_qsize = baseAddrRef->bits.q; 00045 int log_vsize = baseAddrRef->bits.v; 00046 if (log_vsize < 3) 00047 log_vsize = 3; 00048 log_psize = log_qsize + log_vsize - 3; 00049 return (1 << log_psize); 00050 } 00051 00052 00053 Ndbfs::Ndbfs(Block_context& ctx) : 00054 SimulatedBlock(NDBFS, ctx), 00055 scanningInProgress(false), 00056 theLastId(0), 00057 theRequestPool(0), 
00058 m_maxOpenedFiles(0) 00059 { 00060 BLOCK_CONSTRUCTOR(Ndbfs); 00061 00062 // Set received signals 00063 addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ); 00064 addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD); 00065 addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR); 00066 addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ); 00067 addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ); 00068 addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ); 00069 addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ); 00070 addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ); 00071 addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB); 00072 addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ); 00073 addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ); 00074 // Set send signals 00075 } 00076 00077 Ndbfs::~Ndbfs() 00078 { 00079 // Delete all files 00080 // AsyncFile destuctor will take care of deleting 00081 // the thread it has created 00082 for (unsigned i = 0; i < theFiles.size(); i++){ 00083 AsyncFile* file = theFiles[i]; 00084 delete file; 00085 theFiles[i] = NULL; 00086 }//for 00087 theFiles.clear(); 00088 if (theRequestPool) 00089 delete theRequestPool; 00090 } 00091 00092 void 00093 Ndbfs::execREAD_CONFIG_REQ(Signal* signal) 00094 { 00095 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); 00096 00097 Uint32 ref = req->senderRef; 00098 Uint32 senderData = req->senderData; 00099 00100 const ndb_mgm_configuration_iterator * p = 00101 m_ctx.m_config.getOwnConfigIterator(); 00102 ndbrequire(p != 0); 00103 theFileSystemPath.assfmt("%sndb_%u_fs%s", m_ctx.m_config.fileSystemPath(), 00104 getOwnNodeId(), DIR_SEPARATOR); 00105 theBackupFilePath.assign(m_ctx.m_config.backupFilePath()); 00106 00107 theRequestPool = new Pool<Request>; 00108 00109 m_maxFiles = 40; 00110 ndb_mgm_get_int_parameter(p, CFG_DB_MAX_OPEN_FILES, &m_maxFiles); 00111 Uint32 noIdleFiles = 27; 00112 ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles); 00113 if 
(noIdleFiles > m_maxFiles) 00114 m_maxFiles = noIdleFiles; 00115 // Create idle AsyncFiles 00116 for (Uint32 i = 0; i < noIdleFiles; i++){ 00117 theIdleFiles.push_back(createAsyncFile()); 00118 } 00119 00120 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); 00121 conf->senderRef = reference(); 00122 conf->senderData = senderData; 00123 sendSignal(ref, GSN_READ_CONFIG_CONF, signal, 00124 ReadConfigConf::SignalLength, JBB); 00125 } 00126 00127 /* Received a restart signal. 00128 * Answer it like any other block 00129 * PR0 : StartCase 00130 * DR0 : StartPhase 00131 * DR1 : ? 00132 * DR2 : ? 00133 * DR3 : ? 00134 * DR4 : ? 00135 * DR5 : SignalKey 00136 */ 00137 void 00138 Ndbfs::execSTTOR(Signal* signal) 00139 { 00140 jamEntry(); 00141 00142 if(signal->theData[1] == 0){ // StartPhase 0 00143 jam(); 00144 00145 { 00146 #ifdef NDB_WIN32 00147 CreateDirectory(theFileSystemPath.c_str(), 0); 00148 #else 00149 mkdir(theFileSystemPath.c_str(), 00150 S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP); 00151 #endif 00152 } 00153 00154 cownref = NDBFS_REF; 00155 // close all open files 00156 ndbrequire(theOpenFiles.size() == 0); 00157 00158 scanningInProgress = false; 00159 00160 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY; 00161 sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1); 00162 00163 signal->theData[3] = 255; 00164 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB); 00165 return; 00166 } 00167 ndbrequire(0); 00168 } 00169 00170 int 00171 Ndbfs::forward( AsyncFile * file, Request* request) 00172 { 00173 jam(); 00174 file->execute(request); 00175 return 1; 00176 } 00177 00178 void 00179 Ndbfs::execFSOPENREQ(Signal* signal) 00180 { 00181 jamEntry(); 00182 const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0]; 00183 const BlockReference userRef = fsOpenReq->userReference; 00184 AsyncFile* file = getIdleFile(); 00185 ndbrequire(file != NULL); 00186 Filename::NameSpec spec(theFileSystemPath, theBackupFilePath); 
00187 00188 Uint32 userPointer = fsOpenReq->userPointer; 00189 00190 if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT) 00191 { 00192 Ptr<GlobalPage> page_ptr; 00193 if(m_global_page_pool.seize(page_ptr) == false) 00194 { 00195 FsRef * const fsRef = (FsRef *)&signal->theData[0]; 00196 fsRef->userPointer = userPointer; 00197 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory); 00198 fsRef->osErrorCode = ~0; // Indicate local error 00199 sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB); 00200 return; 00201 } 00202 file->m_page_ptr = page_ptr; 00203 } 00204 else 00205 { 00206 ndbassert(file->m_page_ptr.isNull()); 00207 file->m_page_ptr.setNull(); 00208 } 00209 00210 if(signal->getNoOfSections() == 0){ 00211 jam(); 00212 file->theFileName.set(spec, userRef, fsOpenReq->fileNumber); 00213 } else { 00214 jam(); 00215 SegmentedSectionPtr ptr; 00216 signal->getSection(ptr, FsOpenReq::FILENAME); 00217 file->theFileName.set(spec, ptr, g_sectionSegmentPool); 00218 releaseSections(signal); 00219 } 00220 file->reportTo(&theFromThreads); 00221 00222 Request* request = theRequestPool->get(); 00223 request->action = Request::open; 00224 request->error = 0; 00225 request->set(userRef, userPointer, newId() ); 00226 request->file = file; 00227 request->theTrace = signal->getTrace(); 00228 request->par.open.flags = fsOpenReq->fileFlags; 00229 request->par.open.page_size = fsOpenReq->page_size; 00230 request->par.open.file_size = fsOpenReq->file_size_hi; 00231 request->par.open.file_size <<= 32; 00232 request->par.open.file_size |= fsOpenReq->file_size_lo; 00233 00234 ndbrequire(forward(file, request)); 00235 } 00236 00237 void 00238 Ndbfs::execFSREMOVEREQ(Signal* signal) 00239 { 00240 jamEntry(); 00241 const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr(); 00242 const BlockReference userRef = req->userReference; 00243 AsyncFile* file = getIdleFile(); 00244 ndbrequire(file != NULL); 00245 00246 Filename::NameSpec spec(theFileSystemPath, theBackupFilePath); 00247 
file->theFileName.set(spec, userRef, req->fileNumber, req->directory); 00248 file->reportTo(&theFromThreads); 00249 00250 Request* request = theRequestPool->get(); 00251 request->action = Request::rmrf; 00252 request->par.rmrf.directory = req->directory; 00253 request->par.rmrf.own_directory = req->ownDirectory; 00254 request->error = 0; 00255 request->set(userRef, req->userPointer, newId() ); 00256 request->file = file; 00257 request->theTrace = signal->getTrace(); 00258 00259 ndbrequire(forward(file, request)); 00260 } 00261 00262 /* 00263 * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1 00264 * remove file 00265 */ 00266 void 00267 Ndbfs::execFSCLOSEREQ(Signal * signal) 00268 { 00269 jamEntry(); 00270 const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0]; 00271 const BlockReference userRef = fsCloseReq->userReference; 00272 const Uint16 filePointer = (Uint16)fsCloseReq->filePointer; 00273 const UintR userPointer = fsCloseReq->userPointer; 00274 00275 AsyncFile* openFile = theOpenFiles.find(filePointer); 00276 if (openFile == NULL) { 00277 // The file was not open, send error back to sender 00278 jam(); 00279 // Initialise FsRef signal 00280 FsRef * const fsRef = (FsRef *)&signal->theData[0]; 00281 fsRef->userPointer = userPointer; 00282 fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist); 00283 fsRef->osErrorCode = ~0; // Indicate local error 00284 sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB); 00285 return; 00286 } 00287 00288 Request *request = theRequestPool->get(); 00289 if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) { 00290 jam(); 00291 request->action = Request::closeRemove; 00292 } else { 00293 jam(); 00294 request->action = Request::close; 00295 } 00296 request->set(userRef, fsCloseReq->userPointer, filePointer); 00297 request->file = openFile; 00298 request->error = 0; 00299 request->theTrace = signal->getTrace(); 00300 00301 ndbrequire(forward(openFile, request)); 
00302 } 00303 00304 void 00305 Ndbfs::readWriteRequest(int action, Signal * signal) 00306 { 00307 const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0]; 00308 Uint16 filePointer = (Uint16)fsRWReq->filePointer; 00309 const UintR userPointer = fsRWReq->userPointer; 00310 const BlockReference userRef = fsRWReq->userReference; 00311 const BlockNumber blockNumber = refToBlock(userRef); 00312 00313 AsyncFile* openFile = theOpenFiles.find(filePointer); 00314 00315 const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex]; 00316 UintPtr tPageSize; 00317 UintPtr tClusterSize; 00318 UintPtr tNRR; 00319 UintPtr tPageOffset; 00320 char* tWA; 00321 FsRef::NdbfsErrorCodeType errorCode; 00322 00323 Request *request = theRequestPool->get(); 00324 request->error = 0; 00325 request->set(userRef, userPointer, filePointer); 00326 request->file = openFile; 00327 request->action = (Request::Action) action; 00328 request->theTrace = signal->getTrace(); 00329 00330 Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag); 00331 00332 if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed 00333 jam(); 00334 errorCode = FsRef::fsErrInvalidParameters; 00335 goto error; 00336 } 00337 00338 if(format != FsReadWriteReq::fsFormatGlobalPage && 00339 format != FsReadWriteReq::fsFormatSharedPage) 00340 { 00341 if (fsRWReq->varIndex >= getBatSize(blockNumber)) { 00342 jam();// Ensure that a valid variable is used 00343 errorCode = FsRef::fsErrInvalidParameters; 00344 goto error; 00345 } 00346 if (myBaseAddrRef == NULL) { 00347 jam(); // Ensure that a valid variable is used 00348 errorCode = FsRef::fsErrInvalidParameters; 00349 goto error; 00350 } 00351 if (openFile == NULL) { 00352 jam(); //file not open 00353 errorCode = FsRef::fsErrFileDoesNotExist; 00354 goto error; 00355 } 00356 tPageSize = pageSize(myBaseAddrRef); 00357 tClusterSize = myBaseAddrRef->ClusterSize; 00358 tNRR = myBaseAddrRef->nrr; 00359 tWA = (char*)myBaseAddrRef->WA; 00360 00361 
switch (format) { 00362 00363 // List of memory and file pages pairs 00364 case FsReadWriteReq::fsFormatListOfPairs: { 00365 jam(); 00366 for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) { 00367 jam(); 00368 const UintPtr varIndex = fsRWReq->data.listOfPair[i].varIndex; 00369 const UintPtr fileOffset = fsRWReq->data.listOfPair[i].fileOffset; 00370 if (varIndex >= tNRR) { 00371 jam(); 00372 errorCode = FsRef::fsErrInvalidParameters; 00373 goto error; 00374 }//if 00375 request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize]; 00376 request->par.readWrite.pages[i].size = tPageSize; 00377 request->par.readWrite.pages[i].offset = fileOffset * tPageSize; 00378 }//for 00379 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages; 00380 break; 00381 }//case 00382 00383 // Range of memory page with one file page 00384 case FsReadWriteReq::fsFormatArrayOfPages: { 00385 if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) { 00386 jam(); 00387 errorCode = FsRef::fsErrInvalidParameters; 00388 goto error; 00389 }//if 00390 const UintPtr varIndex = fsRWReq->data.arrayOfPages.varIndex; 00391 const UintPtr fileOffset = fsRWReq->data.arrayOfPages.fileOffset; 00392 00393 request->par.readWrite.pages[0].offset = fileOffset * tPageSize; 00394 request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages; 00395 request->par.readWrite.numberOfPages = 1; 00396 request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize]; 00397 break; 00398 }//case 00399 00400 // List of memory pages followed by one file page 00401 case FsReadWriteReq::fsFormatListOfMemPages: { 00402 00403 tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages]; 00404 tPageOffset *= tPageSize; 00405 00406 for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) { 00407 jam(); 00408 UintPtr varIndex = fsRWReq->data.listOfMemPages.varIndex[i]; 00409 00410 if (varIndex >= tNRR) { 00411 jam(); 00412 errorCode = FsRef::fsErrInvalidParameters; 
00413 goto error; 00414 }//if 00415 request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize]; 00416 request->par.readWrite.pages[i].size = tPageSize; 00417 request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize); 00418 }//for 00419 request->par.readWrite.numberOfPages = fsRWReq->numberOfPages; 00420 break; 00421 // make it a writev or readv 00422 }//case 00423 00424 default: { 00425 jam(); 00426 errorCode = FsRef::fsErrInvalidParameters; 00427 goto error; 00428 }//default 00429 }//switch 00430 } 00431 else if (format == FsReadWriteReq::fsFormatGlobalPage) 00432 { 00433 Ptr<GlobalPage> ptr; 00434 m_global_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]); 00435 request->par.readWrite.pages[0].buf = (char*)ptr.p; 00436 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages; 00437 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex; 00438 request->par.readWrite.numberOfPages = 1; 00439 } 00440 else 00441 { 00442 ndbrequire(format == FsReadWriteReq::fsFormatSharedPage); 00443 Ptr<GlobalPage> ptr; 00444 m_shared_page_pool.getPtr(ptr, fsRWReq->data.pageData[0]); 00445 request->par.readWrite.pages[0].buf = (char*)ptr.p; 00446 request->par.readWrite.pages[0].size = ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->numberOfPages; 00447 request->par.readWrite.pages[0].offset= ((UintPtr)GLOBAL_PAGE_SIZE)*fsRWReq->varIndex; 00448 request->par.readWrite.numberOfPages = 1; 00449 } 00450 00451 ndbrequire(forward(openFile, request)); 00452 return; 00453 00454 error: 00455 theRequestPool->put(request); 00456 FsRef * const fsRef = (FsRef *)&signal->theData[0]; 00457 fsRef->userPointer = userPointer; 00458 fsRef->setErrorCode(fsRef->errorCode, errorCode); 00459 fsRef->osErrorCode = ~0; // Indicate local error 00460 switch (action) { 00461 case Request:: write: 00462 case Request:: writeSync: { 00463 jam(); 00464 sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB); 00465 break; 00466 }//case 00467 case 
Request:: readPartial: 00468 case Request:: read: { 00469 jam(); 00470 sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB); 00471 }//case 00472 }//switch 00473 return; 00474 } 00475 00476 /* 00477 PR0: File Pointer , theData[0] 00478 DR0: User reference, theData[1] 00479 DR1: User Pointer, etc. 00480 DR2: Flag 00481 DR3: Var number 00482 DR4: amount of pages 00483 DR5->: Memory Page id and File page id according to Flag 00484 */ 00485 void 00486 Ndbfs::execFSWRITEREQ(Signal* signal) 00487 { 00488 jamEntry(); 00489 const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0]; 00490 00491 if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){ 00492 jam(); 00493 readWriteRequest( Request::writeSync, signal ); 00494 } else { 00495 jam(); 00496 readWriteRequest( Request::write, signal ); 00497 } 00498 } 00499 00500 /* 00501 PR0: File Pointer 00502 DR0: User reference 00503 DR1: User Pointer 00504 DR2: Flag 00505 DR3: Var number 00506 DR4: amount of pages 00507 DR5->: Memory Page id and File page id according to Flag 00508 */ 00509 void 00510 Ndbfs::execFSREADREQ(Signal* signal) 00511 { 00512 jamEntry(); 00513 FsReadWriteReq * req = (FsReadWriteReq *)signal->getDataPtr(); 00514 if (FsReadWriteReq::getPartialReadFlag(req->operationFlag)) 00515 readWriteRequest( Request::readPartial, signal ); 00516 else 00517 readWriteRequest( Request::read, signal ); 00518 } 00519 00520 /* 00521 * PR0: File Pointer DR0: User reference DR1: User Pointer 00522 */ 00523 void 00524 Ndbfs::execFSSYNCREQ(Signal * signal) 00525 { 00526 jamEntry(); 00527 Uint16 filePointer = (Uint16)signal->theData[0]; 00528 BlockReference userRef = signal->theData[1]; 00529 const UintR userPointer = signal->theData[2]; 00530 AsyncFile* openFile = theOpenFiles.find(filePointer); 00531 00532 if (openFile == NULL) { 00533 jam(); //file not open 00534 FsRef * const fsRef = (FsRef *)&signal->theData[0]; 00535 fsRef->userPointer = userPointer; 00536 fsRef->setErrorCode(fsRef->errorCode, 
FsRef::fsErrFileDoesNotExist); 00537 fsRef->osErrorCode = ~0; // Indicate local error 00538 sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB); 00539 return; 00540 } 00541 00542 Request *request = theRequestPool->get(); 00543 request->error = 0; 00544 request->action = Request::sync; 00545 request->set(userRef, userPointer, filePointer); 00546 request->file = openFile; 00547 request->theTrace = signal->getTrace(); 00548 00549 ndbrequire(forward(openFile,request)); 00550 } 00551 00552 void 00553 Ndbfs::execFSAPPENDREQ(Signal * signal) 00554 { 00555 const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0]; 00556 const Uint16 filePointer = (Uint16)fsReq->filePointer; 00557 const UintR userPointer = fsReq->userPointer; 00558 const BlockReference userRef = fsReq->userReference; 00559 const BlockNumber blockNumber = refToBlock(userRef); 00560 00561 FsRef::NdbfsErrorCodeType errorCode; 00562 00563 AsyncFile* openFile = theOpenFiles.find(filePointer); 00564 const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex]; 00565 00566 const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA; 00567 const Uint32 tSz = myBaseAddrRef->nrr; 00568 const Uint32 offset = fsReq->offset; 00569 const Uint32 size = fsReq->size; 00570 Request *request = theRequestPool->get(); 00571 00572 if (openFile == NULL) { 00573 jam(); 00574 errorCode = FsRef::fsErrFileDoesNotExist; 00575 goto error; 00576 } 00577 00578 if (myBaseAddrRef == NULL) { 00579 jam(); // Ensure that a valid variable is used 00580 errorCode = FsRef::fsErrInvalidParameters; 00581 goto error; 00582 } 00583 00584 if (fsReq->varIndex >= getBatSize(blockNumber)) { 00585 jam();// Ensure that a valid variable is used 00586 errorCode = FsRef::fsErrInvalidParameters; 00587 goto error; 00588 } 00589 00590 if(offset + size > tSz){ 00591 jam(); // Ensure that a valid variable is used 00592 errorCode = FsRef::fsErrInvalidParameters; 00593 goto error; 00594 } 00595 00596 request->error = 0; 00597 request->set(userRef, 
userPointer, filePointer); 00598 request->file = openFile; 00599 request->action = Request::append; 00600 request->theTrace = signal->getTrace(); 00601 00602 request->par.append.buf = (const char *)(tWA + offset); 00603 request->par.append.size = size << 2; 00604 00605 ndbrequire(forward(openFile, request)); 00606 return; 00607 00608 error: 00609 jam(); 00610 theRequestPool->put(request); 00611 FsRef * const fsRef = (FsRef *)&signal->theData[0]; 00612 fsRef->userPointer = userPointer; 00613 fsRef->setErrorCode(fsRef->errorCode, errorCode); 00614 fsRef->osErrorCode = ~0; // Indicate local error 00615 00616 jam(); 00617 sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB); 00618 return; 00619 } 00620 00621 Uint16 00622 Ndbfs::newId() 00623 { 00624 // finds a new key, eg a new filepointer 00625 for (int i = 1; i < SHRT_MAX; i++) 00626 { 00627 if (theLastId == SHRT_MAX) { 00628 jam(); 00629 theLastId = 1; 00630 } else { 00631 jam(); 00632 theLastId++; 00633 } 00634 00635 if(theOpenFiles.find(theLastId) == NULL) { 00636 jam(); 00637 return theLastId; 00638 } 00639 } 00640 ndbrequire(1 == 0); 00641 // The program will not reach this point 00642 return 0; 00643 } 00644 00645 AsyncFile* 00646 Ndbfs::createAsyncFile(){ 00647 00648 // Check limit of open files 00649 if (theFiles.size()+1 == m_maxFiles) { 00650 // Print info about all open files 00651 for (unsigned i = 0; i < theFiles.size(); i++){ 00652 AsyncFile* file = theFiles[i]; 00653 ndbout_c("%2d (0x%x): %s", i, file, file->isOpen()?"OPEN":"CLOSED"); 00654 } 00655 ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile"); 00656 } 00657 00658 AsyncFile* file = new AsyncFile(* this); 00659 file->doStart(); 00660 00661 // Put the file in list of all files 00662 theFiles.push_back(file); 00663 00664 #ifdef VM_TRACE 00665 infoEvent("NDBFS: Created new file thread %d", theFiles.size()); 00666 #endif 00667 00668 return file; 00669 } 00670 00671 AsyncFile* 00672 Ndbfs::getIdleFile(){ 00673 AsyncFile* file; 00674 if 
(theIdleFiles.size() > 0){ 00675 file = theIdleFiles[0]; 00676 theIdleFiles.erase(0); 00677 } else { 00678 file = createAsyncFile(); 00679 } 00680 return file; 00681 } 00682 00683 00684 00685 void 00686 Ndbfs::report(Request * request, Signal* signal) 00687 { 00688 const Uint32 orgTrace = signal->getTrace(); 00689 signal->setTrace(request->theTrace); 00690 const BlockReference ref = request->theUserReference; 00691 00692 if(!request->file->m_page_ptr.isNull()) 00693 { 00694 m_global_page_pool.release(request->file->m_page_ptr); 00695 request->file->m_page_ptr.setNull(); 00696 } 00697 00698 if (request->error) { 00699 jam(); 00700 // Initialise FsRef signal 00701 FsRef * const fsRef = (FsRef *)&signal->theData[0]; 00702 fsRef->userPointer = request->theUserPointer; 00703 if(request->error & FsRef::FS_ERR_BIT) 00704 { 00705 fsRef->errorCode = request->error; 00706 fsRef->osErrorCode = 0; 00707 } 00708 else 00709 { 00710 fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error)); 00711 fsRef->osErrorCode = request->error; 00712 } 00713 switch (request->action) { 00714 case Request:: open: { 00715 jam(); 00716 // Put the file back in idle files list 00717 theIdleFiles.push_back(request->file); 00718 sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB); 00719 break; 00720 } 00721 case Request:: closeRemove: 00722 case Request:: close: { 00723 jam(); 00724 sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB); 00725 break; 00726 } 00727 case Request:: writeSync: 00728 case Request:: writevSync: 00729 case Request:: write: 00730 case Request:: writev: { 00731 jam(); 00732 sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB); 00733 break; 00734 } 00735 case Request:: read: 00736 case Request:: readPartial: 00737 case Request:: readv: { 00738 jam(); 00739 sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB); 00740 break; 00741 } 00742 case Request:: sync: { 00743 jam(); 00744 sendSignal(ref, GSN_FSSYNCREF, signal, 
FsRef::SignalLength, JBB); 00745 break; 00746 } 00747 case Request::append: { 00748 jam(); 00749 sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB); 00750 break; 00751 } 00752 case Request::rmrf: { 00753 jam(); 00754 // Put the file back in idle files list 00755 theIdleFiles.push_back(request->file); 00756 sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB); 00757 break; 00758 } 00759 00760 case Request:: end: { 00761 // Report nothing 00762 break; 00763 } 00764 }//switch 00765 } else { 00766 jam(); 00767 FsConf * const fsConf = (FsConf *)&signal->theData[0]; 00768 fsConf->userPointer = request->theUserPointer; 00769 switch (request->action) { 00770 case Request:: open: { 00771 jam(); 00772 theOpenFiles.insert(request->file, request->theFilePointer); 00773 00774 // Keep track on max number of opened files 00775 if (theOpenFiles.size() > m_maxOpenedFiles) 00776 m_maxOpenedFiles = theOpenFiles.size(); 00777 00778 fsConf->filePointer = request->theFilePointer; 00779 sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBB); 00780 break; 00781 } 00782 case Request:: closeRemove: 00783 case Request:: close: { 00784 jam(); 00785 // removes the file from OpenFiles list 00786 theOpenFiles.erase(request->theFilePointer); 00787 // Put the file in idle files list 00788 theIdleFiles.push_back(request->file); 00789 sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB); 00790 break; 00791 } 00792 case Request:: writeSync: 00793 case Request:: writevSync: 00794 case Request:: write: 00795 case Request:: writev: { 00796 jam(); 00797 sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBB); 00798 break; 00799 } 00800 case Request:: read: 00801 case Request:: readv: { 00802 jam(); 00803 sendSignal(ref, GSN_FSREADCONF, signal, 1, JBB); 00804 break; 00805 } 00806 case Request:: readPartial: { 00807 jam(); 00808 fsConf->bytes_read = request->par.readWrite.pages[0].size; 00809 sendSignal(ref, GSN_FSREADCONF, signal, 2, JBB); 00810 break; 00811 } 00812 case Request:: sync: { 
00813 jam(); 00814 sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB); 00815 break; 00816 }//case 00817 case Request::append: { 00818 jam(); 00819 signal->theData[1] = request->par.append.size; 00820 sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB); 00821 break; 00822 } 00823 case Request::rmrf: { 00824 jam(); 00825 // Put the file in idle files list 00826 theIdleFiles.push_back(request->file); 00827 sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB); 00828 break; 00829 } 00830 case Request:: end: { 00831 // Report nothing 00832 break; 00833 } 00834 } 00835 }//if 00836 signal->setTrace(orgTrace); 00837 } 00838 00839 00840 bool 00841 Ndbfs::scanIPC(Signal* signal) 00842 { 00843 Request* request = theFromThreads.tryReadChannel(); 00844 jam(); 00845 if (request) { 00846 jam(); 00847 report(request, signal); 00848 theRequestPool->put(request); 00849 return true; 00850 } 00851 return false; 00852 } 00853 00854 #if defined NDB_WIN32 00855 Uint32 Ndbfs::translateErrno(int aErrno) 00856 { 00857 switch (aErrno) 00858 { 00859 //permission denied 00860 case ERROR_ACCESS_DENIED: 00861 00862 return FsRef::fsErrPermissionDenied; 00863 //temporary not accessible 00864 case ERROR_PATH_BUSY: 00865 case ERROR_NO_MORE_SEARCH_HANDLES: 00866 00867 return FsRef::fsErrTemporaryNotAccessible; 00868 //no space left on device 00869 case ERROR_HANDLE_DISK_FULL: 00870 case ERROR_DISK_FULL: 00871 00872 return FsRef::fsErrNoSpaceLeftOnDevice; 00873 //none valid parameters 00874 case ERROR_INVALID_HANDLE: 00875 case ERROR_INVALID_DRIVE: 00876 case ERROR_INVALID_ACCESS: 00877 case ERROR_HANDLE_EOF: 00878 case ERROR_BUFFER_OVERFLOW: 00879 00880 return FsRef::fsErrInvalidParameters; 00881 //environment error 00882 case ERROR_CRC: 00883 case ERROR_ARENA_TRASHED: 00884 case ERROR_BAD_ENVIRONMENT: 00885 case ERROR_INVALID_BLOCK: 00886 case ERROR_WRITE_FAULT: 00887 case ERROR_READ_FAULT: 00888 case ERROR_OPEN_FAILED: 00889 00890 return FsRef::fsErrEnvironmentError; 00891 00892 //no more process resources 
00893 case ERROR_TOO_MANY_OPEN_FILES: 00894 case ERROR_NOT_ENOUGH_MEMORY: 00895 case ERROR_OUTOFMEMORY: 00896 return FsRef::fsErrNoMoreResources; 00897 //no file 00898 case ERROR_FILE_NOT_FOUND: 00899 return FsRef::fsErrFileDoesNotExist; 00900 00901 case ERR_ReadUnderflow: 00902 return FsRef::fsErrReadUnderflow; 00903 00904 default: 00905 return FsRef::fsErrUnknown; 00906 } 00907 } 00908 #elif defined NDB_OSE || defined NDB_SOFTOSE 00909 Uint32 Ndbfs::translateErrno(int aErrno) 00910 { 00911 switch (aErrno) 00912 { 00913 //permission denied 00914 case EACCES: 00915 case EROFS: 00916 case ENXIO: 00917 return FsRef::fsErrPermissionDenied; 00918 //temporary not accessible 00919 case EAGAIN: 00920 case ETIMEDOUT: 00921 case ENOLCK: 00922 return FsRef::fsErrTemporaryNotAccessible; 00923 //no space left on device 00924 case ENFILE: 00925 case EDQUOT: 00926 case ENOSPC: 00927 return FsRef::fsErrNoSpaceLeftOnDevice; 00928 //none valid parameters 00929 case EINVAL: 00930 case EFBIG: 00931 case EBADF: 00932 case ENAMETOOLONG: 00933 case EFAULT: 00934 case EISDIR: 00935 return FsRef::fsErrInvalidParameters; 00936 //environment error 00937 case EMLINK: 00938 case ELOOP: 00939 return FsRef::fsErrEnvironmentError; 00940 00941 //no more process resources 00942 case EMFILE: 00943 case ENOMEM: 00944 return FsRef::fsErrNoMoreResources; 00945 //no file 00946 case ENOENT: 00947 return FsRef::fsErrFileDoesNotExist; 00948 00949 case ERR_ReadUnderflow: 00950 return FsRef::fsErrReadUnderflow; 00951 00952 default: 00953 return FsRef::fsErrUnknown; 00954 } 00955 } 00956 #else 00957 Uint32 Ndbfs::translateErrno(int aErrno) 00958 { 00959 switch (aErrno) 00960 { 00961 //permission denied 00962 case EACCES: 00963 case EROFS: 00964 case ENXIO: 00965 return FsRef::fsErrPermissionDenied; 00966 //temporary not accessible 00967 case EAGAIN: 00968 case ETIMEDOUT: 00969 case ENOLCK: 00970 case EINTR: 00971 case EIO: 00972 return FsRef::fsErrTemporaryNotAccessible; 00973 //no space left on device 00974 
case ENFILE: 00975 case EDQUOT: 00976 #ifdef ENOSR 00977 case ENOSR: 00978 #endif 00979 case ENOSPC: 00980 case EFBIG: 00981 return FsRef::fsErrNoSpaceLeftOnDevice; 00982 //none valid parameters 00983 case EINVAL: 00984 case EBADF: 00985 case ENAMETOOLONG: 00986 case EFAULT: 00987 case EISDIR: 00988 case ENOTDIR: 00989 case EEXIST: 00990 case ETXTBSY: 00991 return FsRef::fsErrInvalidParameters; 00992 //environment error 00993 case ELOOP: 00994 #ifdef ENOLINK 00995 case ENOLINK: 00996 #endif 00997 #ifdef EMULTIHOP 00998 case EMULTIHOP: 00999 #endif 01000 #ifdef EOPNOTSUPP 01001 case EOPNOTSUPP: 01002 #endif 01003 #ifdef ESPIPE 01004 case ESPIPE: 01005 #endif 01006 case EPIPE: 01007 return FsRef::fsErrEnvironmentError; 01008 01009 //no more process resources 01010 case EMFILE: 01011 case ENOMEM: 01012 return FsRef::fsErrNoMoreResources; 01013 //no file 01014 case ENOENT: 01015 return FsRef::fsErrFileDoesNotExist; 01016 01017 case ERR_ReadUnderflow: 01018 return FsRef::fsErrReadUnderflow; 01019 01020 default: 01021 return FsRef::fsErrUnknown; 01022 } 01023 } 01024 #endif 01025 01026 01027 01028 void 01029 Ndbfs::execCONTINUEB(Signal* signal) 01030 { 01031 jamEntry(); 01032 if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) { 01033 jam(); 01034 01035 // Also send CONTINUEB to ourself in order to scan for 01036 // incoming answers from AsyncFile on MemoryChannel theFromThreads 01037 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY; 01038 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1); 01039 if (scanningInProgress == true) { 01040 jam(); 01041 return; 01042 } 01043 } 01044 if (scanIPC(signal)) { 01045 jam(); 01046 scanningInProgress = true; 01047 signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY; 01048 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB); 01049 } else { 01050 jam(); 01051 scanningInProgress = false; 01052 } 01053 return; 01054 } 01055 01056 bool Global_useO_SYNC = true; 01057 
bool Global_unlinkO_CREAT = false; 01058 Uint32 Global_syncFreq = 1024 * 1024; 01059 01060 void 01061 Ndbfs::execDUMP_STATE_ORD(Signal* signal) 01062 { 01063 if(signal->theData[0] == 19){ 01064 if(signal->length() > 1){ 01065 Global_useO_SYNC = signal->theData[1]; 01066 } 01067 if(signal->length() > 2){ 01068 Global_syncFreq = signal->theData[2] * 1024 * 1024; 01069 } 01070 if(signal->length() > 3){ 01071 Global_unlinkO_CREAT = signal->theData[3]; 01072 } 01073 ndbout_c("useO_SYNC = %d syncFreq = %d unlinkO_CREATE = %d", 01074 Global_useO_SYNC, 01075 Global_syncFreq, 01076 Global_unlinkO_CREAT); 01077 return; 01078 } 01079 if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){ 01080 infoEvent("NDBFS: Files: %d Open files: %d", 01081 theFiles.size(), 01082 theOpenFiles.size()); 01083 infoEvent(" Idle files: %d Max opened files: %d", 01084 theIdleFiles.size(), 01085 m_maxOpenedFiles); 01086 infoEvent(" Max files: %d", 01087 m_maxFiles); 01088 infoEvent(" Requests: %d", 01089 theRequestPool->size()); 01090 01091 return; 01092 } 01093 if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){ 01094 infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size()); 01095 01096 for (unsigned i = 0; i < theOpenFiles.size(); i++){ 01097 AsyncFile* file = theOpenFiles.getFile(i); 01098 infoEvent("%2d (0x%x): %s", i,file, file->theFileName.c_str()); 01099 } 01100 return; 01101 } 01102 if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){ 01103 infoEvent("NDBFS: Dump all files: %d", theFiles.size()); 01104 01105 for (unsigned i = 0; i < theFiles.size(); i++){ 01106 AsyncFile* file = theFiles[i]; 01107 infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED"); 01108 } 01109 return; 01110 } 01111 if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){ 01112 infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size()); 01113 01114 for (unsigned i = 0; i < theIdleFiles.size(); i++){ 01115 AsyncFile* file = theIdleFiles[i]; 01116 infoEvent("%2d (0x%x): %s", 
i,file, file->isOpen()?"OPEN":"CLOSED"); 01117 } 01118 return; 01119 } 01120 01121 if(signal->theData[0] == 404) 01122 { 01123 ndbrequire(signal->getLength() == 2); 01124 Uint32 file= signal->theData[1]; 01125 AsyncFile* openFile = theOpenFiles.find(file); 01126 ndbrequire(openFile != 0); 01127 ndbout_c("File: %s %p", openFile->theFileName.c_str(), openFile); 01128 Request* curr = openFile->m_current_request; 01129 Request* last = openFile->m_last_request; 01130 if(curr) 01131 ndbout << "Current request: " << *curr << endl; 01132 if(last) 01133 ndbout << "Last request: " << *last << endl; 01134 01135 ndbout << "theReportTo " << *openFile->theReportTo << endl; 01136 ndbout << "theMemoryChannelPtr" << *openFile->theMemoryChannelPtr << endl; 01137 01138 ndbout << "All files: " << endl; 01139 for (unsigned i = 0; i < theFiles.size(); i++){ 01140 AsyncFile* file = theFiles[i]; 01141 ndbout_c("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED"); 01142 } 01143 } 01144 }//Ndbfs::execDUMP_STATE_ORD() 01145 01146 const char* 01147 Ndbfs::get_filename(Uint32 fd) const 01148 { 01149 jamEntry(); 01150 const AsyncFile* openFile = theOpenFiles.find(fd); 01151 if(openFile) 01152 return openFile->theFileName.get_base_name(); 01153 return ""; 01154 } 01155 01156 01157 BLOCK_FUNCTIONS(Ndbfs) 01158 01159 template class Vector<AsyncFile*>; 01160 template class Vector<OpenFiles::OpenFileItem>; 01161 template class MemoryChannel<Request>; 01162 template class Pool<Request>; 01163 template NdbOut& operator<<(NdbOut&, const MemoryChannel<Request>&);
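// 1.4.7  NOTE(review): this looks like a generator version footer left over from the source-listing export (presumably Doxygen); it is not part of the program - confirm and remove.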

