WL#5599: MTS: Recovery Process for MTS mode

Affects: Server-5.6   —   Status: Complete

CONTEXT
-------
This WL aims at handling failures that may happen while applying changes on a
slave through a worker thread, as specified in WL#5569 (Multi-threaded Slave).

In particular, if a crash happens, the slave should be able to restart the
execution from a position in the relay log that would not generate any
inconsistency.

REQUIREMENTS
------------
REQ-1 - The solution must be aligned with WL#2775.

REQ-2 - It requires what has been developed in WL#5569.

FAILURES
--------
      Relay Log File
        -------
        | W01 |                   ---> Worker 01 (W01)
        -------
        | W01 |   ---> SQL THREAD ---> Worker 02 (W02)
        -------
        | W02 |                   ---> Worker 03 (W03)
        -------
        | ... |

The SQL THREAD acts as a coordinator and is responsible for reading events from
the relay log and sending them to the appropriate workers. The set of events
in the relay log file is split among the workers according to a hash function
(i.e., a mapping function) as described in WL#5569.
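As a purely illustrative sketch (not the server's actual code), such a mapping
function could hash a partitioning key, e.g. the database name as in WL#5569,
onto a worker index:

```python
# Hypothetical sketch of the coordinator's mapping function, assuming the
# per-database partitioning scheme of WL#5569: all events that touch the
# same database are routed to the same worker, preserving per-database
# ordering.
import zlib

def map_to_worker(db_name: str, n_workers: int) -> int:
    # Stable hash of the partitioning key modulo the number of workers.
    return zlib.crc32(db_name.encode("utf-8")) % n_workers
```

Since the hash is stable, two events for the same database always land on the
same worker.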

Events are organized in groups, e.g. transactions, that must be atomically
processed. In what follows, however, we use the term events and group of events
interchangeably and distinguish them when necessary.

If a failure happens while assigning an event to a worker, due to a corrupted
checksum for example, all workers are immediately stopped and then the
coordinator stops. Similar behavior is adopted when a worker fails while
executing an assigned event. The worker that has failed stops and reports the
failure to the coordinator, which requests all other workers to immediately stop.

Worker failures are classified as transient (e.g. deadlock) or unrecoverable
(e.g. duplicate key).

If a transient failure happens, the worker should try to re-execute the event
that has failed. The number of possible re-executions is given by the option
slave_transaction_retries, after which the failure is handled as unrecoverable.
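A minimal sketch of this retry policy, assuming a hypothetical apply_fn
callback and a TransientError exception standing in for, e.g., a detected
deadlock:

```python
# Illustrative sketch only: apply_fn and TransientError are assumed names,
# not the server's actual API. A transient failure is retried up to
# slave_transaction_retries times; past that it is escalated to an
# unrecoverable failure.
class TransientError(Exception):
    pass

def execute_with_retries(apply_fn, slave_transaction_retries: int):
    attempts = 0
    while True:
        try:
            return apply_fn()
        except TransientError:
            attempts += 1
            if attempts > slave_transaction_retries:
                # Retry budget exhausted: handle as unrecoverable.
                raise RuntimeError("unrecoverable: retries exhausted")
```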

If an unrecoverable failure happens, the worker that has failed stops and
notifies the coordinator, which requests all other workers to immediately stop.
Notice that a transaction that updated a MyISAM table but was aborted due to a
deadlock and is re-executed may lead to an unrecoverable failure, e.g. a
duplicate key.

Coordinator failures are unusual and are related to corrupted events. Such a
failure makes the coordinator request that all workers stop immediately, and
then the coordinator stops.

Clearly, a crash may happen at any time and due to several reasons. In the
context of this WL, however, they are considered similarly to a coordinator
failure or a worker unrecoverable failure in the sense that workers and the
coordinator are immediately stopped.


RECOVERING FROM FAILURES
------------------------
So if there is a failure, the workers and the coordinator are immediately
stopped, and a recovery routine may face the following situation:

      Relay Log File

        -------
        | W01 |
        -------  <--- W01's latest processed event
        | W01 |
        -------
        | W02 |
        -------  <--- W02's latest processed event
        | ... |

Worker W01 has processed the first event but never had the chance to process the
second event. So when the slave is restarted, a recovery routine must identify
which events were processed and which were not. Events that were processed
must be skipped and the others (i.e. gaps) processed.

To fulfil this goal, we rely on WL#2775 to ensure that when transactional
engines are used, slaves will atomically apply changes and update the
appropriate meta-information, thus making it possible to implement the recovery
routine outlined in the Low Level Design section.


PROPOSED SOLUTION
-----------------
P-1 The option slave_transaction_retries will be ignored and transient failures
(e.g. deadlock) will be handled as unrecoverable failures.

P-2 - There will be one repository per worker, created in order to store
meta-information on which events, i.e. positions, were processed.

P-3 - Periodically, the workers' meta-information is read to compute the event,
i.e. position, in the relay log before which all the assigned events were
processed.

P-4 - STOP SLAVE NOW or STOP SLAVE SQL_THREAD NOW will immediately stop workers
and may leave the system with gaps.

P-5 - STOP SLAVE or STOP SLAVE SQL_THREAD must try to gracefully stop workers.
This means that the coordinator will not enqueue new groups, e.g. a transaction,
and will wait for workers to execute the necessary events to fill up existing gaps.

Due to time constraints, P-1 will be implemented, but this limitation will be
circumvented in the future.

RECOVERY ROUTINE
----------------
The recovery routine has five steps that are outlined as follows:

Assign Step:
------------

     Relay Log File

        -------
        | W01 |                   ---> Worker 01 (W01)
        -------
        | W01 |   ---> SQL THREAD ---> Worker 02 (W02)
        -------
        | W02 |                   ---> Worker 03 (W03)
        -------
        | ... |

The coordinator reads events from the relay log, assigns them to the
appropriate workers, and tracks the assigned events through the "(G)roup
(A)ssigned (Q)ueue". For each assigned group, i.e. a set of events that must be
atomically processed, the coordinator adds a reference to it in the GAQ.

                                 Relay Log File
            ---------------------------------------------------------
            |  E1  |  E2  |  E3  |  E4  |  E5  |  E6  | .... |  En  |
            ---------------------------------------------------------
                                       |
                                       v

                                   SQL THREAD

                                       |
                                       v

                                      GAQ
                  -------------------------------------------
                  |  R1  |  R2  |  R3  |  R4  |  R5  |  R6  |
                  -------------------------------------------
                                       |
                                       v

                                   SQL THREAD

                                       |
                                       v

                        W01           W02           W03
                    -----------   -----------   -----------
                    | E1 | E2 |   | E3 | E4 |   | E5 | E6 |
                    -----------   -----------   -----------

An entry added as a reference (e.g. R1, R2, etc) to an assigned group is named
Slave Job Group and has the following structure:

  . worker_id - The id of the worker to which the corresponding group/event
    was assigned.

  . done - Whether the worker has processed the group/event.

  . shifted - How many bits in the bitmap stored in the worker's repository 
    must be shifted. See the Resync Step for further details.

  . checkpoint_seqno - The GAQ's position that has been assigned to this 
    group.

  . checkpoint_group_relay_log_name - The positions in the relay log before 
    which all the assigned events were processed at the time the reference
    was added to the GAQ. This information comes from the relay log info.
  . checkpoint_group_relay_log_pos.

  . checkpoint_group_master_log_name - The positions in the binary log before 
    which all the assigned events were processed at the time the reference was
    added to the GAQ. This information comes from the relay log info.
  . checkpoint_group_master_log_pos.

  . group_relay_log_name - The positions in the relay log that correspond to
    the end of the assigned group and beginning of a new one. 
  . group_relay_log_pos.

  . group_master_log_name - The positions in the binary log that correspond to
    the end of the assigned group and beginning of a new one.
  . group_master_log_pos.

So,

R1 = {
       worker_id = 1;
       done = 0;
       checkpoint_seqno= 0;
       ....
     };

...

R6 = {
       worker_id = 3;
       done = 0;
       checkpoint_seqno = 5;
       ...
     };

Notice that the size of the GAQ must be defined as the size of the worker's
queue times the number of workers.
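For illustration only, the Slave Job Group entry described above could be
modeled as follows (field names follow the text; the types and defaults are
assumptions, not the server's actual definitions):

```python
# Illustrative model of a Slave Job Group entry (a GAQ reference such as
# R1..R6 above). Types and defaults are assumed for the sketch.
from dataclasses import dataclass

@dataclass
class SlaveJobGroup:
    worker_id: int
    done: int = 0
    shifted: int = 0
    checkpoint_seqno: int = 0
    checkpoint_group_relay_log_name: str = ""
    checkpoint_group_relay_log_pos: int = 0
    checkpoint_group_master_log_name: str = ""
    checkpoint_group_master_log_pos: int = 0
    group_relay_log_name: str = ""
    group_relay_log_pos: int = 0
    group_master_log_name: str = ""
    group_master_log_pos: int = 0
```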

Commit Step:
------------
Upon committing a group, a worker writes to its own repository the information
at Slave Job Group. In particular, the checkpoint_seqno is used to set a bit
in a bitmap that is kept by the worker and identifies the groups the worker has
processed so far. 

So the following information is stored in the worker's repository:

  . processed_groups - Bitmap calculated based on the checkpoint_seqno from
    the Slave Job Group.
  . checkpoint_group_relay_log_name.
  . checkpoint_group_relay_log_pos.
  . checkpoint_group_master_log_name.
  . checkpoint_group_master_log_pos.
  . group_relay_log_name.
  . group_relay_log_pos.
  . group_master_log_name.
  . group_master_log_pos.

Considering the aforementioned scenario and assuming that W01 has processed R1
and W02 has processed R3 we have what follows:

R1 = {
       worker_id = 1;
       done = 1;
       checkpoint_seqno= 0;
       ...
     };

The bitmap in this case is 1 0 0 0 0 0.

R3 = {
       worker_id = 2;
       done = 1;
       checkpoint_seqno= 2;
       ...
     };

The bitmap in this case is 0 0 1 0 0 0.
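The bitmaps above can be reproduced with a minimal sketch, where bit 0
corresponds to the leftmost position in the text's notation:

```python
# Sketch of the commit-time bitmap update: committing a group sets the bit
# at its checkpoint_seqno, one bit per GAQ slot.
def bitmap_after_commit(gaq_size: int, committed_seqnos) -> list:
    bits = [0] * gaq_size
    for seqno in committed_seqnos:
        bits[seqno] = 1
    return bits
```

With a GAQ of size 6, committing R1 (checkpoint_seqno 0) yields
1 0 0 0 0 0 and committing R3 (checkpoint_seqno 2) yields 0 0 1 0 0 0,
matching the example.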

Notice that the size of the bitmap must be equal to the GAQ's size. To avoid
repeatedly copying the same information into instances of the Slave Job Group,
its content is filled in only when a new checkpoint is defined or the SQL Thread
starts reading from a new relay log, i.e. a rotate event is processed.
Otherwise, only the worker_id, done and checkpoint_seqno fields are filled in.

Checkpoint Step:
----------------
Periodically, or whenever the GAQ gets full, a checkpoint routine is executed.
It aims to free slots in the GAQ and to make sure that the coordinator can show
progress through SHOW SLAVE STATUS.

To do that, the checkpoint routine scans the GAQ until it finds the highest
contiguously processed entry, copies its information to the relay log info, and
shifts the GAQ to the lowest unassigned/unprocessed entry.

Let's consider that R1 and R3 were processed when the checkpoint routine is
executed. In this case, we have what follows:

                  -------------------------------------------
                  |  R1  |  R2  |  R3  |  R4  |  R5  |  R6  |
                  -------------------------------------------
                                       |
                                       v
                  -------------------------------------------
                  |  R2  |  R3  |  R4  |  R5  |  R6  |  --  |
                  -------------------------------------------


So let's assume that a new event is assigned to W01:

                  -------------------------------------------
                  |  R2  |  R3  |  R4  |  R5  |  R6  |  R7  |
                  -------------------------------------------
R7 = {
       worker_id = 1;
       done = 0;
       checkpoint_seqno= 5;
       ...
     };

The number of shifted bits, along with the previous checkpoint positions, is
stored in an array whose entries can be trivially cleaned up when no longer
necessary. See the Resync Step for further information.
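The checkpoint shift can be sketched as follows, under the simplifying
assumption that the GAQ is modeled as a plain list of entries with a done
flag (names are illustrative):

```python
# Sketch of the checkpoint routine: free the leading processed slots of
# the GAQ and record how many bits were shifted, so that workers can
# later resync their bitmaps.
def checkpoint(gaq):
    shift = 0
    # Count leading entries that were already processed (done == 1).
    while shift < len(gaq) and gaq[shift]["done"] == 1:
        shift += 1
    # Shift the GAQ so the lowest unprocessed entry becomes the head.
    return gaq[shift:], shift
```

In the example above, with R1 and R3 done, only R1 is freed (R2 is the lowest
unprocessed entry), so the GAQ is shifted by one slot.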
 
Notice that the Checkpoint and Assign Steps cannot be concurrently executed.

Resync Step:
------------
Upon assigning a group to a worker, it is necessary to check what is the last
checkpoint the worker is aware of in order to calculate how many bits will be
shifted in the worker's processed group bitmap upon commit. 

To do so, the checkpoint position of the last assigned group is compared to the
entries in the array of checkpoints kept by the coordinator, and the numbers of
shifted bits are summed for those entries in the array that are greater than
the checkpoint position of the last assigned group.

Every time an entry in the array is seen by a worker, a counter is incremented,
and when the entry has been seen by all the workers it is removed.
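Under the assumption that the coordinator's checkpoint array holds
(checkpoint_position, bits_shifted) pairs, the shift computation might look
like this (names are illustrative):

```python
# Sketch of the resync computation: sum the shift counts of every
# checkpoint taken after the last checkpoint the worker is aware of.
def bits_to_shift(checkpoints, worker_checkpoint: int) -> int:
    # checkpoints: assumed list of (position, bits_shifted) pairs.
    return sum(shifted for pos, shifted in checkpoints
               if pos > worker_checkpoint)
```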

Recovery Step:
--------------
When the slave is restarted after a failure, a recovery process is executed to
guarantee that events that were processed are skipped and events that were not
processed are processed. To accomplish this the recovery step uses the
information stored at the relay log info and workers' repositories.

Let's assume that W01 has committed E1, W02 has committed E3, a checkpoint
happened, E7 was assigned to W01, W01 committed E2 and E7, and before a new
checkpoint had the chance to be executed, the slave crashed. So, at the time
of the failure, the system had the following persisted information:
                         
         W03     Relay Log Info     W02                        W01
          |            |             |                          |
          v            v             v                          v
            ---------------------------------------------------------
            |  E1  |  E2  |  E3  |  E4  |  E5  |  E6  |  E7  |  E8  |
            ---------------------------------------------------------
                         (Next events to be processed)

According to the relay log info, E2 is the next event to be processed.
Furthermore, W01 points to E8 and W02 points to E4. W03, however, points to an
invalid event because it never got the chance to commit anything. The goal is
to figure out which events between E2 and E8 were processed and which were not
and must therefore be processed. To identify such events we use the bitmaps
stored in the workers' repositories:

W01 has the following bitmap:  1 0 0 0 0 1

W02 has the following bitmap:  0 0 1 0 0 0

However, these bitmaps cannot be immediately combined to identify which events
were processed because the workers have different knowledge on the latest
checkpoint:

         W03   W02    W01
          |     |      |
          v     v      v
            ---------------------------------------------------------
            |  E1  |  E2  |  E3  |  E4  |  E5  |  E6  |  E7  |  E8  |
            ---------------------------------------------------------
                     (Knowledge on the latest Checkpoint)    

So W02's bitmap must be shifted by n bits, where n corresponds to the distance
between W02's recorded checkpoint and the actual checkpoint. In this case,
W02's bitmap must be shifted by one. On the other hand, W01's bitmap does not
need to be shifted.

After applying the necessary transformations to the workers' bitmaps, they are
combined, and the coordinator goes through the relay log, skipping those
events/groups whose corresponding bit in the bitmap is true and executing the
others whose bit is false. After processing the group corresponding to the
highest bit set to true, the recovery step is complete and one can switch to
regular execution.
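As a rough sketch of this combination step (illustrative names, with bit 0
denoting the oldest group after the most recent checkpoint):

```python
# Sketch of the recovery-time bitmap combination: shift each worker's
# bitmap by its distance to the most recent checkpoint, then OR the
# results. A set bit means "skip this group"; a clear bit means
# "execute it".
def combine_bitmaps(worker_bitmaps):
    # worker_bitmaps: list of (bitmap, n_shift) pairs.
    size = max(len(bits) for bits, _ in worker_bitmaps)
    combined = [0] * size
    for bits, n_shift in worker_bitmaps:
        for i, bit in enumerate(bits):
            j = i - n_shift            # shift toward the checkpoint
            if bit and 0 <= j < size:
                combined[j] = 1
    return combined
```

With W01's bitmap 1 0 0 0 0 1 (no shift) and W02's bitmap 0 0 1 0 0 0
(shifted by one), the combined bitmap is 1 1 0 0 0 1: E2, E3 and E7 are
skipped, while E4, E5 and E6 are executed.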


STOP SLAVE ROUTINE
------------------
STOP SLAVE NOW or STOP SLAVE SQL_THREAD NOW immediately stops workers and may
leave the system with gaps. One can achieve a similar result by issuing KILL
against the coordinator. In this case, however, workers will be immediately
stopped and unrecoverable errors may be found in the recovery step.

STOP SLAVE or STOP SLAVE SQL_THREAD must try to gracefully stop workers. This
means that the coordinator will not enqueue new groups, e.g. a transaction, and
will wait for workers to execute the necessary events to fill up existing gaps.

To accomplish this the stop slave routine is divided in three steps that are
described in what follows.

Notification Step
-----------------
The coordinator suspends the worker's activities, probes them to figure out what
is the highest group processed and notifies workers to resume activities.

If NOW is specified, the coordinator notifies workers to immediately stop after
guaranteeing that all events within their current groups are processed.
Otherwise, the coordinator notifies workers to stop after processing their
assigned groups lower than the highest group previously retrieved.

Assign Step
-----------
The coordinator tries to guarantee that the worker with the highest group has
all the necessary events to process it. If the coordinator is still assigning
events that belong to this group, it will keep doing so until all the events in
the group are assigned or a timeout is reached because the missing events
needed to complete the group were not written to the relay log.

If the necessary events to complete the group are not in the relay log, the
coordinator, i.e. the SQL Thread, cannot wait indefinitely until they are
available. The IO Thread may have stopped due to several reasons and such events
will never be written to the relay log. In this case, the worker with the
highest group is immediately stopped.

Notice that if such a worker has not executed any non-transactional statement,
there is no need to try to feed it with the missing events to reach a
consistent state, and it can be immediately stopped.

Wait for Workers Step
---------------------
The coordinator waits for all the workers to complete their jobs; if the
timeout is reached, an error message is printed out.


REMARKS
-------
. There is redundant information stored in in-memory objects and in the
repositories in order to ease debugging and to track down where workers have
stopped.

. Note that the algorithm does not require any sort of synchronization between
the coordinator and workers and the duplicated information does not harm
performance.


DUE TO TIME CONSTRAINTS
-----------------------
. The recovery step is sequential due to time constraints and in the future we
may change this to improve performance.

. The notification step will not be implemented and workers will process all the
assigned groups.

. STOP SLAVE NOW or STOP SLAVE SQL_THREAD NOW will not be implemented.