MySQL 8.0.39
Source Code Documentation
Gcs_xcom_communication_protocol_changer Class Reference

Implements the communication protocol change logic. More...

#include <gcs_xcom_communication_protocol_changer.h>

Public Member Functions

 Gcs_xcom_communication_protocol_changer (Gcs_xcom_engine &gcs_engine, Gcs_message_pipeline &pipeline)
 
 Gcs_xcom_communication_protocol_changer (Gcs_xcom_communication_protocol_changer const &)=delete
 
 Gcs_xcom_communication_protocol_changer (Gcs_xcom_communication_protocol_changer &&)=delete
 
Gcs_xcom_communication_protocol_changeroperator= (Gcs_xcom_communication_protocol_changer const &)=delete
 
Gcs_xcom_communication_protocol_changeroperator= (Gcs_xcom_communication_protocol_changer &&)=delete
 
Gcs_protocol_version get_protocol_version () const
 Retrieves the current protocol version in use. More...
 
std::pair< bool, std::future< void > > set_protocol_version (Gcs_protocol_version new_version)
 Starts a protocol change. More...
 
Gcs_protocol_version get_maximum_supported_protocol_version () const
 Retrieves the greatest protocol version currently supported by the group. More...
 
void set_maximum_supported_protocol_version (Gcs_protocol_version version)
 Sets the greatest protocol version currently supported by the group. More...
 
bool is_protocol_change_ongoing () const
 Checks whether a protocol change is ongoing. More...
 
void atomically_increment_nr_packets_in_transit (Cargo_type const &cargo)
 Synchronises user threads, which send messages, with the GCS thread, which performs protocol changes. More...
 
void adjust_nr_packets_in_transit (Cargo_type const &cargo, std::size_t const &nr_additional_packets_to_send)
 After an outgoing message goes through the pipeline, it may produce more than one packet. More...
 
void decrement_nr_packets_in_transit (Gcs_packet const &packet, Gcs_xcom_nodes const &xcom_nodes)
 Decrement the number of my in-transit packets. More...
 
void finish_protocol_version_change (Gcs_tagged_lock::Tag const caller_tag)
 Due to the synchronisation protocol used between user threads, which send messages, and the GCS thread, which performs protocol changes, a user thread may be the one to hit the condition that triggers the protocol change to finish. More...
 

Private Member Functions

unsigned long get_nr_packets_in_transit () const
 
void begin_protocol_version_change (Gcs_protocol_version new_version)
 
void commit_protocol_version_change ()
 
void release_tagged_lock_and_notify_waiters ()
 
std::pair< bool, Gcs_tagged_lock::Tagoptimistically_increment_nr_packets_in_transit ()
 
void rollback_increment_nr_packets_in_transit (Gcs_tagged_lock::Tag const &tag)
 
void wait_for_protocol_change_to_finish ()
 

Private Attributes

Gcs_tagged_lock m_tagged_lock
 Tagged lock used for the optimistic synchronisation protocol between user threads, which send messages, and the GCS thread, which performs protocol changes. More...
 
std::mutex m_mutex
 For user threads to wait for an ongoing protocol change to finish. More...
 
std::condition_variable m_protocol_change_finished
 
std::promise< void > m_promise
 Stores the outcome of the protocol change operation. More...
 
Gcs_protocol_version m_tentative_new_protocol
 The protocol version we are going to change to when we start a protocol change. More...
 
std::atomic< Gcs_protocol_versionm_max_supported_protocol
 
std::atomic< unsigned long > m_nr_packets_in_transit
 
Gcs_xcom_enginem_gcs_engine
 
Gcs_message_pipelinem_msg_pipeline
 

Detailed Description

Implements the communication protocol change logic.

Design

The algorithm to change the communication protocol is roughly as follows:

1. Start buffering the node's outgoing messages.
2. Wait until all the node's outgoing messages have been delivered.
3. Modify the node's communication protocol version.
4. Stop buffering the node's outgoing messages and send any messages
   buffered in step (1).

Implementing the algorithm requires synchronising user threads, which send messages, with the GCS thread, which performs communication protocol changes.

