WL#6842: Group Replication: Transaction Applier Module
Affects: Server-5.7
Status: Complete
EXECUTIVE SUMMARY
=================

Building on the interfaces declared in WL#6827, this module implements a transaction apply procedure. In essence, it is a scheduler for the transactions that must be applied; once a transaction is scheduled, all that remains is to call the applier interface.

PROBLEM STATEMENT
=================

The underlying goal of WL#6827 was to make existing server components easy to reuse and to allow them to be composed more simply with new components. That work focused solely on the design of simple, reusable interfaces and pipelines for event handling. A proper applier module that harnesses these new structures is still missing.

There is therefore a need for a new module that hosts the event handlers and invokes them in a pipeline defined according to the needs of the user and of GCS. Behind a simple interface invoked by the GCS message receiver, this module groups the existing handlers defined in WL#6827. It certifies and filters transactions, and then either queues them in the designated queuer or applies them directly, as deemed necessary.
Functional requirements:

FR1: Every transaction that is queued for processing on the applier pipeline is eventually either discarded or processed.
FR2: The user can choose and define pipelines.
FR3: The applier shall guarantee that handlers with unique roles are present only once in the configured pipelines.
FR4: The applier shall buffer transactions when they cannot be applied (e.g. during recovery).
FR5: If an error occurs after which the applier cannot apply any more events, the applier logs an error and the node exits the group.
FR6: If the applier fails to start, an error is reported to the user when START GCS_REPLICATION is executed.
FR7: If an older applier instance is still running when the user issues START GCS_REPLICATION, an error is returned to the user.
Basic functioning
=================

At its core, the main function of the applier module is to support a well-defined pipeline into which events are injected and processed. The applier module provides a simple entry point used by the cluster receiver thread. It wraps the incoming events and passes them to the pipeline, where they are certified, filtered, queued or submitted to some other action.

This processing is asynchronous, because for now the receiver does not need to wait for the received events to be queued. The applier module therefore lives in a separate thread. That thread is fed by a queue where the receiver deposits the new transactions.

   |-----------|
   | Receiver  |  handle()
   |  Thread   |----------.
   |___________|          |
              |-----------v---------------------------|
              |   |__|__|__|__|  ->  | Pipeline |     |
              |       queue                           |
              |                       Applier module  |
              |_______________________________________|

Handler/pipeline initialization
===============================

The applier module is responsible not only for constructing the pipeline and feeding events to it, but also for initializing and maintaining the included handlers. Handlers that are not directly connected to the module's pipeline, such as the SQL thread, should also be initialized together with it.

Which handlers are chosen, and how they are combined, is not known in advance, though. To solve this, the applier keeps a list of possible configurations. Each configuration is itself a list of handlers that defines an order of execution. Example:

  classic_gcs_configuration = certifier, relay_log, sql_thread

In a first phase these lists are hard-coded and used in well-defined situations. In this case, the configuration list for the GCS plugin is always the same. In the future, users may change the list in several ways:

 - through well-defined options such as "--use-extra-component-X";
 - by selecting it from a group of possible configurations;
 - by explicitly designating the handlers to be used.
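The configuration lists can be pictured as ordered vectors of handler ids, plus a decoder for the form where the user designates the handlers. The sketch below is illustrative only and is not the plugin's code. The text names "certifier", "relay_log" and "sql_thread" follow the example above. The "event_cataloger" name and the helpers `handler_from_name` and `parse_configuration` are hypothetical. The `Handler_id` values are reused from the applier_handlers.h listing in this worklog.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Handler ids as listed in applier_handlers.h (reused for illustration).
enum Handler_id { EVENT_CATALOGER = 0, RELAY_LOG_QUEUER, CERTIFIER, SQL_THREAD_HANDLER };

// Hypothetical mapping from user-visible names to handler ids.
bool handler_from_name(const std::string& name, Handler_id* id) {
  static const std::map<std::string, Handler_id> names = {
      {"event_cataloger", EVENT_CATALOGER},
      {"relay_log", RELAY_LOG_QUEUER},
      {"certifier", CERTIFIER},
      {"sql_thread", SQL_THREAD_HANDLER}};
  auto it = names.find(name);
  if (it == names.end()) return false;
  *id = it->second;
  return true;
}

// Decodes a list such as "certifier, relay_log, sql_thread" into an ordered
// configuration. Leaves *out untouched and returns false on an unknown or
// empty name.
bool parse_configuration(const std::string& text, std::vector<Handler_id>* out) {
  std::vector<Handler_id> result;
  std::stringstream stream(text);
  std::string item;
  while (std::getline(stream, item, ',')) {
    // Trim surrounding spaces so "a, b" is accepted as well as "a,b".
    const auto first = item.find_first_not_of(' ');
    const auto last = item.find_last_not_of(' ');
    if (first == std::string::npos) return false;
    Handler_id id = EVENT_CATALOGER;
    if (!handler_from_name(item.substr(first, last - first + 1), &id)) return false;
    result.push_back(id);  // list order is the order of execution
  }
  if (result.empty()) return false;
  *out = result;
  return true;
}

// The hard-coded first-phase configuration from the example above.
const std::vector<Handler_id> classic_gcs_configuration = {
    CERTIFIER, RELAY_LOG_QUEUER, SQL_THREAD_HANDLER};
```

Keeping both forms as plain ordered lists lets the same pipeline-construction code serve the hard-coded configurations and the user-designated ones.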
In the configuration lists described above, some properties of each handler must also be taken into account. Two properties can be defined for each handler:

 1) Initialization: can the handler be instantiated only once in a pipeline (i.e. it is a singleton), or can several of its instances be used at the same time? Handlers like the certifier and the event applier are not built to be used more than once. Components such as a queuer can be used multiple times in the pipeline.

 2) Pipeline inclusion: is the handler a pipeline handler or a standalone handler? Handlers like the SQL thread applier have no place in the pipeline and work independently after initialization.

These properties can be added to the applier interfaces from WL#6827 through methods such as "is_standalone()" and "is_singleton()".

To initialize the handlers, the applier reads the configuration list and initializes each of its components. Each pipeline handler is then added to the pipeline. A handler that is not a pipeline handler is assumed to be a standalone handler that works independently after initialization. During this process, the applier also checks that no singleton handler is initialized twice.

Handler interaction
===================

During the cluster life cycle, after initialization and start-up, handlers are not immutable entities self-contained in the applier module. Components such as the certifier and the applier can change their role during execution as a consequence of tasks such as distributed recovery. Interacting with these components inside the applier is thus a necessity, and this calls for new interfaces. One alternative is to base this interaction on handler roles.
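The initialization pass from Handler/pipeline initialization can be combined with the role-based lookup just proposed, as sketched below. This is a minimal, self-contained illustration, not the plugin's code. `Test_handler` and `Pipeline_sketch` are hypothetical names. The `EventHandler` shown is a pared-down stand-in for the WL#6827 interface. The `Handler_role` values are copied from the applier_interfaces.h listing in this worklog.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Role values copied from the Handler_role enum of this worklog.
enum Handler_role { EVENT_CATALOGER = 0, APPLIER = 1, CERTIFIER = 2, QUEUER = 3, ROLE_NUMBER = 4 };

// Pared-down stand-in for the WL#6827 handler interface.
class EventHandler {
 public:
  virtual ~EventHandler() = default;
  virtual bool is_standalone() const = 0;
  virtual bool is_singleton() const = 0;
  virtual Handler_role get_role() const = 0;
};

// Hypothetical handler whose properties are fixed at construction.
class Test_handler : public EventHandler {
 public:
  Test_handler(Handler_role role, bool singleton, bool standalone)
      : role_(role), singleton_(singleton), standalone_(standalone) {}
  bool is_standalone() const override { return standalone_; }
  bool is_singleton() const override { return singleton_; }
  Handler_role get_role() const override { return role_; }

 private:
  Handler_role role_;
  bool singleton_;
  bool standalone_;
};

// Hypothetical container for the result of the initialization pass.
struct Pipeline_sketch {
  std::vector<std::unique_ptr<EventHandler>> owned;  // every accepted handler
  std::vector<EventHandler*> pipeline;               // pipeline handlers, in order
  EventHandler* by_role[ROLE_NUMBER] = {};           // singletons, indexed by role

  // Accepts one configured handler; returns false if its singleton role is taken.
  bool add(std::unique_ptr<EventHandler> handler) {
    EventHandler* h = handler.get();
    if (h->is_singleton()) {
      if (by_role[h->get_role()] != nullptr) return false;  // double initialization
      by_role[h->get_role()] = h;
    }
    if (!h->is_standalone()) pipeline.push_back(h);  // standalone handlers run alone
    owned.push_back(std::move(handler));
    return true;
  }

  // Generic role-based lookup; only singletons are registered, others give nullptr.
  EventHandler* get_handler(Handler_role role) const { return by_role[role]; }
};
```

Ownership is kept separate from pipeline order. A standalone handler such as the SQL thread is therefore still initialized and owned, even though events never flow through it.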
With separate roles and different possible implementations, handlers would be retrieved based on their function in the pipeline. That leads to methods such as:

  get_certifier() or get_applier()

This, however, would require a new method whenever a new role is added, which becomes unsustainable in the future. Instead, a generic method should be created:

  get_handler(Enum role)

To support this, the applier module must know which handler fills which role, so that it knows what to return. This information can be stored in one of two places:

 - in the handler class, in a similar manner to the is_standalone and is_singleton properties;
 - in a list on the applier that gives, for each configuration, the role of each handler.

The interfaces that these components provide when retrieved, for each of their roles, are not defined here. Instead, they shall be developed according to the needs defined in the worklogs for distributed recovery, certification and others.

As some handlers can have multiple instances, retrieving those is neither useful nor practical: given a returned list, we could hardly tell which handler we actually wanted. Therefore only singleton handlers are marked and can be retrieved. The handlers that need to be exported are, as a rule, the singleton ones anyway. These are the handlers with unique roles that require interfaces to the outside.

Buffering
=========

One last concern is that the applier module is not expected to be fully operational at all times. When doing recovery, for example, the applier cannot process the incoming events, as it does not yet have all the data needed to do so. A solution is to have periods where the applier buffers these pipeline events until it has permission to start applying them. When signaled to do so, the applier appends to the pipeline a buffering handler that fulfills this task. All subsequent events are then directed to this new component.
When the user considers that the application of events can resume, the buffering handler starts feeding events to the rest of the pipeline. Once it finds itself empty, it can remove itself from the applier module. This leads to two new methods:

  start_buffering()
  resume_application()

Current handlers
================

For a base implementation of this module, the following handlers are included in this worklog.

** Relay log queuer **
Role: queuer

The relay log queuer is based on the queue method provided in the current slave code base. With the needed refactoring and cleanup, this code can be reused for this handler.

** SQL thread **
Role: applier

This is the classic SQL thread that currently lives in the slave. When the applier is initialized, this thread is also configured and activated. During the system life cycle, this thread can be accessed and its relay log source changed.

Sub-handlers
============

These are auxiliary handlers of the applier.

** Event catalogation **
Role: event analysis

The event catalogation handler is also a handler, but one that the user does not define. It delimits transactions and filters them out when they are discarded. It analyzes the events and marks them whenever it sees a transaction start or end. Whenever a transaction is discarded, it ignores all events until a new transaction begins.

** Event buffering **
Role: buffering

This handler buffers events that cannot be applied at the time and, when signaled, starts transmitting them to the remaining pipeline. When the application of events resumes and the buffer becomes empty, the handler triggers its own removal from the pipeline.

Future handlers
===============

These are other handlers that can or will be added in the future.

** Certifier **
Role: certifier

The certifier is the pipeline handler that decides whether the current transaction shall be committed, or aborted because it conflicts with other transactions.
Because of the recovery process, this component can also be accessed during the server lifetime.
Depends on: WL#6837 and WL#6833.

** Filtering **
Role: filter

In the future, the filtering handler may be used to ignore or change certain kinds of transactions according to user-defined rules. In the context of a cluster this means that the node can diverge from the rest of the cluster, so this handler should be used with care.

Added errors
============

> ER_GCS_REPLICATION_APPLIER_INIT_ERROR
  Thrown when an error occurs while initializing the applier module on start.

> ER_GCS_STOP_APPLIER_THREAD_TIMEOUT
  Thrown when a previous applier instance is still running when starting.

Added configuration options
===========================

Status var: gcs_replication_plugin_components_stop_timeout
  The time to wait for the applier threads to terminate.

Status var: gcs_replication_plugin_pipeline_type_var
  The pipeline to use. Currently only one is accepted.
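Two pieces of this worklog fit together here. The stop timeout option and ER_GCS_STOP_APPLIER_THREAD_TIMEOUT imply a bounded wait for the applier thread to finish. That thread is the queue-fed one from Basic functioning. The following is a minimal sketch of the idea with standard C++ threads, not the plugin's implementation. `Applier_thread_sketch`, `Stop_result` and the `per_packet_delay` knob are hypothetical. Packets are plain strings, and a sleep stands in for pipeline processing. On a stop request the thread drains what is already queued, in the spirit of FR1. `stop()` reports a timeout instead of blocking forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical result codes; the worklog reports ER_GCS_STOP_APPLIER_THREAD_TIMEOUT.
enum Stop_result { STOP_OK = 0, STOP_TIMEOUT = 1 };

class Applier_thread_sketch {
 public:
  // per_packet_delay simulates slow event application (illustration only).
  explicit Applier_thread_sketch(std::chrono::milliseconds per_packet_delay)
      : delay_(per_packet_delay) {}

  ~Applier_thread_sketch() {
    request_stop();
    if (worker_.joinable()) worker_.join();  // never destroy a joinable std::thread
  }

  void start() {
    running_ = true;
    worker_ = std::thread([this] { run(); });
  }

  // Receiver entry point: enqueue and return without waiting for processing.
  void handle(std::string packet) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(packet));
    }
    cv_.notify_all();
  }

  // Ask the thread to stop, then wait at most `timeout` for it to terminate.
  Stop_result stop(std::chrono::milliseconds timeout) {
    request_stop();
    std::unique_lock<std::mutex> lock(mutex_);
    const bool stopped = cv_.wait_for(lock, timeout, [this] { return !running_; });
    return stopped ? STOP_OK : STOP_TIMEOUT;
  }

  std::size_t applied_count() {
    std::lock_guard<std::mutex> lock(mutex_);
    return applied_;
  }

 private:
  void request_stop() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_requested_ = true;
    }
    cv_.notify_all();
  }

  void run() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      cv_.wait(lock, [this] { return stop_requested_ || !queue_.empty(); });
      if (queue_.empty()) break;  // stop requested and nothing left to apply
      queue_.pop();
      lock.unlock();
      std::this_thread::sleep_for(delay_);  // stand-in for pipeline->handle()
      lock.lock();
      ++applied_;
    }
    running_ = false;
    lock.unlock();
    cv_.notify_all();  // wake any stop() waiting for termination
  }

  std::chrono::milliseconds delay_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::string> queue_;
  std::size_t applied_ = 0;
  bool running_ = false;
  bool stop_requested_ = false;
  std::thread worker_;  // declared last so it starts after the other members exist
};
```

A caller implementing FR7 would run such a bounded wait against the old instance before starting a new one. If the wait expires, it would report ER_GCS_STOP_APPLIER_THREAD_TIMEOUT.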
Handlers' info
==============

::File :> applier_interfaces.h

::Context :> This interfaces header adds some extra methods to the EventHandler interface, defining some of the handlers' characteristics.

::Contains :>

  /**
    @enum Handler_role
    Enumeration type for the different roles that handlers can have.
    When creating a new role, it should be added here.
  */
  enum Handler_role
  {
    EVENT_CATALOGER= 0,
    APPLIER= 1,
    CERTIFIER= 2,
    QUEUER= 3,
    ROLE_NUMBER= 4 //The number of roles
  };

::Methods :>

  class EventHandler
  {
  public:
    // tells if the handler is a pipeline or a standalone handler
    virtual bool is_standalone();
    // tells if the handler is a singleton or not
    virtual bool is_singleton();
    // returns the handler role
    virtual Handler_role get_role();
  };

Pipelines handling
==================

::File :> applier_handlers.h

::Context :> Header file where the types of handlers are defined. This file also defines the several existing pipelines that can be chosen. It also provides helper functions that let the applier fetch a valid, configured pipeline.

The idea behind this file is to centralize the definition of new handlers in a single place. If a developer creates a new handler, this file defines its id, how it fits the existing pipelines and its role.

::Contains :>

  /**
    @enum Handler_id
    Enumeration type for the different existing handler ids.
    When creating a new handler, a new id should be added here.
  */
  enum Handler_id
  {
    EVENT_CATALOGER=0,
    RELAY_LOG_QUEUER,
    CERTIFIER,
    SQL_THREAD_HANDLER
  };

  /**
    @enum Handler_pipeline_type
    Enumeration type for the existing pipeline configurations.
    When creating a new pipeline, it should be added here.
  */
  enum Handler_pipeline_type
  {
    STANDARD_GCS_PIPELINE= 0
  };

::Methods :>

  /**
    Defines the pipeline and returns the number of handlers on it.
    In this method the developer shall define what each pipeline
    configuration contains.
  */
  int get_pipeline_configuration(Handler_pipeline_type pipeline_type,
                                 Handler_id** pipeline_conf)
  {
    switch(pipeline_type)
    {
      case STANDARD_GCS_PIPELINE:
        // number_of_handlers= number of handlers in the pipeline
        *pipeline_conf= new Handler_id[number_of_handlers];
        (*pipeline_conf)[0]= EVENT_CATALOGER; // an example of a handler
        ...
        return number_of_handlers;
      default:
        return -1; // error: unknown pipeline type
    }
  }

  /**
    Defines the pipeline and returns the number of handlers on it.
    In this method the developer shall decode the user-defined handler list.
  */
  int get_pipeline_configuration(char* handler_info, Handler_id** pipeline_conf)
  {
    // handlers[], number_of_handlers= input_handling(handler_info)
    *pipeline_conf= new Handler_id[number_of_handlers];
    (*pipeline_conf)[0]= handlers[0];
    ...
    return number_of_handlers;
  }

  /**
    Constructs the pipeline according to the given handlers.
    This method assumes a list of handlers was given as "handler1,handler2".
  */
  int configure_pipeline(char* handler_ids, EventHandler** pipeline,
                         std::vector<EventHandler*>* handlers_per_role)
  {
    Handler_id* handler_list= NULL;
    int num_handlers= get_pipeline_configuration(handler_ids, &handler_list);
    if (num_handlers < 0)
      return 1; // error: invalid configuration
    int error= configure_pipeline(handler_list, num_handlers, pipeline,
                                  handlers_per_role);
    delete[] handler_list;
    return error;
  }

  /**
    Constructs the pipeline according to the given configuration.
  */
  int configure_pipeline(Handler_pipeline_type pipeline_type,
                         EventHandler** pipeline,
                         std::vector<EventHandler*>* handlers_per_role)
  {
    Handler_id* handler_list= NULL;
    int num_handlers= get_pipeline_configuration(pipeline_type, &handler_list);
    if (num_handlers < 0)
      return 1; // error: unknown pipeline type
    int error= configure_pipeline(handler_list, num_handlers, pipeline,
                                  handlers_per_role);
    delete[] handler_list;
    return error;
  }

  /**
    Constructs the pipeline according to the given handlers.
  */
  int configure_pipeline(Handler_id* handler_list, int num_handlers,
                         EventHandler** pipeline,
                         std::vector<EventHandler*>* handlers_per_role)
  {
    for (int i= 0; i < num_handlers; i++)
    {
      EventHandler* handler= NULL;
      switch(handler_list[i])
      {
        case EVENT_CATALOGER: // a pipeline handler
          handler= new Event_catalogation();
          break;
        ...
        case SQL_THREAD_HANDLER: // a standalone handler
          handler= new SQL_thread_handler();
          break;
        default:
          return 1; // error: unknown handler id
      }
      // initialize the handler
      handler->initialize();
      // add it to the pipeline if it is a pipeline handler
      if (!handler->is_standalone())
        EventHandler::append_handler(pipeline, handler);
      // record the handler role if it is a singleton
      // (handlers_per_role is assumed to hold ROLE_NUMBER entries, all NULL)
      if (handler->is_singleton())
      {
        // check whether a handler with this role was already used in this pipeline
        if ((*handlers_per_role)[handler->get_role()] != NULL)
          return 1; // error: unique role configured twice
        // mark the handler role on the list given by the module
        (*handlers_per_role)[handler->get_role()]= handler;
      }
    }
    return 0;
  }

Applier module
==============

::File :> rpl_gcs_applier.h, rpl_gcs_applier.cc

::Context :> The applier core. In this file/class we construct the pipeline, receive events and apply them according to the chosen handlers.

::Contains :>

  // The pipeline
  EventHandler *pipeline;
  // The message queue that is fed by the receiver thread
  SynchronizedQueue<Packet*> *incoming;
  // The handler registered for each role (singletons only)
  std::vector<EventHandler*> contained_handlers;
  // The possible applier states
  enum Applier_state
  {
    APPLYING= 0,
    APPLYING_FROM_BUFFER,
    BUFFERING
  };
  // The applier's current state
  Applier_state applier_current_state;
  // flag that tells the applier to change the pipeline and buffer events
  // (named so that it does not clash with the start_buffering() method)
  bool buffering_requested;
  // flag that tells the applier to change the pipeline and remove the buffer
  bool end_buffering;

::Methods :>

  class ApplierModule
  {
  public:
    // initialize the applier according to the requested pipeline
    // This method assumes that a sys var exists on the parent class for
    // the configuration
    int setup(Handler_pipeline_type pipeline_type);

    // initialize the applier according to the given handlers
    // This method assumes that a sys var exists on the parent class for
    // the configuration
    int setup(char* handlers);

    // starts the applier thread
    int initialize();

    // Thread executing method
    int run()
    {
      int error= 0;
      while (!error)
      {
        Packet* transaction_packets= NULL;
        if ((error= this->incoming->pop(&transaction_packets))) // blocking
          break;

        if (buffering_requested)
          append_buffer();
        if (end_buffering)
          remove_buffer();

        // Assume all nodes in the cluster are equal.
        // If not, a map is needed here to pass the correct FDE event to
        // the pipeline.
        Format_description_log_event* fde_evt=
          new Format_description_log_event(BINLOG_VERSION);
        Continuation* cont= new Continuation();

        while (packets exist in transaction_packets)
        {
          PipelineEvent* pevent= new PipelineEvent(packet, fde_evt);
          pipeline->handle(pevent, cont);
          error= cont->wait();
          // handle errors
        }
      }
      return error;
    }

    // Method used by the receiver to queue transactions for the
    // applier to process
    int handle(const char *transaction_packets, uint len)
    {
      return this->incoming->push(new Packet(transaction_packets, len));
    }

    // Method to fetch one of the handlers according to its role.
    void get_handler(Handler_role role, EventHandler** handler)
    {
      *handler= contained_handlers[role];
    }

    // buffer methods

    /*
      If the applier is not currently buffering, add a buffering handler to
      the pipeline and start to buffer events.
      If we are still reading from the buffer, stop the application of
      events.
      This mechanism is event driven: the pipeline is not changed here but
      on the next event to be applied, avoiding the need for locks.
    */
    int start_buffering()
    {
      if (applier_current_state == APPLYING)
      {
        applier_current_state= BUFFERING;
        buffering_requested= true;
      }
      else if (applier_current_state == APPLYING_FROM_BUFFER)
      {
        applier_current_state= BUFFERING;
        EventHandler* buffer= NULL;
        get_handler(BUFFER, &buffer);
        buffer->stop_application();
      }
      return 0;
    }

    /*
      If the applier is currently buffering, start sending events to the
      remaining pipeline.
    */
    int resume_application()
    {
      if (applier_current_state == BUFFERING)
      {
        applier_current_state= APPLYING_FROM_BUFFER;
        EventHandler* buffer= NULL;
        get_handler(BUFFER, &buffer);
        buffer->start_application();
      }
      return 0;
    }

    void append_buffer()
    {
      // change the pipeline to add the buffer
    }

    void remove_buffer()
    {
      // change the pipeline to remove the buffer
    }
  };

Handlers
========

::File :> New folder: handlers/

  handlers/event_cataloger_handler.cc/h
  handlers/relay_log_queuer_handler.cc/h
  handlers/sql_thread_handler.cc/h

::Context :> Each handler has its own logic, and their number can grow. Separating them into a new folder therefore seems the best option.

::Skeleton :>

  class Handler : public EventHandler
  {
  public:
    Handler();
    // handling method
    virtual int handle(PipelineEvent *ev, Continuation* cont);
    // initialization
    virtual int initialize();
  private:
    handler_variables
  };

::Relay Log Queuer - handling :>

  int Relay_log_Queuer::handle(PipelineEvent *event, Continuation* cont)
  {
    Packet* p= NULL;
    event->get_Packet(&p);
    queue_event(this->mi, (const char*)p->payload, p->len);
    // pass to the next handler and return its result
  }

::Event catalogation - handling :>

  int Event_catalogation::handle(PipelineEvent *event, Continuation* cont)
  {
    Log_event *ev= NULL;
    event->get_LogEvent(&ev);

    // if not discarded, check whether it was discarded in the meantime
    if (!transaction_discarded)
    {
      transaction_discarded= cont->is_transaction_discarded();
    }

    if (ev->starts_group())
    {
      // a new transaction starts: stop discarding and mark its beginning
      transaction_discarded= false;
      event->mark_event(TRANSACTION_BEGIN);
    }
    else if (transaction_discarded)
    {
      // the transaction is still discarded: ignore the event
      cont->signal();
      return 0;
    }
    else if (ev->ends_group())
    {
      // mark the transaction end
      event->mark_event(TRANSACTION_END);
    }
    else
    {
      // mark it as a normal event
      event->mark_event(UNMARKED_EVENT);
    }
    // pass to the next handler and return its result
  }

::Event buffering - handling :>

  int Event_buffering::handle(PipelineEvent *event, Continuation* cont)
  {
    cache.add_event(event);
    // release the applier thread, which waits on the continuation
    cont->signal();
    return 0;
  }

  void Event_buffering::start_application()
  {
    // start a process to send events to the remaining pipeline.
    while (events in cache)
    {
      // invoke the next handler
    }
    end_buffering= true;
  }

  void Event_buffering::stop_application()
  {
    // stop sending events to the pipeline
  }
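The buffering behavior described in this worklog can be condensed into a small state machine. An inserted buffer holds events, and resuming flushes them in order before the buffer removes itself. This is a hypothetical, single-threaded model, not the plugin's code. `Buffering_applier_sketch` is an invented name. Events are strings, and a callback stands in for "the rest of the pipeline". The `Applier_state` names are copied from the Applier module section.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// State names copied from the Applier module section of this worklog.
enum Applier_state { APPLYING = 0, APPLYING_FROM_BUFFER, BUFFERING };

// Hypothetical single-threaded model: events are strings and the rest of the
// pipeline is represented by the `next` callback.
class Buffering_applier_sketch {
 public:
  explicit Buffering_applier_sketch(std::function<void(const std::string&)> next)
      : next_(std::move(next)) {}

  // Every incoming event goes through here.
  void handle(const std::string& event) {
    if (buffer_in_pipeline_)
      cache_.push_back(event);  // the buffering handler holds the event
    else
      next_(event);             // normal application
  }

  // Mirrors start_buffering(): put the buffer in front of the pipeline.
  void start_buffering() {
    state_ = BUFFERING;
    buffer_in_pipeline_ = true;  // stands in for append_buffer()
  }

  // Mirrors resume_application(): flush cached events, then remove the buffer.
  void resume_application() {
    if (state_ != BUFFERING) return;
    state_ = APPLYING_FROM_BUFFER;
    while (!cache_.empty()) {
      next_(cache_.front());  // preserve arrival order
      cache_.pop_front();
    }
    buffer_in_pipeline_ = false;  // stands in for remove_buffer()
    state_ = APPLYING;
  }

  Applier_state state() const { return state_; }
  bool buffer_in_pipeline() const { return buffer_in_pipeline_; }

 private:
  std::function<void(const std::string&)> next_;
  std::deque<std::string> cache_;
  Applier_state state_ = APPLYING;
  bool buffer_in_pipeline_ = false;
};
```

Because everything runs on one thread here, APPLYING_FROM_BUFFER is only transient. In the worklog's design the flush runs while new events keep arriving. That is why start_buffering() must be able to interrupt it through stop_application().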
Copyright (c) 2000, 2024, Oracle Corporation and/or its affiliates. All rights reserved.