WL#4398: Replication Interface for semi-synchronous replication

Affects: Server-5.5   —   Status: Complete   —   Priority: Medium

Replication interface for semi-synchronous replication feature (WL#1720).
This enables semi-synchronous replication to be installed as a plugin
to the MySQL Server.

This interface is also a base to design a more universal replication 
interface that can be used for various replication extensions and 
protocols.

The interface consists of two parts:

A) Four observer interfaces 
   Making it possible for the component 
   to observe actions of the MySQL server

B) Server Service Interface for two services:
   i) NET read/write/flush, and
   ii) Variable getters (three kinds)

===========================
PART A: OBSERVER INTERFACES
===========================

Introduction
============

Semi-synchronous replication blocks the return from commit of a
transaction until the slave has received the binlog events of the
transaction. The master waits only for a limited time: if it does
not get a reply within the given period, it turns off
semi-synchronous replication, and turns it on again later if the
slave catches up.
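
The timeout behavior described above can be sketched as follows. This is only
an illustration, not the semi-sync implementation: the class Ack_waiter and its
member names are hypothetical, a single integer stands in for the binlog file
name and position, and the standard C++ library is used in place of the
server's own threading primitives.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper for illustration; not part of the interface.
class Ack_waiter {
public:
  explicit Ack_waiter(std::chrono::milliseconds timeout)
    : timeout_(timeout), on_(true), acked_pos_(0) {}

  // Called by the committing thread after commit. Returns true only if
  // semi-sync is on and the ack for `pos` arrived within the timeout.
  bool wait_for_ack(unsigned long long pos) {
    std::unique_lock<std::mutex> guard(mutex_);
    if (!on_)
      return false;                 // already degraded: do not block commits
    bool acked = cond_.wait_for(guard, timeout_,
                                [this, pos] { return acked_pos_ >= pos; });
    if (!acked)
      on_ = false;                  // no reply in time: turn semi-sync off
    return acked;
  }

  // Called when the slave acknowledges everything up to `pos`.
  void report_ack(unsigned long long pos) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (pos > acked_pos_)
      acked_pos_ = pos;
    cond_.notify_all();
  }

  // Called once the slave is known to have caught up again.
  void slave_caught_up() {
    std::lock_guard<std::mutex> guard(mutex_);
    on_ = true;
  }

  bool is_on() {
    std::lock_guard<std::mutex> guard(mutex_);
    return on_;
  }

private:
  std::chrono::milliseconds timeout_;
  bool on_;
  unsigned long long acked_pos_;
  std::mutex mutex_;
  std::condition_variable cond_;
};
```

In this sketch a commit that finds semi-sync turned off returns at once, so a
slow or disconnected slave delays at most one timeout period per degradation.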

Semi-synchronous replication requires changes to the control flow
of SQL transactions and replication, which includes changes to the
control flow of the following server units:

  Master
    1. SQL transaction
    2. binlog file
    3. binlog dump thread
    
  Slave
    4. slave IO thread

On the master:
1. The binlog file update process is altered to report the
   binlog file name and position after the binlog events of the
   current transaction have been flushed to disk. The binlog
   file name and position are recorded and then used for sync.
2. The binlog dump thread checks whether the binlog file name and
   position of the event being sent are recorded, and if so marks
   the event packet to request an ack from the slave.
3. The SQL transaction control flow is altered to wait
   for an ack after the transaction has been committed to the
   storage engines and before returning to the user.

On the slave:
4. The slave IO thread checks whether each event requires a
   reply, and sends an ack to the master after writing the event
   to the relay log.

In the server, a set of interfaces is implemented to extend 
the control flow of the SQL transaction and replication server 
units.

Observers
=========

There are four server replication interfaces:
  1. transaction
  2. binlog storage
  3. binlog transmit
  4. binlog relay IO

These four interfaces correspond to the server units mentioned
above. Each interface is a set of callback pointers observing
the control flow of that server unit. These observer interfaces
can be used by replication plugins or other extension
code to observe the control flow and extend how the server works.
They are designed to support multiple observers per
observer interface.


Delegates
=========

The observers for each unit are managed by a Delegate, which is in
charge of adding and removing observers, and of invoking the observers'
callbacks at the appropriate positions in the control flow.

The delegates are entirely in the server domain, hence not part of the plug-in
and therefore not part of the actual interface. They are presented in this
worklog since they are part of the implementation, hence can be the target of
review comments.
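
What a delegate does can be sketched roughly as below. This is a simplified
illustration, not the server code: Simple_trans_delegate is a hypothetical
name, std::vector stands in for the server's list type, locking is omitted,
and the convention that a callback returns true on failure is an assumption
(the worklog does not state the meaning of the return value).

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Simplified stand-ins for the interface types; not the real definitions.
struct Trans_param { unsigned int server_id; };

struct Trans_observer {
  bool (*after_commit)(Trans_param *param);  // assumed: true means failure
};

// Hypothetical, lock-free sketch of a delegate for one observer type.
class Simple_trans_delegate {
public:
  // Returns true on failure (observer already registered).
  bool add_observer(Trans_observer *observer) {
    if (std::find(observers_.begin(), observers_.end(), observer) !=
        observers_.end())
      return true;
    observers_.push_back(observer);
    return false;
  }

  // Returns true on failure (observer was not registered).
  bool remove_observer(Trans_observer *observer) {
    std::vector<Trans_observer *>::iterator it =
      std::find(observers_.begin(), observers_.end(), observer);
    if (it == observers_.end())
      return true;
    observers_.erase(it);
    return false;
  }

  // Invoke after_commit on every observer in registration order and
  // stop at the first callback that reports a failure.
  bool after_commit(unsigned int server_id) {
    Trans_param param = { server_id };
    for (Trans_observer *observer : observers_)
      if (observer->after_commit && observer->after_commit(&param))
        return true;
    return false;
  }

private:
  std::vector<Trans_observer *> observers_;
};

// Example callback: counts how many commits it has observed.
static int g_commits_seen = 0;
static bool count_commit(Trans_param *) { ++g_commits_seen; return false; }
```

The server calls the delegate at the extension point; the delegate, not the
server unit itself, knows which observers are currently registered.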


Replication Plugin
==================

By using the replication interface, we can write replication plugins
to extend the service of replication. Semi-synchronous replication
will be implemented as two plugins: 
1. semisync_master, and
2. semisync_slave. 
A replication plugin can implement all or some of the 
observers available.

These plugins can be developed and built separately from the server,
and are compatible between server versions if the replication
interfaces are compatible between these server versions.

Transaction observer
====================

Transaction extension points are required to observe the status 
of the current transaction. Semi-sync will use it to observe 
the commit of a transaction and wait until the slave has acknowledged
that it has received and buffered the binlog events to the relay log.

``after_commit''
----------------

This is called after commit to storage engines for transactional
tables.

For non-transactional tables, this is called at the end of the
statement, before sending statement status, if the statement
succeeded.

Semi-sync uses this and the following after_rollback callback to wait
for a reply from the slave for each transaction or statement
(AUTOCOMMIT) before returning.

``after_rollback''
------------------

This is called after the transaction has been rolled back in the
storage engines for transactional tables.

For non-transactional tables, this is called at the end of the
statement, before sending statement status, if the statement failed.


Binlog storage observer
=======================

Binlog storage extension points are required to observe the 
update of binlog file, which is used by semi-sync to update 
the binlog filename and position of current transaction.

``after_flush``
---------------

This callback is called after a transactional group (either a real transaction
or a single statement with any necessary context events) has been flushed from
the in-memory cache and has been written to the binary log. If the variable
``sync_binlog_period`` dictates that the flush should sync the binary log, this
callback is called after that sync.

Semi-sync uses this to record the binlog file name and position of each
transaction (including statements committed with AUTOCOMMIT)
that needs to wait for an ack.
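
Recording and comparing binlog coordinates can be sketched as below. The names
Binlog_coord, compare_coord and Commit_tracker are hypothetical, and the sketch
assumes that binlog file names share a base name and a fixed-width numeric
suffix, so that plain string comparison matches creation order.

```cpp
#include <cassert>
#include <string>

// Hypothetical holder for a binlog file name and position.
struct Binlog_coord {
  std::string file;
  unsigned long long pos;
};

// Returns <0, 0 or >0 if `a` is before, equal to or after `b`.
int compare_coord(const Binlog_coord &a, const Binlog_coord &b) {
  int c = a.file.compare(b.file);
  if (c != 0)
    return c;
  if (a.pos < b.pos)
    return -1;
  return a.pos > b.pos ? 1 : 0;
}

// Hypothetical bookkeeping: latest flushed and latest acknowledged position.
class Commit_tracker {
public:
  Commit_tracker() : has_pending_(false), has_acked_(false) {}

  // What an after_flush callback could do with its arguments.
  void record_flush(const char *log_file, unsigned long long log_pos) {
    Binlog_coord c = { log_file, log_pos };
    if (!has_pending_ || compare_coord(c, pending_) > 0) {
      pending_ = c;
      has_pending_ = true;
    }
  }

  // What the master could do when an ack arrives from the slave.
  void record_ack(const char *log_file, unsigned long long log_pos) {
    Binlog_coord c = { log_file, log_pos };
    if (!has_acked_ || compare_coord(c, acked_) > 0) {
      acked_ = c;
      has_acked_ = true;
    }
  }

  // True while the slave has not yet acknowledged the latest flush.
  bool must_wait() const {
    return has_pending_ &&
           (!has_acked_ || compare_coord(acked_, pending_) < 0);
  }

private:
  Binlog_coord pending_;
  Binlog_coord acked_;
  bool has_pending_;
  bool has_acked_;
};
```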

Binlog transmit observer
========================

Binlog transmit extension points are required to observe the binlog 
dumping thread on master.  This is used by semi-sync to enable
semi-synchronous replication on master side. 

The master will enable the semi-sync feature if the
slave has requested semi-sync to be enabled on the master.  
When this feature is enabled, the master will wait for an
acknowledgment from the slave after each transaction.

``transmit_start''
------------------

This callback is called when the binlog dump thread starts.

The master component of semi-sync checks whether the slave requests the
semi-sync feature, does some preparation for semi-sync replication, and
increases the number of semi-sync slaves if the slave requested it.

``transmit_stop''
-----------------

This callback is called when the binlog dump thread ends.

The semi-sync master component decreases the number of semi-sync slaves here.


``reserve_header''
------------------

This callback is called to reserve header bytes in the event packet
buffer that will be sent to the slave.

The default header ('\0') is appended first; each observer of this
type can then append its extra header bytes, in the order in which the
observers were registered.

Semi-sync uses this callback to reserve two extra bytes
(magic num, sync flag).

``before_send_event''
---------------------

This callback is called before sending an event packet to the slave
via net.

Semi-sync uses this to check whether the event being sent is the end of
a transaction that requires an ack from the slave, and sets the sync
flag in the reserved extra header if it is.
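
The interplay of reserve_header and before_send_event on the master can be
sketched as below. This is an illustration under assumptions: std::string
stands in for the server's String class, the function names and simplified
signatures are hypothetical, and the byte values chosen for the magic number
and the sync flag are illustrative only, since this worklog does not fix them.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Illustrative values; the worklog does not specify them.
const unsigned char kMagicNum = 0xef;
const unsigned char kFlagSync = 0x01;

// Sketch of reserve_header: append the two extra header bytes to the
// packet and return the number of bytes reserved.
unsigned long reserve_semi_sync_header(std::string *packet) {
  packet->push_back(static_cast<char>(kMagicNum));
  packet->push_back('\0');              // sync flag, initially cleared
  return 2;
}

// Sketch of before_send_event: `header_offset` is the index of the magic
// byte inside the packet; set or clear the sync flag that follows it.
void mark_sync(std::string *packet, std::size_t header_offset,
               bool need_ack) {
  (*packet)[header_offset + 1] =
    need_ack ? static_cast<char>(kFlagSync) : '\0';
}
```

With the default header in place, the packet layout in this sketch is: one
'\0' byte, the magic number, the sync flag, then the event itself.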

``after_send_event''
--------------------

This callback is called after sending an event packet to the slave
via net.

Semi-sync uses this to wait for an ack if the current event is the end
of a transaction.

``after_reset_master''
----------------------

This callback is called when the RESET MASTER command is issued.

Semi-sync uses this to reset extra master status variables.

Binlog relay IO observer
========================

Binlog relay IO extension points are required to observe the slave
IO thread. This is used by semi-sync to request semi-sync
replication when requesting binlog dumping from the master, and to
send a reply after all events of a transaction have been received.

``thread_start''
----------------

This callback is called when the slave IO thread starts,
before connecting to the master.

The semi-sync slave component sets the slave semi-sync status to 1 here.

``thread_stop''
---------------

This callback is called when the slave IO thread ends,
after the connection has been closed.

The semi-sync slave component sets the slave semi-sync status to 0 here.

``before_request_transmit''
---------------------------

This callback is called after connecting to the master and before
issuing the BINLOG_DUMP command to the master to request dumping of
binlog events.

Semi-sync uses this to set a flag when requesting the binlog dump to
indicate synchronous behavior.

``after_read_event''
--------------------

This callback is called after an event packet has been successfully
read from the master.

Semi-sync uses this to check the sync flag in the extra header, set a
variable indicating whether a reply is needed for the current event,
and remove the header.
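
The slave-side handling of the extra header can be sketched as below. The
function name strip_semi_sync_header is hypothetical, the byte values are
illustrative only, and the sketch assumes that `packet` already points at the
two extra header bytes (that is, past the default '\0' header) and that a
return value of true means failure.

```cpp
#include <cassert>
#include <cstddef>

// Illustrative values; the worklog does not specify them.
const unsigned char kMagicNum = 0xef;
const unsigned char kFlagSync = 0x01;

// Sketch of what after_read_event could do. On success, *event_buf and
// *event_len describe the event without the extra header, and *need_reply
// tells whether the master asked for an ack. Returns true on failure.
bool strip_semi_sync_header(const char *packet, unsigned long len,
                            const char **event_buf,
                            unsigned long *event_len,
                            bool *need_reply) {
  if (len < 2 || static_cast<unsigned char>(packet[0]) != kMagicNum)
    return true;                        // extra header missing or damaged
  *need_reply = (static_cast<unsigned char>(packet[1]) & kFlagSync) != 0;
  *event_buf = packet + 2;              // skip magic number and sync flag
  *event_len = len - 2;
  return false;
}
```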

``after_queue_event''
---------------------

This callback is called after the event has been successfully written to the
relay log. Depending on the setting of the ``sync_relay_period`` the relay log
might be sync:ed or not. In the event that the relay log is sync:ed, this call
is after the sync.

Semi-sync uses this to send the ack to the master if the current event
needs a reply.

``after_reset_slave''
---------------------

This callback is called when the RESET SLAVE command is issued.

Semi-sync uses this to reset extra slave IO status variables.

================================
PART B: SERVER SERVICE INTERFACE
================================

The following Server Service Interface is defined
in the header file "plugin.h".

These functions are used by the semi-sync components.

1. Server Service interface: THD NET read/write/flush
=====================================================

The THD NET functions are used in the server to send and receive packets via
the thread connection to the client.

The semi-sync master component uses these functions to receive the reply from
the slave.

/**
   Read a packet from the thread connection.

   @param packet   return the pointer to the packet read
   @param len      return the length of packet read

   @return 0 on success, 1 on failure.
*/
int thd_net_read(const uchar **packet, size_t *len);

/**
   Write a packet to the thread connection.

   @param packet   packet to write to the connection
   @param len      length of the packet

   @return 0 on success, 1 on failure
*/
int thd_net_write(const uchar *packet, size_t len);

/**
   Flush write buffer of thread connection.

   @return 0 on success, 1 on failure
*/
int thd_net_flush();

2. Server Service interface: MYSQL NET read/write/flush
=======================================================

The MYSQL NET functions are used on the client side to send and receive packets
via the connection to the server.

The semi-sync slave component uses these functions to send the reply to the
master.


/**
   Read a packet from the connection.

   @param mysql    mysql client connection
   @param packet   return the pointer to the packet read
   @param len      return the length of packet read

   @return 0 on success, 1 on failure.
*/
int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len);

/**
   Write a packet to the connection.

   @param mysql    mysql client connection
   @param packet   packet to write to the connection
   @param len      length of the packet
   @return 0 on success, 1 on failure
*/
int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len);

/**
   Flush write buffer of connection.

   @param mysql    mysql client connection

   @return 0 on success, 1 on failure
*/
int mysql_net_flush(MYSQL *mysql);

3. Server Service interface: User Variables
===========================================

These functions are used to get the value of a user variable.

In semi-sync, if the slave component requests the semi-sync replication
feature, it sets the user variable 'rpl_semi_sync_slave' of its connection
thread on the master to 1 (by issuing the command 'SET @rpl_semi_sync_slave=1'
to the master) before issuing the BINLOG_DUMP command. The master component
uses 'get_user_var_int' to get the value of 'rpl_semi_sync_slave' and check
whether the slave requests semi-sync replication before it starts binlog
dumping.
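
The check on the master side could look roughly like the sketch below. The
function get_user_var_int here is only a stand-in with the signature given in
this worklog, backed by mock state so that the example is self-contained; the
typedefs are stand-ins for the server's, and slave_requests_semi_sync is a
hypothetical helper name.

```cpp
#include <cassert>
#include <cstring>

typedef long long longlong;   // stand-ins for the server typedefs
typedef char my_bool;

// Mock state for the stand-in below; not part of the interface.
static bool g_var_defined = true;
static longlong g_var_value = 1;
static my_bool g_var_is_null = 0;

// Stand-in for the server service function described in this section.
// It only knows the variable 'rpl_semi_sync_slave'.
int get_user_var_int(const char *name, longlong *value,
                     my_bool *null_value) {
  if (!g_var_defined || std::strcmp(name, "rpl_semi_sync_slave") != 0)
    return 1;                           // variable not found
  *value = g_var_value;
  *null_value = g_var_is_null;
  return 0;
}

// Hypothetical helper: true if the connected slave asked for semi-sync.
// A missing, NULL or zero variable is treated as "not requested".
bool slave_requests_semi_sync() {
  longlong value = 0;
  my_bool is_null = 0;
  if (get_user_var_int("rpl_semi_sync_slave", &value, &is_null) != 0)
    return false;
  return !is_null && value != 0;
}
```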


/**
   Get the value of user variable as an integer.

   @param name     user variable name
   @param value    pointer to return the value
   @param null_value true if the value of variable is null, false if not

   @return 0 success, 1 variable not found
*/
int get_user_var_int(const char *name, longlong *value, 
                     my_bool *null_value);

/**
   Get the value of user variable as a double precision float number.

   @param name     user variable name
   @param value    pointer to return the value
   @param null_value true if the value of variable is null, false if not

   @return 0 success, 1 variable not found
*/
int get_user_var_real(const char *name, double *value, 
                      my_bool *null_value);

/**
   Get the value of user variable as a string.

   @param name     user variable name
   @param value    pointer to the value buffer
   @param len      length of the value buffer
   @param precision precision of the value if it is a float number
   @param null_value true if the value of variable is null, false if not

   @return 0 success, 1 variable not found
*/
int get_user_var_str(const char *name, char *value, ulong len, 
                     uint precision, my_bool *null_value);

Observers
=========

Note that the low-level design might not be identical to the actual code used
due to implementation-specific issues. The low-level design provides signatures
for functions and storage of data, but omits details that are pure
implementation issues (for example, the management of flag values might be
handled differently, or extra function calls might be added to structure the
code).

Observers are described in detail in High-Level Specifications.

``Transaction observer''
------------------------

/*
  Transaction observer
 */
typedef struct Trans_param {
  uint32 server_id;
} Trans_param;

typedef struct Trans_observer {
  uint32 len;

  bool (*after_commit)(Trans_param *param);
  bool (*after_rollback)(Trans_param *param);
} Trans_observer;

``Binlog storage observer''
---------------------------


/*
  Binlog storage observer flags
*/
enum Binlog_storage_flags {
  // Binary log was sync:ed
  BINLOG_STORAGE_IS_SYNCED = 1
};

/*
  Binlog storage observer
 */

typedef struct Binlog_storage_param {
  uint32 server_id;
} Binlog_storage_param;

typedef struct Binlog_storage_observer {
  uint32 len;
  
  bool (*after_flush)(Binlog_storage_param *param,
			const char *log_file, my_off_t log_pos, uint32 flags);
} Binlog_storage_observer;


``Binlog transmit observer''
----------------------------

/*
  Replication binlog transmitter (binlog dump) observer
 */
typedef struct Binlog_transmit_param {
  uint32 server_id;
  uint32 flags;
} Binlog_transmit_param;

typedef struct Binlog_transmit_observer {
  uint32 len;

  bool (*transmit_start)(Binlog_transmit_param *param,
		const char *log_file, my_off_t log_pos);
  bool (*transmit_stop)(Binlog_transmit_param *param);
  bool (*reserve_header)(Binlog_transmit_param *param, 
                         String *packet, ulong *len);
  bool (*before_send_event)(Binlog_transmit_param *param,
			    String *packet, const
			    char *log_file, my_off_t log_pos );
  bool (*after_send_event)(Binlog_transmit_param *param,
			   const char *event_buf);
  bool (*after_reset_master)(Binlog_transmit_param *param);
} Binlog_transmit_observer;


``Binlog relay IO observer''
----------------------------

/*
  Replication binlog relay IO flags
*/

enum Binlog_relay_flags {
  // The relay log was sync:ed
  BINLOG_RELAY_IS_SYNCED = 1
};

/*
  Replication binlog relay IO observer
 */
typedef struct Binlog_relay_IO_param {
  uint32 server_id;
  uint32 flags;

  /* Master host, user and port */
  char *host;
  char *user;
  uint port;

  char *master_log_name;
  my_off_t master_log_pos;

  MYSQL *mysql;			/* the connection to master */

} Binlog_relay_IO_param;

typedef struct Binlog_relay_IO_observer {
  uint32 len;

  bool (*thread_start)(Binlog_relay_IO_param *param);
  bool (*thread_stop)(Binlog_relay_IO_param *param);
  bool (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 *flags);
  bool (*after_read_event)(Binlog_relay_IO_param *param,
			  const char *packet, ulong len,
			  const char **event_buf, ulong *event_len);
  bool (*after_queue_event)(Binlog_relay_IO_param *param,
			   const char *event_buf, ulong event_len, 
                           uint32 flags);
  bool (*after_reset_slave)(Binlog_relay_IO_param *param);
} Binlog_relay_IO_observer;


Observer register/unregister
-----------------------------
bool register_trans_observer(Trans_observer *observer);
bool unregister_trans_observer(Trans_observer *observer);

bool register_binlog_storage_observer(Binlog_storage_observer *observer);
bool unregister_binlog_storage_observer(Binlog_storage_observer *observer);

bool register_binlog_transmit_observer(Binlog_transmit_observer *observer);
bool unregister_binlog_transmit_observer(Binlog_transmit_observer *observer);

bool register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer);
bool unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer);
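
From the plugin side, registration could be used roughly as sketched below.
The register and unregister functions here are single-slot stubs standing in
for the server functions listed above, the plugin entry point names are
hypothetical, and filling `len` with the size of the observer struct is an
assumption about the purpose of that field.

```cpp
#include <cassert>
#include <cstddef>

typedef unsigned int uint32;            // stand-in for the server typedef

struct Trans_param { uint32 server_id; };

struct Trans_observer {
  uint32 len;
  bool (*after_commit)(Trans_param *param);
  bool (*after_rollback)(Trans_param *param);
};

// Stubs standing in for the server functions; they keep a single slot
// and return true on failure (an assumed convention).
static Trans_observer *g_registered = NULL;

bool register_trans_observer(Trans_observer *observer) {
  if (g_registered != NULL)
    return true;
  g_registered = observer;
  return false;
}

bool unregister_trans_observer(Trans_observer *observer) {
  if (g_registered != observer)
    return true;
  g_registered = NULL;
  return false;
}

// Callbacks of the example plugin; they do nothing and report success.
static bool example_after_commit(Trans_param *) { return false; }
static bool example_after_rollback(Trans_param *) { return false; }

static Trans_observer example_trans_observer = {
  static_cast<uint32>(sizeof(Trans_observer)),  // assumed use of `len`
  example_after_commit,
  example_after_rollback
};

// Hypothetical plugin entry points; 0 means success.
int example_plugin_init() {
  return register_trans_observer(&example_trans_observer) ? 1 : 0;
}

int example_plugin_deinit() {
  return unregister_trans_observer(&example_trans_observer) ? 1 : 0;
}
```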


Delegates
=========

A delegate is a class to manage a list of observers of the same type and
invoke the callbacks defined by the observers in appropriate position with
appropriate arguments.

There is a delegate for each type of observer:
  - transaction delegate
  - binlog storage delegate
  - binlog transmit delegate
  - binlog relay IO delegate

``Base class of Delegates''
---------------------------

The base class Delegate implements the common operations for all
delegates, such as management of the observer list and initialization.

template <typename T>
class Delegate {
public:
  typedef T Observer;
  typedef List<T> Observer_list;
  typedef List_iterator<T> Observer_iterator;
  bool add_observer(Observer *observer)
  { ... }
  bool remove_observer(Observer *observer)
  { ... }

  inline Observer_iterator observers_iter()
  { ... }

  inline bool is_empty()
  { ... }

  inline bool read_lock()
  { ... }

  inline bool write_lock()
  { ... }

  inline bool unlock()
  { ... }
  
  bool init()
  { ... }

  void destroy()
  { ... }

  Delegate(){}
  ~Delegate() {}

private:
  Observer_list observers;
  rw_lock_t lock;
  MEM_ROOT memroot;
};

Transaction delegate
====================

class Trans_delegate
  :public Delegate<Trans_observer> {
public:
  bool after_commit(THD *thd, bool all);
  bool after_rollback(THD *thd, bool all);
};

Binlog storage delegate
=======================

class Binlog_storage_delegate
  :public Delegate<Binlog_storage_observer> {
public:
  bool after_flush(THD *thd, const char *log_file, my_off_t log_pos);
};

Binlog transmit delegate
========================

class Binlog_transmit_delegate
  :public Delegate<Binlog_transmit_observer> {
public:
  bool transmit_start(THD *thd, ushort flags,
	     const char *log_file, my_off_t log_pos);
  bool transmit_stop(THD *thd, ushort flags);
  bool reserve_header(THD *thd, ushort flags, String *packet, ulong *len);
  bool before_send_event(THD *thd, ushort flags,
			 String *packet, const
			 char *log_file, my_off_t log_pos );
  bool after_send_event(THD *thd, ushort flags,
			const char *event_buf);
  bool after_reset_master(THD *thd, ushort flags);
};

Binlog relay IO delegate
========================

class Binlog_relay_IO_delegate
  :public Delegate<Binlog_relay_IO_observer> {
public:
  bool thread_start(THD *thd, Master_info *mi);
  bool thread_stop(THD *thd, Master_info *mi);
  bool before_request_transmit(THD *thd, Master_info *mi, ushort *flags);
  bool after_read_event(THD *thd, Master_info *mi,
			const char *packet, ulong len,
			const char **event_buf, ulong *event_len);
  bool after_queue_event(THD *thd, Master_info *mi,
			 const char *event_buf, ulong event_len);
  bool after_reset_slave(THD *thd, Master_info *mi);
};