MySQL 8.0.39
Source Code Documentation
Applier_module Class Reference

#include <applier.h>

Inheritance diagram for Applier_module:
[legend]

Public Member Functions

 Applier_module ()
 
 ~Applier_module () override
 
int initialize_applier_thread ()
 Initializes and launches the applier thread. More...
 
Pipeline_member_stats * get_local_pipeline_stats ()
 Return the local applier stats. More...
 
int terminate_applier_thread ()
 Terminates the applier thread. More...
 
bool is_applier_thread_aborted ()
 Is the applier marked for shutdown? More...
 
bool is_running ()
 Is the applier running? More...
 
int setup_applier_module (Handler_pipeline_type pipeline_type, bool reset_logs, ulong stop_timeout, rpl_sidno group_sidno, ulonglong gtid_assignment_block_size)
 Configure the applier pipeline according to the given configuration. More...
 
int setup_pipeline_handlers ()
 Configure the applier pipeline handlers. More...
 
int purge_applier_queue_and_restart_applier_module () override
 Purges the relay logs and restarts the applier thread. More...
 
int applier_thread_handle ()
 Runs the applier thread process, reading events and processing them. More...
 
int handle (const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members, PSI_memory_key key) override
 Queues the packet coming from the reader for future application. More...
 
int handle_pipeline_action (Pipeline_action *action) override
 Gives the pipeline an action for execution. More...
 
int inject_event_into_pipeline (Pipeline_event *pevent, Continuation *cont)
 Injects an event into the pipeline and waits for its handling. More...
 
int terminate_applier_pipeline ()
 Terminates the pipeline, shutting down the handlers and deleting them. More...
 
void set_stop_wait_timeout (ulong timeout)
 Sets the applier shutdown timeout. More...
 
void inform_of_applier_stop (char *channel_name, bool aborted)
 This method informs the applier module that an applying thread stopped. More...
 
void ignore_errors_during_stop (bool ignore_errors)
 Check whether to ignore applier errors during stop or not. More...
 
void add_suspension_packet () override
 Queues a packet that will eventually make the applier module suspend. More...
 
void add_termination_packet ()
 Queues a packet that will make the applier module terminate its handling process. More...
 
void add_view_change_packet (View_change_packet *packet) override
 Queues a view change packet into the applier. More...
 
void add_single_primary_action_packet (Single_primary_action_packet *packet) override
 Queues a single primary action packet into the applier. More...
 
void add_transaction_prepared_action_packet (Transaction_prepared_action_packet *packet) override
 Queues a transaction prepared action packet into the applier. More...
 
void add_sync_before_execution_action_packet (Sync_before_execution_action_packet *packet) override
 Queues a synchronization before execution action packet into the applier. More...
 
void add_leaving_members_action_packet (Leaving_members_action_packet *packet) override
 Queues a leaving members action packet into the applier. More...
 
virtual void queue_certification_enabling_packet ()
 Queues a single packet that will enable certification on this member. More...
 
void awake_applier_module () override
 Awakes the applier module. More...
 
int wait_for_applier_complete_suspension (bool *abort_flag, bool wait_for_execution=true) override
 Waits for the applier to suspend and apply all the transactions previous to the suspend request. More...
 
void interrupt_applier_suspension_wait () override
 Interrupts the current applier waiting process, either for its suspension or its wait for the consumption of pre-suspension events. More...
 
bool is_applier_thread_waiting ()
 Checks if the applier, and its workers when parallel applier is enabled, has already consumed all relay log, that is, applier is waiting for transactions to be queued. More...
 
int wait_for_applier_event_execution (double timeout, bool check_and_purge_partial_transactions) override
 Waits for the execution of all events by the current SQL applier. More...
 
bool wait_for_current_events_execution (std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true) override
 Waits for the execution of all current events by the current SQL applier. More...
 
bool get_retrieved_gtid_set (std::string &retrieved_set) override
 Returns the retrieved gtid set for the applier channel. More...
 
int wait_for_applier_event_execution (std::string &retrieved_set, double timeout, bool update_THD_status=true) override
 Waits for the execution of all events in the given set by the current SQL applier. More...
 
Certification_handler * get_certification_handler () override
 Returns the handler instance in the applier module responsible for certification. More...
 
