WL#6827: Group Replication: Interface for Applying Log Events
Affects: Server-5.7
Status: Complete
EXECUTIVE SUMMARY
=================

This task captures the design and implementation of an interface for directly
applying binary log events.

PROBLEM STATEMENT
=================

Events are applied by resorting to their Log_event::apply_event member
function. This is a virtual function and is implemented per event type.
Currently it is too tightly coupled with the existing MySQL master-slave
replication protocol, in particular with the SQL thread and position updates.
Moreover, these apply functions are overloaded with additional logic, such as
filtering and so forth.

We need a clean applier that only handles the procedure of actually installing
the updates. Filtering should be handled before that, and the position update
should be done at the end of a transaction, before the commit. Also, perhaps
we should not filter per event, but rather per transaction, since we are
handling only transactional units of work. For example (pseudo code, with
error handling left out on purpose to not clutter the example):

  if filter.check(transaction):
    begin(transaction)
    for event in transaction:
      applier.apply(event)
    replica_metadata.update_gtids(transaction.gtid)
    commit(transaction)

The interfaces pretty much exist today. We just need to clean them up and
separate some of the logic where possible. All in all, it could be that we
keep this in log_event.cc and just open a small interface for injecting raw
data into the applier framework that lives inside the server. (This needs to
be investigated.)

Use Cases
=========

In the current effort to add multi-master support to MySQL, we can see many of
the current changes as a group of components that will now be added to the
server. In that sense, it is only natural to move to an interface-based
strategy that allows us to compose elements in different manners. And not only
when creating new generic components: the same applies to the current server
components, which can be separated and re-utilized with the occasional
refactoring. In the context of this task, such abstraction should be the
guideline for this job.

The interfaces will then play a role in allowing the rapid and simple
composition of different server tasks, giving the node different roles in the
big picture. Among the different planned components, the user may want to
filter some transactions, and in the context of synchronous replication, the
certification module is also something that may be optional. Even the relay
log file can be replaced, and the user may want direct injection into the SQL
thread. The idea here is then to achieve common interfaces that allow us to
define pipelines such as:

  reader | certifier | filter | relay-log | applier
  reader | certifier | filter | applier
  reader | certifier | applier
  reader | applier

What is offered to the applier should then be the same that is offered to the
certifier or the filtering component. In the near future, the Binlog API can
also be a component that interacts with these interfaces, allowing events to
be easily injected into it.
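To make the intended per-transaction flow concrete, here is a minimal C++
sketch of the loop above. The Transaction_filter, Applier, Replica_metadata
and Transaction types are hypothetical placeholders, not existing server
classes:

  // Hypothetical per-transaction apply loop: filter once per transaction,
  // apply each event, then update replica metadata before committing.
  int apply_transaction(Transaction_filter *filter, Applier *applier,
                        Replica_metadata *replica_metadata, Transaction *trx)
  {
    if (!filter->check(trx))
      return 0;                                  // whole transaction filtered out

    applier->begin(trx);
    for (Log_event *ev : trx->events())
    {
      if (int error= applier->apply(ev))
        return error;                            // stop on the first apply error
    }
    replica_metadata->update_gtids(trx->gtid()); // position update, pre-commit
    return applier->commit(trx);
  }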
There are no known Functional or Non-Functional requirements for this worklog.
The base interface
==================

Given the decision to abstract the different appliers, it is important that
these interfaces reflect the simple nature of this task, which is equal among
many components. Starting with the base handler:

  EventHandler {
    //handler initialization
    initialize();
    //handling of the event
    handle(PipelineEvent ev);
  }

The idea here is that even if these components are based on transactions at
the logical level, the server will still operate on events as it did until
now, for memory and performance reasons. This forces the developer that
implements such interfaces to still be aware of transaction boundaries. Due
to this, we abstract the event into a wrapper class that holds different
event formats and context info, as explained below.

Pipelines
=========

With these interfaces we also don't mention what the source or destination of
the events is, allowing us to treat these components as mutable black boxes.
Note that this interface, for example, may be used to queue events, filter
them or inject them into the binlog API, among other examples.

Departing from this, the formation of pipelines among different handlers that
share the same interface is something that should also come naturally. In a
first approach, one could create a list of handlers to be executed, passing
events from handler to handler, waiting on the methods' completion. But a
more adaptable option is to imbue this in the interface design, leading to:

  EventHandler {
    //The next handler in line
    EventHandler next_in_pipeline;

    initialize();
    handle(PipelineEvent ev);
    //plug the next handler
    plug(EventHandler *next);
  }

This way, every component knows directly what the next handler to invoke is,
making the composition of different parts of the server more natural.

  EventHandler pipeline = new EventHandler_A();
  EventHandler B = new EventHandler_B();
  pipeline.plug(B);

This is also easier for situations where a transaction, composed of several
events, may or may not advance in the pipeline. In these cases, the handler
in question should store the events until a decision is made, injecting them
into the next handler when the decision is positive.

Termination
===========

There is one element missing in the above, simplified, interface. Due to the
nature of the different implementations that can live under such an
interface, the notion of completion is something that cannot always be
associated with a method's return value. So there is a need for an
interaction between caller and invoked where the caller shall be informed
when the invoked ends its tasks.

To achieve this we based our approach on an object upon which we can wait,
inspired by the Future notion that exists in the Java language. Named
Continuation, we define it as:

  Continuation {
    //Method called to wait on the method's completion.
    wait();
    //Method called to signal the method's completion.
    signal();
  }

On each method invocation, such an object is passed to the handler method.
Passed from handler to handler, the object should only be released on the
last handler's execution, or if the object is marked as discarded. This class
marks a clean separation between what is the pipeline information (when the
event was processed or discarded, what error happened, etc.), which is stored
in the Continuation, and the event information, which goes into the
PipelineEvent class.
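As an illustration of how a concrete component would sit under this
interface, here is a minimal sketch of a pass-through handler. The class name
Event_passthrough_handler is hypothetical, and the exact signatures follow
the low-level design further below:

  // Hypothetical no-op handler: forwards every event untouched and lets
  // the Continuation be signalled at the end of the pipeline.
  class Event_passthrough_handler : public EventHandler
  {
  public:
    int initialize() { return 0; }

    int handle(PipelineEvent *ev, Continuation *cont)
    {
      // A real handler would inspect or transform the event here.
      return next(ev, cont);   // forward to the next handler, or signal
    }
  };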
Transaction abstraction
=======================

While the information about each transaction could be inferred in each
handler based on the received events, this is undesirable, as it would
probably lead to the duplication of code/operations. If possible, this should
be abstracted, and some of this information should be incorporated in the
events that go through the pipeline.

To embody this notion, we can add context flags to the event wrapper, the
PipelineEvent class. As a base proposal, a delimitation of the transaction
boundaries and of what events were discarded should be reasonable. However,
due to the heterogeneity of handlers and use cases, depending on the
situation these flags may not be set or used, so a default value that marks
their non-initialization is necessary. In fact, this worklog is not the place
to define who sets these flags or which handlers use them. However, it is
easy to picture a simple handler that, when it receives events, defines these
flags, or one that ignores an event when a discarded flag exists; see the
sketch after this section.

Discarded transactions
======================

Regarding discarded transactions, upon a handler's decision to discard one,
this information should be fed back to the pipeline. This information should
then be included in the Continuation class, as it survives the pipeline
execution and is fed to the pipeline again. Otherwise, if it were included in
the event class, whoever invokes the pipeline would have to parse this
information and pass it along when a new event arrives.

Events
======

Being the base of this interface, the missing element in the design is the
definition of the PipelineEvent. Containing information about the event and
some context about the transaction to which it belongs, let us summarize its
components:

# Base event representation: Log_event or Packet

One of the challenges to the simplicity of these interfaces is the
heterogeneity of the different handlers and their inputs. In fact, components
such as the new filtering handler should be based on Log_events, but the
current relay log queuer is based on network packets. As the current
solution, we opted to support both formats, converting between them as
needed. When a handler needs a Log_event, the PipelineEvent transforms the
Packet and returns a Log_event, and when a Packet is needed, the opposite is
done. Only one format is maintained at any moment in time.

# Transaction context

When needed, the event should contain a flag that informs the handler that a
new transaction is starting or ending.

# Extra context

This object can be extended to be enriched with other information that can be
given by some handler. Such information should, however, be handled with
care, as it can create dependencies among different handlers.
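As a sketch of the "simple handler" pictured above, the following
hypothetical boundary-marking handler tags each PipelineEvent with the
transaction context flags defined in the low-level design below. The
is_begin_event()/is_end_event() helpers are assumptions standing in for
whatever event-type inspection a real implementation would use:

  // Hypothetical handler that delimits transaction boundaries for the
  // handlers further down the pipeline.
  class Transaction_boundary_handler : public EventHandler
  {
  public:
    int initialize() { return 0; }

    int handle(PipelineEvent *ev, Continuation *cont)
    {
      Log_event *log_ev= NULL;
      ev->get_LogEvent(&log_ev);

      if (is_begin_event(log_ev))        // e.g. a BEGIN/GTID event
        ev->set_event_context(TRANSACTION_BEGIN);
      else if (is_end_event(log_ev))     // e.g. a COMMIT/XID event
        ev->set_event_context(TRANSACTION_END);
      else
        ev->set_event_context(UNMARKED_EVENT);

      return next(ev, cont);
    }
  };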
== Base Interfaces ==
=====================

Auxiliary classes for the base interface.

Packet class
============

:: Description >

  Wrapper class for the packet info and length.

:: Contents >

  //The packet data and length
  uchar* payload;
  uint len;

Continuation class
==================

:: Description >

  Class that allows us to wait and to know when and how the pipeline ended.

:: Contents >

  //continuation locks and signals for the waiting process
  mysql_mutex_t lock;
  mysql_cond_t cond;
  bool ready;

  //The return error code (a message can also be included)
  int error_code;

  //Flag that signals that the transaction was discarded.
  bool transaction_discarded;

::> Methods >

  //Wait for the pipeline handling of an event
  int wait()
  {
    //base waiting algorithm
    lock(&lock);
    while (!ready)
    {
      wait(&cond, &lock);
    }
    ready= false;
    unlock(&lock);
    return error_code;
  }

  //Method called to signal the pipeline completion
  int signal(int error, bool tran_discarded= false)
  {
    //the basic signaling algorithm: store the outcome and wake the waiter
    lock(&lock);
    error_code= error;
    transaction_discarded= tran_discarded;
    ready= true;
    unlock(&lock);
    broadcast(&cond);
    return 0;
  }

  bool is_transaction_discarded()
  {
    //tells us if the transaction was discarded.
    return transaction_discarded;
  }

PipelineEvent class
===================

:: Description >

  Base event to be fed to the pipeline.

:: Contents >

  //Possible format: packet
  Packet* packet;

  //Possible format: event
  Log_event* log_event;

  //Transactional context flag
  int event_context;

::> Methods >

  public:

  int get_LogEvent(Log_event** out_event)
  {
    //Returns the contained log event,
    //converts the existing packet if needed
  }

  int get_Packet(Packet** out_packet)
  {
    //Returns the contained packet,
    //converts the existing log event if needed
  }

  void set_event_context(int modifier)
  {
    //change the event transactional context flag
  }

  int get_event_context()
  {
    //return the event transactional context flag
  }

  private:

  int decode()
  {
    //Decode the packet and set the event.
  }

  int encode()
  {
    //Encode the event into a new packet.
  }

>>: Related resources >

  //The possible transactional statuses of an event.
  enum event_modifiers
  {
    UNDEFINED= 0,
    TRANSACTION_BEGIN,
    TRANSACTION_END,
    UNMARKED_EVENT
  };

== Handler Interface ==
=======================

EventHandler class
==================

:: Description >

  The base interface to be implemented by all handlers.

:: Contents >

  //The next handler in line
  EventHandler *next_in_pipeline;

::> Methods >

  //The initialization method to be implemented
  virtual int initialize()= 0;

  //The handle method to be implemented
  virtual int handle(PipelineEvent *ev, Continuation* cont)= 0;

  //Set the next in line handler
  void plug(EventHandler *next)
  {
    next_in_pipeline= next;
  }

  //Treat the pipeline as a linked list and append at the end
  int append(EventHandler* event_handler)
  {
    EventHandler* pipeline_iter= this;
    while (pipeline_iter->next_in_pipeline)
    {
      pipeline_iter= pipeline_iter->next_in_pipeline;
    }
    pipeline_iter->plug(event_handler);
    return 0;
  }

  //Append a handler to a given pipeline, which can be null.
  static int append_handler(EventHandler** pipeline,
                            EventHandler* event_handler)
  {
    if (!(*pipeline))
      *pipeline= event_handler;
    else
      (*pipeline)->append(event_handler);
    return 0;
  }

  //Method to be used in handlers: pass the event to the next handler.
  //If no more handlers exist, we have reached the end, so signal.
  int next(PipelineEvent *ev, Continuation* cont)
  {
    if (next_in_pipeline)
      next_in_pipeline->handle(ev, cont);
    else
      cont->signal(0);
    return 0;
  }
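A short usage sketch of the append helpers above, assuming the two
hypothetical handler classes used in the examples that follow; this builds a
two-stage pipeline without tracking the tail by hand:

  EventHandler* pipeline= NULL;

  //append_handler handles the empty-pipeline case, then appends at the tail
  EventHandler::append_handler(&pipeline, new Event_filtering_handler());
  EventHandler::append_handler(&pipeline, new Event_queuing_handler());

  //pipeline now points at the filter, whose next_in_pipeline is the queuer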
== Examples and use patterns ==
===============================

An example, with a filtering handler and a queuing handler plugged together:

  EventHandler pipeline;

  (Some undefined user configuration process)
  {
    EventHandler filter = new Event_filtering_handler();
    pipeline.plug(filter);

    EventHandler queuer = new Event_queuing_handler();
    filter.plug(queuer);
  }

  //Create a continuation to be used by several events
  Continuation c = new Continuation();

  PipelineEvent* e;
  while (reading_from_source(&e))
  {
    pipeline.handle(e, c);
    int error = c.wait();
    if (error)
    {
      //do something
    }
    if (c.is_transaction_discarded())
    {
      //do something, or pass the continuation again to the pipeline and let
      //some handler discard the new events until it detects that a new
      //transaction begins.
    }
  }
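To illustrate the discard path through the Continuation, here is a minimal
sketch of what a filtering handler could look like; the must_be_filtered()
decision helper is an assumption, not something defined by this worklog:

  // Hypothetical filtering handler: drops an event by ending the pipeline
  // early and flagging the Continuation as discarded.
  class Event_filtering_handler : public EventHandler
  {
  public:
    int initialize() { return 0; }

    int handle(PipelineEvent *ev, Continuation *cont)
    {
      if (must_be_filtered(ev))    // assumed filtering decision
      {
        // Do not forward: signal completion with no error and mark the
        // transaction as discarded so the caller can react.
        cont->signal(0, true);
        return 0;
      }
      return next(ev, cont);       // forward accepted events
    }
  };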