#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>
#include <iostream>
Include dependency graph for ndbapi_async.cpp:

Go to the source code of this file.
Classes | |
| struct | async_callback_t |
| struct | transaction_t |
Defines | |
| #define | PRINT_ERROR(code, msg) |
| #define | MYSQLERROR(mysql) |
| #define | APIERROR(error) |
Functions | |
| static void | milliSleep (int milliseconds) |
| int | populate (Ndb *myNdb, int data, async_callback_t *cbData) |
| bool | asynchErrorHandler (NdbTransaction *trans, Ndb *ndb) |
| void | asynchExitHandler (Ndb *m_ndb) |
| void | closeTransaction (Ndb *ndb, async_callback_t *cb) |
| int | create_table (Ndb *myNdb) |
| static void | callback (int result, NdbTransaction *trans, void *aObject) |
| int | create_table (MYSQL &mysql) |
| int | main () |
Variables | |
| transaction_t | transaction [1024] |
| int | tempErrors = 0 |
| int | permErrors = 0 |
| static int | nPreparedTransactions = 0 |
| static int | MAX_RETRIES = 10 |
| static int | parallelism = 100 |
| #define APIERROR | ( | error | ) |
Value:
/**
 * Report an NDB API error and terminate the program.
 *
 * Prints the error's code and message through PRINT_ERROR and then calls
 * exit(-1).
 *
 * @param error expression yielding an object with `code` and `message`
 *              members. The argument is parenthesized so that compound
 *              expressions (e.g. a conditional) expand correctly.
 */
#define APIERROR(error) \
  { \
    PRINT_ERROR((error).code, (error).message); \
    exit(-1); \
  }
Definition at line 79 of file ndbapi_async.cpp.
Referenced by callback(), do_delete(), do_insert(), do_read(), do_update(), executeInsertTransaction(), insert(), main(), myCreateEvent(), populate(), run_application(), scan_delete(), scan_print(), and scan_update().
| #define MYSQLERROR | ( | mysql | ) |
Value:
/**
 * Report the last error of a MySQL connection and terminate the program.
 *
 * Prints mysql_errno() / mysql_error() for the connection through
 * PRINT_ERROR and then calls exit(-1).
 *
 * @param mysql a MYSQL connection object (not a pointer); its address is
 *              taken inside the macro. The argument is parenthesized so
 *              that compound expressions expand correctly.
 */
#define MYSQLERROR(mysql) \
  { \
    PRINT_ERROR(mysql_errno(&(mysql)), mysql_error(&(mysql))); \
    exit(-1); \
  }
Definition at line 76 of file ndbapi_async.cpp.
Referenced by create_table(), main(), and run_application().
| #define PRINT_ERROR | ( | code, | |||
| msg | ) |
Value:
/**
 * Print an error code and message to std::cout, prefixed with the source
 * file and line of the place where the macro is expanded.
 *
 * @param code error code to print (any streamable expression).
 * @param msg  error message to print (any streamable expression).
 *
 * Both arguments are parenthesized so that expressions containing
 * operators of lower precedence than `<<` are printed as a whole.
 */
