WL#7332: Group Replication: cluster consistent view-id and node attributes propagation

Affects: Server-5.7   —   Status: Complete

During GCS operation there are pieces of information that are required to be
consistent across the cluster, but the GCS cannot provide that by itself.
These include, for example, configuration parameters of a newly joining Node
or attributes of the rest of the cluster that the Joiner cannot be aware of
at connect time.

This WL specifies a simple State Exchange protocol that is run inside the GCS
to obtain a cluster-consistent view of the necessary attributes. It effectively
implements a Primary Component service and ensures the service cannot be abused.

The protocol guarantees that at the end of the State Exchange all members
of a quorate group (cluster) possess the same knowledge. At that point
the View-change GCS event (see WL#6829) is delivered to the plugin to
designate consensus on a number of cluster and node parameters.

Besides, this WL designs a Message class suite for its own consumption as
well as that of other modules (e.g. WL#6837).

FR-1: any attribute that describes the cluster or a node must be P_S-SELECT'able
from any node and yield consistent results.

FR-2: the set of shared attributes must be sufficient to conduct automatic
distributed recovery.

Terminology remark: we use the terms node and member interchangeably.

When the chosen GCS does not provide the notion of a View and a View-id, we
have to define those and implement their updating and propagation to nodes
joining the cluster.

A `View' is defined as a view on the cluster membership and is represented by
its identifier and the membership vector.
The identifier must support comparison, so that two views are comparable in
terms of earlier-newer.
The view attributes of the cluster are updated on every node through
View-change event delivery and its handling.
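
For illustration, a minimal sketch of a totally ordered view identifier
(the type and field names here are hypothetical, not part of this WL's API):

#include <stdint.h>

/* A monotonically growing counter provides the required total order. */
struct View_id
{
  uint64_t seqno;   // incremented on every quorate view installation

  bool operator<(const View_id &other) const
  { return seqno < other.seqno; }
  bool operator==(const View_id &other) const
  { return seqno == other.seqno; }
};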

While the membership vector is delivered along with the event by the core
GCS via its built-in membership service, the other attributes may not be,
so in order to be synchronized across the cluster they have to be propagated
using cluster communication. Notice that the core GCS' own "internal"
view-id, such as Corosync's ring-id, cannot serve as the View-id because
ring-id values are ordered only partially. With only a partial order,
cluster membership (view) construction would be vulnerable to network
partitioning, whatever causes it - a real network event or the core GCS'
(false) perception of a failure.

In addition to the view-id, the list of entities to synchronize includes
information related to distributed recovery: node_address, node_port and
node_state.

The cluster members report their Group member attribute values to any
joining node when it tries to join the cluster.
Notice that while nodes persist in the cluster they don't have to
exchange that info a second time. Even a volatile cluster attribute is
supposed to be re-computed locally and consistently, provided the
initial (joining-time) value is correct.


Whenever there's a View change, any Joiner node is identified by the fact
that it presents the last view_id of the cluster that it was a part of.
That is done via the Cluster member state message exchange. A message from
every member of the group being built contains the aforementioned attributes.
Upon gathering the state messages from all cluster members, joining nodes
are identified, the quorate property of the new configuration is computed
and, when it's true, the view_id is incremented. The quorate view is called
the Primary Component. It has the greatest value for the cluster user, as
normally pieces of the cluster may not modify data unless a piece represents
a majority of the former cluster. Non-quorate configurations (memberships)
set their view quorate property to false and reset their running view-id to
zero.
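
A minimal sketch of the quorate (majority) computation implied above
(function and parameter names are illustrative):

#include <stddef.h>

/*
  The new configuration is quorate when it contains a majority of the
  members of the former (last installed) view.
*/
bool compute_quorum(size_t former_view_size, size_t present_members)
{
  return 2 * present_members > former_view_size;
}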

Notice that ultimately, in a crash-recoverable cluster (which is not provided
by this WL), every node must store the cluster and node states on disk
to find them at the local phase of recovery after a crash/stop.

The sketch of the algorithm is as follows
-----------------------------------------

At receiving a quorate View-change, every member broadcasts a
State message with its last view-id and a node status description
containing the following pieces of info:

1. Distributed recovery related:
   node_address, node_port, node_status

2. the old view-id

3. optionally, possible flow-control and debug info for catching
   various runtime asserts, for instance the number of unprocessed
   messages in the GCS queue.

When all member messages get delivered, every node computes a new view-id
and also verifies the values from its neighbors, to stop in the unlikely
case that its version disagrees with any majority.
The distributed-recovery related group values are stored to be accessible
through a set of new Protocol (WL#6829) methods by interested parties (e.g.
WL#6837).
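
A hedged sketch of one plausible new view-id computation, assuming it is
derived as the maximum of the reported view-ids plus one:

#include <stdint.h>
#include <vector>

uint64_t compute_new_view_id(const std::vector<uint64_t> &reported_ids)
{
  uint64_t max_id= 0;
  for (size_t i= 0; i < reported_ids.size(); i++)
    if (reported_ids[i] > max_id)
      max_id= reported_ids[i];
  return max_id + 1;   // the quorate group installs max + 1
}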

When State message delivery is interrupted by another View-change, the nodes
abandon the current round and restart the State exchange anew.
Since there is no requirement for the Primary Component (quorate View) to
withstand crashes, the Node state is not stored on disk for that purpose.

An essential part of the Primary Component subprotocol, to be implemented
here, is a blocking mechanism that ensures the client can use
the broadcast service only when the local node is in the Primary Component.

I_S0: A Message classes suite is defined and provided as a public API to
      the GCS-clustering framework.

I_S1: The local node can't broadcast while it's not in the Primary Component.
      The broadcast service is available only to members of a quorate group.

I_S2: The view-id of non-Primary Component groups is zero.

Table of contents:
==================

0. Message class, layers and byte array layout
1. The State Message exchange algorithm flow in Low-level terms
2. New classes and classes affected
3. Server interfaces exported
4. Primary Component service enforcement

0. Message class, layers and byte array layout
==============================================

There are two use cases of a GCS message:
the Internal one that happens inside the binding, such as the State Message,
and the "regular" Client case.
The Binding and the Client formally represent the next-to-last and
the last OSI message layers.
Despite that fact, the two layers are designed herein to use a single
Message class.
The two are separated by a message type slot in the Message header
(see the Message_header struct) that states which of the Binding or the
Client the message is for. So the message header type is a kind of layer
identifier.
It's the task of the Binding to check the message type and redirect
client-type messages to the plugin message delivery method.
Although in such a case the message is transferred to a higher layer,
the binding does not perform any header manipulation (stripping).
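
A sketch of that dispatch in the binding; plugin_deliver() and
handle_internal() are hypothetical names standing in for the actual
delivery methods:

void on_delivery(Message *msg)
{
  if (msg->get_message_type() == Msg_client)
    plugin_deliver(msg);    // handed up without header stripping
  else
    handle_internal(msg);   // e.g. State message processing
}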

As an array of bytes the message consists of two sections:
the message header and the payload.
The payload holds the serialized (encoded) image of an informational object
and the object's code.
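
Schematically, the byte-array layout reads:

   +----------------+-------------+---------------------------+
   | Message_header | Object_code | encoded object bytes ...  |
   +----------------+-------------+---------------------------+
   |<--- header --->|<-------------- payload ---------------->|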

   +---------------------+               +---------------------+
   |  Message            |               |  Serializable       |
   +---------------------+               +---------------------+
   | Message_header  hdr*+----+          +---------------------+
   | uchar[]     payload-+----+----+     | virtual uchar* encode()
   | uchar* get_payload()|    |    |     | virtual Pld_code get_code()
   +---------------------+    |    |     | Serializable (uchar*) // decode
                              |    |     | static Pld_code read_code(m)
                              |    |     | static uchar*   read_data(m)
              +---------------+    |     +---------o-----------+
              |                    |               |
              |                    |               |
   +----------*----------+         |     +---------+-----------+
   |  Message_header     |         +-----+ Concrete            |
   +---------------------+               +---------------------+
   | Msg_type       type |               | uchar* encode()     |
   | ...                 |               | Concrete(uchar*)    |
   +---------------------+               | Pld_code code       |
                                         +---------------------+

Here `---*' is a composition relationship, `---o' a class association,
and tip-less `---' corresponds to a plain association.

Message is composed (---*) of a Message_header and a buffer holding the
serialized payload. The serialized payload is associated (---) with an
informational instance, e.g. of class Concrete, which is subclassed
(---o) from Serializable as depicted.

Both the binding and the client messages are capable of
carrying a range (Pld_code) of different objects.
They both follow the same rules of payload encoding and decoding.
While the message header is accessible via the Message class methods,
details of the payload, including the payload code of the informational
object, are not.

Encoding/Decoding rules
-----------------------

// An object that a message carries is supposed to be subclassed from

class Serializable
{
   public:

     virtual uchar* encode(MessageBuffer buf);   // buffer provided
     Serializable(const uchar *buf);             // decoding constructor
     static enum Object_code read_code(Message m)
     {
       return *(Object_code*) m.get_payload();
     }
     static const uchar* read_data(Message m)
     {
       return m.get_payload() + sizeof(Object_code);
     }
     virtual enum Object_code get_code();
};

Here,

// One enum to map all classes regardless of their operational scope;
// Pld_code is used as a synonym of Object_code.
enum Object_code
{
  PLD_TRANSACTION_EVENT,
  PLD_STATE_EXCHANGE,
  PLD_RECOVERY_END_BY_JOINER
};
typedef enum Object_code Pld_code;

// Having Serializable as a parent is optional; what is mandatory is the
   association of such a payload with an Object_code value.
   See the use cases below.

// An object being sent as the Message payload is an array of bytes.
   The leading bytes hold the Object_code enum.
   The Message instance provides storage for the encoding routine's
   results; see MessageBuffer in the following declaration:

class Message
{
   private:

   MessageBuffer buffer;
   Message_header hdr;
   uchar *payload_data;  // set at receiving
   void store_header()
   {
     /* fills in the members of the Message_header struct allocated in the
        first bytes of `buffer' at sending time (not at construction!)
     */
   }
   void restore_header()
   {
     /* reads the members from the Message_header struct at receiving time */
   }

   public:

   // Sending-time constructor when the Object can't be supplied as an argument
   Message(Pld_code _pcode, Msg_type _mtype)
   {
     hdr.type= _mtype;
     store_header();
     append(_pcode);
     // the rest of the payload is done by callers in a style of
     // msg->get_buffer()->append(bytes), where bytes is of uchar[]; see use cases
   }

   // Sending-time constructor when the Object is available as an argument.
   // It's a "shortcut" of the typical
   // m = Message(type); m.append(s.get_code()); m.append(s.encode());
   Message(Serializable &s, Msg_type _mtype)
   {
     hdr.type= _mtype;
     store_header();
     append(s.get_code());
     append(s.encode(), encoded_len);
   }

   // Receiving-time constructor
   Message(uchar *raw_bytes) : payload_data(raw_bytes + sizeof(hdr))
   {
     restore_header();
   }

   void append(const uchar *s, size_t len)  // Implementations do not literally
   { buffer.append(s, len); }               // repeat these append lines.
   void append(enum Object_code val) { buffer.append(val); }

   uchar* get_payload()        { return payload_data; }
   Msg_type get_message_type() { return hdr.type; }
   //... other getters from the Message header ...
};

Here,

enum Msg_type
{
  Msg_internal,  // WL#7332; to combine with Object_code::Client_info_code
                 // as of current
  Msg_client     // All "regular" classes
};

// The old enum_message_type { MSG_REGULAR } changes semantics: its former
   intent is now expressed by the new version combined with the new
   Object_code enums.


struct Message_header
{
  uint64 version;    // (the plugin's) version.
                     // Todo: version handshake (upgrade case) at Group joining
  Msg_type type;

  uint64 micro_time; // value in microseconds
  uint64 sender_id;  // a unique id, might be GCS-core specific, such as
                     // corosync's handle
  uint64 view_id;
  uint64 local_cnt;  // sender's total sent-message counter
  // If any more entity is added, the version must step. Version compatibility
  // verification must adopt the lowest level everywhere in the cluster.
};


Use cases
---------

Send A:

  \exists `c', an instance of class Concrete (a subclass of Serializable)
               operating in the Binding

  m = new Message(c, Msg_internal);  // message is ready for sending

Send B:

  `c' operates in the Client, e.g. the Transaction cache; it is not
  subclassed from Serializable but it is (must be) associated with a
  value-of-Object_code.

  // also \exist
  uchar *image_of_c; // pointer to an array of bytes of a serialized object
  size_t len;        // the array's size

  m = new Message(value-of-Object_code, Msg_client); // message is *not* ready
                                                     // yet for sending
  m.append(image_of_c, len);      // now, message is ready for sending


Receive:

  \exists `m'

  if (Serializable::read_code(m) == Code_j)
    c = new Object_class(Serializable::read_data(m))   // object is restored

here Object_class is a conventional name of a class, e.g. Client_info of
this WL.
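
Putting Send A and Receive together, a hedged round-trip sketch in terms of
the pseudo-declarations above (broadcast() and `raw' stand in for the actual
send primitive and the delivered bytes):

  // Sender side: `c' is a Concrete (Serializable) in the Binding
  Message *m= new Message(c, Msg_internal);
  broadcast(m);

  // Receiver side
  Message received(raw);
  if (Serializable::read_code(received) == c.get_code())
    Concrete restored(Serializable::read_data(received));  // object restored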


1. The State Message exchange algorithm flow in Low-level terms
===============================================================

The GCS event receiver thread runs the following pseudo-code,
executing the State Exchange sub-protocol.

When a Token-ring change is delivered:

   "Corosync_binding"::token_ring_change(cpg_handle_t handle...)

   if (there's a deferred View-change which is
       bound to this event)
     awaited_vector.clear();
     update_awaited_vector();
     begin_exchange_states();
     begin_exchange_states();

When a View-change is delivered:

   "Corosync_binding"::view_change(cpg_handle_t handle...)
       ...
     reset view; // prepare for installation
     if (the event is not combined with any Token-ring change)
        update_awaited_vector();
        begin_exchange_states();
     else
        defer the state exchange;

In the above,

  begin_exchange_states(|total-#-members|)
    create State_message;
    broadcast(state_message);

When Message m is delivered:

    if (m is a State message of the current configuration)
      quorate= process_state_messages(m);
    if (m is the last State msg)
      install(view, quorate);

where

  process_state_messages(Msg m)
    add new_member_state= Member_state(m) to view.member_states;
    awaited_vector[Process id of the originator]--;  /* decrement at that index */
    if (|view.member_states| == |total-#-members|)
      \for all ms:s in view.member_states such that ms.view_id < max(view_id)
         ms is a Joiner
      return compute_quorum();
    else
      return false;

Here, view.member_states is a set of Member_state objects delivered from the
new configuration's participants, including the local member.
Its size corresponds to the number of non-zero indexes in awaited_vector.
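
A self-contained sketch of the termination test implied here (Process_id
is illustrative; the real type is GCS-specific):

#include <map>

typedef int Process_id;

/* The exchange terminates when no State message is awaited any more. */
bool all_states_received(const std::map<Process_id, int> &awaited_vector)
{
  for (std::map<Process_id, int>::const_iterator it= awaited_vector.begin();
       it != awaited_vector.end(); ++it)
    if (it->second != 0)
      return false;
  return true;
}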

Here is a proof of a claim that the implementation relies on:

Claim:  when the last State Message is received,
        for all i, awaited_vector[i] == 0.

  It's trivially true when in between the State exchange initiation and
  the final State Message there were no View-changes or Token ring changes.

  When there are View-changes but no Totem changes, each VC increments
  awaited_vector according to the total set. Each arriving state message
  decrements it. Since Totem is stable, no messages can be lost.
  However, messages from members declared to have left in a VC will not
  arrive, even though before that event awaited_vector[member-left id] >= 1.
  So when such a member shows up in the VC's left set, the vector is reset
  at that index:

      awaited_vector[member-left id] := 0

  This remark proves the stable-Totem case.

  Finally, a Totem ring change arriving before the final State Message
  designates that no messages (State messages) broadcast so far will *ever*
  arrive. That is captured by awaited_vector.clear().
  Each member has to repeat or initiate (when it was deferred because the
  Totem change is bound to a View-change) the State message exchange. That
  is captured by updating awaited_vector according to the last View-change
  membership.
  The rest of the proof repeats the first "trivial" paragraph.


2. New classes and classes affected
===================================

Legend:
`+' in the 1st column designates an added attribute,
a method or a class,
`-' when an entity is removed/updated

A.

/*
   Instances of this kind hold
   the local server info as a part of the Member state
   description
*/

+class Member_state : public Serializable
+{
+public:
+  ulonglong view_id;
+  Client_info client_info;
+  set member_uuids;
+  uint install_attempt;
+  Pld_code get_code() { return PLD_STATE_EXCHANGE; }
+  ...
+};

here Client_info

+class Client_info
+{
+  Client_info(const uchar *msg_raw_data); /* decoder */
+  uchar* encode();
+  string hostname;
+  uint port;
+  string uuid;
+  Recovery_status status;
+};

is the local server info; Recovery_status is defined as

+typedef enum en_recovery_status
+{
+  MEMBER_ONLINE,
+  MEMBER_IN_PROGRESS,
+  MEMBER_OFFLINE,
+  MEMBER_SERVING_DONOR
+} Recovery_status;
+

B.

The following class is made to contain protocol-specific objects, such
as Closed Process Group member sets, as well as instances related to the
State exchange. The class is also augmented with a host of new methods,
part of which are former static functions turned into class members.


Class Protocol_corosync
{
+  set state_messages /* received State messages data store */;
+  /*
+    Set of id:s in GCS native format as reported by the View-change handler.
+  */
+  Process_id_set ms_totl, ms_left, ms_join;
+ /*
+   The two following slots serve the State Messages Exchange, in
+   particular to terminate it. The termination condition is the
+   emptiness of the vector.
+   It's implemented as a Process_id-keyed map.
+   Its value part is updated per CPG view-change, perhaps being
+   deferred till the Totem ring change event, and the whole map
+   gets reset at View install().
+ */
+ map awaited_vector;
+ /*
+   The flag is set up when a Totem-ring-change bound view-change is
+   detected. It affects the time when @c awaited_vector is updated.
+ */
+ bool pending_awaited_vector;
+  /*
+    Following a notification on a CPG membership or Totem ring change,
+    the method adjusts the awaited vector of State messages.
+    The rules below aim to tackle a "concurrent" View-change notification
+    that could arrive in the middle of an ongoing exchange:
+
+    \forall member of total set
+       awaited_vector[member]++
+    \forall member of left set
+       awaited_vector[left] = 0
+
+    The first rule reads that any member of the new membership being
+    installed must broadcast a State message and the message will be
+    expected everywhere.
+    The second rule says that when a member is found in the left set,
+    its expectations in awaited_vector[left-member] are gone.
+    It can't broadcast anything, incl. a State message, after it has left.
+    If it had to send anything, it must have done that already and the
+    message would have been delivered.
+  */
+  void update_awaited_vector(bool reset_arg);
+  /*
+    The method is a part of the Primary Component computation, to be
+    run on the Corosync CPG view-change. It clears the marks of the
+    former Primary Component and resets objects relevant to View
+    installation to the values supplied by the view-change event.
+    Protocol_corosync::is_leaving is computed based on the received
+    event's content.
+  */
+  void reset_view_and_compute_leaving();
+  /*
+    The method processes a State message and, when it's found to be the
+    last expected one, finalizes the State exchange and installs the new
+    configuration with the computed quorate.
+  */
+  void do_process_state_message(Message *ptr_msg, Process_id p_id);
+  /*
+    The method installs a new view with the supplied quorate property.
+    State exchange objects are reset as well.
+  */
+  void install_view(bool quorate);
}
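
A self-contained sketch of the two update rules documented above for
update_awaited_vector (the types are illustrative):

#include <map>
#include <set>

typedef int Process_id;

void update_awaited_vector(std::map<Process_id, int> *awaited,
                           const std::set<Process_id> &total,
                           const std::set<Process_id> &left)
{
  /* Rule 1: every member of the new membership owes a State message. */
  for (std::set<Process_id>::const_iterator it= total.begin();
       it != total.end(); ++it)
    (*awaited)[*it]++;

  /* Rule 2: a member found in the left set can't broadcast anything
     any more, so its pending expectations are dropped. */
  for (std::set<Process_id>::const_iterator it= left.begin();
       it != left.end(); ++it)
    (*awaited)[*it]= 0;
}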

C.

// In the View class the local Member identifier is added.
   The extra data in local_rinfo is relevant to distributed recovery,
   and so to the View as such.

Class View
{
+  Client_info& local_rinfo;        /* local member identifier */
}

D.


// In the Member class a new attribute of Client_info holds all GCS-neutral
   descriptors of the member, including its identifier.
   The member is identified as info.uuid.

+class Member
+{
+public:
+  Client_info info;
+  Protocol_member_id* m_id;
+  ...
+};

+/**
+   The Protocol_member_id class holds a protocol-specific identifier that
+   the Member class instance may want to have an association with.
+   Each interested protocol should define a derived class.
+*/
+class Protocol_member_id
+{
+  ...
+ public:
+    virtual uchar* describe(uchar*, size_t)= 0;
+};

/* The Corosync binding implementation of the above abstract class */
+class Corosync_member_id : public Protocol_member_id
+{
+  ...
+  uchar* describe(uchar *buf, size_t len) {...}
+};

class View : public Group_members
{
+  /* Two sets to be reported to the Client at view-change event delivery */
+  Member_set left, join;
+  void install(Member_set &ms, bool verdict) { /* computes the left, join
+                                                  sets as well */ }
}
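
The left and join sets follow from plain set differences between the
previous and the new memberships; a self-contained sketch, with members
represented by their uuid strings for brevity:

#include <algorithm>
#include <iterator>
#include <set>
#include <string>

typedef std::set<std::string> Member_set;

void compute_left_join(const Member_set &prev, const Member_set &next,
                       Member_set *left, Member_set *join)
{
  /* left = prev \ next : members that disappeared from the view */
  std::set_difference(prev.begin(), prev.end(), next.begin(), next.end(),
                      std::inserter(*left, left->begin()));
  /* join = next \ prev : members that showed up in the view */
  std::set_difference(next.begin(), next.end(), prev.begin(), prev.end(),
                      std::inserter(*join, join->begin()));
}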


3. Server interfaces exported
=============================

/*
   The plugin extracts the recovery info from the server at
   gcs_replication_init() time. A new interface function extracts
   the recovery info, the server identity:
*/
+  get_server_host_port_uuid(char **hostname, uint *port, char **uuid)
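
A hypothetical usage sketch at plugin initialization time (the Client_info
constructor call is assumed, not part of this interface):

  char *hostname, *uuid;
  uint port;
  get_server_host_port_uuid(&hostname, &port, &uuid);
  // Client_info local(hostname, port, uuid);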


4. Primary Component service enforcement
========================================

A Node operational state diagram is shown in the following picture:

                                  View-change
                  +----------------------+-----------------------+
                  |                      +-------+               |
                  |              --------V-------+------         |
------------------+----         (   State Exchange      )        |
(   Primary Component  )         --------+--------------         |
-----------------------<-----------------+Last State msg         |
                  quorate true           |                       |
                                         | quorate false         |
              -------------------------  |                       |
             ( Non-quorate member      <-+                       |
              --------------^----------                          |
                            |                                    |
                            |                                    |
                            +------------------------------------+


The local node's GCS protocol allows the broadcast service only when
the node has Primary Component status.

That's implemented by introducing a mutex guarding the broadcast
critical section. The mutex is locked when the first View-change
happens that makes the Primary-Component to State-Exchange transition.
The mutex is held locked until the Primary Component is restored, to
signal resumption to waiting senders.
In the case of a transition to the Non-quorate state, the senders receive
a negative result out of the protocol->broadcast() invocation.
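
A minimal sketch of that guard, assuming a pthread mutex/condition pair;
all names here are illustrative (do_broadcast() stands in for the actual
GCS send):

#include <pthread.h>

static pthread_mutex_t pc_lock= PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  pc_cond= PTHREAD_COND_INITIALIZER;
static bool state_exchange_in_progress= false;
static bool in_primary_component= false;

/* Senders block while the State exchange runs; afterwards they either
   proceed (Primary Component) or get a negative result (non-quorate). */
int guarded_broadcast(Message *msg)
{
  pthread_mutex_lock(&pc_lock);
  while (state_exchange_in_progress)
    pthread_cond_wait(&pc_cond, &pc_lock);
  bool allowed= in_primary_component;
  pthread_mutex_unlock(&pc_lock);

  if (!allowed)
    return -1;               // the negative result senders observe
  return do_broadcast(msg);
}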