The high level view of the synchronisation protocol between the user and GCS threads is the following:

when send-message(m) from user thread:
  atomically:
    if protocol_changing:
      wait until protocol_changing = false
    nr_msgs_in_transit++
  ...

when change-protocol(v) from GCS thread:
  atomically:
    protocol_changing := true
  wait until nr_msgs_in_transit = 0
  ...

We expect that communication protocol changes are rare events, especially when compared to sending messages. As such, the actual implementation strives to minimise the overhead on the code path that sends messages.

To do this, we use an optimistic synchronisation protocol on the send message side, that works as follows:

Algorithm #0, User thread:

  1. If no protocol change is ongoing, the user thread will optimistically increment the number of messages in transit. 2.a If a protocol change did not start meanwhile, we are good to go. 2.b If a protocol change started meanwhile: 2.b.1. Rollback the increment to the number of messages in transit 2.b.2. Wait for the protocol change to finish.

There is an additional action that needs to be performed on step (2.b), but we will describe that action when we have the necessary context to understand it.

On the protocol change side, it works as follows:

Algorithm #1, GCS thread:

  1. Store that a protocol change is ongoing.
  2. When the number of messages in transit is zero: 2.1. Change the protocol version 2.2. Wake up any user threads waiting for the protocol change 2.3. Deem the protocol change finished

The central part of the Algorithm #1 is step (2). The question is: who triggers, and where, step (2)'s condition, i.e. the number of in-transit messages is zero? Well, the obvious place is that it is the GCS thread itself, when it is processing an incoming message. If that message comes from us, then we decrease the number of in-transit messages, which may set it to zero.

However, recall that the user threads employ an optimistic synchronisation protocol that "acts first, and asks for forgiveness later." If the user thread rolls back its increment to the number of in-transit messages, it may be the one to set it to zero—see Algorithm #0, step (2.b). In this situation, it is the user thread that hits the condition required by the GCS thread in Algorithm #1, step (2). In order for the GCS thread to finish the protocol change, the user thread must somehow signal the GCS thread to trigger its step (2). This is the missing action of Algorithm #0, step (2.b).

So, the final synchronisation protocol of the user thread's side looks like this:

Algorithm #2, User thread:

  1. If no protocol change is ongoing, the user thread will optimistically increment the number of messages in transit. 2.a If a protocol change did not start meanwhile, we are good to go. 2.b If a protocol change started meanwhile: 2.b.1. Rollback the the increment to the number of messages in transit 2.b.2. If our rollback set the number of messages in transit to zero, signal the GCS thread 2.b.3. Wait for the protocol change to finish.

Implementation

The implementation attempts to add as little overhead as possible to the common case, which is that no protocol change is ongoing. This is the fast path of Algorithm #2, step (2.a). To achieve this goal, it employs a tagged lock. For more details on the tagged lock implementation, see Gcs_tagged_lock.

In a nutshell, the tagged lock is a read-write spin lock which offers the following API:

try_lock() -> bool
unlock()
optimistic_read() -> tag
validate_optimistic_read(tag) -> bool

For the write-side section, one uses it as a typical spin lock, e.g.:

do:
  lock_acquired := try_lock()
while (not lock_acquired)
write-side section
unlock()

For the read-side section, one can use it as follows:

done := false
while (not done):
  tag := optimistic_read()
  unsynchronised read-side section
  done := validate_optimistic_read(tag)
  if (not done):
    rollback unsynchronized read-side section

The idea is to allow an optimistic read-side section that does not perform any memory stores. This is in contrast with a typical read-write lock, where the read side performs some memory stores to account for the reader, e.g. keeping a reader counter. The trade off is that:

a. the execution of the read-side of a tagged lock may be concurrent with the write-side section if meanwhile the tagged lock is acquired b. the read-side of a tagged lock may fail if meanwhile the tagged lock is acquired, in which case one may want to rollback the effects of the failed read-side section

The algorithms of the design are implemented as follows:

Algorithm #1 implementation, GCS thread:

  1. Lock the tagged lock
  2. When the number of messages in transit is zero: 2.1. Change the protocol version 2.2. Unlock the tagged lock, signal a condition variable to wake up any user threads waiting for the protocol change 2.3. Deem the protocol change finished

Algorithm #2 implementation, User thread:

  1. If the tagged lock is unlocked: 1.1. Start an optimistic read-side section 1.2. Atomically increment the number of messages in transit 2.a If the optimistic read-side section validates, we are good to go. 2.b If the optimistic read-side section fails validation: 2.b.1. Atomically rollback the increment to the number of messages in transit 2.b.2. If our rollback set the number of messages in transit to zero, signal the GCS thread 2.b.3. Wait on a condition variable for the protocol change to finish.

Note that we have concurrent access to the number of messages in transit which needs to be synchronised. This is done by using an std::atomic to implement the number of messages in transit.

Some final implementation pointers:

a. Algorithm #1: see the code path that starts on set_protocol_version and finish_protocol_version_change. b. Algorithm #2: see the code paths that start on atomically_increment_nr_packets_in_transit, adjust_nr_packets_in_transit, and decrement_nr_packets_in_transit.

Constructor & Destructor Documentation

◆ Gcs_xcom_communication_protocol_changer() [1/3]

Gcs_xcom_communication_protocol_changer::Gcs_xcom_communication_protocol_changer ( Gcs_xcom_engine gcs_engine,
Gcs_message_pipeline pipeline 
)
explicit

◆ Gcs_xcom_communication_protocol_changer() [2/3]

Gcs_xcom_communication_protocol_changer::Gcs_xcom_communication_protocol_changer ( Gcs_xcom_communication_protocol_changer const &  )
delete

◆ Gcs_xcom_communication_protocol_changer() [3/3]

Gcs_xcom_communication_protocol_changer::Gcs_xcom_communication_protocol_changer ( Gcs_xcom_communication_protocol_changer &&  )
delete

Member Function Documentation

◆ adjust_nr_packets_in_transit()

void Gcs_xcom_communication_protocol_changer::adjust_nr_packets_in_transit ( Cargo_type const &  cargo,
std::size_t const &  nr_additional_packets_to_send 
)

After an outgoing message goes through the pipeline, it may produce more than one packet.

This method adjusts the increment done by atomically_increment_nr_packets_in_transit to take into account the additional packets produced by the pipeline.

Parameters
cargoThe type of message that will be sent
nr_additional_packets_to_sendThe number of additional packets that will actually be sent

◆ atomically_increment_nr_packets_in_transit()

void Gcs_xcom_communication_protocol_changer::atomically_increment_nr_packets_in_transit ( Cargo_type const &  cargo)

Synchronises user threads, which send messages, with the GCS thread, which performs protocol changes.

This method should be called by user threads when sending a message, before the message goes through the pipeline.

Parameters
cargoThe type of message that will be sent

◆ begin_protocol_version_change()

void Gcs_xcom_communication_protocol_changer::begin_protocol_version_change ( Gcs_protocol_version  new_version)
private

◆ commit_protocol_version_change()

void Gcs_xcom_communication_protocol_changer::commit_protocol_version_change ( )
private

◆ decrement_nr_packets_in_transit()

void Gcs_xcom_communication_protocol_changer::decrement_nr_packets_in_transit ( Gcs_packet const &  packet,
Gcs_xcom_nodes const &  xcom_nodes 
)

Decrement the number of my in-transit packets.

Parameters
packetThe incoming packet
xcom_nodesThe XCom membership at the time of delivery

◆ finish_protocol_version_change()

void Gcs_xcom_communication_protocol_changer::finish_protocol_version_change ( Gcs_tagged_lock::Tag const  caller_tag)

Due to the synchronisation protocol used between user threads, which send messages, and the GCS thread, which performs protocol changes, a user thread may be the one to hit the condition that triggers the protocol change to finish.

This function should be called by the user thread when it hits the condition, to signal the GCS thread that the protocol change should finish.

Parameters
caller_tagIdentifier of the protocol change

◆ get_maximum_supported_protocol_version()

Gcs_protocol_version Gcs_xcom_communication_protocol_changer::get_maximum_supported_protocol_version ( ) const

Retrieves the greatest protocol version currently supported by the group.

Returns
the greatest protocol version currently supported by the group

◆ get_nr_packets_in_transit()

unsigned long Gcs_xcom_communication_protocol_changer::get_nr_packets_in_transit ( ) const
private

◆ get_protocol_version()

Gcs_protocol_version Gcs_xcom_communication_protocol_changer::get_protocol_version ( ) const

Retrieves the current protocol version in use.

Returns
the current protocol version in use

◆ is_protocol_change_ongoing()

bool Gcs_xcom_communication_protocol_changer::is_protocol_change_ongoing ( ) const

Checks whether a protocol change is ongoing.

Returns
true if a protocol change is ongoing, false otherwise

◆ operator=() [1/2]

Gcs_xcom_communication_protocol_changer & Gcs_xcom_communication_protocol_changer::operator= ( Gcs_xcom_communication_protocol_changer &&  )
delete

◆ operator=() [2/2]

Gcs_xcom_communication_protocol_changer & Gcs_xcom_communication_protocol_changer::operator= ( Gcs_xcom_communication_protocol_changer const &  )
delete

◆ optimistically_increment_nr_packets_in_transit()

std::pair< bool, Gcs_tagged_lock::Tag > Gcs_xcom_communication_protocol_changer::optimistically_increment_nr_packets_in_transit ( )
private

◆ release_tagged_lock_and_notify_waiters()

void Gcs_xcom_communication_protocol_changer::release_tagged_lock_and_notify_waiters ( )
private

◆ rollback_increment_nr_packets_in_transit()

void Gcs_xcom_communication_protocol_changer::rollback_increment_nr_packets_in_transit ( Gcs_tagged_lock::Tag const &  tag)
private

◆ set_maximum_supported_protocol_version()

void Gcs_xcom_communication_protocol_changer::set_maximum_supported_protocol_version ( Gcs_protocol_version  version)

Sets the greatest protocol version currently supported by the group.

Parameters
versionprotocol

◆ set_protocol_version()

std::pair< bool, std::future< void > > Gcs_xcom_communication_protocol_changer::set_protocol_version ( Gcs_protocol_version  new_version)

Starts a protocol change.

The protocol change is asynchronous, the caller can wait for the change to finish using the returned future.

Note that for safety this method must only be called by the GCS engine thread.

Parameters
new_versionThe desired protocol version to change to
Return values
{true,future}If successful
{false,_}If the group does not support the requested protocol

◆ wait_for_protocol_change_to_finish()

void Gcs_xcom_communication_protocol_changer::wait_for_protocol_change_to_finish ( )
private

Member Data Documentation

◆ m_gcs_engine

Gcs_xcom_engine& Gcs_xcom_communication_protocol_changer::m_gcs_engine
private

◆ m_max_supported_protocol

std::atomic<Gcs_protocol_version> Gcs_xcom_communication_protocol_changer::m_max_supported_protocol
private

◆ m_msg_pipeline

Gcs_message_pipeline& Gcs_xcom_communication_protocol_changer::m_msg_pipeline
private

◆ m_mutex

std::mutex Gcs_xcom_communication_protocol_changer::m_mutex
private

For user threads to wait for an ongoing protocol change to finish.

◆ m_nr_packets_in_transit

std::atomic<unsigned long> Gcs_xcom_communication_protocol_changer::m_nr_packets_in_transit
private

◆ m_promise

std::promise<void> Gcs_xcom_communication_protocol_changer::m_promise
private

Stores the outcome of the protocol change operation.

◆ m_protocol_change_finished

std::condition_variable Gcs_xcom_communication_protocol_changer::m_protocol_change_finished
private

◆ m_tagged_lock

Gcs_tagged_lock Gcs_xcom_communication_protocol_changer::m_tagged_lock
private

Tagged lock used for the optimistic synchronisation protocol between user threads, which send messages, and the GCS thread, which performs protocol changes.

◆ m_tentative_new_protocol

Gcs_protocol_version Gcs_xcom_communication_protocol_changer::m_tentative_new_protocol
private

The protocol version we are going to change to when we start a protocol change.


The documentation for this class was generated from the following files: