WL#7165: MTS: Optimizing MTS scheduling by increasing the parallelization window on master
Affects: Server-5.7
Status: Complete
Executive Summary
=================

WL#6314 implemented intra-schema multi-threaded slave, where multiple
transactions can be applied in parallel on the slave (even when they modify
the same database) as long as they are non-conflicting. The existing
implementation uses a simple algorithm for determining which transactions are
non-conflicting. It already gives a 40% improvement on one benchmark,
compared to single-threaded slave.

The present worklog optimizes the implementation by using a potentially more
precise algorithm to determine which transactions are non-conflicting. On
workloads that are not evenly distributed over many databases, this allows
more transactions to execute in parallel.
After this worklog, the intra-schema multi-threaded slave should be optimized
and we should be able to gain more throughput. There is no change to the
server user interface. The new scheme completely replaces the scheme
implemented in WL#6314.

FR1: Additional fields will be added to the binary log.

FR2: mysqlbinlog output shall show the two logical timestamps for all
     transactions.

NFR1: Some transactions may be tagged with zero values of last_committed and
      sequence_number. In this case the transaction's scheduling on the slave
      reduces parallelism. The penalty is insignificant as long as the number
      of such transactions is small.

NFR2: Some transactions/event groups are tagged as dependent on a preceding
      one even though the actual dependency is more relaxed. The penalty is
      insignificant as long as the number of such transactions is small.
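To illustrate FR2, mysqlbinlog prints the two timestamps on the GTID (or
Anonymous_GTID) header line of each transaction. The excerpt below is
illustrative only; the dates, positions, and timestamp values are made up:

  #150420 14:04:03 server id 1  end_log_pos 259   GTID  last_committed=0  sequence_number=1
  #150420 14:04:04 server id 1  end_log_pos 518   GTID  last_committed=1  sequence_number=2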
Old, Commit-Parent-Based Scheme
===============================

The old scheme for multi-threaded slave implemented in WL#6314 works as
follows.

- On master, there is a global counter. The counter is incremented before
  each storage engine commit.
- On master, before a transaction enters the prepare phase, the current
  value of the global counter is stored in the transaction. This number is
  called the commit-parent for the transaction.
- On master, the commit-parent is stored in the binary log in the header of
  the transaction.
- On slave, two transactions are allowed to execute in parallel if they have
  the same commit-parent.

Problem With Commit-Parent-Based Scheme
=======================================

The old scheme allows less parallelism than would be possible. The old
scheme partitions the time line into intervals. When a transaction commits,
the current time interval ends and a new one begins. Two transactions can
execute in parallel if they were prepared within the same time interval.
The following picture illustrates the scheme:

Trx1 ------------P----------C-------------------------------->
                            |
Trx2 ----------------P------+---C---------------------------->
                            |   |
Trx3 -------------------P---+---+-----C---------------------->
                            |   |     |
Trx4 -----------------------+-P-+-----+----C----------------->
                            |   |     |    |
Trx5 -----------------------+---+-P---+----+---C------------->
                            |   |     |    |   |
Trx6 -----------------------+---+---P-+----+---+---C---------->
                            |   |     |    |   |   |
Trx7 -----------------------+---+-----+----+---+-P-+--C------->
                            |   |     |    |   |   |  |

Each horizontal line represents a transaction. Time progresses to the right.
P denotes the point in time when the commit-parent is read before the
prepare phase. C denotes the point in time when the transaction increases
the global counter and thus begins a new interval. The vertical lines
extending down from each commit show the interval boundaries.

Trx5 and Trx6 are allowed to execute in parallel because they have the same
commit-parent (namely, the counter value set by Trx2). However, Trx4 and
Trx5 are not allowed to execute in parallel, and Trx6 and Trx7 are not
allowed to execute in parallel.

But note that two transactions that hold all their respective locks at the
same point in time on the master are necessarily non-conflicting. Thus, it
would not be problematic to allow them to execute in parallel on the slave.
In the above example, this has two implications:

- Trx4, Trx5, and Trx6 hold all their locks at the same time, but Trx4 will
  be executed in isolation.
- Trx6 and Trx7 hold all their locks at the same time, but Trx7 will be
  executed in isolation.

It would be better if Trx4 could execute in parallel with Trx5 and Trx6, and
Trx6 could execute in parallel with Trx7.

New, Lock-Based Scheme
======================

In the present worklog we implement a scheme that allows two transactions to
execute in parallel if they hold all their locks at the same time. We define
the lock interval as the interval of time when a transaction holds all its
locks:

- The lock interval ends when the first lock is released in the storage
  engine commit. For simplicity, we do not analyze the lock releases inside
  the storage engine; instead, we assume that locks are released just before
  the storage engine commit.
- The lock interval begins when the last lock is acquired. This may happen
  in the storage engine or in the server.
  For simplicity, we do not analyze lock acquisition in the storage engine
  or in the server; instead, we assume that the last lock is acquired at the
  end of the last DML statement, in binlog_prepare. This works correctly
  both for normal transactions and for autocommitted transactions.

If Trx1, Trx2 are transactions, and Trx1 appears before Trx2, the criterion
for parallel execution is this:

C1. Trx1, Trx2 can execute in parallel if and only if their locking
    intervals overlap.

The following is an equivalent formulation:

C2. Trx1, Trx2 can NOT execute in parallel, if and only if Trx1 has ended
    its locking interval before Trx2 has started its locking interval.

The following illustrates the criteria (L denotes the beginning of the
locking interval and C denotes the end of the locking interval).

- Can execute in parallel:

  Trx1 -----L---------C------------>
  Trx2 ----------L---------C------->

- Can not execute in parallel:

  Trx1 -----L----C----------------->
  Trx2 ---------------L----C------->

To evaluate the locking criteria, we need to keep track of which
transactions have ended their locking intervals. To this end, we assign a
logical timestamp to each transaction: transaction.sequence_number. We will
need to store transaction.sequence_number in the binary log. Therefore, we
step it and assign it to the transaction just before the transaction enters
the flush stage.

In addition, we maintain the global variable
global.max_committed_transaction, which holds the maximal sequence_number of
all transactions that have ended their locking intervals. The variable plays
the role of the system commit logical clock. Thus, before a transaction
performs the storage engine commit, it sets global.max_committed_transaction
to max(global.max_committed_transaction, transaction.sequence_number).

Each transaction needs to know which transactions it cannot execute in
parallel with. We define the *commit parent* of a transaction to be the
*newest* transaction that cannot execute in parallel with the transaction.
Thus, when the transaction begins its locking interval, we store
global.max_committed_transaction into the variable
transaction.last_committed. Recall that the locking interval for
multi-statement transactions begins at the end of the last statement before
commit. Since we do not know a priori which is the last statement, we store
global.max_committed_transaction into transaction.last_committed at the end
of *every* DML statement, overwriting the old value. Then we will have the
correct value when the transaction is written to the binary log.

We store both timestamps in the binary log. The condition for executing a
transaction on the slave is as follows:

C3. Slave can execute a transaction if the smallest sequence_number among
    all executing transactions is greater than transaction.last_committed.

In order to check this condition, the slave scheduler maintains an ordered
sequence of currently executing transactions. The first transaction in the
sequence is the one that appeared first in the master binary log; in other
words, it is the one with the smallest value for
transaction.sequence_number. The last transaction in the sequence is the one
that appeared last in the master binary log, i.e., has the greatest value
for transaction.sequence_number. Before a transaction is taken for
scheduling, the following condition is checked (see the sketch below):

(*) transaction_sequence[0].sequence_number > this.last_committed

Scheduling is held up until this condition becomes true.
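The following is a minimal, self-contained sketch of condition (*). It is an
illustration, not the server code; the names Transaction, can_schedule and
transaction_sequence are chosen for this example only:

  #include <cstdint>
  #include <vector>

  // Sketch only: transaction_sequence holds the sequence_number of every
  // currently executing transaction, oldest (smallest) first.
  struct Transaction {
    int64_t sequence_number;   // logical timestamp assigned at flush time
    int64_t last_committed;    // commit parent read from the binary log
  };

  // The transaction may be scheduled only when the oldest still-executing
  // transaction is newer than this transaction's commit parent.
  bool can_schedule(const Transaction &trx,
                    const std::vector<int64_t> &transaction_sequence) {
    return transaction_sequence.empty() ||
           transaction_sequence.front() > trx.last_committed;
  }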
At successful scheduling, the transaction is appended at the end of
transaction_sequence. After a transaction has committed, it is effectively
removed from the sequence. (In the implementation, it is merely marked as
done, which tells the scheduler to ignore the transaction when it evaluates
condition (*).)

Pseudo-code
===========

Master variables:

- int64 global.transaction_counter
- int64 global.max_committed_transaction
- int64 transaction.sequence_number
- int64 transaction.last_committed

Master logic, in order of events of execution:

- in binlog_prepare:
    if this is not a transaction commit:
      transaction.last_committed = global.max_committed_transaction

- after it has been determined that the transaction is the next one to be
  flushed, and before the transaction is flushed, the global transaction
  counter is stepped and copied to the transaction's sequence number:
    transaction.sequence_number = ++global.transaction_counter

- write transaction.sequence_number and transaction.last_committed to the
  binary log, in the transaction header;

- before the transaction does the storage engine commit:
    global.max_committed_transaction =
      max(global.max_committed_transaction, transaction.sequence_number)

  When @@global.binlog_order_commits is true, in principle we could reduce
  the max to an assignment:
    global.max_committed_transaction = transaction.sequence_number
  However, since binlog_order_commits is dynamic, if we did this, there
  would be a short time interval just after the user changes
  binlog_order_commits from 0 to 1 during which the committing transactions'
  timestamps are not monotonically increasing, but binlog_order_commits == 1.
  If we used the assignment algorithm during this time period, transactions
  could have the wrong timestamps in the binary log, which could lead to
  conflicting transactions executing in parallel on the slave. To handle
  both cases using atomic operations we use the following algorithm (see
  also the standalone sketch at the end of this section):

    int64 old_value = transaction.sequence_number - 1;
    while (!my_atomic_cas64(&global.max_committed_transaction,
                            &old_value, transaction.sequence_number) &&
           transaction.sequence_number > old_value)
      ; // do nothing

Slave variables:

- transaction_sequence: ordered sequence containing all executing
  transactions in order of increasing sequence_number. (In the code, this is
  implemented using the existing Relay_log_info::GAQ. This is a circular
  queue of large, fixed size.)

Slave logic:

- before the scheduler pushes the transaction for execution:
    wait until transaction_sequence[0].sequence_number >
               transaction.last_committed

  (The actual implementation will step through the list in the following
  manner:

    // The Low Water Mark is the newest transaction for which the scheduler
    // knows the following facts:
    // - the transaction has been committed;
    // - all older transactions have been committed.
    // LWM_plus_1 is the next transaction, i.e., the one that was the oldest
    // executing transaction last time that the scheduler looked.
    global int LWM_plus_1;  // the same as transaction_sequence[0]

    function wait_until_transaction_can_be_scheduled(transaction):
      while true:
        while rli.GAQ[LWM_plus_1].is_committed:
          LWM_plus_1++
        if rli.GAQ[LWM_plus_1].sequence_number > transaction.last_committed:
          return
        wait until rli.GAQ[LWM_plus_1] commits
  )

- after the transaction commits:
    GAQ[transaction.index].is_committed = true;
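As a complement to the master-side pseudo-code above, the following
standalone sketch expresses the same "advance the clock to at least my
sequence_number" step using std::atomic instead of the server's
my_atomic_cas64; the function and variable names are illustrative only:

  #include <atomic>
  #include <cstdint>

  std::atomic<int64_t> max_committed_transaction{0};

  void update_max_committed(int64_t sequence_number) {
    int64_t old_value = sequence_number - 1;
    // Retry until either sequence_number is installed or the clock is
    // already at a value >= sequence_number (possible when commits happen
    // out of binlog order).  On failure the CAS loads the current clock
    // value into old_value, so the next attempt compares against it.
    while (!max_committed_transaction.compare_exchange_weak(
               old_value, sequence_number) &&
           sequence_number > old_value) {
      // keep retrying
    }
  }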
Corner cases
============

1. Handle exhaustion of the counters.

   (Note, this will never happen, because it takes about 500 years to wrap a
   64-bit counter if you have 1,000,000,000 transactions per second, but we
   should handle it because people usually worry about such things.)
   If the counter wraps, we should rotate the binary log. The slave
   coordinator should make a checkpoint and wait for all currently running
   threads when it sees a rotate. This mechanism is already implemented for
   the current scheme, so all we need is a test case.

2. Fall back to sequential execution.

   In certain cases a transaction is not scheduled in parallel; it requires
   all prior transactions to have finished (a similar policy exists in
   WL#5569). The transaction header event is tagged with a last_committed
   value of zero, and possibly with a sequence_number of zero as well. Those
   rare cases include:

   - transactions from an "old" master that is unaware of WL#7165, including
     WL#6314-aware ones
   - DROP of multiple tables, which is logged this way with a second Query
     event
   - CREATE TABLE ... SELECT ... for which a user variable, RAND(), or
     INTVAR event is generated for the query.

3. A mixed-engine transaction is logged as multiple (two) groups, where the
   second is tagged to have the first as its commit parent.

Optimizations
=============

1. Access to global.transaction_counter does not need a lock because flushes
   are serialized on the master.

2. The two numbers stored in the binary log will normally have a very small
   difference. So instead of storing two 64-bit integers, we can store
   transaction.sequence_number as a 64-bit integer, and then store the
   difference as a 16-bit integer. This will save 6 bytes of space. In the
   case that the difference is greater than 65535, we store the number
   65535. This is safe, but may give less parallelism (in the case of 65536
   or more concurrent transactions on the master).

==== Notes for future work ====

The ideas of this section are *not* to be included in this worklog; they are
merely mentioned here to prevent possible concerns and motivate the current
design choice.

1. Logical timestamp compression in the binlog event

   If binlog_order_commits=OFF, the current policy of storing just two
   numbers in the binary log may give sub-optimal scheduling on the slave.
   This could in theory be fixed by replacing transaction.last_committed by
   a more complex data structure. However, this would be both conceptually
   more complex and require a more complex implementation, as well as more
   data in the binary log. It also only addresses a corner case (the default
   is binlog_order_commits=ON and there is no known reason to turn it off).
   Therefore, we do not intend to fix that in this worklog. Just for the
   record, we here outline the problem and a possible solution; this may be
   considered future work in case it is determined to be useful.

   The precise problem is that when binlog_order_commits=OFF, it is possible
   for two transactions to be committed in a different order than the order
   in which they were flushed. Thus, even if trx1 is written before trx2 to
   the binary log and thus trx1.sequence_number < trx2.sequence_number, it
   is possible that trx2 is committed before trx1. This gives the following
   possible scenario of sub-optimal scheduling on the slave:

   1. T1 flushes and is assigned transaction.sequence_number := 1
   2. T2 flushes and is assigned transaction.sequence_number := 2
   3. T2 commits and sets global.max_committed_transaction := 2
   4. T3 reads transaction.last_committed :=
      global.max_committed_transaction = 2
   5. T1 commits

   Then, the slave will not schedule T3 at the same time as T1 or T2.
   However, it would have been OK for T3 to execute in parallel with T1,
   since T1 held all its locks at the same time as T3.

   To fix this, we would need to replace transaction.last_committed by the
   set of all sequence numbers that have not yet been committed. Currently,
   we know that this set only contains consecutive values, so it is
   conveniently represented by a single integer, but when commits may happen
   in a different order from the assignment of transaction.sequence_number,
   the set may be more complex. The set of sequence numbers that have not
   been committed can then be represented as a list of intervals, or as an
   offset plus a bitmap (if bit number N is set in the bitmap, it means that
   sequence number offset+N has been generated but not committed); a sketch
   of the bitmap variant appears at the end of this section.

2. Transaction distribution policies

   Among the substantial factors to consider is the style in which jobs are
   assigned (fed) to Workers. Two styles have been considered for immediate
   evaluation, yet only the first one (A) is implemented; it is presented in
   this section to contrast with the second (B).

   A. At-Most-One (already designed by WL#6314)

      Any Worker can have at most one transaction in its private queue. In
      case all Workers are occupied, which is actually expected because the
      read time is about 1% of the execution time, the Coordinator waits for
      the first of them to become free. The potential disadvantage is
      apparent: in the worst case, all but one Worker can be left without
      any assignment for the duration of scheduling a few transactions. And
      it actually scales up: the last of the idling Workers may stay hungry
      for the duration of N-1 scheduling times.

   B. The First Available (arguably ideal, not to be implemented in this WL)

      The idea is to use a shared queue to hold the transaction events: the
      Coordinator pushes into one end, and Workers pick up from the other
      end. Such a queue design was done some time ago for the DB-type
      "classing" MTS. The queue features concurrent access (push and pop)
      by multiple threads.
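Returning to note 1, the following is a purely hypothetical sketch of the
offset-plus-bitmap representation of the not-yet-committed sequence numbers;
it is not part of this worklog, and all names are invented for the
illustration:

  #include <cstdint>
  #include <vector>

  // Hypothetical: bit i is set iff sequence number offset + i has been
  // generated (flushed to the binary log) but not yet committed.
  struct Uncommitted_set {
    int64_t offset = 1;      // smallest sequence number the bitmap covers
    std::vector<bool> bits;

    void flushed(int64_t seq) {
      std::size_t i = static_cast<std::size_t>(seq - offset);
      if (bits.size() <= i) bits.resize(i + 1, false);
      bits[i] = true;
    }

    void committed(int64_t seq) {
      bits[static_cast<std::size_t>(seq - offset)] = false;
      // Slide the window past the committed prefix so the bitmap stays small.
      std::size_t n = 0;
      while (n < bits.size() && !bits[n]) ++n;
      bits.erase(bits.begin(), bits.begin() + n);
      offset += static_cast<int64_t>(n);
    }
  };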
Master side logical timestamping
--------------------------------

The two "system clocks", global.transaction_counter and
global.max_committed_transaction, are implemented through augmentation of
class MYSQL_BIN_LOG:

+  /* Transaction logical timestamp */
+  Logical_clock transaction_counter;
+  /* Maximum value of transaction_counter among committed transactions */
+  Logical_clock max_committed_transaction;

transaction.sequence_number and transaction.last_committed are made new
members of class Transaction_ctx:

+  /* Binlog-specific logical timestamps. */
+  /*
+    Store for the transaction's commit parent.
+  */
+  longlong last_committed;
+  /*
+    The transaction's own logical timestamp assigned at the transaction
+    prepare phase. More specifically, it can be thought of as the flush
+    timestamp.
+  */
+  longlong sequence_number;

placed next to m_flags. The two comprise an informal binlog section of the
class.

At binlog_prepare() of each statement, Transaction_ctx::last_committed is
approximated (optimistically in the case of a multi-statement transaction)
by reading the current value of max_committed_transaction. This logic works
correctly both for single-statement transactions (autocommit=1) and
multi-statement transactions. In the case of multi-statement transactions,
the value is read at the end of every statement, and only the value fetched
at the last statement before the commit is used.

Stepping transaction_counter and acquiring the resulting timestamp for
Transaction_ctx::sequence_number is done in
MYSQL_BIN_LOG::flush_thread_caches().

max_committed_transaction is updated according to the while-loop of the HLS
description, which accounts for the possibility that
@@global.binlog_order_commits is flipped at runtime. Notice that should
@@global.binlog_order_commits be constantly true, max_committed_transaction
would always be incremented by one in
MYSQL_BIN_LOG::process_commit_stage_queue() by the commit phase leader.
Conversely, when it is false, the increment is done inside
MYSQL_BIN_LOG::finish_commit() by each thread, and the increment value can
be zero or more than 1, because the group members can commit out of order
and the clock cannot go in the reverse direction.
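For orientation, here is a minimal, self-contained sketch of what such a
logical clock could look like. It mirrors the worklog description (step at
flush time, advance-if-greater at commit time), but it is not the actual
server class, the method names are assumptions, and locking is omitted:

  #include <algorithm>
  #include <cstdint>

  // Illustrative sketch only.  transaction_counter would use step() under
  // LOCK_log at flush time; max_committed_transaction would use
  // set_if_greater() at commit time (atomically in the real server).
  class Logical_clock {
   public:
    int64_t step() { return ++m_clock; }             // next sequence_number
    int64_t get_timestamp() const { return m_clock; }
    void set_if_greater(int64_t t) { m_clock = std::max(m_clock, t); }

   private:
    int64_t m_clock = 0;
  };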
GAQ-based implementation of the running transaction sequence
-------------------------------------------------------------

At transaction scheduling, regardless of the scheduler type, the Coordinator
always allocates a new index at the end of the GAQ array. The array index
order therefore follows the master binlog order, as long as transactions
appear for scheduling in that order. The Coordinator schedules a transaction
to the first available Worker, not letting any Worker have more than one
transaction assigned.

To honor the C3 dependency between the minimum running transaction timestamp
and the dispatched transaction's commit parent, the following scheme is
implemented. Upon receiving a transaction from the Coordinator, the Worker
compares the transaction's commit parent against the value of the
low-water-mark (lwm) of committed transactions, defined as the minimum
timestamp below which, and including, all transactions are committed. When
the conflict condition takes place, the Worker has to wait. There can be
multiple waiting Workers at a time. In order to be awakened, they compute
the minimum waited timestamp, which a "parent" Worker that finishes its
transaction (and possibly increments the lwm) consults in order to decide
whether it needs to signal anybody; see
Slave_worker::slave_worker_ends_group. Here is a pseudo-coded version of the
algorithm:

   Child Worker                     Parent Worker

   W^(c) | at "starting"         ;  W^(p) | at "committing"
                                 ;    T.done = t;
   when (p <= lwm)               ;  when |\pi| = 0
     continue;  // "likely"      ;    continue;  // "likely"
   lock mutex;                   ;  lock mutex;
                                 ;  when |\pi| = 0
   lwm= get_lwm()                ;    continue;  // breaking the block
   while (p > lwm)               ;  lwm= get_lwm()
     add(p into \pi)             ;
     p_min= min(p, p_min)        ;
     wait(p <= lwm)              ;  when (lwm >= p_min)
                                 ;    \for each W^(c)_i
                                 ;      such that p <= lwm
                                 ;        signal(go);
   delete p from \pi;            ;  unlock;
   unlock;

Here W is a worker thread, p is the commit parent timestamp, \pi is the set
of waiting Workers, and p_min is the minimum waited "parent" commit
timestamp; mutex, wait and broadcast are shortcuts for pthread primitives.
Notice that in the "likely" cases the set of waiting Child Workers is empty,
so there is no communication between threads.

Mts_submode_logical_clock::get_lwm() explores the GAQ state to account for
committed transactions and update the system's "instant" status. (In
contrast to the instant status, there is a stable LWM GAQ index and logical
timestamp that the Coordinator updates once per
@@global.slave_checkpoint_group committed transactions, unless a timeout
elapses first.)

In the implementation, \pi is constructed as a singly-linked list whose head
is the link with p_min. Each link is represented by a GAQ index, see
WL#5669, augmented with a new member `next_in_list' that points to the next
element in the list. While GAQ indexes are ordered by `commit_point' (that
is, sequenced by it), the list is ordered by `last_committed' (p). An
example of a waiting list of four members, { 103, 100, 102, 101 }:

   GAQ index      | 100     | 101      | 102    | 103     | 104      |
   +--------------+---------+----------+--------+---------+----------+
   worker id      | W #2    | W #1     | W #1   | W #2    | W #3     |
   exec status    | waited  | assigned | waited | waited  | assigned |
   next_in_list   | 102     | nil      | 101    | 100     |          |
   last_committed | 91      | 99       | 91     | 90      | 100      |
   commit_point   | 95      | 96       | 97     | 98      | 99       |
   ...            | ...     |          |        |         |          |
   +--------------+---------+----------+--------+---------+----------+

(The list starts at index 103, whose last_committed is the minimum, and
follows next_in_list: 103 -> 100 -> 102 -> 101.)

Due to the list representation, `add' and `delete' have `insert' and `pop'
semantics respectively. In case of many waiters, it is enough for one of the
waiters to chop off the head part of the waiting list that has satisfied the
commit parent condition; the first awakened thread does so in the
implementation.
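To make the list manipulation concrete, here is a minimal illustrative
sketch of the `insert' and `pop' operations described above. It uses a
std::list instead of GAQ-embedded next_in_list links, and all names are
chosen for this example only:

  #include <cstdint>
  #include <list>

  // Each entry stands for a waiting Worker and carries the commit parent
  // (p, i.e. last_committed) it is waiting on.
  struct Waiter {
    int64_t last_committed;  // p: the timestamp the Worker waits to see committed
    int worker_id;
  };

  // Kept ordered by last_committed, smallest (p_min) at the head.
  std::list<Waiter> waiting_list;

  // `add' has insert semantics: keep the list ordered by last_committed.
  void add_waiter(const Waiter &w) {
    auto it = waiting_list.begin();
    while (it != waiting_list.end() && it->last_committed <= w.last_committed)
      ++it;
    waiting_list.insert(it, w);
  }

  // Called after the lwm has advanced: chop off and signal the head part of
  // the list whose condition p <= lwm is now satisfied (`delete' has pop
  // semantics).
  void release_satisfied(int64_t lwm) {
    while (!waiting_list.empty() &&
           waiting_list.front().last_committed <= lwm) {
      // signal(go) to waiting_list.front().worker_id would go here
      waiting_list.pop_front();
    }
  }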