WL#4648: Prototype for multi-threaded slave for row-based replication

Affects: Server-Prototype Only   —   Status: Complete   —   Priority: Medium


Improve replication speed by using parallel application of rows.


The goal of the project is to increase the throughput of replication in terms of
transactions per second and reduce latency where the latency is the difference
between the last applied transaction on the slave and the last executed
transaction on the master.


Currently, replication does not scale well on multi-core machines.
The single slave thread executes replication events one by one and may not
cope with the load produced by multiple concurrent client connections
served by separate CPUs of the master server.
The situation is even more dramatic in the case of NDB Cluster, where the
role of the master is performed by multiple computers.


- The solution will not be restricted to any specific storage engine.

We will support three different consistency levels:

- Any-time consistency: for any state of the slave, this is a state that the
  master has been in

- Most-of-the-time consistency: 

- Eventual consistency

Internal Goals

The internal goals of the project are goals that we aim for but might not be


While WL#943 `More than one SQL slave thread for SMP machines' defines
the task of executing query events in parallel on the slave,
the current WL concentrates on different parallelism techniques
that consider row-based events exclusively.

Replication parameters such as the delay in applying events on the
slave and the time needed to process the binlog file matter most
to the user.
Normally both values should be optimal, that is,
close to their theoretical minimums.
Notice that the more powerful the hardware generating the load (e.g. SMP
machines), the more the slave can lag and the longer it takes
to process the binlog. A similar situation arises in replication
from NDB Cluster: the more nodes, the bigger the lag.

Parallel execution of replication events by many OS threads is a possible
technical means to address both problems: the minimum delay and the minimum
binlog processing time.

The following description does not necessarily imply the row-based
format of events. But let us make a bold assumption that the statement-based
format restricts parallelism to a few specific cases such as
LOAD DATA and bulk inserts. Feasibility of parallelisation for SBR
depends entirely on the possibility of detecting a conflict between two
binlog transactions. Whereas RBR conflicts can be found relatively easily,
this is far from true for the SBR case.

The job that the current single slave thread performs when applying replication
events is the following loop:

       while (not terminated)

          Read(E);  /* E is a replication event */

          Terminate(/* until condition met */);

          Execute(E);

          if (E is the last in its group)
             Update(rli info);


Execute(E) is intuitively\footnote{Read() is a candidate for profiling
and optimization as well} the slowest part and therefore attracts
optimization ideas.
The most straightforward one is to read many groups of events,
reconstruct the entire transactions - jobs - as they were on the master,
and execute the jobs in parallel; let's call this method P1.
This scheme has to ensure that parallel transactions do not have
mutual conflicts. Should they conflict, several control techniques can be deployed:

C0 to turn a blind eye to possible conflicts - there can be cases when
   the lack of conflicts is guaranteed, e.g. LOAD DATA regardless of
   the binlog format, or when the cluster timestamp conflict resolution
   ensures eventual consistency.
   Let's borrow an example from Henrik's mail:


     /* Assume all columns are "0" to start */
     /* 0 */ (SET AUTOCOMMIT=1)
     /* 1 */ UPDATE t1 SET f1=1, f2=2, timestamp=001 WHERE id=1;
     /* 2 */ UPDATE t2 SET f1=3, f2=4, timestamp=002 WHERE id=1;
     /* 3 */ UPDATE t1 SET f1=5, f2=6, timestamp=003 WHERE id=1;
     /* 4 */ UPDATE t2 SET f1=7, f2=8, timestamp=004 WHERE id=1;

     (The slave commits each transaction in one of 4 parallel threads, without
     locking, so the order is random.)

     /* 1 */ UPDATE t1 SET f1=1, f2=2, timestamp=001 WHERE id=1;
     /* 3 */ UPDATE t1 SET f1=5, f2=6, timestamp=003 WHERE id=1;
     /* 4 */ UPDATE t2 SET f1=7, f2=8, timestamp=004 WHERE id=1;

     /* The following statement is discarded since NEW.timestamp <
     OLD.timestamp */

     /* 2 */ UPDATE t2 SET f1=3, f2=4, timestamp=002 WHERE id=1

     So in this case there is a race condition: a SELECT issued between the
     commits of statements 3 and 4

     /* a */ SELECT t1.*, t2.* from t1 JOIN t2 WHERE id=1;

     ...may return...
     t1.f1 | t1.f2 | t2.f1 | t2.f2 |
         5 |     6 |     0 |     0 |

     which is a state that never existed on the master, since master would have
     0, 0, 0, 0
     1, 2, 0, 0
     1, 2, 3, 4
     5, 6, 3, 4
     5, 6, 7, 8

     So... clearly, such behaviour is mostly not acceptable.

   However, a good property is the eventual and, hopefully,
   most-of-the-time consistency with the master.

C1 to commit in the master's order; this method can suffice for
   specific applications that do not create conflicting jobs.
   A SELECT similar to the one in the cluster example of C0 would never find
   a state that did not exist on the master. This case guarantees any-time
   consistency.

C2 to temporarily break parallel applying at the first conflicting
   transaction and switch to the serial schedule until a sufficient
   number (the minimum is two) of jobs are conflict-free again;

C3 to compose "better" transactions from the originals,
   e.g. by sorting to avoid deadlocks and removing duplicates,
   and to implement a scheduling mechanism that ensures the binlog
   order of applying concurrent events by different job threads;
   this approach implies implementing a dependency graph.

C4 to combine several small transactions into a bigger one, partially
   resolving conflicts right at the parsing stage.
   Big transactions can then be served using the scheduling of C1 or C2.
   An apparent advantage of this method is that it is a form of group commit.
   It is also a possible technique to distribute jobs fairly.
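
The C0 example above can be replayed with a small simulation. This is only a
sketch under assumptions: the `apply` and `replay` helpers and the dictionary
layout are illustrative and not part of the server. It applies the four updates
in the slave order 1, 3, 4, 2 with timestamp-based conflict resolution and
records every state a concurrent SELECT could observe.

```python
# Master history for (t1.f1, t1.f2, t2.f1, t2.f2), as listed in the example.
MASTER_STATES = [(0, 0, 0, 0), (1, 2, 0, 0), (1, 2, 3, 4), (5, 6, 3, 4), (5, 6, 7, 8)]

# The four autocommitted updates: number -> (table, f1, f2, timestamp).
UPDATES = {
    1: ("t1", 1, 2, 1),
    2: ("t2", 3, 4, 2),
    3: ("t1", 5, 6, 3),
    4: ("t2", 7, 8, 4),
}

def apply(rows, update):
    """Apply one update unless the stored row carries a newer timestamp."""
    table, f1, f2, ts = update
    if ts < rows[table]["ts"]:
        return False  # discarded: NEW.timestamp < OLD.timestamp
    rows[table] = {"f1": f1, "f2": f2, "ts": ts}
    return True

def replay(order):
    """Return the state visible after each commit, in the given commit order."""
    rows = {"t1": {"f1": 0, "f2": 0, "ts": 0}, "t2": {"f1": 0, "f2": 0, "ts": 0}}
    seen = []
    for n in order:
        apply(rows, UPDATES[n])
        seen.append((rows["t1"]["f1"], rows["t1"]["f2"],
                     rows["t2"]["f1"], rows["t2"]["f2"]))
    return seen

slave_states = replay([1, 3, 4, 2])
never_on_master = [s for s in slave_states if s not in MASTER_STATES]
```

Here `never_on_master` holds the single intermediate state (5, 6, 0, 0), while
the final slave state equals the final master state. That is the eventual but
not any-time consistency attributed to C0.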

Notice that C2-C4 are all general methods that honour consistency with
the master data at any time, in the sense that the slave data always
corresponds to a state that existed on the master, and eventually the slave data
reaches the master's ultimate state (corresponding to the end of the
C0 and C1, on the other hand, are specific to the load and do not require
the notion of conflicts. They could still be practical for reaching
eventual consistency (the minimum binlog processing time).

An approach different from P1 is to transform the master transactions
into the slave's jobs using some specific partitioning
criterion, e.g. partitioning MyISAM events per table so that a job
thread executes all events on a specific table; let's call it P2.
An approach like this does not honour any-time consistency with the
master data, while it should provide eventual or most-of-the-time
consistency.
There could be yet another method, P3, offered for cases when a typical
transaction in the binlog is large and it makes sense to
execute the single transaction in parallel. It would resemble
execution of a nested transaction. However, there is no engine that
supports nested transactions in the MySQL server, so this
method does not guarantee strict consistency with the CONSISTENT SNAPSHOT
isolation level (a common profile for InnoDB): in
between two commits of parts of the big transaction the slave can
expose a state that never existed on the master.
On the other hand, this method does not need conflict detection and suffices for
transactions executing in the SERIALIZABLE isolation level.
Distribution of work for handling a big (epoch) transaction can be organized as
follows.
The Coordinator reads one *rows* event after another from the
source (e.g. the relay-log file) until the commit event, locates all
separate *rows* comprising the event, applies a hash function to each
row to yield the identifier of a Worker, and passes the row to that
Worker. The assigned Worker performs the handler-level
operations ha_write_row(), ha_update_row(), ha_delete_row(). There can be some
more tasks, such as initializing (and maybe committing) a sub-transaction for the
Worker. Once the Coordinator meets the Commit event it commits all
the active sub-transactions \footnote{Workers can commit separately,
which is a natural option for the prototype}.
In the SERIALIZABLE isolation level, committing parts of the transaction
one by one can be concurrent with a slave local transaction without losing
strict consistency. A slave local SELECT cannot see a combination of
uncommitted and committed sub-transaction work: to get to data that is not yet
committed it has to wait for the locks to be released.
That shows the slave transaction can perceive only states that existed on the
master, and only in the order in which the master changed them.
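
The row-to-Worker distribution described above can be sketched as follows. This
is a minimal illustration under assumptions: hashing on a row key, the CRC32
function, and the names `worker_for` and `distribute` are all hypothetical; the
text only says that a hash function is applied to each row to yield a Worker
identifier.

```python
import zlib

def worker_for(row_key: bytes, n_workers: int) -> int:
    """Map a row key to a worker id; the same key always maps to the same worker."""
    return zlib.crc32(row_key) % n_workers

def distribute(rows, n_workers):
    """Coordinator side: split the rows of one big transaction into per-worker lists."""
    tasks = [[] for _ in range(n_workers)]
    for key, image in rows:
        tasks[worker_for(key, n_workers)].append((key, image))
    return tasks

# Rows of one big transaction as (key, row image) pairs, in binlog order.
rows = [(b"pk=1", "a"), (b"pk=2", "b"), (b"pk=1", "c"), (b"pk=3", "d")]
tasks = distribute(rows, 3)
```

Because the mapping is deterministic, successive changes to the same key land
on the same Worker in their original order, so they cannot be reordered
against each other.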

The Coordinator and the Workers communicate through channels.

Once it is online, the Worker's state changes from listening to the
channel to executing a row task. As mentioned, the Worker's executing
state implies different work content depending on the type of the row
task: e.g. the very first row can start the sub-transaction.

The Coordinator's state loops between reading an event, hashing it,
sending the hashed-out tasks to Workers through the channel, and eventually
committing the group.
The channel state is binary: empty or not.

**Further elementary tasks specialization and jobs assignment**

The current SQL thread can play the Coordinator role with minimum
changes to its current logic and source code.
The Coordinator thread prepares a task for a Worker thread and calls
something like Rows_log_event::do_exec_row(), which the SQL thread
currently runs; only the call is asynchronous this time.
The task's outcome is requested later (in principle it can be
deferred as late as the commit of the whole nested transaction).

The Workers are instantiated as special slave threads ready to
execute one of the three Rows_log_event::do_exec_row() kinds.

*The channel implementation*

The channel consists of a st_table cache with hash access. The key
is the Worker id and the value is the st_table instance. The cache
needs a POSIX mutex to guard at least write access, and a mutex/cond variable
pair for the Coordinator thread to signal to the Worker. The st_table
instance's record[0] and record[1] are filled in with the row's before and after
images. The cache should have some limit on the number of instances, and if
the limit is reached, unpacking should happen into an external
buffer.\footnote{The `st_table' interface does not provide the
possibility to use an external buffer in place of the native
record[0,1], though}
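
A minimal sketch of such a channel is given here, assuming one task slot per
Worker id and using Python's threading.Condition as a stand-in for the POSIX
mutex/cond variable pair. The `Channel` class and its `put`/`take` methods are
hypothetical names; the real cache would hold st_table instances rather than
plain dictionaries.

```python
import threading

class Channel:
    """Task cache keyed by worker id, guarded by a mutex and a condition variable."""

    def __init__(self):
        self._cond = threading.Condition()  # mutex + condition variable pair
        self._tasks = {}                    # worker id -> (before_image, after_image)

    def put(self, worker_id, before, after):
        """Coordinator: publish a row task once the worker's slot is empty."""
        with self._cond:
            self._cond.wait_for(lambda: worker_id not in self._tasks)
            self._tasks[worker_id] = (before, after)
            self._cond.notify_all()

    def take(self, worker_id):
        """Worker: block until a task for this worker id is present, then remove it."""
        with self._cond:
            self._cond.wait_for(lambda: worker_id in self._tasks)
            task = self._tasks.pop(worker_id)
            self._cond.notify_all()
            return task

channel = Channel()
applied = []
worker = threading.Thread(target=lambda: applied.append(channel.take(7)))
worker.start()
channel.put(7, {"f1": 0}, {"f1": 1})  # before and after images of one row
worker.join()
```

Each slot is either empty or holds one task, which matches the binary channel
state described earlier.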

*The Coordinator pseudo-code*

A rows-event execution by the Coordinator mimics the current SQL
thread loop in Rows_log_event::do_apply_event().
The Coordinator takes care of opening a st_table instance, but not of
locking it; that is left to the Worker.

   while (the current event buffer is not empty)

      table = get_table();  // looks up in a new slave free tables hash first
      // table.record[0],[1] are prepared, or external buffers
      // (post-prototype enhancement)
      row = unpack_current_rows(table);
      worker = tasks_cache.hash_func(row);
      if (the status of the last rows applied by workers is okay)
         // send row,table for execution
         table.is_use = worker;
         memorize worker as the current group execution participant;
         hash_insert(tasks_cache, table); // a new task for a worker
         signal to worker;
      else
         emergency signal to the worker to cancel the current group exec

      if (event is STMT_END_F flagged)
         clean-up the slave free table cache and release all st_table handles
         (same way as the SQL thread currently does)


      hash_func() is the tasks_cache's hash function, whose main property is
      to return a worker id

      get_table() searches first in yet another slave
      hash of free tables shared with Workers. The lifetime of this
      object spans at least the current statement
      (i.e till STMT_END_F flagged rows-event), might last longer for
      In the post-prototype implementation get_table() is supposed to
      open only a limited number of st_table instances, and the instances
      should be able to exploit external buffers instead of the native ones.

The Coordinator effectively conducts 2PC with the Workers for the transaction commit
\footnote{strictly correct for SERIALIZABLE isolation, see above}.
The prepare phase starts when a Worker is engaged to begin a sub-transaction.
The Worker reports to the Coordinator when it is done with all its assignments,
which designates that the prepare has ended. The Coordinator waits for acks once it
notices the STMT_END_F-flagged event, which for the NDB case means the transaction
is about to commit. Upon gathering all prepare acks the Coordinator commits the
sub-transactions one by one (or the Workers are signalled to commit) executing

*Questions & Concerns for P3*

1. Extracting a necessary row from the whole rows-event could be done by
   the Worker if the hash key data were prepared on the master and recorded
   into the event together with the offset.
   That would allow the Worker, not the Coordinator, to open a st_table
   handle and unpack the necessary row into its record[].

2. The Coordinator-to-Worker communication channel
   requires a rather costly POSIX mutex/cond variable based protocol.
   This cost is higher than e.g. in the inter-transactional
   parallelization (see the current writeup on WL#4648), as notifications
   have to be sent per row-event (!) in the intra-transaction
   The optimization of p.1 seems to relax the cost to be per the whole

*** DESIGN for P1 with C0-C4 workarounds for conflicts ***

A rationale for P1+C0 is the cluster use case with timestamp conflict
resolution as well as LOAD DATA and the bulk insert.

** P1 overview **

P1 can be thought of as cooperative work of `R' the coordinator and
{W} the pool of worker threads.

R reads a new group of events
from the relay log \footnote{the relay log is merely one of the
possible sources} and appends it onto J, the list of jobs.
The coordinator's duties are supposed to include updating the relay-log info
position of the last executed group \footnote{an option to consider;
alternatively, updating the relay-log.info can be handed over to {W}}.
{W} members pop a new job from the head of the J list and
process that job until commit. Depending on the commit policy a worker
may do some extra coordination with the others at this point.
Once it has committed it goes for another loop.

** Requirements for P1 **

* External *

E1. Algorithm must scale close to linear with the number of workers.
    With zero workers in the {W} pool the alg must correspond to 
    the existing pattern.

E2. Algorithm must be flexible to allow future extensions: it starts
    as implementation of P1+C0 but can evolve up to P1+C4

E3. As long as there is no requirement for committing in the master's order,
    fairness of distribution of jobs between workers does not matter.
    However, optimal performance with C1 will require fairness.

E4. As long as neither sorting of row-events within the job groups (C3)
    nor the step-back to the serial schedule (C2) is implemented, tests
    should simulate load in a way that does not create deadlocks.

* Internal *

1. R must prepare jobs for {W} on time;
   the coordinator can be hungry, the workers not.

2. R must not let the job list grow too big; a natural constraint for
   |J|, the size of J, is

   n <= |J| <= k * n, where n = |W| is the number of workers and k is a
   small integer, e.g. 2.

** Pseudo-code for coordinator and Workers: P1+C0 **

    /* a new aggregation class representing a group of repl events */
    Class Group_event;

    /*
       the list of jobs for R-{W} cooperation; the list can be
       augmented with some counters or other attributes.
       It must have guard methods for exclusive access.
       Groups/Jobs are supposed to
       be numbered according to the master's order. A number is
       associated with the group at its reading from the relay-log.
    */
    List <Group_event> J;

    /*  the low water mark of committed groups, to be implemented as
        a semaphore.
        The counter only increases, though not necessarily at every step.
        The current value designates that jobs numbered with lower or equal
        values have been committed.
    */
    Class Commit_Semaphore
       ulonglong counter = 0;
       int lock();
       int unlock();

    Commit_Semaphore commit;


The coordinator:

     /*
         the list is initially empty and the augmented counter says
         there has not been any job yet
     */
     assert(|J| == 0);
     assert(J.went_through == 0);

     /* at warm-up read 2*|W| groups, i.e. twice the size of the workers pool */

     /*
         Read_group() takes care of possible Terminate and Skip.
         J.append() associates each read G with `order', the number that
         increments per read
     */
     for i=1 .. 2*|W| do J.append(relay_log.Read_group());
     assert(J.head.order == 1);
     assert(J.tail.order == 2*|W|);

     /* continue reading and updating rli info in the loop */

     while (!killed)

       if (|J| < 2*|W|)
         for i=1 .. 2*|W| - |J| do J.append(relay_log.Read_group());
         sleep(reader_nap_time); // to be tuned at run time

       /*
          the commit watermark and rli info are updated per |W| groups
          on average
       */
       if (J.went_through - |W| + 1 > commit.counter)

         commit.counter = J.went_through - |W| + 1;
         /*
            update rli_info along with the watermark as all groups
            before the current have been done.

            The position corresponding to the `commit.counter' watermark
            is generally pessimistic. However, the stored group
            position does not have to be exact because execution of
            row-based events can be reentrant.
            At recovery, or upon a worker failure and subsequent
            restart of the slave system, the piece of the relay-log
            [position(commit.counter + 1), Read_Master_log_pos] should be
            executed in the reentrant mode.
         */
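
The low water mark semantics ("all jobs numbered lower or equal have been
committed") can be illustrated with a small tracker. Note that this sketch
derives the watermark from explicit commit reports rather than from the
`J.went_through - |W| + 1` formula used in the pseudo-code; the
`CommitWatermark` class and its `report_commit` method are hypothetical names
introduced only for illustration.

```python
class CommitWatermark:
    """Tracks the largest order n such that groups 1..n have all committed."""

    def __init__(self):
        self.counter = 0       # low-water mark, as in Commit_Semaphore.counter
        self._pending = set()  # committed orders that are above the watermark

    def report_commit(self, order):
        """Record a commit and advance the watermark over any contiguous run."""
        self._pending.add(order)
        while self.counter + 1 in self._pending:
            self.counter += 1
            self._pending.remove(self.counter)
        return self.counter

wm = CommitWatermark()
# Workers commit out of master order; the watermark moves only over complete prefixes.
trace = [wm.report_commit(order) for order in (2, 3, 1, 5, 4)]
```

The relay-log position stored with the watermark is then a safe restart point:
everything after it may be re-executed in the reentrant mode.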


A worker:

     /* killed can be set by the thread itself e.g. if G.execute() fails */

     while (!killed)

       Group_event G;
       if (|J| is 0)
          J.wait(); // wait() unlocks; wait should not normally happen
          /* make the coordinator 10% more active; the constant is a heuristic */
          reader_nap_time -= 0.1 * reader_nap_time;
       G = J.pop(); // |J| decrements
       assert(J.went_through == G.order);
       my_group_order = G.order;
       G.execute(); // similar to Log_event::do_apply_event for all
                    // events but commit

       G.commit();  // commit is made separate
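
The coordinator/worker cooperation of P1+C0 can be sketched with a bounded
queue. This is an illustration under assumptions, not the server code:
queue.Queue stands in for the guarded list J, a sum over integers stands in
for G.execute(), appending to a list stands in for G.commit(), and a None
sentinel is an assumed shutdown convention.

```python
import queue
import threading

N_WORKERS = 3
jobs = queue.Queue(maxsize=2 * N_WORKERS)  # J, bounded as |J| <= 2*|W|
committed = []                             # (order, result) in slave commit order
committed_lock = threading.Lock()

def worker():
    while True:
        group = jobs.get()                 # blocks while J is empty
        if group is None:                  # sentinel: the coordinator is done
            return
        order, events = group
        result = sum(events)               # stand-in for G.execute()
        with committed_lock:               # stand-in for G.commit(), any order (C0)
            committed.append((order, result))

def coordinator(groups):
    threads = [threading.Thread(target=worker) for _ in range(N_WORKERS)]
    for t in threads:
        t.start()
    for order, events in enumerate(groups, start=1):
        jobs.put((order, events))          # blocks while J is full
    for _ in threads:
        jobs.put(None)                     # one sentinel per worker
    for t in threads:
        t.join()

coordinator([[1, 2], [3], [4, 5, 6], [7], [8, 9]])
```

With C0 the order of entries in `committed` is not deterministic; only the set
of committed groups is.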


** Extension towards P1+C1 **

Synchronization of commits by workers to comply with the master binlog
order can be added as an option inside the G.commit() method.
Committers will form a queue where elements are sorted according to
their `my_group_order' value. commit_counter is allowed to increment
only to the immediate successor, i.e. the job that has

  my_group_order = commit_counter + 1.

The queue won't be processed while its head is not
an immediate successor. Once all |W| jobs have been committed in the
master's order the coordinator can perform rli_info.update(), that is per |W|
jobs. Alternatively, this part can be executed per job, i.e each W updates
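
The ordered-commit queue can be sketched with a min-heap. The
`OrderedCommitter` class and its `request_commit` method are hypothetical
names; only `commit_counter` and `my_group_order` come from the text. The
sketch is single-threaded and shows just the release rule, not the locking a
real G.commit() would need.

```python
import heapq

class OrderedCommitter:
    """Releases commits strictly in master order, queueing early arrivals."""

    def __init__(self):
        self.commit_counter = 0  # order of the last committed job
        self._waiting = []       # min-heap of my_group_order values

    def request_commit(self, my_group_order):
        """Enqueue a finished job; return the orders that may commit now."""
        heapq.heappush(self._waiting, my_group_order)
        released = []
        # The head may commit only if it is the immediate successor.
        while self._waiting and self._waiting[0] == self.commit_counter + 1:
            self.commit_counter = heapq.heappop(self._waiting)
            released.append(self.commit_counter)
        return released

c = OrderedCommitter()
# Jobs finish executing in the order 3, 1, 4, 2.
released = [c.request_commit(n) for n in (3, 1, 4, 2)]
```

Job 3 waits until 1 and 2 have committed, and job 4 is released right after
job 3, so the commit sequence on the slave is 1, 2, 3, 4.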

** Extension towards P1+C2 **

The notion of a conflict must be implemented.
The pool size is going to be a dynamic variable. The coordinator should be
able to mark items in the list of jobs with a break tag once it finds
that there is a job earlier in the master order, not yet committed, that
conflicts with the one currently being read.
Workers stop popping out until all the jobs prior the marked have been
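
Break tagging can be sketched as below. The conflict test is an assumption made
only for illustration (two jobs conflict when they touch a common (table, key)
row); the text leaves the notion of conflict to be defined. The `conflicts` and
`tag_breaks` helpers are hypothetical.

```python
def conflicts(job_a, job_b):
    """Assumed conflict test: the jobs touch at least one common (table, key) row."""
    return not job_a["rows"].isdisjoint(job_b["rows"])

def tag_breaks(jobs, committed_orders):
    """Return the orders of jobs that conflict with an earlier uncommitted job."""
    tagged = []
    for i, job in enumerate(jobs):
        earlier = [j for j in jobs[:i] if j["order"] not in committed_orders]
        if any(conflicts(job, j) for j in earlier):
            tagged.append(job["order"])
    return tagged

# Jobs in master order, each with the set of rows it modifies.
jobs = [
    {"order": 1, "rows": {("t1", 1)}},
    {"order": 2, "rows": {("t2", 1)}},
    {"order": 3, "rows": {("t1", 1), ("t3", 9)}},
    {"order": 4, "rows": {("t2", 1)}},
]
tags_none_committed = tag_breaks(jobs, committed_orders=set())
tags_after_first = tag_breaks(jobs, committed_orders={1})
```

Once job 1 has committed, job 3 no longer needs a break tag, whereas job 4
stays tagged until job 2 commits.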

** Extension towards P1+C3 **

Requires the notion of the conflict.

While it's a complicated matter, the idea is to construct a dependency
graph that the coordinator is to maintain and the workers
are to consult when transiting from event to event.
A piece of a job - an ordinary row-event - can stay blocked as long as an
earlier job in the master's order has not been done yet.
The coordinator's duties include sorting the events in a job to avoid
deadlocks, which should be performed at conflict lookup time.
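
A minimal sketch of such a dependency graph follows. It works per job rather
than per row-event, and it assumes that a job depends on the latest earlier job
touching any of the same row keys; both simplifications, and the names
`build_dependencies` and `runnable`, are assumptions for illustration.

```python
def build_dependencies(jobs):
    """For each job order, list the earlier job orders it must wait for.

    jobs: list of (order, set_of_row_keys) in master order.
    """
    last_writer = {}  # row key -> order of the latest earlier job touching it
    deps = {}
    for order, rows in jobs:
        deps[order] = sorted({last_writer[r] for r in rows if r in last_writer})
        for r in rows:
            last_writer[r] = order
    return deps

def runnable(deps, done):
    """Jobs not yet done whose dependencies have all been done."""
    return [o for o, d in deps.items() if o not in done and all(x in done for x in d)]

jobs = [(1, {"a"}), (2, {"b"}), (3, {"a", "b"}), (4, {"c"})]
deps = build_dependencies(jobs)
first_wave = runnable(deps, done=set())
second_wave = runnable(deps, done={1, 2})
```

Jobs 1, 2 and 4 can start immediately, while job 3 stays blocked until both 1
and 2 are done.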

** Extension towards P1+C4 **

C4 is basically C3 but requires more intelligence for the coordinator.