WL#8440: Group Replication: Parallel applier support
Affects: Server-5.7
Status: Complete
MySQL Group Replication provides multi-master, update-everywhere replication for MySQL. Clients can connect to any group server and, after conflict detection, write changes that will be propagated to all group members. When a client updates data on a group member, the data goes through two different paths:
  local:  how changes are written on the same server to which the client is connected;
  remote: how changes are written on the other group members.

Local changes are written directly to the binary log and the database. Remote changes are queued and applied using the channel abstraction also used for regular asynchronous replication (and multi-source replication). The applier instance on Group Replication is the same applier component used by MySQL asynchronous replication, usually referred to as the SQL thread. As for receiving and queuing transactions, the code path is pretty much the same, except that it is executed by a different thread than the IO thread: it is executed by a group communication receiver thread.

The goal of this worklog is to improve the Group Replication applier by making it parallel, that is, capable of applying multiple non-conflicting transactions in parallel. This will decrease the time needed by each group member to apply queued remote transactions.

The Group Replication parallel applier implementation is based on WL#7165: "MTS: Optimizing MTS scheduling by increasing the parallelization window on master". In that feature, two transactions are parallelizable if we can tell that they have not blocked each other during their execution. This parallelization is expressed using two counters that are added to every transaction header (Gtid_log_event): last_committed and sequence_number.

  sequence_number: the monotonically increasing transaction number counter, incremented by each transaction and reset to 1 at the beginning of each binary log file.

  last_committed: the version on which the transaction was executed, that is, the last sequence_number committed before the current transaction commits.

So, whenever a remote transaction enters the certifier, both the last_committed and sequence_number fields are calculated after certification. These counters are then set in the transaction's new GTID event and sent along its normal path to the applier, where MTS can now parallelize the transactions. Hence, this worklog takes advantage of the certifier component to populate the parallelization indexes described above in order to speed up the apply time of remote transactions.
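For reference, the parallelization rule that the logical-clock applier derives from these two counters can be sketched as follows. This is an illustrative simplification, not the actual server scheduler; the struct and function names are hypothetical.

  #include <cstdint>

  // Parallelization indexes carried in each transaction's Gtid_log_event.
  struct Logical_timestamps {
    int64_t last_committed;   // sequence_number of the last committed
                              // transaction when this one executed
    int64_t sequence_number;  // monotonically increasing transaction counter
  };

  // WL#7165 logical-clock rule (simplified): an earlier and a later
  // transaction may be applied in parallel if the earlier one had not yet
  // committed when the later one executed, i.e. the later transaction's
  // last_committed is lower than the earlier transaction's sequence_number.
  bool can_apply_in_parallel(const Logical_timestamps &earlier,
                             const Logical_timestamps &later) {
    return later.last_committed < earlier.sequence_number;
  }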
FR1: Any two transactions, from different server:client tuples, that write disjoint sets of rows shall be tagged as non-conflicting, and thus may be applied in parallel.

FR2: Transactions executed from the same server:client tuple must not be applied in parallel, that is, they must be applied in the same order in which they were executed on the client session.

NFR1: There must be a performance improvement after this worklog when compared to the single-threaded applier.
OVERVIEW
========

The Group Replication parallel applier implementation is based on WL#7165: "MTS: Optimizing MTS scheduling by increasing the parallelization window on master". In that feature, two transactions are parallelizable if we can tell that they have not blocked each other during their execution. The parallelization is expressed using two counters that are added to every transaction header (Gtid_log_event): last_committed and sequence_number.

In this worklog we will take advantage of the certifier component to populate the parallelization indexes mentioned above in order to speed up the apply time of remote transactions.

DEFINITIONS
===========

Before going into details, let us first present the terms that will be used in this design.

transaction.sequence_number: the monotonically increasing transaction number counter, incremented by each transaction. It is set on the Gtid_log_event that starts each transaction. From now on we will refer to it as trx.SN. Please see the High-Level Specification of WL#7165 for further details. This should not be confused with the certification sequence number described below.

transaction.last_committed: the version (transaction number) on which the transaction was executed relative to trx.SN, that is, the trx.SN (transaction number) before the current transaction commits. It is set on the Gtid_log_event that starts each transaction. From now on we will refer to it as trx.LC. Please see the High-Level Specification of WL#7165 for further details.

certification info: memory structure used by the Group Replication certification component to detect conflicting transactions. It maps transaction write sets to snapshot versions. Please see the High-Level Specification of WL#6833 for more details.

write set: set of hashes that unequivocally identify the rows updated by the transaction.

snapshot version: the version on which the transaction was executed. It is the value of GTID_EXECUTED before the transaction commits. Example: UUID1:1-5, UUID2:1.

certification info sequence_number: the increasing remote transaction number counter in the certifier.

REPLICATION PATH
================

In WL#7165 the parallelization indexes are saved to the Gtid_log_event in the master binary log, then events are replicated to the slave and the slave parallel applier uses those indexes to schedule the events.

In this worklog the path is slightly different: when a transaction is committed it is captured and sent to all group members for certification. At certification point, transactions can be considered as:
  local:  started and broadcast from this server;
  remote: committed on another server.

This worklog will only handle remote transactions. Local ones, after certification, are committed and logged like on a regular master, so for local transactions the parallelization indexes stored in the binary log are the ones computed by WL#7165. For remote transactions, after certification we do the computation described below, with the parallelization indexes stored in the group_replication_applier channel relay log and then read by the parallel applier to schedule the events.

PARALLELIZATION INDEXES
=======================

Note: These indexes are referred to as logical timestamps in WL#7165: "MTS: Optimizing MTS scheduling by increasing the parallelization window on master".

trx.SN (transaction sequence_number)
------------------------------------

As presented above, trx.SN is the monotonically increasing transaction number counter, which is assigned to each transaction on commit.
On Group Replication we have the same monotonic behaviour in the certifier component, that is, all transactions go through the certification process sequentially, so we can maintain and assign a transaction counter there. We will have a counter, parallel_applier_sequence_number, that will be monotonically incremented after each positively certified remote transaction. We will keep parallel_applier_sequence_number as a counter in the Group Replication Certifier class. Its value will be assigned to each remote transaction's trx.SN, set on the Gtid_log_event, after the transaction is positively certified.

parallel_applier_sequence_number will be tracked by write set, that is, each write set will be recorded on the certification info not only with the snapshot version but also with the transaction sequence_number. This will allow us to compute the dependency graph of each transaction and, with that information, compute the correct last_committed value, as explained in Example 3 (EX3).

This counter, parallel_applier_sequence_number, will be initialized to 2 whenever the group starts or members join.

Summary of parallel_applier_sequence_number initialization:
  group member join: parallel_applier_sequence_number= 2

Why 2? The View_change_log_event sequence number is always 0, so the first transaction will have:
  last_committed:  1
  sequence_number: 2
This way the indexes of transactions never intersect with the view change.

trx.LC (transaction last_committed)
-----------------------------------

The MySQL Group Replication certification ensures that two parallel conflicting transactions, which update one or more common rows, are not allowed to commit. The rule is that the first one to reach certification will be accepted and the other will be rejected (rolled back).

The certification info contains the already committed but not yet applied transactions on all members, that is, transactions that are known to be positively certified but are still waiting on remote queues to be applied, or that, even if already applied, not all group members are aware of it yet. Please see the garbage collection procedure in WL#6833 for further details. The certification info, except on a clean group start, is never empty.

For the parallel applier we need a finer filter than the one above, because two non-conflicting transactions may still not be allowed to be applied in parallel: even though there was no certification conflict when they were executed, there may be a parallelization conflict. One example can be seen below (EX2) through a look into the certification info content after two transactions are executed:

  -----------------------------------------------------------
   Trx | WS        | SV        | Update
  -----------------------------------------------------------
   T1  | ws1       | UUID1:1   | INSERT INTO t1 VALUES (1)
   T2  | ws1, ws2  | UUID1:1-2 | UPDATE t1 SET c1=2 WHERE c1=1
  -----------------------------------------------------------

Transaction T2 does not conflict with T1, despite their intersecting write sets, since T2's snapshot version includes T1's. But these two transactions cannot be applied in parallel: T2 may be started before T1, which would make it silently fail since row 1 is not in the table yet.

So we need to increase the criteria for conflict detection in order to correctly specify which transactions can be applied in parallel.
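To make the idea concrete, the extended certification info can be pictured as a map from write-set hashes to the snapshot version plus the sequence number of the last positively certified transaction that updated that row. This is only an illustrative sketch; the names are hypothetical, not the actual certifier structures.

  #include <cstdint>
  #include <map>
  #include <string>

  // Hypothetical certification info entry for one write-set hash.
  struct Certified_write_set_info {
    std::string snapshot_version;  // e.g. "UUID1:1-2", GTID_EXECUTED before commit
    int64_t sequence_number;       // trx.SN of the last positively certified
                                   // transaction that updated this row
  };

  // write-set hash -> certification data
  using Certification_info = std::map<std::string, Certified_write_set_info>;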
We already mentioned that the certification info contains all transactions that are ongoing or not yet applied on all members. So after the current transaction is certified, and before updating the certification info, we need to check whether the write sets of the current transaction are already present there; if they are, this transaction must be marked as non-parallel with the previous ones.

Let us see the outcome of a given execution on a group to analyze how trx.LC should be set on transactions depending on their write sets. In Example 3 (EX3) we see the write sets of a sequence of transactions as they arrive at certification, and how trx.LC and trx.SN progress:

  -----------------------------------------------------------------
   Trx | WS        | trx.LC | trx.SN
  -----------------------------------------------------------------
       |           |        | 0 (group boot)
   T1  | ws1       |   1    | 2  ---| T1 and T2 can be applied in
   T2  | ws2       |   1    | 3  ---| parallel
   T3  | ws1, ws3  |   2    | 4  ---| T3 cannot be applied in parallel
       |           |        |        with T1, so trx.LC is set to
       |           |        |        T1 trx.SN
   T4  | ws4       |   1    | 5
   T5  | ws5       |   1    | 6
   T6  | ws5, ws6  |   6    | 7  ---| T6 cannot be applied in parallel
       |           |        |        with T5, so trx.LC is set to
       |           |        |        T5 trx.SN
   T7  | ws7       |   1    | 8
   T8  | ws8       |   1    | 9
  -----------------------------------------------------------------

We can see in example EX3 that we have the following dependency graph: T3 depends on T1; T6 depends on T5; all others do not depend on any transaction.

No write set case
-----------------

On Group Replication some transactions may not have a write set, like empty transactions with GTID_NEXT specified, or DDL. For those we cannot check conflicts and, consequently, we do not know if they can be applied in parallel. So we need to follow the pessimistic approach and run them sequentially. Example (EX4):

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
       |     |        | 0 (group boot)
   T1  |     |   1    | 2
   T2  | ws1 |   2    | 3
   T3  | ws2 |   2    | 4
   T4  |     |   4    | 5
   T5  | ws1 |   5    | 6
  -----------------------------------------------------------------

Algorithm (AL1) to determine if a given remote transaction can be applied in parallel:

  1) Transaction is certified positively
       trx.LC= parallel_applier_last_committed_global
  2) Assign parallel_applier_sequence_number to trx.SN
       trx.SN= parallel_applier_sequence_number
  3) Does the transaction have a write set?
     a) Yes: Check if any of the transaction's write sets exists in the
        certification info. If a write set's sequence number is greater
        than trx.LC, then update trx.LC.
     b) No:  parallel_applier_last_committed_global= parallel_applier_sequence_number
  4) Insert/update the current transaction's write sets and snapshot version
     in the certification info.
  5) Increment parallel_applier_sequence_number
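The following is a minimal sketch of algorithm AL1, reusing the hypothetical Certification_info map from the sketch above; names and signatures are illustrative and do not correspond to the actual certifier code. In line with example EX4 (see T1 and T4), a transaction without a write set is also made to wait for every previous transaction.

  #include <algorithm>
  #include <cstdint>
  #include <set>
  #include <string>

  // Hypothetical parallelization indexes that end up on the transaction's
  // Gtid_log_event.
  struct Transaction_indexes {
    int64_t last_committed;   // trx.LC
    int64_t sequence_number;  // trx.SN
  };

  Transaction_indexes certify_remote_transaction(
      Certification_info &cert_info,
      const std::set<std::string> &write_set,
      const std::string &snapshot_version,
      int64_t &parallel_applier_last_committed_global,
      int64_t &parallel_applier_sequence_number) {
    Transaction_indexes trx;

    // 1) Transaction was certified positively: start from the global value.
    trx.last_committed = parallel_applier_last_committed_global;

    // 2) Assign the current counter value to trx.SN.
    trx.sequence_number = parallel_applier_sequence_number;

    if (!write_set.empty()) {
      // 3a) If any write set entry is already present with a higher sequence
      //     number, this transaction depends on it: raise trx.LC.
      for (const std::string &hash : write_set) {
        auto it = cert_info.find(hash);
        if (it != cert_info.end())
          trx.last_committed =
              std::max(trx.last_committed, it->second.sequence_number);
      }
      // 4) Insert/update the write set entries with the snapshot version and
      //    this transaction's sequence number.
      for (const std::string &hash : write_set)
        cert_info[hash] = {snapshot_version, trx.sequence_number};
    } else {
      // 3b) No write set: pessimistic approach, run sequentially.
      //     Wait for all previous transactions (as T1 and T4 do in EX4)...
      trx.last_committed = trx.sequence_number - 1;
      //     ...and force all following transactions to wait for this one.
      parallel_applier_last_committed_global = parallel_applier_sequence_number;
    }

    // 5) Advance the counter for the next remote transaction.
    ++parallel_applier_sequence_number;
    return trx;
  }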
-------------
Special cases
-------------

Certification info garbage collection
-------------------------------------

Since the procedure to find the parallel_applier_last_committed_global value, as explained in algorithm AL1, depends on the certification info content, every update to the certification info must be reflected on parallel_applier_last_committed_global (and on trx.LC). Let us see an example (EX5) to visualize the problem.

Incorrect output of a given execution including a certification garbage collection run:

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
       |     |        | 0 (group boot)
   T1  | ws1 |   1    | 2
   T2  | ws2 |   1    | 3
       |     |        | (garbage collection procedure: T1 is purged)

So we will end up with the following content in the certification info:

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
   T2  | ws2 |   1    | 3

If a new transaction that touches ws1 is executed, we will end up with:

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
   T2  | ws2 |   1    | 3
   T3  | ws1 |   1    | 4
   T4  | ws4 |   1    | 5

That is, T1 and T3 can be executed in parallel, which is incorrect since both update ws1.

To prevent this incorrect behaviour, every time a certification garbage collection run happens, a new parallelization window is opened, stating that all transactions executed after this point will need to wait for the previous transactions to complete before being applied. This is done by updating the parallel_applier_last_committed_global variable to the current parallel_applier_sequence_number.

Correct output of trx.LC and trx.SN when the garbage collection procedure is considered, example (EX6):

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
       |     |        | 0 (group boot)
   T1  | ws1 |   1    | 2
   T2  | ws2 |   1    | 3
       |     |        | (garbage collection procedure: T1 is purged)

The certification info and transaction counters will now be:

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
   T2  | ws2 |   1    | 3

If a new transaction that touches ws1 is executed, we will end up with:

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
   T2  | ws2 |   1    | 3
   T3  | ws1 |   3    | 4
   T4  | ws4 |   3    | 5

Now, T1 and T3 cannot be applied in parallel, which is correct since both update the same row.
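A minimal sketch of this garbage collection hook; the function and parameter names are hypothetical, and the sequence number of the last certified transaction is passed in explicitly.

  #include <cstdint>

  // Hypothetical hook run whenever certification info garbage collection
  // purges entries. It opens a new parallelization window: every transaction
  // certified after the purge must wait for all transactions certified
  // before it (compare EX5 with EX6, where the next transaction T3 gets
  // trx.LC = 3, the sequence number of T2).
  void on_certification_info_garbage_collected(
      int64_t &parallel_applier_last_committed_global,
      int64_t last_certified_sequence_number) {
    parallel_applier_last_committed_global = last_certified_sequence_number;
  }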
View_change_log_event
---------------------

The View_change_log_event transaction needs to be applied sequentially because it is the termination condition for the distributed recovery procedure, and the applier must not be executing any other transaction in parallel, to avoid termination errors or applying data that belongs after the View_change_log_event transaction.

To fulfil this requirement, when a View_change_log_event transaction is queued to the applier, the following will happen:

  1) View_change_log_event trx.LC= 0
     View_change_log_event trx.SN= 0

The values (0,0) force the applier to apply the View_change_log_event sequentially and reset the applier counters.

On a joiner, after the View_change_log_event is queued, the sequence number algorithm will have the following defaults for future transactions:
  parallel_applier_last_committed_global= 1
  parallel_applier_sequence_number= 2
which are always disjoint from the (0,0) values of the View_change_log_event.

On existing group members, the group_replication_applier channel relay log is rotated, the View_change_log_event is logged, and parallel_applier_last_committed_global and parallel_applier_sequence_number maintain their values, which are disjoint from the view change ones.

Again, let us see the outcome of a correct execution which includes a View_change_log_event transaction, example (EX7):

  -----------------------------------------------------------------
   Trx | WS  | trx.LC | trx.SN
  -----------------------------------------------------------------
       |     |        | 0 (group boot)
   T1  | ws1 |   1    | 2
   T2  | ws2 |   1    | 3
       |     |   0    | 0 (view change)
   T3  | ws3 |   1    | 4
  -----------------------------------------------------------------

Session commit order
--------------------

Yet another special case is session consistency, that is, updates made by the same client on the same server must preserve their order. This is ensured by the server option --slave_preserve_commit_order=ON, which is required when we enable the parallel applier on Group Replication.

Summary of parallel_applier_last_committed_global updates:

  garbage collection procedure purges data from certification info:
    parallel_applier_last_committed_global= parallel_applier_sequence_number

  after transaction is certified positively:
    transaction does not have write set:
      parallel_applier_last_committed_global= parallel_applier_sequence_number

SERVER OPTIONS
==============

The applier on Group Replication will follow the server configuration options slave_parallel_workers and slave_parallel_type, like asynchronous replication.

CHALLENGES
==========

Wait until view change
----------------------

Due to the specificities of the WL#7165 implementation, we will need to change how WAIT_UNTIL_VIEW_CHANGE stops reading and applying relay logs. The MySQL Group Replication view change event transaction is composed of:
  GTID
  BEGIN
  VIEW_CHANGE
  COMMIT

The parallel applier does not support being stopped on an ongoing transaction, it must be stopped on transaction boundaries, so WAIT_UNTIL_VIEW_CHANGE will be changed to stop after the view change event transaction is committed. This means that the executed and logged view change event on a joiner is the one fetched during recovery and not the one generated by the joiner itself (the previous behavior).

When WAIT UNTIL finds the view change event, it sets a flag that will cause the WAIT UNTIL to stop after the transaction is applied. The view change transaction is always applied sequentially, that is, its trx.LC and trx.SN never intersect with those of any other transaction.
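A rough sketch of the changed stop condition, assuming hypothetical names (the real logic lives in the server's WAIT_UNTIL_VIEW_CHANGE handling): finding the view change event only raises a flag, and the stop happens at the next transaction boundary.

  #include <atomic>

  // Hypothetical state shared between the coordinator and the code that
  // implements WAIT_UNTIL_VIEW_CHANGE.
  struct View_change_wait_state {
    std::atomic<bool> view_change_found{false};
  };

  // Called for each event read from the relay log.
  void on_event_read(View_change_wait_state *state, bool is_view_change_event) {
    if (is_view_change_event)
      // Do not stop here: the parallel applier can only stop on transaction
      // boundaries, so just remember that the view change was seen.
      state->view_change_found = true;
  }

  // Checked at each transaction boundary to decide whether to stop now.
  bool should_stop_after_transaction(const View_change_wait_state *state) {
    // Stop only after the view change transaction is fully applied, so the
    // joiner executes and logs the view change event fetched during recovery.
    return state->view_change_found.load();
  }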
WL#7165 applier recovery
------------------------

WL#7165 applier recovery is based on the master log name and position, which makes it very difficult to enable the parallel applier on Group Replication. The following example (EX8) shows why:

  ------    ------    ------
  | S1 |    | S2 |    | S3 |
  ------    ------    ------
     |________|________|

For simplicity only S1 and S2 do writes:
  * S1 executes T1
  * S2 executes T2

Transactions are sent to all group members. For simplicity again, let us only observe S3, on which T1 and T2 are remote transactions. Let us also assume that both T1 and T2 have only one event, whose length is 100 bytes. So in the S3 Group Replication applier (relay log) we would have:

  # at 4
  #150101 00:00:25 server id 3  end_log_pos 122  Start: binlog v4
  # at 126
  #150101 00:00:26 server id 1  end_log_pos 100  T1
  # at 226
  #150101 00:00:26 server id 2  end_log_pos 100  T2

Please note the end_log_pos 100 on both transactions. Since on Group Replication we have several masters, there is not a single master position to follow. So we will need to add an exception to WL#7165 applier recovery to execute a different recovery procedure. This exception will only apply to the Group Replication applier channel.

Since we always have GTID_MODE=ON on Group Replication, the simplest way to solve this issue is to make WL#7165 applier recovery ignore the positions completely, seek the current relay log to the beginning and start from there. Already applied transactions will be skipped due to the GTID auto-skip feature and the applier will resume from the last applied transaction. On 5.8 the internal WL#7165 applier recovery procedure should ignore positions and rely only on GTIDs.

This approach works if transactions do not span multiple relay logs, which can happen in the following scenarios:
  1. flush relay logs in the middle of writing a transaction;
  2. max_relay_log_size exceeded in the middle of writing a transaction;
  3. user does CHANGE MASTER TO MASTER_LOG_POS forcing it to start in the
     middle of a transaction;
  4. user kills the receiver thread in the middle of writing a transaction,
     then reconnects using auto_position=0.

#2 only happens in 5.6, so it's not a problem. #3 and #4 are not possible with Group Replication. However, it seems that #1 could happen: Group Replication uses queue_event to write the relay log, and queue_event will acquire and release LOG_lock. Since a transaction consists of multiple events, Group Replication will have a sequence of calls to queue_event for any given transaction. If FLUSH RELAY LOGS happens between two such calls, the second relay log will begin with a half transaction. To prevent that, the user must be disallowed from executing FLUSH RELAY LOGS FOR CHANNEL "group_replication_applier" while Group Replication is running. FLUSH RELAY LOGS without FOR CHANNEL does not affect Group Replication channels, so there is no problem there.
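As an illustration only (not the actual server patch), the recovery exception could look like the sketch below; struct, constant and function names are hypothetical.

  #include <cstring>

  // Hypothetical view of the recovery coordinates kept for a channel.
  struct Relay_log_recovery_info {
    const char *channel_name;
    unsigned long long relay_log_pos;  // position in the current relay log
  };

  // Relay log events start right after the 4-byte binlog magic number.
  static const unsigned long long kRelayLogStartPos = 4;

  void adjust_recovery_positions(Relay_log_recovery_info *info) {
    if (std::strcmp(info->channel_name, "group_replication_applier") == 0) {
      // Several masters => no single (master log name, position) to follow.
      // Seek the current relay log to its beginning; already applied
      // transactions are skipped by the GTID auto-skip feature.
      info->relay_log_pos = kRelayLogStartPos;
    }
    // For other channels the WL#7165 recovery based on master log name and
    // position is kept unchanged.
  }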
SUMMARY OF CHANGES
==================

Server core changes
-------------------

1. The wait until view change procedure will change to stop after the view change transaction is committed. When WAIT UNTIL finds the view change event, it sets a flag that will cause the WAIT UNTIL to stop after the transaction is applied. The applied and logged view change event will be the one fetched by the Group Replication recovery procedure.

2. Sequential execution of transactions on the logical clock parallel applier, by setting the transaction logical timestamps to (0,0). This was already implemented on the server, but since it was not a valid code path, there was a warning. Now that it is a valid code path, the warning was removed.

3. WL#7165 applier recovery, when executed on the Group Replication applier channel, will ignore positions and rely only on GTIDs. It will seek the current relay log to the beginning and start from there. Already applied transactions will be skipped due to the GTID auto-skip feature and the applier will resume from the last applied transaction.

4. Disallow FLUSH RELAY LOGS FOR CHANNEL "group_replication_applier". To avoid the DBA splitting transactions among relay logs, the command FLUSH RELAY LOGS was disabled for the channel "group_replication_applier".

5. Extend the Trans_context_info structure with the parallel applier options. Group Replication doesn't support the DATABASE parallel applier type; in order to prevent its use, the plugin will check if it is in use and error out.

6. Add a channel_flush() function to rpl_channel_service_interface. Provide an API function to flush relay logs from within a plugin, so that START GROUP_REPLICATION can rotate the applier relay log.
   Files changed: rpl_channel_service_interface.h, rpl_channel_service_interface.cc

Group Replication plugin changes
--------------------------------

1. Add to the Certifier class:
   * parallel_applier_last_committed_global
   * parallel_applier_sequence_number

2. Initialization of parallel_applier_sequence_number:
   group member join: parallel_applier_sequence_number= 2

3. parallel_applier_sequence_number is incremented after each positive remote transaction certification.

4. Initialization of parallel_applier_last_committed_global:
   group member join: parallel_applier_last_committed_global= 1

5. parallel_applier_last_committed_global is updated when:
   garbage collection procedure purges data from certification info:
     parallel_applier_last_committed_global= parallel_applier_sequence_number
   after transaction is certified positively:
     transaction does not have write set:
       parallel_applier_last_committed_global= parallel_applier_sequence_number

6. Algorithm (AL1) to determine if a given remote transaction can be applied in parallel:
   1) Transaction is certified positively
        trx.LC= parallel_applier_last_committed_global
   2) Does the transaction have a write set?
      a) Yes: Check if any of the transaction's write sets exists in the
         certification info. If a write set's sequence number is greater
         than trx.LC, then update trx.LC.
      b) No:  parallel_applier_last_committed_global= parallel_applier_sequence_number
   3) Insert/update the current transaction's write sets and snapshot version
      in the certification info.
   4) Assign parallel_applier_sequence_number to trx.SN
   5) Increment parallel_applier_sequence_number
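As a minimal usage sketch of item 6 of the server core changes, a plugin could rotate the applier relay log as shown below. The exact signature and return convention of channel_flush() are assumed here (0 taken as success), based on the summary above.

  #include "rpl_channel_service_interface.h"

  // Sketch: rotate the group_replication_applier relay log from inside the
  // plugin, e.g. during START GROUP_REPLICATION.
  bool rotate_applier_relay_log() {
    // Assumed convention: 0 means the flush/rotation succeeded.
    return channel_flush("group_replication_applier") == 0;
  }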
Copyright (c) 2000, 2024, Oracle Corporation and/or its affiliates. All rights reserved.