WL#6827: Group Replication: Interface for Applying Log Events

Affects: Server-5.7   —   Status: Complete

EXECUTIVE SUMMARY
=================

This task captures the design and implementation of an interface for
directly applying binary log events.

PROBLEM STATEMENT
=================

Events are applied by resorting to their Log_event::apply_event member
function. This is a virtual function, implemented per event type.
Currently this is too tightly coupled with the existing MySQL
master-slave replication protocol, in particular with the SQL thread
and position updates. Moreover, these apply functions are overloaded
with additional logic, such as filtering.

We need a clean applier that only handles the procedure of actually
installing the updates. Filtering should be handled before that, and
the position update should be done at the end of a transaction, before
the commit. Also, perhaps we should not filter per event, but rather
per transaction, since we are handling only transactional units of
work.

For example (pseudocode; error handling left out on purpose so as not
to clutter the example):

if filter.check(transaction):
  begin(transaction)
  for event in transaction:
    applier.apply(event)
  replica_metadata.update_gtids(transaction.gtid)
  commit(transaction)

The interfaces pretty much exist today. We just need to clean them up,
and separate some of the logic if possible. All in all, it could be
that we keep this in log_event.cc and just open a small interface for
injecting raw data into the applier framework that lives inside the
server. (This needs to be investigated.)


Use Cases
=========

In the current effort to add multi-master support to MySQL, we can see
many of the current changes as a group of components that will now be
added to the server. In that sense, it is only natural to move to an
interface-based strategy that allows us to compose elements in
different manners.
This applies not only when creating new generic components, but also
to the current server components, which can be separated and re-used
with the occasional refactoring. In the context of this task, such
abstraction should be the guideline for the job.

The interfaces will then play a role in allowing the rapid and simple
composition of different server tasks, giving the node different roles
in the big picture. Among the different planned components, the user
may want to filter some transactions and, in the context of
synchronous replication, the certification module is also something
that may be optional. Even the relay log file can be replaced, should
the user want direct injection into the SQL thread. The idea here is
then to define common interfaces that allow us to build pipelines such
as:

reader | certifier | filter | relay-log | applier

reader | certifier | filter | applier

reader | certifier | applier

reader | applier

What is offered to the applier should then be the same as what is
offered to the certifier or the filtering component. In the near
future, the Binlog API can also be a component that interacts with
these interfaces, allowing events to be easily injected into it.

There are no known Functional or Non-Functional requirements for this
worklog.

The base interface 
==================

Having decided to abstract the different appliers, it is important
that these interfaces reflect the simple nature of this task, which is
common among many components. Starting with the base handler:

  EventHandler
  {

    //handler initialization 
    initialize();

    //handling of the event
    handle(PipelineEvent ev);

  }

The idea here is that even if these components are based on
transactions at the logical level, the server will still operate on
events as it does today, for memory and performance reasons. This
forces the developer that implements such interfaces to still be aware
of transaction boundaries. Due to this, we abstract the event into a
wrapper class that holds different event formats and context info, as
explained below.


Pipelines 
=========

These interfaces also say nothing about what the source or destination
of the events is, allowing us to treat these components as mutable
black boxes. Note that this interface, for example, may be used to
queue events, filter them or inject them into the Binlog API, among
other uses.

Starting from this, the formation of pipelines among different
handlers that share the same interface is something that should also
come naturally. In a first approach, one could create a list of
handlers to be executed, passing events from handler to handler and
waiting on each method's completion. But a more adaptable option is to
embed this in the interface design, leading to:

  EventHandler
  {

    //The next handler in line
    EventHandler next_in_pipeline;

    initialize();

    handle(PipelineEvent ev);

    //plug the next handler
    plug(EventHandler *next);

  }

This way, every component knows directly which handler to invoke next,
making the composition of different parts of the server more natural.

  EventHandler *pipeline= new EventHandler_A();
  EventHandler *B= new EventHandler_B();
  pipeline->plug(B);

This also helps in situations where a transaction, composed of several
events, may or may not advance in the pipeline. In such cases, the
handler in question should store the events until a decision is made,
injecting them into the next handler when the decision is positive, as
sketched below.
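
A minimal sketch of such a buffering handler, using the simplified
interface above. The class name, the boundary check and the decision
routine are all hypothetical, and termination and discarding are
addressed in the sections that follow:

  //Hypothetical handler that buffers a transaction until a decision
  //is made on whether it advances in the pipeline.
  class Event_buffering_handler : public EventHandler
  {
    std::vector<PipelineEvent*> buffered_events;

  public:
    void handle(PipelineEvent *ev)
    {
      buffered_events.push_back(ev);

      //Keep buffering until the transaction boundary is seen.
      if (!is_transaction_end(ev))       //hypothetical boundary check
        return;

      if (transaction_is_acceptable())   //hypothetical decision routine
      {
        //Positive decision: inject the whole transaction downstream.
        for (size_t i= 0; i < buffered_events.size(); i++)
          next_in_pipeline->handle(buffered_events[i]);
      }
      buffered_events.clear();
    }
  };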


Termination
===========

But there is one element missing in the above, simplified,
interface. Due to the nature of the different implementations that can
live under such an interface, the notion of completion is something
that cannot always be associated with a method's return value. So
there is a need for an interaction between caller and invoked handler,
whereby the caller is informed when the invoked handler ends its task.

To achieve this, we base our approach on an object upon which we can
wait, inspired by the Future notion that exists in the Java language.
Named Continuation, we define it as:

  Continuation
  {

    //Method called to wait on the method's completion.
    wait();

    //Method called to signal the method's completion.
    signal();

  }

On each method invocation, such an object is passed to the handler
method. Passed from handler to handler, the object should only be
released on the last handler's execution, or when the transaction is
marked as discarded.

This class marks a clean separation between the pipeline information
(when the event was processed or discarded, what error happened,
etc.), which is stored in the Continuation, and the event information,
which goes into the PipelineEvent class.


Transaction abstraction
=======================

While the information about each transaction can be inferred in each
handler based on the received events, this is undesirable, as it would
probably lead to duplication of code and operations. If possible, this
should be abstracted, and some of this information should be
incorporated into the events that go through the pipeline.

To embody this notion, we can add context flags to the event wrapper,
the PipelineEvent class. As a base proposal, marking the transaction
boundaries and the discarded events seems reasonable.

However, due to the heterogeneity of handlers and use cases, these
flags may not be set or used in every situation, so a default value
that marks them as uninitialized is necessary.
In fact, who sets these flags, and which handlers use them, is not for
this worklog to define. It is easy, however, to picture a simple
handler that sets these flags on the events it receives, or one that
ignores an event when its discarded flag is set, as sketched below.
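
As an illustration, such a boundary-marking handler could look like
the following sketch. The handler name and the starts_transaction /
ends_transaction helpers are hypothetical; the context flags are the
event_modifiers defined later in this document:

  //Hypothetical handler that only marks transaction boundaries.
  class Event_marking_handler : public EventHandler
  {
  public:
    int initialize() { return 0; }

    int handle(PipelineEvent *ev, Continuation *cont)
    {
      Log_event *log_ev= NULL;
      ev->get_LogEvent(&log_ev);

      if (starts_transaction(log_ev))     //hypothetical helper
        ev->set_event_context(TRANSACTION_BEGIN);
      else if (ends_transaction(log_ev))  //hypothetical helper
        ev->set_event_context(TRANSACTION_END);
      else
        ev->set_event_context(UNMARKED_EVENT);

      //Hand the now-marked event to the rest of the pipeline.
      return next(ev, cont);
    }
  };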


Discarded transactions
======================

Regarding discarded transactions: upon a handler's decision to
discard, this information should be fed back to the pipeline. It
should therefore be included in the Continuation class, as that object
survives the pipeline execution and is fed to the pipeline
again. Otherwise, if included in the event class, whoever invokes the
pipeline would have to parse this information and pass it along when a
new event arrives.


Events
======

Being the base of this interface, the missing element in the design is
the definition of the PipelineEvent. Containing information about the
event and some context about the transaction to which it belongs,
let's review its components:

# Base event representation: Log_event or Packet

One of the challenges to the simplicity of these interfaces is the
heterogeneity of the different handlers and their inputs. In fact,
components such as the new filtering handler should be based on
Log_events, but the current relay log queuer is based on network
packets.

As the current solution, we opted to support both formats,
interchanging between them. When a handler needs a Log_event, the
PipelineEvent transforms the Packet and returns a Log_event; when a
Packet is needed, the opposite is done. Only one format is maintained
at each moment in time, as illustrated below.
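
For illustration, a Log_event based handler simply asks the wrapper
for the format it needs. A sketch, using the PipelineEvent accessors
and the handler's next() helper defined later in this document (the
filtering logic itself is left out):

  int Event_filtering_handler::handle(PipelineEvent *ev,
                                      Continuation *cont)
  {
    Log_event *log_ev= NULL;

    //If the wrapper currently holds a Packet, this call decodes it;
    //from here on the PipelineEvent holds only the Log_event format.
    int error= ev->get_LogEvent(&log_ev);
    if (error)
      return error;

    //... filtering decisions based on log_ev would go here ...

    return next(ev, cont);
  }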

# Transaction context

When needed, the event should carry a flag that informs the handler
that a new transaction is starting or has ended.

# Extra Context

This object can be extended and enriched with other information given
by some handler. Such information should, however, be handled with
care, as it can create dependencies among different handlers.

== Base Interfaces ==
=====================

Auxiliary classes for the base interface.

Packet class
============

:: Description >

Wrapper class for the packet info and length.

:: Contents > 

  //The packet data and length
  uchar* payload;
  uint len;
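
A note on ownership: a possible copying constructor, sketched under
the assumption that the packet owns its payload (this constructor is
not part of the interface definition above):

  //Sketch: copy the network buffer so the packet owns its payload.
  Packet(uchar *data, uint length) : len(length)
  {
    payload= new uchar[len];
    memcpy(payload, data, len);
  }

  ~Packet()
  {
    delete[] payload;
  }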


Continuation class
==================

:: Description >

Class that allows us to wait and know when and how the pipeline ended.

:: Contents > 

  //continuation locks and signals for the waiting process
  mysql_mutex_t lock;
  mysql_cond_t cond;
  bool ready; 

  //The return error code (a message can also be included)
  int error_code;
  //Flag that signals the transaction was discarded.
  bool transaction_discarded;

::> Methods >

  //Wait for pipeline handling of an event, return its error code
  int wait()
  {
    //base waiting algorithm
    lock(&lock);
    while(!ready)
    {
      wait(&cond, &lock);
    }
    ready= false;
    unlock(&lock);
    return error_code;
  }

  //Method called to signal the pipeline completion
  int signal(int error, bool tran_discarded=false)
  {
    //the basic signaling algorithm:
    //store the outcome before waking the waiter
    lock(&lock);
    error_code= error;
    transaction_discarded= tran_discarded;
    ready= true;
    unlock(&lock);
    broadcast(&cond);
    return 0;
  }

  bool is_transaction_discarded()
  {
    //tells us if the transaction was discarded.
    return transaction_discarded;
  }
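
From a handler's perspective, discarding a transaction then amounts to
signaling the continuation instead of advancing in the pipeline. A
sketch, where the must_discard decision is hypothetical:

  int handle(PipelineEvent *ev, Continuation *cont)
  {
    if (must_discard(ev))   //hypothetical decision
    {
      //Do not call next(): end the pipeline here and flag the discard.
      cont->signal(0, true);
      return 0;
    }
    return next(ev, cont);
  }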


PipelineEvent class
===================

:: Description >

Base event to be fed to the pipeline.

:: Contents > 

  //Possible format: packet
  Packet* packet;
  //Possible format: event
  Log_event* log_event;
  //Transactional context flag
  int event_context;
  
::> Methods >

public:

  int get_LogEvent(Log_event** out_event)
  {
    //Returns the contained log event, 
    //converts the existing packet if needed 
  }

  int get_Packet(Packet** out_packet)
  {
    //Returns the contained packet, 
    //converts the existing log event if needed 
  }

  void set_event_context(int modifier)
  {
    //change the event transactional context flag
  }

  int get_event_context()
  {
    //return the event transactional context flag 
  }

private:

  int decode()
  {
    //Decode the packet and set the event.
  }

  int encode()
  {
    //Encode the event into a new packet.
  }

::> Related resources >

  //The transactional event possible status.
  enum event_modifiers
  {
    UNDEFINED=0,
    TRANSACTION_BEGIN,
    TRANSACTION_END,
    UNMARKED_EVENT
  };
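
A usage sketch, assuming a PipelineEvent constructor that wraps a
received Packet:

  //Wrap a freshly received packet and mark it as opening a transaction.
  PipelineEvent *ev= new PipelineEvent(packet);
  ev->set_event_context(TRANSACTION_BEGIN);

  //A later handler can then test the boundary flag:
  if (ev->get_event_context() == TRANSACTION_BEGIN)
  {
    //... a new transactional unit of work starts here ...
  }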


== Handler Interface ==
=======================

EventHandler class
==================

:: Description >

The base interface to be implemented by all handlers.

:: Contents > 

  //The next handler in line
  EventHandler *next_in_pipeline;

::> Methods >

  //The initialization method to be implemented
  virtual int initialize()= 0;

  //The handle method to be implemented
  virtual int handle(PipelineEvent *ev,Continuation* cont) = 0;
  
  //Set the next in line handler
  void plug(EventHandler *next)
  {
    next_in_pipeline = next;
  }

  //Treat the pipeline as a linked list and append at the end
  int append(EventHandler* event_handler)
  {
    EventHandler* pipeline_iter = this;
    while(pipeline_iter->next_in_pipeline)
    {
      pipeline_iter = pipeline_iter->next_in_pipeline;
    }
    pipeline_iter->plug(event_handler);
    
    return 0;
  }

  //Append a handler to a given pipeline, which can be NULL.
  static int append_handler(EventHandler** pipeline, EventHandler* event_handler)
  {  
    if(!(*pipeline))
      *pipeline= event_handler;
    else
      (*pipeline)->append(event_handler);
    return 0;
  }

  //Method to be used in handlers: pass the event to the next handler.
  //If no more handlers exist, we have reached the end, so signal.
  int next(PipelineEvent *ev, Continuation* cont)
  {
    if(next_in_pipeline)
      return next_in_pipeline->handle(ev, cont);
    cont->signal(0);
    return 0;
  }


== Examples and use patterns ==
===============================

The example, when having a filtering handler and a queuer as handlers:

  EventHandler *pipeline= NULL;

  (Some undefined user configuration process){
    EventHandler *filter= new Event_filtering_handler();
    EventHandler::append_handler(&pipeline, filter);
    EventHandler *queuer= new Event_queuing_handler();
    EventHandler::append_handler(&pipeline, queuer);
  }

  //Create a continuation to be reused by several events
  Continuation *c= new Continuation();

  while(reading_from_source(PipelineEvent* e))
  {
    pipeline->handle(e, c);
    int error= c->wait();
    if(error)
    {
      //do something
    }
    if(c->is_transaction_discarded())
    {
      //do something, or pass the continuation again to the pipeline
      //and let some handler discard the new events until it detects
      //that a new transaction begins.
    }
  }