MySQL 8.4.2
Source Code Documentation
Gcs_message_pipeline Class Reference

This is the pipeline that an outgoing or incoming message has to go through when being sent to or received from the group respectively. More...

#include <gcs_message_stages.h>

Public Member Functions

 Gcs_message_pipeline ()
 
 Gcs_message_pipeline (Gcs_message_pipeline &p)=delete
 
Gcs_message_pipelineoperator= (const Gcs_message_pipeline &p)=delete
 
 Gcs_message_pipeline (Gcs_message_pipeline &&p)=delete
 
Gcs_message_pipelineoperator= (Gcs_message_pipeline &&p)=delete
 
virtual ~Gcs_message_pipeline ()=default
 
std::pair< bool, std::vector< Gcs_packet > > process_outgoing (Gcs_message_data const &msg_data, Cargo_type cargo) const
 This member function SHALL be called by the message sender. More...
 
std::pair< Gcs_pipeline_incoming_result, Gcs_packetprocess_incoming (Gcs_packet &&packet) const
 This member function SHALL be called by the receiver thread to process the packet through the stages it was processed when it was sent. More...
 
void update_members_information (const Gcs_member_identifier &me, const Gcs_xcom_nodes &xcom_nodes) const
 Update the list of members in the group as this may be required by some stages in the communication pipeline. More...
 
Gcs_xcom_synode_set get_snapshot () const
 
template<class T , class... Args>
void register_stage (Args... args)
 Register a stage to be used by the pipeline. More...
 
bool contains_stage (Stage_code code) const
 Check whether a stage is registered or not. More...
 
Gcs_message_stageget_stage (Stage_code code) const
 
bool register_pipeline (std::initializer_list< Gcs_pair_version_stages > stages)
 Register the stages per version that form the different pipelines. More...
 
bool contains_pipeline (Gcs_protocol_version pipeline_version) const
 Check whether a pipeline version is registered or not. More...
 
const Gcs_stages_listget_pipeline (Gcs_protocol_version pipeline_version) const
 
void cleanup ()
 Clean all data structures and objects created. More...
 
bool set_version (Gcs_protocol_version pipeline_version)
 Set the pipeline version in use. More...
 
Gcs_protocol_version get_version () const
 Return the pipeline version in use. More...
 

Private Member Functions

const Gcs_stages_listretrieve_pipeline (Gcs_protocol_version pipeline_version) const
 Retrieve the stages associated with a pipeline version. More...
 
Gcs_message_stageretrieve_stage (Stage_code stage_code) const
 This member function SHALL retrieve the associated stage if there is any, otherwise a null pointer is returned. More...
 
Gcs_message_stageretrieve_stage (const Gcs_packet &p) const
 This member function SHALL retrieve the current stage type code of a packet. More...
 
std::pair< bool, std::vector< Stage_code > > get_stages_to_apply (Gcs_protocol_version const &pipeline_version, uint64_t const &original_payload_size) const
 Find out which stages should be applied to an outgoing message. More...
 
std::pair< bool, Gcs_packetcreate_packet (Cargo_type const &cargo, Gcs_protocol_version const &current_version, uint64_t const &original_payload_size, std::vector< Stage_code > const &stages_to_apply) const
 Create a packet for a message with size original_payload_size and type cargo, that will go through the stages stages_to_apply from pipeline version current_version. More...
 
std::pair< bool, std::vector< Gcs_packet > > apply_stages (Gcs_packet &&packet, std::vector< Stage_code > const &stages) const
 Apply the given stages to the given outgoing packet. More...
 
std::pair< bool, std::vector< Gcs_packet > > apply_stage (std::vector< Gcs_packet > &&packets, Gcs_message_stage &stage) const
 Apply the given stage to the given outgoing packet. More...
 
std::pair< Gcs_pipeline_incoming_result, Gcs_packetrevert_stage (Gcs_packet &&packet, Stage_code const &stage_code) const
 Revert the given stage to the given incoming packet. More...
 

Private Attributes

Gcs_map_type_handler m_handlers
 The registered stages. More...
 
