WL#11610: MySQL GCS: Support Data Fragmentation

Affects: Server-8.0   —   Status: Complete   —   Priority: Medium

Executive summary

Currently, the communication system in use (i.e. XCom) is not prepared to handle big messages and an attempt to send them may result in some nodes being expelled from the cluster. This happens because XCom is single-threaded and a big message hijacks its thread for a long period thus triggering retransmissions and leading to stalls and eventually nodes are considered faulty and are expelled from the cluster.

In this WL, we give users the possibility of splitting messages that exceed a predefined threshold size. This change will require new GCS message formats and a new protocol version. To maintain backwards compatibility, we introduce commands to change the protocol version in use by a group.

User stories

  • As a MySQL User, I want to be able to configure the system to split big transactions into small communication messages in order to avoid stalling the communication system.

Requirements

1 Definitions

  • A node n supports communication protocol x if 5.7.14 <= x <= version(n), e.g. 5.7.25 <= 8.0.16.
  • A fragment is a portion of its original message.

2 Functional requirements

  1. It must be possible to specify the desired maximum message size threshold using the group_replication_communication_max_message_size system variable.
  2. Instances must automatically split a message into fragments if the message's size is greater than the value of group_replication_communication_max_message_size.
  3. Instances must send fragments individually, reassemble them automatically upon reception, and deliver the single, reassembled message, upwards.
  4. Instances must discard fragments sent by instances which do not belong to the group.
  5. An instance announces a communication protocol version to the group when joining. It must be possible to inspect and modify which protocol version will be announced using the group_replication_communication_protocol_join system variable.
  6. An instance announces a communication protocol version to the group when joining. It must be possible to modify the announced protocol version to x if the node supports x.
  7. An instance announces a communication protocol version to the group when joining. It must not be possible to modify the announced protocol version to x if the node does not support x.
  8. It must be possible to inspect the communication protocol version in use by a group using the group_replication_get_communication_protocol UDF.
  9. It must be possible to modify the group's communication protocol using the group_replication_set_communication_protocol UDF.
  10. It must be possible to modify the group's communication protocol to x if all members support x.
  11. It must not be possible to modify the group's communication protocol to x if any member does not support x.
  12. It must not be possible to modify the group's communication protocol without having the GROUP_REPLICATION_ADMIN privilege.
  13. It must not be possible to modify the group's communication protocol if some member is not ONLINE.
  14. It must not be possible to inspect or modify a group's communication protocol on an instance that does not belong to the group.
  15. An instance announcing a communication protocol x must become ONLINE in a group which uses the communication protocol x.
  16. An instance announcing a communication protocol x must not become ONLINE, and must eventually become ERROR, in a group which uses a communication protocol y != x.

3 Non-functional requirements

Interface specification

1 Communication protocol used by a group

1.1 Inspecting the protocol version

We create a new UDF group_replication_get_communication_protocol to inspect the group's communication protocol version at runtime. For example:

mysql> SELECT group_replication_get_communication_protocol();
+------------------------------------------------+
| group_replication_get_communication_protocol() |
+------------------------------------------------+
| 8.0.16                                         |
+------------------------------------------------+

The command must fail with an appropriate error code and message if the protocol cannot be inspected. The command fails if executing it would violate functional requirement 14.

1.2 Modifying the protocol version

We create a new UDF group_replication_set_communication_protocol(MySQL-version) to reconfigure the group's communication protocol version at runtime. The idea is that we set the communication protocol to the oldest MySQL version we wish to have in the group. Therefore, the MySQL-version parameter is a string formatted as major.minor.patch, e.g. 5.7.25 or 8.0.16.

For instance, consider we have a group running 8.0.16 instances, using the new message fragmentation support added by this worklog. We cannot simply add a 5.7.25 instance to the group, because it does not handle message fragmentation. We must first set the group's communication protocol to 5.7.25:

mysql> SELECT group_replication_set_communication_protocol("5.7.25");