#define PRINT_ERROR(code, msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl
Definition at line 72 of file ndbapi_async.cpp.
| static bool asynchErrorHandler | ( | NdbTransaction * | trans, | |
| Ndb * | ndb | |||
| ) |
Error handling based on hugo. Returns true if the error is recoverable, false if it is an error that generates an abort.
The error code indicates a temporary error. The application should typically retry. (Includes classifications: NdbError::InsufficientSpace, NdbError::TemporaryResourceError, NdbError::NodeRecoveryError, NdbError::OverloadError, NdbError::NodeShutdown and NdbError::TimeoutExpired.)
We should sleep for a while and retry, except for insufficient space
Definition at line 230 of file ndbapi_async.cpp.
References error, NdbTransaction::getNdbError(), NdbError::InsufficientSpace, milliSleep(), NdbError::PermanentError, NdbError::Success, tempErrors, NdbError::TemporaryError, and NdbError::UnknownResult.
Referenced by callback(), and populate().
00231 { 00232 NdbError error = trans->getNdbError(); 00233 switch(error.status) 00234 { 00235 case NdbError::Success: 00236 return false; 00237 break; 00238 00239 case NdbError::TemporaryError: 00250 if(error.classification == NdbError::InsufficientSpace) 00251 return false; 00252 milliSleep(10); 00253 tempErrors++; 00254 return true; 00255 break; 00256 case NdbError::UnknownResult: 00257 std::cout << error.message << std::endl; 00258 return false; 00259 break; 00260 default: 00261 case NdbError::PermanentError: 00262 switch (error.code) 00263 { 00264 case 499: 00265 case 250: 00266 milliSleep(10); 00267 return true; // SCAN errors that can be retried. Requires restart of scan. 00268 default: 00269 break; 00270 } 00271 //ERROR 00272 std::cout << error.message << std::endl; 00273 return false; 00274 break; 00275 } 00276 return false; 00277 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void asynchExitHandler | ( | Ndb * | m_ndb | ) |
Exit function
Definition at line 220 of file ndbapi_async.cpp.
Referenced by callback(), and populate().
Here is the caller graph for this function:

| static void callback | ( | int | result, | |
| NdbTransaction * | trans, | |||
| void * | aObject | |||
| ) | [static] |
Callback executed when the transaction has returned from NDB
Error: Temporary or permanent?
OK! close transaction
Definition at line 160 of file ndbapi_async.cpp.
References asynchErrorHandler(), asynchExitHandler(), closeTransaction(), async_callback_t::data, milliSleep(), async_callback_t::ndb, and populate().
Referenced by Dbdict::alterTab_writeSchemaConf(), Mutex::create(), Dbdict::createObj_abort_start_done(), Dbdict::createObj_commit_start_done(), Dbdict::createObj_writeSchemaConf1(), Dbdict::createTab_commit(), Dbdict::createTab_prepare(), Dbdict::createTab_writeSchemaConf1(), Dbdict::createTab_writeTableConf(), Mutex::destroy(), Dbdict::dropObj_abort_start_done(), Dbdict::dropObj_commit_start_done(), Dbdict::execALTER_TAB_REQ(), Dbdict::execGET_TABINFO_CONF(), Logfile_client::get_log_buffer(), Mutex::lock(), main(), populate(), Pgman::process_callback(), Dbdict::restartCreateObj_prepare_start_done(), Dbdict::restartCreateTab_dihComplete(), Dbdict::restartCreateTab_writeTableConf(), Mutex::trylock(), BackupRestore::tuple_a(), and Mutex::unlock().
00161 { 00162 async_callback_t * cbData = (async_callback_t *)aObject; 00163 if (result<0) 00164 { 00168 if (asynchErrorHandler(trans, (Ndb*)cbData->ndb)) 00169 { 00170 closeTransaction((Ndb*)cbData->ndb, cbData); 00171 while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0) 00172 milliSleep(10); 00173 } 00174 else 00175 { 00176 std::cout << "Restore: Failed to restore data " 00177 << "due to a unrecoverable error. Exiting..." << std::endl; 00178 delete cbData; 00179 asynchExitHandler((Ndb*)cbData->ndb); 00180 } 00181 } 00182 else 00183 { 00187 closeTransaction((Ndb*)cbData->ndb, cbData); 00188 delete cbData; 00189 } 00190 }
Here is the call graph for this function:

Here is the caller graph for this function:

| void closeTransaction | ( | Ndb * | ndb, | |
| async_callback_t * | cb | |||
| ) |
Helper function used in callback(...)
Definition at line 148 of file ndbapi_async.cpp.
References Ndb::closeTransaction(), transaction_t::conn, ndb, async_callback_t::retries, async_callback_t::transaction, transaction, and transaction_t::used.
Referenced by callback().
00149 { 00150 ndb->closeTransaction(transaction[cb->transaction].conn); 00151 transaction[cb->transaction].conn = 0; 00152 transaction[cb->transaction].used = 0; 00153 cb->retries++; 00154 }
Here is the call graph for this function:

Here is the caller graph for this function:

| int create_table | ( | MYSQL & | mysql | ) |
Create table "GARAGE"
Definition at line 196 of file ndbapi_async.cpp.
00197 { 00198 while (mysql_query(&mysql, 00199 "CREATE TABLE" 00200 " GARAGE" 00201 " (REG_NO INT UNSIGNED NOT NULL," 00202 " BRAND CHAR(20) NOT NULL," 00203 " COLOR CHAR(20) NOT NULL," 00204 " PRIMARY KEY USING HASH (REG_NO))" 00205 " ENGINE=NDB")) 00206 { 00207 if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) 00208 MYSQLERROR(mysql); 00209 std::cout << "MySQL Cluster already has example table: GARAGE. " 00210 << "Dropping it..." << std::endl; 00211 /************** 00212 * Drop table * 00213 **************/ 00214 if (mysql_query(&mysql, "DROP TABLE GARAGE")) 00215 MYSQLERROR(mysql); 00216 } 00217 return 1; 00218 }
| int create_table | ( | Ndb * | myNdb | ) |
Function to create table
Referenced by create_table_from_items(), create_table_precheck(), main(), mysql_execute_command(), mysql_test_create_table(), run_application(), and test_bug21206().
Here is the caller graph for this function:

| int main | ( | void | ) |
Definition at line 409 of file ndbapi_async.cpp.
References APIERROR, transaction_t::conn, Ndb_cluster_connection::connect(), create_table(), exit, Ndb::getNdbError(), Ndb::init(), milliSleep(), mysql, mysql_init(), mysql_query(), mysql_real_connect(), MYSQLERROR, ndb_init(), populate(), tempErrors, transaction, and Ndb_cluster_connection::wait_until_ready().
00410 { 00411 ndb_init(); 00412 MYSQL mysql; 00413 00414 /************************************************************** 00415 * Connect to mysql server and create table * 00416 **************************************************************/ 00417 { 00418 if ( !mysql_init(&mysql) ) { 00419 std::cout << "mysql_init failed\n"; 00420 exit(-1); 00421 } 00422 if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 00423 3306, "/tmp/mysql.sock", 0) ) 00424 MYSQLERROR(mysql); 00425 00426 mysql_query(&mysql, "CREATE DATABASE TEST_DB"); 00427 if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql); 00428 00429 create_table(mysql); 00430 } 00431 00432 /************************************************************** 00433 * Connect to ndb cluster * 00434 **************************************************************/ 00435 Ndb_cluster_connection cluster_connection; 00436 if (cluster_connection.connect(4, 5, 1)) 00437 { 00438 std::cout << "Unable to connect to cluster within 30 secs." << std::endl; 00439 exit(-1); 00440 } 00441 // Optionally connect and wait for the storage nodes (ndbd's) 00442 if (cluster_connection.wait_until_ready(30,0) < 0) 00443 { 00444 std::cout << "Cluster was not ready within 30 secs.\n"; 00445 exit(-1); 00446 } 00447 00448 Ndb* myNdb = new Ndb( &cluster_connection, 00449 "TEST_DB" ); // Object representing the database 00450 if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions 00451 APIERROR(myNdb->getNdbError()); 00452 } 00453 00457 for(int i = 0 ; i < 1024 ; i++) 00458 { 00459 transaction[i].used = 0; 00460 transaction[i].conn = 0; 00461 00462 } 00463 int i=0; 00467 while(i < 20000) 00468 { 00469 while(populate(myNdb,i,0)<0) // <0, no space on free list. Sleep and try again. 00470 milliSleep(10); 00471 00472 i++; 00473 } 00474 std::cout << "Number of temporary errors: " << tempErrors << std::endl; 00475 delete myNdb; 00476 }
Here is the call graph for this function:

| static void milliSleep | ( | int | milliseconds | ) | [static] |
Helper sleep function
Definition at line 61 of file ndbapi_async.cpp.
Referenced by asynchErrorHandler(), callback(), main(), populate(), scan_delete(), scan_print(), and scan_update().
/**
 * Helper sleep function.
 *
 * Blocks the calling thread for roughly the given number of milliseconds
 * by calling select() with no file descriptors and a timeout.
 *
 * @param milliseconds time to sleep; zero or negative values do not sleep.
 */
static void milliSleep(int milliseconds)
{
  if (milliseconds < 0)
    milliseconds = 0;

  struct timeval sleeptime;
  sleeptime.tv_sec  = milliseconds / 1000;
  // Remaining milliseconds converted to microseconds (1 ms = 1000 us);
  // the result is always below one second as select() expects.
  sleeptime.tv_usec = (milliseconds % 1000) * 1000;
  select(0, 0, 0, 0, &sleeptime);
}
Here is the caller graph for this function:

| int populate | ( | Ndb * | myNdb, | |
| int | data, | |||
| async_callback_t * | cbData | |||
| ) |
Prepare and send transaction
We do not have a callback yet. This is an absolutely new transaction
We already have a callback
Set data used by the callback
no transaction to close since conn == null
When we have prepared parallelism number of transactions -> send the transactions to ndb. The next time we deal with these transactions is in the callback. There we will see which ones were successful and which ones to retry.
Definition at line 289 of file ndbapi_async.cpp.
References APIERROR, asynchErrorHandler(), asynchExitHandler(), callback(), Ndb::closeTransaction(), NdbTransaction::Commit, transaction_t::conn, NdbOperation::equal(), NdbTransaction::executeAsynchPrepare(), Ndb::getDictionary(), NdbDictionary::Dictionary::getNdbError(), NdbTransaction::getNdbOperation(), NdbDictionary::Dictionary::getTable(), NdbOperation::insertTuple(), MAX_RETRIES, milliSleep(), nPreparedTransactions, NULL, parallelism, async_callback_t::retries, Ndb::sendPollNdb(), NdbOperation::setValue(), Ndb::startTransaction(), transaction, and transaction_t::used.
Referenced by callback(), and main().
00290 { 00291 00292 NdbOperation* myNdbOperation; // For operations 00293 const NdbDictionary::Dictionary* myDict= myNdb->getDictionary(); 00294 const NdbDictionary::Table *myTable= myDict->getTable("GARAGE"); 00295 if (myTable == NULL) 00296 APIERROR(myDict->getNdbError()); 00297 00298 async_callback_t * cb; 00299 int retries = 0; 00300 int current = 0; 00301 for(int i=0; i<1024; i++) 00302 { 00303 if(transaction[i].used == 0) 00304 { 00305 current = i; 00306 if (cbData == 0) 00307 { 00312 cb = new async_callback_t; 00313 cb->retries = 0; 00314 } 00315 else 00316 { 00320 cb =cbData; 00321 retries = cbData->retries; 00322 } 00326 cb->ndb = myNdb; //handle to Ndb object so that we can close transaction 00327 // in the callback (alt. make myNdb global). 00328 00329 cb->data = data; //this is the data we want to insert 00330 cb->transaction = current; //This is the number (id) of this transaction 00331 transaction[current].used = 1 ; //Mark the transaction as used 00332 break; 00333 } 00334 } 00335 if(!current) 00336 return -1; 00337 00338 while(retries < MAX_RETRIES) 00339 { 00340 transaction[current].conn = myNdb->startTransaction(); 00341 if (transaction[current].conn == NULL) { 00342 if (asynchErrorHandler(transaction[current].conn, myNdb)) 00343 { 00347 milliSleep(10); 00348 retries++; 00349 continue; 00350 } 00351 asynchExitHandler(myNdb); 00352 } 00353 myNdbOperation = transaction[current].conn->getNdbOperation(myTable); 00354 if (myNdbOperation == NULL) 00355 { 00356 if (asynchErrorHandler(transaction[current].conn, myNdb)) 00357 { 00358 myNdb->closeTransaction(transaction[current].conn); 00359 transaction[current].conn = 0; 00360 milliSleep(10); 00361 retries++; 00362 continue; 00363 } 00364 asynchExitHandler(myNdb); 00365 } // if 00366 if(myNdbOperation->insertTuple() < 0 || 00367 myNdbOperation->equal("REG_NO", data) < 0 || 00368 myNdbOperation->setValue("BRAND", "Mercedes") <0 || 00369 myNdbOperation->setValue("COLOR", "Blue") < 0) 00370 { 00371 if 
(asynchErrorHandler(transaction[current].conn, myNdb)) 00372 { 00373 myNdb->closeTransaction(transaction[current].conn); 00374 transaction[current].conn = 0; 00375 retries++; 00376 milliSleep(10); 00377 continue; 00378 } 00379 asynchExitHandler(myNdb); 00380 } 00381 00382 /*Prepare transaction (the transaction is NOT yet sent to NDB)*/ 00383 transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit, 00384 &callback, 00385 cb); 00393 if (nPreparedTransactions == parallelism-1) 00394 { 00395 // send-poll all transactions 00396 // close transaction is done in callback 00397 myNdb->sendPollNdb(3000, parallelism ); 00398 nPreparedTransactions=0; 00399 } 00400 else 00401 nPreparedTransactions++; 00402 return 1; 00403 } 00404 std::cout << "Unable to recover from errors. Exiting..." << std::endl; 00405 asynchExitHandler(myNdb); 00406 return -1; 00407 }
Here is the call graph for this function:

Here is the caller graph for this function:

int MAX_RETRIES = 10 [static] |
int nPreparedTransactions = 0 [static] |
int parallelism = 100 [static] |
Definition at line 281 of file ndbapi_async.cpp.
Referenced by BackupRestore::BackupRestore(), Backup::execBACKUP_FRAGMENT_REQ(), Suma::SyncRecord::nextScan(), and populate().
| int permErrors = 0 |
Definition at line 145 of file ndbapi_async.cpp.
| int tempErrors = 0 |
stat. variables
Definition at line 144 of file ndbapi_async.cpp.
Referenced by asynchErrorHandler(), and main().
| transaction_t transaction[1024] |
Free list holding transactions
Definition at line 108 of file ndbapi_async.cpp.
Referenced by closeTransaction(), main(), and populate().
1.4.7

