MySQL 9.1.0
Source Code Documentation
|
#include <stdarg.h>
#include <sys/types.h>
#include <time.h>
#include <atomic>
#include <tuple>
#include "my_bitmap.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_io.h"
#include "my_psi_config.h"
#include "mysql/binlog/event/binlog_event.h"
#include "mysql/components/services/bits/mysql_cond_bits.h"
#include "mysql/components/services/bits/mysql_mutex_bits.h"
#include "mysql/components/services/bits/psi_bits.h"
#include "mysql/components/services/bits/psi_mutex_bits.h"
#include "mysql/my_loglevel.h"
#include "mysql/service_mysql_alloc.h"
#include "prealloced_array.h"
#include "sql/changestreams/apply/metrics/applier_metrics.h"
#include "sql/changestreams/apply/metrics/dummy_worker_metrics.h"
#include "sql/changestreams/apply/metrics/mta_worker_metrics.h"
#include "sql/log_event.h"
#include "sql/rpl_gtid.h"
#include "sql/rpl_mta_submode.h"
#include "sql/rpl_replica.h"
#include "sql/rpl_rli.h"
#include "sql/sql_class.h"
#include "sql/system_variables.h"
Go to the source code of this file.
Classes | |
struct | db_worker_hash_entry |
Legends running throughout the module: More... | |
struct | Slave_job_group |
class | circular_buffer_queue< Element_type > |
The class defines a type of queue with a predefined max capacity that is implemented using the circular memory buffer. More... | |
class | Slave_committed_queue |
Group Assigned Queue whose first element identifies first gap in committed sequence. More... | |
class | Slave_jobs_queue |
class | Slave_worker |
Macros | |
#define | SLAVE_INIT_DBS_IN_GROUP 4 |
Functions | |
bool | init_hash_workers (Relay_log_info *rli) |
void | destroy_hash_workers (Relay_log_info *) |
Slave_worker * | map_db_to_worker (const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w) |
The function produces a reference to the struct of a Worker that has been or will be engaged to process the dbname -keyed partition (D). More... | |
Slave_worker * | get_least_occupied_worker (Relay_log_info *rli, Slave_worker_array *workers, Log_event *ev) |
Get the least occupied worker. More... | |
bool | handle_slave_worker_stop (Slave_worker *worker, Slave_job_item *job_item) |
This function is called by both coordinator and workers. More... | |
bool | set_max_updated_index_on_stop (Slave_worker *worker, Slave_job_item *job_item) |
This function is called by both coordinator and workers. More... | |
TABLE * | mts_move_temp_table_to_entry (TABLE *, THD *, db_worker_hash_entry *) |
Relocating temporary table reference into entry's table list head. More... | |
TABLE * | mts_move_temp_tables_to_thd (THD *, TABLE *) |
Relocation of the list of temporary tables to thd->temporary_tables. More... | |
TABLE * | mts_move_temp_tables_to_thd (THD *, TABLE *, enum_mts_parallel_type) |
bool | append_item_to_jobs (slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli) |
Coordinator enqueues a job item into a Worker private queue. More... | |
Slave_worker * | get_thd_worker (const THD *thd) |
int | slave_worker_exec_job_group (Slave_worker *w, Relay_log_info *rli) |
apply one job group. More... | |
Variables | |
ulong | w_rr |
#define SLAVE_INIT_DBS_IN_GROUP 4 |
bool append_item_to_jobs | ( | slave_job_item * | job_item, |
Slave_worker * | worker, | ||
Relay_log_info * | rli | ||
) |
Coordinator enqueues a job item into a Worker private queue.
job_item | a pointer to struct carrying a reference to an event |
worker | a pointer to the assigned Worker struct |
rli | a pointer to Relay_log_info of Coordinator |
void destroy_hash_workers | ( | Relay_log_info * | rli | ) |
Slave_worker * get_least_occupied_worker | ( | Relay_log_info * | rli, |
Slave_worker_array * | ws, | ||
Log_event * | ev | ||
) |
Get the least occupied worker.
rli | pointer to Relay_log_info of Coordinator |
ws | dynarray of pointers to Slave_worker |
ev | event for which we are searching for a worker |
|
inline |
bool handle_slave_worker_stop | ( | Slave_worker * | worker, |
Slave_job_item * | job_item | ||
) |
This function is called by both coordinator and workers.
Upon receiving the STOP command, the workers will identify a maximum group index already executed (or under execution).
All groups whose index are below or equal to the maximum group index will be applied by the workers before stopping.
The workers with groups above the maximum group index will exit without applying these groups by setting their running status to "STOP_ACCEPTED".
worker | a pointer to the waiting Worker struct |
job_item | a pointer to struct carrying a reference to an event |
bool init_hash_workers | ( | Relay_log_info * | rli | ) |
Slave_worker * map_db_to_worker | ( | const char * | dbname, |
Relay_log_info * | rli, | ||
db_worker_hash_entry ** | ptr_entry, | ||
bool | need_temp_tables, | ||
Slave_worker * | last_worker | ||
) |
The function produces a reference to the struct of a Worker that has been or will be engaged to process the dbname
-keyed partition (D).
It checks a local to Coordinator CGAP list first and returns last_assigned_worker
when found (todo: assert).
Otherwise, the partition is appended to the current group list:
CGAP .= D
here .= is concatenate operation, and a possible D's Worker id is searched in Assigned Partition Hash (APH) that collects tuples (P, W_id, U, mutex, cond). In case not found,
W_d := W_c unless W_c is NULL.
When W_c is NULL it is assigned to a least occupied as defined by get_least_occupied_worker()
.
W_d := W_c := W_{least_occupied} APH .= a new (D, W_d, 1)
In a case APH contains W_d == W_c, (assert U >= 1)
update APH set U++ where APH.P = D
The case APH contains a W_d != W_c != NULL assigned to D-partition represents the hashing conflict and is handled as the following:
a. marks the record of APH with a flag requesting to signal in the cond var when ‘U’ the usage counter drops to zero by the other Worker; b. waits for the other Worker to finish tasks on that partition and gets the signal; c. updates the APH record to point to the first Worker (naturally, U := 1), scheduled the event, and goes back into the parallel mode
dbname | pointer to c-string containing database name It can be empty string to indicate specific locking to facilitate sequential applying. |
rli | pointer to Coordinators relay-log-info instance |
ptr_entry | reference to a pointer to the resulted entry in the Assigned Partition Hash where the entry's pointer is stored at return. |
need_temp_tables | if false migration of temporary tables not needed |
last_worker | caller opts for this Worker, it must be rli->last_assigned_worker if one is determined. |
dbname
-keyd temporary tables from C's thd->temporary_tables to move them into the entry record.TABLE * mts_move_temp_table_to_entry | ( | TABLE * | table, |
THD * | thd, | ||
db_worker_hash_entry * | entry | ||
) |
Relocating temporary table reference into entry's
table list head.
Sources can be the coordinator's and the Worker's thd->temporary_tables.
table | TABLE instance pointer |
thd | THD instance pointer of the source of relocation |
entry | db_worker_hash_entry instance pointer |
Relocation of the list of temporary tables to thd->temporary_tables.
thd | THD instance pointer of the destination |
temporary_tables | the source temporary_tables list |
TABLE * mts_move_temp_tables_to_thd | ( | THD * | , |
TABLE * | , | ||
enum_mts_parallel_type | |||
) |
bool set_max_updated_index_on_stop | ( | Slave_worker * | worker, |
Slave_job_item * | job_item | ||
) |
This function is called by both coordinator and workers.
Both coordinator and workers contribute to max_updated_index.
worker | a pointer to the waiting Worker struct |
job_item | a pointer to struct carrying a reference to an event |
int slave_worker_exec_job_group | ( | Slave_worker * | worker, |
Relay_log_info * | rli | ||
) |
apply one job group.
slave_worker_ends_group()
.param[in] worker the worker which calls it. param[in] rli slave's relay log info object.
return returns 0 if the group of jobs are applied successfully, otherwise returns an error code.
|
extern |