After the command above, the whole group falls back to a communication protocol compatible with 5.7.25 instances. At this point we may now add compatible older instances.

The command must fail with an appropriate error code and message if the protocol cannot be modified. The command fails if executing it would violate functional requirements 11--14.

2 Communication protocol announced by an instance


IMPORTANT: Earlier we removed the group_replication_communication_protocol_join system variable in favor of automatically adapting to the group's communication protocol when joining. The remainder of this section serves only as historical context.


We create a new system variable group_replication_communication_protocol_join to specify the communication protocol version the instance will announce to the group. The idea is that for the instance to successfully join the group, the instance must announce that it supports the communication protocol in use by the group.

For instance, consider we are running a group using a communication protocol compatible with 5.7.25. Consider a case where we have a 8.0.16 instance, using the new message fragmentation support added by this worklog, and want to add it to the group. However, the group does not handle message fragmentation. Therefore, for the 8.0.16 instance to successfully join the group, it must announce itself to the group using the appropriate communication protocol. To do so, we must set group_replication_communication_protocol_join="5.7.25". If the communication protocol matches the group's, the instance will join successfully and become ONLINE in the group.

If a node attempts to join a group announcing an incompatible protocol version, it temporarily joins the group but is expelled before becoming ONLINE.

Below are the characteristics of group_replication_communication_protocol_join:

Property Value
Command-Line Format --group-replication-communication-protocol-join=value
System Variable group_replication_communication_protocol_join
Scope Global
Dynamic Yes
SET_VAR Hint Applies No
Type String
Default Value ""

Please note that the default value ("") means that the server will announce the greatest protocol version it knows, i.e. it's MySQL server version.

3 Mapping between MySQL versions and GCS protocol versions

With the introduction of this worklog, two GCS protocol versions exist. Version 1 is the original GCS protocol version, which supports compression. Version 2 is the new GCS protocol version, which supports both compression and fragmentation.

With the objective of user friendliness, the group_communication_{get,set}_communication_protocol UDFs and group_replication_communication_protocol_join system variable operate over MySQL versions. The Group Replication plugin translates a MySQL version into the appropriate GCS protocol version. MySQL versions in the domain [5.7.14, 8.0.15] map to GCS protocol version 1, and MySQL version 8.0.16 maps to GCS protocol version 2.

In practice, this means that the communication protocols 5.7.14 and 8.0.15, and every other in between, are interchangeable and compatible with one another, because they all map to GCS protocol version 1. group_replication_get_communication_protocol always outputs the minimum MySQL version that maps to the same GCS protocol version as the MySQL version given to group_replication_set_communication_protocol. For example, executing group_replication_set_communication_protocol("8.0.15") and subsequently executing group_replication_get_communication_protocol will output 5.7.14.

The guideline is that users should use the MySQL version of the oldest instance they want their group to support.

4 Specifying the fragmentation threshold

We create a new system variable group_replication_communication_max_message_size to specify the fragmentation threshold used by the instance. The idea is that messages whose size exceeds the value of group_replication_communication_max_message_size will be automatically fragmented by the system.

For instance, consider a message whose size is 10, and group_replication_communication_max_message_size=3. The system will split the message into four fragments, the first three with size 3, and the last with size 1.

To turn off fragmentation, set group_replication_communication_max_message_size=0.

Below are the characteristics of group_replication_communication_max_message_size:

Property Value
Command-Line Format --group-replication-communication-max-message-size=value
System Variable group_replication_communication_max_message_size
Scope Global
Dynamic Yes
SET_VAR Hint Applies No
Type Integer
Default Value 10485760
Minimum Value 0
Maximum Value 1073741824

The default value is 10MiB. The maximum value is the same as the maximum value of --slave-max-allowed-packet.

5 Summary