Gcs_map_version_stages m_pipelines
 This is the pre-assembled outgoing pipelines for the different versions that are currently supported, meaning that the stages are traversed in the given order. More...
 
std::atomic< Gcs_protocol_versionm_pipeline_version
 The pipeline version in use. More...
 

Detailed Description

This is the pipeline that an outgoing or incoming message has to go through when being sent to or received from the group respectively.

The message pipeline has stages registered and these are assembled in an outgoing pipeline. Then outgoing messages always have to traverse this pipeline. For incoming messages, the pipeline is built on the fly, according to the information contained in the message stage headers.

The following rules are always enforced to guarantee safety:

. A node always knows how to process protocol versions in the domain [initial version, max-version-known(node)] by keeping a complete versioned pipeline for the entire domain

. Every time the pipeline or a message changes, the protocol version is incremented and new pipeline version is also created accordingly with new codes for all the stages

. Running group can upgrade, but never downgrade, its protocol unless a user explicitly request to downgrade it

. Older nodes attempting to join a group running a newer protocol will discard all messages because the messages will either: (a) contain an unknown cargo type, or (b) contain an unknown type code

— Adding a new stage —

If a developer needs to add a new stage to the pipeline, the header protocol version number has to be incremented and the pipeline stage updated as follows:

Gcs_message_old_stage *old_stage = pipeline.register_stage<Gcs_message_old_stage>(); Gcs_message_modified_old_stage *modified_old_stage = pipeline.register_stage<Gcs_message_modified_old_stage>(); Gcs_message_new_stage *new_stage = pipeline.register_stage<Gcs_message_new_stage>();

pipeline.register_pipeline( { { 1, {old_stage->get_stage_code()} }, { X, {modified_old_stage->get_stage_code(), new_stage->get_stage_code()} } });

where X is the header protocol version after the update.

Note that the difference between the two old stages is only the type code.

— Changing stage format —

If a developer needs to change any stage format, i.e. replace an existing stage of the pipeline, the header protocol version number has to be incremented and the pipeline stage updated as follows:

Gcs_message_old_stage *old_stage = pipeline.register_stage<Gcs_message_old_stage>(); Gcs_message_new_stage *new_stage = pipeline.register_stage<Gcs_message_new_stage>();

pipeline.register_pipeline( { { 1, {old_stage->get_stage_code()} }, { X, {new_stage->get_stage_code()} } });

where X is the header protocol version after the update.

Note that a new pipeline stage with a unique type code has to be created. Besides, every message will carry the current protocol version in use and this information is available as part of the fixed header and can be read by any stage in order to decide how the message content shall be interpreted.

— Changing Cargo —

If a developer needs to change a cargo format or create a new one, a new cargo type must always be created as the current cargo types are not prepared to be extended and the header protocol version number has to be incremented and the pipeline stage updated as follows:

Gcs_message_old_stage *old_stage = pipeline.register_stage<Gcs_message_old_stage>(); Gcs_message_modified_old_stage *modified_old_stage = pipeline.register_stage<Gcs_message_modified_old_stage>();

pipeline.register_pipeline( { { 1, {old_stage->get_stage_code()} }, { X, {modified_old_stage->get_stage_code()} } });

where X is the header protocol version after the update.

Although the cargo type has no direct relation to the message pipeline stages, increasing the protocol version number will allow nodes to decide if versions are compatible. Note that the difference between the two old stages is only the type code.

Constructor & Destructor Documentation

◆ Gcs_message_pipeline() [1/3]

Gcs_message_pipeline::Gcs_message_pipeline ( )
inlineexplicit

◆ Gcs_message_pipeline() [2/3]

Gcs_message_pipeline::Gcs_message_pipeline ( Gcs_message_pipeline p)
delete

◆ Gcs_message_pipeline() [3/3]

Gcs_message_pipeline::Gcs_message_pipeline ( Gcs_message_pipeline &&  p)
delete

◆ ~Gcs_message_pipeline()

virtual Gcs_message_pipeline::~Gcs_message_pipeline ( )
virtualdefault

Member Function Documentation

