MySQL 8.4.2
Source Code Documentation
|
Code to run the io thread and the sql thread on the replication slave. More...
#include "sql/rpl_replica.h"
#include "my_config.h"
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "include/compression.h"
#include "include/mutex_lock.h"
#include "m_string.h"
#include "mysql/components/services/bits/psi_bits.h"
#include "mysql/components/services/bits/psi_memory_bits.h"
#include "mysql/components/services/bits/psi_stage_bits.h"
#include "mysql/components/services/log_builtins.h"
#include "mysql/plugin.h"
#include "mysql/psi/mysql_cond.h"
#include "mysql/psi/mysql_mutex.h"
#include "mysql/status_var.h"
#include "mysql/strings/int2str.h"
#include "sql/changestreams/apply/replication_thread_status.h"
#include "sql/rpl_channel_service_interface.h"
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <deque>
#include <map>
#include <regex>
#include <string>
#include <utility>
#include <vector>
#include "errmsg.h"
#include "lex_string.h"
#include "my_bitmap.h"
#include "my_byteorder.h"
#include "my_command.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_dir.h"
#include "my_io.h"
#include "my_macros.h"
#include "my_sys.h"
#include "my_systime.h"
#include "my_thread_local.h"
#include "mysql.h"
#include "mysql/binlog/event/binlog_event.h"
#include "mysql/binlog/event/control_events.h"
#include "mysql/binlog/event/debug_vars.h"
#include "mysql/my_loglevel.h"
#include "mysql/psi/mysql_file.h"
#include "mysql/psi/mysql_memory.h"
#include "mysql/psi/mysql_thread.h"
#include "mysql/service_mysql_alloc.h"
#include "mysql/strings/m_ctype.h"
#include "mysql/thread_type.h"
#include "mysql_com.h"
#include "mysqld_error.h"
#include "pfs_thread_provider.h"
#include "prealloced_array.h"
#include "sql-common/net_ns.h"
#include "sql/auth/auth_acls.h"
#include "sql/auth/sql_security_ctx.h"
#include "sql/auto_thd.h"
#include "sql/binlog.h"
#include "sql/binlog_reader.h"
#include "sql/clone_handler.h"
#include "sql/current_thd.h"
#include "sql/debug_sync.h"
#include "sql/derror.h"
#include "sql/dynamic_ids.h"
#include "sql/handler.h"
#include "sql/item.h"
#include "sql/log.h"
#include "sql/log_event.h"
#include "sql/mdl.h"
#include "sql/mysqld.h"
#include "sql/mysqld_thd_manager.h"
#include "sql/protocol.h"
#include "sql/protocol_classic.h"
#include "sql/psi_memory_key.h"
#include "sql/query_options.h"
#include "sql/rpl_applier_reader.h"
#include "sql/rpl_async_conn_failover.h"
#include "sql/rpl_async_conn_failover_configuration_propagation.h"
#include "sql/rpl_filter.h"
#include "sql/rpl_group_replication.h"
#include "sql/rpl_gtid.h"
#include "sql/rpl_handler.h"
#include "sql/rpl_info.h"
#include "sql/rpl_info_factory.h"
#include "sql/rpl_info_handler.h"
#include "sql/rpl_io_monitor.h"
#include "sql/rpl_mi.h"
#include "sql/rpl_msr.h"
#include "sql/rpl_mta_submode.h"
#include "sql/rpl_replica_commit_order_manager.h"
#include "sql/rpl_replica_until_options.h"
#include "sql/rpl_reporting.h"
#include "sql/rpl_rli.h"
#include "sql/rpl_rli_pdb.h"
#include "sql/rpl_trx_boundary_parser.h"
#include "sql/rpl_utility.h"
#include "sql/sql_backup_lock.h"
#include "sql/sql_class.h"
#include "sql/sql_const.h"
#include "sql/sql_error.h"
#include "sql/sql_lex.h"
#include "sql/sql_list.h"
#include "sql/sql_parse.h"
#include "sql/sql_plugin.h"
#include "sql/system_variables.h"
#include "sql/table.h"
#include "sql/transaction.h"
#include "sql/transaction_info.h"
#include "sql_common.h"
#include "sql_string.h"
#include "str2int.h"
#include "string_with_len.h"
#include "strmake.h"
#include "typelib.h"
#include "rpl_debug_points.h"
#include "scope_guard.h"
Classes | |
struct | Reconnect_messages |
Encapsulates the messages and thread stages used for a specific call to try_to_reconnect. More... | |
Macros | |
#define | FLAGSTR(V, F) ((V) & (F) ? #F " " : "") |
#define | SLAVE_WAIT_GROUP_DONE 60 |
Functions | |
static int | process_io_rotate (Master_info *mi, Rotate_log_event *rev) |
Used by the slave IO thread when it receives a rotate event from the master. More... | |
static bool | wait_for_relay_log_space (Relay_log_info *rli, std::size_t queued_size) |
Checks whether relay log space will be exceeded after queueing additional 'queued_size' bytes. More... | |
static bool | exceeds_relay_log_limit (Relay_log_info *rli, std::size_t queued_size) |
Checks whether relay log space limit will be exceeded after queueing additional 'queued_size' bytes. More... | |
static bool | io_slave_killed (THD *thd, Master_info *mi) |
static bool | monitor_io_replica_killed (THD *thd, Master_info *mi) |
static bool | is_autocommit_off (THD *thd) |
Check if in multi-statement transaction mode. More... | |
static void | print_replica_skip_errors (void) |
Convert slave skip errors bitmap into a printable string. More... | |
static int | safe_connect (THD *thd, MYSQL *mysql, Master_info *mi, const std::string &host=std::string(), const uint port=0) |
static int | safe_reconnect (THD *thd, MYSQL *mysql, Master_info *mi, bool suppress_warnings, const std::string &host=std::string(), const uint port=0) |
static int | get_master_version_and_clock (MYSQL *mysql, Master_info *mi) |
static int | get_master_uuid (MYSQL *mysql, Master_info *mi) |
Get master's uuid on connecting. More... | |
int | io_thread_init_commands (MYSQL *mysql, Master_info *mi) |
Set user variables after connecting to the master. More... | |
static int | terminate_slave_thread (THD *thd, mysql_mutex_t *term_lock, mysql_cond_t *term_cond, std::atomic< uint > *slave_running, ulong *stop_wait_timeout, bool need_lock_term, bool force) |
Wait for a slave thread to terminate. More... | |
static bool | check_io_slave_killed (THD *thd, Master_info *mi, const char *info) |
static int | mts_event_coord_cmp (LOG_POS_COORD *id1, LOG_POS_COORD *id2) |
Orders jobs by comparing relay log information. More... | |
static int | check_slave_sql_config_conflict (const Relay_log_info *rli) |
Check if there is any slave SQL config conflict. More... | |
static void | group_replication_cleanup_after_clone () |
Purge the relay logs of the Group Replication channels after this server has been the recipient of a clone. More... | |
static void | check_replica_configuration_restrictions () |
(end of group Replication) More... | |
static bool | check_replica_configuration_errors (Master_info *mi, int thread_mask) |
Checks the current replica configuration when starting a replication thread. If some incompatibility is found, an error is thrown. More... | |
static void | set_thd_tx_priority (THD *thd, int priority) |
static void | set_thd_write_set_options (THD *thd, bool ignore_limit, bool allow_drop_write_set) |
Set for the thread options about the memory and size limits when transactions collect write sets. More... | |
static void | set_replica_max_allowed_packet (THD *thd, MYSQL *mysql) |
bool | start_slave (THD *thd) |
Function to start a slave for all channels. More... | |
int | stop_slave (THD *thd) |
Function to stop a slave for all channels. More... | |
bool | start_slave_cmd (THD *thd) |
Entry point to the START REPLICA command. More... | |
bool | stop_slave_cmd (THD *thd) |
Entry point for the STOP REPLICA command. More... | |
static enum_read_rotate_from_relay_log_status | read_rotate_from_relay_log (char *filename, char *source_log_file, my_off_t *master_log_pos) |
Parse the given relay log and identify the rotate event from the master. More... | |
static int | find_first_relay_log_with_rotate_from_master (Relay_log_info *rli) |
Reads relay logs one by one starting from the first relay log. More... | |
static void | recover_relay_log (Master_info *mi) |
int | init_recovery (Master_info *mi) |
static int | fill_mts_gaps_and_recover (Master_info *mi) |
int | load_mi_and_rli_from_repositories (Master_info *mi, bool ignore_if_no_info, int thread_mask, bool skip_received_gtid_set_and_relaylog_recovery=false, bool force_load=false) |
Call mi->init_info() and/or mi->rli->init_info(), which will read the replication configuration from repositories. More... | |
void | end_info (Master_info *mi) |
void | clear_info (Master_info *mi) |
Clear the information regarding the Master_info and Relay_log_info objects represented by the parameter, meaning, setting to NULL all attributes that are not meant to be kept between slave resets. More... | |
int | remove_info (Master_info *mi) |
bool | reset_info (Master_info *mi) |
Resets the information regarding the Master_info and Relay_log_info objects represented by the parameter, meaning, setting to NULL all attributes that are not meant to be kept between slave resets and persisting all other attribute values in the repository. More... | |
int | flush_master_info (Master_info *mi, bool force, bool need_lock=true, bool flush_relay_log=true, bool skip_repo_persistence=false) |
This method flushes the current configuration for the channel into the connection metadata repository. More... | |
void | set_replica_skip_errors (char **replica_skip_errors_ptr) |
Change arg to the string with the nice, human-readable skip error values. More... | |
static void | init_replica_skip_errors () |
Init function to set up array for errors that should be skipped for slave. More... | |
static void | add_replica_skip_errors (const uint *errors, uint n_errors) |
void | add_replica_skip_errors (const char *arg) |
static void | set_thd_in_use_temporary_tables (Relay_log_info *rli) |
int | terminate_slave_threads (Master_info *mi, int thread_mask, ulong stop_wait_timeout, bool need_lock_term=true) |
Terminates the slave threads according to the given mask. More... | |
bool | start_slave_thread (PSI_thread_key thread_key, my_start_routine h_func, mysql_mutex_t *start_lock, mysql_mutex_t *cond_lock, mysql_cond_t *start_cond, std::atomic< uint > *slave_running, std::atomic< ulong > *slave_run_id, Master_info *mi) |
bool | start_slave_threads (bool need_lock_slave, bool wait_for_start, Master_info *mi, int thread_mask) |
void | end_slave () |
void | delete_slave_info_objects () |
Free all resources used by slave threads at time of executing shutdown. More... | |
bool | sql_slave_killed (THD *thd, Relay_log_info *rli) |
The function analyzes a possible killed status and makes a decision whether to accept it or not. More... | |
bool | net_request_file (NET *net, const char *fname) |
const char * | print_slave_db_safe (const char *db) |
bool | is_network_error (uint errorno) |
static enum_command_status | io_thread_init_command (Master_info *mi, const char *query, int allowed_error, MYSQL_RES **master_res=nullptr, MYSQL_ROW *master_row=nullptr) |
Execute an initialization query for the IO thread. More... | |
static int | write_rotate_to_master_pos_into_relay_log (THD *thd, Master_info *mi, bool force_flush_mi_info) |
static int | write_ignored_events_info_to_relay_log (THD *thd, Master_info *mi) |
static int | register_slave_on_master (MYSQL *mysql, Master_info *mi, bool *suppress_warnings) |
static void | show_slave_status_metadata (mem_root_deque< Item * > *field_list, int io_gtid_set_size, int sql_gtid_set_size) |
Function that fills the metadata required for SHOW REPLICA STATUS. More... | |
static bool | show_slave_status_send_data (THD *thd, Master_info *mi, char *io_gtid_set_buffer, char *sql_gtid_set_buffer) |
Send the data to the client of a Master_info during show_slave_status() This function has to be called after calling show_slave_status_metadata(). More... | |
bool | show_slave_status (THD *thd) |
Method to show the replication status in all channels. More... | |
bool | show_slave_status (THD *thd, Master_info *mi) |
Execute a SHOW REPLICA STATUS statement. More... | |
bool | show_slave_status_cmd (THD *thd) |
Entry point for SHOW REPLICA STATUS command. More... | |
void | set_slave_thread_options (THD *thd) |
Set slave thread default options. More... | |
void | set_slave_thread_default_charset (THD *thd, Relay_log_info const *rli) |
int | init_replica_thread (THD *thd, SLAVE_THD_TYPE thd_type) |
template<typename killed_func , typename rpl_info > | |
static bool | slave_sleep (THD *thd, time_t seconds, killed_func func, rpl_info info) |
Sleep for a given amount of time or until killed. More... | |
static void | fix_gtid_set (MYSQL_RPL *rpl, uchar *packet_gtid_set) |
Callback function for mysql_binlog_open(). More... | |
static int | request_dump (THD *thd, MYSQL *mysql, MYSQL_RPL *rpl, Master_info *mi, bool *suppress_warnings) |
static ulong | read_event (MYSQL *mysql, MYSQL_RPL *rpl, Master_info *mi, bool *suppress_warnings) |
Read one event from the master. More... | |
static int | sql_delay_event (Log_event *ev, THD *thd, Relay_log_info *rli) |
If this is a lagging slave (specified with CHANGE REPLICATION SOURCE TO SOURCE_DELAY = X), delays accordingly. More... | |
static enum enum_slave_apply_event_and_update_pos_retval | apply_event_and_update_pos (Log_event **ptr_ev, THD *thd, Relay_log_info *rli) |
Applies the given event and advances the relay log position. More... | |
static bool | coord_handle_partial_binlogged_transaction (Relay_log_info *rli, const Log_event *ev) |
Lets the worker applying the current group roll back and gracefully finish its work. More... | |
static int | exec_relay_log_event (THD *thd, Relay_log_info *rli, Rpl_applier_reader *applier_reader, Log_event *in) |
Top-level function for executing the next event in the relay log. More... | |
static int | try_to_reconnect (THD *thd, MYSQL *mysql, Master_info *mi, uint *retry_count, bool suppress_warnings, const Reconnect_messages &messages) |
Try to reconnect slave IO thread. More... | |
void * | handle_slave_io (void *arg) |
Slave IO thread entry point. More... | |
static int | check_temp_dir (char *tmp_file, const char *channel_name) |
static void * | handle_slave_worker (void *arg) |
bool | mts_recovery_groups (Relay_log_info *rli) |
bool | mta_checkpoint_routine (Relay_log_info *rli, bool force) |
Processes rli->gaq to find out the low-water-mark (lwm) coordinates, which are stored in the central recovery table. More... | |
static int | slave_start_single_worker (Relay_log_info *rli, ulong i) |
Instantiation of a Slave_worker and forking out a single Worker thread. More... | |
static int | slave_start_workers (Relay_log_info *rli, ulong n, bool *mts_inited) |
Initialization of the central rli members for Coordinator's role, communication channels such as Assigned Partition Hash (APH), and starting the Worker pool. More... | |
static void | slave_stop_workers (Relay_log_info *rli, bool *mts_inited) |
static int | report_apply_event_error (THD *thd, Relay_log_info *rli) |
Processes the outcome of applying an event, logs it properly if it's an error, and returns the proper error code to trigger. More... | |
void * | handle_slave_sql (void *arg) |
Slave SQL thread entry point. More... | |
int | heartbeat_queue_event (bool is_valid, Master_info *&mi, std::string binlog_name, uint64_t position, unsigned long &inc_pos, bool &do_flush_mi) |
QUEUE_EVENT_RESULT | queue_event (Master_info *mi, const char *buf, ulong event_len, bool do_flush_mi) |
Store an event received from the master connection into the relay log. More... | |
void | slave_io_thread_detach_vio () |
Hook to detach the active VIO before closing a connection handle. More... | |
static int | connect_to_master_via_namespace (THD *thd, MYSQL *mysql, Master_info *mi, bool reconnect, bool suppress_warnings, const std::string &host, const uint port) |
int | connect_to_master (THD *thd, MYSQL *mysql, Master_info *mi, bool reconnect, bool suppress_warnings, const std::string &host, const uint port, bool is_io_thread) |
int | rotate_relay_log (Master_info *mi, bool log_master_fd=true, bool need_lock=true, bool need_log_space_lock=true) |
Rotates the relay log. More... | |
int | flush_relay_logs (Master_info *mi, THD *thd) |
Flushes the relay logs of a replication channel. More... | |
bool | flush_relay_logs_cmd (THD *thd) |
Entry point for FLUSH RELAYLOGS command or to flush relaylogs for the FLUSH LOGS command. More... | |
bool | reencrypt_relay_logs () |
Re-encrypt previous relay logs with current master key for all slave channels. More... | |
bool | start_slave (THD *thd, LEX_REPLICA_CONNECTION *connection_param, LEX_SOURCE_INFO *master_param, int thread_mask_input, Master_info *mi, bool set_mts_settings) |
Executes a START REPLICA statement. More... | |
int | stop_slave (THD *thd, Master_info *mi, bool net_report, bool for_one_channel, bool *push_temp_tables_warning) |
Execute a STOP REPLICA statement. More... | |
int | reset_slave (THD *thd) |
Execute a RESET REPLICA (for all channels), used in Multisource replication. More... | |
int | reset_slave (THD *thd, Master_info *mi, bool reset_all) |
Execute a RESET REPLICA statement. More... | |
bool | reset_slave_cmd (THD *thd) |
Entry function for RESET REPLICA command. More... | |
static bool | have_change_replication_source_receive_option (const LEX_SOURCE_INFO *lex_mi) |
This function checks if the given CHANGE REPLICATION SOURCE command has any receive option being set or changed. More... | |
static bool | have_change_replication_source_execute_option (const LEX_SOURCE_INFO *lex_mi, bool *need_relay_log_purge) |
This function checks if the given CHANGE REPLICATION SOURCE command has any execute option being set or changed. More... | |
static bool | have_change_replication_source_applier_and_receive_option (const LEX_SOURCE_INFO *lex_mi) |
This function checks if the given CHANGE REPLICATION SOURCE command has any option that affects both the receiver and the applier. More... | |
static bool | change_master_set_compression (THD *, const LEX_SOURCE_INFO *lex_mi, Master_info *mi) |
This function checks all possible cases in which compression algorithm, compression level can be configured for a channel. More... | |
static int | change_receive_options (THD *thd, LEX_SOURCE_INFO *lex_mi, Master_info *mi) |
This function is called if the change replication source command had at least one receive option. More... | |
static bool | change_execute_options (LEX_SOURCE_INFO *lex_mi, Master_info *mi) |
This function is called if the change replication source command had at least one execute option. More... | |
static bool | change_applier_receiver_options (THD *thd, LEX_SOURCE_INFO *lex_mi, Master_info *mi) |
This function is called if the change replication source command had at least one option that affects both the receiver and applier parts. More... | |
static int | validate_gtid_option_restrictions (const LEX_SOURCE_INFO *lex_mi, Master_info *mi) |
This function validates that change replication source options are valid according to the current GTID_MODE. More... | |
bool | is_option_enabled_or_will_be (bool base_value, int option_value) |
This is a helper method for boolean variables like SOURCE_AUTO_POSITION, REQUIRE_ROW_FORMAT, and SOURCE_CONNECTION_AUTO_FAILOVER. It tells if the variable is already enabled or will be enabled by the command. More... | |
int | evaluate_inter_option_dependencies (const LEX_SOURCE_INFO *lex_mi, Master_info *mi) |
This method evaluates if the different options given to CHANGE REPLICATION SOURCE TO are compatible with the current configuration and with one another. More... | |
static void | log_invalid_position_warning (THD *thd, const LEX_SOURCE_INFO *lex_mi, Master_info *mi) |
Log a warning in case GTID_ONLY or SOURCE_AUTO_POSITION are disabled and the server contains invalid positions. More... | |
static std::pair< bool, bool > | validate_change_replication_source_options (THD *thd, const LEX_SOURCE_INFO *lex_mi, Master_info *mi, int thread_mask) |
This method aggregates the validation checks made for the command CHANGE REPLICATION SOURCE. More... | |
static bool | update_change_replication_source_options (THD *thd, LEX_SOURCE_INFO *lex_mi, Master_info *mi, bool have_both_receive_execute_option, bool have_execute_option, bool have_receive_option) |
This method aggregates the instantiation of options for the command CHANGE REPLICATION SOURCE. More... | |
int | change_master (THD *thd, Master_info *mi, LEX_SOURCE_INFO *lex_mi, bool preserve_logs) |
Execute a CHANGE REPLICATION SOURCE statement. More... | |
int | add_new_channel (Master_info **mi, const char *channel) |
This function is first called when the Master_info object corresponding to a channel in a multisourced slave does not exist. More... | |
static bool | is_invalid_change_master_for_group_replication_recovery (const LEX_SOURCE_INFO *lex_mi) |
Method used to check if the user is trying to update any other option for the change replication source apart from the SOURCE_USER and SOURCE_PASSWORD. More... | |
static bool | is_invalid_change_master_for_group_replication_applier (const LEX_SOURCE_INFO *lex_mi) |
Method used to check if the user is trying to update any other option for the change replication source apart from the PRIVILEGE_CHECKS_USER. More... | |
bool | change_master_cmd (THD *thd) |
Entry point for the CHANGE REPLICATION SOURCE command. More... | |
Code to run the io thread and the sql thread on the replication slave.