WL#6837: Group Replication: Online distributed recovery
Affects: Server-5.7
Status: Complete
EXECUTIVE SUMMARY
=================

On recovery, the server MUST recover from other servers in the network, in a
distributed fashion. That way the replica can get up to date automatically,
before it actually starts serving requests.

PROBLEM STATEMENT
=================

When a server joins a group, it MUST get all the missing transactions, the
simplest way being native asynchronous replication. While it fetches them, it
buffers the other incoming transactions coming from the group communication
layer (because all of this happens online), and only once the recovery is
complete does it start applying the transactions that arrived in the meantime.
Once these two queues are flushed, the server can be brought online (i.e., it
can start accepting update/delete/insert requests). In theory the server could
start serving read requests earlier, but it would then be exporting stale
data.

There are several approaches to achieve this: multiple recovery rounds, one
recovery round with buffering of incoming transactions, etc.
FUNCTIONAL REQUIREMENTS
=======================

FR1: Messages from a new view can only be delivered after the view change
     handler returns.
FR2: The view id and the connection information of all online nodes (possible
     donors) must be available during recovery.
FR3: Methods for extracting and setting the Certification Database must be
     available.
FR4: If no donor is found, or no connection can be established, recovery
     aborts, an error is written to the log and the node leaves the group.
FR5: Connection options must be configurable, allowing a replication user and
     password to be set.
FR6: Donor connection retries must be configurable.
HIGH-LEVEL DESCRIPTION
======================

Assuming a simple view synchronization model, we present here a generic
recovery algorithm, implemented outside the group communication layer and
triggered upon view changes, where the joining node needs to catch up with the
rest of the cluster. When the node finally catches up, its state changes and
this information is shared among the nodes, but no new group communication
view changes are induced.

From a higher observation point, recovery is based on handling an incoming
view change episode:

* on the joiner side:
  The applier_module (WL#6842) is paused, queuing incoming messages, and a
  donor is selected among the available/suitable cluster nodes so that all the
  data that the joiner is missing up to the view change is transferred. After
  this step ends, the applier_module resumes and applies the queued messages,
  certifying them with the data also received from the donor. When it manages
  to catch up with the cluster, the joiner declares itself as online.

* on the donor side:
  Upon a view change, the donor marks this moment by creating a new
  replication log event that will end up in its binary log. This event also
  contains certification data that will allow the joiner to certify future
  events. When the data is sent to the joiner, this new event allows the
  joiner to know when it has received all the pre-view data.

NEW ELEMENTS
============

+ View_change_log_event:
  The view change log event is a new log event that is created in all online
  nodes upon a view change GCS event. On the joiner, when received, it
  represents the last event of the views that preceded the current new view.

+ START SQL THREAD UNTIL VIEW_ID= view_id
  A new mode for the SQL thread that executes all events until a
  View_change_event containing a view id equal to the given one is found.
  This mode shall not be public, at least for now.

+ Recovery end / Node online message:
  A new type of message that goes into the communication layer every time a
  joining node catches up with the cluster. This message has the purpose of
  letting the other cluster nodes know that the node is now online.

+ New node states:
  As of now, a node only has two possible states: "Online" and "Offline".
  With this worklog we introduce the notion of recovery and, with it, the
  status "Recovering". Another added status is "Donor", which marks a node
  while it is fulfilling this role.
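To make these states concrete, the following is a minimal C++ sketch of the
state set introduced here; the identifiers (Node_state, can_act_as_donor) are
invented for illustration and are not the plugin's actual names. The
suitability helper merely restates the rule, used later in donor selection,
that a recovering node is never a suitable donor.

  #include <string>

  // Hypothetical modelling of the node states described above.
  enum class Node_state {
    OFFLINE,     // not part of the cluster
    RECOVERING,  // joined, currently fetching missing data from a donor
    DONOR,       // online node currently serving data to a joiner
    ONLINE       // regular live member
  };

  // Only nodes that already hold the complete state may serve as donors,
  // so a recovering (or offline) node is never a suitable candidate.
  inline bool can_act_as_donor(Node_state s) {
    return s == Node_state::ONLINE || s == Node_state::DONOR;
  }

  inline std::string to_string(Node_state s) {
    switch (s) {
      case Node_state::OFFLINE:    return "Offline";
      case Node_state::RECOVERING: return "Recovering";
      case Node_state::DONOR:      return "Donor";
      case Node_state::ONLINE:     return "Online";
    }
    return "Unknown";
  }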
ALGORITHM
=========

From the generic overview we can now pass to a high level description of the
algorithm from both points of view.

#JOINER

1) The handle_view_change method is invoked. This should be synchronized with
   message reception from the group communication layer, to guarantee that we
   can queue packets / execute methods in the exact moment before the first
   message of the new view arrives.
2) The joiner adds to the applier_module queue a message that suspends the
   applier_module activity. Note that the applier_module will only suspend
   after consuming possible messages from older views, a situation that can
   be caused by short disconnections.
3) The recovery thread is launched.
4) The node state is changed to "Recovering".
5) The method returns and message queuing into the applier_module's queue
   proceeds as usual.

-- On the newly launched recovery thread

6) Wait for the applier_module suspension and then for the consumption of the
   relay log by the SQL thread. To monitor the latter, since we cannot use
   event log positions the way the current server does, GTIDs are used
   instead: when the retrieved GTID set becomes a subset of the executed GTID
   set, we know that all the queued events have been applied (see the sketch
   at the end of this section).
7) Stop the applier SQL thread. (*1)
8) Select a donor and change its state in the local state info. In a first
   phase this selection can be simplified by choosing the cluster node with
   the lowest id. This can be enhanced in several ways, though. First of all,
   not all nodes can be selected: if a node is recovering, it shall not be
   considered a suitable donor. Node selection is also a field for
   improvement, as it can be enriched with cluster data that tells us which
   is the "fastest" node, or with user-given hints that tell the joiner which
   node to choose. This information shall come from state messages exchanged
   upon view changes, making the decision deterministic and equal throughout
   the cluster (see the sketch at the end of this section).
9) Start a master/slave connection. This implies creating a new master info
   object that contains the donor connection credentials and starting new SQL
   and IO threads.
   9.1) The SQL thread is started with an UNTIL VIEW_ID = view_id mode that
        needs to be implemented. When the view id event is detected, the SQL
        thread returns and the certification data is set. This shall also
        awake the sleeping recovery thread.
10) Terminate the recovery IO/SQL threads if they haven't stopped yet.
11) Broadcast a message that informs all cluster nodes that the donor has
    fulfilled its job. All nodes shall then change the donor status back to
    online.
12) Restart the applier module's SQL thread.
13) Awake the applier.
14) In the recovery thread, monitor the applier module queue or the certifier
    stats until we can assert that the node has caught up with the cluster.
15) Broadcast a recovery end message into the group communication layer (the
    MSG_RECOVERY type described below), carrying the node id in its payload.
16) The recovery thread terminates.

#DONOR (and remaining online nodes)

1) The handle_view_change method is invoked. Again, this should be
   synchronized with message reception.
2) The node adds a View_change_log_event to the applier queue.
3) When the View_change_event enters the pipeline, this is a sign that the
   applier has finished processing all events from older views. When the
   event then goes into the Certification handler, we know that we can
   request the Certification Database and incorporate it into the event. This
   is in fact the cluster certification database for the new view and, when
   passed on to the joiner, it will allow it to certify its queued events.
4) The node to be used as a donor is calculated deterministically in all
   nodes. This allows all nodes to correctly update their node info, correctly
   displaying which nodes are currently serving data to joining nodes. The
   node status change is made.
5) If the node eventually becomes the donor, in the transmitted data the
   View_change_event will mark the end of the data belonging to the previous
   views, also carrying the data needed for certification.

Notes:

(*1) Whenever direct interactions with the SQL/IO threads are mentioned, as
     in points 6), 8), 9) and 10), this results from the absence of suitable
     ways of re-utilizing these structures at the moment of this worklog's
     design. This should be improved when WL#1697 comes into production.
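The GTID-based wait of step 6 and the deterministic donor selection of step 8
of the joiner algorithm can be illustrated with the following self-contained
C++ sketch. GTID sets are modelled as plain sets of "uuid:gno" strings and
membership as a small struct; Member, relay_log_consumed and select_donor are
hypothetical names that only mirror the rules stated above, not the server's
actual GTID or group membership APIs.

  #include <algorithm>
  #include <set>
  #include <string>
  #include <vector>

  // Hypothetical member descriptor; the real plugin keeps richer state info.
  struct Member {
    std::string id;   // unique node identifier, e.g. a UUID
    bool recovering;  // true while the node is itself going through recovery
  };

  // Step 6: the relay log is fully consumed once every retrieved transaction
  // is also executed, i.e. the retrieved GTID set is a subset of the executed
  // one. GTID sets are modelled here as sorted sets of "uuid:gno" strings.
  bool relay_log_consumed(const std::set<std::string>& retrieved,
                          const std::set<std::string>& executed) {
    return std::includes(executed.begin(), executed.end(),
                         retrieved.begin(), retrieved.end());
  }

  // Step 8: pick the suitable node with the lowest id. Recovering nodes are
  // excluded. Because every node applies the same rule to the same membership
  // information, the choice is deterministic across the whole cluster.
  const Member* select_donor(const std::vector<Member>& members,
                             const std::string& joiner_id) {
    const Member* donor = nullptr;
    for (const Member& m : members) {
      if (m.recovering || m.id == joiner_id) continue;  // not a suitable donor
      if (donor == nullptr || m.id < donor->id) donor = &m;
    }
    return donor;  // nullptr: no suitable donor, recovery must abort (FR4)
  }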
STATE TRANSITION
================

Being generic, the recovery process can be used to allow brand new nodes to
catch up with the cluster, and also nodes that left the group communication
for some reason and have now re-joined. These two cases end up being the
same, the only difference being that in the second case there can still be
some old data being applied in the joiner's applier module.

Based on this, here is a state transition summary of the possible node states
with respect to recovery.

States:

  Offline node             - a node not in the cluster. A brand new node is
                             just another kind of offline node.
  Online node              - a live node of the cluster that can receive
                             messages.
  Recovering node (joiner) - a node that joined and is going through recovery.
  Donor node               - a node that sends all the missing data to the
                             joiner.

State transitions:

  Online node -> view change -> still in view?
    -> yes -> selected as donor? -> yes -> Donor
                                 -> no  -> Online
    -> no  -> Offline

  Offline node -> view change -> recovery -> Online

  Recovering node (joiner) -> view change -> still in view?
    -> yes -> donor still in view? -> yes -> go on -> Online
                                   -> no  -> select another donor
                                             -> recovery -> Online
    -> no  -> kill the recovery process -> Offline

  Donor node -> joiner message of state transfer completion -> Online

On donors, if the joiner exits the group, the dump thread just dies as if a
slave had gone away.

ENHANCEMENTS
============

One possible enhancement to this module is the monitoring of the recovery
process. There are two distinct phases here: the joiner-donor connection
phase and the joiner catching-up phase. For the first one, the joiner can
retrieve the donor's GTID set and from that estimate how many transactions it
is behind. For the second phase, one could monitor the size of the applier's
queue or extract information from the stable transactions data (see the
sketch below).

Other additions could be status information such as the RAM usage of the
applier module's queue, possibly defining a maximum value as well.

Another change needed in the future concerns the applier module's queue and
its memory consumption during recovery. The persistence of its data is
something that should be discussed in a separate worklog, together with the
related questions. For example, if the certification information is known at
an early time, we can certify and queue events in parallel in a relay log. We
can also queue them without certification, but that would imply a decoupled
relay log reader to be used by the Certifier.
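As a rough illustration of the catching-up-phase monitoring idea, here is a
small hypothetical sketch that derives a "caught up" signal from the applier
queue size; the structure and names (Recovery_progress, caught_up) are
invented for the example and do not correspond to actual plugin counters.

  #include <cstddef>
  #include <cstdio>

  // Hypothetical snapshot of the joiner's applier state while catching up.
  struct Recovery_progress {
    std::size_t queued_transactions;   // transactions still in the queue
    std::size_t applied_since_resume;  // transactions applied since resume
  };

  // The joiner could be considered caught up once the queue stays below a
  // small threshold; in a real design the threshold would be configurable.
  bool caught_up(const Recovery_progress& p, std::size_t threshold = 0) {
    return p.queued_transactions <= threshold;
  }

  void report(const Recovery_progress& p) {
    std::printf("recovery: %zu transactions queued, %zu applied since resume\n",
                p.queued_transactions, p.applied_since_resume);
  }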
LOW-LEVEL DESIGN
================

::File :> applier_interfaces.h
::Context/Purpose :>
  Add a context flag to the Packet class identifying suspend and end packets.
::Added content :>
  An optional flag in the Packet class.
=====================================================================
::File :> gcs_replication/gcs_applier.cc/h
::Context/Purpose :>
  Add suspend capabilities to the plugin applier module.
::Added content :>
  Mutexes/conditions/flag to suspend the applier.
::Added Methods :>

  //Awakes a suspended applier
  void awake()

  //Tells if the applier is suspended
  bool is_suspended()

  //Adds a suspend packet to the applier's queue
  void add_suspend_packet()

::Changed algorithms :>
  In the applier loop we now have (a self-contained sketch of this suspend
  pattern follows the gcs_protocol.cc block below):

  while(!error)
  {
    incoming->pop(&packet)
    if(packet.status == end_packet)
      //do nothing, this packet only exists to unlock the queue
    if(packet.status == suspend)
      suspend()
    // remaining logic
  }
=====================================================================
::File :> gcs_replication/gcs_recovery.cc/h [new]
::Context/Purpose :>
  Class that contains the main logic of the recovery algorithm.
::Contains :>
  //A thread object for the recovery thread
  THD recovery_thread
  Mutexes/conditions for waiting on the donor connection.
::Methods :>

  class Recovery_module
  {
    //Constructor that receives the applier
    Recovery_module(applier)

    //Launches the recovery thread for this view.
    //Receives the view id and the possible donor nodes
    int launch_recovery(view_id, cluster_nodes)

    //Method with the node selection logic
    donor_info select_donor(nodes)

    //Method with the logic for the slave connection to the donor
    int start_donor_connection(donor_info, view_id)

    //Method to terminate the connection to the donor
    int terminate_donor_connection()

    //Method to broadcast to all nodes the info to change the donor
    //state back to online
    broadcast_state_transfer_end()

    //Method that resumes the applier SQL thread and awakes it
    int resume_applier()

    //Method that monitors the current applier delay.
    //Blocking method that only returns when the node has caught up
    int monitor_applier_delay()

    //Broadcasts the message that signals that the node is now online
    notify_cluster_recovery_end(node_id)
  }

::Algorithms :>
  The recovery thread:

  handle_recovery(view_id, nodes)
  {
    //wait for the applier suspension and
    //wait for the consumption of the relay log
    while( !applier_is_suspended() || !retrievedGTIDs.is_subset(executedGTIDs) )
    {
      sleep
    }

    //select a donor
    donor_info = select_donor(nodes)

    //set the condition/mutex to allow a later awakening
    //see below the changes to gcs_replication.cc/h
    set_donor_connection_condition(donor_condition, donor_mutex)

    //start a connection with the donor
    start_donor_connection(donor_info, view_id)

    //wait for the donor to send all the data
    while(get_retrieved_cert_db() == NULL)
      donor_condition.wait()

    //terminate the donor connection
    terminate_donor_connection()

    //mark the donor as finished
    broadcast_state_transfer_end()

    //resume the applier module
    resume_applier()

    //block until the applier delay is minimal
    monitor_applier_delay()

    //broadcast a message to all nodes
    notify_cluster_recovery_end(node_id)
  }
=====================================================================
::File :> gcs_replication/include/gcs_protocol.cc
::Context/Purpose :>
  Add a new message type to GCS.
::Added content :>
  Add the new type "MSG_RECOVERY" to the Msg_type enum.

  The contents of the message are:

    | node type | node id |

  Where:
    node type: whether the node is a donor or a joiner.
               Donor: state transfer ended. Joiner: recovery ended.
    node id  : the id of the node.
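The suspend/awake mechanism added to gcs_applier.cc/h above can be modelled
with a standard mutex/condition-variable pattern. The sketch below is only an
illustration under that assumption (class and member names are invented); the
plugin itself would rely on the server's own synchronization primitives
rather than the C++ standard library.

  #include <condition_variable>
  #include <mutex>
  #include <queue>

  // Hypothetical packet with the context flag from applier_interfaces.h.
  enum class Packet_flag { REGULAR, SUSPEND, END };
  struct Packet {
    Packet_flag flag;
    // payload would go here
  };

  class Suspendable_applier {
   public:
    // Called by the view change handler: queue a packet that suspends the
    // applier only after all packets from older views have been consumed.
    void add_suspend_packet() { push(Packet{Packet_flag::SUSPEND}); }

    // Called by the recovery thread once the joiner can resume applying.
    void awake() {
      std::lock_guard<std::mutex> guard(lock_);
      suspended_ = false;
      cond_.notify_all();
    }

    bool is_suspended() {
      std::lock_guard<std::mutex> guard(lock_);
      return suspended_;
    }

    void push(Packet p) {
      std::lock_guard<std::mutex> guard(lock_);
      queue_.push(p);
      cond_.notify_all();
    }

    // Main applier loop: consume packets, suspending when a SUSPEND packet
    // is reached and blocking until awake() is called.
    void run() {
      for (;;) {
        std::unique_lock<std::mutex> guard(lock_);
        cond_.wait(guard, [this] { return !queue_.empty(); });
        Packet p = queue_.front();
        queue_.pop();
        if (p.flag == Packet_flag::END) break;  // in this sketch, END ends the loop
        if (p.flag == Packet_flag::SUSPEND) {
          suspended_ = true;
          cond_.wait(guard, [this] { return !suspended_; });
          continue;
        }
        // apply the regular packet (omitted)
      }
    }

   private:
    std::mutex lock_;
    std::condition_variable cond_;
    std::queue<Packet> queue_;
    bool suspended_ = false;
  };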
=====================================================================
::File :> gcs_replication/gcs_event_handlers.cc
::Context/Purpose :>
  Add logic to the view handling method. The message handler now also handles
  recovery end messages.
::Added Methods :>

  //method that launches the recovery thread
  launch_recovery()

  //method that creates and adds a View_change_event to the applier.
  //Also changes the donor state to "Donor"
  mark_view_change()

::Changed algorithms :>

  handle_view_change(view, total, left, joined, quorate)
  {
    for(node n : joined)
    {
      if n.id == local.id
        joiner= true;
        break;
    }

    if(joiner && total.size > 1)
      launch_recovery(view.get_view_id(), total)
    else if(joined.size > 0)
      mark_view_change(view.get_view_id())
  }

  handle_message_delivery(msg, view)
  {
    if (msg.type == MSG_REGULAR)
      //current logic
    else if (msg.type == MSG_RECOVERY)
      node_type = ((Recovery_message)msg).get_node_type()
      if(node_type == joiner)
        //set the state of the joiner node as being online
      if(node_type == donor)
        //set the state of the donor back to online
  }
=====================================================================
::File :> log_event.cc/h
::Context/Purpose :>
  Add a new log event.
::Added content :>

  class View_change_event
  {
    Certification_database cert_db;
    View_id id;
  }

::Added Methods :>

  class View_change_event
  {
    //Event constructor based on the view id
    View_change_event(view_id)

    //Sets/gets the event certification database
    set/get_certification_db()

    //Gets the event view id
    get_view_id()

    //The log event methods: size, validation, write functions, etc.
    write_data_header(IO_CACHE* file)
    write_data_body(IO_CACHE* file)
    ...
  }

::Algorithms :>

  write_data_header(IO_CACHE* file)
  {
    //The header contains the view id and the cert db size
    store(view_id);
    store(cert_db.size)
  }

  write_data_body(IO_CACHE* file)
  {
    for(cert_db_line in cert_db)
    {
      len= strlen(cert_db_line->first);   //the key
      store(len)
      store(cert_db_line->first)          //store the key
      store(cert_db_line->second)         //version (the gno has a fixed size)
    }
  }

  read_data_body(map_size)
  {
    //map_size comes from the header
    for(int i = 0; i < map_size; i++)
    {
      len= read()
      key = read(len)
      value = read(gno_len)
      map[key] = value;
    }
  }
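To make the View_change_event body format concrete, here is a minimal,
self-contained sketch of the same length-prefixed encoding applied to an
in-memory map. The types and helpers (Cert_db, write_body, read_body) are
invented for the example; the real event uses the server's IO_CACHE and
packing routines, and a real format would also fix the byte order, which this
sketch ignores (it uses host order).

  #include <cstddef>
  #include <cstdint>
  #include <cstring>
  #include <map>
  #include <string>
  #include <vector>

  // Hypothetical stand-in for the certification database: key -> version (gno).
  using Cert_db = std::map<std::string, int64_t>;

  // Encode as: for each entry, a 4-byte key length, the key bytes, then a
  // fixed-size 8-byte gno, mirroring write_data_body() above.
  std::vector<unsigned char> write_body(const Cert_db& db) {
    std::vector<unsigned char> out;
    for (const auto& entry : db) {
      uint32_t len = static_cast<uint32_t>(entry.first.size());
      out.insert(out.end(), reinterpret_cast<unsigned char*>(&len),
                 reinterpret_cast<unsigned char*>(&len) + sizeof(len));
      out.insert(out.end(), entry.first.begin(), entry.first.end());
      int64_t gno = entry.second;
      out.insert(out.end(), reinterpret_cast<unsigned char*>(&gno),
                 reinterpret_cast<unsigned char*>(&gno) + sizeof(gno));
    }
    return out;
  }

  // Decode map_size entries back, mirroring read_data_body(map_size).
  Cert_db read_body(const std::vector<unsigned char>& in, std::size_t map_size) {
    Cert_db db;
    std::size_t pos = 0;
    for (std::size_t i = 0; i < map_size; i++) {
      uint32_t len;
      std::memcpy(&len, in.data() + pos, sizeof(len));
      pos += sizeof(len);
      std::string key(reinterpret_cast<const char*>(in.data() + pos), len);
      pos += len;
      int64_t gno;
      std::memcpy(&gno, in.data() + pos, sizeof(gno));
      pos += sizeof(gno);
      db[key] = gno;
    }
    return db;
  }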
=====================================================================
::File :> rpl_rli.cc/h
::Context/Purpose :>
  Add a new until mode to the SQL thread.
::Added content :>
  New value "UNTIL_VIEW_ID" in the until_condition enum.
::Changed algorithms :>

  is_until_satisfied(event)
  {
    case UNTIL_VIEW_ID:
    {
      if(event.get_type == VIEW_CHANGE_EVENT)
      {
        if( ((View_change_event)event).view_id == view_id_arg)
          set_retrieved_cert_db(View_change_event)
          return true;
      }
    }
  }
=====================================================================
::File :> gcs_replication.cc
::Context/Purpose :>
  Add methods for the SQL thread to execute. These serve the purpose of
  storing the certification DB and awaking the recovery thread.
::Added content :>
  //The lock/condition to awake the recovery thread
  donor_queue_condition/lock

  //The certification DB that the SQL thread got from the View_change_event
  certification_db

::Added Methods :>

  //Sets the condition/mutex used to later awake the recovery thread
  set_donor_connection_condition(condition, mutex)

  //Sets the certification DB received over the donor connection and
  //awakes the recovery thread
  set_retrieved_cert_db(View_change_event)

  //Gets the certification DB
  get_retrieved_cert_db()

::Added Algorithms :>

  set_retrieved_cert_db(View_change_event event)
  {
    certification_db = event.get_certification_db()
    cond_broadcast(donor_condition);
  }
=====================================================================
::File :> gcs_replication/handlers/certification_handler.cc
::Context/Purpose :>
  Add logic to the certifier to get the certification DB and put it in the
  View change event, on the donor side.
::Changed algorithms :>

  switch (event_type)
  {
    case VIEW_CHANGE_EVENT:
      Certification_database certDB = get_certification_db()
      event.set_certification_db(certDB);
    ...
  }

ADDED CONFIGURATION OPTIONS
===========================

Sys var: gcs_replication_plugin_recovery_user
  The user to use during recovery.

Sys var: gcs_replication_plugin_recovery_password
  The password to use during recovery. This should be settable but not
  readable.

Sys var: gcs_replication_plugin_recovery_retry_count
  The number of connection attempts to a donor.

Also uses:

Sys var: gcs_replication_plugin_components_stop_timeout
  The time to wait until the recovery threads terminate.
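As an illustration of how the retry count option could govern the donor
connection (FR4/FR6), here is a small hypothetical sketch; connect_to_donor,
the back-off interval and the option plumbing are assumptions made for the
example, not the plugin's actual implementation.

  #include <chrono>
  #include <string>
  #include <thread>

  // Stub standing in for the real connection attempt, which in the plugin
  // would create the master info object and start the recovery IO/SQL threads.
  static bool connect_to_donor(const std::string& donor_host,
                               const std::string& user,
                               const std::string& password) {
    (void)donor_host; (void)user; (void)password;
    return false;  // pretend the donor is unreachable in this sketch
  }

  // Try to establish the donor connection, honouring the configured number of
  // tries (gcs_replication_plugin_recovery_retry_count). Returns false when
  // all attempts are exhausted, in which case recovery aborts and the node
  // leaves the group (FR4).
  bool establish_donor_connection(const std::string& donor_host,
                                  const std::string& user,
                                  const std::string& password,
                                  unsigned int retry_count) {
    for (unsigned int attempt = 1; attempt <= retry_count; ++attempt) {
      if (connect_to_donor(donor_host, user, password)) return true;
      // back off briefly before the next attempt; a real implementation would
      // likely make this interval configurable
      if (attempt < retry_count)
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return false;
  }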