WL#6842: Group Replication: Transaction Applier Module

Affects: Server-5.7   —   Status: Complete

EXECUTIVE SUMMARY
=================

Resorting to the interfaces declared in WL#6827, this module will 
implement a transaction apply procedure. Basically, it will be a 
scheduler for transactions that should be applied, and then all  
that is left to do is to call out the applier interface.


PROBLEM STATEMENT
=================

Under WL#6827, the underlying goal was to turn the reuse of existing
server components easy and allow for a simpler composition of these
with new components. Focusing solely on the design of simple and
reusable interfaces/pipelines for event handling, we are still lacking
however a proper applier module that will harness these new
structures.
There is then a need for a new module where event handlers inhabit
and are invoked in a pipeline defined according to the user and GCS
needs.

Under a simple interface to be invoked by the GCS message receiver,
this module shall then group the several existing handlers as defined
in WL#6827, certifying and filtering transactions and then queuing
them in the designated queuer or even applying them as deemed
necessary.


Functional requirements:

F1: Every transaction that is queued for processing on the applier
pipeline is eventually discarded or processed.

F2: The user can chose and define pipelines. 

F3: The applier shall guarantee that handlers that have unique roles
are only present once in the configured pipelines.

F4: The applier shall buffer transactions when their application is
not possible (e.g. recovery)

FR5: If an error occurs where the applier cannot apply any more events,
the applier emits an error to the log and the node exists the group.

FR6: If the applier fails to start an error is reported to the user
during the START GCS_REPLICATION option.

FR7: If an older applier instance is still running when the user calls
START GCS_REPLICATION then an error is returned to the user.
Basic functioning
=================

In its basis, the main function of the applier module is to
support a well defined pipeline where events will be injected and
processed. 

Providing a simple entry point used by the cluster receiver thread,
the applier module shall wrap the incoming events and pass them to the
pipeline where they will be certified, filtered, queued or submitted to
some other action. 

This processing shall be asynchronous though, as the receiver does
not need to wait on the queuing of the received events for now. The
applier module will then live in a separated thread being feed by a
queue were the receiver deposits the new transactions.

  
  |-----------|
  | Receiver  |  handle() 
  |  Thread   | --------|     |-----------------------|
  |___________|         |     |  Applier module       |
                        v_____|_____      __________  |
                        |__|__|__|__| -> | Pipeline | |
                   queue      |           ----------  |
                              |_______________________|


Handler/pipeline initialization
===============================

The applier module is not only responsible for constructing and
feeding events to the pipeline but also for the initialization and
maintenance of the included handlers. 
Even handlers that aren't directly connected to the module's
pipeline like the SQL thread should be initialized together with it. 

What handlers are chosen and how will they be combined is not known
though. To solve this, the applier will have a list of possible
configurations where each one of these is itself a list of handlers
that compose an order of execution.

Example:
   classic_gcs_configuration = certifier, relay_log, sql_thread

These lists shall be in a first phase hard-coded and used in well
defined situations, being in this case, the configuration list for the
GCS plugin always the same. In the future, users may change this list
by well defined options like "--use-extra-component-X", by selecting
this list as the one to be used from a group of possible
configurations or by the explicit designation of the handlers to be
used.

In these lists, for each one of these handlers, we also need to take
into account some of the handlers properties.
Under this scope we can define for each handler two properties:

  1) Initialization: Can the handler be instantiated only once in a
     pipeline (aka as singleton) or can multiple of his instances be
     used at the same time. Handlers like the Certifier and Event
     appliers are not built to be used more than once, while such
     components as a queuer can be used multiple time in the pipeline.

  2) Pipeline inclusion: Is the handler a pipeline handler or a
     standalone handler. Handlers like an SQL thread applier don't
     have a place in the pipeline, working independently after
     initialization.

Such variables can be added to the applier interfaces from WL#6827 with
support of such methods as "is_standalone()" or "is_singleton()".

For handler initialization, this list is then read and each of
these components is initialized and added or not to the pipeline. If
not a pipeline handler, it's assumed that we are dealing with a
standalone handler that will work independently after initialization.
In this process, the possible double initialization of singleton
handlers is also checked.


Handler interaction 
===================

During the cluster life cycle, after initialization and starting up
procedures, handlers are not immutable entities self contained in the
applier module. In fact such components as the certifier and the
applier can change their role during execution derived from such tasks
as the distributed recovery.

The interaction with such components residing in the applier is thus a
necessity, opening then a need for new interfaces.
One alternative is for this to be done based on handler roles.  With
separated roles and different possible implementations, the handlers
shall be extracted based on their function is on the pipeline leading
to methods such as:

 get_certifier()

   or 

 get_applier()

This would however lead to the creation of a new method whenever a new
role is added, becoming unsustainable in the future. A generic method
should be created as:

 get_handler(Enum role)

