MySQL 9.5.0
Source Code Documentation
rpl_rli_pdb.cc File Reference
#include "sql/rpl_rli_pdb.h"
#include "my_config.h"
#include <string.h>
#include <sys/time.h>
#include <stdio.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include "lex_string.h"
#include "m_string.h"
#include "map_helpers.h"
#include "my_bitmap.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_sys.h"
#include "my_systime.h"
#include "my_thread.h"
#include "mysql/components/services/bits/psi_stage_bits.h"
#include "mysql/components/services/log_builtins.h"
#include "mysql/plugin.h"
#include "mysql/psi/mysql_cond.h"
#include "mysql/psi/mysql_file.h"
#include "mysql/psi/mysql_mutex.h"
#include "mysql/thread_type.h"
#include "mysqld_error.h"
#include "scope_guard.h"
#include "sql/binlog.h"
#include "sql/binlog_reader.h"
#include "sql/current_thd.h"
#include "sql/debug_sync.h"
#include "sql/log.h"
#include "sql/mdl.h"
#include "sql/mysqld.h"
#include "sql/psi_memory_key.h"
#include "sql/raii/sentry.h"
#include "sql/rpl_info_handler.h"
#include "sql/rpl_msr.h"
#include "sql/rpl_replica_commit_order_manager.h"
#include "sql/rpl_reporting.h"
#include "sql/sql_error.h"
#include "sql/sql_lex.h"
#include "sql/table.h"
#include "sql/transaction_info.h"
#include "string_with_len.h"
#include "strmake.h"
#include "thr_mutex.h"

Macros

#define HASH_DYNAMIC_INIT   4
 

Enumerations

enum  { LINE_FOR_CHANNEL = 12 }
 

Functions

bool handle_slave_worker_stop (Slave_worker *worker, Slave_job_item *job_item)
 This function is called by both coordinator and workers. More...
 
bool set_max_updated_index_on_stop (Slave_worker *worker, Slave_job_item *job_item)
 This function is called by both coordinator and workers. More...
 
TABLE * mts_move_temp_tables_to_thd (THD *thd, TABLE *temporary_tables)
 Relocation of the list of temporary tables to thd->temporary_tables. More...
 
Slave_worker * get_least_occupied_worker (Relay_log_info *rli, Slave_worker_array *ws, Log_event *ev)
 Get the least occupied worker. More...
 
static bool may_have_timestamp (Log_event *ev)
 
static int64 get_last_committed (Log_event *ev)
 
static int64 get_sequence_number (Log_event *ev)
 
bool append_item_to_jobs (slave_job_item *job_item, Slave_worker *worker, Relay_log_info *rli)
 Coordinator enqueues a job item into a Worker private queue. More...
 
static void remove_item_from_jobs (slave_job_item *job_item, Slave_worker *worker, Relay_log_info *rli)
 Remove a job item from the given workers job queue. More...
 
static struct slave_job_item * pop_jobs_item (Slave_worker *worker, Slave_job_item *job_item)
 Worker's routine to wait for a new assignment through append_item_to_jobs() More...
 
void report_error_to_coordinator (Slave_worker *worker)
 Report a not yet reported error to the coordinator if necessary. More...
 
int slave_worker_exec_job_group (Slave_worker *worker, Relay_log_info *rli)
 Apply one job group. More...
 

Variables

ulong w_rr = 0
 
uint mta_debug_concurrent_access = 0
 
const char * info_slave_worker_fields []
 
const uint info_slave_worker_table_pk_field_indexes []
 

Macro Definition Documentation

◆ HASH_DYNAMIC_INIT

#define HASH_DYNAMIC_INIT   4

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
LINE_FOR_CHANNEL 

Function Documentation

◆ append_item_to_jobs()

bool append_item_to_jobs ( slave_job_item * job_item,
Slave_worker * worker,
Relay_log_info * rli
)

Coordinator enqueues a job item into a Worker private queue.

Parameters
job_item: a pointer to struct carrying a reference to an event
worker: a pointer to the assigned Worker struct
rli: a pointer to Relay_log_info of Coordinator
Returns
false on success; true if the thread was killed or the worker stopped while waiting for a successful enqueue.

◆ get_last_committed()

static int64 get_last_committed ( Log_event * ev)
static

◆ get_least_occupied_worker()

Slave_worker * get_least_occupied_worker ( Relay_log_info * rli,
Slave_worker_array * ws,
Log_event * ev
)

Get the least occupied worker.

Legends running throughout the module:

Parameters
rli: pointer to Relay_log_info of Coordinator
ws: dynarray of pointers to Slave_worker
ev: event for which we are searching for a worker
Returns
a pointer to chosen Slave_worker instance

◆ get_sequence_number()

static int64 get_sequence_number ( Log_event * ev)
static

◆ handle_slave_worker_stop()

bool handle_slave_worker_stop ( Slave_worker * worker,
Slave_job_item * job_item
)

This function is called by both coordinator and workers.

Upon receiving the STOP command, the workers will identify a maximum group index already executed (or under execution).

All groups whose indexes are below or equal to the maximum group index will be applied by the workers before stopping.

The workers with groups above the maximum group index will exit without applying these groups by setting their running status to "STOP_ACCEPTED".

Parameters
worker: a pointer to the waiting Worker struct
job_item: a pointer to struct carrying a reference to an event
Returns
true if the STOP command gets accepted; otherwise false is returned.

◆ may_have_timestamp()

static bool may_have_timestamp ( Log_event * ev)
static

◆ mts_move_temp_tables_to_thd()

TABLE * mts_move_temp_tables_to_thd ( THD * thd,
TABLE * temporary_tables
)

Relocation of the list of temporary tables to thd->temporary_tables.

Parameters
thd: THD instance pointer of the destination
temporary_tables: the source temporary_tables list
Note
destroying references to the source list, if necessary, is left to the caller.
Returns
the post-merge value of thd->temporary_tables.

◆ pop_jobs_item()

static struct slave_job_item * pop_jobs_item ( Slave_worker * worker,
Slave_job_item * job_item
)
static

Worker's routine to wait for a new assignment through append_item_to_jobs()

Parameters
worker: a pointer to the waiting Worker struct
job_item: a pointer to struct carrying a reference to an event
Returns
NULL on failure, or a pointer to an item.

◆ remove_item_from_jobs()

static void remove_item_from_jobs ( slave_job_item * job_item,
Slave_worker * worker,
Relay_log_info * rli
)
static

Remove a job item from the given workers job queue.

It also updates related status.

Parameters
[in] job_item: The job item that will be removed.
[in] worker: The worker which job_item belongs to.
[in] rli: slave's relay log info object.

◆ report_error_to_coordinator()

void report_error_to_coordinator ( Slave_worker * worker)

Report a not yet reported error to the coordinator if necessary.

All issues detected when applying binary log events are reported using rli->report(), but when an issue is not reported by the log event being applied, there is a workaround at handle_slave_sql() to report the issue also using rli->report() for the STS applier (or the MTS coordinator).

This function implements the workaround for an MTS worker.

Parameters
worker: the worker to be evaluated.

◆ set_max_updated_index_on_stop()

bool set_max_updated_index_on_stop ( Slave_worker * worker,
Slave_job_item * job_item
)

This function is called by both coordinator and workers.

Both coordinator and workers contribute to max_updated_index.

Parameters
worker: a pointer to the waiting Worker struct
job_item: a pointer to struct carrying a reference to an event
Returns
true if the STOP command gets accepted; otherwise false is returned.

◆ slave_worker_exec_job_group()

int slave_worker_exec_job_group ( Slave_worker * worker,
Relay_log_info * rli
)

Apply one job group.

Note
the function maintains worker's CGEP and modifies APH, updates the current group item in GAQ via slave_worker_ends_group().

Parameters
[in] worker: the worker which calls it.
[in] rli: slave's relay log info object.
Returns
0 if the group of jobs is applied successfully; otherwise an error code.

Variable Documentation

◆ info_slave_worker_fields

const char* info_slave_worker_fields[]
Initial value:
= {
"id",
"group_relay_log_name", "group_relay_log_pos", "group_source_log_name",
"group_source_log_pos",
"checkpoint_relay_log_name", "checkpoint_relay_log_pos",
"checkpoint_source_log_name", "checkpoint_source_log_pos",
"checkpoint_seqno",
"checkpoint_group_size",
"checkpoint_group_bitmap",
"channel_name"}

◆ info_slave_worker_table_pk_field_indexes

const uint info_slave_worker_table_pk_field_indexes[]
Initial value:
= {
LINE_FOR_CHANNEL,
0,
}
Note: LINE_FOR_CHANNEL is defined at rpl_rli_pdb.cc:231.

◆ mta_debug_concurrent_access

uint mta_debug_concurrent_access = 0

◆ w_rr

ulong w_rr = 0