size_t get_message_queue_size () override
 Returns the applier module's queue size. More...
 
Member_applier_state get_applier_status () override
 
Pipeline_stats_member_collector * get_pipeline_stats_member_collector () override
 
Flow_control_module * get_flow_control_module () override
 
void run_flow_control_step () override
 
bool queue_and_wait_on_queue_checkpoint (std::shared_ptr< Continuation > checkpoint_condition) override
 
- Public Member Functions inherited from Applier_module_interface
virtual ~Applier_module_interface ()=default
 

Private Member Functions

bool apply_action_packet (Action_packet *action_packet)
 Apply an action packet received by the applier. More...
 
int apply_view_change_packet (View_change_packet *view_change_packet, Format_description_log_event *fde_evt, Continuation *cont)
 Apply a View Change packet received by the applier. More...
 
int apply_data_packet (Data_packet *data_packet, Format_description_log_event *fde_evt, Continuation *cont)
 Apply a Data packet received by the applier. More...
 
int apply_single_primary_action_packet (Single_primary_action_packet *packet)
 Apply a single primary action packet received by the applier. More...
 
int apply_transaction_prepared_action_packet (Transaction_prepared_action_packet *packet)
 Apply a transaction prepared action packet received by the applier. More...
 
int apply_sync_before_execution_action_packet (Sync_before_execution_action_packet *packet)
 Apply a synchronization before execution action packet received by the applier. More...
 
int apply_leaving_members_action_packet (Leaving_members_action_packet *packet)
 Apply a leaving members action packet received by the applier. More...
 
void suspend_applier_module ()
 Suspends the applier module while transactions remain queued in the incoming queue. More...
 
void clean_applier_thread_context ()
 Cleans the thread context for the applier thread. This includes such tasks as removing the thread from the global thread list. More...
 
void set_applier_thread_context ()
 Set the thread context for the applier thread. More...
 
int intersect_group_executed_sets (std::vector< std::string > &gtid_sets, Gtid_set *output_set)
 This method calculates the intersection of the given sets passed as a list of strings. More...
 

Private Attributes

my_thread_handle applier_pthd
 
THD * applier_thd
 
bool reset_applier_logs
 
rpl_sidno group_replication_sidno
 
ulonglong gtid_assignment_block_size
 
mysql_mutex_t run_lock
 
mysql_cond_t run_cond
 
thread_state applier_thd_state
 
bool applier_aborted
 
int applier_error
 
bool applier_killed_status
 
bool m_ignore_applier_errors_during_stop {false}
 
mysql_mutex_t suspend_lock
 
mysql_cond_t suspend_cond
 
bool suspended
 
mysql_cond_t suspension_waiting_condition
 
Synchronized_queue< Packet * > * incoming
 
Event_handler * pipeline
 
ulong stop_wait_timeout
 
Applier_channel_state_observer * applier_channel_observer
 
Pipeline_stats_member_collector pipeline_stats_member_collector
 
Flow_control_module flow_control_module
 
Plugin_stage_monitor_handler stage_handler
 

Constructor & Destructor Documentation

◆ Applier_module()

Applier_module::Applier_module ( )

◆ ~Applier_module()

Applier_module::~Applier_module ( )
override

Member Function Documentation

◆ add_leaving_members_action_packet()

void Applier_module::add_leaving_members_action_packet ( Leaving_members_action_packet * packet)
inlineoverridevirtual

Queues a leaving members action packet into the applier.

Note
This will happen only after all the previous packets are processed.
Parameters
[in]packetThe packet to be queued

Implements Applier_module_interface.

◆ add_single_primary_action_packet()

void Applier_module::add_single_primary_action_packet ( Single_primary_action_packet * packet)
inlineoverridevirtual

Queues a single primary action packet into the applier.

Note
This will happen only after all the previous packets are processed.
Parameters
[in]packetThe packet to be queued

Implements Applier_module_interface.

◆ add_suspension_packet()

void Applier_module::add_suspension_packet ( )
inlineoverridevirtual

Queues a packet that will eventually make the applier module suspend.

This will happen only after all the previous packets are processed.

Note
This will happen only after all the previous packets are processed.

Implements Applier_module_interface.

◆ add_sync_before_execution_action_packet()

void Applier_module::add_sync_before_execution_action_packet ( Sync_before_execution_action_packet * packet)
inlineoverridevirtual