To support this we need to know what handler is what, so the applier
module knows what to return. This can be included in the handler
class in a similar manner to the is_standalone or is_singleton
variables or in a list on the applier that tells for each configuration
what is the role of each handler.

The interfaces that these components provide when retrieved, for each
one of their roles, are not to be defined here though. Instead, these
interfaces shall be developed according to the needs that are define
in the worklogs for distributed recovery, certification or others.

As some handlers can have multiple instances, retrieving them is not
useful nor practical because in the returned list we could hardly know
what handler we exactly wanted. Based on this, only singleton handlers
are marked and can be retrieved. The handlers that need to be exported
are by norm the singleton ones anyway, as these are the those that have
unique roles that require interfaces to the outside.


Buffering
=========

On last concern with the applier is the fact that this module is not
expected to be fully operational at all times. In fact, when doing
recovery for example, the applier cannot process the incoming events
as it does not have all the data to do so.

A solution is then to have moments here the applier will buffer these
pipeline events until it has permission to start applying them. Being
signaled to do so, the applier would append to the pipeline a
buffering handler that will fulfill this task, and all following
events will directed to this new component. 

When the user thinks the application of events can resume, the
handler will start feeding events to the rest of the pipeline and
when it finds itself empty it can remove itself from the applier
module.

Based on this, we then have 2 new methods:

start_buffering()

resume_application()


Current handlers
================

For a base implementation of this module, the following handlers
shall be included in this worklog.

** Relay log queuer **

Role: queuer

The relay log queuer is currently based on the queue method provided
in the slave code base. With needed re-factoring an cleaning this
code can be reused for this handler.


** SQL thread **

Role: applier

The classic SQL thread that currently leaves in the slave. When the
applier is initialized, this thread is also configured and activated.
During the system life cycle, this thread can be accessed and the
Relay Log source changed. 


Sub-handlers
============

These are applier auxiliary handlers

** Event catalogation **

Role: event analyzes

Also an handler, but one that is not defined by the user, the event
catalogation handler serves the purpose of defining transactions and
filter them when they are discarded.
This handler shall analyze the events and mark them whenever it sees
a transaction ending or starting, and whenever a transaction is
discarded it shall ignore all events until a new transaction appears.

** Event buffering  **

Role: buffering

This handler will buffer events that cannot at the time be applied,
and when signaled it will start transmitting events to the remaining
pipeline. When the application of events resumes and the buffer
becomes empty, this handler will trigger is own removal from the
pipeline.  

Future handlers
===============

These are other handlers than can/will be added in the future.

** Certifier **

Role: certifier

The certifier is the handler in the pipeline that will decide if the
current transaction shall be committed or abort as it conflicts with
other transactions. Due to the recovery process, this component can
also be accessed and during the server life time.

Depends on: WL#6837 and WL#6833.


** Filtering **

Role: filter 

The filtering handler shall in the future be used to ignore/change
certain kinds of transactions according to used defined rules. In the
context of a cluster, this means that the node can diverge from the
cluster so this handler should be used with care.


Added errors
============

> ER_GCS_REPLICATION_APPLIER_INIT_ERROR
 
Thrown when an error occurs while initializing the applier module on start

> ER_GCS_STOP_APPLIER_THREAD_TIMEOUT

Thrown when an applier instance is still running when starting


Added configuration options
===========================

Status var:
gcs_replication_plugin_components_stop_timeout

The time to wait until the applier threads terminate.

Status var:
gcs_replication_plugin_pipeline_type_var

The pipeline to use. Currently we only accept one.



Handlers' info
========

::File :> 

  applier_interfaces.h 

::Context :>

  In this interfaces header some extra methods are added to the
  EventHandler interface defining some of the handlers
  characteristics.

::Contains :>

  /**
    @enum Handler_role
    Enumeration type for the different roles that handlers can have.
    When creating a new role, it should be added here.
  */
  enum Handler_role
  {
    EVENT_CATALOGER= 0,
    APPLIER= 1, 
    CERTIFIER= 2,
    QUEUER= 3,
    ROLE_NUMBER= 4 //The number of roles
  };

::Methods :>

  class EventHandler
  {
    // tells if the handler is a pipeline or an standalone handler
    virtual bool is_standalone();

    // tells if the handler is a singleton or not
    virtual bool is_singleton();
   
    // returns the handler role
    virtual Handler_role get_role();
  }


Pipelines handling 
==================

::File :> 

  applier_handlers.h

::Context:>

  Header file here types of handlers are defined. In this file we also
  define the several existing pipelines that can be chosen.
  Accompanying this, several helper functions for the applier to fetch
  a valid, configured pipeline are also given.
  
  The idea behind this file is to centralize the definition of new
  handlers in a single file. If a developer makes a new handler, its
  id, how it feats existing pipelines and what is its role is all defined
  here.

