MySQL 8.0.43 Source Code Documentation
This is the pipeline that an outgoing or incoming message has to go through when being sent to or received from the group respectively.
#include <gcs_message_stages.h>
| Public Member Functions | |
| Gcs_message_pipeline () | |
| Gcs_message_pipeline (Gcs_message_pipeline &p)=delete | |
| Gcs_message_pipeline & | operator= (const Gcs_message_pipeline &p)=delete | 
| Gcs_message_pipeline (Gcs_message_pipeline &&p)=delete | |
| Gcs_message_pipeline & | operator= (Gcs_message_pipeline &&p)=delete | 
| virtual | ~Gcs_message_pipeline ()=default | 
| std::pair< bool, std::vector< Gcs_packet > > | process_outgoing (Gcs_message_data const &msg_data, Cargo_type cargo) const | 
| This member function SHALL be called by the message sender. | |
| std::pair< Gcs_pipeline_incoming_result, Gcs_packet > | process_incoming (Gcs_packet &&packet) const | 
| This member function SHALL be called by the receiver thread to process the packet through the same stages that were applied to it when it was sent. | |
| void | update_members_information (const Gcs_member_identifier &me, const Gcs_xcom_nodes &xcom_nodes) const | 
| Update the list of members in the group as this may be required by some stages in the communication pipeline. | |
| Gcs_xcom_synode_set | get_snapshot () const | 
| template<class T , class... Args> | |
| void | register_stage (Args... args) | 
| Register a stage to be used by the pipeline. | |
| bool | contains_stage (Stage_code code) const | 
| Check whether a stage is registered or not. | |
| Gcs_message_stage & | get_stage (Stage_code code) const | 
| bool | register_pipeline (std::initializer_list< Gcs_pair_version_stages > stages) | 
| Register the stages per version that form the different pipelines. | |
| bool | contains_pipeline (Gcs_protocol_version pipeline_version) const | 
| Check whether a pipeline version is registered or not. | |
| const Gcs_stages_list & | get_pipeline (Gcs_protocol_version pipeline_version) const | 
| void | cleanup () | 
| Clean all data structures and objects created. | |
| bool | set_version (Gcs_protocol_version pipeline_version) | 
| Set the pipeline version in use. | |
| Gcs_protocol_version | get_version () const | 
| Return the pipeline version in use. | |
| Private Member Functions | |
| const Gcs_stages_list * | retrieve_pipeline (Gcs_protocol_version pipeline_version) const | 
| Retrieve the stages associated with a pipeline version. | |
| Gcs_message_stage * | retrieve_stage (Stage_code stage_code) const | 
| This member function SHALL retrieve the associated stage if there is any, otherwise a null pointer is returned. | |
| Gcs_message_stage * | retrieve_stage (const Gcs_packet &p) const | 
| This member function SHALL retrieve the current stage type code of a packet. | |
| std::pair< bool, std::vector< Stage_code > > | get_stages_to_apply (Gcs_protocol_version const &pipeline_version, uint64_t const &original_payload_size) const | 
| Find out which stages should be applied to an outgoing message. | |
| std::pair< bool, Gcs_packet > | create_packet (Cargo_type const &cargo, Gcs_protocol_version const &current_version, uint64_t const &original_payload_size, std::vector< Stage_code > const &stages_to_apply) const | 
| Create a packet for a message with size original_payload_size and type cargo, that will go through the stages stages_to_apply from pipeline version current_version. | |
| std::pair< bool, std::vector< Gcs_packet > > | apply_stages (Gcs_packet &&packet, std::vector< Stage_code > const &stages) const | 
| Apply the given stages to the given outgoing packet. | |
| std::pair< bool, std::vector< Gcs_packet > > | apply_stage (std::vector< Gcs_packet > &&packets, Gcs_message_stage &stage) const | 
| Apply the given stage to the given outgoing packet. | |
| std::pair< Gcs_pipeline_incoming_result, Gcs_packet > | revert_stage (Gcs_packet &&packet, Stage_code const &stage_code) const | 
| Revert the given stage to the given incoming packet. | |
| Private Attributes | |
| Gcs_map_type_handler | m_handlers | 
| The registered stages. | |
| Gcs_map_version_stages | m_pipelines | 
| These are the pre-assembled outgoing pipelines for the different versions that are currently supported, meaning that the stages are traversed in the given order. | |
| std::atomic< Gcs_protocol_version > | m_pipeline_version | 
| The pipeline version in use. | |
This is the pipeline that an outgoing or incoming message has to go through when being sent to or received from the group respectively.
The message pipeline keeps a set of registered stages, which are assembled into an outgoing pipeline that every outgoing message has to traverse. For incoming messages, the pipeline is built on the fly, according to the information contained in the message stage headers.
The following rules are always enforced to guarantee safety:
. A node always knows how to process protocol versions in the domain [initial version, max-version-known(node)] by keeping a complete versioned pipeline for the entire domain
. Every time the pipeline or a message changes, the protocol version is incremented and a new pipeline version is created accordingly, with new codes for all the stages
. A running group can upgrade, but never downgrade, its protocol unless a user explicitly requests a downgrade
. Older nodes attempting to join a group running a newer protocol will discard all messages because the messages will either: (a) contain an unknown cargo type, or (b) contain an unknown type code
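The asymmetry between the two directions can be illustrated with a toy model. The sketch below is not the GCS implementation: `Toy_pipeline`, `Toy_packet`, `Toy_stage` and `make_toy_pipeline` are hypothetical names, stages are reduced to string transformations, and the stage headers are reduced to a list of stage codes. It only shows that outgoing messages follow the pipeline registered for a version, while incoming messages are unwound from the codes they carry, and that an unknown code causes the message to be discarded.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy types: none of these are the real GCS classes.
using Toy_stage_code = std::uint8_t;
using Toy_version = std::uint16_t;

struct Toy_packet {
  Toy_version version;                        // protocol version in use
  std::vector<Toy_stage_code> stage_headers;  // codes of the applied stages
  std::string payload;
};

struct Toy_stage {
  std::function<std::string(const std::string &)> apply;
  std::function<std::string(const std::string &)> revert;
};

class Toy_pipeline {
 public:
  void register_stage(Toy_stage_code code, Toy_stage stage) {
    m_handlers[code] = std::move(stage);
  }
  void register_pipeline(Toy_version version,
                         std::vector<Toy_stage_code> stages) {
    m_pipelines[version] = std::move(stages);
  }

  // Outgoing: traverse the pre-assembled pipeline of `version` in order.
  // Uses the {true,_} = error convention documented on this page.
  std::pair<bool, Toy_packet> process_outgoing(Toy_version version,
                                               std::string payload) const {
    Toy_packet packet{version, {}, std::move(payload)};
    auto pipeline = m_pipelines.find(version);
    if (pipeline == m_pipelines.end()) return {true, packet};
    for (Toy_stage_code code : pipeline->second) {
      auto handler = m_handlers.find(code);
      if (handler == m_handlers.end()) return {true, packet};
      packet.payload = handler->second.apply(packet.payload);
      packet.stage_headers.push_back(code);
    }
    return {false, packet};
  }

  // Incoming: no version lookup. The codes carried by the packet say which
  // stages to revert, most recently applied first.
  std::pair<bool, std::string> process_incoming(Toy_packet packet) const {
    while (!packet.stage_headers.empty()) {
      auto handler = m_handlers.find(packet.stage_headers.back());
      if (handler == m_handlers.end()) return {true, std::string()};  // unknown code
      packet.payload = handler->second.revert(packet.payload);
      packet.stage_headers.pop_back();
    }
    return {false, packet.payload};
  }

 private:
  std::map<Toy_stage_code, Toy_stage> m_handlers;
  std::map<Toy_version, std::vector<Toy_stage_code>> m_pipelines;
};

// Version 1 wraps the payload. Version 2 wraps and reverses it, and uses a
// new code (2) for the wrap stage, as the rules above require.
inline Toy_pipeline make_toy_pipeline() {
  auto wrap = [](const std::string &s) { return "[" + s + "]"; };
  auto unwrap = [](const std::string &s) {
    assert(s.size() >= 2);  // only payloads produced by `wrap` are expected
    return s.substr(1, s.size() - 2);
  };
  auto reverse = [](const std::string &s) {
    return std::string(s.rbegin(), s.rend());
  };
  Toy_pipeline pipeline;
  pipeline.register_stage(1, Toy_stage{wrap, unwrap});
  pipeline.register_stage(2, Toy_stage{wrap, unwrap});
  pipeline.register_stage(3, Toy_stage{reverse, reverse});
  pipeline.register_pipeline(1, {1});
  pipeline.register_pipeline(2, {2, 3});
  return pipeline;
}
```

In this model, a node that only registered stage code 1 would fail in `process_incoming` for any packet produced by version 2, which mirrors rule (b) for older nodes.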
— Adding a new stage —
If a developer needs to add a new stage to the pipeline, the header protocol version number has to be incremented and the pipeline stage updated as follows:
Gcs_message_old_stage *old_stage =
  pipeline.register_stage<Gcs_message_old_stage>();
Gcs_message_modified_old_stage *modified_old_stage =
  pipeline.register_stage<Gcs_message_modified_old_stage>();
Gcs_message_new_stage *new_stage =
  pipeline.register_stage<Gcs_message_new_stage>();

pipeline.register_pipeline(
{
  { 1, {old_stage->get_stage_code()} },
  { X, {modified_old_stage->get_stage_code(), new_stage->get_stage_code()} }
});
where X is the header protocol version after the update.
Note that the difference between the two old stages is only the type code.
— Changing stage format —
If a developer needs to change any stage format, i.e. replace an existing stage of the pipeline, the header protocol version number has to be incremented and the pipeline stage updated as follows:
Gcs_message_old_stage *old_stage =
  pipeline.register_stage<Gcs_message_old_stage>();
Gcs_message_new_stage *new_stage =
  pipeline.register_stage<Gcs_message_new_stage>();

pipeline.register_pipeline(
{
  { 1, {old_stage->get_stage_code()} },
  { X, {new_stage->get_stage_code()} }
});
where X is the header protocol version after the update.
Note that a new pipeline stage with a unique type code has to be created. In addition, every message carries the current protocol version in use. This information is part of the fixed header and can be read by any stage in order to decide how the message content shall be interpreted.
— Changing Cargo —
If a developer needs to change a cargo format or create a new one, a new cargo type must always be created, because the current cargo types are not prepared to be extended. The header protocol version number has to be incremented and the pipeline stage updated as follows:
Gcs_message_old_stage *old_stage =
  pipeline.register_stage<Gcs_message_old_stage>();
Gcs_message_modified_old_stage *modified_old_stage =
  pipeline.register_stage<Gcs_message_modified_old_stage>();

pipeline.register_pipeline(
{
  { 1, {old_stage->get_stage_code()} },
  { X, {modified_old_stage->get_stage_code()} }
});
where X is the header protocol version after the update.
Although the cargo type has no direct relation to the message pipeline stages, increasing the protocol version number will allow nodes to decide if versions are compatible. Note that the difference between the two old stages is only the type code.
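| Gcs_message_pipeline::Gcs_message_pipeline | ( | ) | 
| inline explicit | 
| Gcs_message_pipeline::Gcs_message_pipeline | ( | Gcs_message_pipeline & | p | ) | 
| delete | 
| Gcs_message_pipeline::Gcs_message_pipeline | ( | Gcs_message_pipeline && | p | ) | 
| delete | 
| virtual Gcs_message_pipeline::~Gcs_message_pipeline | ( | ) | 
| virtual default | 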
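| std::pair< bool, std::vector< Gcs_packet > > Gcs_message_pipeline::apply_stage | ( | std::vector< Gcs_packet > && | packets, | 
| Gcs_message_stage & | stage | ||
| ) | const | 
| private | 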
Apply the given stage to the given outgoing packet.
| packets | The packet to transform | 
| stage | The stage to apply | 
| {true,_} | If there was an error applying the stage | 
| {false,P} | If the stage was successfully applied, and produced the set of transformed packets P | 
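| std::pair< bool, std::vector< Gcs_packet > > Gcs_message_pipeline::apply_stages | ( | Gcs_packet && | packet, | 
| std::vector< Stage_code > const & | stages | ||
| ) | const | 
| private | 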
Apply the given stages to the given outgoing packet.
| packet | The packet to transform | 
| stages | The stages to apply | 
| {true,_} | If there was an error applying the stages | 
| {false,P} | If the stages were successfully applied, and produced the set of transformed packets P | 
| void Gcs_message_pipeline::cleanup | ( | ) | 
Clean all data structures and objects created.
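| bool Gcs_message_pipeline::contains_pipeline | ( | Gcs_protocol_version | pipeline_version | ) | const | 
| inline | 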
Check whether a pipeline version is registered or not.
| pipeline_version | Pipeline version | 
| bool Gcs_message_pipeline::contains_stage | ( | Stage_code | code | ) | const | 
| inline | 
Check whether a stage is registered or not.
| code | Stage code | 
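| std::pair< bool, Gcs_packet > Gcs_message_pipeline::create_packet | ( | Cargo_type const & | cargo, | 
| Gcs_protocol_version const & | current_version, | 
| uint64_t const & | original_payload_size, | 
| std::vector< Stage_code > const & | stages_to_apply | ||
| ) | const | 
| private | 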
Create a packet for a message with size original_payload_size and type cargo, that will go through the stages stages_to_apply from pipeline version current_version.
| cargo | The message type | 
| current_version | The pipeline version | 
| original_payload_size | The payload size | 
| stages_to_apply | The stages that will be applied to the packet | 
| {true,_} | If there was an error creating the packet | 
| {false,P} | If successful, and created packet P | 
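| const Gcs_stages_list & Gcs_message_pipeline::get_pipeline | ( | Gcs_protocol_version | pipeline_version | ) | const | 
| inline | 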
| Gcs_xcom_synode_set Gcs_message_pipeline::get_snapshot | ( | ) | const | 
| Gcs_message_stage & Gcs_message_pipeline::get_stage | ( | Stage_code | code | ) | const | 
| inline | 
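| std::pair< bool, std::vector< Stage_code > > Gcs_message_pipeline::get_stages_to_apply | ( | Gcs_protocol_version const & | pipeline_version, | 
| uint64_t const & | original_payload_size | ||
| ) | const | 
| private | 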
Find out which stages should be applied to an outgoing message.
| pipeline_version | The pipeline version to use | 
| original_payload_size | The size of the outgoing message | 
| {true,_} | If there was an error | 
| {false,S} | If successful, and the message should go through the sequence of stages S | 
| Gcs_protocol_version Gcs_message_pipeline::get_version | ( | ) | const | 
Return the pipeline version in use.
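| Gcs_message_pipeline & Gcs_message_pipeline::operator= | ( | const Gcs_message_pipeline & | p | ) | 
| delete | 
| Gcs_message_pipeline & Gcs_message_pipeline::operator= | ( | Gcs_message_pipeline && | p | ) | 
| delete | 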
| std::pair< Gcs_pipeline_incoming_result, Gcs_packet > Gcs_message_pipeline::process_incoming | ( | Gcs_packet && | packet | ) | const | 
This member function SHALL be called by the receiver thread to process the packet through the same stages that were applied to it when it was sent.
This reverts the effect on the receiving end.
| packet | The packet to process. | 
| {ERROR,_} | If there was an error in the pipeline | 
| {OK_NO_PACKET,NP} | If the pipeline was successful, but produced no packet | 
| {OK_PACKET,P} | If the pipeline was successful, and produced the packet P | 
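The three outcomes listed above can be illustrated with a small stand-alone sketch. Everything in it is hypothetical: `Toy_incoming_result` stands in for `Gcs_pipeline_incoming_result` (with `FAILURE` playing the role of `ERROR`), and `Toy_reassembler` is an invented stage that only produces a packet once every fragment of a message has arrived. The source does not say which stages return `OK_NO_PACKET`, so the reassembly scenario is an assumption chosen to show why a successful call may yield no packet.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for Gcs_pipeline_incoming_result.
enum class Toy_incoming_result { OK_PACKET, OK_NO_PACKET, FAILURE };

// Hypothetical fragment: piece `index` out of `total` pieces of one message.
struct Toy_fragment {
  std::size_t index;
  std::size_t total;
  std::string data;
};

class Toy_reassembler {
 public:
  std::pair<Toy_incoming_result, std::string> revert(const Toy_fragment &f) {
    const bool malformed = f.total == 0 || f.index >= f.total;
    const bool mismatch = !m_parts.empty() && m_parts.size() != f.total;
    if (malformed || mismatch) return fail();
    if (m_parts.empty()) {  // first fragment of a new message
      m_parts.assign(f.total, std::string());
      m_seen.assign(f.total, false);
      m_missing = f.total;
    }
    if (m_seen[f.index]) return fail();  // duplicate fragment
    assert(m_missing > 0);
    m_parts[f.index] = f.data;
    m_seen[f.index] = true;
    --m_missing;
    if (m_missing > 0) {
      // Success, but the message is still incomplete: nothing to deliver.
      return {Toy_incoming_result::OK_NO_PACKET, std::string()};
    }
    std::string whole;
    for (const std::string &part : m_parts) whole += part;
    reset();
    return {Toy_incoming_result::OK_PACKET, whole};
  }

 private:
  // Design choice of this sketch: an error drops the partial message.
  std::pair<Toy_incoming_result, std::string> fail() {
    reset();
    return {Toy_incoming_result::FAILURE, std::string()};
  }
  void reset() {
    m_parts.clear();
    m_seen.clear();
    m_missing = 0;
  }

  std::vector<std::string> m_parts;
  std::vector<bool> m_seen;
  std::size_t m_missing = 0;
};
```

A caller would inspect the first element of the pair and touch the second only when it reports a packet.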
| std::pair< bool, std::vector< Gcs_packet > > Gcs_message_pipeline::process_outgoing | ( | Gcs_message_data const & | msg_data, | 
| Cargo_type | cargo | ||
| ) | const | 
This member function SHALL be called by the message sender.
It makes the message go through the pipeline of stages before it is actually handed over to the group communication engine.
Note that the fragmentation layer may produce more than one packet.
| [in] | msg_data | Message data to send. | 
| [in] | cargo | The cargo type of the message to send | 
| {true,_} | If there was an error in the pipeline | 
| {false,P} | If the pipeline was successful, and produced the set of transformed packets P | 
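The return convention above, where `true` in the first element signals an error and the second element may hold several packets, can be sketched as follows. `toy_split` and `toy_send` are hypothetical helpers, not GCS functions, and splitting a string by size is only a stand-in for whatever the real fragmentation layer does.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical splitter standing in for a fragmentation stage.
// Returns {true, _} on error and {false, P} on success.
inline std::pair<bool, std::vector<std::string>> toy_split(
    const std::string &payload, std::size_t max_fragment_size) {
  std::vector<std::string> packets;
  if (max_fragment_size == 0) return {true, packets};
  for (std::size_t offset = 0; offset < payload.size();
       offset += max_fragment_size) {
    packets.push_back(payload.substr(offset, max_fragment_size));
  }
  // In this sketch an empty payload still yields one (empty) packet.
  if (packets.empty()) packets.emplace_back();
  assert(!packets.empty());
  return {false, packets};
}

// Caller side: check the error flag before touching the packets.
// Returns the number of packets that would be handed over.
inline std::size_t toy_send(const std::string &payload,
                            std::size_t max_fragment_size) {
  auto result = toy_split(payload, max_fragment_size);
  if (result.first) return 0;  // error: nothing is sent
  return result.second.size();
}
```

Note that the flag is inverted relative to the common "true means success" idiom, so a caller that writes `if (result.first)` is on the error path.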
| bool Gcs_message_pipeline::register_pipeline | ( | std::initializer_list< Gcs_pair_version_stages > | stages | ) | 
Register the stages per version that form the different pipelines.
This method must be called after registering all the desired stages using register_stage.
This method must only be called on an unregistered pipeline. If you want to reuse the pipeline, new calls to this method must be preceded by calls to cleanup and register_stage.
| stages | Initialization list that contains a mapping between a version and the associated pipeline stages. | 
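The ordering constraints described above can be made concrete with a toy registry. This is a sketch under stated assumptions, not the GCS code: `Toy_registry` and its aliases are hypothetical, stage and version identifiers are plain integers, and the meaning of the `bool` result (`true` on error) is assumed by analogy with the pair-returning members on this page. The check that a stage code is used by one version only follows the rule that every new pipeline version gets new codes for all its stages.

```cpp
#include <cassert>
#include <initializer_list>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Hypothetical aliases: the real types are Gcs_protocol_version, Stage_code
// and Gcs_pair_version_stages.
using Toy_version = int;
using Toy_stage_code = int;
using Toy_pair_version_stages =
    std::pair<const Toy_version, std::vector<Toy_stage_code>>;

class Toy_registry {
 public:
  void register_stage(Toy_stage_code code) { m_stages.insert(code); }

  // Returns true on error (assumed convention), false on success.
  bool register_pipeline(std::initializer_list<Toy_pair_version_stages> stages) {
    if (!m_pipelines.empty()) return true;  // pipeline already registered
    std::map<Toy_version, std::vector<Toy_stage_code>> candidate(stages);
    if (candidate.size() != stages.size()) return true;  // duplicate version
    std::set<Toy_stage_code> used;
    for (const auto &entry : candidate) {
      for (Toy_stage_code code : entry.second) {
        if (m_stages.count(code) == 0) return true;  // stage not registered
        if (!used.insert(code).second) return true;  // code reused
      }
    }
    m_pipelines = std::move(candidate);
    return false;
  }

  void cleanup() {
    m_stages.clear();
    m_pipelines.clear();
  }

 private:
  std::set<Toy_stage_code> m_stages;
  std::map<Toy_version, std::vector<Toy_stage_code>> m_pipelines;
};

// Usage: register the stages first, then the pipelines. A second call is
// rejected until cleanup() and register_stage() have been called again.
inline void toy_registration_example() {
  Toy_registry registry;
  registry.register_stage(10);
  registry.register_stage(11);
  registry.register_stage(12);
  bool error = registry.register_pipeline({{1, {10}}, {2, {11, 12}}});
  assert(!error);
  error = registry.register_pipeline({{3, {10}}});
  assert(error);
  registry.cleanup();
  registry.register_stage(10);
  error = registry.register_pipeline({{1, {10}}});
  assert(!error);
}
```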
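| template<class T , class... Args> | 
| void Gcs_message_pipeline::register_stage | ( | Args... | args | ) | 
| inline | 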
Register a stage to be used by the pipeline.
| T | Stage class type | 
| Args | Types of the parameters to the stage constructor | 
| args | Parameters to the stage constructor | 
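A member template with this shape can be sketched as follows: the stage type is given explicitly, the constructor arguments are deduced, and the new object is stored under its own stage code. The names below (`Toy_stage_base`, `Toy_compression_stage`, `Toy_split_stage`, `Toy_stage_registry`) are hypothetical, and keeping the first registration when a code is registered twice is an assumption of this sketch rather than documented behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <utility>

// Hypothetical stage hierarchy standing in for Gcs_message_stage.
struct Toy_stage_base {
  virtual ~Toy_stage_base() = default;
  virtual int get_stage_code() const = 0;
};

struct Toy_compression_stage : Toy_stage_base {
  explicit Toy_compression_stage(std::size_t threshold)
      : m_threshold(threshold) {}
  int get_stage_code() const override { return 1; }
  std::size_t m_threshold;
};

struct Toy_split_stage : Toy_stage_base {
  Toy_split_stage(bool enabled, std::size_t max_size)
      : m_enabled(enabled), m_max_size(max_size) {}
  int get_stage_code() const override { return 2; }
  bool m_enabled;
  std::size_t m_max_size;
};

class Toy_stage_registry {
 public:
  // T is the stage class, Args are the types of its constructor parameters.
  template <class T, class... Args>
  void register_stage(Args... args) {
    std::unique_ptr<T> stage(new T(args...));
    const int code = stage->get_stage_code();
    // Assumption of this sketch: the first registration for a code wins.
    if (m_handlers.count(code) == 0) {
      m_handlers.emplace(code, std::move(stage));
    }
  }

  bool contains_stage(int code) const { return m_handlers.count(code) != 0; }

  Toy_stage_base &get_stage(int code) const {
    auto it = m_handlers.find(code);
    assert(it != m_handlers.end());  // callers check contains_stage() first
    return *it->second;
  }

 private:
  std::map<int, std::unique_ptr<Toy_stage_base>> m_handlers;
};
```

With this registry, `registry.register_stage<Toy_split_stage>(true, std::size_t{4096})` constructs the stage in place, so the caller never handles the stage object directly and looks it up by code afterwards.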
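| const Gcs_stages_list * Gcs_message_pipeline::retrieve_pipeline | ( | Gcs_protocol_version | pipeline_version | ) | const | 
| private | 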
Retrieve the stages associated with a pipeline version.
| pipeline_version | Pipeline version | 
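| Gcs_message_stage * Gcs_message_pipeline::retrieve_stage | ( | const Gcs_packet & | p | ) | const | 
| private | 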
This member function SHALL retrieve the current stage type code of a packet.
| p | the packet to process. | 
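| Gcs_message_stage * Gcs_message_pipeline::retrieve_stage | ( | Stage_code | stage_code | ) | const | 
| private | 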
This member function SHALL retrieve the associated stage if there is any, otherwise a null pointer is returned.
| stage_code | unique stage code | 
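| std::pair< Gcs_pipeline_incoming_result, Gcs_packet > Gcs_message_pipeline::revert_stage | ( | Gcs_packet && | packet, | 
| Stage_code const & | stage_code | ||
| ) | const | 
| private | 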
Revert the given stage to the given incoming packet.
| packet | The packet to transform | 
| stage_code | The stage to revert | 
| {ERROR,_} | If there was an error in the stage | 
| {OK_NO_PACKET,NP} | If the stage was successfully reverted, but produced no packet | 
| {OK_PACKET,P} | If the stage was successfully reverted, and produced the packet P | 
| bool Gcs_message_pipeline::set_version | ( | Gcs_protocol_version | pipeline_version | ) | 
Set the pipeline version in use.
| pipeline_version | Pipeline version. | 
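Since the version in use is held in a `std::atomic` member, a setter of this kind can be sketched as a membership check followed by an atomic store. The class below is hypothetical, versions are plain unsigned integers, and both the `true`-on-error result and the refusal of unregistered versions are assumptions, because the page does not describe the return value of `set_version`.

```cpp
#include <atomic>
#include <cassert>
#include <set>
#include <utility>

// Hypothetical holder for the pipeline version in use.
class Toy_version_holder {
 public:
  explicit Toy_version_holder(std::set<unsigned> registered)
      : m_registered(std::move(registered)), m_version(0) {}

  // Assumption: returns true on error, i.e. when no pipeline is registered
  // for `version`, and leaves the version in use unchanged in that case.
  bool set_version(unsigned version) {
    if (m_registered.count(version) == 0) return true;
    m_version.store(version);
    return false;
  }

  // Safe to call from other threads while set_version() runs.
  unsigned get_version() const { return m_version.load(); }

 private:
  const std::set<unsigned> m_registered;
  std::atomic<unsigned> m_version;
};

inline void toy_version_example() {
  std::set<unsigned> registered{1, 2};
  Toy_version_holder holder(registered);
  bool error = holder.set_version(2);
  assert(!error);
  error = holder.set_version(7);  // not registered: rejected
  assert(error);
  assert(holder.get_version() == 2);
}
```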
| void Gcs_message_pipeline::update_members_information | ( | const Gcs_member_identifier & | me, | 
| const Gcs_xcom_nodes & | xcom_nodes | ||
| ) | const | 
Update the list of members in the group as this may be required by some stages in the communication pipeline.
By default though, the call is simply ignored.
| me | The local member identifier. | 
| xcom_nodes | List of members in the group. | 
| Gcs_map_type_handler Gcs_message_pipeline::m_handlers | 
| private | 
The registered stages.
These are all stages that are known by this version of MySQL GCS. This needs to contain an instance of all possible stages, since it needs to handle cross-version communication.
| std::atomic< Gcs_protocol_version > Gcs_message_pipeline::m_pipeline_version | 
| private | 
The pipeline version in use.
| Gcs_map_version_stages Gcs_message_pipeline::m_pipelines | 
| private | 
These are the pre-assembled outgoing pipelines for the different versions that are currently supported, meaning that the stages are traversed in the given order.