Queues a synchronization before execution action packet into the applier.

Note
This will happen only after all the previous packets are processed.
Parameters
[in]packetThe packet to be queued

Implements Applier_module_interface.

◆ add_termination_packet()

void Applier_module::add_termination_packet ( )
inline

Queues a packet that will make the applier module terminate its handling process.

Due to the blocking nature of the queue, this method is useful to unblock the handling process on shutdown.

Note
This will happen only after all the previous packets are processed.

◆ add_transaction_prepared_action_packet()

void Applier_module::add_transaction_prepared_action_packet ( Transaction_prepared_action_packet * packet)
inlineoverridevirtual

Queues a transaction prepared action packet into the applier.

Note
This will happen only after all the previous packets are processed.
Parameters
[in]packetThe packet to be queued

Implements Applier_module_interface.

◆ add_view_change_packet()

void Applier_module::add_view_change_packet ( View_change_packet * packet)
inlineoverridevirtual

Queues a view change packet into the applier.

These packets contain the new view id and they mark the exact frontier between transactions from the old and new views.

Note
This will happen only after all the previous packets are processed.
Parameters
[in]packetThe view change packet to be queued

Implements Applier_module_interface.

◆ applier_thread_handle()

int Applier_module::applier_thread_handle ( )

Runs the applier thread process, reading events and processing them.

Note
When killed, the thread will finish handling the current packet, and then die, ignoring all possible existing events in the incoming queue.
Returns
the operation status
Return values
0OK
!=0Error

group_replication_wait_for_current_events_execution_fail: A member leave has been received and a primary change has started in a separate thread. The primary change will end in error and try to suspend the applier by adding a suspension packet. But we want to kill the applier via shutdown before the suspension packet is processed, so block here until SHUTDOWN forwards the KILL signal.

Note
If we do not block here, then even if KILL is forwarded, the suspension packet is processed first and the kill is only seen after the suspension packet has been processed; hence the DEBUG block here.

◆ apply_action_packet()

bool Applier_module::apply_action_packet ( Action_packet * action_packet)
private

Apply an action packet received by the applier.

It can be an order to suspend or terminate.

Parameters
action_packetthe received action packet
Returns
if the applier should terminate (with no associated error).

◆ apply_data_packet()

int Applier_module::apply_data_packet ( Data_packet * data_packet,
Format_description_log_event * fde_evt,
Continuation * cont 
)
private

Apply a Data packet received by the applier.

It executes some certification operations and queues a View Change Event

Parameters
data_packet the received data packet
fde_evt the Format description event associated to the event
cont the applier Continuation Object
Returns
the operation status
Return values
0OK
!=0Error when injecting event

◆ apply_leaving_members_action_packet()

int Applier_module::apply_leaving_members_action_packet ( Leaving_members_action_packet * packet)
private

Apply a leaving members action packet received by the applier.

Parameters
packetthe received action packet
Returns
the operation status
Return values
0OK
!=0Error when applying packet

◆ apply_single_primary_action_packet()

int Applier_module::apply_single_primary_action_packet ( Single_primary_action_packet * packet)
private

Apply a single primary action packet received by the applier.

Parameters
packetthe received action packet
Returns
the operation status
Return values
0OK
!=0Error when applying packet

◆ apply_sync_before_execution_action_packet()

int Applier_module::apply_sync_before_execution_action_packet ( Sync_before_execution_action_packet * packet)
private

Apply a synchronization before execution action packet received by the applier.

Parameters
packetthe received action packet
Returns
the operation status
Return values
0OK
!=0Error when applying packet

◆ apply_transaction_prepared_action_packet()

int Applier_module::apply_transaction_prepared_action_packet ( Transaction_prepared_action_packet * packet)
private

Apply a transaction prepared action packet received by the applier.

Parameters
packetthe received action packet
Returns
the operation status
Return values
0OK
!=0Error when applying packet

◆ apply_view_change_packet()

int Applier_module::apply_view_change_packet ( View_change_packet * view_change_packet,
Format_description_log_event * fde_evt,
Continuation * cont 
)
private

Apply a View Change packet received by the applier.

It executes some certification operations and queues a View Change Event