◆ apply_stage()

std::pair< bool, std::vector< Gcs_packet > > Gcs_message_pipeline::apply_stage ( std::vector< Gcs_packet > &&  packets,
Gcs_message_stage stage 
) const
private

Apply the given stage to the given outgoing packet.

Parameters
packetsThe packet to transform
stageThe stage to apply
Return values
{true,_}If there was an error applying the stage
{false,P}If the stage was successfully applied, and produced the set of transformed packets P

◆ apply_stages()

std::pair< bool, std::vector< Gcs_packet > > Gcs_message_pipeline::apply_stages ( Gcs_packet &&  packet,
std::vector< Stage_code > const &  stages 
) const
private

Apply the given stages to the given outgoing packet.

Parameters
packetThe packet to transform
stagesThe stages to apply
Return values
{true,_}If there was an error applying the stages
{false,P}If the stages were successfully applied, and produced the set of transformed packets P

◆ cleanup()

void Gcs_message_pipeline::cleanup ( )

Clean all data structures and objects created.

◆ contains_pipeline()

bool Gcs_message_pipeline::contains_pipeline ( Gcs_protocol_version  pipeline_version) const
inline

Check whether a pipeline version is registered or not.

Parameters
pipeline_versionPipeline version
Returns
whether a pipeline version is registered or not.

◆ contains_stage()

bool Gcs_message_pipeline::contains_stage ( Stage_code  code) const
inline

Check whether a stage is registered or not.

Parameters
codeStage code
Returns
whether a stage is registered or not.

◆ create_packet()

std::pair< bool, Gcs_packet > Gcs_message_pipeline::create_packet ( Cargo_type const &  cargo,
Gcs_protocol_version const &  current_version,
uint64_t const &  original_payload_size,
std::vector< Stage_code > const &  stages_to_apply 
) const
private

Create a packet for a message with size original_payload_size and type cargo, that will go through the stages stages_to_apply from pipeline version current_version.

Parameters
cargoThe message type
current_versionThe pipeline version
original_payload_sizeThe payload size
stages_to_applyThe stages that will be applied to the packet
Return values
{true,_}If there was an error creating the packet
{false,P}If successful, and created packet P

◆ get_pipeline()

const Gcs_stages_list & Gcs_message_pipeline::get_pipeline ( Gcs_protocol_version  pipeline_version) const
inline

◆ get_snapshot()

Gcs_xcom_synode_set Gcs_message_pipeline::get_snapshot ( ) const

◆ get_stage()

Gcs_message_stage & Gcs_message_pipeline::get_stage ( Stage_code  code) const
inline

◆ get_stages_to_apply()

std::pair< bool, std::vector< Stage_code > > Gcs_message_pipeline::get_stages_to_apply ( Gcs_protocol_version const &  pipeline_version,
uint64_t const &  original_payload_size 
) const
private

Find out which stages should be applied to an outgoing message.

Parameters
pipeline_versionThe pipeline version to use
original_payload_sizeThe size of the outgoing message
Return values
{true,_}If there was an error
{false,S}If successful, and the message should go through the sequence of stages S

◆ get_version()

Gcs_protocol_version Gcs_message_pipeline::get_version ( ) const

Return the pipeline version in use.

◆ operator=() [1/2]

Gcs_message_pipeline & Gcs_message_pipeline::operator= ( const Gcs_message_pipeline p)
delete

◆ operator=() [2/2]

Gcs_message_pipeline & Gcs_message_pipeline::operator= ( Gcs_message_pipeline &&  p)
delete

◆ process_incoming()

std::pair< Gcs_pipeline_incoming_result, Gcs_packet > Gcs_message_pipeline::process_incoming ( Gcs_packet &&  packet) const

This member function SHALL be called by the receiver thread to process the packet through the stages it was processed when it was sent.

This reverts the effect on the receiving end.

Parameters
packetThe packet to process.
Return values
{ERROR,_}If there was an error in the pipeline
{OK_NO_PACKET,NP}If the pipeline was successful, but produced no packet
{OK_PACKET,P}If the pipeline was successful, and produced the packet P