To summarise:

  1. A group shall consist of members operating using the same communication protocol.
  2. An instance that is not using the same communication protocol as the group's will not be able to become ONLINE.
  3. We can inspect and modify the group's communication protocol using the group_replication_{get,set}_communication_protocol UDFs.
  4. We can inspect and modify communication protocol announced by a instance when joining a group using the group_replication_communication_protocol_join system variable.
  5. We can specify the fragmentation threshold of an instance using the group_replication_communication_max_message_size system variable.

Design specification

1 Definitions

  • Newer node: a node that implements this worklog.
  • Older node: a node that does not implement this worklog.

2 Modifying the group's communication protocol

The ability to modify the group's communication protocol will be implemented as a group action. The group action is initiated by a call to group_replication_set_communication_protocol(version) and will execute at the same logical moment in all group's members, i.e. it will be executed by the GCS engine thread. If the specified version is supported by all members, the group action is executed.

2.1 Checking member support

2.2.1 Announcing supported protocols and protocol in use

When a node joins a group, all the group members and the joining node perform a state exchange between them. Roughly, the messages exchanged have the following format:

+--------------+-...-+
| fixed header |     |
+--------------+-...-+

The on-the-wire representation of the fixed header is:

+---------+-----------+---------------------------------------+
| field   | wire size | description                           |
+=========+===========+=======================================+
| version |   4 bytes | protocol version in use by the sender |
| ...     | ... bytes | ...                                   |
+---------+-----------+---------------------------------------+

The version field contains the protocol version in use by the sender. Unfortunately, the receiver has no way to know what is the maximum protocol version supported by the sender. This information is paramount to be able to validate whether a protocol change is supported by the group.

To be able to disseminate this information, we will split the version field in two, as follows:

+-------------+-----------+----------------------------------------------+
| field       | wire size | description                                  |
+=============+===========+==============================================+
| max_version |   2 bytes | max protocol version supported by the sender |
| use_version |   2 bytes | protocol version in use by the sender        |
| ...         | ... bytes | ...                                          |
+-------------+-----------+----------------------------------------------+

This way, during the state exchange the nodes will be able to store this information to be able to validate protocol changes. A protocol change to x is valid if, for all members of the group, x <= max_version(member)

2.2.2 Backwards compatibility and correctness argument

We need to be careful to ensure backwards compatibility, because older nodes will continue to use a single 4-byte version field. These are the rules and facts that ensure correctness:

  • Older nodes serialise and deserialise the fixed header, but do not do anything with the version value.
  • The new {max,use}_version fields occupy exactly the same space of the previous version field: 2 + 2 = 4 bytes. This ensures that the older nodes will correctly deserialise the fixed header.
  • Since versions are unsigned, the older nodes will only use 2 of the 4 bytes of the version field up to protocol version 65,535. This shall be the end of the protocol version domain, i.e. no versions greater than 65,535 are supported. This value should be enough... (Last famous words.)
  • A message sent by a newer node will always be deserialised by other newer node with max_version = x and use_version = y, with y <= x.
  • Before this worklog, there is only a single protocol version 1. A message sent by an older node will always be deserialised by a newer node with max_version = 0 and use_version = 1. max_version = 0 shall be a special case which means that the maximum protocol the sender supports is use_version. This ensures the newer nodes process older nodes correctly, i.e. that they are using version 1 and only support version 1.

2.2 Action algorithm

The action's algorithm is as follows:

  1. Start buffering the node's outgoing messages.
  2. Wait until all the node's outgoing messages have been delivered.
  3. Modify the node's communication protocol version.
  4. Stop buffering the node's outgoing messages and send any messages buffered in step (1).

2.2.1 Guidelines for possible implementation

Steps (1) and (2) of the algorithm require synchronisation between user threads, which are sending messages, and the GCS thread, which is changing the protocol. One approach to implement the algorithm is for user threads wait for ongoing protocol changes before sending messages. Here is a rough sketch of the synchronisation requirements:

when send-message(m) from user thread:
    atomically:
        if protocol_changing:
           wait until protocol_changing = false
        nr_msgs_in_transit++
  ...

