WL#5569: Replication events parallel execution via hashing per database name

Affects: Server-5.6   —   Status: Complete

High-Level Description

MOTIVATION
----------
In many cases the user application logically partitions data per
database, which allows flexible patterns of applying replication
events on the slave.
In one important use case updates within a single database must
follow the master's total order, but any two databases do not have to
coordinate their applying.


OBJECTIVE
---------
Given that the user application logically partitions its data per
database, the goal is largely the same as in WL#943 and WL#4648, that is,
to scale up slave performance by engaging multiple threads to
apply replication events. The improvement in performance effectively
decreases the slave's lag behind the master.

A prototype implementation, subject to a few technical limitations,
is considered in WL#5563.


FEATURES
--------
Parallelization by db name does not require any change to the binlog
  format of events;

Transactions partitioned to different worker threads do not
  coordinate their commits;

Parallelization by db name is transparent to the engine type;

Automatic switching from the concurrent PARALLEL mode to a temporary sequential mode
  in special cases when events can't be executed concurrently in parallel.
  Once such a special case is handled, the slave automatically restores
  the concurrent PARALLEL mode.

  NOTE_1: An event that updates multiple databases does not necessarily
  cause such switching.

  NOTE_2: There are a few event types like Format_Descriptor, Rotate
          that are executed by the Single-Threaded Slave standard algorithm
          (that is, by Coordinator).
          They do not throttle concurrency as long as binary
          logs are not rotated frequently.

Internal Goals
--------------
To create a framework that facilitates further extension towards support
of more partitioning use cases, e.g. per table.


Conceptual limitations and differences from the single thread
--------------------------------------------------------------

a. This framework relies on WL#2775, which implements a base for automatic
   recovery. The transactional storage keeping the descriptor of the last-updated
   info is extended to also record an array of last-executed group positions
   of the worker threads (see more in the Recovery section).

b. Status info displayed via SHOW-SLAVE-STATUS will largely remain the same.
   In particular that relates to the monotonic growth of
   EXEC_MASTER_LOG_POS. However the parameter turns into a
   Low-Water-Mark (LWM). The LWM indicates that before (in the master logging
   order sense) the last committed transaction or executed
   group of events there are no uncommitted transactions/groups.
   But there can be some committed after the LWM.
   SECONDS_BEHIND_MASTER can't be updated per each event; updating
   happens in intervals along with EXEC_MASTER_LOG_POS (see LLD for more
   details).

c. Additional info on status of individual workers can be provided via 
   mysql.SLAVE_WORKER_INFO table of WL#5599.

d. May not be supported:

    - transaction retry after a temporary failure
    - START SLAVE UNTIL
    - foreign key constraints if the referenced table is in
      a different database (see WL#6007). Currently, the
      MTS user has to make sure that possible FK dependencies do not cross
      the databases of the explicitly updated tables. Otherwise it could lead
      to deadlocks involving slave worker threads and to data inconsistency.

    - pre-5.0 master server version LOAD-DATA related events

e.  BUG#37426: a vulnerable master can't be detected by MTS. This is a technical
    issue that could be lifted later.
    

Recovery model
--------------
The parallel execution mode does not change, in principle, recovery for DDL
or any other non-transactional events.
In this case the recovery routine is deemed to be manual.

For the sake of the simplest possible recovery, when the user expects
failures and is accustomed to some application-specific recovery
procedure, DDL can (optionally) force the slave to apply it in the SEQUENTIAL
mode, first waiting for the previously distributed parallel load to
be completed by the Workers.


This worklog requires some extensions to the WL#2775 slave
system tables for automatic recovery.  The relay-log-info table is
supposed to be augmented (WL#5599) to contain each worker's last
executed group position. Another change concerns the last executed
group position of the single SQL thread, which is converted into the
low-water mark of executed group positions among all Workers,
indicating that no gaps in committed groups exist before it.

Depending on the binary log event format, the following methods can be applied:

1. (Not to be implemented here) Idempotent recovery when binlog_format == ROW

   The low-water-mark last executed group position that the coordinator
   maintains alone can suffice.
   It is regarded as an anchor to restart applying events the same way as
   the last executed group position of the single SQL thread does.
   Possible errors like Dup-key, Key-not-found and a few more are benign and will
   be ignored.
   The idempotent applying policy changes to the user-preferred one
   as soon as the low-water mark has reached the max group position among
   the workers.
   This type of recovery does not require the Worker pool and can be conducted
   solely by the Coordinator.

   Notice a great advantage of this method: although bound to the ROW format, it is
   transparent to the engine type.

2. General recovery regardless of the binlog-format (read more on that in
   WL#5599).

   This method leaves aside the mentioned DDL and
   non-transactional Query-log-events and applies to
   the transactional Rows- and Query-log-events.

   This type of recovery involves information that the Workers left
   in their info table.
   In short, MTS recovery reads the Workers' info and constructs
   a set of not-applied transactions that follow the LWM.
   Starting from the low-water mark, the Coordinator parses events
   to ignore those that were committed and applies the rest.
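
   A minimal sketch of that skip-or-apply decision (illustrative only; the
   container and names are hypothetical, the real logic belongs to WL#5599):

     #include <cstdint>
     #include <set>

     // Positions are group sequence numbers in the master binlog order;
     // 'committed_after_lwm' is rebuilt from the Workers' info tables.
     struct Recovery_scanner
     {
       uint64_t lwm;                            // low-water mark from the RLI table
       std::set<uint64_t> committed_after_lwm;  // groups the Workers reported done

       // True if the group starting at 'group_seq' must be re-applied.
       bool must_apply(uint64_t group_seq) const
       {
         if (group_seq <= lwm)
           return false;                        // at or before LWM: already done
         return committed_after_lwm.count(group_seq) == 0;  // a gap: re-apply
       }
     };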


Favorable environment
----------------------

While Statement-format events including Query-log-event and all
engine types will be supported, the primary concern is achieving performance
scaling with the following conditions:

- Innodb storage engine;
- ROW binlog format;
- Transactions operate on disjoint sets of databases (i.e.,
  they do not conflict and can commit independently).

Sketch of Architecture
----------------------
It is assumed for now that any replication event can be partitioned.
Query-log or Rows-log events to different databases fall into that
category with partitioning per db name.
Such a partitioning model suggests equipping the single SQL thread
system with a worker thread pool. The SQL thread changes its current multi-purpose
role to serve as their Coordinator.
Interaction between Coordinator and Workers follows the producer-consumer
model, pretty close to what had been implemented in WL#4648
(in fact that piece of work can be reused).

Pseudo-code for Coordinator:

// Coordinator, the former SQL thread, keeps operating as in the single-threaded env:
// it stays in the read-execute loop, but instead of executing an event
// through apply_event_and_update_pos() it distributes tasks to Workers

while(!killed)
  exec_relay_log_event();

where

  exec_relay_log_event()
  {
    if (workers_state() is W_ERROR)
      return error;

    ev= read();

    switch (mts_execution_mode(ev))
    {
    case TEMPORARY_SEQUENTIAL:
         wait_for_workers_to_finish();
         // fall through: the event is still handed to a Worker

    case PARALLEL_CONCURRENT:
         engage_worker(ev, w_thd[worker_id]);
         break;

    // Some event types like Format_Descriptor, Rotate are executed by
    // Coordinator itself. Depending on the event property its execution
    // can require synchronization with Workers.

    case SEQUENTIAL_SYNC:
         wait_for_workers_to_finish();
         // fall through: Coordinator applies the event itself

    case SEQUENTIAL_ASYNC:
         apply_event_and_update_pos();
         break;
    }
  }

Pseudo-code for Worker:

// The Worker stays in a wait-execute loop

while (!killed)
{
  ev= get_event();     // wait for an event to show up in this Worker's WQ
  exec_res= ev->apply_event(rli);
  ev->update_pos();    // update the Worker's info table with the task's group position
  report(exec_res);    // async status communication with Coordinator
}

The worker pool is activated via START SLAVE.
STOP SLAVE terminates all threads including the worker pool.
Requirements:
=============

R0. To provide the user interface for the parallel execution mode including

    A. @@global.slave_parallel_workers

       that specifies the number of worker threads.
       Zero workers, the default, designates no worker pool and therefore
       the legacy sequential execution mode, or Single-Threaded Slave (STS).

    Use cases:

    U.A: set @@global.slave_parallel_workers= `value`;

    (see LLD section for the details of the variable definition)

R1. To provide transparent detection of limitations to parallelization
    and to perform corresponding actions for such events in
    the following cases:

    A: the hash function mapping conflicts;


       U.A.1: a transaction spanning multiple databases

              Let's consider multi-statement transactions T_1, T_2, T_3, where

              T_1: begin; update db1; commit;
              T_2: begin; update db1; commit;
              T_3: begin; update db1; update db2; commit;

              Verify that correct applying takes place and the parallel exec mode
              resumes after T_3 has committed.

       U.A.2: a query crossing databases

              set autocommit = ON;
              update db1.table, db2.table ...;

              In case the number of databases exceeds the maximum, the whole hash
              will be locked to the one Worker executing the query, which reduces
              parallelism.

    B: the exceptional set of events to be executed by Coordinator;

       U.B: Events limited to sequential execution by Coordinator include
            all event types in the list:

            STOP_EVENT,
            ROTATE_EVENT,
            FORMAT_DESCRIPTION_EVENT,
            INCIDENT_EVENT

    C: events from an old master with reduced parallelism;

       U.C: A query event from an MTS-unaware old master is treated as if the
            query crossed the maximum number of databases of case U.A.2.

    D: events that can't be supported in MTS;

       There are cases when MTS starts reading events not from the boundary of
       a group. Unfortunately that can happen with a legacy log file created by
       an old server.

       U.D: LOAD-DATA events generated by a 4.0 or 4.1 master. MTS execution will
            stop with an error.


R3. To deliver an MTS implementation that scales up.
    To annotate the favorable as well as the least performant use cases.

     use case:

     U: to demonstrate when the max performance is achieved as a
        function of parameters including:
        the number of computing units, databases, Workers,
        the binary log format, engine types.

NOTE_1: the crash-recovery requirement is addressed by WL#5599.

NOTE_2: monitoring status is considered by other WLs.

       This covers both the individual per-Worker info and the aggregate info
       on the overall performance, incl. the LWM.
       A P_S Worker table (WL#5631)
       holds all the per-Worker info including execution times,
       the last group executed by a worker, the total tasks performed by a
       worker, the currently assigned queues and the status of the queues.
       The applying progress of an individual Worker can be monitored via the
       mysql.SLAVE_WORKER_INFO table of WL#5599.


NOTE_3: an option to set up database distribution manually is not implemented here.

       Manual partitioning would allow the user to bind two or
       more specific databases to be served through one specific queue,
       thereby accomplishing a many *dependent* databases to one Worker
       association.
       A similar idea is to implement a priority mechanism for applying
       events from different queues.



Replication events applying phases, tasks decomposition, mapping
----------------------------------------------------------------

The sequential execution of an event breaks into five basic activities
as the following diagram represents

+--------------------------------------------------------+--------------------+
|                                                        |                    |
| +---------------+   +--------------+   +-------------+ |                    |
| |               |   |              |   |             | |                    |
| |     Read      |   |  Construct   |   |     Skip    | |   COORDINATOR      |
| |    Event    --+---+>           --+---+->  Until    | |   STAGES           |
| |  (relay-log)  |   |    Event     |   |    Delay    | |                    |
| |               |   |              |   |             | | (There is only     |
| +-----^---------+   +--------------+   +-------+-----+ |  one instance      |
|       !                                        |       |  of a coordinator) |
|-------+----------------------------------------+-------+--------------------+
|       |                                        |       |                    |
|       |             +--------------+   +-------+-----+ |                    |
|       |             |              |   |       V     | |   WORKERS          |
|       |             |              |   |             | |   STAGES           |
|       +-------------+   COMMIT    <+---+-- EXECUTE   | |                    |
|                     |              |   |             | | (multiple workers  |
|                     |              |   |             | |  can exist at the  |
|                     +--------------+   +-------------+ |  same time)        |
|                                                        |                    |
+--------------------------------------------------------+--------------------+


- Coordinator: reads events from the relay log, instantiates them,
               handles the skip/until/delay stages and then
               schedules them to the correct worker

- Worker:      executes events, committing statements and
               whole transactions

The relation between coordinator and workers is: 1-N, where 1 is
the one and only coordinator thread and N is the number of threads
in the workers pool.

Agglomerating Read, Construct and Skip-Until-Delay to be executed within
one thread is reasonable considering the three tasks are light and a good
deal of existing code can be reused.

The Execute and Commit tasks fit delegation to Worker threads.
Indeed, splitting Execute into pieces cannot really be
backed by any reasonable requirement.
The Commit task is actually a part of Execute, and
the last executed group position is a field of the RLI table to be updated and
committed within - at the end of - the master transaction.
So agglomerating the two to be carried out by one Worker is a reasonable choice.

The communication mechanism between C and W consists of an assignment
Worker Queue (WQ) and an Assigned Partition Hash (APH), which correspond
to the downward arrow from C to W in the diagram.
For recovery purposes there is yet another Groups Assignment Queue
(GAQ) that corresponds to the upward arrow.

A WQ holds events scheduled for execution that belong to a certain
partition. Coordinator calls a hash function on an event instance in
order to determine a partition identifier P_id.
The partition to Worker mapping is controlled with the APH hash.
The hash contains (P_id, N, W_id) tuples, where N is the number of ongoing
*groups* of events.

APH is complemented with a Current Group Assigned Partitions object
(CGAP) on the Coordinator's side that lists all P_id:s found during the
current group's assigning. Multiple events of a group map to one P_id as
long as one database is updated. CGAP items can be many if many databases
are updated by the group. Each item of the list forces either an
insert or an update of a tuple in the APH at scheduling time, as
follows.

On the Coordinator side, after computing P_id, Coordinator first searches CGAP
for an existing item.
If the search is positive, APH does not change.
If the search is negative, first a new record with P_id is inserted into
CGAP. Then the P_id-keyed tuple is searched for in APH.
If not found, a new (P_id, 1, W_id) tuple is inserted; W_id is a new
Worker's id if the max pool size allows threading one out, and the
least occupied Worker's id if not.
Otherwise the found record is updated by incrementing N.
At last, if the event is the current group terminator, all items in CGAP are
discarded.
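
A sketch of that scheduling step (illustrative only: std::map/std::set stand in
for the server's HASH and lists, and pick_worker() is a hypothetical callback
for the "new Worker if the max pool size allows, else least occupied" rule):

  #include <map>
  #include <set>
  #include <string>

  // One APH tuple: partition id -> (usage counter N, assigned Worker id).
  struct Aph_entry { int n; int w_id; };

  std::map<std::string, Aph_entry> APH;   // Assigned Partition Hash
  std::set<std::string> CGAP;             // partitions seen in the current group

  // Called by Coordinator for every P_id computed for the current group.
  int schedule_partition(const std::string &p_id, int (*pick_worker)())
  {
    if (CGAP.count(p_id))                 // already accounted for in this group
      return APH[p_id].w_id;
    CGAP.insert(p_id);
    auto it= APH.find(p_id);
    if (it == APH.end())                  // partition not yet assigned to a Worker
    {
      int w= pick_worker();
      APH[p_id]= Aph_entry{1, w};
      return w;
    }
    it->second.n++;                       // one more ongoing group on this partition
    return it->second.w_id;
  }

  // At the group terminator all CGAP items are discarded.
  void end_of_group() { CGAP.clear(); }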

On the Worker's side, when the current group is being committed, N is
decremented in all the group's tuples that Coordinator inserted or
updated. A tuple is (optionally) removed from APH once N becomes zero.
The identifiers of the group are collected in the Current Group Executed
Partitions (CGEP) list. That is the Worker-side complement to the APH
object, behaving analogously to the Coordinator's CGAP.
That is, each item of the CGEP list forces an update of N and possibly the
deletion of the corresponding tuple from APH at group commit time.
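
The Worker-side counterpart could look as follows (same simplified containers;
the APH locking shared with Coordinator is left out):

  #include <map>
  #include <set>
  #include <string>

  struct Aph_entry { int n; int w_id; };
  std::map<std::string, Aph_entry> APH;   // shared with Coordinator

  // CGEP lists the partitions touched by the group this Worker is committing.
  void release_group_partitions(std::set<std::string> &CGEP)
  {
    for (const std::string &p_id : CGEP)
    {
      auto it= APH.find(p_id);
      if (it == APH.end())
        continue;
      if (--it->second.n == 0)
        APH.erase(it);                    // optional removal once the counter is zero
    }
    CGEP.clear();
  }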

Event propagation is depicted as follows:


 				      +---------+----------+
	 ++-------------++            | Queue_1 + Worker_1 |
	 ||             ||	      +---------+----------+
+---+	 || Coordinator || hash(E)    +---------+----------+
| E +--->||             |+----------->| Queue_2 + Worker_2 |
+---+	 ||             ||	      +---------+----------+
	 ++-------------++	               ...
				      +---------+----------+
				      | Queue_n | Worker_n |
				      +---------+----------+ 


where hash(E): E -> P_id, Queue_i contains zero or more P_id.

A Worker that was attached to the P_id executes the event and updates
the RLI recovery table if the event is the current group terminator.
Although Workers don't have any communication with each other,
they coordinate their commits to RLI as explained further.

A WQ has a maximum size threshold.
The limited WQ makes Coordinator wait until the actual size of
the target WQ decreases below the limit.

Notice, there are a few constraints to this type of
inter-transaction parallelization:

-  the entire transaction is mapped to one Worker at most;
-  a few types of the Log_event class (see R1.uc) are executed in the Sequential mode.
   Those types represent internal replication facilities and can't
   have any significant impact on performance (see LLD for further
   details);
-  Workers still share common resources such as the RLI recovery table
   and therefore are subject to a few implicit synchronizations.

The Commit task has a strong connection to WL#5599. When a Worker
commits to RLI it is supposed to update its own last committed position and
possibly the common Low-Water-Mark.
That is, the Worker's role includes carrying out a group commit to RLI.
In order to facilitate that, C and {W} maintain yet another Groups
Assignment Queue (GAQ) that the Worker updates atomically with RLI.
GAQ contains a token for each group of events being processed by the
different Workers.
A queue item consists of
a sequence number of the group in the master binlog order,
the partition id, the assigned Worker and a status - Committed or Not.
The queue's head is the group next to the Low-Water-Mark (NLWM).
It separates out the already committed groups without gaps from the rest,
which thereby form a sequence of still ongoing and committed groups starting
from the not-yet-committed NLWM.
C enqueues an item to the GAQ once it
recognizes a new group of events is being read out. W marks the group
as Committed and, if its assignment index immediately follows the index of
the current NLWM, W rearranges the head of the queue as in the
following example.
Let's look at a use case of 3 Workers identified as W#1, W#2, W#3.
On the following diagram, let the NLWM, the head of GAQ, contain a group
being executed by Worker #2.
Assume further that Coordinator distributed 5 groups as follows

|   0     |     1    |      2   |   3    |    4
+---------+----------+----------+--------+----------+
|  W #2   |  W #1    |   W #1   | W #2   |  W #3    |    
|         |          |          |        |          |
+---|-----+----------+-----+----+--------+----------+

here the first number row holds sequence numbers in the master binlog
order.

Suppose the first task of W #1 and the task of W #3
got committed by the time Worker #2 is committing.
W #2 has to rearrange GAQ

|   0     |     1    |      2   |   3    |    4
+---------+----------+----------+--------+----------+----
|  W #2   |  W #1    |   W #1   | W #2   |  W #3    | ...
|committin| Committed|          |        | committed|
+---|-----+----------+-----+----+--------+----------+---
    | 			   |
    +--- NLWM------------->+

to discard its own group and the committed groups next to it, which is the group
with sequence number 1. The new NLWM will shift to point at
group 2 and the queue becomes

|   2     |   3    |    4     |
+---------+--------+----------+
|   W #1  |  W #2  |  W #3    |
|         |        | committed|
+---------+--------+----------+

Now in case W #1 commits the 2nd group

|   2     |   3    |    4     |
+---------+--------+----------+
|   W #1  |  W #2  |  W #3    |
|committin|        | committed|
+---------+--------+----------+

the queue transforms to

|   3    |    4     |
+--------+----------+
|  W #2  |  W #3    |
|        | committed|
+--------+----------+

and at last when W #2 commits it becomes empty.
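
A sketch of that head-trimming step (illustrative; a std::deque stands in for
the fixed-size circular buffer, and locking with Coordinator is omitted):

  #include <cstdint>
  #include <deque>

  struct Gaq_item
  {
    uint64_t seqno;      // group sequence number in the master binlog order
    int      w_id;       // assigned Worker
    bool     committed;  // Committed or Not
  };

  std::deque<Gaq_item> GAQ;

  // A Worker marks its group Committed; if the head (the NLWM) turns out to be
  // committed, it and every immediately following committed group are discarded.
  void mark_committed_and_trim(uint64_t seqno)
  {
    for (Gaq_item &it : GAQ)
      if (it.seqno == seqno) { it.committed= true; break; }
    while (!GAQ.empty() && GAQ.front().committed)
      GAQ.pop_front();                   // the new NLWM is the first not-committed group
  }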





Invariants of Parallel and Sequential execution modes
-----------------------------------------------------

					 ++-------------++
	  +-----------------+		 ||             ||
	  |                 |Execute	 || Worker      ||
	  |   Event         +------------+|             ||
	  |                 |		 ||             ||
	  |                 |		 ++------+------++
	  |                 |			 |  distribute
	  | do_apply_event()|Execute	 ++------+------++
	  |                 +------------++             ||
	  +-----------------+		 || Coordinator ||
					 ||             ||
					 ||             ||
					 ++-------------++



Whichever of Coordinator or Worker executes an event, it must
call the Event::do_apply_event() method.
The method is the core of the Execute task.
The diagram above shows that an event can be applied through
Coordinator::distribute + Worker::execute (the Parallel exec mode) or through
Coordinator::Execute. The event class hierarchy does not receive
any changes due to the new Parallel execution mode, thanks to the Worker
execution context design.

Error Reporting (to the End User)
=================================

In multi-threaded execution mode a Worker can hit an
error. At that point, it propagates that error to the
Coordinator. The most recent error that a Worker found, and that
forced the slave to stop, will eventually be displayed in "SHOW
SLAVE STATUS".

However, an important note needs to be raised here. When a Worker
stops because of an error, there may be the case that there will
be gaps in the execution history (some other Workers may have
already executed events that were serialized, by the master,
ahead of this one; some others, serialized before, might not even
have finished their execution). Thus apart from the Worker error
that is reported in SHOW SLAVE STATUS, we also issue a warning:

  "The slave Coordinator and Worker threads are stopped, possibly
   leaving data in inconsistent state. The following restart
   shall restore consistency automatically. There might be
   exceptional situations in the recovery caused by combination
   of non-transactional storage for either of Coordinator or
   Workers info tables and updating non-transactional data tables
   or DDL queries. In such cases you have to examine your
   data (see documentation for de tails)."

We choose to go with this approach because of the SHOW SLAVE STATUS
limitation of not being prepared to show more than one error;
thus we don't want to shadow the original cause.

Future Work (Feature Request):

  In the future we can improve the MTS reporting capabilities by
  implementing an information/performance schema table which the
  user can query to get details on an individual Worker's internal
  state. Thus, we can promote the warning mentioned above to
  error, show it in SHOW SLAVE STATUS and then exhibit the
  original cause (and the subsequent list of errors issued during
  the entire execution of the offending statement, until the
  Worker stops) in the [I|P]_S table.

Low-Level Design

This section elaborates on the following fine details of the project:

I.   Basic objects, lifetimes, states, cooperation

  0.  Coordinator
  1.  Worker
  2.  Worker pool, lifetime, states
  3.  Coordinator <-> Worker communications;
      WQ, GAQ interfaces

  4.  The load distribution hash function and its supplementary mechanisms

II.  Execution modes and flow; use cases and their handling

  0.  WQ overflow protection
  1.  Fallback to the sequential exec mode
  2.  Deferred scheduling of BEGIN;

III. Interfaces for recovery

  0. GAQ from the RLI-commit point of view for WL#5599
  1. Executing recovery at starting the slave.

IV.   User interfaces



I.   Basic objects, lifetimes, states, cooperation
===================================================

0.  Coordinator, roles
----------------------

    Coordinator is implemented on the basis of the former SQL thread, in the
    way of the WL#5563 prototype.
    After Coordinator instantiates an event, it finds out its
    partition/database information. For instance, Coordinator does not
    sql-parse a Query-log-event but rather relies on the info prepared
    on the master side.
    Coordinator decides that an event needs sequential execution
    if the partition id can't be computed (more than one database with
    the automatic hashing) or the event type belongs to the exceptional kinds
    (see HLS).
    If the event is parallelizable, Coordinator calls a hash function
    to produce P_id, the identifier of a partition/database.
    APH is searched for an existing record keyed by P_id; the found
    record is updated or a new one is inserted.
    In the update case there already exists a Worker that remains to handle
    the partition.
    In the insert case the least loaded Worker is assigned.
    A separate constant activity is to fill in GAQ each time a new
    group is recognized; see further the details of handling BEGIN.

    Other activities include:

    Invoking the recovery routine at the slave SQL/Coordinator bootstrap;
    Watching the pool status for a failed Worker, per event. A
      noticed failure triggers marking the remaining Workers with KILLED
      and the Coordinator's exit;
    Shutting down the Worker pool, at exit.

1.  Worker
----------

    Worker is implemented as a system thread that handles events like
    the sequential-mode SQL thread does.
    Its execution context is the Slave_worker class, based on
    Relay_log_info, which corresponds to the execution context of the
    SQL thread or Coordinator.

    Worker calls a new Log_event::do_apply_event_worker(Slave_worker *w)
    that casts to Log_event::do_apply_event(w) for all but the
    Xid_log_event subclasses of Log_event.

    Xid_log_event::do_apply_event(Slave_worker *), for execution of the event by
    Worker, is different from do_apply_event(Relay_log_info *) not just
    because of the type of the argument but rather because of the logic of
    modifying the slave info (recovery) table (see WL#5599).

    The argument passed to do_apply_event() can call its own
    methods. Those methods need to be declared virtual in the
    Relay_log_info base class and defined specifically for
    Slave_worker.
    Example: report().
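
    A schematic of that virtual-method arrangement (heavily simplified against
    the real classes; the printing is purely illustrative):

      #include <cstdio>

      struct Relay_log_info
      {
        // Coordinator reports through the regular slave error-reporting channel.
        virtual void report(const char *msg)
        { std::fprintf(stderr, "SQL/Coordinator: %s\n", msg); }
        virtual ~Relay_log_info() {}
      };

      struct Slave_worker : Relay_log_info
      {
        int id;
        explicit Slave_worker(int i) : id(i) {}
        // A Worker reports on its own behalf; the error is later propagated
        // to Coordinator (see the Error Reporting section).
        void report(const char *msg) override
        { std::fprintf(stderr, "Worker %d: %s\n", id, msg); }
      };

      // do_apply_event() receives a Relay_log_info*, so rli->report() dispatches
      // to the Worker's override whenever a Worker executes the event.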


2.  Worker pool, lifetime, states
---------------------------------

    The Worker pool is a DYNAMIC_ARRAY of THD instances, each attached to an OS
    thread.
    Worker and Coordinator maintain a WQ as a communication channel.
    W and WQ associate 1-to-1, which justifies making WQ a
    member of the Worker class.
    The pool is initialized with a number of members according to
    @@global.slave_parallel_workers at the time the SQL/Coordinator thread
    starts.

    The pool is destroyed by Coordinator at its shutdown time.

    A Worker resides in a wait-execute loop; the wait is for
    an event in its WQ, execution is through ev->do_apply_event().
    Commit or XID group terminators, or a self-terminating event
    like a DDL Query_log_event, force an update of the Worker info table.
    Committing the group affects GAQ as described in the HLS part.

    A Worker leaves the loop in case of being killed or erroring out.
      KILLED is "hard", forcing an instant exit like after a
      KILL query.

      Optionally, not to be implemented here, there could be a soft KILLED
      to make the Worker process all its assignments first and exit upon that.
      The soft kill would be a facility to stop the slave with no gaps
      after the LWM (see STOP SLAVE).

3.  Coordinator <-> Worker communications; WQ, GAQ, APH interfaces
--------------------------------------------------------------

    WQ and GAQ are implemented through arrays of a fixed size in the manner
    of a circular buffer. Here is a template definition:

     typedef struct queue
     {
          DYNAMIC_ARRAY Q;  // placeholder for the items

          mysql_mutex_t lock;
          mysql_cond_t  cond;

          public:

          uint  en_queue(item*);
          item* head_queue();
          item* de_queue();
          bool  update(uint index, item*);

     } Slave_queue;

     `item*' stands for a reference to an opaque object corresponding
     to the task assignment (WQ) or the group of events descriptor (GAQ).

     Coordinator calls WQ.en_queue(); Worker calls head_queue() and
     de_queue(). Splitting the dequeue operation into two phases is required by
     specifics of the Execute logic.

     Coordinator calls ind= GAQ.en_queue() when a new BEGIN is found in the
     stream of read-out events. Later, when the partition id and the Worker id
     have been determined, it calls GAQ.update(ind, {id, W_id, Not-committed}).
     Worker calls GAQ.update(ind, {id, W_id, Committed}) and GAQ.de_queue()
     as many times as necessary if the NLWM needs moving.
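
     A minimal sketch of the circular-buffer mechanics behind those operations
     (the mutex/cond signalling of the real Slave_queue is left out):

       #include <cstddef>
       #include <vector>

       template <typename Item>
       struct Circular_queue
       {
         std::vector<Item> buf;            // fixed-size placeholder
         size_t head= 0, len= 0;           // head index and current fill

         explicit Circular_queue(size_t capacity) : buf(capacity) {}

         // Returns the slot index of the new tail, or (size_t) -1 when full.
         size_t en_queue(const Item &it)
         {
           if (len == buf.size()) return (size_t) -1;
           size_t tail= (head + len) % buf.size();
           buf[tail]= it;
           len++;
           return tail;
         }
         Item *head_queue() { return len ? &buf[head] : nullptr; } // peek the head
         bool de_queue()                                           // drop the head item
         {
           if (!len) return false;
           head= (head + 1) % buf.size();
           len--;
           return true;
         }
         bool update(size_t index, const Item &it)                 // in-place (GAQ) update
         {
           buf[index]= it;
           return true;
         }
       };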

     APH is a hash with a semaphore (mutex/cond var). The mutex is
     to be held by a caller for any operation.
     Coordinator searches for, inserts or updates a record.
     A Worker updates or deletes.
     Coordinator can wait for a condition like the `N' usage counter zeroing
     out (see the fallback in II.1).

     A possible optimization (not yet implemented) for operations on WQ, GAQ and
     APH is to implement spin-locking, and for APH - record-level locking.
     The latest (as of Tue May 10 15:38:55 EEST 2011) benchmarking suggests this
     approach.


4.  The load distribution hash function and its supplementary mechanisms
------------------------------------------------------------------------

    The database partitioning info in a Rows-event is available to
    Coordinator for free, but not in a Query-log-event.
    Early preparation of the hash info on the master side is done for
    the Query-log-event class through a new status variable of the class.
    Each time a query is logged, all databases that it has updated are
    put into a list in the new status var.
    Coordinator checks the list through a new method similar to the existing
    Log_event::get_db() and calls the hash function as
    explained in p.1.
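
    A sketch of the db-name hashing and of the Worker choice for a partition
    not yet present in APH (a plain string hash and a least-occupied fallback;
    the real function is an implementation detail):

      #include <functional>
      #include <string>
      #include <vector>

      // Map a database name to a partition identifier.
      size_t db_to_partition_id(const std::string &db)
      {
        return std::hash<std::string>()(db);   // any stable string hash will do
      }

      // Pick a Worker for a new partition: a new Worker while the pool may still
      // grow, otherwise the least occupied one (here: the shortest WQ).
      int pick_worker(std::vector<size_t> &wq_len, size_t max_pool_size)
      {
        if (wq_len.size() < max_pool_size)
        {
          wq_len.push_back(0);
          return (int) wq_len.size() - 1;
        }
        int best= 0;
        for (size_t i= 1; i < wq_len.size(); i++)
          if (wq_len[i] < wq_len[best]) best= (int) i;
        return best;
      }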


II.  Execution modes and flow; use cases and their handling
=====================================================

0.  WQ overflow protection and lessened C-to-W synchronization
-------------------------------------------------------------

C always waits for a Worker if it can't assign a task due to
the limited size of the Worker's WQ.
Whenever it is proved that all Workers have been assigned a sufficient
bulk of load, C goes to sleep for a small amount of time
automatically computed based on statistics of WQ processing.
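
A sketch of that throttling (a per-WQ condition variable for the size limit and
a short nap when all Workers are saturated; the names, the size unit and the
nap interval are placeholders - the real limit derives from
@@global.slave_pending_jobs_size_max and the nap from WQ statistics):

  #include <chrono>
  #include <condition_variable>
  #include <mutex>
  #include <thread>

  struct Worker_queue_gate
  {
    std::mutex lock;
    std::condition_variable cond;
    size_t size= 0;                     // current backlog of the WQ
    size_t max_size;                    // the WQ size threshold

    explicit Worker_queue_gate(size_t m) : max_size(m) {}

    // Coordinator side: block while the target WQ is over its limit.
    void wait_for_room()
    {
      std::unique_lock<std::mutex> g(lock);
      cond.wait(g, [this] { return size < max_size; });
    }
    // Coordinator side: account for an enqueued job.
    void job_added() { std::lock_guard<std::mutex> g(lock); size++; }
    // Worker side: after consuming a job, make room and wake Coordinator.
    void job_done()
    {
      { std::lock_guard<std::mutex> g(lock); if (size) size--; }
      cond.notify_one();
    }
  };

  // When every Worker already has a sufficient backlog, Coordinator naps briefly.
  void coordinator_nap()
  { std::this_thread::sleep_for(std::chrono::microseconds(500)); }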


1.  Fallback to the sequential exec mode
----------------------------------------

This part is extensively highlighted in the HLS section.
When an event (e.g. a Query-log-event) contains a db in its partition info that
is not handled by the current Worker but instead has been assigned to some other
Worker, C does the following (see the sketch after this list):

a. marks the record of APH with a flag requesting a signal on the
   cond var when `N', the usage counter, is dropped to zero by the other Worker;
b. waits for the other Worker to finish its tasks on that partition and
   gets the signal;
c. updates the APH record to point to the first Worker (naturally, N := 1),
   schedules the event, and goes back into the parallel mode.
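
The wait in steps (a)-(b) is a plain condition-variable wait on the tuple's
usage counter; a sketch, assuming the APH-wide mutex/cond pair described in I.3:

  #include <condition_variable>
  #include <mutex>

  // One APH tuple, extended with the "signal me at zero" flag of step (a).
  struct Aph_tuple
  {
    int  n= 0;                   // ongoing groups on this partition
    int  w_id= -1;               // currently assigned Worker
    bool wait_at_zero= false;    // set by Coordinator in step (a)
  };

  std::mutex aph_lock;                // the APH-wide mutex (record-level locking
  std::condition_variable aph_cond;   // is a possible later optimization, see I.3)

  // Coordinator: steps (a) and (b) of the fallback.
  void wait_partition_released(Aph_tuple &t)
  {
    std::unique_lock<std::mutex> g(aph_lock);
    t.wait_at_zero= true;
    aph_cond.wait(g, [&t] { return t.n == 0; });
    // step (c) follows: reassign the tuple to the chosen Worker with N := 1
  }

  // Worker: decrement at group commit and signal if Coordinator asked for it.
  void release_partition(Aph_tuple &t)
  {
    std::lock_guard<std::mutex> g(aph_lock);
    if (--t.n == 0 && t.wait_at_zero)
    {
      t.wait_at_zero= false;
      aph_cond.notify_one();
    }
  }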


2.  A special treatment of the BEGIN Query-log-event
-------------------------------------------------

The BEGIN Query_log_event is regarded as the token marking the beginning of a
group of events.
It allocates a new item in GAQ which stays anonymous until the next event
reveals the partition/database id.
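
A sketch of that deferred step: the GAQ slot is reserved at BEGIN and completed
once a later event of the group reveals its partition (the container is again a
simplified stand-in for the circular buffer):

  #include <cstdint>
  #include <deque>
  #include <string>

  struct Gaq_item
  {
    uint64_t    seqno;
    std::string p_id;        // empty, i.e. "anonymous", right after BEGIN
    int         w_id;
    bool        committed;
  };

  std::deque<Gaq_item> GAQ;

  // On BEGIN: reserve the slot; partition and Worker are still undecided.
  size_t on_begin(uint64_t seqno)
  {
    GAQ.push_back(Gaq_item{seqno, "", -1, false});
    return GAQ.size() - 1;
  }

  // On the first event that reveals the database: complete the reserved slot.
  void on_partition_known(size_t ind, const std::string &p_id, int w_id)
  {
    GAQ[ind].p_id= p_id;
    GAQ[ind].w_id= w_id;
  }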



III. Interfaces for recovery
============================


0. GAQ from the RLI-commit point of view for WL#5599 
----------------------------------------------------

The recovery implementation part whose use cases include

     a. crash
     b. stop slave
     c. a slave thread is stopped
     d. a slave thread is errored out

A Commit Query-log-event or an XID event forces the Worker to
rearrange the NLWM, that is, to remove the first items of GAQ as garbage.

1. Executing recovery at starting the slave
-------------------------------------------

See also recovery after a temporary failure in section II.
Read the details of this part in WL#5599.


IV.  USER Interfaces
====================

0.  User interfaces to Coordinator
----------------------------------

     a. set @@global.slave_parallel_workers= [0..ULONG],

        Cmd line: YES
        Scope:    GLOBAL
        Dynamic:  YES      (does not affect ongoing slave session)
        Range:    0..1024
   
        In case @@global.slave_parallel_workers == 0, execution will
        be in the legacy sequential mode where the SQL thread executes
        events.
        Changes to the variable take effect after STOP SLAVE,
        that is, the running pool size can't change.
   
     b. @@global.slave_pending_jobs_size_max

        Cmd line: YES
        Scope:    GLOBAL
        Dynamic:  YES      (does not affect ongoing slave session)
        Range:    0..ULONG

        Max total size of all events being processed by all Workers.
        The lowest possible value must be not less than the master-side
        max_allowed_packet.

     c. Recovery related (WL#5599), including
     @@global.slave_checkpoint_group                                            
  
        Cmd line: YES
        Scope:    GLOBAL
        Dynamic:  YES      (does not affect ongoing slave session)
        Range:    0..ULONG

     @@global.slave_checkpoint_period
 
        Cmd line: YES
        Scope:    GLOBAL
        Dynamic:  YES      (does not affect ongoing slave session)
        Range:    0..ULONG

     d. At STOP SLAVE, Coordinator waits till all assigned groups of events
        have been processed. Waiting is limited to a timeout (1 minute,
        similarly to the Single-Threaded-Slave case).

     e. KILL QUERY|CONNECTION of Coordinator or a Worker has an immediate effect,
        that is, a Worker may not finish up the current ongoing group,
        thereby letting the committed groups have some gaps.

     Changes to SHOW-SLAVE-STATUS parameters

     f. The coordinates of the last executed group of events, as described by
        EXEC_MASTER_LOG_POS et al., are not updated per each group (transaction)
        but rather per some number of processed groups of
        events. The number can't be greater than
        @@global.slave_checkpoint_group, and in any case the SBM updating
        rate does not exceed @@global.slave_checkpoint_period.

     g. Notice that the Seconds_behind_master (SBM) updating policy is slightly
        different for MTS.
        The status parameter is updated similarly to EXEC_MASTER_LOG_POS et al.
        of p. f.
        SBM is set to a new value after processing the terminal event
        (e.g. Commit) of a group.
        Coordinator resets SBM when it notices no more groups are left either
        to read from the relay log or to be processed by Workers.

      

1.  User interfaces to Workers
------------------------------

     a. Monitoring through a new P_S table of wl#5631

     b. (not to be implemented here) Manual performance tuning

        - to freeze (unfreeze) the Worker's communication channel, that is,
          to make it work through the current set of partitions

        - to combine multiple partitions/databases considered to be mutually
          dependent into one WQ

        - to start multiple workers at the
          slave bootstrap and manually associate each with a set of
          partitions/databases