Parameters
view_change_packetthe received view change packet
fde_evtthe Format description event associated to the event
contthe applier Continuation Object
Returns
the operation status
Return values
0OK
!=0Error when injecting event

◆ awake_applier_module()

void Applier_module::awake_applier_module ( )
inlineoverridevirtual

Awakes the applier module.

Implements Applier_module_interface.

◆ clean_applier_thread_context()

void Applier_module::clean_applier_thread_context ( )
private

Cleans the thread context for the applier thread. This includes such tasks as removing the thread from the global thread list.

◆ get_applier_status()

Member_applier_state Applier_module::get_applier_status ( )
inlineoverridevirtual

◆ get_certification_handler()

Certification_handler * Applier_module::get_certification_handler ( )
overridevirtual

Returns the handler instance in the applier module responsible for certification.

Note
If new certification handlers appear, an interface must be created.
Returns
a pointer to the applier's certification handler.
Return values
!=NULLThe certification handler
NULLNo certification handler present

Implements Applier_module_interface.

◆ get_flow_control_module()

Flow_control_module * Applier_module::get_flow_control_module ( )
inlineoverridevirtual

◆ get_local_pipeline_stats()

Pipeline_member_stats * Applier_module::get_local_pipeline_stats ( )

Return the local applier stats.

◆ get_message_queue_size()

size_t Applier_module::get_message_queue_size ( )
inlineoverridevirtual

Returns the applier module's queue size.

Returns
the size of the queue

Implements Applier_module_interface.

◆ get_pipeline_stats_member_collector()

Pipeline_stats_member_collector * Applier_module::get_pipeline_stats_member_collector ( )
inlineoverridevirtual

◆ get_retrieved_gtid_set()

bool Applier_module::get_retrieved_gtid_set ( std::string &  retrieved_set)
overridevirtual

Returns the retrieved gtid set for the applier channel.

Parameters
[out]retrieved_setthe set in string format.
Return values
truethere was an error.
falsethe operation has succeeded.

Implements Applier_module_interface.

◆ handle()

int Applier_module::handle ( const uchar * data,
ulong  len,
enum_group_replication_consistency_level  consistency_level,
std::list< Gcs_member_identifier > *  online_members,
PSI_memory_key  key 
)
inlineoverridevirtual

Queues the packet coming from the reader for future application.

Parameters
[in]datathe packet data
[in]lenthe packet length
[in]consistency_levelthe transaction consistency level
[in]online_membersthe ONLINE members when the transaction message was delivered
[in]keythe memory instrument key
Returns
the operation status
Return values
0OK
!=0Error on queue

Implements Applier_module_interface.

◆ handle_pipeline_action()

int Applier_module::handle_pipeline_action ( Pipeline_action * action)
inlineoverridevirtual

Gives the pipeline an action for execution.

Parameters
[in]actionthe action to be executed
Returns
the operation status
Return values
0OK
!=0Error executing the action

Implements Applier_module_interface.

◆ ignore_errors_during_stop()

void Applier_module::ignore_errors_during_stop ( bool  ignore_errors)
inline

Check whether to ignore applier errors during stop or not.

Errors put the members into ERROR state. If errors are ignored member will stay in ONLINE state. During clone, applier errors are ignored, since data will come from clone.

Parameters
[in]ignore_errorsif true ignore applier errors during stop

◆ inform_of_applier_stop()

void Applier_module::inform_of_applier_stop ( char *  channel_name,
bool  aborted 
)

This method informs the applier module that an applying thread stopped.

◆ initialize_applier_thread()

int Applier_module::initialize_applier_thread ( )

Initializes and launches the applier thread.

Returns
the operation status
Return values
0OK
!=0Error

◆ inject_event_into_pipeline()

int Applier_module::inject_event_into_pipeline ( Pipeline_event * pevent,
Continuation * cont 
)

Injects an event into the pipeline and waits for its handling.

Parameters
[in]peventthe event to be injected
[in]contthe object used to wait
Returns
the operation status
Return values
0OK
!=0Error on queue

◆ interrupt_applier_suspension_wait()

void Applier_module::interrupt_applier_suspension_wait ( )
overridevirtual

Interrupts the current applier waiting process, either for its suspension or its wait for the consumption of pre-suspension events.

Implements Applier_module_interface.

◆ intersect_group_executed_sets()

int Applier_module::intersect_group_executed_sets ( std::vector< std::string > &  gtid_sets,
Gtid_set * output_set 
)
private

This method calculates the intersection of the given sets passed as a list of strings.

Parameters
[in]gtid_setsthe vector containing the GTID sets to intersect
[out]output_setthe final GTID calculated from the intersection
Returns
the operation status
Return values
0all went fine
!=0error

◆ is_applier_thread_aborted()

bool Applier_module::is_applier_thread_aborted ( )
inline

Is the applier marked for shutdown?

Returns
is the applier on shutdown
Return values
0no
!=0yes

◆ is_applier_thread_waiting()

bool Applier_module::is_applier_thread_waiting ( )

Checks if the applier, and its workers when parallel applier is enabled, has already consumed all relay log, that is, applier is waiting for transactions to be queued.

Returns
the applier status
Return values
truethe applier is waiting
falseotherwise

◆ is_running()

bool Applier_module::is_running ( )
inline

Is the applier running?

Returns
applier running?
Return values
0no
!=0yes

◆ purge_applier_queue_and_restart_applier_module()

int Applier_module::purge_applier_queue_and_restart_applier_module ( )
overridevirtual

Purges the relay logs and restarts the applier thread.

Returns
the operation status
Return values
0OK
!=0Error

Implements Applier_module_interface.

◆ queue_and_wait_on_queue_checkpoint()

bool Applier_module::queue_and_wait_on_queue_checkpoint ( std::shared_ptr< Continuation > checkpoint_condition)
overridevirtual

◆ queue_certification_enabling_packet()

void Applier_module::queue_certification_enabling_packet ( )
virtual

Queues a single packet that will enable certification on this member.

◆ run_flow_control_step()

void Applier_module::run_flow_control_step ( )
inlineoverridevirtual

◆ set_applier_thread_context()

void Applier_module::set_applier_thread_context ( )
private

Set the thread context for the applier thread.

This allows the thread to behave like a slave thread and perform such tasks as queuing to a relay log.

◆ set_stop_wait_timeout()

void Applier_module::set_stop_wait_timeout ( ulong  timeout)
inline

Sets the applier shutdown timeout.

Parameters
[in]timeoutthe timeout

◆ setup_applier_module()

int Applier_module::setup_applier_module ( Handler_pipeline_type  pipeline_type,
bool  reset_logs,
ulong  stop_timeout,
rpl_sidno  group_sidno,
ulonglong  gtid_assignment_block_size 
)

Configure the applier pipeline according to the given configuration.

Parameters
[in]pipeline_typethe chosen pipeline
[in]reset_logsif a reset happened in the server
[in]stop_timeoutthe timeout when waiting on shutdown
[in]group_sidnothe group configured sidno
[in]gtid_assignment_block_sizethe group gtid assignment block size
Returns
the operation status
Return values
0OK
!=0Error

◆ setup_pipeline_handlers()

int Applier_module::setup_pipeline_handlers ( )

Configure the applier pipeline handlers.

Returns
the operation status
Return values
0OK
!=0Error

◆ suspend_applier_module()

void Applier_module::suspend_applier_module ( )
inlineprivate

Suspends the applier module while transactions remain queued in the incoming queue.

Note
If the proper condition is set, possible listeners can be awakened by this method.

◆ terminate_applier_pipeline()

int Applier_module::terminate_applier_pipeline ( )

Terminates the pipeline, shutting down the handlers and deleting them.

Note
the pipeline will always be deleted even if an error occurs.
Returns
the operation status
Return values
0OK
!=0Error on pipeline termination

◆ terminate_applier_thread()

int Applier_module::terminate_applier_thread ( )

Terminates the applier thread.

Returns
the operation status
Return values
0OK
!=0A timeout occurred

◆ wait_for_applier_complete_suspension()

int Applier_module::wait_for_applier_complete_suspension ( bool *  abort_flag,
bool  wait_for_execution = true 
)
overridevirtual

Waits for the applier to suspend and apply all the transactions previous to the suspend request.

Parameters
abort_flaga pointer to a flag that the caller can use to cancel the request.
wait_for_executionspecify if the suspension waits for transactions execution
Returns
the operation status
Return values
0OK
!=0Error when accessing the applier module status

