If you have asynchronous replication where the slave database is using MySQL Cluster then you can use the NDB API events functionality to mask/overwrite data. You might do this for example if the replica is to be used for generating reports where some of the data is sensitive and not relevant to those reports. Unlike stored procedures, NDB API events will be triggered on the slave.
The first step is to set up replication (master->slave rather than multi-master) as described in "Setting up MySQL Asynchronous Replication for High Availability".
In this example, the following table definition is used:
1
2
|
mysql> use clusterdb; mysql> create table ASSETS (CODE int not null primary key, VALUE int) engine=ndb; |
The following code should be compiled and then executed on a node within the slave Cluster:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
#include <NdbApi.hpp>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <string.h>

// Print the location, code and message of an NDB API error, then terminate
// the process. Wrapped in do/while(0) so it behaves as a single statement
// inside if/else chains.
#define APIERROR(error) \
  do { \
    std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ \
              << ", code:" << error.code \
              << ", msg: " << error.message << "." << std::endl; \
    exit(-1); \
  } while (0)

int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnName,
                  const int noEventColumnName);

static void do_blank(Ndb*, int);

// Entry point of the "slave_filter" process.
//
// Usage: slave_filter <connect_string cluster>
//
// Connects to the cluster named by the connect string, registers an insert
// event on clusterdb.ASSETS and then loops forever: for every insert event
// received it calls do_blank() with the row's CODE so that VALUE is
// overwritten with 0. The loop never terminates; the process runs until it is
// killed or an API error makes APIERROR exit.
int main(int argc, char** argv)
{
  // argv[1] is dereferenced below, so the connect string is mandatory.
  if (argc < 2)
  {
    std::cout << "Arguments are <connect_string cluster>.\n";
    exit(-1);
  }
  const char *connectstring = argv[1];

  ndb_init();

  // Object representing the cluster
  Ndb_cluster_connection *cluster_connection =
    new Ndb_cluster_connection(connectstring);

  int r = cluster_connection->connect(5 /* retries */,
                                      3 /* delay between retries */,
                                      1 /* verbose */);
  if (r > 0)
  {
    std::cout << "Cluster connect failed, possibly resolved with more retries.\n";
    exit(-1);
  }
  else if (r < 0)
  {
    std::cout << "Cluster connect failed.\n";
    exit(-1);
  }

  if (cluster_connection->wait_until_ready(30, 30))
  {
    std::cout << "Cluster was not ready within 30 secs." << std::endl;
    exit(-1);
  }

  // Object representing the database
  Ndb* myNdb = new Ndb(cluster_connection, "clusterdb");
  if (myNdb->init() == -1)
    APIERROR(myNdb->getNdbError());

  const char *eventName = "CHNG_IN_ASSETS";
  const char *eventTableName = "ASSETS";
  const int noEventColumnName = 2;
  const char *eventColumnName[noEventColumnName] = {"CODE", "VALUE"};

  // Create events
  myCreateEvent(myNdb,
                eventName,
                eventTableName,
                eventColumnName,
                noEventColumnName);

  // Normal values and blobs are unfortunately handled differently..
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

  int i;

  // Start "transaction" for handling events
  NdbEventOperation* op;
  printf("create EventOperation\n");
  if ((op = myNdb->createEventOperation(eventName)) == NULL)
    APIERROR(myNdb->getNdbError());

  printf("get values\n");
  RA_BH recAttr[noEventColumnName];
  RA_BH recAttrPre[noEventColumnName];
  for (i = 0; i < noEventColumnName; i++)
  {
    recAttr[i].ra = op->getValue(eventColumnName[i]);
    recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
  }

  // set up the callbacks
  // This starts changes to "start flowing"
  if (op->execute())
    APIERROR(op->getNdbError());

  while (true)
  {
    // wait for event or 1000 ms
    const int pollResult = myNdb->pollEvents(1000);
    if (pollResult > 0)
    {
      while ((op = myNdb->nextEvent()))
      {
        // recAttr[0] was registered for the "CODE" column above.
        NdbRecAttr* ra = recAttr[0].ra;
        if (ra->isNULL() >= 0)
        {
          // we have a value
          if (ra->isNULL() == 0)
          {
            // we have a non-null value; CODE is declared as a signed INT,
            // so read it through the signed accessor.
            const int code = ra->int32_value();
            printf("CODE: %d ", code);
            do_blank(myNdb, code);
          }
          else
          {
            printf("%-5s", "NULL");
          }
        }
        else
        {
          printf("%-5s", "-"); // no value
        }
        printf("\n");
      }
    }
  }
}

