MySQL 8.3.0
Source Code Documentation
rpl_channel_service_interface.cc File Reference
#include "sql/rpl_channel_service_interface.h"
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <atomic>
#include <map>
#include <sstream>
#include <utility>
#include "mutex_lock.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_sys.h"
#include "my_thread.h"
#include "mysql/components/services/bits/psi_bits.h"
#include "mysql/components/services/bits/psi_stage_bits.h"
#include "mysql/components/services/log_builtins.h"
#include "mysql/my_loglevel.h"
#include "mysql/psi/mysql_cond.h"
#include "mysql/psi/mysql_mutex.h"
#include "mysql/service_mysql_alloc.h"
#include "mysql_com.h"
#include "mysqld_error.h"
#include "sql/auth/sql_security_ctx.h"
#include "sql/binlog.h"
#include "sql/changestreams/apply/replication_thread_status.h"
#include "sql/current_thd.h"
#include "sql/log.h"
#include "sql/log_event.h"
#include "sql/mysqld.h"
#include "sql/mysqld_thd_manager.h"
#include "sql/protocol_classic.h"
#include "sql/raii/sentry.h"
#include "sql/rpl_async_conn_failover_configuration_propagation.h"
#include "sql/rpl_channel_credentials.h"
#include "sql/rpl_gtid.h"
#include "sql/rpl_info_factory.h"
#include "sql/rpl_info_handler.h"
#include "sql/rpl_mi.h"
#include "sql/rpl_msr.h"
#include "sql/rpl_mta_submode.h"
#include "sql/rpl_replica.h"
#include "sql/rpl_rli.h"
#include "sql/rpl_rli_pdb.h"
#include "sql/rpl_trx_boundary_parser.h"
#include "sql/sql_class.h"
#include "sql/sql_lex.h"

Classes

class  Kill_binlog_dump
 

Functions

int channel_stop (Master_info *mi, int threads_to_stop, long timeout)
 Auxiliary function to stop all the running channel threads according to the given mask. More...
 
int initialize_channel_service_interface ()
 Initializes channel structures if needed. More...
 
static void set_mi_settings (Master_info *mi, Channel_creation_info *channel_info)
 
static bool init_thread_context ()
 
static void clean_thread_context ()
 
static THD * create_surrogate_thread ()
 
static void delete_surrogate_thread (THD *thd)
 
void initialize_channel_creation_info (Channel_creation_info *channel_info)
 
void initialize_channel_ssl_info (Channel_ssl_info *channel_ssl_info)
 
void initialize_channel_connection_info (Channel_connection_info *channel_info)
 
static void set_mi_ssl_options (LEX_MASTER_INFO *lex_mi, Channel_ssl_info *channel_ssl_info)
 
int channel_create (const char *channel, Channel_creation_info *channel_info)
 Initializes a channel connection in a similar way to a change master command. More...
 
int channel_start (const char *channel, Channel_connection_info *connection_info, int threads_to_start, int wait_for_connection, bool use_server_mta_configuration, bool channel_map_already_locked)
 Start the Applier/Receiver threads according to the given options. More...
 
int channel_stop (const char *channel, int threads_to_stop, long timeout)
 Stops the channel threads according to the given options. More...
 
int channel_stop_all (int threads_to_stop, long timeout, std::string *error_message)
 Stops all the running channel threads according to the given options. More...
 
int binlog_dump_thread_kill ()
 Kills the Binlog Dump threads. More...
 
int channel_purge_queue (const char *channel, bool reset_all)
 Purges the channel logs. More...
 
bool channel_is_active (const char *channel, enum_channel_thread_types thd_type)
 Tells if the selected component of the channel is active or not. More...
 
int channel_get_thread_id (const char *channel, enum_channel_thread_types thd_type, unsigned long **thread_id, bool need_lock)
 Returns the id(s) of the channel threads: receiver or applier. More...
 
long long channel_get_last_delivered_gno (const char *channel, int sidno)
 Returns last GNO from applier from a given UUID. More...
 
int channel_add_executed_gtids_to_received_gtids (const char *channel)
 Adds server executed GTID set to channel received GTID set. More...
 
int channel_queue_packet (const char *channel, const char *buf, unsigned long event_len)
 Queues an event packet into the currently active channel. More...
 
int channel_wait_until_apply_queue_applied (const char *channel, double timeout)
 Checks if all the queued transactions were executed. More...
 
int channel_wait_until_transactions_applied (const char *channel, const char *gtid_set, double timeout, bool update_THD_status)
 Checks if all the transactions in the given set were executed. More...
 
int channel_is_applier_waiting (const char *channel)
 Checks if the applier, and its workers when parallel applier is enabled, has already consumed all relay log, that is, applier is waiting for transactions to be queued. More...
 
int channel_is_applier_thread_waiting (unsigned long thread_id, bool worker)
 Checks if the applier thread, and its workers when parallel applier is enabled, has already consumed all relay log, that is, applier thread is waiting for transactions to be queued. More...
 
int channel_flush (const char *channel)
 Flush the channel. More...
 
int channel_get_retrieved_gtid_set (const char *channel, char **retrieved_set)
 Returns the receiver thread retrieved GTID set in string format. More...
 
int channel_get_credentials (const char *channel, std::string &username, std::string &password)
 Method to get the credentials configured for a channel. More...
 
int channel_get_network_namespace (const char *channel, std::string &net_ns)
 Method to get the network namespace configured for a channel. More...
 
bool channel_is_stopping (const char *channel, enum_channel_thread_types thd_type)
 Tells if the selected component of the channel is stopping or not. More...
 
bool is_partial_transaction_on_channel_relay_log (const char *channel)
 Checks if the given channel's relaylog contains a partial transaction. More...
 
bool channel_has_same_uuid_as_group_name (const char *group_name)
 Checks if any running channel uses the same UUID for assign_gtids_to_anonymous_transactions as the group_name. More...
 
bool is_any_slave_channel_running (int thread_mask)
 Checks if any slave thread of any channel is running. More...
 
bool is_any_slave_channel_running_with_failover_enabled (int thread_mask)
 Checks if any slave thread of any channel configured with SOURCE_CONNECTION_AUTO_FAILOVER is running. More...
 
enum_slave_channel_status has_any_slave_channel_open_temp_table_or_is_its_applier_running ()
 Checks if any slave channel applier is running or any slave channel has open temporary table(s). More...
 
int channel_delete_credentials (const char *channel_name)
 Delete stored credentials from Slave_credentials. More...
 
bool start_failover_channels ()
 Start channels which have SOURCE_CONNECTION_AUTO_FAILOVER=1. More...
 
bool channel_change_source_connection_auto_failover (const char *channel, bool status)
 Set SOURCE_CONNECTION_AUTO_FAILOVER on the given channel to the given status value. More...
 
bool unset_source_connection_auto_failover_on_all_channels ()
 Unset SOURCE_CONNECTION_AUTO_FAILOVER (that is, set it to 0) on all channels. More...
 
void reload_failover_channels_status ()
 Reload the status values on Rpl_acf_status_configuration singleton. More...
 
bool get_replication_failover_channels_configuration (std::string &serialized_configuration)
 Get replication failover channels configuration in a serialized protobuf_replication_asynchronous_connection_failover::SourceAndManagedAndStatusList message. More...
 
bool set_replication_failover_channels_configuration (const std::vector< std::string > &exchanged_replication_failover_channels_serialized_configuration)
 Set replication failover channels configuration that was received from the group. More...
 
bool force_my_replication_failover_channels_configuration_on_all_members ()
 Collect and broadcast the replication failover channels configuration in a serialized protobuf_replication_asynchronous_connection_failover::SourceAndManagedAndStatusList message, that will override the configuration on all group members. More...
 

Function Documentation

◆ binlog_dump_thread_kill()

int binlog_dump_thread_kill ( )

Kills the Binlog Dump threads.

Returns
the operation status
Return values
0OK

◆ channel_add_executed_gtids_to_received_gtids()

int channel_add_executed_gtids_to_received_gtids ( const char *  channel)

Adds server executed GTID set to channel received GTID set.

Parameters
channelthe channel name
Returns
the operation status
Return values
0OK
!=0 Error

◆ channel_change_source_connection_auto_failover()

bool channel_change_source_connection_auto_failover ( const char *  channel,
bool  status 
)

Set SOURCE_CONNECTION_AUTO_FAILOVER on the given channel to the given status value.

Parameters
[in]channelThe channel name
[in]  status  true enables SOURCE_CONNECTION_AUTO_FAILOVER; false disables SOURCE_CONNECTION_AUTO_FAILOVER
Returns
the operation status
Return values
falseOK
trueError

◆ channel_create()

int channel_create ( const char *  channel,
Channel_creation_info *  channel_information 
)

Initializes a channel connection in a similar way to a change master command.

Note
If the channel exists, it is reconfigured with the new options. About the logs, the preserve_relay_logs option allows the user to maintain them untouched.
Parameters
channelThe channel name
channel_informationChannel creation information.
Returns
the operation status
Return values
0OK
!=0Error on channel creation

◆ channel_delete_credentials()

int channel_delete_credentials ( const char *  channel_name)

Delete stored credentials from Slave_credentials.

Parameters
[in]channel_nameThe channel name
Returns
the operation status
Return values
0OK
1Error, channel not found

◆ channel_flush()

int channel_flush ( const char *  channel)

Flush the channel.

Returns
the operation status
Return values
0OK
!=0 Error on flush

◆ channel_get_credentials()

int channel_get_credentials ( const char *  channel,
std::string &  user,
std::string &  password 
)

Method to get the credentials configured for a channel.

Parameters
[in]channelThe channel name
[out]userThe user to extract
[out]passwordThe password to extract
Returns
the operation status
Return values
falseOK
trueError, channel not found

◆ channel_get_last_delivered_gno()

long long channel_get_last_delivered_gno ( const char *  channel,
int  sidno 
)

Returns last GNO from applier from a given UUID.

Parameters
channelthe channel name
sidnothe uuid associated to the desired gno
Returns
the last applier gno
Return values
<0  the channel does not exist, or the applier is not present
>0  the gno

◆ channel_get_network_namespace()

int channel_get_network_namespace ( const char *  channel,
std::string &  net_ns 
)

Method to get the network namespace configured for a channel.

Parameters
[in]channelThe channel name
[out]net_nsThe network namespace to extract
Returns
the operation status
Return values
falseOK
trueError, channel not found

◆ channel_get_retrieved_gtid_set()

int channel_get_retrieved_gtid_set ( const char *  channel,
char **  retrieved_set 
)

Returns the receiver thread retrieved GTID set in string format.

Parameters
channelThe channel name.
[out]retrieved_setPointer to pointer to string. The function will set it to point to a newly allocated buffer, or NULL on out of memory.
Returns
the operation status
Return values
0OK
!=0Error on retrieval

◆ channel_get_thread_id()

int channel_get_thread_id ( const char *  channel,
enum_channel_thread_types  thread_type,
unsigned long **  thread_id,
bool  need_lock = true 
)

Returns the id(s) of the channel threads: receiver or applier.

If more than one applier exists, an array is returned whose first element is the coordinator thread id.

Parameters
[in]channelThe channel name
[in]thread_typeThe thread type (receiver or applier)
[out]thread_idThe array of id(s)
[in]need_lockIs channel_map read lock needed?
Returns
the number of returned ids
Return values
-1  the channel does not exist, or the thread is not present
>0  the number of thread ids returned.

◆ channel_has_same_uuid_as_group_name()

bool channel_has_same_uuid_as_group_name ( const char *  group_name)

Checks if any running channel uses the same UUID for assign_gtids_to_anonymous_transactions as the group_name.

Parameters
[in]group_namethe group name
Return values
true  at least one channel has the same uuid
false  none of the channels have the same uuid

◆ channel_is_active()

bool channel_is_active ( const char *  channel,
enum_channel_thread_types  type 
)

Tells if the selected component of the channel is active or not.

If no component is passed, this method returns if the channel exists or not

Parameters
channelThe channel name
typeThe thread that should be checked. If 0, this method applies to the channel existence.
Returns
is the channel (component) active
Return values
trueYes
falseNo

◆ channel_is_applier_thread_waiting()

int channel_is_applier_thread_waiting ( unsigned long  thread_id,
bool  worker = false 
)

Checks if the applier thread, and its workers when parallel applier is enabled, has already consumed all relay log, that is, applier thread is waiting for transactions to be queued.

Parameters
thread_idthe applier thread id to check
workerflag to indicate if thread is a parallel worker
Returns
the operation status
Return values
-1Unable to find applier thread
0Applier thread is not waiting
1Applier thread is waiting

◆ channel_is_applier_waiting()

int channel_is_applier_waiting ( const char *  channel)

Checks if the applier, and its workers when parallel applier is enabled, has already consumed all relay log, that is, applier is waiting for transactions to be queued.

Parameters
channelThe channel name
Returns
the operation status
Return values
<0Error
0Applier is not waiting
1Applier is waiting

◆ channel_is_stopping()

bool channel_is_stopping ( const char *  channel,
enum_channel_thread_types  type 
)

Tells if the selected component of the channel is stopping or not.

Parameters
channelThe channel name
typeThe thread that should be checked.
Returns
is the channel (component) stopping
Return values
trueYes
falseNo, no type was specified or the channel does not exist.

◆ channel_purge_queue()

int channel_purge_queue ( const char *  channel,
bool  reset_all 
)

Purges the channel logs.

Parameters
channelThe channel name
reset_all  If true, the method will purge logs and remove the channel. If false, only the channel information will be reset.
Returns
the operation status
Return values
0OK
!=0Error

◆ channel_queue_packet()

int channel_queue_packet ( const char *  channel,
const char *  buf,
unsigned long  len 
)

Queues an event packet into the currently active channel.

Parameters
channelthe channel name
bufthe event buffer
lenthe event buffer length
Returns
the operation status
Return values
0OK
!=0 Error on queue

◆ channel_start()

int channel_start ( const char *  channel,
Channel_connection_info *  connection_info,
int  threads_to_start,
int  wait_for_connection,
bool  use_server_mta_configuration = false,
bool  channel_map_already_locked = false 
)

Start the Applier/Receiver threads according to the given options.

If the receiver thread is to be started, connection credential must be supported.

Parameters
channelThe channel name
connection_infoChannel connection information
threads_to_startThe types of threads to be started
wait_for_connection  Whether, when starting the receiver, the method should wait for the connection to succeed
use_server_mta_configurationIf true, the channel uses the server parallel applier configuration when starting the applier thread, instead of the configuration given on channel_create()
channel_map_already_lockedIf set to true, will not acquire a write lock of channel_map
Returns
the operation status
Return values
0OK
!=0Error

◆ channel_stop() [1/2]

int channel_stop ( const char *  channel,
int  threads_to_stop,
long  timeout 
)

Stops the channel threads according to the given options.

Parameters
channelThe channel name
threads_to_stopThe types of threads to be stopped
timeoutThe expected time in which the thread should stop
Returns
the operation status
Return values
0OK
!=0Error

◆ channel_stop() [2/2]

int channel_stop ( Master_info *  mi,
int  threads_to_stop,
long  timeout 
)

Auxiliary function to stop all the running channel threads according to the given mask.

Note
The caller must hold the channel_map lock before calling this function, and release it after this function returns.
Parameters
miThe pointer to Master_info instance
threads_to_stopThe types of threads to be stopped
timeoutThe expected time in which the thread should stop
Returns
the operation status
Return values
0OK
!=0Error

◆ channel_stop_all()

int channel_stop_all ( int  threads_to_stop,
long  timeout,
std::string *  error_message 
)

Stops all the running channel threads according to the given options.

Parameters
threads_to_stopThe types of threads to be stopped
timeoutThe expected time in which the thread should stop
error_messageThe returned error_message
Returns
the operation status
Return values
0OK
!=0Error

◆ channel_wait_until_apply_queue_applied()

int channel_wait_until_apply_queue_applied ( const char *  channel,
double  timeout 
)

Checks if all the queued transactions were executed.

Note
This method assumes that the channel is not receiving any more events. If it is still receiving, then the method should wait for execution of transactions that were present when this method was invoked.
Parameters
channelthe channel name
timeoutthe time (seconds) after which the method returns if the above condition was not satisfied
Returns
the operation status
Return values
0All transactions were executed
REPLICATION_THREAD_WAIT_TIMEOUT_ERRORA timeout occurred
REPLICATION_THREAD_WAIT_NO_INFO_ERRORAn error occurred

◆ channel_wait_until_transactions_applied()

int channel_wait_until_transactions_applied ( const char *  channel,
const char *  gtid_set,
double  timeout,
bool  update_THD_status = true 
)

Checks if all the transactions in the given set were executed.

Parameters
channelthe channel name
gtid_setthe set in string format of transaction to wait for
timeoutthe time (seconds) after which the method returns if the above condition was not satisfied
update_THD_statusShall the method update the THD stage
Returns
the operation status
Return values
0All transactions were executed
REPLICATION_THREAD_WAIT_TIMEOUT_ERRORA timeout occurred
REPLICATION_THREAD_WAIT_NO_INFO_ERRORAn error occurred

◆ clean_thread_context()

static void clean_thread_context ( )
static

◆ create_surrogate_thread()

static THD * create_surrogate_thread ( )
static

◆ delete_surrogate_thread()

static void delete_surrogate_thread ( THD *  thd)
static

◆ force_my_replication_failover_channels_configuration_on_all_members()