Waits for the applier to execute the pre-suspension events (blocking method): while (the wait method times out) wait()

Implements Applier_module_interface.

◆ wait_for_applier_event_execution() [1/2]

int Applier_module::wait_for_applier_event_execution ( double  timeout,
bool  check_and_purge_partial_transactions 
)
overridevirtual

Waits for the execution of all events by the current SQL applier.

Due to the possible asynchronous nature of module's applier handler, this method inquires the current handler to check if all transactions queued up to this point are already executed.

If no handler exists, then it is assumed that transactions were processed.

Parameters
timeoutthe time (seconds) after which the method returns if the above condition was not satisfied
check_and_purge_partial_transactionson successful executions, check and purge partial transactions in the relay log
Returns
the operation status
Return values
0All transactions were executed
-1A timeout occurred
-2An error occurred

Implements Applier_module_interface.

◆ wait_for_applier_event_execution() [2/2]

int Applier_module::wait_for_applier_event_execution ( std::string &  retrieved_set,
double  timeout,
bool  update_THD_status = true 
)
overridevirtual

Waits for the execution of all events in the given set by the current SQL applier.

If no handler exists, then it is assumed that transactions were processed.

Parameters
retrieved_setthe set in string format of transaction to wait for
timeoutthe time (seconds) after which the method returns if the above condition was not satisfied
update_THD_statusShall the method update the THD stage
Returns
the operation status
Return values
0All transactions were executed
-1A timeout occurred
-2An error occurred

Implements Applier_module_interface.

◆ wait_for_current_events_execution()

bool Applier_module::wait_for_current_events_execution ( std::shared_ptr< Continuation > checkpoint_condition,
bool *  abort_flag,
bool  update_THD_status = true 
)
overridevirtual

Waits for the execution of all current events by the current SQL applier.

The current gtid retrieved set is extracted and a loop is executed until these transactions are executed.

If the applier SQL thread stops, the method will return an error.

If no handler exists, then it is assumed that transactions were processed.

Parameters
checkpoint_conditionthe class used to wait for the queue to be consumed. Can be used to cancel the wait.
abort_flaga pointer to a flag that the caller can use to cancel the request.
update_THD_statusShall the method update the THD stage
Returns
the operation status
Return values
falseAll transactions were executed
trueAn error occurred

Implements Applier_module_interface.

Member Data Documentation

◆ applier_aborted

bool Applier_module::applier_aborted
private

◆ applier_channel_observer

Applier_channel_state_observer* Applier_module::applier_channel_observer
private

◆ applier_error

int Applier_module::applier_error
private

◆ applier_killed_status

bool Applier_module::applier_killed_status
private

◆ applier_pthd

my_thread_handle Applier_module::applier_pthd
private

◆ applier_thd

THD* Applier_module::applier_thd
private

◆ applier_thd_state

thread_state Applier_module::applier_thd_state
private

◆ flow_control_module

Flow_control_module Applier_module::flow_control_module
private

◆ group_replication_sidno

rpl_sidno Applier_module::group_replication_sidno
private

◆ gtid_assignment_block_size

ulonglong Applier_module::gtid_assignment_block_size
private

◆ incoming

Synchronized_queue<Packet *>* Applier_module::incoming
private

◆ m_ignore_applier_errors_during_stop

bool Applier_module::m_ignore_applier_errors_during_stop {false}
private

◆ pipeline

Event_handler* Applier_module::pipeline
private

◆ pipeline_stats_member_collector

Pipeline_stats_member_collector Applier_module::pipeline_stats_member_collector
private

◆ reset_applier_logs

bool Applier_module::reset_applier_logs
private

◆ run_cond

mysql_cond_t Applier_module::run_cond
private

◆ run_lock

mysql_mutex_t Applier_module::run_lock
private

◆ stage_handler

Plugin_stage_monitor_handler Applier_module::stage_handler
private

◆ stop_wait_timeout

ulong Applier_module::stop_wait_timeout
private

◆ suspend_cond

mysql_cond_t Applier_module::suspend_cond
private

◆ suspend_lock

mysql_mutex_t Applier_module::suspend_lock
private

◆ suspended

bool Applier_module::suspended
private

◆ suspension_waiting_condition

mysql_cond_t Applier_module::suspension_waiting_condition
private

The documentation for this class was generated from the following files: