WL#6837: Group Replication: Online distributed recovery

Affects: Server-5.7   —   Status: Complete

EXECUTIVE SUMMARY
=================

On recovery, the server MUST recover from other servers in the
network, in a distributed fashion. That way the replica can get
up to date automatically, before it actually starts serving requests.

PROBLEM STATEMENT
=================

When a server joins a group, it MUST get all the missing transactions,
the simplest way being native asynchronous replication.
While it gets them, it buffers the other incoming transactions from the
group communication layer (because all of this happens online) and
only once the recovery is complete does it start applying those
transactions that arrived in the meantime.  Once these two queues
are flushed, the server can be brought online (i.e., it can start
accepting update/delete/insert requests). In theory the server could
start serving read requests earlier, but it would then be exporting
stale data.
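The buffer-then-apply behaviour described above can be sketched with a
minimal stand-in class (hypothetical types, not the plugin's actual
pipeline): while recovery runs, delivered transactions are only queued;
when recovery finishes, the backlog is flushed in order before normal
application resumes.

```cpp
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for the incoming-transaction pipeline.
class RecoveringApplier {
public:
  void deliver(const std::string &trx) {
    if (recovering_)
      backlog_.push(trx);      // buffer while the donor transfer runs
    else
      applied_.push_back(trx); // normal online path
  }

  // Called when the donor transfer is complete: flush the backlog in
  // arrival order, then resume normal application.
  void finish_recovery() {
    while (!backlog_.empty()) {
      applied_.push_back(backlog_.front());
      backlog_.pop();
    }
    recovering_ = false;
  }

  const std::vector<std::string> &applied() const { return applied_; }

private:
  bool recovering_ = true;
  std::queue<std::string> backlog_;
  std::vector<std::string> applied_;
};
```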

There are several approaches to do this: multiple recovery rounds, one
recovery round with buffering of incoming transactions, etc.

FR1: All messages from a new view can only be delivered after the
handler for view changes returns.

FR2: The view id and the connection info for all online nodes (possible donors)
must be available on recovery.

FR3: Methods for extracting and setting the Certification Database must be
available.

FR4: If no donor is found or no connection can be established, recovery
aborts, an error is written to the log and the node leaves the group.

FR5: Connection options must be configurable, allowing a replication
user and password to be set.

FR6: Donor connection retries must be configurable.

Assuming a simple view synchronization model, we present here a
generic recovery algorithm to be implemented outside the group
communication layer upon view changes, where the joining node needs to
catch up with the rest of the cluster. When the node finally catches
up, its state changes and this info is shared among the nodes, but
no new group communication view changes are induced.

From a higher-level observation point, recovery is based on handling an
incoming view change episode that:

  * on the joiner side: The applier_module (WL#6842) is paused,
    queuing incoming messages, and a donor is selected among the
    available/suitable cluster nodes so that all the data that is
    missing in the joiner up to the view change is transferred. After
    this step ends, the applier_module resumes and applies the queued
    messages, certifying them with the data also received from the
    donor. When it manages to catch up with the cluster, the joiner
    declares itself as online.
  
  * on the donor side: Upon a view change the donor marks this moment
    by creating a new replication log event that will end up in its
    binary log. This event will also contain certification data that
    will allow the joiner to certify future events. When sending the
    data to the joiner, this new event will allow the joiner to know
    when it has received all the pre-view data.


NEW ELEMENTS 
============

+ View_change_log_event:

The view change log event is a new log event that is created on all
online nodes upon a view change GCS event. When received on the
joiner, it represents the last event from the views that preceded
the current new view.

+ START SQL THREAD UNTIL VIEW_ID= view_id

A new mode for the SQL thread that executes all events until a
View_change_event containing a view id equal to the given one is
found. This mode shall not be public, at least for now.

+ Recovery end / Node online message:

A new type of message that goes into the communication layer every
time a joining node catches up with the cluster. This message has the
purpose of letting the other cluster nodes know that the node is now
online.
+ New node states

As of now, the node only has two possible states: "Online" and
"Offline". With this worklog we introduce the notion of recovery and,
with it, the status "Recovering" is also added.
Another added status is "Donor", which marks a node while it is
fulfilling that role.

ALGORITHM  
=========

From the generic overview we can now pass to a high level description
of the algorithm from both points of view.

#JOINER

1) The handler_view_change method is invoked. This should be
   synchronized with message reception from the group communication
   layer to guarantee that we can queue packets/execute methods at the
   exact moment before the first view message arrives.

2) The joiner adds to the applier_module queue a message to suspend
   the applier_module activity. Note that the applier_module will only
   suspend after consuming possible messages from older views, a
   situation that can be caused by short disconnections.

3) The recovery thread is launched.

4) The node state is changed to "Recovering"

5) The method returns and message queuing into the applier_module's
   queue proceeds as usual.

-- On the newly launched recovery thread

6) Wait for the applier_module suspension and then for the consumption
   of the relay log by the SQL thread. To monitor the latter, as we
   cannot use event log positions the way the current server does,
   GTIDs are used instead. When the retrieved GTID set becomes a
   subset of the executed GTID set, we know that all the queued
   elements have been applied.

7) Stop the applier SQL thread (*1)

8) Select a donor and change its state in the local state info. In a
   first phase this selection can be simplified by choosing the
   cluster node with the lowest id. This can be enhanced in several
   ways though. First of all, not all nodes can be selected: if a
   node is recovering it shall not be considered a suitable
   donor. Node selection is also a field for improvement, as it can be
   enriched with cluster data that tells us which node is the
   "fastest", or with user-given hints that tell the joiner which node
   to choose. This info shall come from state messages exchanged upon
   view changes, making the decision deterministic and equal
   throughout the cluster.

9) Start a master/slave connection. This implies creating a new
   master info object that contains the donor connection credentials
   and starting a new SQL and IO thread. 

9.1) The SQL thread is started with an UNTIL VIEW_ID = view_id mode
     that needs to be implemented. When the view id event is
     detected, the SQL thread returns and the certification data is
     set. This shall also awake the sleeping recovery thread.

10) Terminate the recovery IO/SQL threads if they haven't stopped yet.

11) Broadcast a message that informs all cluster nodes that the donor
    has fulfilled its job. All nodes shall then change the donor
    status back to online.

12) Restart the applier module's SQL thread.

13) Awake the applier

14) In the recovery thread, monitor the applier module queue or the
    certifier stats until we assert that the node has caught up with
    the cluster.

15) Broadcast a message into the group communication layer with a new
    type, RECOVERY_DONE, carrying the node id in its payload.

16) The recovery thread is killed.
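The relay-log-drained check from step 6 can be illustrated with GTID
sets reduced to plain sets of sequence numbers (a deliberate
simplification; real GTID sets are interval-based and keyed by source
UUID, and the names below are hypothetical):

```cpp
#include <algorithm>
#include <set>

// Simplified stand-in: a GTID set reduced to a set of sequence numbers
// from a single source. The relay log is fully consumed when every
// retrieved transaction is also in the executed set.
using GtidSet = std::set<long>;

bool relay_log_drained(const GtidSet &retrieved, const GtidSet &executed) {
  // true when retrieved is a subset of executed (both sets are sorted)
  return std::includes(executed.begin(), executed.end(),
                       retrieved.begin(), retrieved.end());
}
```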
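The first-phase donor selection rule from step 8 — lowest id among the
suitable nodes — can be sketched as follows (hypothetical types, not
the plugin's actual interfaces):

```cpp
#include <vector>

enum class Node_state { ONLINE, RECOVERING, DONOR, OFFLINE };

struct Node_info {
  int id;
  Node_state state;
};

// Among the nodes that are suitable donors (online; a recovering node
// must not be picked), choose the one with the lowest id. Because every
// node runs the same rule on the same state info, the choice is
// deterministic across the cluster. Returns -1 when no donor exists.
int select_donor(const std::vector<Node_info> &nodes) {
  int donor = -1;
  for (const Node_info &n : nodes) {
    if (n.state != Node_state::ONLINE)
      continue; // recovering/offline nodes are not suitable donors
    if (donor == -1 || n.id < donor)
      donor = n.id;
  }
  return donor;
}
```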

#DONOR (and remaining online nodes) 

1) The handler_view_change method is invoked. Again, this should be
   synchronized with message reception. 

2) The node adds to the applier queue a View_change_log_event.

3) When the View_change_event enters the pipeline, this is a sign that
   the applier has finished processing all events from older views.
   When the event then goes into the Certifying handler, we know
   that we can request the Certification Database and incorporate it
   into the event. This is in fact the cluster certification database
   for the new view, and when passed on to the joiner, it will allow it
   to certify its queued events.

4) The node to be used as a donor is calculated deterministically on
   all nodes. This allows all nodes to correctly update their node
   info, correctly displaying which nodes are currently serving data
   to joining nodes.  The node status change is then made.

5) If the node eventually becomes the donor, in the transmitted data
   the view_change_event will mark the end of the data belonging to the
   previous views, also carrying the data needed for certification.

Notes:

(*1) Whenever direct interactions with the SQL/IO threads are
mentioned, as in points 6), 8), 9) and 10), this results from the
absence of suitable ways of re-using these structures at the time
of this worklog's design. This should be improved when WL#1697 comes
into production.

STATE TRANSITION
================

Being generic, the recovery process can be used to allow brand new
nodes to catch up with the cluster, and also for nodes that exited the
quorum communication group for some reason but have now re-joined.
These two cases end up being the same, the only difference being that
in the second case there can still be some old data being applied in
the joiner's applier module.

Based on this, here is a state transition summary of the possible
node states with respect to recovery.

States:

Offline node - node not on the cluster. A brand new node is just
               another type of an offline node.

Online node - a live node of the cluster that can receive messages.

Recovery node (joiner) - A node that joined and is going through
                         recovery.

Donor node - A node that sends all the missing info to the joiner.

State transitions:

Online node 
  -> view change -> still on view? 
    -> yes -> selected as donor?
      -> yes -> Donor
      -> no -> Online 
    -> no  -> Offline
 
Offline node 
  -> view change 
    -> recovery -> Online

Recovery node (joiner)
  -> view change -> still on view?
    -> yes -> donor still on view?
      -> yes -> go on -> Online
      -> no  -> select another donor -> recovery -> Online
    -> no -> kill the recovery process -> Offline

Donor node
  -> joiner message of state transfer completion?
    -> Online

On donors, if the joiner exits the group, the dump thread just dies as
if a slave went away.
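The transition summary above can be condensed into a small function
(a hypothetical helper for illustration, not plugin code; the boolean
parameters encode the questions asked at each branch):

```cpp
enum class Node_state { OFFLINE, ONLINE, RECOVERING, DONOR };

// 'in_view' says whether the node is still a member after the view
// change; 'is_donor' whether it was deterministically selected as the
// donor. The donor -> online transition happens on the joiner's
// completion message, not on a view change, so it is not covered here.
Node_state on_view_change(Node_state current, bool in_view,
                          bool is_donor) {
  switch (current) {
    case Node_state::ONLINE:
      if (!in_view) return Node_state::OFFLINE;
      return is_donor ? Node_state::DONOR : Node_state::ONLINE;
    case Node_state::OFFLINE:
      // a (re)joining node always goes through recovery first
      return Node_state::RECOVERING;
    case Node_state::RECOVERING:
      if (!in_view) return Node_state::OFFLINE; // kill recovery
      // if the donor left, another donor is selected, but the node
      // itself stays in the recovering state until it catches up
      return Node_state::RECOVERING;
    case Node_state::DONOR:
      return Node_state::DONOR; // leaves on the joiner's message
  }
  return current;
}
```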

ENHANCEMENTS 
============

One possible enhancement to this module is the monitoring of the
recovery process. We have here two distinct phases: the joiner-donor
connection phase and the joiner catching-up phase.
For the first one, the joiner can retrieve the donor's retrieved GTID
set and from that estimate how many transactions behind it is. For the
second phase, one could monitor the size of the applier's queue or
extract information from the stable transactions data.
Other methods that can be added include status monitoring such as the
RAM consumed by the applier module's queue, possibly also defining a
maximum value for it.

Another change needed in the future concerns the applier module's
queue and its memory consumption during recovery.
The persistence of its data is something that should be discussed in
a separate worklog together with related questions.
For example, if the certification information is known at an early
time, we can certify and queue events in parallel in a relay log.
We can also queue them without certification, but that would imply a
decoupled relay log reader to be used in the Certifier.
LOW-LEVEL DESIGN
================

::File :>

  applier_interfaces.h 

::Context/Purpose :>

  Add a context flag to the Packet class identifying suspend and end
  packets.

:: Added content :>

  An optional flag in the packet class.

=====================================================================

::File :> 

  gcs_replication/gcs_applier.cc/h

::Context/Purpose :>

  Add suspend capabilities to the plugin applier module.

:: Added content :>

  Mutexes/conditions/flag to suspend the applier

:: Added Methods :>

  //Awakes a suspended applier
  void awake()

  //Tells if the applier is suspended
  bool is_suspended()

  //Adds a suspend packet to the applier's queue
  void add_suspend_packet() 

:: Changed algorithms :>

  In the applier loop, we now have:

  while(!error)
  {
    incoming->pop(&packet)
    if(packet.status == end_packet)
      //do nothing, this is only to unlock the queue
    if(packet.status == suspend)
      suspend()   
    // remaining logic
  }

=====================================================================

::File :> 

  gcs_replication/gcs_recovery.cc/h  [new]

::Context/Purpose :>

  Class that contains the main logic for the recovery algorithm  

::Contains :>

  //A thread object for the recovery thread
  THD recovery thread

  Mutexes/conditions for waiting for the donor thread. 

::Methods :>

  class Recovery_module { 

    //Constructor that receives the applier
    Recovery_module(applier)  

    //Launch the recovery thread for this view.
    //Receives the view id and the possible donor nodes
    int launch_recovery(view_id, cluster_nodes) 
    
    //Method with node selection logic 
    donor_info select_donor(nodes)
   
    //Method with logic for the slave connection with the donor
    int start_donor_connection(donor_info, view_id)

    //Method to terminate the connection to the donor
    int terminate_donor_connection()

    //Method to broadcast to all nodes the info to change the donor
    //state back to online.
    broadcast_state_transfer_end()

    //Method that resumes the applier SQL thread and awakes it
    int resume_applier()

    //Method that monitors the current applier delay
    //Blocking method that only returns when it caught up 
    int monitor_applier_delay()
   
    //Broadcast the message that signals the node is now online
    notify_cluster_recovery_end(node_id)

  }

:: Algorithms :>

  The recovery thread 

  handle_recovery(view_id, nodes)
  {

    //wait for the applier suspension and
    //wait for the consumption of the relay log 
    while( !applier_is_suspended()
            || !retrievedGTIDs.is_subset(executedGTIDs) )
    {
      sleep
    }
   
    //select a donor
    donor_info = select_donor(nodes)
   
    //set the condition/mutex to allow a later awakening.
    //see below the changes to gcs_replication.cc/h
    set_donor_connection_condition(donor_condition,donor_mutex)

    //start a connection with the donor
    start_donor_connection(donor_info, view_id)

    //wait for the donor to send all the data
    while(get_retrieved_cert_db() == NULL)
      donor_condition.wait()

    //kill the donor connection
    kill_donor_connection()

    //Mark the donor as finished
    broadcast_state_transfer_end()
   
    //resume the applier module 
    resume_applier()
    
    //block until the applier delay is minimal
    monitor_applier_delay()

    //broadcast a message to all nodes
    signal_recovery_end(node_id)
  } 

=====================================================================

::File :> 

  gcs_replication/include/gcs_protocol.cc

::Context/Purpose :>

  Add a new message type to GCS

:: Added content :>

  Add new type "MSG_RECOVERY" to Msg_type enum
  The contents of the message are:

  | node type | node id |

  Where:
    node type: if the node is a donor or a joiner.
               Donor:  State transfer ended.
               Joiner: Recovery ended 
    node id  : the id of the node.
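The two-field payload above could be encoded as follows (hypothetical
wire layout chosen for illustration: one byte for the node type, four
bytes little-endian for the node id):

```cpp
#include <cstdint>
#include <vector>

// Hypothetical layout for the MSG_RECOVERY payload: | node type | node id |
enum Recovery_node_type : uint8_t { RECOVERY_DONOR = 0, RECOVERY_JOINER = 1 };

std::vector<uint8_t> encode_recovery_msg(Recovery_node_type type,
                                         uint32_t node_id) {
  std::vector<uint8_t> buf;
  buf.push_back(type); // one byte: donor or joiner
  for (int shift = 0; shift < 32; shift += 8)
    buf.push_back(static_cast<uint8_t>(node_id >> shift)); // little-endian id
  return buf;
}

void decode_recovery_msg(const std::vector<uint8_t> &buf,
                         Recovery_node_type &type, uint32_t &node_id) {
  type = static_cast<Recovery_node_type>(buf[0]);
  node_id = 0;
  for (int shift = 0; shift < 32; shift += 8)
    node_id |= static_cast<uint32_t>(buf[1 + shift / 8]) << shift;
}
```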

=====================================================================

::File :> 

  gcs_replication/gcs_event_handlers.cc

::Context/Purpose :>

  Add logic to the view handling method.
  The message handler also handles recovery end messages now. 
  
:: Added Methods :>

  //method that launches the recovery thread
  launch_recovery()

  //method that creates and adds a view_change_event to the applier
  //Also changes the donor state to "Donor"
  mark_view_change()

:: Changed algorithms :>

  handle_view_change(view, total, left, joined, quorate)
  {
    for(node n : joined)
    {
      if n.id == local.id
        joiner= true;
        break;
    }
    if(joiner && total.size > 1)
      launch_recovery(view.get_view_id(), total)
    else if(joiner.size > 0)
      mark_view_change(view.get_view_id())
  }
  
  handle_message_delivery(msg, view)
  {
    if (msg.type == MSG_REGULAR)
      //current logic
    else if (msg.type == MSG_RECOVERY)
      node_type = (Recovery_message)msg.get_node_type
      if(node_type == joiner)
        //set the state of the joiner node as being online
      if(node_type == donor)
        //set the state of the donor back to online
  }


=====================================================================

::File :> 

  log_event.cc/h

::Context/Purpose :>

  Add a new log event

:: Added content :>

  class View_change_event
  {
    Certification_database cert_db;
    View_id id;
  }

:: Added Methods :>

  class View_change_event 
  {
    //Event constructor based on the view id
    View_change_event(view_id)
   
    //Sets/Gets the event certification database
    set/get_certification_db()
    
    //gets the event view id
    get_view_id()

   //The log event methods: size, validation, write function, etc. 
   write_data_header(IO_CACHE* file)
   write_data_body(IO_CACHE* file)
   ... 
  }

:: Algorithms :>
  
  write_data_header(IO_CACHE* file)
  {
    //The header contains the view id and the cert db size
    store(view_id);
    store(cert_db.size)
  }
  
  write_data_body(IO_CACHE* file)
  {
    for(cert_db_line in cert_db)
    {
      len= strlen(cert_db_line->first); //The key
      store(len)
      store(cert_db_line->first)  //store the key
      store(cert_db_line->second) //version(gno has a fixed size)
    }
  }

  read_data_body(map_size)
  {
    //map_size came from the header
    for(int i = 0; i < map_size; i++)
    {
      len= read()
      key = read(len)
      value = read(gno_len)
      map[key] = value;
    }
  }
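The length-prefixed layout above can be exercised with an in-memory
buffer standing in for the IO_CACHE (a simplification: real events go
through the server's event framework, and the certification database
type here is a hypothetical key -> gno map):

```cpp
#include <cstdint>
#include <cstring>
#include <map>
#include <string>
#include <vector>

// Certification database reduced to key -> version (gno). Per line we
// store the key length, the key bytes, then the fixed-size gno, mirroring
// the write_data_body/read_data_body pseudocode above.
using Cert_db = std::map<std::string, int64_t>;

static void store(std::vector<uint8_t> &buf, const void *p, size_t len) {
  const uint8_t *b = static_cast<const uint8_t *>(p);
  buf.insert(buf.end(), b, b + len);
}

std::vector<uint8_t> write_body(const Cert_db &db) {
  std::vector<uint8_t> buf;
  for (const auto &line : db) {
    uint32_t len = static_cast<uint32_t>(line.first.size());
    store(buf, &len, sizeof(len));                 // key length
    store(buf, line.first.data(), len);            // key bytes
    store(buf, &line.second, sizeof(line.second)); // fixed-size gno
  }
  return buf;
}

Cert_db read_body(const std::vector<uint8_t> &buf, size_t map_size) {
  Cert_db db;
  size_t pos = 0;
  for (size_t i = 0; i < map_size; i++) {
    uint32_t len;
    std::memcpy(&len, &buf[pos], sizeof(len));
    pos += sizeof(len);
    std::string key(reinterpret_cast<const char *>(&buf[pos]), len);
    pos += len;
    int64_t gno;
    std::memcpy(&gno, &buf[pos], sizeof(gno));
    pos += sizeof(gno);
    db[key] = gno;
  }
  return db;
}
```

A round trip through the buffer recovers the original map, which is
exactly what the joiner-side read must guarantee.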

=====================================================================

::File :> 

  rpl_rli.cc/h

::Context/Purpose :>

  Add a new until mode to the SQL thread

:: Added content :>

  new value "UNTIL_VIEW_ID" in the until_condition enum

:: Changed algorithms :>

  is_until_satisfied(event)
  {
    case UNTIL_VIEW_ID:
    {
      if(event.get_type == VIEW_CHANGE_EVENT)
      {
        if( (View_change_event)event.view_id == view_id_arg)
          set_retrieved_cert_db(View_change_event)
          return true;
      }
    }
  }
  

=====================================================================

::File :> 

  gcs_replication.cc

::Context/Purpose :>

  Add methods for the SQL thread to execute.
  These serve the purpose of storing the certification DB and awake
  the recovery thread. 

:: Added content :>

  //The lock/condition to awake the recovery thread
  donor_queue_condition/lock
  //The certification that the SQL thread got from the View_change_event
  certification_db 

:: Added Methods :>

  //Set the condition/mutex to later awake the recovery thread
  set_donor_connection_condition(condition, mutex)

  //Sets the certification DB received by the donor connection and
  //awakes the recovery thread
  set_retrieved_cert_db(View_change_event)
  
  //Gets the certification DB 
  get_retrieved_cert_db()

:: Added Algorithms :>

  set_retrieved_cert_db(view_change_event event)
  {
    certification_db = event.get_certification_db()
    cond_broadcast(donor_condition);
  }


=====================================================================

::File :> 

  gcs_replication/handlers/certification_handler.cc

::Context/Purpose :>

  Add logic to the certifier to get the certification DB and put it
  on the View change event on the donor side.

:: Changed algorithms :>

  switch (event_type)
  {
    case VIEW_CHANGE_EVENT:
      Certification_database certDB = get_certification_db()
      event.set_certification_db(certDB);
    ...
  }

Added configuration options
===========================

Sys var:
gcs_replication_plugin_recovery_user

The user to use during recovery.

Sys var:
gcs_replication_plugin_recovery_password

The password to use during recovery.
This should be settable but not readable.

Sys var:
gcs_replication_plugin_recovery_retry_count

The number of connection attempts to a donor.

Also uses:

Sys var:
gcs_replication_plugin_components_stop_timeout

The time to wait until the recovery threads terminate.