The world's most popular open source database
00001 /* Copyright (C) 2003 MySQL AB 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; either version 2 of the License, or 00006 (at your option) any later version. 00007 00008 This program is distributed in the hope that it will be useful, 00009 but WITHOUT ANY WARRANTY; without even the implied warranty of 00010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00011 GNU General Public License for more details. 00012 00013 You should have received a copy of the GNU General Public License 00014 along with this program; if not, write to the Free Software 00015 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 00016 00017 //#define TESTDEBUG 1 00018 00019 #include <ndb_global.h> 00020 00021 #include <kernel_types.h> 00022 #include <Pool.hpp> 00023 #include "AsyncFile.hpp" 00024 #include "NdbOut.hpp" 00025 #include "NdbTick.h" 00026 #include "NdbThread.h" 00027 #include "NdbMain.h" 00028 00029 // Test and benchmark functionality of AsyncFile 00030 // -n Number of files 00031 // -r Number of simultaneous requests 00032 // -s Filesize, number of pages 00033 // -l Number of iterations 00034 // -remove, remove files after close 00035 // -reverse, write files in reverse order, start with the last page 00036 00037 #define MAXFILES 255 00038 #define DEFAULT_NUM_FILES 1 00039 #define MAXREQUESTS 256 00040 #define DEFAULT_NUM_REQUESTS 1 00041 #define MAXFILESIZE 4096 00042 #define DEFAULT_FILESIZE 2048 00043 #define FVERSION 0x01000000 00044 #define PAGESIZE 8192 00045 00046 #define TIMER_START { Uint64 starttick = NdbTick_CurrentMillisecond() 00047 #define TIMER_PRINT(str, ops) Uint64 stoptick = NdbTick_CurrentMillisecond();\ 00048 Uint64 totaltime = (stoptick-starttick); \ 00049 ndbout << ops << " " << str << \ 00050 " total time " << (int)totaltime << "ms" << endl;\ 00051 char buf[255];\ 00052 sprintf(buf, "%d %s/sec\n",(int)((ops*1000)/totaltime), str);\ 00053 ndbout <<buf << endl;} 00054 00055 static int numberOfFiles = DEFAULT_NUM_FILES; 00056 static int numberOfRequests = DEFAULT_NUM_REQUESTS; 00057 static int fileSize = DEFAULT_FILESIZE; 00058 static int removeFiles = 0; 00059 static int writeFilesReverse = 0; 00060 static int numberOfIterations = 1; 00061 Uint32 FileNameArray[4]; 00062 00063 Pool<AsyncFile>* files; 00064 AsyncFile* openFiles[MAXFILES]; 00065 Pool<Request>* theRequestPool; 00066 MemoryChannelMultipleWriter<Request>* theReportChannel; 00067 00068 char WritePages[MAXFILES][PAGESIZE]; 00069 char ReadPages[MAXFILES][PAGESIZE]; 00070 00071 int readArguments(int argc, const char** argv); 00072 int openFile(int fileNum); 00073 int openFileWait(); 00074 int closeFile(int fileNum); 00075 int closeFileWait(); 00076 int writeFile( int fileNum, int pagenum); 00077 int writeFileWait(); 00078 int writeSyncFile( int fileNum, int pagenum); 00079 int writeSyncFileWait(); 00080 int readFile( int fileNum, int pagenum); 00081 int readFileWait(); 00082 00083 00084 NDB_COMMAND(aftest, "aftest", "aftest [-n <Number of files>] [-r <Number of simultaneous requests>] [-s <Filesize, number of pages>] [-l <Number of iterations>] [-remove, remove files after close] [-reverse, write files in reverse order, start with the last page]", "Test the AsyncFile class of Ndb", 8192) 00085 { 00086 int s, numReq, numOps; 00087 00088 readArguments(argc, argv); 00089 00090 files = new Pool<AsyncFile>(numberOfFiles, 2); 00091 theRequestPool = new Pool<Request>; 00092 theReportChannel = new MemoryChannelMultipleWriter<Request>; 00093 00094 ndbout << "AsyncFileTest starting" << endl; 00095 ndbout << " " << numberOfFiles << " files" << endl; 00096 ndbout << " " << numberOfRequests << " requests" << endl; 00097 ndbout << " " << fileSize << " * 8k files" << endl << endl; 00098 ndbout << " " << numberOfIterations << " iterations" << endl << endl; 00099 00100 NdbThread_SetConcurrencyLevel(numberOfFiles+2); 00101 00102 // initialize data to write to files 00103 for (int i = 0; i < MAXFILES; i++) { 00104 for (int j = 0; j < PAGESIZE; j++){ 00105 WritePages[i][j] = (64+i+j)%256; 00106 } 00107 // memset(&WritePages[i][0], i+64, PAGESIZE); 00108 } 00109 00110 // Set file directory and name 00111 // /T27/F27/NDBFS/S27Pnn.data 00112 FileNameArray[0] = 27; // T27 00113 FileNameArray[1] = 27; // F27 00114 FileNameArray[2] = 27; // S27 00115 FileNameArray[3] = FVERSION; // Version 00116 00117 for (int l = 0; l < numberOfIterations; l++) 00118 { 00119 00120 ndbout << "Opening files" << endl; 00121 // Open files 00122 for (int f = 0; f < numberOfFiles; f++) 00123 { 00124 openFile(f); 00125 00126 } 00127 00128 // Wait for answer 00129 openFileWait(); 00130 00131 ndbout << "Files opened!" << endl<< endl; 00132 00133 // Write to files 00134 ndbout << "Started writing" << endl; 00135 TIMER_START; 00136 s = 0; 00137 numReq = 0; 00138 numOps = 0; 00139 while ( s < fileSize) 00140 { 00141 for (int r = 0; r < numberOfRequests; r++) 00142 { 00143 for (int f = 0; f < numberOfFiles; f++) 00144 { 00145 writeFile(f, s); 00146 numReq++; 00147 numOps++; 00148 } 00149 00150 s++; 00151 } 00152 00153 while (numReq > 0) 00154 { 00155 writeFileWait(); 00156 numReq--; 00157 } 00158 00159 } 00160 00161 TIMER_PRINT("writes", numOps); 00162 00163 00164 ndbout << "Started reading" << endl; 00165 TIMER_START; 00166 00167 // Read from files 00168 s = 0; 00169 numReq = 0; 00170 numOps = 0; 00171 while ( s < fileSize) 00172 { 00173 for (int r = 0; r < numberOfRequests; r++) 00174 { 00175 for (int f = 0; f < numberOfFiles; f++) 00176 { 00177 readFile(f, s); 00178 numReq++; 00179 numOps++; 00180 } 00181 00182 s++; 00183 00184 } 00185 00186 while (numReq > 0) 00187 { 00188 readFileWait(); 00189 numReq--; 00190 } 00191 00192 } 00193 TIMER_PRINT("reads", numOps); 00194 00195 ndbout << "Started writing with sync" << endl; 00196 TIMER_START; 00197 00198 // Write to files 00199 s = 0; 00200 numReq = 0; 00201 numOps = 0; 00202 while ( s < fileSize) 00203 { 00204 for (int r = 0; r < numberOfRequests; r++) 00205 { 00206 for (int f = 0; f < numberOfFiles; f++) 00207 { 00208 writeSyncFile(f, s); 00209 numReq++; 00210 numOps++; 00211 } 00212 00213 s++; 00214 } 00215 00216 while (numReq > 0) 00217 { 00218 writeSyncFileWait(); 00219 numReq--; 00220 } 00221 00222 } 00223 00224 TIMER_PRINT("writeSync", numOps); 00225 00226 // Close files 00227 ndbout << "Closing files" << endl; 00228 for (int f = 0; f < numberOfFiles; f++) 00229 { 00230 closeFile(f); 00231 00232 } 00233 00234 // Wait for answer 00235 closeFileWait(); 00236 00237 ndbout << "Files closed!" << endl<< endl; 00238 } 00239 00240 // Deallocate memory 00241 delete files; 00242 delete theReportChannel; 00243 delete theRequestPool; 00244 00245 return 0; 00246 00247 } 00248 00249 00250 00251 int forward( AsyncFile * file, Request* request ) 00252 { 00253 file->execute(request); 00254 ERROR_CHECK 0; 00255 return 1; 00256 } 00257 00258 int openFile( int fileNum) 00259 { 00260 AsyncFile* file = (AsyncFile *)files->get(); 00261 00262 FileNameArray[3] = fileNum | FVERSION; 00263 file->fileName().set( NDBFS_REF, &FileNameArray[0] ); 00264 ndbout << "openFile: " << file->fileName().c_str() << endl; 00265 00266 if( ERROR_STATE ) { 00267 ERROR_RESET; 00268 files->put( file ); 00269 ndbout << "Failed to set filename" << endl; 00270 return 1; 00271 } 00272 file->reportTo(theReportChannel); 00273 00274 Request* request = theRequestPool->get(); 00275 request->action= Request::open; 00276 request->error= 0; 00277 request->par.open.flags = 0x302; //O_RDWR | O_CREAT | O_TRUNC ; // 770 00278 request->set(NDBFS_REF, 0x23456789, fileNum ); 00279 request->file = file; 00280 00281 if (!forward(file,request)) { 00282 // Something went wrong 00283 ndbout << "Could not forward open request" << endl; 00284 theRequestPool->put(request); 00285 return 1; 00286 } 00287 return 0; 00288 } 00289 00290 int closeFile( int fileNum) 00291 { 00292 00293 AsyncFile* file = openFiles[fileNum]; 00294 00295 Request* request = theRequestPool->get(); 00296 if (removeFiles == 1) 00297 request->action = Request::closeRemove; 00298 else 00299 request->action= Request::close; 00300 00301 request->error= 0; 00302 request->set(NDBFS_REF, 0x23456789, fileNum ); 00303 request->file = file; 00304 00305 if (!forward(file,request)) { 00306 // Something went wrong 00307 ndbout << "Could not forward close request" << endl; 00308 theRequestPool->put(request); 00309 return 1; 00310 } 00311 return 0; 00312 } 00313 00314 int writeFile( int fileNum, int pagenum) 00315 { 00316 AsyncFile* file = openFiles[fileNum]; 00317 #ifdef TESTDEBUG 00318 ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str()<< endl; 00319 #endif 00320 Request *request = theRequestPool->get(); 00321 request->action = Request::write; 00322 request->error = 0; 00323 request->set(NDBFS_REF, pagenum, fileNum); 00324 request->file = openFiles[fileNum]; 00325 00326 // Write only one page, choose the correct page for each file using fileNum 00327 request->par.readWrite.pages[0].buf = &WritePages[fileNum][0]; 00328 request->par.readWrite.pages[0].size = PAGESIZE; 00329 if (writeFilesReverse == 1) 00330 { 00331 // write the last page in the files first 00332 // This is a normal way for the Blocks in Ndb to write to a file 00333 request->par.readWrite.pages[0].offset = (fileSize - pagenum - 1) * PAGESIZE; 00334 } 00335 else 00336 { 00337 request->par.readWrite.pages[0].offset = pagenum * PAGESIZE; 00338 } 00339 request->par.readWrite.numberOfPages = 1; 00340 00341 if (!forward(file,request)) { 00342 // Something went wrong 00343 ndbout << "Could not forward write request" << endl; 00344 theRequestPool->put(request); 00345 return 1; 00346 } 00347 return 0; 00348 00349 } 00350 00351 int writeSyncFile( int fileNum, int pagenum) 00352 { 00353 AsyncFile* file = openFiles[fileNum]; 00354 #ifdef TESTDEBUG 00355 ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl; 00356 #endif 00357 Request *request = theRequestPool->get(); 00358 request->action = Request::writeSync; 00359 request->error = 0; 00360 request->set(NDBFS_REF, pagenum, fileNum); 00361 request->file = openFiles[fileNum]; 00362 00363 // Write only one page, choose the correct page for each file using fileNum 00364 request->par.readWrite.pages[0].buf = &WritePages[fileNum][0]; 00365 request->par.readWrite.pages[0].size = PAGESIZE; 00366 request->par.readWrite.pages[0].offset = pagenum * PAGESIZE; 00367 request->par.readWrite.numberOfPages = 1; 00368 00369 if (!forward(file,request)) { 00370 // Something went wrong 00371 ndbout << "Could not forward write request" << endl; 00372 theRequestPool->put(request); 00373 return 1; 00374 } 00375 return 0; 00376 00377 } 00378 00379 int readFile( int fileNum, int pagenum) 00380 { 00381 AsyncFile* file = openFiles[fileNum]; 00382 #ifdef TESTDEBUG 00383 ndbout << "readFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl; 00384 #endif 00385 Request *request = theRequestPool->get(); 00386 request->action = Request::read; 00387 request->error = 0; 00388 request->set(NDBFS_REF, pagenum, fileNum); 00389 request->file = openFiles[fileNum]; 00390 00391 // Read only one page, choose the correct page for each file using fileNum 00392 request->par.readWrite.pages[0].buf = &ReadPages[fileNum][0]; 00393 request->par.readWrite.pages[0].size = PAGESIZE; 00394 request->par.readWrite.pages[0].offset = pagenum * PAGESIZE; 00395 request->par.readWrite.numberOfPages = 1; 00396 00397 if (!forward(file,request)) { 00398 // Something went wrong 00399 ndbout << "Could not forward read request" << endl; 00400 theRequestPool->put(request); 00401 return 1; 00402 } 00403 return 0; 00404 00405 } 00406 00407 int openFileWait() 00408 { 00409 int openedFiles = 0; 00410 while (openedFiles < numberOfFiles) 00411 { 00412 Request* request = theReportChannel->readChannel(); 00413 if (request) 00414 { 00415 if (request->action == Request::open) 00416 { 00417 if (request->error ==0) 00418 { 00419 #ifdef TESTDEBUG 00420 ndbout << "Opened file " << request->file->fileName().c_str() << endl; 00421 #endif 00422 openFiles[request->theFilePointer] = request->file; 00423 } 00424 else 00425 { 00426 ndbout << "error while opening file" << endl; 00427 exit(1); 00428 } 00429 theRequestPool->put(request); 00430 openedFiles++; 00431 } 00432 else 00433 { 00434 ndbout << "Unexpected request received" << endl; 00435 } 00436 } 00437 else 00438 { 00439 ndbout << "Nothing read from theReportChannel" << endl; 00440 } 00441 } 00442 return 0; 00443 } 00444 00445 int closeFileWait() 00446 { 00447 int closedFiles = 0; 00448 while (closedFiles < numberOfFiles) 00449 { 00450 Request* request = theReportChannel->readChannel(); 00451 if (request) 00452 { 00453 if (request->action == Request::close || request->action == Request::closeRemove) 00454 { 00455 if (request->error ==0) 00456 { 00457 #ifdef TESTDEBUG 00458 ndbout << "Closed file " << request->file->fileName().c_str() << endl; 00459 #endif 00460 openFiles[request->theFilePointer] = NULL; 00461 files->put(request->file); 00462 } 00463 else 00464 { 00465 ndbout << "error while closing file" << endl; 00466 exit(1); 00467 } 00468 theRequestPool->put(request); 00469 closedFiles++; 00470 } 00471 else 00472 { 00473 ndbout << "Unexpected request received" << endl; 00474 } 00475 } 00476 else 00477 { 00478 ndbout << "Nothing read from theReportChannel" << endl; 00479 } 00480 } 00481 return 0; 00482 } 00483 00484 int writeFileWait() 00485 { 00486 Request* request = theReportChannel->readChannel(); 00487 if (request) 00488 { 00489 if (request->action == Request::write) 00490 { 00491 if (request->error == 0) 00492 { 00493 #ifdef TESTDEBUG 00494 ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl; 00495 #endif 00496 00497 } 00498 else 00499 { 00500 ndbout << "error while writing file, error=" << request->error << endl; 00501 exit(1); 00502 } 00503 theRequestPool->put(request); 00504 } 00505 else 00506 { 00507 ndbout << "Unexpected request received" << endl; 00508 } 00509 } 00510 else 00511 { 00512 ndbout << "Nothing read from theReportChannel" << endl; 00513 } 00514 return 0; 00515 } 00516 00517 int writeSyncFileWait() 00518 { 00519 Request* request = theReportChannel->readChannel(); 00520 if (request) 00521 { 00522 if (request->action == Request::writeSync) 00523 { 00524 if (request->error == 0) 00525 { 00526 #ifdef TESTDEBUG 00527 ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl; 00528 #endif 00529 00530 } 00531 else 00532 { 00533 ndbout << "error while writing file" << endl; 00534 exit(1); 00535 } 00536 theRequestPool->put(request); 00537 } 00538 else 00539 { 00540 ndbout << "Unexpected request received" << endl; 00541 } 00542 } 00543 else 00544 { 00545 ndbout << "Nothing read from theReportChannel" << endl; 00546 } 00547 return 0; 00548 } 00549 00550 int readFileWait() 00551 { 00552 Request* request = theReportChannel->readChannel(); 00553 if (request) 00554 { 00555 if (request->action == Request::read) 00556 { 00557 if (request->error == 0) 00558 { 00559 #ifdef TESTDEBUG 00560 ndbout << "readFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl; 00561 #endif 00562 if (memcmp(&(ReadPages[request->theFilePointer][0]), &(WritePages[request->theFilePointer][0]), PAGESIZE)!=0) 00563 { 00564 ndbout <<"Verification error!" << endl; 00565 for (int i = 0; i < PAGESIZE; i++ ){ 00566 ndbout <<" Compare Page " << i << " : " << ReadPages[request->theFilePointer][i] <<", " <<WritePages[request->theFilePointer][i] << endl;; 00567 if( ReadPages[request->theFilePointer][i] !=WritePages[request->theFilePointer][i]) 00568 00569 exit(1); 00570 } 00571 } 00572 00573 } 00574 else 00575 { 00576 ndbout << "error while reading file" << endl; 00577 exit(1); 00578 } 00579 theRequestPool->put(request); 00580 } 00581 else 00582 { 00583 ndbout << "Unexpected request received" << endl; 00584 } 00585 } 00586 else 00587 { 00588 ndbout << "Nothing read from theReportChannel" << endl; 00589 } 00590 return 0; 00591 } 00592 00593 int readArguments(int argc, const char** argv) 00594 { 00595 00596 int i = 1; 00597 while (argc > 1) 00598 { 00599 if (strcmp(argv[i], "-n") == 0) 00600 { 00601 numberOfFiles = atoi(argv[i+1]); 00602 if ((numberOfFiles < 1) || (numberOfFiles > MAXFILES)) 00603 { 00604 ndbout << "Wrong number of files, default = "<<DEFAULT_NUM_FILES << endl; 00605 numberOfFiles = DEFAULT_NUM_FILES; 00606 } 00607 } 00608 else if (strcmp(argv[i], "-r") == 0) 00609 { 00610 numberOfRequests = atoi(argv[i+1]); 00611 if ((numberOfRequests < 1) || (numberOfRequests > MAXREQUESTS)) 00612 { 00613 ndbout << "Wrong number of requests, default = "<<DEFAULT_NUM_REQUESTS << endl; 00614 numberOfRequests = DEFAULT_NUM_REQUESTS; 00615 } 00616 } 00617 else if (strcmp(argv[i], "-s") == 0) 00618 { 00619 fileSize = atoi(argv[i+1]); 00620 if ((fileSize < 1) || (fileSize > MAXFILESIZE)) 00621 { 00622 ndbout << "Wrong number of 8k pages, default = "<<DEFAULT_FILESIZE << endl; 00623 fileSize = DEFAULT_FILESIZE; 00624 } 00625 } 00626 else if (strcmp(argv[i], "-l") == 0) 00627 { 00628 numberOfIterations = atoi(argv[i+1]); 00629 if ((numberOfIterations < 1)) 00630 { 00631 ndbout << "Wrong number of iterations, default = 1" << endl; 00632 numberOfIterations = 1; 00633 } 00634 } 00635 else if (strcmp(argv[i], "-remove") == 0) 00636 { 00637 removeFiles = 1; 00638 argc++; 00639 i--; 00640 } 00641 else if (strcmp(argv[i], "-reverse") == 0) 00642 { 00643 ndbout << "Writing files reversed" << endl; 00644 writeFilesReverse = 1; 00645 argc++; 00646 i--; 00647 } 00648 00649 argc -= 2; 00650 i = i + 2; 00651 } 00652 00653 if ((fileSize % numberOfRequests)!= 0) 00654 { 00655 numberOfRequests = numberOfRequests - (fileSize % numberOfRequests); 00656 ndbout <<"numberOfRequest must be modulo of filesize" << endl; 00657 ndbout << "New numberOfRequest="<<numberOfRequests<<endl; 00658 } 00659 return 0; 00660 } 00661 00662 00663 // Needed for linking... 00664 00665 void ErrorReporter::handleError(ErrorCategory type, int messageID, 00666 const char* problemData, const char* objRef, NdbShutdownType stype) 00667 { 00668 00669 ndbout << "ErrorReporter::handleError activated" << endl; 00670 ndbout << "type= " << type << endl; 00671 ndbout << "messageID= " << messageID << endl; 00672 ndbout << "problemData= " << problemData << endl; 00673 ndbout << "objRef= " << objRef << endl; 00674 00675 exit(1); 00676 } 00677 00678 void ErrorReporter::handleAssert(const char* message, const char* file, int line) 00679 { 00680 ndbout << "ErrorReporter::handleAssert activated" << endl; 00681 ndbout << "message= " << message << endl; 00682 ndbout << "file= " << file << endl; 00683 ndbout << "line= " << line << endl; 00684 exit(1); 00685 } 00686 00687 00688 GlobalData globalData; 00689 00690 00691 Signal::Signal() 00692 { 00693 00694 } 00695
1.4.7