when change-protocol(v) from GCS thread:
    atomically:
        protocol_changing := true
    wait until nr_msgs_in_transit = 0
  ...

When a user thread attempts to send a message, it must atomically: (1) detect that the protocol is changing, and wait for it to finish, and (2) increment a counter of messages in transit. The counter informs the GCS thread of whether we are in a stable state (no messages are in transit), so that the protocol can be changed. From the GCS thread side, we use a flag to signal to the user threads that a protocol change is underway, so no new messages may be sent.

Important

Please note that the state exchange message is sent by the GCS thread. As such, the GCS thread must not block when sending a state exchange message, regardless of whether there is a protocol change ongoing. Otherwise the entire system will hang.

State exchange messages are already a special case: they are always sent using the pipeline of the original protocol 1. This means that they pose no harm to correctness if they are sent while a protocol change is ongoing. Therefore, state exchange messages must not be buffered, and the GCS thread must always be able to send them, regardless of protocol changes.

3 Dealing with joins of incompatible nodes

When a node attempts to join the group, it must not become ONLINE if it is running an incompatible communication protocol. There are four cases:

  1. A compatible older node is joining a group.
  2. A compatible newer node is joining a group.
  3. An incompatible older node is joining a group.
  4. An incompatible newer node is joining a group.

3.1 Compatible older node joining group

This case is trivial as the node is by definition compatible with the group.

3.2 Compatible newer node joining group

This case is trivial as the node is by definition compatible with the group.

3.3 Incompatible older node joining group

In this situation we have an incompatible older node attempting to join a group. By definition, this means that the group contains newer nodes and is using the new communication protocol. Since the older node will not take any action to prevent himself to become ONLINE, the responsibility falls to the group members, which are newer nodes.

The group members can identify this situation as follows. After the state exchange has taken place, and immediately after GCS delivers the new view to GR, the existing group members will inspect the communication protocol version announced by all joining nodes (use_version header field), and expel any joining node which has announced an incompatible communication protocol version, i.e. different from the one in use by the group members.

To allow the older node to join the group, the group's communication protocol must be modified to one compatible with the older node's version. To do so, the user must use the group_replication_set_communication_protocol UDF.

3.4 Incompatible newer node joining group

In this situation we have an incompatible newer node attempting to join a group. There are two possible reasons for the joining node's incompatibility:

  1. The group has newer nodes, but the group is running an older communication protocol.
  2. The group consists entirely of older nodes.

Reason (1) is equivalent to §3.3, meaning that some group members will identify the joining node as incompatible and expel it. The expelled node can join the group if it modifies its communication protocol appropriately using the group_replication_communication_protocol_join system variable.

As for reason (2), it requires the joining node to self-identify as incompatible, because none of the existing group members will do so. After the state exchange has taken place, and immediately after GCS delivers the new view to GR, the joining node compares its communication protocol with the ones announced by the group's members (use_version header field). If the communication protocol versions do not match, the joining node expels himself.

To join the group, the newer node must modify its communication protocol appropriately using the group_replication_communication_protocol_join system variable.

5 Dealing with concurrency between joins and protocol changes

Since the protocol change will be implemented as a group action, it will inherit the group action framework's behaviour when a node joins while a group action is ongoing.

5.1 Group actions

5.1.1 Group action-aware node joining group

During the state exchange phase, nodes that implement the group action framework will disseminate whether a group action is ongoing. A joining node that implements the group action framework will see that an action is ongoing and leave the group.

5.1.2 Group action-oblivious node joining group

Nodes that predate the group action framework, however, do not process the information exchanged about group actions. At the time of writing, a node that does not implement group actions can join a group while a group action is ongoing.

5.2 This worklog

