WL#6813: MTS: ordered commits (sequential consistency)

Affects: Server-5.7   —   Status: Complete

EXECUTIVE SUMMARY :
===================
This worklog ensures that commits by slave applier threads running in
parallel are done in the same order as on the master. This also means that
the slave will never externalize a database state that was never externalized
by the master. This is a requirement when applications reading from the
slave MUST observe the same set of states that existed on the master due to some
application-enforced constraint. It has become a necessity after WL#6314, which
enables multiple transactions to be executed in parallel by the slave threads,
some of which may be modifying the same database.

Motivation
-----------

  1. The replication stream is easier to understand with MTS.
  2. Easier to troubleshoot and debug. This is a requirement for applications
     that need to go through exactly the same execution history as the master.
  3. Speculation: consistency from the application point of view will not
     break; the data is always consistent even without this feature.


FULL DESCRIPTION
=================
A slave server can be configured to execute transactions in parallel to improve 
performance. Transactions that execute in parallel can be committed out of 
order. Thus, when the replication thread is executing transactions, the slave 
database can be in a state that never existed on the master.

This does not pose problems when the slave is only used for
redundancy/fail-over, since in such cases applications work on the final state
of the slave and are indifferent to the execution history the slave followed to
reach that point. However, if the slave is used for read scale-out,
clients/applications may depend upon some intermediate state of the slave.

With WL#6314 implemented, transactions that have the same commit parent
(commit parent => last committed transaction) can execute in parallel. The
consistency is now as follows:

- If clients C1 and C2 execute transactions T1 and T2 on master, and the server
  sends ACK for C1 before C2 sends COMMIT for T2, then T1 and T2 will be
  committed in order on the slave.
- If, on the other hand, C1 and C2 both send COMMIT before either of them
  receives an ACK, then T1 and T2 may be committed in any order on the slave.
  Moreover, T1 and T2 can then be externalized in different orders on master
  and slave. So if client C3 reads the database state on master, and client C4
  reads the database state on the slave, then C3 and C4 may see T1 and T2
  committed in different orders.
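
The commit-parent rule above can be illustrated with a small sketch. It is
only a model, under the assumption that consecutive transactions in the relay
log that share a commit parent form one batch whose members may run in
parallel. The function name parallel_batches and the representation of a
transaction by its position are hypothetical and do not come from the server
code.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model: transaction i is identified by its position in the
// relay log; commit_parents[i] is the commit parent recorded on the master.
// Returns the positions grouped into batches that may execute in parallel.
std::vector<std::vector<std::size_t> > parallel_batches(
    const std::vector<long> &commit_parents)
{
  std::vector<std::vector<std::size_t> > batches;
  for (std::size_t i= 0; i < commit_parents.size(); ++i)
  {
    // A change of commit parent starts a new batch. Inside one batch the
    // slave is free to execute (and, without this worklog, commit) in any
    // order.
    if (i == 0 || commit_parents[i] != commit_parents[i - 1])
      batches.emplace_back();
    assert(!batches.empty());
    batches.back().push_back(i);
  }
  return batches;
}
```

For the commit parents 0, 0, 0, 3, 3, 5 this yields three batches: positions
0-2, positions 3-4, and position 5. Only transactions inside one batch can be
externalized in a different order on master and slave.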

This worklog adds an option to serialize commits for parallel transactions:
transactions can begin to execute in parallel, but the executing thread will
wait before the commit until all previous transactions are committed. If an
application-level constraint requires exactly the same states as on the
master, the slave can be configured accordingly.
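
The idea of executing in parallel but waiting before the commit can be
sketched with standard C++ threads. This is a minimal illustration and not the
server implementation: the class CommitTurnstile, the function
run_ordered_workers and the use of a plain sequence number for the master
commit order are all assumptions made for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the commit-order bookkeeping; not server code.
class CommitTurnstile
{
public:
  // Blocks until every transaction with a smaller sequence number has
  // committed, then records this commit and wakes the other waiters.
  void commit_in_order(int seq)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [&] { return next_to_commit_ == seq; });
    commit_log_.push_back(seq);  // stands in for the real commit
    ++next_to_commit_;
    cond_.notify_all();
  }

  std::vector<int> commit_log()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return commit_log_;
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  int next_to_commit_= 0;
  std::vector<int> commit_log_;
};

// Starts one worker per transaction in reverse order, so that later
// transactions tend to reach the commit point first, and returns the order
// in which the commits were actually applied.
std::vector<int> run_ordered_workers(int n)
{
  assert(n >= 0);
  CommitTurnstile turnstile;
  std::vector<std::thread> workers;
  for (int seq= n - 1; seq >= 0; --seq)
    workers.emplace_back([&turnstile, seq] {
      // The transaction body would execute here, in parallel with others.
      turnstile.commit_in_order(seq);
    });
  for (std::thread &worker : workers)
    worker.join();
  return turnstile.commit_log();
}
```

Whatever order the workers are scheduled in, the returned log lists the
sequence numbers in increasing order, which is the property this worklog
asks for.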

This will solve the problem for transactional storage engines. For non-
transactional storage engines it makes no difference, but that's probably 
impossible to fix.


               
+=========================+
| FUNCTIONAL REQUIREMENTS |
+=========================+

1. BASE REQUIREMENT
=====================
All commits by slave workers should happen in the same order as on the master.
This implies the following:
 a. The binlog flush should run in the same order as on the master.
 b. The semisync notification should happen in order.

2. PRESERVE BINLOG GROUP COMMIT
================================
A careless implementation of this feature could destroy binary log group commit 
on the slave: if the slave thread sends the 'commit' statement to the server 
only after the previous transaction finishes processing the 'commit' statement, 
then all transactions will end up in different transaction groups and there will 
be one disk sync per transaction on the slave.

Instead, we have to perform the wait inside the binary log group commit code. 
Suppose transaction T1 was committed before T2 on the master. Then, before T2 
adds itself to the flush queue on the slave, T2 should wait until T1 is in the 
flush queue. That is, in MYSQL_BIN_LOG::ordered_commit, before entering stage 
#1, T2 should wait until T1 has entered stage #1.
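
The difference between waiting for the previous commit to finish and waiting
only for the previous transaction to enter the flush queue can be seen in a
small single-threaded model. Everything here is an assumption made for
illustration: the class FlushStageModel, its methods and the demo function are
hypothetical and only mimic the queueing rule described above.

```cpp
#include <cassert>
#include <initializer_list>
#include <set>
#include <vector>

// Hypothetical single-threaded model of the flush stage; not server code.
class FlushStageModel
{
public:
  // A worker reports that transaction seq (master commit order) is ready to
  // commit. It enters the flush queue only once all earlier transactions
  // are queued; until then it stays parked.
  void ready_to_commit(int seq)
  {
    assert(seq >= next_to_enqueue_);
    parked_.insert(seq);
    while (!parked_.empty() && *parked_.begin() == next_to_enqueue_)
    {
      flush_queue_.push_back(*parked_.begin());
      parked_.erase(parked_.begin());
      ++next_to_enqueue_;
    }
  }

  // The leader flushes the whole queue and pays for a single disk sync.
  void flush_and_sync()
  {
    if (flush_queue_.empty())
      return;
    binlog_.insert(binlog_.end(), flush_queue_.begin(), flush_queue_.end());
    flush_queue_.clear();
    ++sync_count_;
  }

  const std::vector<int> &binlog() const { return binlog_; }
  int sync_count() const { return sync_count_; }

private:
  int next_to_enqueue_= 0;
  int sync_count_= 0;
  std::set<int> parked_;
  std::vector<int> flush_queue_;
  std::vector<int> binlog_;
};

// Five transactions become ready out of order; the leader flushes twice.
FlushStageModel demo_flush_stage()
{
  FlushStageModel stage;
  for (int seq : {2, 0, 1})
    stage.ready_to_commit(seq);
  stage.flush_and_sync();
  for (int seq : {4, 3})
    stage.ready_to_commit(seq);
  stage.flush_and_sync();
  return stage;
}
```

In the demo the binlog ends up holding transactions 0 through 4 in master
order, yet only two syncs are needed for five transactions, so the groups
survive the ordering constraint.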

3. HANDLE COMMIT ERRORS
========================

If T1 was committed before T2 on master, and T1 fails, then we must make sure 
that T2 is rolled back.
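
A minimal sketch of this rule, assuming the transactions are already queued in
master order and that each one either commits or fails on its own. The enum
Outcome and the function apply_in_commit_order are hypothetical names used
only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class Outcome { COMMITTED, FAILED, ROLLED_BACK };

// commit_ok[i] says whether transaction i (master order) would commit
// successfully on its own. Hypothetical model, not server code.
std::vector<Outcome> apply_in_commit_order(const std::vector<bool> &commit_ok)
{
  std::vector<Outcome> outcomes;
  bool previous_error= false;  // error state handed down the queue
  for (std::size_t i= 0; i < commit_ok.size(); ++i)
  {
    if (previous_error)
      outcomes.push_back(Outcome::ROLLED_BACK);  // an older one failed
    else if (commit_ok[i])
      outcomes.push_back(Outcome::COMMITTED);
    else
    {
      outcomes.push_back(Outcome::FAILED);
      previous_error= true;
    }
  }
  assert(outcomes.size() == commit_ok.size());
  return outcomes;
}
```

With the input true, true, false, true, true the first two transactions
commit, the third fails, and the last two are rolled back even though they
would have succeeded on their own.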

4. HANDLE TRANSACTION WITHOUT EXPLICIT COMMIT
==============================================
Statements that 'commit' before the end of the transaction, such as DDLs, must
also be ordered correctly. We MUST handle sequentialization of DDLs.

5. HANDLE THE CASE OF BINLOG DISABLED ON THE SLAVE
===================================================
It is very complex to implement this feature without breaking InnoDB group
commit when the binlog is disabled on the slave. So it is not supported when
either log_bin or log_slave_updates is OFF.

6. HANDLE ONLINE ALTERATIONS TO SCHEMA AND ISOLATION LEVELS
============================================================
Refer to section 5.5 of the Low level design of WL#6314. 

7. FILTERING TRANSACTION SHOULD HAVE NO EFFECT IN COMMIT ORDERING
===================================================================

8. HANDLE TRANSACTIONAL ENGINES ONLY
=====================================
Enforcing commit order during the apply phase MUST work for transactional 
tables. This requirement does not apply to non-transactional tables.

9. HANDLE ONLY INTRA-SCHEMA MULTI-THREADED SLAVES
===================================================
This worklog will ONLY handle the intra-schema multi-threaded slave and not 
database partitioned MTS.

 
+===========================+
|NON-FUNCTIONAL REQUIREMENT |
+===========================+

1. PERFORMANCE IMPACT
======================


2. MTS RECOVERY WILL NOT HAVE TO FILL GAPS
============================================

Since all worker threads commit in order, a worker cannot commit a
transaction if an 'older' worker has failed. This means that the MTS recovery
will not have to fill gaps upon server start-up.
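
The notion of a gap can be made concrete with a short check. This is a sketch
under the assumption that the applied state is summarised as one
committed/not-committed flag per transaction in master order. The functions
has_gap and committed_after_crash are hypothetical helpers, not part of MTS
recovery.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Returns true if some transaction is committed while an earlier one is not.
bool has_gap(const std::vector<bool> &committed)
{
  bool seen_uncommitted= false;
  for (bool is_committed : committed)
  {
    if (!is_committed)
      seen_uncommitted= true;
    else if (seen_uncommitted)
      return true;
  }
  return false;
}

// With ordered commits, a stop after `done` commits out of `total`
// transactions can only leave a committed prefix.
std::vector<bool> committed_after_crash(std::size_t total, std::size_t done)
{
  assert(done <= total);
  std::vector<bool> committed(total, false);
  for (std::size_t i= 0; i < done; ++i)
    committed[i]= true;
  return committed;
}
```

Any state produced by committed_after_crash passes the check, whereas an
out-of-order state such as committed, not committed, committed is reported as
a gap.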

We introduce a new variable --slave-preserve-commit-order, which is used to
enable or disable the feature.

SCOPE:
=======
- GLOBAL VARIABLE   
- DYNAMIC: YES (settable only when the slave has stopped)
- DEFAULT: OFF
- ASSOCIATED_VARIABLE: YES

a. When this switch is enabled, "START SLAVE SQL_AFTER_MTS_GAPS" becomes a
   no-op.
b. There is a new thread state, "Waiting for its turn to commit", which users
   see while a slave worker thread is waiting for older worker threads to
   commit.
c. A worker emits the following error message when it stops due to an error
   in one of the previous worker threads:
"Slave worker has stopped since one or more of the previous worker(s) have 
encountered an error and slave-preserve-commit-order is enabled. The last 
transaction executed by this thread has not been committed to preserve commit 
order. Slave restart after fixing the failing thread(s) should fix this worker as 
well."

Binlog Group Commit (BGC) is implemented in three stages.

LEGEND
=======
The following is a legend of some concepts, names used while explaining the 
design. 

1. FLUSH STAGE: In this stage all the data from the transaction logs of the
various threads is flushed into the binary log by the "leader" thread (the
thread that entered the queue first).

2. SYNC STAGE: In this stage the binlog files are synced to the disk.

3. COMMIT STAGE: In this stage all the threads may
  a. execute the commit in parallel
  b. execute ordered commit, where the leader calls the low-level engine
     commit for each thread sequentially.

t-event - refers to either of the following
--------
          a. DDL.
          b. COMMIT Query.


NEW SYSTEM VARIABLE
--------------------
static Sys_var_mybool Sys_slave_preserve_commit_order(
       "slave_preserve_commit_order",
       "Force the slave workers to order commits in the same sequence "
       "as master. Disabled by default.",
       GLOBAL_VAR(opt_slave_order_commit), CMD_LINE(OPT_ARG),
       DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG,
       ON_CHECK(check_slave_stopped), ON_UPDATE(NULL));


Commit order Class and methods
================================

class Commit_order_manager
{
public:
  enum Ordered_commit_thread_state {
    /// The data structures for ordered commit have not been initialized.
    UNINITIALIZED= 0,
    /// The thread is idle, it is not executing a transaction.
    IDLE,
    /// The thread is registered and has a place in the queue but is not first.
    NOT_FIRST,
    /// The thread is registered and is the first thread in the queue.
    FIRST,
  };


  Commit_order_manager()
    : has_error(false), newest_thd(NULL)
  {
    mysql_mutex_init(key_ordered_commit_mutex, &mutex, NULL);
  }


  /**
    Registers a thread when it begins to execute a transaction.

    @retval 0 Success.
    @retval 1 Some previous thread had an error; the thread was not registered.
  */
  int register_thread(THD *thd)
  {
    // This function should be invoked by the coordinator.
    DBUG_ASSERT(thd != current_thd);

    if (has_error)
      return 1;

    /*
      Initialize the thd->cond here instead of THD::THD, because then
      we only do it for threads that need it (i.e. replication worker
      threads).
    */
    if (thd->ordered_commit_state == UNINITIALIZED)
    {
      mysql_cond_init(key_ordered_commit_cond, &thd->ordered_commit_cond, NULL);
    }

    // Add thd to the queue.
    mysql_mutex_lock(&mutex);
    if (newest_thd == NULL)
    {
      thd->ordered_commit_state= FIRST;
    }
    else
    {
      newest_thd->ordered_commit_next= thd;
      thd->ordered_commit_state= NOT_FIRST;
    }
    newest_thd= thd;
    mysql_mutex_unlock(&mutex);
    return 0;
  }


  /**
    Waits until all threads that were registered with
    register_thread before this one have called signal_newer_threads.

    @retval 0 All previous threads succeeded so this thread can go
    ahead and commit.

    @retval 1 Some previous thread generated an error so this thread
    should rollback.
  */
  int wait_for_older_threads(THD *thd)
  {
    DBUG_ASSERT(thd == current_thd);
    /*
      Here it is ok to read ordered_commit_state without mutex: the
      only thread that could possibly change the state concurrently is
      the previous thread in the queue; if that happens, it means that
      this thread is free to execute now regardless of the value read.
    */
    if (thd->ordered_commit_state == NOT_FIRST)
    {
      mysql_mutex_lock(&mutex);
      while (thd->ordered_commit_state == NOT_FIRST)
        mysql_cond_wait(&thd->ordered_commit_cond, &mutex);
      mysql_mutex_unlock(&mutex);
      DBUG_ASSERT(thd->ordered_commit_state == FIRST);
    }
    // 1 tells the caller to roll back because a previous thread failed.
    return thd->ordered_commit_previous_error ? 1 : 0;
  }


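  /**
    Unregister this thread and tell the next thread (if any) that it
    is first.

    @retval false Neither this thread nor any previous thread had an error.
    @retval true This thread or some previous thread had an error.
  */
  bool signal_newer_threads(THD *thd, bool error)
  {
    DBUG_ASSERT(thd == current_thd);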
    /*
      Here it is ok to read thd->ordered_commit_state without mutex:
      this thread is either not in the queue at all (a normal client),
      or it is first in the queue.  In either case, there is no other
      thread that could possibly write the state.
    */
    DBUG_ASSERT(thd->ordered_commit_state != NOT_FIRST);
    if (thd->ordered_commit_state == FIRST)
    {
      if (error)
      {
        // Remember in this Commit_order_manager object that there is an
        // error, to block it from scheduling new transactions.
        has_error= true;
      }
      else
      {
        // If some previous thread had an error, then this thread and
        // all later threads must also stop with error.
        if (thd->ordered_commit_previous_error)
          error= true;
      }
      mysql_mutex_lock(&mutex);
      THD *next= thd->ordered_commit_next;
      if (next == NULL)
        newest_thd= NULL;
      else
      {
        // Pass on error state to next thread.
        next->ordered_commit_previous_error= error;
        next->ordered_commit_state= FIRST;
        mysql_cond_signal(&next->ordered_commit_cond);
      }
      mysql_mutex_unlock(&mutex);
      thd->ordered_commit_next= NULL;
      thd->ordered_commit_previous_error= false;
      thd->ordered_commit_state= IDLE;
    }
    return error;
  }

private:

  bool has_error;

  // Pointer to the newest thread in the queue.
  THD *newest_thd;

  /**
    Protects:
    - thd->ordered_commit_next for all threads.
  */
  mysql_mutex_t mutex;
};


Wait and signal hooks
=======================

The following table gives the locations of the wait and signal hooks.

Legend:
wait     :       wait_for_older_threads()
signal   :       signal_newer_threads()

commit   :       A commit query was issued or an implicit commit by DDL
error    :       the worker has errored out during event execution and has not
reached the ordered_commit method.

|--------+------------------+-----------------+-----------------------|
| HOOK   | COMMIT           | COMMIT          | ERROR                 |
|        | (BINLOG ON)      | (BINLOG OFF)    |                       |
|--------+------------------+-----------------+-----------------------|
|        |                  | MYSQL_BIN_LOG:: |                       |
| wait   | ordered_commit() | commit()        |                       |
|--------+------------------+-----------------+-----------------------|
|        |                  | MYSQL_BIN_LOG:: | Slave_worker::        |
| signal | enroll_for()     | commit()        | slave_worker_exec_job |
|--------+------------------+-----------------+-----------------------|

The calls to the wait and signal functions are made in the
MYSQL_BIN_LOG::commit() method, since this is where the code path diverges:
binlog group commit is used when the binlog is enabled, and ha_commit_low is
called directly otherwise.

Here is a code snippet.

  if (stuff_logged)
  {
    if (ordered_commit(thd, all))
      DBUG_RETURN(RESULT_INCONSISTENT);
  }
  else
  {
    if (commit_order_mngr.wait_for_older_threads(thd, all))
    {
      my_error(ER_SLAVE_WORKER_STOPPED_PREVIOUS_THD_ERROR, MYF(0));
      DBUG_RETURN(RESULT_ABORTED);
    }
    if (ha_commit_low(thd, all))
    {
      commit_order_mngr.signal_newer_threads(thd, true, all);
      DBUG_RETURN(RESULT_INCONSISTENT);
    }
    commit_order_mngr.signal_newer_threads(thd, false, all);
  }


** The review board request can be found here.
http://rb.no.oracle.com/rb/r/3675/