#include <NdbApi.hpp>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <string.h>
#define APIERROR(error)
{ std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:"
<< error.code << ", msg: " << error.message << "." << std::endl;
exit(-1); }
int myCreateEvent(Ndb* myNdb,
const char *eventName,
const char *eventTableName,
const char **eventColumnName,
const int noEventColumnName);
static void do_blank(Ndb*, int);
int main(int argc, char** argv)
{
if (argc < 1)
{
std::cout << "Arguments are <connect_string cluster>.n";
exit(-1);
}
const char *connectstring = argv[1];
ndb_init();
Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(connectstring); // Object representing the cluster
int r= cluster_connection->connect(5 /* retries */,
3 /* delay between retries */,
1 /* verbose */);
if (r > 0)
{
std::cout << "Cluster connect failed, possibly resolved with more retries.n";
exit(-1);
}
else if (r < 0)
{
std::cout << "Cluster connect failed.n";
exit(-1);
}
if (cluster_connection->wait_until_ready(30,30))
{
std::cout << "Cluster was not ready within 30 secs." << std::endl;
exit(-1);
}
Ndb* myNdb= new Ndb(cluster_connection,
"clusterdb"); // Object representing the database
if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
const char *eventName= "CHNG_IN_ASSETS";
const char *eventTableName= "ASSETS";
const int noEventColumnName= 2;
const char *eventColumnName[noEventColumnName]=
{"CODE",
"VALUE"};
// Create events
myCreateEvent(myNdb,
eventName,
eventTableName,
eventColumnName,
noEventColumnName);
// Normal values and blobs are unfortunately handled differently..
typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;
int i;
// Start "transaction" for handling events
NdbEventOperation* op;
printf("create EventOperationn");
if ((op = myNdb->createEventOperation(eventName)) == NULL)
APIERROR(myNdb->getNdbError());
printf("get valuesn");
RA_BH recAttr[noEventColumnName];
RA_BH recAttrPre[noEventColumnName];
for (i = 0; i < noEventColumnName; i++) {
recAttr[i].ra = op->getValue(eventColumnName[i]);
recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
}
// set up the callbacks
// This starts changes to "start flowing"
if (op->execute())
APIERROR(op->getNdbError());
while (true) {
int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
if (r > 0) {
while ((op= myNdb->nextEvent())) {
NdbRecAttr* ra = recAttr[0].ra;
if (ra->isNULL() >= 0) { // we have a value
if (ra->isNULL() == 0) { // we have a non-null value
printf("CODE: %d ", ra->u_32_value());
do_blank(myNdb, ra->u_32_value());
} else
printf("%-5s", "NULL");
} else
printf("%-5s", "-"); // no value
ra = recAttr[1].ra;
printf("n");
}
}
}
}
int myCreateEvent(Ndb* myNdb,
const char *eventName,
const char *eventTableName,
const char **eventColumnNames,
const int noEventColumnNames)
{
NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
if (!myDict) APIERROR(myNdb->getNdbError());
const NdbDictionary::Table *table= myDict->getTable(eventTableName);
if (!table) APIERROR(myDict->getNdbError());
NdbDictionary::Event myEvent(eventName, *table);
myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
// Add event to database
if (myDict->createEvent(myEvent) == 0)
myEvent.print();
else if (myDict->getNdbError().classification ==
NdbError::SchemaObjectExists) {
printf("Event creation failed, event existsn");
printf("dropping Event...n");
if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
// try again
// Add event to database
if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
} else
APIERROR(myDict->getNdbError());
return 0;
}
static void do_blank(Ndb* myNdb, int code)
{
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("ASSETS");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
NdbTransaction *myTransaction= myNdb->startTransaction();
if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
printf("Replacing VALUE with 0 for CODE: %d ", code);
NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->updateTuple();
myOperation->equal("CODE", code);
myOperation->setValue("VALUE", 0);
if (myTransaction->execute( NdbTransaction::Commit ) == -1)
APIERROR(myTransaction->getNdbError());
myNdb->closeTransaction(myTransaction);
}
shell> slave_filter 127.0.0.1:1186