MySQL 8.0.40
Source Code Documentation
applier.h
Go to the documentation of this file.
1/* Copyright (c) 2014, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef APPLIER_INCLUDE
25#define APPLIER_INCLUDE
26
29#include <list>
30#include <vector>
31
32#include "my_inttypes.h"
44#include "sql/sql_class.h"
45
46// Define the applier packet types
47#define ACTION_PACKET_TYPE 2
48#define VIEW_CHANGE_PACKET_TYPE 3
49#define SINGLE_PRIMARY_PACKET_TYPE 4
50#define SYNC_BEFORE_EXECUTION_PACKET_TYPE 5
51#define TRANSACTION_PREPARED_PACKET_TYPE 6
52#define LEAVING_MEMBERS_PACKET_TYPE 7
53
54// Define the applier return error codes
55#define APPLIER_GTID_CHECK_TIMEOUT_ERROR -1
56#define APPLIER_RELAY_LOG_NOT_INITED -2
57#define APPLIER_THREAD_ABORTED -3
58
59extern char applier_module_channel_name[];
60
61/* Types of action packets used in the context of the applier module */
63 TERMINATION_PACKET = 0, // Packet for a termination action
64 SUSPENSION_PACKET, // Packet to signal something to suspend
65 CHECKPOINT_PACKET, // Packet to wait for queue consumption
66 ACTION_NUMBER = 3 // The number of actions
67};
68
69/**
70 @class Action_packet
71 A packet to control the applier in a event oriented way.
72*/
73class Action_packet : public Packet {
74 public:
75 /**
76 Create a new action packet.
77 @param action the packet action
78 */
81
82 ~Action_packet() override = default;
83
85};
86
87/**
88 @class View_change_packet
89 A packet to send view change related info to the applier
90*/
91class View_change_packet : public Packet {
92 public:
93 /**
94 Create a new data packet with associated data.
95
96 @param view_id_arg The view id associated to this view
97 */
98 View_change_packet(std::string &view_id_arg)
99 : Packet(VIEW_CHANGE_PACKET_TYPE), view_id(view_id_arg) {}
100
101 ~View_change_packet() override = default;
102
103 std::string view_id;
104 std::vector<std::string> group_executed_set;
105};
106
107/**
108 @class Single_primary_action_packet
109 A packet to send new primary election related info to the applier
110*/
112 public:
114
115 /**
116 Create a new single primary action packet with associated data.
117
118 @param action_arg the packet action
119 */
121 : Packet(SINGLE_PRIMARY_PACKET_TYPE), action(action_arg) {}
122
123 ~Single_primary_action_packet() override = default;
124
126};
127
128/**
129 @class Queue_checkpoint_packet
130 A packet to wait for queue consumption
131*/
133 public:
134 /**
135 Create a new Queue_checkpoint_packet packet.
136 */
138 std::shared_ptr<Continuation> checkpoint_condition_arg)
140 checkpoint_condition(checkpoint_condition_arg) {}
141
143
144 private:
145 /**If we discard a packet */
146 std::shared_ptr<Continuation> checkpoint_condition;
147};
148
149/**
150 @class Transaction_prepared_action_packet
151 A packet to inform that a given member did prepare a given transaction.
152*/
154 public:
155 /**
156 Create a new transaction prepared action.
157
158 @param sid the prepared transaction sid
159 @param gno the prepared transaction gno
160 @param gcs_member_id the member id that did prepare the
161 transaction
162 */
164 const Gcs_member_identifier &gcs_member_id)
166 m_sid_specified(sid != nullptr ? true : false),
167 m_gno(gno),
168 m_gcs_member_id(gcs_member_id.get_member_id()) {
169 if (sid != nullptr) {
170 m_sid.copy_from(*sid);
171 }
172 }
173
175
176 const bool m_sid_specified;
179
180 const rpl_sid *get_sid() { return m_sid_specified ? &m_sid : nullptr; }
181
182 private:
184};
185
186/**
187 @class Sync_before_execution_action_packet
188 A packet to request a synchronization point on the global message
189 order on a given member before transaction execution.
190*/
192 public:
193 /**
194 Create a new synchronization point request.
195
196 @param thread_id the thread that did the request
197 @param gcs_member_id the member id that did the request
198 */
200 my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
203 m_gcs_member_id(gcs_member_id.get_member_id()) {}
204
206
209};
210
211/**
212 @class Leaving_members_action_packet
213 A packet to inform pipeline listeners of leaving members,
214 this packet will be handled on the global message order,
215 that is, ordered with certification.
216*/
218 public:
219 /**
220 Create a new leaving members packet.
221
222 @param leaving_members the members that left the group
223 */
225 const std::vector<Gcs_member_identifier> &leaving_members)
227 m_leaving_members(leaving_members) {}
228
229 ~Leaving_members_action_packet() override = default;
230
231 const std::vector<Gcs_member_identifier> m_leaving_members;
232};
233
234typedef enum enum_applier_state {
239
241 public:
242 virtual ~Applier_module_interface() = default;
245 bool *abort_flag, bool wait_for_execution = true) = 0;
246 virtual void awake_applier_module() = 0;
249 double timeout, bool check_and_purge_partial_transactions) = 0;
251 std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
252 bool update_THD_status = true) = 0;
253 virtual bool get_retrieved_gtid_set(std::string &retrieved_set) = 0;
255 std::string &retrieved_set, double timeout,
256 bool update_THD_status = true) = 0;
257 virtual size_t get_message_queue_size() = 0;
259 virtual void add_suspension_packet() = 0;
262 Single_primary_action_packet *packet) = 0;
269 virtual int handle(const uchar *data, ulong len,
271 std::list<Gcs_member_identifier> *online_members,
272 PSI_memory_key key) = 0;
275 virtual void run_flow_control_step() = 0;
278 std::shared_ptr<Continuation> checkpoint_condition) = 0;
281};
282
284 public:
286 ~Applier_module() override;
287
288 /**
289 Initializes and launches the applier thread
290
291 @return the operation status
292 @retval 0 OK
293 @retval !=0 Error
294 */
296
297 /**
298 * Return the local applier stats.
299 */
301
302 /**
303 Terminates the applier thread.
304
305 @return the operation status
306 @retval 0 OK
307 @retval !=0 A timeout occurred
308 */
310
311 /**
312 Is the applier marked for shutdown?
313
314 @return is the applier on shutdown
315 @retval 0 no
316 @retval !=0 yes
317 */
319 return (applier_aborted || applier_thd == nullptr || applier_thd->killed);
320 }
321
322 /**
323 Is the applier running?
324
325 @return applier running?
326 @retval 0 no
327 @retval !=0 yes
328 */
330
331 /**
332 Configure the applier pipeline according to the given configuration
333
334 @param[in] pipeline_type the chosen pipeline
335 @param[in] reset_logs if a reset happened in the server
336 @param[in] stop_timeout the timeout when waiting on shutdown
337 @param[in] group_sidno the group configured sidno
338 @param[in] gtid_assignment_block_size the group gtid assignment block size
339
340 @return the operation status
341 @retval 0 OK
342 @retval !=0 Error
343 */
344 int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs,
345 ulong stop_timeout, rpl_sidno group_sidno,
347
348 /**
349 Configure the applier pipeline handlers
350
351 @return the operation status
352 @retval 0 OK
353 @retval !=0 Error
354 */
356
357 /**
358 Purges the relay logs and restarts the applier thread.
359
360 @return the operation status
361 @retval 0 OK
362 @retval !=0 Error
363 */
365
366 /**
367 Runs the applier thread process, reading events and processing them.
368
369 @note When killed, the thread will finish handling the current packet, and
370 then die, ignoring all possible existing events in the incoming queue.
371
372 @return the operation status
373 @retval 0 OK
374 @retval !=0 Error
375 */
377
378 /**
379 Queues the packet coming from the reader for future application.
380
381 @param[in] data the packet data
382 @param[in] len the packet length
383 @param[in] consistency_level the transaction consistency level
384 @param[in] online_members the ONLINE members when the transaction
385 message was delivered
386 @param[in] key the memory instrument key
387
388 @return the operation status
389 @retval 0 OK
390 @retval !=0 Error on queue
391 */
392 int handle(const uchar *data, ulong len,
394 std::list<Gcs_member_identifier> *online_members,
395 PSI_memory_key key) override {
396 this->incoming->push(
397 new Data_packet(data, len, key, consistency_level, online_members));
398 return 0;
399 }
400
401 /**
402 Gives the pipeline an action for execution.
403
404 @param[in] action the action to be executed
405
406 @return the operation status
407 @retval 0 OK
408 @retval !=0 Error executing the action
409 */
411 return this->pipeline->handle_action(action);
412 }
413
414 /**
415 Injects an event into the pipeline and waits for its handling.
416
417 @param[in] pevent the event to be injected
418 @param[in] cont the object used to wait
419
420 @return the operation status
421 @retval 0 OK
422 @retval !=0 Error on queue
423 */
425
426 /**
427 Terminates the pipeline, shutting down the handlers and deleting them.
428
429 @note the pipeline will always be deleted even if an error occurs.
430
431 @return the operation status
432 @retval 0 OK
433 @retval !=0 Error on pipeline termination
434 */
436
437 /**
438 Sets the applier shutdown timeout.
439
440 @param[in] timeout the timeout
441 */
444
445 // Configure any thread based applier
448 pipeline->handle_action(conf_action);
449
450 delete conf_action;
451 }
452
453 /**
454 This method informs the applier module that an applying thread stopped
455 */
456 void inform_of_applier_stop(char *channel_name, bool aborted);
457
458 /**
459 Check whether to ignore applier errors during stop or not.
460 Errors put the members into ERROR state.
461 If errors are ignored member will stay in ONLINE state.
462 During clone, applier errors are ignored, since data will come from clone.
463
464 @param[in] ignore_errors if true ignore applier errors during stop
465 */
468 }
469
470 // Packet based interface methods
471
472 /**
473 Queues a packet that will eventually make the applier module suspend.
474 This will happen only after all the previous packets are processed.
475
476 @note This will happen only after all the previous packets are processed.
477 */
478 void add_suspension_packet() override {
479 this->incoming->push(new Action_packet(SUSPENSION_PACKET));
480 }
481
482 /**
483 Queues a packet that will make the applier module terminate it's handling
484 process. Due to the blocking nature of the queue, this method is useful to
485 unblock the handling process on shutdown.
486
487 @note This will happen only after all the previous packets are processed.
488 */
490 this->incoming->push(new Action_packet(TERMINATION_PACKET));
491 }
492
493 /**
494 Queues a view change packet into the applier.
495 This packets contain the new view id and they mark the exact frontier
496 between transactions from the old and new views.
497
498 @note This will happen only after all the previous packets are processed.
499
500 @param[in] packet The view change packet to be queued
501 */
503 incoming->push(packet);
504 }
505
506 /**
507 Queues a single primary action packet into the applier.
508
509 @note This will happen only after all the previous packets are processed.
510
511 @param[in] packet The packet to be queued
512 */
514 Single_primary_action_packet *packet) override {
515 incoming->push(packet);
516 }
517
518 /**
519 Queues a transaction prepared action packet into the applier.
520
521 @note This will happen only after all the previous packets are processed.
522
523 @param[in] packet The packet to be queued
524 */
526 Transaction_prepared_action_packet *packet) override {
527 incoming->push(packet);
528 }
529
530 /**
531 Queues a synchronization before execution action packet into the applier.
532
533 @note This will happen only after all the previous packets are processed.
534
535 @param[in] packet The packet to be queued
536 */
538 Sync_before_execution_action_packet *packet) override {
539 incoming->push(packet);
540 }
541
542 /**
543 Queues a leaving members action packet into the applier.
544
545 @note This will happen only after all the previous packets are processed.
546
547 @param[in] packet The packet to be queued
548 */
550 Leaving_members_action_packet *packet) override {
551 incoming->push(packet);
552 }
553
554 /**
555 Queues a single a packet that will enable certification on this member
556 */
558
559 /**
560 Awakes the applier module
561 */
562 void awake_applier_module() override {
564 suspended = false;
567 }
568
569 /**
570 Waits for the applier to suspend and apply all the transactions previous to
571 the suspend request.
572
573 @param abort_flag a pointer to a flag that the caller can use to
574 cancel the request.
575 @param wait_for_execution specify if the suspension waits for transactions
576 execution
577
578 @return the operation status
579 @retval 0 OK
580 @retval !=0 Error when accessing the applier module status
581 */
583 bool *abort_flag, bool wait_for_execution = true) override;
584
585 /**
586 Interrupts the current applier waiting process either for it's suspension
587 or it's wait for the consumption of pre suspension events
588 */
589 void interrupt_applier_suspension_wait() override;
590
591 /**
592 Checks if the applier, and its workers when parallel applier is
593 enabled, has already consumed all relay log, that is, applier is
594 waiting for transactions to be queued.
595
596 @return the applier status
597 @retval true the applier is waiting
598 @retval false otherwise
599 */
601
602 /**
603 Waits for the execution of all events by part of the current SQL applier.
604 Due to the possible asynchronous nature of module's applier handler, this
605 method inquires the current handler to check if all transactions queued up
606 to this point are already executed.
607
608 If no handler exists, then it is assumed that transactions were processed.
609
610 @param timeout the time (seconds) after which the method returns if the
611 above condition was not satisfied
612 @param check_and_purge_partial_transactions
613 on successful executions, check and purge partial
614 transactions in the relay log
615
616 @return the operation status
617 @retval 0 All transactions were executed
618 @retval -1 A timeout occurred
619 @retval -2 An error occurred
620 */
622 double timeout, bool check_and_purge_partial_transactions) override;
623
624 /**
625 Waits for the execution of all current events by part of the current SQL
626 applier.
627
628 The current gtid retrieved set is extracted and a loop is executed until
629 these transactions are executed.
630
631 If the applier SQL thread stops, the method will return an error.
632
633 If no handler exists, then it is assumed that transactions were processed.
634
635 @param checkpoint_condition the class used to wait for the queue to be
636 consumed. Can be used to cancel the wait.
637 @param abort_flag a pointer to a flag that the caller can use to
638 cancel the request.
639 @param update_THD_status Shall the method update the THD stage
640
641
642 @return the operation status
643 @retval false All transactions were executed
644 @retval true An error occurred
645 */
647 std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
648 bool update_THD_status = true) override;
649
650 /**
651 Returns the retrieved gtid set for the applier channel
652
653 @param[out] retrieved_set the set in string format.
654
655 @retval true there was an error.
656 @retval false the operation has succeeded.
657 */
658 bool get_retrieved_gtid_set(std::string &retrieved_set) override;
659
660 /**
661 Waits for the execution of all events in the given set by the current SQL
662 applier. If no handler exists, then it is assumed that transactions were
663 processed.
664
665 @param retrieved_set the set in string format of transaction to wait for
666 @param timeout the time (seconds) after which the method returns if the
667 above condition was not satisfied
668 @param update_THD_status Shall the method update the THD stage
669
670 @return the operation status
671 @retval 0 All transactions were executed
672 @retval -1 A timeout occurred
673 @retval -2 An error occurred
674 */
675 int wait_for_applier_event_execution(std::string &retrieved_set,
676 double timeout,
677 bool update_THD_status = true) override;
678
679 /**
680 Returns the handler instance in the applier module responsible for
681 certification.
682
683 @note If new certification handlers appear, an interface must be created.
684
685 @return a pointer to the applier's certification handler.
686 @retval !=NULL The certification handler
687 @retval NULL No certification handler present
688 */
690
691 /**
692 Returns the applier module's queue size.
693
694 @return the size of the queue
695 */
696 size_t get_message_queue_size() override { return incoming->size(); }
697
700 return APPLIER_STATE_ON;
701 else if (suspended) /* purecov: inspected */
702 return APPLIER_STATE_OFF; /* purecov: inspected */
703 else
704 return APPLIER_ERROR; /* purecov: inspected */
705 }
706
708 override {
710 }
711
713 return &flow_control_module;
714 }
715
716 void run_flow_control_step() override {
718 }
719
721 std::shared_ptr<Continuation> checkpoint_condition) override;
722
723 private:
724 // Applier packet handlers
725
726 /**
727 Apply an action packet received by the applier.
728 It can be a order to suspend or terminate.
729
730 @param action_packet the received action packet
731
732 @return if the applier should terminate (with no associated error).
733 */
734 bool apply_action_packet(Action_packet *action_packet);
735
736 /**
737 Apply a View Change packet received by the applier.
738 It executes some certification operations and queues a View Change Event
739
740 @param view_change_packet the received view change packet
741 @param fde_evt the Format description event associated to the event
742 @param cont the applier Continuation Object
743
744 @return the operation status
745 @retval 0 OK
746 @retval !=0 Error when injecting event
747 */
748 int apply_view_change_packet(View_change_packet *view_change_packet,
750 Continuation *cont);
751
752 /**
753 Apply a Data packet received by the applier.
754 It executes some certification operations and queues a View Change Event
755
756 @param data_packet the received data packet packet
757 @param fde_evt the Format description event associated to the event
758 @param cont the applier Continuation Object
759
760 @return the operation status
761 @retval 0 OK
762 @retval !=0 Error when injecting event
763 */
764 int apply_data_packet(Data_packet *data_packet,
766 Continuation *cont);
767
768 /**
769 Apply an single primary action packet received by the applier.
770
771 @param packet the received action packet
772
773 @return the operation status
774 @retval 0 OK
775 @retval !=0 Error when applying packet
776 */
778
779 /**
780 Apply a transaction prepared action packet received by the applier.
781
782 @param packet the received action packet
783
784 @return the operation status
785 @retval 0 OK
786 @retval !=0 Error when applying packet
787 */
790
791 /**
792 Apply a synchronization before execution action packet received
793 by the applier.
794
795 @param packet the received action packet
796
797 @return the operation status
798 @retval 0 OK
799 @retval !=0 Error when applying packet
800 */
803
804 /**
805 Apply a leaving members action packet received by the applier.
806
807 @param packet the received action packet
808
809 @return the operation status
810 @retval 0 OK
811 @retval !=0 Error when applying packet
812 */
815
816 /**
817 Suspends the applier module, being transactions still queued in the incoming
818 queue.
819
820 @note if the proper condition is set, possible listeners can be awaken by
821 this method.
822 */
825
826 suspended = true;
828 __LINE__, 0, 0);
829
830 // Alert any interested party about the applier suspension
832
833 while (suspended) {
835 }
837 __LINE__, 0, 0);
838
840 }
841
842 /**
843 Cleans the thread context for the applier thread
844 This includes such tasks as removing the thread from the global thread list
845 */
847
848 /**
849 Set the thread context for the applier thread.
850 This allows the thread to behave like an slave thread and perform
851 such tasks as queuing to a relay log.
852 */
854
855 /**
856 This method calculates the intersection of the given sets passed as a list
857 of strings.
858
859 @param[in] gtid_sets the vector containing the GTID sets to intersect
860 @param[out] output_set the final GTID calculated from the intersection
861
862 @return the operation status
863 @retval 0 all went fine
864 @retval !=0 error
865 */
866 int intersect_group_executed_sets(std::vector<std::string> &gtid_sets,
867 Gtid_set *output_set);
868
869 // applier thread variables
872
873 // configuration options
877
878 // run conditions and locks
881 /* Applier thread state */
883 /* Applier abort flag */
885 /* Applier error during execution */
887 /* Applier killed status */
889 /* Ignore applier errors during stop. */
891
892 // condition and lock used to suspend/awake the applier module
893 /* The lock for suspending/wait for the awake of the applier module */
895 /* The condition for suspending/wait for the awake of the applier module */
897 /* Suspend flag that informs if the applier is suspended */
899
900 /* The condition for signaling the applier suspension*/
902
903 /* The incoming event queue */
905
906 /* The applier pipeline for event execution */
908
909 /* Applier timeout on shutdown */
911
912 /* Applier channel observer to detect failures */
914
918};
919
920#endif /* APPLIER_INCLUDE */
#define ACTION_PACKET_TYPE
Definition: applier.h:47
char applier_module_channel_name[]
Definition: applier.cc:46
#define LEAVING_MEMBERS_PACKET_TYPE
Definition: applier.h:52
enum_applier_state
Definition: applier.h:234
@ APPLIER_STATE_ON
Definition: applier.h:235
@ APPLIER_STATE_OFF
Definition: applier.h:236
@ APPLIER_ERROR
Definition: applier.h:237
#define TRANSACTION_PREPARED_PACKET_TYPE
Definition: applier.h:51
enum enum_applier_state Member_applier_state
#define SINGLE_PRIMARY_PACKET_TYPE
Definition: applier.h:49
#define VIEW_CHANGE_PACKET_TYPE
Definition: applier.h:48
#define SYNC_BEFORE_EXECUTION_PACKET_TYPE
Definition: applier.h:50
enum_packet_action
Definition: applier.h:62
@ TERMINATION_PACKET
Definition: applier.h:63
@ ACTION_NUMBER
Definition: applier.h:66
@ SUSPENSION_PACKET
Definition: applier.h:64
@ CHECKPOINT_PACKET
Definition: applier.h:65
A packet to control the applier in a event oriented way.
Definition: applier.h:73
Action_packet(enum_packet_action action)
Create a new action packet.
Definition: applier.h:79
enum_packet_action packet_action
Definition: applier.h:84
~Action_packet() override=default
Definition: applier_channel_state_observer.h:30
Definition: applier.h:240
virtual Pipeline_stats_member_collector * get_pipeline_stats_member_collector()=0
virtual bool get_retrieved_gtid_set(std::string &retrieved_set)=0
virtual ~Applier_module_interface()=default
virtual int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true)=0
virtual size_t get_message_queue_size()=0
virtual Certification_handler * get_certification_handler()=0
virtual Flow_control_module * get_flow_control_module()=0
virtual int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions)=0
virtual int wait_for_applier_event_execution(std::string &retrieved_set, double timeout, bool update_THD_status=true)=0
virtual void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)=0
virtual void add_leaving_members_action_packet(Leaving_members_action_packet *packet)=0
virtual void awake_applier_module()=0
virtual int purge_applier_queue_and_restart_applier_module()=0
virtual Member_applier_state get_applier_status()=0
virtual int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members, PSI_memory_key key)=0
virtual void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)=0
virtual void add_suspension_packet()=0
virtual bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true)=0
virtual void run_flow_control_step()=0
virtual void add_view_change_packet(View_change_packet *packet)=0
virtual bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition)=0
virtual void add_single_primary_action_packet(Single_primary_action_packet *packet)=0
virtual int handle_pipeline_action(Pipeline_action *action)=0
virtual void interrupt_applier_suspension_wait()=0
Definition: applier.h:283
~Applier_module() override
Definition: applier.cc:74
int apply_data_packet(Data_packet *data_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a Data packet received by the applier.
Definition: applier.cc:326
Certification_handler * get_certification_handler() override
Returns the handler instance in the applier module responsible for certification.
Definition: applier.cc:953
bool is_applier_thread_aborted()
Is the applier marked for shutdown?
Definition: applier.h:318
void add_termination_packet()
Queues a packet that will make the applier module terminate it's handling process.
Definition: applier.h:489
Plugin_stage_monitor_handler stage_handler
Definition: applier.h:917
int applier_error
Definition: applier.h:886
void clean_applier_thread_context()
Cleans the thread context for the applier thread This includes such tasks as removing the thread from...
Definition: applier.cc:227
bool apply_action_packet(Action_packet *action_packet)
Apply an action packet received by the applier.
Definition: applier.cc:244
int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true) override
Waits for the applier to suspend and apply all the transactions previous to the suspend request.
Definition: applier.cc:812
int applier_thread_handle()
Runs the applier thread process, reading events and processing them.
Definition: applier.cc:423
Member_applier_state get_applier_status() override
Definition: applier.h:698
bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true) override
Waits for the execution of all current events by part of the current SQL applier.
Definition: applier.cc:931
int setup_pipeline_handlers()
Configure the applier pipeline handlers.
Definition: applier.cc:161
bool applier_killed_status
Definition: applier.h:888
void set_stop_wait_timeout(ulong timeout)
Sets the applier shutdown timeout.
Definition: applier.h:442
bool m_ignore_applier_errors_during_stop
Definition: applier.h:890
my_thread_handle applier_pthd
Definition: applier.h:870
int apply_single_primary_action_packet(Single_primary_action_packet *packet)
Apply an single primary action packet received by the applier.
Definition: applier.cc:383
void ignore_errors_during_stop(bool ignore_errors)
Check whether to ignore applier errors during stop or not.
Definition: applier.h:466
int purge_applier_queue_and_restart_applier_module() override
Purges the relay logs and restarts the applier thread.
Definition: applier.cc:118
mysql_mutex_t run_lock
Definition: applier.h:879
int apply_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)
Apply a transaction prepared action packet received by the applier.
Definition: applier.cc:402
bool suspended
Definition: applier.h:898
int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs, ulong stop_timeout, rpl_sidno group_sidno, ulonglong gtid_assignment_block_size)
Configure the applier pipeline according to the given configuration.
Definition: applier.cc:92
void add_suspension_packet() override
Queues a packet that will eventually make the applier module suspend.
Definition: applier.h:478
mysql_cond_t suspension_waiting_condition
Definition: applier.h:901
rpl_sidno group_replication_sidno
Definition: applier.h:875
int intersect_group_executed_sets(std::vector< std::string > &gtid_sets, Gtid_set *output_set)
This method calculates the intersection of the given sets passed as a list of strings.
Definition: applier.cc:961
bool is_running()
Is the applier running?
Definition: applier.h:329
void add_single_primary_action_packet(Single_primary_action_packet *packet) override
Queues a single primary action packet into the applier.
Definition: applier.h:513
int apply_leaving_members_action_packet(Leaving_members_action_packet *packet)
Apply a leaving members action packet received by the applier.
Definition: applier.cc:416
void suspend_applier_module()
Suspends the applier module, being transactions still queued in the incoming queue.
Definition: applier.h:823
void set_applier_thread_context()
Set the thread context for the applier thread.
Definition: applier.cc:187
int initialize_applier_thread()
Initializes and launches the applier thread.
Definition: applier.cc:653
int terminate_applier_thread()
Terminates the applier thread.
Definition: applier.cc:706
size_t get_message_queue_size() override
Returns the applier module's queue size.
Definition: applier.h:696
bool reset_applier_logs
Definition: applier.h:874
void inform_of_applier_stop(char *channel_name, bool aborted)
This method informs the applier module that an applying thread stopped.
Definition: applier.cc:784
bool is_applier_thread_waiting()
Checks if the applier, and its workers when parallel applier is enabled, has already consumed all rel...
Definition: applier.cc:864
Flow_control_module flow_control_module
Definition: applier.h:916
int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions) override
Waits for the execution of all events by part of the current SQL applier.
Definition: applier.cc:876
int terminate_applier_pipeline()
Terminates the pipeline, shutting down the handlers and deleting them.
Definition: applier.cc:691
Event_handler * pipeline
Definition: applier.h:907
bool get_retrieved_gtid_set(std::string &retrieved_set) override
Returns the retrieved gtid set for the applier channel.
Definition: applier.cc:902
int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members, PSI_memory_key key) override
Queues the packet coming from the reader for future application.
Definition: applier.h:392
bool applier_aborted
Definition: applier.h:884
Pipeline_stats_member_collector * get_pipeline_stats_member_collector() override
Definition: applier.h:707
mysql_mutex_t suspend_lock
Definition: applier.h:894
int apply_view_change_packet(View_change_packet *view_change_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a View Change packet received by the applier.
Definition: applier.cc:266
void awake_applier_module() override
Awakes the applier module.
Definition: applier.h:562
Pipeline_member_stats * get_local_pipeline_stats()
Return the local applier stats.
Definition: applier.cc:1026
bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition) override
Definition: applier.cc:1020
int inject_event_into_pipeline(Pipeline_event *pevent, Continuation *cont)
Injects an event into the pipeline and waits for its handling.
Definition: applier.cc:233
thread_state applier_thd_state
Definition: applier.h:882
ulonglong gtid_assignment_block_size
Definition: applier.h:876
int apply_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)
Apply a synchronization before execution action packet received by the applier.
Definition: applier.cc:409
void add_view_change_packet(View_change_packet *packet) override
Queues a view change packet into the applier.
Definition: applier.h:502
void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet) override
Queues a synchronization before execution action packet into the applier.
Definition: applier.h:537
THD * applier_thd
Definition: applier.h:871
mysql_cond_t suspend_cond
Definition: applier.h:896
int handle_pipeline_action(Pipeline_action *action) override
Gives the pipeline an action for execution.
Definition: applier.h:410
Pipeline_stats_member_collector pipeline_stats_member_collector
Definition: applier.h:915
void run_flow_control_step() override
Definition: applier.h:716
mysql_cond_t run_cond
Definition: applier.h:880
virtual void queue_certification_enabling_packet()
Queues a single a packet that will enable certification on this member.
Definition: applier.cc:1015
ulong stop_wait_timeout
Definition: applier.h:910
Applier_module()
Definition: applier.cc:55
void interrupt_applier_suspension_wait() override
Interrupts the current applier waiting process either for it's suspension or it's wait for the consum...
Definition: applier.cc:858
Flow_control_module * get_flow_control_module() override
Definition: applier.h:712
Applier_channel_state_observer * applier_channel_observer
Definition: applier.h:913
void add_leaving_members_action_packet(Leaving_members_action_packet *packet) override
Queues a leaving members action packet into the applier.
Definition: applier.h:549
Synchronized_queue< Packet * > * incoming
Definition: applier.h:904
void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet) override
Queues a transaction prepared action packet into the applier.
Definition: applier.h:525
Definition: certification_handler.h:32
Class used to wait on the execution of some action.
Definition: pipeline_interfaces.h:462
A wrapper for raw network packets.
Definition: pipeline_interfaces.h:75
Interface for the application of events, them being packets or log events.
Definition: pipeline_interfaces.h:587
virtual int handle_action(Pipeline_action *action)=0
Handling of an action as defined in the handler implementation.
The pipeline stats aggregator of all group members stats and flow control module.
Definition: pipeline_stats.h:593
void flow_control_step(Pipeline_stats_member_collector *)
Evaluate the information received in the last flow control period and adjust the system parameters ac...
Definition: pipeline_stats.cc:742
For binlog version 4.
Definition: log_event.h:1515
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:40
Represents a set of GTIDs.
Definition: rpl_gtid.h:1455
Action to configure existing applier handlers.
Definition: pipeline_handlers.h:105
A packet to inform pipeline listeners of leaving members, this packet will be handled on the global m...
Definition: applier.h:217
const std::vector< Gcs_member_identifier > m_leaving_members
Definition: applier.h:231
~Leaving_members_action_packet() override=default
Leaving_members_action_packet(const std::vector< Gcs_member_identifier > &leaving_members)
Create a new leaving members packet.
Definition: applier.h:224
A generic interface for different kinds of packets.
Definition: pipeline_interfaces.h:50
A wrapper for pipeline actions.
Definition: pipeline_interfaces.h:557
A wrapper for log events/packets.
Definition: pipeline_interfaces.h:128
Computed statistics per member.
Definition: pipeline_stats.h:418
The pipeline collector for the local member stats.
Definition: pipeline_stats.h:269
Definition: stage_monitor_handler.h:30
int set_stage(PSI_stage_key key, const char *file, int line, ulonglong estimated_work, ulonglong work_completed)
Set that a new stage is now in progress.
Definition: stage_monitor_handler.cc:79
A packet to wait for queue consumption.
Definition: applier.h:132
std::shared_ptr< Continuation > checkpoint_condition
If we discard a packet.
Definition: applier.h:146
void signal_checkpoint_reached()
Definition: applier.h:142
Queue_checkpoint_packet(std::shared_ptr< Continuation > checkpoint_condition_arg)
Create a new Queue_checkpoint_packet packet.
Definition: applier.h:137
A packet to send new primary election related info to the applier.
Definition: applier.h:111
enum_action
Definition: applier.h:113
@ NEW_PRIMARY
Definition: applier.h:113
@ QUEUE_APPLIED
Definition: applier.h:113
Single_primary_action_packet(enum enum_action action_arg)
Create a new single primary action packet with associated data.
Definition: applier.h:120
~Single_primary_action_packet() override=default
enum enum_action action
Definition: applier.h:125
A packet to request a synchronization point on the global message order on a given member before tran...
Definition: applier.h:191
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:208
Sync_before_execution_action_packet(my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
Create a new synchronization point request.
Definition: applier.h:199
const my_thread_id m_thread_id
Definition: applier.h:207
~Sync_before_execution_action_packet() override=default
Definition: plugin_utils.h:181
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:34
std::atomic< killed_state > killed
Definition: sql_class.h:2667
A packet to inform that a given member did prepare a given transaction.
Definition: applier.h:153
rpl_sid m_sid
Definition: applier.h:183
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:178
const rpl_gno m_gno
Definition: applier.h:177
~Transaction_prepared_action_packet() override=default
const rpl_sid * get_sid()
Definition: applier.h:180
const bool m_sid_specified
Definition: applier.h:176
Transaction_prepared_action_packet(const rpl_sid *sid, rpl_gno gno, const Gcs_member_identifier &gcs_member_id)
Create a new transaction prepared action.
Definition: applier.h:163
A packet to send view change related info to the applier.
Definition: applier.h:91
std::string view_id
Definition: applier.h:103
View_change_packet(std::string &view_id_arg)
Create a new data packet with associated data.
Definition: applier.h:98
std::vector< std::string > group_executed_set
Definition: applier.h:104
~View_change_packet() override=default
#define mysql_cond_wait(C, M)
Definition: mysql_cond.h:48
#define mysql_mutex_lock(M)
Definition: mysql_mutex.h:50
#define mysql_mutex_unlock(M)
Definition: mysql_mutex.h:57
Fido Client Authentication nullptr
Definition: fido_client_plugin.cc:222
unsigned int PSI_memory_key
Instrumented memory key.
Definition: psi_memory_bits.h:49
Some integer typedefs for easier portability.
unsigned long long int ulonglong
Definition: my_inttypes.h:56
unsigned char uchar
Definition: my_inttypes.h:52
static my_thread_id thread_id
Definition: my_thr_init.cc:63
uint32 my_thread_id
Definition: my_thread_local.h:34
int(* mysql_cond_broadcast)(mysql_cond_t *that, const char *src_file, unsigned int src_line)
Definition: mysql_cond_service.h:52
static bool ignore_errors
Definition: mysqlcheck.cc:61
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
Handler_pipeline_type
Definition: pipeline_factory.h:45
API for Group Replication plugin.
enum_group_replication_consistency_level
Definition: plugin_group_replication.h:35
PSI_stage_info info_GR_STAGE_module_suspending
Definition: plugin_psi.h:228
PSI_stage_info info_GR_STAGE_module_executing
Definition: plugin_psi.h:227
required string key
Definition: replication_asynchronous_connection_failover.proto:60
repeated Action action
Definition: replication_group_member_actions.proto:43
int rpl_sidno
Type of SIDNO (source ID number, first component of GTID)
Definition: rpl_gtid.h:96
binary_log::gtids::gno_t rpl_gno
GNO, the second (numeric) component of a GTID, is an alias of binary_log::gtids::gno_t.
Definition: rpl_gtid.h:103
PSI_stage_key m_key
The registered stage key.
Definition: psi_stage_bits.h:76
This is a POD.
Definition: uuid.h:61
void copy_from(const unsigned char *data)
Copies the given 16-byte data to this UUID.
Definition: uuid.h:65
Definition: my_thread_bits.h:52
An instrumented cond structure.
Definition: mysql_cond_bits.h:50
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:50
Definition: plugin_utils.h:47
bool is_running() const
Definition: plugin_utils.h:84