WL#4398: Replication Interface for semi-synchronous replication
Affects: Server-5.5
—
Status: Complete
Replication interface for the semi-synchronous replication feature (WL#1720). This enables semi-synchronous replication to be installed as a plugin to the MySQL Server. This interface is also a base for designing a more universal replication interface that can be used for various replication extensions and protocols.

The interface consists of two parts:

A) Four observer interfaces, making it possible for the component to observe actions of the MySQL server
B) Server Service Interface for two services:
   i) NET read/write/flush, and
   ii) Variable getters (three kinds)
===========================
PART A: OBSERVER INTERFACES
===========================

Introduction
============

Semi-synchronous replication blocks the return from commit of a transaction until the slave has received the binlog events of the transaction. The master only waits for a limited time: if it does not get a reply within the given period, it turns off semi-synchronous replication, and turns it on again later if the slave catches up.

Semi-synchronous replication requires alterations to the control flow of SQL transactions and replication, which affect the following server units:

Master
1. SQL transaction
2. binlog file
3. binlog dump thread

Slave
4. slave IO thread

On the master,

1. The binlog file update process is altered to report the binlog file name and position after the binlog events of the current transaction have been flushed to disk. The binlog file name and position are recorded and then used for synchronization.
2. The binlog dump thread checks whether the binlog file name and position of the event being sent are recorded and, if they are, marks the event packet to request an ack from the slave.
3. The SQL transaction control flow is altered to wait for an ack after the transaction has been committed to the storage engines and before returning to the user.

On the slave,

4. The slave IO thread checks whether each event requires a reply, and sends an ack to the master after writing the event to the relay log.

In the server, a set of interfaces is implemented to extend the control flow of the SQL transaction and replication server units.

Observers
=========

There are four server replication interfaces:

1. transaction
2. binlog storage
3. binlog transmit
4. binlog relay IO

These four interfaces correspond to the server units mentioned above. Each interface is a set of callback pointers observing the control flow of that server unit. These observer interfaces can be used by replication plugins or other extension code to observe the control flow and extend how the server works.
The design supports multiple observers for each observer interface.

Delegates
=========

The observers for each unit are managed by a Delegate, which is in charge of adding and removing observers, and of invoking the observers' callbacks at the appropriate positions in the control flow. The delegates are entirely in the server domain, hence not part of the plug-in and therefore not part of the actual interface. They are presented in this worklog since they are part of the implementation, and hence can be the target of review comments.

Replication Plugin
==================

By using the replication interface, we can write replication plugins to extend the replication service. Semi-synchronous replication will be implemented as two plugins:

1. semisync_master, and
2. semisync_slave.

A replication plugin can implement all or some of the available observers. These plugins can be developed and built separately from the server, and are compatible between server versions if the replication interfaces are compatible between these server versions.

Transaction observer
====================

Transaction extension points are required to observe the status of the current transaction. Semi-sync will use them to observe the commit of a transaction and wait until the slave has acknowledged that it has received the binlog events and buffered them to the relay log.

``after_commit''
----------------

This is called after commit to storage engines for transactional tables. For non-transactional tables, this is called at the end of the statement, before sending the statement status, if the statement succeeded. Semi-sync uses this and the following after_rollback callback to wait for a reply from the slave for each transaction or statement (AUTOCOMMIT) before returning.

``after_rollback''
------------------

This is called after the transaction has been rolled back in storage engines for transactional tables.
For non-transactional tables, this is called at the end of the statement, before sending the statement status, if the statement failed.

Binlog storage observer
=======================

Binlog storage extension points are required to observe updates of the binlog file. Semi-sync uses them to update the binlog file name and position of the current transaction.

``after_flush``
---------------

This callback is invoked after a transactional group (either a real transaction or a single statement with any necessary context events) has been flushed from the in-memory cache and written to the binary log. If the variable ``sync_binlog_period`` dictates that the flush should sync the binary log, this callback is invoked after that sync. Semi-sync uses this to record the binlog file name and position of each transaction (including statements committed with AUTOCOMMIT) that needs to wait for an ack.

Binlog transmit observer
========================

Binlog transmit extension points are required to observe the binlog dump thread on the master. Semi-sync uses them to enable semi-synchronous replication on the master side. The master enables the semi-sync feature if the slave has requested it. When this feature is enabled, the master waits for an acknowledgment from the slave after each transaction.

``transmit_start''
------------------

This callback is called when the binlog dump thread starts. Here the master component of semi-sync checks whether the slave requests the semi-sync feature, does some preparation for semi-sync replication, and increases the number of semi-sync slaves if the slave requested it.

``transmit_stop''
-----------------

This callback is called when the binlog dump thread ends. The semi-sync master component decreases the number of semi-sync slaves here.

``reserve_header''
------------------

This callback is called to reserve header bytes in the event packet buffer that will be sent to the slave.
The default header ('\0') is appended first, then the extra header bytes of each observer are appended in the order in which the observers of this type were registered. Semi-sync uses this callback to reserve two extra bytes (magic num, sync flag).

``before_send_event''
---------------------

This callback is called before sending an event packet to the slave via net. Semi-sync uses this to check whether the event being sent is the end of a transaction that requires an ack from the slave and, if it is, sets the sync flag in the reserved extra header.

``after_send_event''
--------------------

This callback is called after sending an event packet to the slave via net. Semi-sync uses this to wait for an ack if the current event is the end of a transaction.

``after_reset_master''
----------------------

This callback is called when the command RESET MASTER is issued. Semi-sync uses this to reset extra master status variables.

Binlog relay IO observer
========================

Binlog relay IO extension points are required to observe the slave IO thread. Semi-sync uses them to request semi-sync replication when requesting the binlog dump from the master, and to send a reply after all events of a transaction have been received.

``thread_start''
----------------

This callback is called when the slave IO thread starts, before connecting to the master. The semi-sync slave component sets the slave semi-sync status to 1 here.

``thread_stop''
---------------

This callback is called when the slave IO thread ends, after the connection has been closed. The semi-sync slave component sets the slave semi-sync status to 0 here.

``before_request_transmit''
---------------------------

This callback is called after connecting to the master and before issuing the BINLOG_DUMP command to request dumping of binlog events. Semi-sync uses this to set a flag when requesting the binlog dump, to indicate synchronous behavior.

``after_read_event''
--------------------

This callback is called after an event packet has been successfully read from the master.
Semi-sync uses this to check the sync flag in the extra header, set a variable to indicate whether a reply must be sent for the current event, and remove this header.

``after_queue_event''
---------------------

This callback is called after the event has been successfully written to the relay log. Depending on the setting of ``sync_relay_period``, the relay log might be sync:ed or not. If the relay log is sync:ed, this call is made after the sync. Semi-sync uses this to send the ack to the master if the current event needs a reply.

``after_reset_slave''
---------------------

This callback is called when the command RESET SLAVE is issued. Semi-sync uses this to reset extra slave IO status variables.

================================
PART B: SERVER SERVICE INTERFACE
================================

The following Server Service Interface, in two parts (NET read/write/flush and user variable getters), is defined in the header file "plugin.h". These functions are used by the semi-sync components.

1. Server Service interface: THD NET read/write/flush
=====================================================

The THD NET functions are used in the server to send and receive packets via the thread connection to the client. The semi-sync master component uses these functions to receive the reply from the slave.

/**
   Read a packet from the thread connection.

   @param packet return the pointer to the packet read
   @param len return the length of packet read

   @return 0 on success, 1 on failure.
*/
int thd_net_read(const uchar **packet, size_t *len);

/**
   Write a packet to the thread connection.

   @return 0 on success, 1 on failure
*/
int thd_net_write(const uchar *packet, size_t len);

/**
   Flush write buffer of thread connection.

   @param thd user thread connection handle

   @return 0 on success, 1 on failure
*/
int thd_net_flush();

2. Server Service interface: MYSQL NET read/write/flush
=======================================================

The MYSQL NET functions are used on the client side to send and receive packets via the connection to the server.
The semi-sync slave component uses these functions to send the reply to the master.

/**
   Read a packet from the connection.

   @param mysql mysql client connection
   @param packet return the pointer to the packet read
   @param len return the length of packet read

   @return 0 on success, 1 on failure.
*/
int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len);

/**
   Write a packet to the connection.

   @param mysql mysql client connection
   @param packet packet to write to the connection
   @param len length of the packet

   @return 0 on success, 1 on failure
*/
int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len);

/**
   Flush write buffer of connection.

   @param mysql mysql client connection

   @return 0 on success, 1 on failure
*/
int mysql_net_flush(MYSQL *mysql);

3. Server Service interface: User Variables
===========================================

These functions are used to get the value of a user variable. In semi-sync, if the slave component requests the semi-sync replication feature, it sets the user variable 'rpl_semi_sync_slave' of the connection thread on the master to 1 (by issuing the command 'SET @rpl_semi_sync_slave=1' to the master) before issuing the BINLOG_DUMP command. The master component uses 'get_user_var_int' to get the value of 'rpl_semi_sync_slave' and check whether the slave requests semi-sync replication before starting the binlog dump.

/**
   Get the value of user variable as an integer.

   @param name user variable name
   @param value pointer to return the value
   @param null_value true if the value of variable is null, false if not

   @return 0 success, 1 variable not found
*/
int get_user_var_int(const char *name, longlong *value, my_bool *null_value);

/**
   Get the value of user variable as a double precision float number.

   @param name user variable name
   @param value pointer to return the value
   @param null_value true if the value of variable is null, false if not

   @return 0 success, 1 variable not found
*/
int get_user_var_real(const char *name, double *value, my_bool *null_value);

/**
   Get the value of user variable as a string.

   @param name user variable name
   @param value pointer to the value buffer
   @param len length of the value buffer
   @param precision precision of the value if it is a float number
   @param null_value true if the value of variable is null, false if not

   @return 0 success, 1 variable not found
*/
int get_user_var_str(const char *name, char *value, ulong len, uint precision, my_bool *null_value);
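As a rough illustration of the check described for 'rpl_semi_sync_slave', the master component might query the user variable as sketched below. This is only a sketch: 'get_user_var_int' is replaced by a stub backed by a hypothetical in-memory map ('fake_user_vars') so that the code builds outside the server, and the helper name 'slave_requests_semi_sync' is an assumption, not part of the interface.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

typedef long long longlong;
typedef char my_bool;

// Hypothetical stand-in for the user variables of the connection thread.
// A variable that exists but is NULL is modelled with has_value == false.
struct User_var { bool has_value; longlong value; };
static std::map<std::string, User_var> fake_user_vars;

// Stub with the signature shown above: returns 0 on success and 1 if the
// variable is not found. The real function is provided by the server.
int get_user_var_int(const char *name, longlong *value, my_bool *null_value)
{
  assert(name != NULL);
  std::map<std::string, User_var>::const_iterator it = fake_user_vars.find(name);
  if (it == fake_user_vars.end())
    return 1;
  *null_value = it->second.has_value ? 0 : 1;
  *value = it->second.has_value ? it->second.value : 0;
  return 0;
}

// Hypothetical helper: true only if the slave issued
// SET @rpl_semi_sync_slave=1 before requesting the binlog dump.
bool slave_requests_semi_sync()
{
  longlong value = 0;
  my_bool null_value = 0;
  if (get_user_var_int("rpl_semi_sync_slave", &value, &null_value))
    return false;                 // variable not set: treat as asynchronous
  return !null_value && value == 1;
}
```

In this sketch a missing or NULL variable is treated the same as a slave that did not ask for semi-sync, which keeps ordinary asynchronous slaves unaffected.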
Observers
=========

Note that the low-level design might not be identical to the actual code used, due to implementation-specific issues. The low-level design provides signatures for functions and storage of data, but omits details that are pure implementation issues (for example, the management of flag values might be handled differently, or extra function calls might be added to structure the code).

Observers are described in detail in High-Level Specifications.

``Transaction observer''
------------------------

/* Transaction observer */
typedef struct Trans_param {
  uint32 server_id;
} Trans_param;

typedef struct Trans_observer {
  uint32 len;
  bool (*after_commit)(Trans_param *param);
  bool (*after_rollback)(Trans_param *param);
} Trans_observer;

``Binlog storage observer''
---------------------------

/* Binlog storage observer flags */
enum Binlog_storage_flags {
  // Binary log was sync:ed
  BINLOG_STORAGE_IS_SYNCED = 1
};

/* Binlog storage observer */
typedef struct Binlog_storage_param {
  uint32 server_id;
} Binlog_storage_param;

typedef struct Binlog_storage_observer {
  uint32 len;
  bool (*after_flush)(Binlog_storage_param *param,
                      const char *log_file, my_off_t log_pos,
                      uint32 flags);
} Binlog_storage_observer;

``Binlog transmit observer''
----------------------------

/* Replication binlog transmitter (binlog dump) observer */
typedef struct Binlog_transmit_param {
  uint32 server_id;
  uint32 flags;
} Binlog_transmit_param;

typedef struct Binlog_transmit_observer {
  int32 len;
  bool (*transmit_start)(Binlog_transmit_param *param,
                         const char *log_file, my_off_t log_pos);
  bool (*transmit_stop)(Binlog_transmit_param *param);
  bool (*reserve_header)(Binlog_transmit_param *param,
                         String *packet, ulong *len);
  bool (*before_send_event)(Binlog_transmit_param *param,
                            String *packet,
                            const char *log_file, my_off_t log_pos);
  bool (*after_send_event)(Binlog_transmit_param *param,
                           const char *event_buf);
  bool (*after_reset_master)(Binlog_transmit_param *param);
} Binlog_transmit_observer;
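The two-byte extra header used by semi-sync (magic num, sync flag) can be sketched roughly as follows. This is a simplified illustration, not the plugin's code: 'std::string' stands in for the server's 'String', the byte values 'MAGIC_NUM' and 'SYNC_FLAG' are hypothetical, the helper names are invented for this example, and the "false on success" return convention is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical byte values: the text only says that the two extra bytes are
// a magic number and a sync flag.
static const unsigned char MAGIC_NUM = 0xef;
static const unsigned char SYNC_FLAG = 0x01;
static const unsigned long EXTRA_HEADER_LEN = 2;

// Master side, reserve_header: append the two extra bytes after whatever
// header bytes are already in the packet and report how many were added.
// Returns false on success (assumed convention).
bool semisync_reserve_header(std::string *packet, unsigned long *len)
{
  packet->push_back(static_cast<char>(MAGIC_NUM));
  packet->push_back(0);                   // sync flag is cleared for now
  *len = EXTRA_HEADER_LEN;
  return false;
}

// Master side, before_send_event: set the sync flag if this event ends a
// transaction that must be acknowledged. header_pos is the offset at which
// the two bytes were reserved.
bool semisync_mark_event(std::string *packet, std::size_t header_pos,
                         bool need_ack)
{
  assert(packet->size() >= header_pos + EXTRA_HEADER_LEN);
  if (need_ack)
    (*packet)[header_pos + 1] = static_cast<char>(SYNC_FLAG);
  return false;
}

// Slave side, after_read_event: check the magic number, extract the sync
// flag and return the event with the extra header removed.
// Returns true (failure) if the header is missing or malformed.
bool semisync_read_header(const char *packet, unsigned long len,
                          const char **event_buf, unsigned long *event_len,
                          bool *need_reply)
{
  if (len < EXTRA_HEADER_LEN ||
      static_cast<unsigned char>(packet[0]) != MAGIC_NUM)
    return true;
  *need_reply = (static_cast<unsigned char>(packet[1]) & SYNC_FLAG) != 0;
  *event_buf = packet + EXTRA_HEADER_LEN;
  *event_len = len - EXTRA_HEADER_LEN;
  return false;
}
```

The slave-side helper assumes it is handed the packet starting at the extra header, that is, with the default header byte ('\0') already skipped by the caller.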
``Binlog relay IO observer''
----------------------------

/* Replication binlog relay IO flags */
enum Binlog_relay_flags {
  // The relay log was sync:ed
  BINLOG_RELAY_IS_SYNCED = 1
};

/* Replication binlog relay IO observer */
typedef struct Binlog_relay_IO_param {
  uint32 server_id;
  uint32 flags;

  /* Master host, user and port */
  char *host;
  char *user;
  uint port;

  char *master_log_name;
  my_off_t master_log_pos;

  MYSQL *mysql;                 /* the connection to master */
} Binlog_relay_IO_param;

typedef struct Binlog_relay_IO_observer {
  int32 len;
  bool (*thread_start)(Binlog_relay_IO_param *param);
  bool (*thread_stop)(Binlog_relay_IO_param *param);
  bool (*before_request_transmit)(Binlog_relay_IO_param *param,
                                  uint32 *flags);
  bool (*after_read_event)(Binlog_relay_IO_param *param,
                           const char *packet, ulong len,
                           const char **event_buf, ulong *event_len);
  bool (*after_queue_event)(Binlog_relay_IO_param *param,
                            const char *event_buf, ulong event_len,
                            uint32 flags);
  bool (*after_reset_slave)(Binlog_relay_IO_param *param);
} Binlog_relay_IO_observer;

Observer register/unregister
----------------------------

bool register_trans_observer(Trans_observer *observer);
bool unregister_trans_observer(Trans_observer *observer);

bool register_binlog_storage_observer(Binlog_storage_observer *observer);
bool unregister_binlog_storage_observer(Binlog_storage_observer *observer);

bool register_binlog_transmit_observer(Binlog_transmit_observer *observer);
bool unregister_binlog_transmit_observer(Binlog_transmit_observer *observer);

bool register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer);
bool unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer);

Delegates
=========

A delegate is a class that manages a list of observers of the same type and invokes the callbacks defined by the observers at the appropriate positions with the appropriate arguments.
There is a delegate for each type of observer:

- transaction delegate
- binlog storage delegate
- binlog transmit delegate
- binlog relay IO delegate

Base class of Delegates
=======================

The base class Delegate implements the operations common to all delegates, such as management of the observer list and initialization.

template <typename T>
class Delegate {
public:
  typedef T Observer;
  typedef List<Observer> Observer_list;
  typedef List_iterator<Observer> Observer_iterator;

  bool add_observer(Observer *observer) { ... }
  bool remove_observer(Observer *observer) { ... }
  inline Observer_iterator observers_iter() { ... }
  inline bool is_empty() { ... }
  inline bool read_lock() { ... }
  inline bool write_lock() { ... }
  inline bool unlock() { ... }
  bool init() { ... }
  void destroy() { ... }

  Delegate() {}
  ~Delegate() {}

private:
  Observer_list observers;
  rw_lock_t lock;
  MEM_ROOT memroot;
};

Transaction delegate
====================

class Trans_delegate :public Delegate<Trans_observer> {
public:
  bool after_commit(THD *thd, bool all);
  bool after_rollback(THD *thd, bool all);
};

Binlog storage delegate
=======================

class Binlog_storage_delegate :public Delegate<Binlog_storage_observer> {
public:
  bool after_flush(THD *thd, const char *log_file, my_off_t log_pos);
};

Binlog transmit delegate
========================

class Binlog_transmit_delegate :public Delegate<Binlog_transmit_observer> {
public:
  bool transmit_start(THD *thd, ushort flags,
                      const char *log_file, my_off_t log_pos);
  bool transmit_stop(THD *thd, ushort flags);
  bool reserve_header(THD *thd, ushort flags, String *packet, ulong *len);
  bool before_send_event(THD *thd, ushort flags, String *packet,
                         const char *log_file, my_off_t log_pos);
  bool after_send_event(THD *thd, ushort flags, const char *event_buf);
  bool after_reset_master(THD *thd, ushort flags);
};

Binlog relay IO delegate
========================

class Binlog_relay_IO_delegate :public Delegate<Binlog_relay_IO_observer> {
public:
  bool thread_start(THD *thd, Master_info *mi);
  bool thread_stop(THD *thd, Master_info *mi);
  bool before_request_transmit(THD *thd, Master_info *mi, ushort *flags);
  bool after_read_event(THD *thd, Master_info *mi,
                        const char *packet, ulong len,
                        const char **event_buf, ulong *event_len);
  bool after_queue_event(THD *thd, Master_info *mi,
                         const char *event_buf, ulong event_len);
  bool after_reset_slave(THD *thd, Master_info *mi);
};
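To make the delegate idea more concrete, here is a minimal, self-contained sketch of how a delegate could manage observers and invoke their callbacks. It is not the server implementation: 'std::vector' and 'std::mutex' stand in for the server's List, rw_lock_t and MEM_ROOT, 'after_commit' takes a plain server id instead of a THD, the "false on success" convention is assumed, and 'count_commit' is a hypothetical observer callback added only for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

struct Trans_param { unsigned int server_id; };

struct Trans_observer {
  unsigned int len;
  bool (*after_commit)(Trans_param *param);
  bool (*after_rollback)(Trans_param *param);
};

// Simplified delegate base: owns the observer list and guards it with a lock.
template <typename T>
class Delegate {
public:
  typedef T Observer;

  // Both functions return false on success, true on failure (assumed).
  bool add_observer(Observer *observer) {
    assert(observer != NULL);
    std::lock_guard<std::mutex> guard(lock);
    if (std::find(observers.begin(), observers.end(), observer) !=
        observers.end())
      return true;                        // already registered
    observers.push_back(observer);
    return false;
  }
  bool remove_observer(Observer *observer) {
    std::lock_guard<std::mutex> guard(lock);
    typename std::vector<Observer *>::iterator it =
        std::find(observers.begin(), observers.end(), observer);
    if (it == observers.end())
      return true;                        // not registered
    observers.erase(it);
    return false;
  }

protected:
  std::vector<Observer *> observers;
  std::mutex lock;
};

class Trans_delegate : public Delegate<Trans_observer> {
public:
  // Call after_commit on every registered observer, in registration order.
  // Reports failure if any callback fails, but still calls the others.
  bool after_commit(unsigned int server_id) {
    std::lock_guard<std::mutex> guard(lock);
    Trans_param param = { server_id };
    bool failed = false;
    for (std::size_t i = 0; i < observers.size(); i++)
      if (observers[i]->after_commit && observers[i]->after_commit(&param))
        failed = true;
    return failed;
  }
};

// Hypothetical observer callback: counts the commits it has seen.
static int commit_count = 0;
static bool count_commit(Trans_param *) { commit_count++; return false; }
```

A plugin would fill in a Trans_observer with its callbacks (for example 'count_commit' as 'after_commit'), register it with the delegate, and the server would then call 'after_commit' on the delegate at the matching point in the transaction control flow.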
Copyright (c) 2000, 2024, Oracle Corporation and/or its affiliates. All rights reserved.