# WL#7165: MTS: Optimizing MTS scheduling by increasing the parallelization window on master

Affects: Server-5.7   —   Status: Complete

Executive Summary
=================

WL#6314 implemented intra-schema multi-threaded slave, where multiple transactions
can be applied in parallel on the slave (even when they modify same database) as
long as they are non-conflicting.

The existing implementation uses a simple algorithm for determining which
transactions are non-conflicting. It already gives a 40% improvement on one

The present worklog optimizes the implementation by using a potentially more
precise algorithm to determine which transactions are non-conflicting. On
workloads that are not evenly distributed on many databases, this allows more
transactions to execute in parallel.

After this worklog the Intra-schema multi-threaded slave should be optimized and
we should be able to gain more throughput.

No change to server user interface. The new scheme will completely replace the
scheme implemented in WL#6314.

FR2: mysqlbinlog output shall show the two logical timestamps for all
transactions.

NFR1: Some transactions may be tagged with zero values of last_committed
and sequence_number. Such transactions are scheduled on the slave with
reduced parallelism. The penalty is insignificant as long as the number of
such transactions is small.

NFR2: Some transactions/event groups are tagged as dependent on a preceding
one even though the actual dependency is different (more relaxed).
The penalty is insignificant as long as the number of such transactions
is small.

Old, Commit-Parent-Based Scheme
===============================

The old scheme for multi-threaded slave implemented in WL#6314 works
as follows.

- On master, there is a global counter. The counter is incremented
before each storage engine commit.

- On master, before a transaction enters the prepare phase, the
current value of the global counter is stored in the
transaction. This number is called the commit-parent for the
transaction.

- On master, the commit-parent is stored in the binary log in the

- On slave, two transactions are allowed to execute in parallel if
they have the same commit-parent.

Problem With Commit-Parent-Based Scheme
=======================================

The old scheme allows less parallelism than would be possible.

The old scheme partitions the time line into intervals.  When a
transaction commits, the current time interval ends and a new begins.
Two transactions can execute in parallel if they were prepared within
the same time interval. The following picture illustrates the scheme:

Trx1 ------------P----------C-------------------------------->
                            |
Trx2 ----------------P------+---C---------------------------->
                            |   |
Trx3 -------------------P---+---+-----C---------------------->
                            |   |     |
Trx4 -----------------------+-P-+-----+----C----------------->
                            |   |     |    |
Trx5 -----------------------+---+-P---+----+---C------------->
                            |   |     |    |   |
Trx6 -----------------------+---+---P-+----+---+---C---------->
                            |   |     |    |   |   |
Trx7 -----------------------+---+-----+----+---+-P-+--C------->
                            |   |     |    |   |   |  |

Each horizontal line represents a transaction. Time progresses to the
right. P denotes the point in time when the commit-parent is read
before the prepare phase.  C denotes the point in time when the
transaction increases the global counter and thus begins a new
interval.  The vertical lines extending down from each commit show the
interval boundaries.

Trx5 and Trx6 are allowed to execute in parallel because they have the
same commit-parent (namely, the counter value set by Trx2).  However,
Trx4 and Trx5 are not allowed to execute in parallel, and Trx6 and
Trx7 are not allowed to execute in parallel.
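
The commit-parent rule can be checked against the picture with a small
sketch. It is an illustration only, not server code: the names TimelineTrx,
commit_parent, old_scheme_parallel and picture_timeline are hypothetical,
and the tick values are simply the character columns of P and C read off
the picture.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// One transaction on the master timeline. Times are abstract ticks
// (assumption: the character columns of P and C in the picture).
struct TimelineTrx {
  int prepare_time;  // P: the commit-parent is read here
  int commit_time;   // C: the global counter is incremented here
};

// Commit-parent under the old scheme: the value of the global counter at
// time P, i.e. the number of commits that happened before P.
inline std::int64_t commit_parent(const std::vector<TimelineTrx> &all,
                                  const TimelineTrx &trx) {
  std::int64_t counter = 0;
  for (const TimelineTrx &other : all)
    if (other.commit_time < trx.prepare_time) ++counter;
  return counter;
}

// Old-scheme rule: parallel execution only for equal commit-parents.
inline bool old_scheme_parallel(const std::vector<TimelineTrx> &all,
                                const TimelineTrx &a, const TimelineTrx &b) {
  return commit_parent(all, a) == commit_parent(all, b);
}

// Trx1..Trx7 from the picture, as {P column, C column}.
inline std::vector<TimelineTrx> picture_timeline() {
  return {{17, 28}, {21, 32}, {24, 38}, {30, 43},
          {34, 47}, {36, 51}, {49, 54}};
}
```

Under these assumptions Trx5 and Trx6 both get commit-parent 2 (the value
set by the commit of Trx2), while Trx4 gets 1 and Trx7 gets 5, so neither
of the latter two can join them.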

But note that two transactions that hold all their respective locks at
the same point in time on the master are necessarily non-conflicting.
Thus, it would not be problematic to allow them to execute in parallel
on the slave.  In the above example, this has two implications:

- Trx4, Trx5, and Trx6 hold all their locks at the same time but Trx4
will be executed in isolation.

- Trx6 and Trx7 hold all their locks at the same time but Trx7 will be
executed in isolation.

It would be better if Trx4 could execute in parallel with Trx5 and
Trx6, and Trx6 could execute in parallel with Trx7.

New, Lock-Based Scheme
======================

In the present worklog we implement a scheme that allows two
transactions to execute in parallel if they hold all their locks at
the same time.

We define the lock interval as the interval of time when a transaction
holds all its locks:

- The lock interval ends when the first lock is released in the
storage engine commit.  For simplicity, we do not analyze the lock
releases inside the storage engine; instead, we assume that locks
are released just before the storage engine commit.

- The lock interval begins when the last lock is acquired. This may
happen in the storage engine or in the server.  For simplicity, we
do not analyze lock acquisition in the storage engine or in the
server; instead, we assume that the last lock is acquired at the end
of the last DML statement, in binlog_prepare. This works correctly
both for normal transactions and for autocommitted transactions.

If Trx1, Trx2 are transactions, and Trx1 appears before Trx2, the
criterion for parallel execution is this:

C1. Trx1, Trx2 can execute in parallel if and only if their locking
intervals overlap.

The following is an equivalent formulation:

C2. Trx1, Trx2 can NOT execute in parallel, if and only if Trx1 has
ended its locking interval before Trx2 has started its locking
interval.

The following illustrates the criteria (L denotes the beginning of the
locking interval and C denotes the end of the locking interval).

- Can execute in parallel:
Trx1 -----L---------C------------>
Trx2 ----------L---------C------->

- Can not execute in parallel:
Trx1 -----L----C----------------->
Trx2 ---------------L----C------->
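
Criterion C1 (equivalently, the negation of C2) can be written as a
one-line predicate. The sketch below is illustrative: LockInterval and
can_run_in_parallel are hypothetical names, and the interval end points are
abstract ticks rather than anything the server records.

```cpp
#include <cassert>

// Locking interval on the master, in abstract ticks (an assumption of this
// sketch): begin = last lock acquired (L), end = first lock released (C).
struct LockInterval {
  int begin;
  int end;
};

// C2 says: NOT parallel iff one interval ended before the other started.
// C1 is the negation: parallel iff neither interval ended before the other
// one began, i.e. the intervals overlap.
inline bool can_run_in_parallel(const LockInterval &a, const LockInterval &b) {
  const bool a_ended_before_b = a.end < b.begin;
  const bool b_ended_before_a = b.end < a.begin;
  return !a_ended_before_b && !b_ended_before_a;
}
```

For instance, intervals {10, 20} and {15, 25} overlap and may run in
parallel, whereas {10, 15} and {20, 25} may not, matching the two
illustrations above.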

To evaluate the locking criteria, we need to keep track of which
transactions have ended their locking intervals. To this end, we
assign a logical timestamp to each transaction:
transaction.sequence_number. We will need to store
transaction.sequence_number in the binary log.  Therefore, we step it
and assign it to the transaction just before the transaction enters
the flush stage.

In addition, we maintain the global variable
global.max_committed_transaction, which holds the maximal
sequence_number of all transactions that have ended their locking
intervals. The variable plays the role of the system's logical commit clock.
Thus, before a transaction performs storage engine commit,
it sets global.max_committed_transaction to
max(global.max_committed_transaction, transaction.sequence_number).

Each transaction needs to know which transactions it cannot execute in
parallel with. We define the *commit parent* of a transaction to be the
*newest* transaction that cannot execute in parallel with the transaction.
Thus, when the transaction begins its locking
interval, we store global.max_committed_transaction into the variable
transaction.last_committed. Recall that the locking interval for
multi-statement transactions begins at the end of the last statement
before commit. Since we do not know a priori which is the last
statement, we store global.max_committed_transaction into
transaction.last_committed at the end of *every* DML statement,
overwriting the old value. Then we will have the correct value when
the transaction is written to the binary log.

We store both timestamps in the binary log.
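
The three points at which the master touches the logical timestamps can be
sketched as follows. This is a single-threaded illustration under stated
assumptions, not the server implementation: the struct and function names
(MasterClocks, TrxTimestamps, on_statement_end, on_flush, on_engine_commit)
are hypothetical, and real code would need atomic updates.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// The two global logical clocks described above.
struct MasterClocks {
  std::int64_t transaction_counter = 0;
  std::int64_t max_committed_transaction = 0;
};

// The two per-transaction timestamps written to the binary log.
struct TrxTimestamps {
  std::int64_t last_committed = 0;
  std::int64_t sequence_number = 0;
};

// End of every DML statement: (re)read the commit clock, overwriting the
// previous value, so the last statement before commit wins.
inline void on_statement_end(const MasterClocks &g, TrxTimestamps &t) {
  t.last_committed = g.max_committed_transaction;
}

// Just before the flush stage: step the counter and take the new value.
inline void on_flush(MasterClocks &g, TrxTimestamps &t) {
  t.sequence_number = ++g.transaction_counter;
}

// Just before storage engine commit: advance the commit clock.
inline void on_engine_commit(MasterClocks &g, const TrxTimestamps &t) {
  g.max_committed_transaction =
      std::max(g.max_committed_transaction, t.sequence_number);
}
```

If two transactions finish their last statement before either commits,
both are logged with last_committed = 0; a third transaction whose last
statement ends after the first commit is logged with last_committed = 1.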

The condition for executing a transaction on the slave is as follows:

C3. Slave can execute a transaction if the smallest sequence_number
among all executing transactions is greater than
transaction.last_committed.

In order to check this condition, the slave scheduler maintains an
ordered sequence of currently executing transactions.  The first
transaction in the sequence is the one that appeared first in the
master binary log. In other words, it is the one with the smallest
value for transaction.sequence_number. The last transaction in
the sequence is the one that appeared last in the master binary log,
i.e., has the greatest value for transaction.sequence_number.

Before a transaction is taken for scheduling, the following condition is
checked:

(*)  transaction_sequence[0].sequence_number > this.last_committed

Scheduling is held up until this condition becomes true. On successful
scheduling, the transaction is appended at the end of
transaction_sequence.

After a transaction has committed, it is effectively removed from the
sequence. (In the implementation, it is merely marked as done, which
tells the scheduler to ignore the transaction when it evaluates
condition (*)).
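
Condition (*) can be sketched with an ordered container standing in for
transaction_sequence. This is only an illustration: ExecutingSequence and
its methods are hypothetical names, a std::set replaces the real queue, and
treating an empty sequence as "may schedule" is an assumption of the
sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Ordered set of sequence numbers of the transactions that are currently
// executing on the slave; *begin() plays the role of
// transaction_sequence[0].
class ExecutingSequence {
 public:
  // Condition (*): the oldest executing transaction must be newer than the
  // candidate's commit parent. With nothing executing, the condition is
  // taken to hold trivially (assumption).
  bool can_schedule(std::int64_t last_committed) const {
    return executing_.empty() || *executing_.begin() > last_committed;
  }

  // Called when the transaction has been scheduled successfully.
  void start(std::int64_t sequence_number) {
    executing_.insert(sequence_number);
  }

  // Called after the transaction has committed.
  void finish(std::int64_t sequence_number) {
    executing_.erase(sequence_number);
  }

 private:
  std::set<std::int64_t> executing_;
};
```

With transactions 1 and 2 executing, a transaction with last_committed = 1
has to wait until transaction 1 has finished; the completion of
transaction 2 alone does not release it.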

Pseudo-code
===========

Master variables:
- int64 global.transaction_counter
- int64 global.max_committed_transaction
- int64 transaction.sequence_number
- int64 transaction.last_committed

Master logic in order of events of execution:

- in binlog_prepare:

  if this is not a transaction commit:
    transaction.last_committed = global.max_committed_transaction

- after it has been determined that the transaction is the next one to
be flushed, and before transaction is flushed, the global transaction
counter is stepped and copied to the transaction's sequence number:

transaction.sequence_number = ++global.transaction_counter

- write transaction.sequence_number and transaction.last_committed to
the binary log, in the transaction header;

- before transaction does storage engine commit:

global.max_committed_transaction = max(global.max_committed_transaction,
transaction.sequence_number)

When @@global.binlog_order_commits is true, in principle we could reduce
the max to an assignment:

global.max_committed_transaction = transaction.sequence_number

However, since binlog_order_commits is dynamic, if we do this, there will
be a short time interval just after the user changes binlog_order_commits from
0 to 1, during which the committing transactions' timestamps are not
monotonically increasing, but binlog_order_commits == 1. If we used the
assignment algorithm during this time period, transactions could have the
wrong timestamps in the binary log, which could lead to conflicting
transactions executing in parallel on the slave.

To handle both cases using atomic operations we use the following algorithm:

  int64 old_value = transaction.sequence_number - 1;
  while (!my_atomic_cas64(&global.max_committed_transaction,
                          &old_value, transaction.sequence_number) &&
         transaction.sequence_number > old_value)
    ; // do nothing
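
The same "atomic maximum" loop can be expressed with standard C++ atomics.
The sketch below is an equivalent formulation for illustration, not the
server code: it uses std::atomic in place of my_atomic_cas64, and the
function name advance_commit_clock is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Raise `commit_clock` to at least `sequence_number`; never lower it.
inline void advance_commit_clock(std::atomic<std::int64_t> &commit_clock,
                                 std::int64_t sequence_number) {
  // Optimistic guess: commits are in order, so the clock holds our
  // predecessor's number.
  std::int64_t observed = sequence_number - 1;
  while (!commit_clock.compare_exchange_weak(observed, sequence_number) &&
         sequence_number > observed) {
    // On failure `observed` has been reloaded with the current clock
    // value. Retry only while our number is still larger; otherwise a
    // newer transaction has already advanced the clock past us.
  }
}
```

When commits arrive in order, the first compare-and-swap succeeds, which is
the cheap case corresponding to the plain assignment. When a transaction
with a smaller number commits late, the loop exits without touching the
clock.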

Slave variables:

- transaction_sequence: ordered sequence containing all executing
transactions in order of increasing sequence_number.

(In the code, this is implemented using the existing Relay_log_info::GAQ.
This is a circular queue of large, fixed size.)

Slave logic:

- before scheduler pushes the transaction for execution:

wait until transaction_sequence[0].sequence_number >
transaction.last_committed

(The actual implementation will step through the list in the following
manner:

// The Low Water Mark is the newest transaction for which the scheduler
// knows the following facts:
// - the transaction has been committed;
// - all older transactions have been committed.
// LWM_plus_1 is the next transaction, i.e., the one that was the oldest
// executing transaction last time that the scheduler looked.

global int LWM_plus_1;  // the same as transaction_sequence[0]

function wait_until_transaction_can_be_scheduled(transaction):
  while true:
    while rli.GAQ[LWM_plus_1].is_committed:
      LWM_plus_1++
    if rli.GAQ[LWM_plus_1].sequence_number > transaction.last_committed:
      return
    wait until rli.GAQ[LWM_plus_1] commits
)

- after transaction commits:
GAQ[transaction.index].is_committed = true;
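
A non-blocking sketch of the low-water-mark stepping is given below. It
rests on several assumptions that are not part of the design: a plain
std::vector stands in for the circular Relay_log_info::GAQ, the names
GaqEntry, Scheduler and try_schedule are hypothetical, the function returns
false where the pseudo-code would wait, and an exhausted queue is treated
as "nothing executing".

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct GaqEntry {
  std::int64_t sequence_number;
  std::int64_t last_committed;
  bool is_committed;
};

struct Scheduler {
  std::vector<GaqEntry> gaq;   // stand-in for the circular GAQ
  std::size_t lwm_plus_1 = 0;  // index of the oldest executing transaction

  // Returns true and appends `trx` if it may be scheduled now; returns
  // false at the point where the pseudo-code would wait.
  bool try_schedule(const GaqEntry &trx) {
    // Step the low water mark over everything already committed.
    while (lwm_plus_1 < gaq.size() && gaq[lwm_plus_1].is_committed)
      ++lwm_plus_1;
    const bool nothing_executing = (lwm_plus_1 == gaq.size());
    if (nothing_executing ||
        gaq[lwm_plus_1].sequence_number > trx.last_committed) {
      gaq.push_back(trx);
      return true;
    }
    return false;
  }
};
```

Because the scan resumes from the stored index, each queue entry is stepped
over at most once, no matter how many scheduling attempts are made.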

Corner cases
============

1. Handle exhaustion of the counters. (Note, this will never happen,
because it takes 500 years to wrap a 64 bit counter if you have
1,000,000,000 transactions per second, but we should handle it
because people usually worry about such things.)

If the counter wraps, we should rotate the binary log. The slave
coordinator should make a checkpoint and wait for all currently
running transactions to finish. This is already
implemented for the current scheme, so all we need is a test case.

2. Fall back to sequential execution.
In certain cases a transaction is not scheduled in parallel; instead it
requires all prior transactions to have finished (a similar policy exists
in WL#5569). The transaction header event is tagged with a last_committed
value of zero, and possibly with a sequence_number of zero.
Those rare cases include:

- transactions from an "old" master that is unaware of WL#7165, including
WL#6314-aware ones
- DROP of multiple tables, which is logged this way with a second Query event
- CREATE TABLE ... SELECT ... that reads from a @user-var or uses the rand
function, or for which an INTVAR event is generated.

3. Mixed engine transaction is logged as multiple (two) groups, where
the 2nd is tagged to have the 1st as its commit parent.

Optimizations
=============

flushes are serialized on the master.

2. The two numbers stored in the binary log will normally have a very
small difference.  So instead of storing two 64-bit integers, we
can store transaction.sequence_number as a 64-bit integer, and
then store the difference as a 16-bit integer.  This will save 6
bytes of space.  In the case that the difference is greater than
65535, we store the number 65535.  This is safe, but may give less
parallelism (in the case of 65536 or more concurrent
transactions on the master).
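
The saturating 16-bit difference can be sketched as follows. This
illustrates the idea only and is not the binary log event layout: the names
EncodedTimestamps, encode and decode_last_committed are hypothetical, and
the sketch assumes last_committed <= sequence_number.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

struct EncodedTimestamps {
  std::int64_t sequence_number;  // stored in full (8 bytes)
  std::uint16_t delta;           // sequence_number - last_committed,
                                 // saturated at 65535 (2 bytes)
};

inline EncodedTimestamps encode(std::int64_t last_committed,
                                std::int64_t sequence_number) {
  const std::int64_t diff = sequence_number - last_committed;
  const std::uint16_t delta =
      static_cast<std::uint16_t>(std::min<std::int64_t>(diff, 65535));
  return {sequence_number, delta};
}

// The decoded commit parent is never older than the real one, so the slave
// can only become more conservative, never less safe.
inline std::int64_t decode_last_committed(const EncodedTimestamps &e) {
  return e.sequence_number - e.delta;
}
```

When the difference fits in 16 bits the round trip is exact. When it
saturates, the decoded last_committed is larger than the true value, which
makes the slave wait for more transactions than strictly necessary but
never for fewer.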

==== Notes for future work ====

The ideas of this section are *not* to be included in this worklog;
they are merely mentioned here to prevent possible concerns and
motivate the current design choice.

1. Logical timestamp compressing in the binlog event

If binlog_order_commits=OFF, the current policy of storing just
two numbers in the binary log may give sub-optimal scheduling on
the slave. This could in theory be fixed by replacing
transaction.last_committed by a more complex data structure. However,
this would be both more conceptually complex and require a more
complex implementation, as well as more data in the binary log.
It also only addresses a corner case (the default is
binlog_order_commits=ON and there is no known reason to turn it
off). Therefore, we do not intend to fix that in this worklog.

Just for the record, we here outline the problem and a possible
solution; this may be considered future work in case it is
determined to be useful.

The precise problem is that when binlog_order_commits=OFF, it is
possible for two transactions to be committed in a different order
than the order in which they were flushed. Thus, even if trx1 is
written before trx2 to the binary log and thus
trx1.sequence_number < trx2.sequence_number, it is possible that
trx2 is committed before trx1. This gives the following possible
scenario of sub-optimal scheduling on the slave:

1. T1 flushes and is assigned transaction.sequence_number := 1
2. T2 flushes and is assigned transaction.sequence_number := 2
3. T2 commits and sets global.max_committed_transaction := 2
4. T3 begins its locking interval and sets transaction.last_committed :=
global.max_committed_transaction = 2
5. T1 commits

Then, the slave will not schedule T3 at the same time as T1 or T2.
However, it would have been ok for T3 to execute in parallel with
T1, since T1 held all locks at the same time as T3.

To fix this, we would need to replace transaction.last_committed
by the set of all sequence numbers that have not yet been committed.
Currently, we know that this set only contains consecutive values,
so it is conveniently stored as a single integer, but when commits
may happen in a different order from the assignment of
transaction.sequence_number, the set may be more complex. The set of
sequence numbers that have not been committed can then be represented
as a list of intervals, or as an offset plus a bitmap (if bit number
N is set in the bitmap, it means that sequence number offset+N has
been generated but not committed).

2. Transaction distribution policies

Among the substantial factors to consider is the way jobs are
assigned (fed) to Workers.
Two policies are considered for immediate evaluation, yet only
the first one (A) is implemented. It is presented in this section to
contrast it with the second one (B).

A. At-Most-One (already designed in WL#6314). Any Worker
can have at most one transaction in its private queue.  When
all Workers are occupied, which is actually expected, the
Coordinator waits for the first of them to be released.  The potential
disadvantage is apparent: in the worst case all but one Worker
can be without any assignment for the duration of scheduling a few
transactions.  And it actually scales up: the last of the
idle Workers would starve for the duration of N-1
scheduling times.

B. The First Available (arguably ideal, not to be implemented in
this WL). The idea is to use a shared queue into which the
Coordinator pushes transaction events and from whose other end
the Workers pick them up.  Such a queue was designed earlier for the
DB-type "classing" MTS.  The queue features concurrent access
(push and pop) by multiple threads.

Master side logical timestamping
--------------------------------

The two "system clocks", global.transaction_counter and
global.max_committed_transaction, are implemented through
augmentation of
class MYSQL_BIN_LOG
+  /* Transaction logical timestamp */
+   Logical_clock transaction_counter;
+  /* Maximum value of transaction_counter among committed transactions */
+   Logical_clock max_committed_transaction;

transaction.sequence_number and transaction.last_committed are added as
new members of

class Transaction_ctx {

+  /* Binlog-specific logical timestamps. */
+  /*
+    Store for the transaction's commit parent.
+  */
+  longlong last_committed;
+  /*
+    The transaction's own logical timestamp assigned
+    at the transaction prepare phase.
+    More specifically, it can be thought of as the flush timestamp.
+  */
+  longlong sequence_number;

to stay next to m_flags. The two comprise an informal Binlog section
of the class.

At binlog_prepare() of the statement, Transaction_ctx::last_committed
is approximated (optimistically, in the case of a multi-statement
transaction) by reading the current value of
max_committed_transaction. This logic works correctly both for
single-statement transactions (aka autocommit=1) and multi-statement
transactions. In the case of multi-statement transactions, the value
will be read at the end of every statement, and only the value fetched
at the last statement before the commit will be used.

Stepping transaction_counter and acquiring the resulted timestamp for

max_committed_transaction is updated according to the while-loop of the
HLS description, which accounts for the possibility of a runtime flip of
@@global.binlog_order_commits. Note that if
@@global.binlog_order_commits were constantly true,
max_committed_transaction would always be incremented by one in
MYSQL_BIN_LOG::process_commit_stage_queue() by the commit phase
leader. Conversely, if it were false, the increment would be
done inside MYSQL_BIN_LOG::finish_commit() by each thread,
and the increment can be zero or more than 1, because the
group members can commit out of order and the clock cannot go
backwards.

GAQ-based implementation of the running transaction sequence
------------------------------------------------------------

At transaction scheduling, regardless of the scheduler type,
the Coordinator always allocates a new index
at the end of the GAQ array. The array index order therefore follows the
master binlog order as long as transactions appear for scheduling in
that order.
The Coordinator schedules a transaction to the first available Worker and
does not let any Worker have more than one transaction assigned.

To honor the C3 dependency between the minimum running transaction timestamp
and the commit parent of the transaction being dispatched, the following
scheme is implemented.
Having received a transaction from the Coordinator, the Worker compares the
transaction's commit parent against the low-water-mark (lwm) of committed
transactions. The lwm is defined as the greatest timestamp such that it and
all smaller timestamps belong to committed transactions.
When the conflict condition holds, the Worker has to wait.
There can be multiple waiting Workers at a time. In order to be awakened, they
compute the minimum waited timestamp. A "parent" Worker that
finishes its transaction, and possibly increments the lwm, uses this value to
decide whether it needs to signal anybody; see
Slave_worker::slave_worker_ends_group.
Here is a pseudo-coded version of the algorithm:

Child Worker                        Parent Worker

W^(c) | at "starting"             ; W^(p) | at "committing"
                                  ; T.done = t;

when (p <= lwm)                   ; when |\pi| = 0
  continue;    // "likely"        ;   continue;   // "likely"

lock mutex;                       ; lock mutex;
                                  ;   when |\pi| = 0
  lwm= get_lwm()                  ;     continue;  // breaking the block
  while (p > lwm)
                                  ;   lwm= get_lwm()
    p_min=min(p, p_min)           ;
    wait(p <= lwm)                ;   when (lwm >= p_min)
                                  ;     \for each W^(c)_i
                                  ;        such that p <= lwm signal(go);
                                  ; unlock;
  delete p from \pi;
unlock;

W - a worker thread, p - the commit parent timestamp, \pi - the set of waiting
Workers, p_min - the minimum waited "parent" commit timestamp.

Notice, that in "likely" cases the Child Worker set is empty so there's
Mts_submode_logical_clock::get_lwm() explores the GAQ state to account for
committed transactions and to update the system's "instant"
status. (In contrast to the instant status, there is a stable LWM GAQ
index and logical timestamp that the Coordinator updates once per
@@global.slave_checkpoint_group committed transactions, unless a
timeout elapses first.)

In the implementation, \pi is constructed as a singly linked list whose
head is the link with p_min. Each link is represented by a GAQ index (see
WL#5669), augmented with a new member, next_in_list, that points to the next
link in the list.
While GAQ indexes are ordered by commit_point, that is, sequenced by it, the
list is ordered by last_committed (p).

An example of the waiting list of four members: { 103, 100, 102, 101 }:

GAQ index      |   100   |   101    |    102   |  103   |   104    |
               +---------+----------+----------+--------+----------+
worker id      |  W #2   |  W #1    |   W #1   | W #2   |  W #3    |
exec status    |waited   |assigned  |waited    |waited  |assigned  |
next_in_list   |102      |nil       |101       |100     |          |
last_committed | 91      | 99       | 91       | 90     | 100      |
commit_point   | 95      | 96       | 97       | 98     |  99      |
...            | ...     |          |          |        |          |
               +-----+---+----------+-+--------+--+-----+----------+
                  ^  |        ^------ + ^         |
                  +--|------------------|---------+
                     +------------------+

Due to the list representation, add and delete have insert and pop
semantics, respectively.
In case of many waiters, it is enough for one of the waiters to chop off the
head part of the waiting list whose members satisfy the commit parent
condition. In the implementation, the first awakened thread does so.
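
The ordered waiting list with insert and pop semantics can be sketched as
follows. This is an illustration under stated assumptions, not the server
code: std::forward_list stands in for the next_in_list links stored in GAQ
entries, the names Waiter, WaitingList, insert and pop_satisfied are
hypothetical, and no locking is shown.

```cpp
#include <cassert>
#include <cstdint>
#include <forward_list>
#include <vector>

struct Waiter {
  int gaq_index;
  std::int64_t last_committed;  // p, the commit parent being waited for
};

class WaitingList {
 public:
  // "add" with insert semantics: keep the list ordered by last_committed,
  // so the head always carries p_min. Equal values keep arrival order.
  void insert(const Waiter &w) {
    auto prev = list_.before_begin();
    for (auto it = list_.begin();
         it != list_.end() && it->last_committed <= w.last_committed; ++it)
      prev = it;
    list_.insert_after(prev, w);
  }

  // "delete" with pop semantics: chop off the head part of the list whose
  // members satisfy p <= lwm, and return their GAQ indexes.
  std::vector<int> pop_satisfied(std::int64_t lwm) {
    std::vector<int> released;
    while (!list_.empty() && list_.front().last_committed <= lwm) {
      released.push_back(list_.front().gaq_index);
      list_.pop_front();
    }
    return released;
  }

  bool empty() const { return list_.empty(); }

 private:
  std::forward_list<Waiter> list_;
};
```

Inserting the four waiters of the example in GAQ index order (100, 101,
102, 103) yields the list { 103, 100, 102, 101 }. Once the lwm reaches 91,
a single pop releases 103, 100 and 102, leaving only 101 waiting for 99.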