// Register the event `eventName` on table `eventTableName`, subscribing to
// inserts (TE_INSERT) on the `noEventColumnNames` columns listed in
// `eventColumnNames`.
//
// If an event of that name already exists it is dropped and created again.
// Any other dictionary failure is reported through APIERROR, which exits.
// Returns 0 on success.
int myCreateEvent(Ndb* myNdb,
                  const char *eventName,
                  const char *eventTableName,
                  const char **eventColumnNames,
                  const int noEventColumnNames)
{
  NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
  if (!myDict)
    APIERROR(myNdb->getNdbError());

  const NdbDictionary::Table *table = myDict->getTable(eventTableName);
  if (!table)
    APIERROR(myDict->getNdbError());

  NdbDictionary::Event myEvent(eventName, *table);
  myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);

  // Add event to database
  if (myDict->createEvent(myEvent) == 0)
  {
    myEvent.print();
  }
  else if (myDict->getNdbError().classification ==
           NdbError::SchemaObjectExists)
  {
    printf("Event creation failed, event exists\n");
    printf("dropping Event...\n");
    if (myDict->dropEvent(eventName))
      APIERROR(myDict->getNdbError());
    // try again
    // Add event to database
    if (myDict->createEvent(myEvent))
      APIERROR(myDict->getNdbError());
  }
  else
  {
    APIERROR(myDict->getNdbError());
  }

  return 0;
}

// Overwrite VALUE with 0 in the ASSETS row whose primary key CODE equals
// `code`, in its own committed transaction. Every API failure is reported
// through APIERROR, which exits the process.
static void do_blank(Ndb* myNdb, int code)
{
  const NdbDictionary::Dictionary* myDict = myNdb->getDictionary();
  if (myDict == NULL)
    APIERROR(myNdb->getNdbError());

  const NdbDictionary::Table *myTable = myDict->getTable("ASSETS");
  if (myTable == NULL)
    APIERROR(myDict->getNdbError());

  NdbTransaction *myTransaction = myNdb->startTransaction();
  if (myTransaction == NULL)
    APIERROR(myNdb->getNdbError());

  printf("Replacing VALUE with 0 for CODE: %d ", code);

  NdbOperation *myOperation = myTransaction->getNdbOperation(myTable);
  if (myOperation == NULL)
    APIERROR(myTransaction->getNdbError());

  // Build "UPDATE ASSETS SET VALUE = 0 WHERE CODE = <code>"; a failure while
  // defining the operation is reported rather than silently ignored.
  if (myOperation->updateTuple() == -1 ||
      myOperation->equal("CODE", code) == -1 ||
      myOperation->setValue("VALUE", 0) == -1)
    APIERROR(myTransaction->getNdbError());

  if (myTransaction->execute(NdbTransaction::Commit) == -1)
    APIERROR(myTransaction->getNdbError());

  myNdb->closeTransaction(myTransaction);
}
shell> slave_filter 127.0.0.1:1186 |
From the master Cluster, insert some values (note that the example can easily be extended to cover updates too):
1 |
mysql> insert into ASSETS values (101, 50),(102, 40), (103, 99); |
and then check that on the slave the value has been set to 0 for each of the entries:
1
2
3
4
5
6
7
8
9
|
mysql> select * from ASSETS; +------+-------+ | CODE | VALUE | +------+-------+ | 100 | 0 | | 103 | 0 | | 101 | 0 | | 102 | 0 | +------+-------+ |
How this works: the table data is replicated as normal and the real values are stored in the slave. The “slave_filter” process has registered against insert operations on this table and, when it is triggered, it sets the VALUE field to 0. The event is processed asynchronously from the replication, so there will be a very narrow window during which the true values are stored in the slave.