bool force_my_replication_failover_channels_configuration_on_all_members ( )

Collect and broadcast the replication failover channels configuration in a serialized protobuf_replication_asynchronous_connection_failover::SourceAndManagedAndStatusList message, that will override the configuration on all group members.

Returns
the operation status
Return values
falseOK
trueError

◆ get_replication_failover_channels_configuration()

bool get_replication_failover_channels_configuration ( std::string &  serialized_configuration)

Get replication failover channels configuration in a serialized protobuf_replication_asynchronous_connection_failover::SourceAndManagedAndStatusList message.

Parameters
[out]serialized_configurationthe serialized configuration
Returns
the operation status
Return values
falseOK
trueError

◆ has_any_slave_channel_open_temp_table_or_is_its_applier_running()

enum_slave_channel_status has_any_slave_channel_open_temp_table_or_is_its_applier_running ( )

Checks if any slave channel applier is running or any slave channel has open temporary table(s).

This holds handled appliers' run_locks until finding a running slave channel applier or a slave channel which has open temporary table(s), or handling all slave channels.

Returns
SLAVE_CHANNEL_NO_APPLIER_RUNNING_AND_NO_OPEN_TEMPORARY_TABLE, SLAVE_CHANNEL_APPLIER_IS_RUNNING or SLAVE_CHANNEL_HAS_OPEN_TEMPORARY_TABLE.

◆ init_thread_context()

static bool init_thread_context ( )
static

◆ initialize_channel_connection_info()

void initialize_channel_connection_info ( Channel_connection_info *  channel_info)

◆ initialize_channel_creation_info()

void initialize_channel_creation_info ( Channel_creation_info *  channel_info)

◆ initialize_channel_service_interface()

int initialize_channel_service_interface ( )

Initializes channel structures if needed.

Returns
the operation status
Return values
0OK
!=0  Error on initialization

◆ initialize_channel_ssl_info()

void initialize_channel_ssl_info ( Channel_ssl_info *  channel_ssl_info)

◆ is_any_slave_channel_running()

bool is_any_slave_channel_running ( int  thread_mask)

Checks if any slave thread of any channel is running.

Parameters
[in]thread_masktype of slave thread- IO/SQL or any
Return values
true  at least one channel thread is running.
false  none of the channels are running.

◆ is_any_slave_channel_running_with_failover_enabled()

bool is_any_slave_channel_running_with_failover_enabled ( int  thread_mask)

Checks if any slave thread of any channel configured with SOURCE_CONNECTION_AUTO_FAILOVER is running.

Parameters
[in]thread_masktype of slave thread- IO/SQL or any
Return values
true  at least one channel thread is running.
false  none of the channels are running.

◆ is_partial_transaction_on_channel_relay_log()

bool is_partial_transaction_on_channel_relay_log ( const char *  channel)

Checks if the given channel's relaylog contains a partial transaction.

Parameters
channelThe channel name
Return values
true  If relaylog contains a partial transaction.
false  If relaylog does not contain a partial transaction.

◆ reload_failover_channels_status()

void reload_failover_channels_status ( )

Reload the status values on Rpl_acf_status_configuration singleton.

◆ set_mi_settings()

static void set_mi_settings ( Master_info *  mi,
Channel_creation_info *  channel_info 
)
static

◆ set_mi_ssl_options()

static void set_mi_ssl_options ( LEX_MASTER_INFO *  lex_mi,
Channel_ssl_info *  channel_ssl_info 
)
static

◆ set_replication_failover_channels_configuration()

bool set_replication_failover_channels_configuration ( const std::vector< std::string > &  exchanged_replication_failover_channels_serialized_configuration)

Set replication failover channels configuration that was received from the group.

Each member of the group will send its own configuration in a serialized protobuf_replication_asynchronous_connection_failover::SourceAndManagedAndStatusList message.

Parameters
[in]exchanged_replication_failover_channels_serialized_configurationvector with the serialized configuration from each member
Returns
the operation status
Return values
falseOK
trueError

◆ start_failover_channels()

bool start_failover_channels ( )

Start channels which have SOURCE_CONNECTION_AUTO_FAILOVER=1.

Returns
the operation status
Return values
falseOK
trueError

◆ unset_source_connection_auto_failover_on_all_channels()

bool unset_source_connection_auto_failover_on_all_channels ( )

Unset SOURCE_CONNECTION_AUTO_FAILOVER (that is, set it to 0) on all channels.

Returns
the operation status
Return values
falseOK
trueError