MySQL 8.3.0
Source Code Documentation File Reference

Code to run the io thread and the sql thread on the replication slave. More...

#include "sql/rpl_replica.h"
#include "my_config.h"
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "include/compression.h"
#include "include/mutex_lock.h"
#include "m_string.h"
#include "mysql/components/services/bits/psi_bits.h"
#include "mysql/components/services/bits/psi_memory_bits.h"
#include "mysql/components/services/bits/psi_stage_bits.h"
#include "mysql/components/services/log_builtins.h"
#include "mysql/plugin.h"
#include "mysql/psi/mysql_cond.h"
#include "mysql/psi/mysql_mutex.h"
#include "mysql/status_var.h"
#include "mysql/strings/int2str.h"
#include "sql/changestreams/apply/replication_thread_status.h"
#include "sql/rpl_channel_service_interface.h"
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <deque>
#include <map>
#include <regex>
#include <string>
#include <utility>
#include <vector>
#include "errmsg.h"
#include "lex_string.h"
#include "my_bitmap.h"
#include "my_byteorder.h"
#include "my_command.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_dir.h"
#include "my_io.h"
#include "my_macros.h"
#include "my_sys.h"
#include "my_systime.h"
#include "my_thread_local.h"
#include "mysql.h"
#include "mysql/binlog/event/binlog_event.h"
#include "mysql/binlog/event/control_events.h"
#include "mysql/binlog/event/debug_vars.h"
#include "mysql/my_loglevel.h"
#include "mysql/psi/mysql_file.h"
#include "mysql/psi/mysql_memory.h"
#include "mysql/psi/mysql_thread.h"
#include "mysql/service_mysql_alloc.h"
#include "mysql/strings/m_ctype.h"
#include "mysql/thread_type.h"
#include "mysql_com.h"
#include "mysqld_error.h"
#include "pfs_thread_provider.h"
#include "prealloced_array.h"
#include "sql-common/net_ns.h"
#include "sql/auth/auth_acls.h"
#include "sql/auth/sql_security_ctx.h"
#include "sql/auto_thd.h"
#include "sql/binlog.h"
#include "sql/binlog_reader.h"
#include "sql/clone_handler.h"
#include "sql/current_thd.h"
#include "sql/debug_sync.h"
#include "sql/derror.h"
#include "sql/dynamic_ids.h"
#include "sql/handler.h"
#include "sql/item.h"
#include "sql/log.h"
#include "sql/log_event.h"
#include "sql/mdl.h"
#include "sql/mysqld.h"
#include "sql/mysqld_thd_manager.h"
#include "sql/protocol.h"
#include "sql/protocol_classic.h"
#include "sql/psi_memory_key.h"
#include "sql/query_options.h"
#include "sql/rpl_applier_reader.h"
#include "sql/rpl_async_conn_failover.h"
#include "sql/rpl_async_conn_failover_configuration_propagation.h"
#include "sql/rpl_filter.h"
#include "sql/rpl_group_replication.h"
#include "sql/rpl_gtid.h"
#include "sql/rpl_handler.h"
#include "sql/rpl_info.h"
#include "sql/rpl_info_factory.h"
#include "sql/rpl_info_handler.h"
#include "sql/rpl_io_monitor.h"
#include "sql/rpl_mi.h"
#include "sql/rpl_msr.h"
#include "sql/rpl_mta_submode.h"
#include "sql/rpl_replica_commit_order_manager.h"
#include "sql/rpl_replica_until_options.h"
#include "sql/rpl_reporting.h"
#include "sql/rpl_rli.h"
#include "sql/rpl_rli_pdb.h"
#include "sql/rpl_trx_boundary_parser.h"
#include "sql/rpl_utility.h"
#include "sql/sql_backup_lock.h"
#include "sql/sql_class.h"
#include "sql/sql_const.h"
#include "sql/sql_error.h"
#include "sql/sql_lex.h"
#include "sql/sql_list.h"
#include "sql/sql_parse.h"
#include "sql/sql_plugin.h"
#include "sql/system_variables.h"
#include "sql/table.h"
#include "sql/transaction.h"
#include "sql/transaction_info.h"
#include "sql_common.h"
#include "sql_string.h"
#include "str2int.h"
#include "string_with_len.h"
#include "strmake.h"
#include "typelib.h"
#include "rpl_debug_points.h"
#include "scope_guard.h"


struct  Reconnect_messages
 Encapsulates the messages and thread stages used for a specific call to try_to_reconnect. More...