In the specific case of this worklog, we can identify whether a node is joining during a protocol change. Recall that all members begin the protocol change at the same logical moment. However, each node finishes its protocol change at its own pace, because each node only waits for its own outstanding messages to be delivered---see step (2) of §2.2. This means that between the logical moment where members begin the protocol change, and the physical moment where they actually change their protocol version, a node can join the group. In particular, the following situation is possible:

  1. The group begins changing the protocol version from v to v+1.
  2. A new node joins the group using protocol v.
  3. The state exchange takes place with all nodes announcing they are using protocol v.
  4. The new node joins the group, using protocol v.
  5. The remaining members finish changing the protocol to v+1.

This situation must not happen. To prevent this situation, if a protocol change is ongoing, state exchange messages will announce the protocol the node is changing to, regardless of whether the change has already physically happened or not.

This behaviour transforms the situation described above into §3.3 (§3.4) if the joining node is using an older (newer) protocol than the one the group is changing to. It also enforces the invariant that the version announced by all the existing members of a group is always the same, regardless of whether all the nodes have physically changed the protocol or not. In other words, the protocol change happens logically at the moment the group action starts, even though it happens physically on each node at its own pace.

6 Fragmentation

6.1 Overview

In order to avoid getting nodes expelled because XCom (i.e. the communication system) is not prepared to handle big messages, reduce latency and increase throughput, the GCS will have to fragment big messages into small messages that can be easily handled by the communication system.

Before sending a message, the system (i.e split stage) will check if its size exceeds the group_replication_communication_max_message_size option. If it does, the message will be split into several fragments and the resulting fragments will be moved into the next processing stage.

Messages that were split will be put back together before being delivered to the upper layer (i.e. Group Replication).

6.2 Background

Any outgoing or incoming message has to go through a pipeline when it is sent to or received from a group, respectively.

The message pipeline has stages registered and these are assembled in an outgoing pipeline. The outgoing messages always have to traverse this pipeline. For incoming messages, the pipeline is built on the fly, according to information contained in the type code of their stage headers.

The current message has the following fields that are relevant to our discussion:

+--------------+-----------------+-...-+-------------------+---------+
| Fixed header | Stage #1 header |     | Stage #n-1 header | Payload |
+--------------+-----------------+-...-+-------------------+---------+

In particular, the fixed header has the following fields that are relevant to our discussion:

+-------------+-------------+-...-+------------+
| Max version | Use version |     | Cargo type |
+-------------+-------------+-...-+------------+

Each stage header has the following fields that are relevant to our discussion:

+-...-+-----------+-...-+
|     | Type code |     |
+-...-+-----------+-...-+

Introducing a new stage requires incrementing the GCS protocol version number, existing stages' type code, and setting up the pipeline for the new protocol version.

6.3 Sketch of solution

The idea is to introduce an additional message stage, the split stage, that will be responsible for splitting big messages into smaller fragments.

Besides splitting the original payload into fragments, the split stage will add some auxiliary metadata to each fragment to help reassemble the original message. The metadata will have the following format:

+-----------+------------+-------------+---------------+------+
| Sender ID | Message ID | Fragment ID | Nr. Fragments | Size |
+-----------+------------+-------------+---------------+------+
  • Sender ID: Identifier that is used to uniquely identify the original sender.
  • Message ID: Identifier that is used to uniquely identify the original message from the sender.
  • Fragment ID: Fragment number of the original message that the payload corresponds to.
  • Number of fragments: The total number of fragments that together compose the original message.
  • Size: The payload size that is being carried on by this fragment.

Messages whose size are above a predefined threshold shall be automatically fragmented by the system. Note, however, that fragmented messages are sent individually but are reassembled upon reception and only then is the original message delivered.

6.4 Reducing memory allocation and copy in the pipeline

To reduce memory allocation and copy, this WL will slightly modify the message stage API and the wire format of a packet.

6.4.1 Message stage API

Here's a simplified description of the message stage API. The two new functions are prepended by *:

+--------------------------------------+----------------------------------------------------------------------------------------------------+
| Function                             | Description                                                                                        |
+======================================+====================================================================================================+
| apply(Packet) -> set<Packet>         | Applies the stage transformation to the given packet, which produces a set of transformed packets. |
| revert(Packet) -> Packet?            | Reverts the stage transformation to the given packet, which may produce a packet.                  |
| *create_metadata() -> Stage_metadata | Returns the metadata added by the stage.                                                           |
| *get_metadata_size() -> number       | Returns the size of the metadata added by the stage.                                               |
+--------------------------------------+----------------------------------------------------------------------------------------------------+

The idea is that every stage must now specify the metadata it will add to the transformed message. The pipeline will use that information to allocate a buffer with enough space for everything.

6.4.2 Wire format

As stated, each stage can now specify its own metadata. The wire format of a message now has the following layout:

+--------------+-----------------+-...-+-------------------+-------------------+-...-+---------------------+---------+
| Fixed header | Stage #1 header |     | Stage #n-1 header | Stage #1 metadata |     | Stage #n-1 metadata | Payload |
+--------------+-----------------+-...-+-------------------+-------------------+-...-+---------------------+---------+

Note that stage metadata is optional; each stage decides whether it adds any metadata to the message. For example, as presented in §6.3, the new split stage will add metadata. However, the compression stage does not add any metadata. Here is an example of the wire format of a message that goes through the compression and fragmentation stages:

+--------------+--------------------+----------------------+------------------------+---------+
| Fixed header | Compression header | Fragmentation header | Fragmentation metadata | Payload |
+--------------+--------------------+----------------------+------------------------+---------+

6.4.3 Discussion

With this scheme, the pipeline allocates an initial packet for an outgoing message that is able to hold the fixed header, stage headers, stage metadata, and original payload. Each stage will typically perform some, or all, of the following actions:

  • Populate its metadata section
  • Allocate new packet(s) and copy the modified payload to the new packet(s)

For example, the compression stage must allocate a new packet to where it will write the compressed payload. If the fragmentation stage produces N fragments, it can reuse the original packet for the first fragment, and must allocate N-1 packets to where it will copy the remaining fragments of the original payload.

The implementation should strive to not make any more allocations/copies than these.

6.5 Impact on the state exchange phase

The current wire format of the state exchange message is the following:

+--------+---------------------+
| Header | Upper-layer payload |
+--------+---------------------+

Where the header has the following format:

+---------+------------+
| View ID | XCom synod |
+---------+------------+

With the introduction of the fragmentation stage into the message pipeline, it is possible that an original message is fragmented into several fragments, and each fragment is delivered by XCom individually.

This means that when a node joins, it is possible for the transmission of a fragmented message to be ongoing, i.e. the existing group members have already received some, but not all, of the fragments. In this situation, the last fragment will arrive after the new node has joined the group. Since the original message is only delivered when all its fragments are received, this means that the original message will need to be delivered by the new node as well. But this poses a challenge: the new node does not have the fragments that were received before the new node joined.

We solve this problem by augmenting the state exchange message with information about the ongoing transmission of fragmented messages. In particular, for every fragmented message whose transmission is ongoing, a node will attach the XCom synods of the fragments it has already received. The new node can use this information to fetch the fragments from XCom.

As such, this worklog changes the state exchange message's wire format to the following:

+--------+---------------------+----------------+
| Header | Upper-layer payload | Recovery info. |
+--------+---------------------+----------------+

Where the recovery information has the following format:

+---------------+-...-+----------------+------------+
| XCom synod #1 |     | XCom synode #n | Nr. synods |
+---------------+-...-+----------------+------------+

6.5.1 Safety concerns

GCS instances that do not implement this worklog will deserialize the recovery information as part of the upper-layer payload. However, this is safe as long as the upper layer consumes only the portion it is expecting (upper-layer payload), even though it is fed its expected payload with the recovery information appended. This is the case with Group Replication.

6.5.2 Recovering the missing fragments

After the state exchange finishes, the joining node will attempt to recover the payloads of all the synods in the recovery information from the XCom instance of some existing group member. It the joining node recovers the fragments successfully, it joins the group. It the joining node fails to recover the fragments, it expels itself from the group.