WL#11763: Refactor the gcs pipeline to support multiple versions and stages

Affects: Server-8.0   —   Status: Complete

1. Summary
----------
In order to provide backwards compatibility, nodes should be able to negotiate a 
common protocol version and use only a message sub-set that all of them can 
handle. Unfortunately, the current GCS implementation has no means to figure out 
the nodes' capabilities and a common protocol version among nodes. Besides, it is 
not prepared to accommodate multiple message formats and multiple message 
pipelines which are a natural consequence of different protocol versions.

This WL aims at extending GCS to figure out a common protocol version among nodes 
and refactoring it to support a different message set and a message pipeline stage 
per protocol version and make the solution easily extensible. Note, however, this 
will be done without breaking backwards compatibility.

2. Deliverable
--------------
The possibility of choosing the appropriate message pipeline stage along with a 
message sub-set after figuring out a common protocol version among nodes and 
easily extending the solution.

Note, however, that the current protocol version will not be changed in this 
worklog context.

3. User stories
---------------
As a Developer, I want to easily change a message format or add new stages to the 
message pipeline without breaking backwards compatibility.

4. Solution
-----------
The correctness of the design stands on the fact that:

 . A node always knows how to process protocol versions in the domain [initial 
   version, max-version-known(node)] by keeping a complete versioned pipeline for
   the entire domain

 . Every time the pipeline or a message changes, we bump the protocol version and
   create a new pipeline version accordingly with new codes for all the stages

 . Nodes will discard messages that either (a) contain an unknown cargo type, or
   (b) contain an unknown type code

1. Requirements
---------------
1.1 GCS should configure its message pipeline stages and message sub-set based on
    a common protocol version among nodes.

1. Problem
----------
The current GCS implementation has no means to retrieve information on nodes' 
capabilities in order to figure out a common protocol version that could be used 
among all of them. In other words, GCS cannot decide on the message sub-set nor 
the message pipeline stage that should be used without risking stability.

Besides, the current GCS implementation is not prepared to accommodate multiple 
message formats and multiple message pipelines which are a natural consequence of 
different protocol versions.

2. Background
-------------
Any outgoing or incoming message has to go through a pipeline when it is sent to 
or received from a group respectively.

The message pipeline has stages registered and these are assembled in an outgoing 
pipeline. Then outgoing messages always have to traverse this pipeline. For 
incoming messages, the pipeline is built on the fly, according to information 
contained in the type code in their stage headers.

The current message has the following fields that are relevant to our discussion:

    ---------------------------------------------------------------
    | Fixed Header | Stage Header N | Extra Stage N |   Payload   |
    ---------------------------------------------------------------

In particular, the fixed header has the following fields that are relevant to our
discussion:

    --------------------------------------
    | Version | ... ... ... | Cargo Type |
    --------------------------------------

Each stage header has the following fields that are relevant to our discussion:

    ---------------------------------------------
    | ... .... .... | Type Code | ... .... .... |
    ---------------------------------------------

3. Solution
-----------
The idea is to use the version in the message header to compute a common protocol 
version or the greatest common protocol version among nodes during the state 
exchange phase.

On receiving a global view notification, GCS starts a state exchange phase which 
basically gathers information from all members in the group and delivers it to the 
upper layers along with information on the set of nodes that have joined and left 
the group. During this phase, the current implementation guarantees that data 
messages are buffered and are not delivered to upper layers until all members send 
and process their state exchange messages. Besides, joining nodes cannot send 
messages during this phase meaning that any attempt to send a message triggers an 
error. 

In this WL, the protocol version number in use per member will be passed to the 
state exchange procedure which will gather the information and will compute the 
greatest common protocol version that could be used among them. Finally, it will 
contact the message pipeline object and will check whether the new computed 
protocol version can be deployed or not.

If the computed protocol version can be deployed, the system will proceed as 
usual. Note this will be the case for this WL as it will not change the current 
protocol version. In the future, when the joining nodes may have protocol versions 
that are not compatible with the protocol version in use in the group, any 
offending node will have to be removed from the system. While they remain as part 
of the system, they will not be allowed to send or receive messages.

In summary, these are the tasks:

  . (Extending State Exchange and Communication) Extending the State Exchange
    protocol to decide on a common protocol version among nodes

  . (Making things easy) Making it easy to create multiple message formats
    and pipeline stages

  . (Downgrade/Upgrade) Smoothly handling protocol version notifications


4. Extending State Exchange and Communication
---------------------------------------------
All members will start using the maximum protocol version available and thus the 
state exchange messages sent by the node will carry this information.

During the state exchange phase, the information on the protocol version in use 
per member will be propagated to the state exchange module which will compute the 
greatest common protocol version that could be used among them. 
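
The computation described above can be sketched as follows. This is a minimal
illustration, not the GCS implementation: the alias protocol_version and the
function greatest_common_version are hypothetical names. It assumes, as stated
in the solution summary, that a node can process every version from the initial
one up to the maximum it announces, so the greatest common version is the
minimum of the announced maxima.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical alias: the worklog only says the version travels in the
// fixed header of each message.
using protocol_version = unsigned int;

// Returns the greatest protocol version that every member can process.
// Assumption: each member supports all versions in
// [initial version, announced maximum], so the minimum of the announced
// maxima is supported by everyone.
protocol_version greatest_common_version(
    const std::vector<protocol_version> &announced_max_versions) {
  // A view always contains at least the local member.
  assert(!announced_max_versions.empty());
  return *std::min_element(announced_max_versions.begin(),
                           announced_max_versions.end());
}
```

For instance, members announcing versions 3, 1 and 2 would settle on version 1.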

5. Making things easy
---------------------
The bulk of the work consists of accommodating a message sub-set and a pipeline 
stage per protocol version and making things easier for future development.

What follows presents the rules for adding a new stage to the pipeline, changing 
the message format associated with a stage and changing or creating a cargo type, 
among other things.

In all cases, the type code of existing stages always needs to be changed and the 
type code associated to a pipeline stage must be unique. The pipeline 
configuration routine will validate these requirements and will throw an error if 
they are not followed.
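
The validation mentioned above could look roughly like the sketch below. The
names type_code, version_number and is_valid_configuration are hypothetical,
and the three checks are one reading of the rules in this section (unique
handler codes, stages that refer to registered handlers, no type code shared
between versions), not the checks the real configure routine performs.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <vector>

// Hypothetical stand-ins for the stage type code and the protocol version.
using type_code = unsigned int;
using version_number = unsigned int;

// Returns true when the configuration follows the rules:
//  - no handler type code is registered twice;
//  - every stage listed under a version refers to a registered handler;
//  - no type code is used more than once across all versions.
bool is_valid_configuration(
    const std::vector<type_code> &handlers,
    const std::map<version_number, std::vector<type_code>> &stages) {
  std::set<type_code> registered;
  for (type_code code : handlers) {
    if (!registered.insert(code).second) return false;  // duplicate handler
  }

  std::set<type_code> used;
  for (const auto &entry : stages) {
    for (type_code code : entry.second) {
      if (registered.count(code) == 0) return false;  // unknown handler
      if (!used.insert(code).second) return false;    // type code reused
    }
  }
  return true;
}
```

Under these assumptions, reusing the type code of version 1 in the stage list
of version 2 would be rejected.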

If a node receives a message from a pipeline stage that is not currently 
registered, the message will simply be discarded. Besides, the implementation will 
guarantee that at least one stage in any pipeline is always executed, so every 
message carries a type code that receivers can check. Otherwise, a node running an 
older protocol version might accept such messages and then crash, hang or cause 
inconsistencies.
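
The discard rule can be illustrated with the sketch below. It is only an
assumption about how a receiver might behave: process_incoming, revert_fn and
payload are hypothetical names, and the stage headers are reduced to a plain
list of type codes.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <vector>

// Hypothetical simplifications of the real packet and stage types.
using type_code = unsigned int;
using payload = std::vector<unsigned char>;
using revert_fn = std::function<payload(const payload &)>;

// Reverts the stages named by stage_codes, in the order given.
// Returns false, leaving data untouched, when any type code has no
// registered handler; the caller is then expected to discard the message.
bool process_incoming(const std::map<type_code, revert_fn> &handlers,
                      const std::vector<type_code> &stage_codes,
                      payload &data) {
  // First pass: refuse the whole message if a single stage is unknown.
  for (type_code code : stage_codes) {
    if (handlers.find(code) == handlers.end()) return false;
  }
  // Second pass: every stage is known, so revert them one by one.
  for (type_code code : stage_codes) {
    data = handlers.at(code)(data);
  }
  return true;
}
```

Checking all type codes before reverting anything keeps a rejected message
unmodified, which makes the discard decision free of side effects.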

--- Adding a new stage ---

If a developer needs to add a new stage to the pipeline, the header protocol 
version number has to be incremented and the pipeline stage updated as follows:

   Gcs_message_old_stage *old_stage = new Gcs_message_old_stage();
   Gcs_message_modified_old_stage *modified_old_stage =
     new Gcs_message_modified_old_stage();
   Gcs_message_new_stage *new_stage = new Gcs_message_new_stage();

   pipeline.configure(
   {
     old_stage, modified_old_stage, new_stage,
   },
   {
     {
       1, {old_stage->type_code()}
     },
     {
       X, {modified_old_stage->type_code(), new_stage->type_code()}
     }
   });

   where X is the header protocol version after the update.

The difference between the Gcs_message_old_stage and the 
Gcs_message_modified_old_stage is only the type code.

--- Changing stage format ---

If a developer needs to change any stage format, i.e. replace an existing stage of 
the pipeline, the header protocol version number has to be incremented and the 
pipeline stage updated as follows:

   Gcs_message_old_stage *old_stage = new Gcs_message_old_stage();
   Gcs_message_new_stage *new_stage = new Gcs_message_new_stage();

   pipeline.configure(
   {
     old_stage, new_stage
   },
   {
     {
       1, {old_stage->type_code()}
     },
     {
       X, {new_stage->type_code()}
     }
   });

   where X is the header protocol version after the update.

Note that a new pipeline stage with a unique type code has to be created. Besides, 
every message will carry the current protocol version in use and this information 
is available as part of the fixed header and can be read by any stage in order to 
decide how the message content shall be interpreted. In other words, the idea here 
is to foster code reusability.
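
As an illustration of reading the version to interpret the content, the sketch
below decodes a field whose width depends on the protocol version. The layout
is purely hypothetical (a 4-byte id in version 1 and an 8-byte id afterwards),
as are the names id_field_size and decode_id; the worklog does not define such
a field.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical layout: version 1 stores a 4-byte id, later versions store
// an 8-byte id.
std::size_t id_field_size(unsigned int version) {
  return version >= 2 ? 8 : 4;
}

// Decodes the little-endian id found at the start of buffer, using the field
// width that matches the protocol version carried in the fixed header.
std::uint64_t decode_id(unsigned int version, const unsigned char *buffer) {
  assert(buffer != nullptr);
  std::uint64_t id = 0;
  const std::size_t size = id_field_size(version);
  for (std::size_t i = 0; i < size; ++i) {
    id |= static_cast<std::uint64_t>(buffer[i]) << (8 * i);
  }
  return id;
}
```

The same decoding routine serves both versions, which is the kind of reuse the
paragraph above refers to.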

--- Changing Cargo ---

If a developer needs to change a cargo format or create a new one, a new cargo 
type must always be created, as the current cargo types are not prepared to be 
extended. The header protocol version number has to be incremented and the 
pipeline stage updated as follows:

   Gcs_message_old_stage *old_stage = new Gcs_message_old_stage();
   Gcs_message_modified_old_stage *modified_old_stage =
     new Gcs_message_modified_old_stage();

   pipeline.configure(
   {
     old_stage, modified_old_stage
   },
   {
     {
       1, {old_stage->type_code()}
     },
     {
       X, {modified_old_stage->type_code()}
     }
   });

   where X is the header protocol version after the update.

Although the cargo type has no direct relation to the message pipeline stages, 
increasing the protocol version number will allow nodes to decide if versions are 
compatible. The difference between the Gcs_message_old_stage and the 
Gcs_message_modified_old_stage is only the type code.

--- Changing Headers ---

It is not possible to change either the fixed header or the stage header without 
breaking backwards compatibility.

--- Deprecating ---

As a rule, old pipeline stages, cargo types and the associated versions should not 
be removed, to avoid breaking backwards compatibility. Removal should only happen 
after going through a formal and thorough deprecation procedure.

6. Downgrade/Upgrade
--------------------
There are no downgrade/upgrade implications as this WL does not create any new 
protocol version.

7. Security
-----------
There are no security implications as this WL does not introduce new messages or 
change any message format.

1. Defining classes and interfaces
-----------------------------------

--- Pipeline ---

Developers willing to extend the new pipeline should follow the instructions 
described in the previous sections and should use the configure method in the 
pipeline class, i.e. Gcs_message_pipeline.

The configure method has the following signature:

  using pipeline_version_number = unsigned int;
  using Gcs_outgoing_stages = std::vector<Gcs_message_stage::enum_type_code>;
  using Gcs_pair_version_stages =
    std::pair<const pipeline_version_number, Gcs_outgoing_stages>;
  /*
   Configure message pipeline.

   @param handlers Initialization list that contains the set of message handler 
                   objects.
   @param stages Initialization list that contains a mapping between a
                 version and the associated pipeline stages.
   */
  bool configure(std::initializer_list<Gcs_message_stage *> handlers,
                 std::initializer_list<Gcs_pair_version_stages> stages);

--- Stages ---

Developers willing to create a new stage have to inherit from the 
Gcs_message_stage class and implement six virtual methods:

  . skip_apply() decides whether outgoing packets should be transformed by the 
    stage, skipped or rejected with an error.

  . skip_revert() decides whether incoming packets should be transformed by the 
    stage, skipped or rejected with an error.

  . calculate_payload_length() calculates the length of the transformed payload.

  . transform_payload_apply() applies a transformation to the current payload 
    and/or appends any extra information to the packet.

    Note that pointers to correct places in buffers are provided and the developer 
    only needs to process any additional information.

  . transform_payload_revert() reverts any transformation to the current payload 
    and/or extracts any extra information from the packet.

    Note that pointers to correct places in buffers are provided and the developer
    only needs to process any additional information.

  . type_code() returns the unique code of a stage.

For example, implementing a stage that adds a unique id to a message could be as
simple as follows:

class Gcs_message_stage_id : public Gcs_message_stage {
 public:
  explicit Gcs_message_stage_id() {}

  virtual ~Gcs_message_stage_id() {}

  // Public, as in the base class, so that callers can pass type_code() to
  // pipeline.configure().
  virtual enum_type_code type_code() const {
    return static_cast<enum_type_code>(10);
  }

 private:
  // MESSAGE_ID_SIZE and get_next_id() are assumed to be defined elsewhere.
  // The id is handled as a 64-bit value, so MESSAGE_ID_SIZE is assumed to
  // be 8 bytes.

  // Apply the stage to any non-empty payload; abort on an empty one.
  virtual Gcs_message_stage::stage_status skip_apply(const Gcs_packet &packet) 
  const {
    bool result = (packet.get_payload_length() != 0);
    return result ? Gcs_message_stage::apply : Gcs_message_stage::abort;
  }

  // Revert the stage on any non-empty payload; abort on an empty one.
  virtual Gcs_message_stage::stage_status skip_revert(const Gcs_packet &packet) 
  const {
    bool result = (packet.get_payload_length() != 0);
    return result ? Gcs_message_stage::apply : Gcs_message_stage::abort;
  }

  virtual unsigned long long calculate_payload_length(Gcs_packet &packet) const {
    return packet.get_payload_length() + MESSAGE_ID_SIZE;
  }

  virtual std::pair<bool, unsigned long long> transform_payload_apply(
    unsigned int version,
    unsigned char *new_payload_ptr,
    unsigned long long new_payload_length,
    unsigned char *old_payload_ptr,
    unsigned long long old_payload_length) {

    assert(version == Gcs_internal_message_header::GCS_PROTO_VERSION);
    assert(new_payload_length == (old_payload_length + MESSAGE_ID_SIZE));

    // Store the id in little-endian order in front of the old payload.
    uint64_t id = htole64(get_next_id());
    memcpy(new_payload_ptr, &id, MESSAGE_ID_SIZE);

    memcpy(new_payload_ptr + MESSAGE_ID_SIZE, old_payload_ptr,  
           old_payload_length);

    return std::make_pair(false, new_payload_length);
  }

  virtual std::pair<bool, unsigned long long> transform_payload_revert(
    unsigned int version,
    unsigned char *new_payload_ptr,
    unsigned long long new_payload_length,
    unsigned char *old_payload_ptr,
    unsigned long long old_payload_length) {

    assert(version == Gcs_internal_message_header::GCS_PROTO_VERSION);
    assert(new_payload_length == (old_payload_length - MESSAGE_ID_SIZE));

    // Extract the id and convert it back to host byte order.
    uint64_t id = 0;
    memcpy(&id, old_payload_ptr, MESSAGE_ID_SIZE);
    id = le64toh(id);

    // Strip the id and hand the original payload back.
    memcpy(new_payload_ptr, old_payload_ptr + MESSAGE_ID_SIZE, 
           new_payload_length);
    return std::make_pair(false, new_payload_length);
  }
};

Note, however, that the current semantics assume that each new stage added to the 
pipeline will allocate a new buffer and copy the payload, which may be transformed 
or not, to it.

This copy assumption makes it easier to create a simple infrastructure to add new 
stages. Currently, this does not represent a performance bottleneck, but developers 
willing to avoid this extra copy can simply override the apply and revert methods 
after inheriting from the following class:

class Gcs_message_stage {
 private:
  /**
   Checks if the apply operation which affects outgoing packets should be skipped
   or not.

   For example, if a packet's length is less than a pre-defined threshold the
   packet is not compressed.

   @param packet The packet to which the transformation should be applied.
   @return whether the apply operation should be applied, skipped or aborted with
           an error
   */
  virtual stage_status skip_apply(const Gcs_packet &packet) const = 0;

  /**
   Checks if the revert operation which affects incoming packets should be skipped
   or not. This usually represents a malformed packet.

   For example, if the packet length is greater than the maximum allowed
   compressed information an error is returned.

   @param packet the packet upon which the transformation should be applied
   @return whether the revert operation should be applied, skipped or aborted with
           an error
  */
  virtual stage_status skip_revert(const Gcs_packet &packet) const = 0;

  /**
   Calculates or estimates the new payload length that will be produced after
   applying some transformation to the original payload.

   This is used to allocate enough memory to accommodate the transformed payload.
   For example, it is used to allocate a buffer where the compressed payload will
   be stored.

   @param packet The packet upon which the transformation should be applied
   @return the length of the new packet
   */
  virtual unsigned long long calculate_payload_length(Gcs_packet &packet) const
  = 0;

  /**
   Applies some transformation to the old payload and stores the result into a
   new buffer.

   For example, it compresses the old payload and stores the compressed result
   into the new buffer.

   @param protocol_version Protocol version of outgoing packet
   @param new_payload_ptr Pointer to the new payload buffer
   @param new_payload_length Calculated or estimated length of the new payload
   @param old_payload_ptr Pointer to the old payload buffer
   @param old_payload_length Length of the old payload
   @return a pair where the first element indicates whether the transformation
   should be aborted and the second the length of the resulting payload
   */
  virtual std::pair<bool, unsigned long long> transform_payload_apply(
    unsigned int protocol_version,
    unsigned char *new_payload_ptr,
    unsigned long long new_payload_length,
    unsigned char *old_payload_ptr,
    unsigned long long old_payload_length) = 0;

  /**
   Reverts the transformation applied to the old payload and stores the result
   into a new buffer.

   For example, it decompresses the old payload and stores the uncompressed result
   into the new buffer.

   @param protocol_version Protocol version of incoming packet
   @param new_payload_ptr Pointer to the new payload buffer
   @param new_payload_length Calculated or estimated length of the new payload
   @param old_payload_ptr Pointer to the old payload buffer
   @param old_payload_length Length of the old payload
   @return a pair where the first element indicates whether the transformation
   should be aborted and the second the length of the transformed payload
   */
  virtual std::pair<bool, unsigned long long> transform_payload_revert(
    unsigned int protocol_version,
    unsigned char* new_payload_ptr,
    unsigned long long new_payload_length,
    unsigned char *old_payload_ptr,
    unsigned long long old_payload_length) = 0;

 public:
  /**
   Returns the unique type code of this stage.
   @return the type code of this stage.
   */
  virtual enum_type_code type_code() const = 0;

  /**
   Applies this stage transformation to the outgoing message.

   @param p the packet to which the transformation should be applied.
   @return false on success, true otherwise.
   */
   virtual bool apply(Gcs_packet &p);

  /**
   Reverts the stage transformation on the incoming message.

   @param p the packet to which the transformation should be applied.
   @return false on success, true otherwise.
   */
   virtual bool revert(Gcs_packet &p);

 private:
  /**
   Replaces the current buffer with the new and updated buffer.

   @param packet The packet that contains the buffer with the old information.
   @param new_buffer The buffer with the new information.
   @param new_capacity The new buffer capacity.
   @param new_packet_length The new packet length.
   @param dyn_header_length The dynamic header length that will be added or
                            removed from the total amount.
   */
  void swap_buffer(Gcs_packet &packet,
                   unsigned char *new_buffer,
                   unsigned long long new_capacity,
                   unsigned long long new_packet_length,
                   int dyn_header_length);

  /**
   Encodes the fixed part of the associated dynamic header information into
   the header buffer.

   @param header Pointer to the header buffer.
   @param header_length Length of the header information.
   @param old_payload_length Length of previous stage payload.
   */
  void encode(unsigned char *header, unsigned short header_length,
              unsigned long long old_payload_length);

  /**
   Decodes the fixed part of the associated header information from the header
   buffer.

   @param header Pointer to the header buffer
   @param[out] header_length Pointer to the length of the header information
   @param[out] old_payload_length Pointer to the length of previous stage payload
   */
   void decode(const unsigned char *header, unsigned short *header_length,
               unsigned long long *old_payload_length);

  /**
   Calculates the fixed length of the dynamic header information generated by a
   stage.

   @return the length of the dynamic header
   */
  unsigned short calculate_dyn_header_length() const;
};

2. General improvements
-----------------------
We will implement the following improvements as well:

1. Add const to those methods that do not change internal members.

2. Fix the formula that calculates the buffer capacity as a multiple of a given 
   size, as the old formula was wrong, and move it into a function, as it was 
   repeated in several places:

    ((capacity + BLOCK_SIZE - 1) / BLOCK_SIZE) * BLOCK_SIZE

3. Declare and assign values to static const member variables in the class 
   declaration in order to make it possible to use compile time assertions:

     static_assert(
       WIRE_CARGO_TYPE_SIZE * std::numeric_limits::digits >=
       std::numeric_limits::digits,
       "Possible number of cargo types exceeds the storage capacity"
     );

4. Improve comments, variable names and code whenever appropriate such as 
   removing raw loops.
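
The capacity formula from item 2 could be wrapped in a function along these
lines. The name round_up_capacity and the use of std::uint64_t are assumptions
made for this sketch; the block size is passed as a parameter here instead of
using the BLOCK_SIZE constant. The formula relies on integer division and on
block_size being greater than zero.

```cpp
#include <cassert>
#include <cstdint>

// Rounds capacity up to the nearest multiple of block_size.
// block_size must be greater than zero.
constexpr std::uint64_t round_up_capacity(std::uint64_t capacity,
                                          std::uint64_t block_size) {
  return ((capacity + block_size - 1) / block_size) * block_size;
}

// Compile-time checks: exact multiples are kept, anything else rounds up.
static_assert(round_up_capacity(1024, 1024) == 1024, "multiple is kept");
static_assert(round_up_capacity(1025, 1024) == 2048, "rounds up");
```

Keeping the function constexpr lets it take part in compile-time assertions
such as the one shown in item 3.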

The following files will be directly affected by such improvements:

plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_internal_message.h
plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_internal_message.cc
plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_message_stage_lz4.h
plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_message_stage_lz4.cc
plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_message_stages.h
plugin/group_replication/libmysqlgcs/src/bindings/xcom/gcs_message_stages.cc