::Contains :>

  /**
    @enum Handler_id
    Enumeration type for the different existing handlers ids. 
    When creating a new handler, a new id should be added here.
  */
  enum Handler_id
  {
    EVENT_CATALOGER=0,
    RELAY_LOG_QUEUER,
    CERTIFIER,
    SQL_THREAD_HANDLER
  };
  
  /**
    @enum Handler_pipeline_type
    Enumeration type for the existing pipelines configurations. 
    When creating a new pipeline, it should be added here.
  */
  enum Handler_pipeline_type
  {
    STANDARD_GCS_PIPELINE= 0
  };

::Methods :>

  /**
    Defines the pipeline and returns the number of handlers on it.
    In this method the developer shall define what each pipeline
    configuration contains.
  */
  int get_pipeline_configuration(Handler_pipeline_type pipeline_type, 
                                 Handler_id* pipeline_conf)
  {
    switch(pipeline_type){
      case STANDARD_GCS_PIPELINE:
        //number_of_handlers= number of handlers in the pipeline  
        pipeline_conf= new Handler_id[number_of_handlers];
        pipeline_conf[0]= EVENT_CATALOGATION; //and example of a handler
        ...
        return number_of_handlers;
      default:
        //error 
    }
  }
  
  /**
    Defines the pipeline and returns the number of handlers on it.
    In this method the developer shall decode the user defined handler
    list.
  */
  int get_pipeline_configuration(char* handler_info, 
                                 Handler_id* pipeline_conf)
  {
    //handlers[],number_of_handlers = input_handling(handler_info)
    pipeline_conf= new Handler_id[number_of_handlers];
    pipeline_conf[0]= handlers[0];
    ...
  }

  /**
    Constructs the pipeline according to the given handlers.
    This method assumes a list of handlers was given as "handler1,handler2"
  */
  int configure_pipeline(char* handler_ids,
                         EventHandler** pipeline,  
                         std::vector* handlers_per_role)
  {
    Handler_id* handler_list = NULL;
    int num_handlers = get_pipeline_configuration(handler_ids, handler_list)
    return configure_pipeline(handler_list, num_handlers, 
                              pipeline, handlers_per_role);
  }


  /**
    Constructs the pipeline according to the given configuration.
  */
  int configure_pipeline(Handler_pipeline_type pipeline_type,
                         EventHandler** pipeline,  
                         std::vector* handlers_per_role)
  {
    Handler_id* handler_list = NULL;
    int num_handlers = get_pipeline_configuration(pipeline_type, handler_list)
    return configure_pipeline(handler_list, num_handlers, 
                              pipeline, handlers_per_role);
  }


  /**
    Constructs the pipeline according to the given handlers.
  */
  int configure_pipeline(Handler_id* handler_list, int num_handlers,
                         EventHandler** pipeline,  
                         std::vector* handlers_per_role)
  {
    for (i < num_handlers)
    {
      EventHandler* handler = NULL;
      
      switch(handler_list[i])
      {
        case EVENT_CATALOGATION: // a pipeline handler
          handler= new Event_catalogation();
          break;
          ....
        case SQL_THREAD: //a stand alone handler
          handler= new SQL_thread_handler(); 
          break;
  	    default:
         //error
      }
      
      //initialize the handler
      handler->initialize();
      
      //Added it to the pipeline if it is a pipeline handler 
      if(!handler.is_standalone())
        //add the handler to the pipeline 
        EventHandler::append_handler(pipeline,handler);
      
      //Record the handler role if singleton
      if(handler.is_singleton())
      {
        //Check to see if the handler was already used in this pipeline
        if(handlers_per_role[handler.get_role()])
          //throw error

        //mark the handler role on the module given list
        handlers_per_role->insert(handlers_per_role->begin()
                                  +handler->get_role(),
                                  handler);  
      }
    }
    return 0;
  }



Applier module
==============

::File :> 
  
  rpl_gcs_applier.h
  rpl_gcs_applier.cc

::Context:>

  The applier core. In this file/class we construct the pipeline, we
  receive events and we apply them according to the chosen handlers.

::Contains :>
   
  //The pipeline
  EventHandler *pipeline;

  //The message queue that is feeded by the receiver thread
  SynchronizedQueue *incoming;

  //The roles of each handler
  std::vector contained_handlers;

  //The possible applier states
  enum Applier_state
  {
    APPLYING=0,
    APPLYING_FROM_BUFFER,
    BUFFERING
  };
  
  //the appliers current state
  Applier_state applier_current_state;  

  //flag that tells the applier to change the pipeline and buffer events
  bool start_buffering  

  //flag that tell the applier to change the pipeline and remove the buffer
  bool end_buffering

::Methods :>

  class ApplierModule
  {
  
    //initialize the applier accordingly to the request pipeline
    //This method assumes that a sys var exists on the parent class for 
    //the configuration
    int setup(Handler_pipeline_type pipeline_type);

    //initialize the applier accordingly to the given handlers
    //This method assumes that a sys var exists on the parent class for 
    //the configuration
    int setup(char* handlers);
   
    //starts applier thread
    int initialize();
   
    //Thread executing method
    int run()
    {
  
      while(!error)
      {
        if ((error= this->incoming->pop(&transaction_packets))) // blocking
          break;
  
          if(start_buffering)
            append_buffer();
          if(end_buffering)
            remove_buffer();

         //Assume all nodes in the cluster are equal.
         //If not a map is need here to pass the correct FDE event to
         //the pipeline.
         Format_description_log_event* fde_evt= 
           new Format_description_log_event(BINLOG_VERSION);
         Continuation* cont = new Continuation();
  
         while (packets exists on the transaction_packets)  
         {
            PipelineEvent* pevent = new PipelineEvent(packet,fde_evt);
            pipeline->handle(pevent, cont);
            error = cont.wait();
            //handle errors
         }
      }
    }
  
    //Method used by the receiver to queue transactions for the
    //applier to process 
    int handle(const char *transaction_packets, uint len)
    {
      return this->incoming->push(new Packet(transaction_packets, len));
    }
  
    //Method to fetch one of the handlers accordingly to his role.
    void get_handler(Handler_role role,EventHandler** handler)
    {
      *handler = contained_handlers[role];
    }
  
    //buffer methods

    /*
      If the handler is not currently buffering, add a handler to the
      pipeline and start to buffer events. 
      If for some reason we are currently still reading from the
      buffer, stop the application of events 
      
      This mechanism is event driven, being the pipeline not changed
      here but on the next event to be applied, avoiding the needs for
      locks
    */
    int start_buffering()
    {
       if(applier_current_state == APPLYING)
       {
         applier_current_state = BUFFERING;
         start_buffering= true;
       }
       if(applier_current_state == APPLYING_FROM_BUFFER)
       {
         applier_current_state = BUFFERING;
         handler buffer;
         get_handler(BUFFER,buffer);
         buffer.stop_application();
       }
    }

    /*
      If the handler is currently buffering, start sending events to
      the remaining pipeline.    
    */
    int resume_application()
    {
       if(applier_current_state == BUFFERING)
       {
         applier_current_state = APPLYING_FROM_BUFFER;
         handler buffer;
         get_handler(BUFFER,buffer);
         buffer.start_application();
       }
    }

    append_buffer()
    {
      //change the pipeline to add the buffer
    }

    remove_buffer
    {
      //change the pipeline to remove the buffer
    }

  }
  
Handlers 
========

::File :> 

  New folder: handlers/
  
  handlers/event_cataloger_handler.cc/h
  handlers/relay_log_queuer_handler.cc/h
  handlers/sql_thread_handler.cc/h

::Context :>

  Each handler has their own logic and as their number can increase,
  to separate them in a new folder seems like the best option.

::Skeleton :>

  class Handler : public EventHandler
  {
  public:
    Handler();

    //handling method
    virtual int handle(PipelineEvent *ev,Continuation* cont);

    //initialization 
    virtual int initialize();

  private:
    handler_variables
  };

::Relay Log Queuer - handling :>

  Relay_log_Queuer::handle(PipelineEvent *event,Continuation* cont)
  {
    Packet* p;
    event->get_Packet(&p); 
    queue_event(this->mi, (const char*)p->payload, p->len);

    //pass to the next handler
  }
 
::Event catalogation - handling :>
 
  Event_catalogation::handle(PipelineEvent *event,Continuation* cont)
  {

    Log_event *ev= NULL;
    event->get_LogEvent(&ev); 

    //if not discarded check if was in the meanwhile
    if (!transaction_discarded)
    {
      transaction_discarded= cont.is_transaction_discarded()
    }

    //See if a new transaction was started
    if (ev->starts_group())
    {
       transaction_discarded = false;
       event->mark_event(TRANSACTION_BEGIN);
    }
    
    //if the transaction is still discarded, ignore the event
    if (transaction_discarded){
      cont->signal();
    }

    //Mark the transaction end 
    if(ev->ends_group()){
      event->mark_event(TRANSACTION_END);
    }
    //Mark it was normal event
    else
    {
      event->mark_event(UNMARKED_EVENT);
    }

    //pass to the next handler
    }
  }

::Event buffering - handling :>

  Event_buffering::handle(PipelineEvent *event,Continuation* cont)
  {
    cache.add_event(event)
  }

  Event_buffering::start_application(){
    //start a process to send events to the remaining pipeline.
    while(events in cache)
    {
      invoke next//
    }
    end_buffering= true;
  }
  }

  Event_buffering::stop_application(){
    //stop sending events to the pipeline 
  }