#define FLAGSTR(V, F)   ((V) & (F) ? #F " " : "")


enum  enum_slave_apply_event_and_update_pos_retval {
enum  enum_read_rotate_from_relay_log_status { FOUND_ROTATE , NOT_FOUND_ROTATE , ERROR }


static int process_io_rotate (Master_info *mi, Rotate_log_event *rev)
 Used by the slave IO thread when it receives a rotate event from the master. More...
static bool wait_for_relay_log_space (Relay_log_info *rli)
static bool io_slave_killed (THD *thd, Master_info *mi)
static bool monitor_io_replica_killed (THD *thd, Master_info *mi)
static bool is_autocommit_off (THD *thd)
 Check if in multi-statement transaction mode. More...
static void print_replica_skip_errors (void)
 Convert slave skip errors bitmap into a printable string. More...
static int safe_connect (THD *thd, MYSQL *mysql, Master_info *mi, const std::string &host=std::string(), const uint port=0)
static int safe_reconnect (THD *thd, MYSQL *mysql, Master_info *mi, bool suppress_warnings, const std::string &host=std::string(), const uint port=0)
static int get_master_version_and_clock (MYSQL *mysql, Master_info *mi)
static int get_master_uuid (MYSQL *mysql, Master_info *mi)
 Get master's uuid on connecting. More...
int io_thread_init_commands (MYSQL *mysql, Master_info *mi)
 Set user variables after connecting to the master. More...
static int terminate_slave_thread (THD *thd, mysql_mutex_t *term_lock, mysql_cond_t *term_cond, std::atomic< uint > *slave_running, ulong *stop_wait_timeout, bool need_lock_term, bool force)
 Wait for a slave thread to terminate. More...
static bool check_io_slave_killed (THD *thd, Master_info *mi, const char *info)
static int mts_event_coord_cmp (LOG_POS_COORD *id1, LOG_POS_COORD *id2)
 Orders jobs by comparing relay log information. More...
static int check_slave_sql_config_conflict (const Relay_log_info *rli)
 Check if there is any slave SQL config conflict. More...
static void group_replication_cleanup_after_clone ()
 Purge Group Replication channels relay logs after this server being a recipient of clone. More...
static void check_replica_configuration_restrictions ()
 (end of group Replication) More...
static bool check_replica_configuration_errors (Master_info *mi, int thread_mask)
 Checks the current replica configuration when starting a replication thread If some incompatibility is found an error is thrown. More...
static void set_thd_tx_priority (THD *thd, int priority)
static void set_thd_write_set_options (THD *thd, bool ignore_limit, bool allow_drop_write_set)
 Set for the thread options about the memory and size limits when transactions collect write sets. More...
static void set_replica_max_allowed_packet (THD *thd, MYSQL *mysql)
bool start_slave (THD *thd)
 Function to start a slave for all channels. More...
int stop_slave (THD *thd)
 Function to stop a slave for all channels. More...
bool start_slave_cmd (THD *thd)
 Entry point to the START SLAVE command. More...
bool stop_slave_cmd (THD *thd)
 Entry point for the STOP SLAVE command. More...
static enum_read_rotate_from_relay_log_status read_rotate_from_relay_log (char *filename, char *master_log_file, my_off_t *master_log_pos)
 Parse the given relay log and identify the rotate event from the master. More...
static int find_first_relay_log_with_rotate_from_master (Relay_log_info *rli)
 Reads relay logs one by one starting from the first relay log. More...
static void recover_relay_log (Master_info *mi)
int init_recovery (Master_info *mi)
static int fill_mts_gaps_and_recover (Master_info *mi)
int load_mi_and_rli_from_repositories (Master_info *mi, bool ignore_if_no_info, int thread_mask, bool skip_received_gtid_set_recovery=false, bool force_load=false)
 Call mi->init_info() and/or mi->rli->init_info(), which will read the replication configuration from repositories. More...
void end_info (Master_info *mi)
void clear_info (Master_info *mi)
 Clear the information regarding the Master_info and Relay_log_info objects represented by the parameter, meaning, setting to NULL all attributes that are not meant to be kept between slave resets. More...
int remove_info (Master_info *mi)
bool reset_info (Master_info *mi)
 Resets the information regarding the Master_info and Relay_log_info objects represented by the parameter, meaning, setting to NULL all attributes that are not meant to be kept between slave resets and persisting all other attribute values in the repository. More...
int flush_master_info (Master_info *mi, bool force, bool need_lock=true, bool flush_relay_log=true, bool skip_repo_persistence=false)
 This method flushes the current configuration for the channel into the connection metadata repository. More...
void set_replica_skip_errors (char **replica_skip_errors_ptr)
 Change arg to the string with the nice, human-readable skip error values. More...
static void init_replica_skip_errors ()
 Init function to set up array for errors that should be skipped for slave. More...
static void add_replica_skip_errors (const uint *errors, uint n_errors)
void add_replica_skip_errors (const char *arg)
static void set_thd_in_use_temporary_tables (Relay_log_info *rli)
int terminate_slave_threads (Master_info *mi, int thread_mask, ulong stop_wait_timeout, bool need_lock_term=true)
 Terminates the slave threads according to the given mask. More...
bool start_slave_thread (PSI_thread_key thread_key, my_start_routine h_func, mysql_mutex_t *start_lock, mysql_mutex_t *cond_lock, mysql_cond_t *start_cond, std::atomic< uint > *slave_running, std::atomic< ulong > *slave_run_id, Master_info *mi)
bool start_slave_threads (bool need_lock_slave, bool wait_for_start, Master_info *mi, int thread_mask)
void end_slave ()
void delete_slave_info_objects ()
 Free all resources used by slave threads at time of executing shutdown. More...
bool sql_slave_killed (THD *thd, Relay_log_info *rli)
 The function analyzes a possible killed status and makes a decision whether to accept it or not. More...
bool net_request_file (NET *net, const char *fname)
const char * print_slave_db_safe (const char *db)
bool is_network_error (uint errorno)
static enum_command_status io_thread_init_command (Master_info *mi, const char *query, int allowed_error, MYSQL_RES **master_res=nullptr, MYSQL_ROW *master_row=nullptr)
 Execute an initialization query for the IO thread. More...
static int write_rotate_to_master_pos_into_relay_log (THD *thd, Master_info *mi, bool force_flush_mi_info)
static int write_ignored_events_info_to_relay_log (THD *thd, Master_info *mi)
static int register_slave_on_master (MYSQL *mysql, Master_info *mi, bool *suppress_warnings)
static void show_slave_status_metadata (mem_root_deque< Item * > *field_list, int io_gtid_set_size, int sql_gtid_set_size)
 Function that fills the metadata required for SHOW REPLICA STATUS. More...
static bool show_slave_status_send_data (THD *thd, Master_info *mi, char *io_gtid_set_buffer, char *sql_gtid_set_buffer)
 Send the data to the client of a Master_info during show_slave_status() This function has to be called after calling show_slave_status_metadata(). More...
bool show_slave_status (THD *thd)
 Method to the show the replication status in all channels. More...
bool show_slave_status (THD *thd, Master_info *mi)
 Execute a SHOW REPLICA STATUS statement. More...
bool show_slave_status_cmd (THD *thd)
 Entry point for SHOW REPLICA STATUS command. More...
void set_slave_thread_options (THD *thd)
 Set slave thread default options. More...
void set_slave_thread_default_charset (THD *thd, Relay_log_info const *rli)
int init_replica_thread (THD *thd, SLAVE_THD_TYPE thd_type)
template<typename killed_func , typename rpl_info >
static bool slave_sleep (THD *thd, time_t seconds, killed_func func, rpl_info info)
 Sleep for a given amount of time or until killed. More...
static void fix_gtid_set (MYSQL_RPL *rpl, uchar *packet_gtid_set)
 Callback function for mysql_binlog_open(). More...
static int request_dump (THD *thd, MYSQL *mysql, MYSQL_RPL *rpl, Master_info *mi, bool *suppress_warnings)
static ulong read_event (MYSQL *mysql, MYSQL_RPL *rpl, Master_info *mi, bool *suppress_warnings)
 Read one event from the master. More...
static int sql_delay_event (Log_event *ev, THD *thd, Relay_log_info *rli)
 If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. More...
static enum enum_slave_apply_event_and_update_pos_retval apply_event_and_update_pos (Log_event **ptr_ev, THD *thd, Relay_log_info *rli)
 Applies the given event and advances the relay log position. More...
static bool coord_handle_partial_binlogged_transaction (Relay_log_info *rli, const Log_event *ev)
 Let the worker applying the current group to rollback and gracefully finish its work before. More...
static int exec_relay_log_event (THD *thd, Relay_log_info *rli, Rpl_applier_reader *applier_reader, Log_event *in)
 Top-level function for executing the next event in the relay log. More...
static int try_to_reconnect (THD *thd, MYSQL *mysql, Master_info *mi, uint *retry_count, bool suppress_warnings, const Reconnect_messages &messages)
 Try to reconnect slave IO thread. More...
void * handle_slave_io (void *arg)
 Slave IO thread entry point. More...
static int check_temp_dir (char *tmp_file, const char *channel_name)
static void * handle_slave_worker (void *arg)
bool mts_recovery_groups (Relay_log_info *rli)
bool mta_checkpoint_routine (Relay_log_info *rli, bool force)
 Processing rli->gaq to find out the low-water-mark (lwm) coordinates which is stored into the central recovery table. More...
static int slave_start_single_worker (Relay_log_info *rli, ulong i)
 Instantiation of a Slave_worker and forking out a single Worker thread. More...
static int slave_start_workers (Relay_log_info *rli, ulong n, bool *mts_inited)
 Initialization of the central rli members for Coordinator's role, communication channels such as Assigned Partition Hash (APH), and starting the Worker pool. More...
static void slave_stop_workers (Relay_log_info *rli, bool *mts_inited)
static int report_apply_event_error (THD *thd, Relay_log_info *rli)
 Processes the outcome of applying an event, logs it properly if it's an error and return the proper error code to trigger. More...
void * handle_slave_sql (void *arg)
 Slave SQL thread entry point. More...
int heartbeat_queue_event (bool is_valid, Master_info *&mi, std::string binlog_name, uint64_t position, unsigned long &inc_pos, bool &do_flush_mi)
QUEUE_EVENT_RESULT queue_event (Master_info *mi, const char *buf, ulong event_len, bool do_flush_mi)
 Store an event received from the master connection into the relay log. More...
void slave_io_thread_detach_vio ()
 Hook to detach the active VIO before closing a connection handle. More...
static int connect_to_master_via_namespace (THD *thd, MYSQL *mysql, Master_info *mi, bool reconnect, bool suppress_warnings, const std::string &host, const uint port)
int connect_to_master (THD *thd, MYSQL *mysql, Master_info *mi, bool reconnect, bool suppress_warnings, const std::string &host, const uint port, bool is_io_thread)
int rotate_relay_log (Master_info *mi, bool log_master_fd, bool need_lock, bool need_log_space_lock)
int flush_relay_logs (Master_info *mi, THD *thd)
 flushes the relay logs of a replication channel. More...
bool flush_relay_logs_cmd (THD *thd)
 Entry point for FLUSH RELAYLOGS command or to flush relaylogs for the FLUSH LOGS command. More...
bool reencrypt_relay_logs ()
 Re-encrypt previous relay logs with current master key for all slave channels. More...
bool start_slave (THD *thd, LEX_SLAVE_CONNECTION *connection_param, LEX_MASTER_INFO *master_param, int thread_mask_input, Master_info *mi, bool set_mts_settings)
 Executes a START SLAVE statement. More...
int stop_slave (THD *thd, Master_info *mi, bool net_report, bool for_one_channel, bool *push_temp_tables_warning)
 Execute a STOP SLAVE statement. More...
int reset_slave (THD *thd)
 Execute a RESET SLAVE (for all channels), used in Multisource replication. More...
int reset_slave (THD *thd, Master_info *mi, bool reset_all)
 Execute a RESET REPLICA statement. More...
bool reset_slave_cmd (THD *thd)
 Entry function for RESET SLAVE command. More...
static bool have_change_replication_source_receive_option (const LEX_MASTER_INFO *lex_mi)
 This function checks if the given CHANGE MASTER/REPLICATION SOURCE command has any receive option being set or changed. More...
static bool have_change_replication_source_execute_option (const LEX_MASTER_INFO *lex_mi, bool *need_relay_log_purge)
 This function checks if the given CHANGE MASTER/REPLICATION SOURCE command has any execute option being set or changed. More...
static bool have_change_replication_source_applier_and_receive_option (const LEX_MASTER_INFO *lex_mi)
 This function checks if the given CHANGE REPLICATION SOURCE command has any option that affect both the receiver and the applier. More...
static bool change_master_set_compression (THD *, const LEX_MASTER_INFO *lex_mi, Master_info *mi)
 This function checks all possible cases in which compression algorithm, compression level can be configured for a channel. More...
static int change_receive_options (THD *thd, LEX_MASTER_INFO *lex_mi, Master_info *mi)
 This function is called if the change master command had at least one receive option. More...
static bool change_execute_options (LEX_MASTER_INFO *lex_mi, Master_info *mi)
 This function is called if the change master command had at least one execute option. More...
static bool change_applier_receiver_options (THD *thd, LEX_MASTER_INFO *lex_mi, Master_info *mi)
 This function is called if the change replication source command had at least one option that affects both the receiver and applier parts. More...
static int validate_gtid_option_restrictions (const LEX_MASTER_INFO *lex_mi, Master_info *mi)
 This function validates that change replication source options are valid according to the current GTID_MODE. More...
bool is_option_enabled_or_will_be (bool base_value, int option_value)
 This is an helper method for boolean vars like SOURCE_AUTO_POSITION REQUIRE_ROW_FORMAT SOURCE_CONNECTION_AUTO_FAILOVER It tells if the variable is already enabled or will be by the command. More...
int evaluate_inter_option_dependencies (const LEX_MASTER_INFO *lex_mi, Master_info *mi)
 This method evaluates if the different options given to CHANGE REPLICATION SOURCE TO are compatible with the current configuration and with one another. More...
static void log_invalid_position_warning (THD *thd, const LEX_MASTER_INFO *lex_mi, Master_info *mi)
 Log a warning in case GTID_ONLY or SOURCE AUTO POSITION are disabled and the server contains invalid positions. More...
static std::pair< bool, bool > validate_change_replication_source_options (THD *thd, const LEX_MASTER_INFO *lex_mi, Master_info *mi, int thread_mask)
 This method aggregates the validation checks made for the command CHANGE REPLICATION SOURCE. More...
static bool update_change_replication_source_options (THD *thd, LEX_MASTER_INFO *lex_mi, Master_info *mi, bool have_both_receive_execute_option, bool have_execute_option, bool have_receive_option)
 This method aggregates the the instantiation of options for the command CHANGE REPLICATION SOURCE. More...
int change_master (THD *thd, Master_info *mi, LEX_MASTER_INFO *lex_mi, bool preserve_logs)
 Execute a CHANGE MASTER statement. More...
int add_new_channel (Master_info **mi, const char *channel)
 This function is first called when the Master_info object corresponding to a channel in a multisourced slave does not exist. More...
static bool is_invalid_change_master_for_group_replication_recovery (const LEX_MASTER_INFO *lex_mi)
 Method used to check if the user is trying to update any other option for the change master apart from the MASTER_USER and MASTER_PASSWORD. More...
static bool is_invalid_change_master_for_group_replication_applier (const LEX_MASTER_INFO *lex_mi)
 Method used to check if the user is trying to update any other option for the change master apart from the PRIVILEGE_CHECKS_USER. More...
bool change_master_cmd (THD *thd)
 Entry point for the CHANGE MASTER command. More...


bool use_slave_mask = false
MY_BITMAP slave_error_mask
char slave_skip_error_names [SHOW_VAR_FUNC_BUFF_SIZE]
char * replica_load_tmpdir = nullptr
bool replicate_same_server_id
ulonglong relay_log_space_limit = 0
const char * relay_log_index = nullptr
const char * relay_log_basename = nullptr
const ulong mts_slave_worker_queue_len_max = 16384
const ulong mts_coordinator_basic_nap = 5
const ulong mts_worker_underrun_level = 10
static thread_local Master_infoRPL_MASTER_INFO = nullptr
static Reconnect_messages reconnect_messages_after_failed_registration
static Reconnect_messages reconnect_messages_after_failed_dump
static Reconnect_messages reconnect_messages_after_failed_event_read
static PSI_memory_key key_memory_rli_mta_coor
static PSI_thread_key key_thread_replica_io
static PSI_thread_key key_thread_replica_sql
static PSI_thread_key key_thread_replica_worker
static PSI_thread_key key_thread_replica_monitor_io
static PSI_thread_info all_slave_threads []
static PSI_memory_info all_slave_memory []
uint sql_replica_skip_counter
 a copy of active_mi->rli->slave_skip_counter, for showing in SHOW GLOBAL VARIABLES, INFORMATION_SCHEMA.GLOBAL_VARIABLES and @sql_replica_skip_counter without taking all the mutexes needed to access active_mi->rli->slave_skip_counter properly. More...

Detailed Description

Code to run the io thread and the sql thread on the replication slave.