◆ process_outgoing()

std::pair< bool, std::vector< Gcs_packet > > Gcs_message_pipeline::process_outgoing ( Gcs_message_data const &  msg_data,
Cargo_type  cargo 
) const

This member function SHALL be called by the message sender.

It makes the message go through the pipeline of stages before it is actually handed over to the group communication engine.

Note that the fragmentation layer may produce more than one packet.

Parameters
[in]msg_dataMessage data to send.
[in]cargoThe cargo type of the message to send
Return values
{true,_}If there was an error in the pipeline
{false,P}If the pipeline was successful, and produced the set of transformed packets P

◆ register_pipeline()

bool Gcs_message_pipeline::register_pipeline ( std::initializer_list< Gcs_pair_version_stages stages)

Register the stages per version that form the different pipelines.

This method must be called after registering all the desired stages using register_stage.

This method must only be called on an unregistered pipeline. If you want to reuse the pipeline, new calls to this method must be preceded by calls to cleanup and register_stage.

Parameters
stagesInitialization list that contains a mapping between a version and the associated pipeline stages.
Returns
false on success, true otherwise.

◆ register_stage()

template<class T , class... Args>
void Gcs_message_pipeline::register_stage ( Args...  args)
inline

Register a stage to be used by the pipeline.

Template Parameters
TStage class type
ArgsType of Parameters to the stage constructor
Parameters
argsParameters to the stage constructor

◆ retrieve_pipeline()

const Gcs_stages_list * Gcs_message_pipeline::retrieve_pipeline ( Gcs_protocol_version  pipeline_version) const
private

Retrieve the stages associated with a pipeline version.

Parameters
pipeline_versionPipeline version

◆ retrieve_stage() [1/2]

Gcs_message_stage * Gcs_message_pipeline::retrieve_stage ( const Gcs_packet p) const
private

This member function SHALL retrieve the current stage type code of a packet.

Parameters
pthe packet to process.

◆ retrieve_stage() [2/2]

Gcs_message_stage * Gcs_message_pipeline::retrieve_stage ( Stage_code  stage_code) const
private

This member function SHALL retrieve the associated stage if there is any, otherwise a null pointer is returned.

Parameters
stage_codeunique stage code

◆ revert_stage()

std::pair< Gcs_pipeline_incoming_result, Gcs_packet > Gcs_message_pipeline::revert_stage ( Gcs_packet &&  packet,
Stage_code const &  stage_code 
) const
private

Revert the given stage to the given incoming packet.

Parameters
packetThe packet to transform
stage_codeThe stage to revert
Return values
{ERROR,_}If there was an error in the stage
{OK_NO_PACKET,NP}If the stage was successfully reverted, but produced no packet
{OK_PACKET,P}If the stage was successfully reverted, and produced the packet P

◆ set_version()

bool Gcs_message_pipeline::set_version ( Gcs_protocol_version  pipeline_version)

Set the pipeline version in use.

Parameters
pipeline_versionPipeline version.
Returns
false if successfully set, true otherwise

◆ update_members_information()

void Gcs_message_pipeline::update_members_information ( const Gcs_member_identifier me,
const Gcs_xcom_nodes xcom_nodes 
) const

Update the list of members in the group as this may be required by some stages in the communication pipeline.

By default though, the call is simply ignored.

Parameters
meThe local member identifier.
xcom_nodesList of members in the group.

Member Data Documentation

◆ m_handlers

Gcs_map_type_handler Gcs_message_pipeline::m_handlers
private

The registered stages.

These are all stages that are known by this version of MySQL GCS. This needs to contain an instance of all possible stages, since it needs to handle cross-version communication.

◆ m_pipeline_version

std::atomic<Gcs_protocol_version> Gcs_message_pipeline::m_pipeline_version
private

The pipeline version in use.

◆ m_pipelines

Gcs_map_version_stages Gcs_message_pipeline::m_pipelines
private

This is the pre-assembled outgoing pipelines for the different versions that are currently supported, meaning that the stages are traversed in the given order.


The documentation for this class was generated from the following files: