MySQL 9.1.0
Source Code Documentation
applier.h
Go to the documentation of this file.
1/* Copyright (c) 2014, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef APPLIER_INCLUDE
25#define APPLIER_INCLUDE
26
29#include <list>
30#include <vector>
31
32#include "my_inttypes.h"
44#include "sql/sql_class.h"
45
46// Define the applier packet types
47#define ACTION_PACKET_TYPE 2
48#define SINGLE_PRIMARY_PACKET_TYPE 4
49#define SYNC_BEFORE_EXECUTION_PACKET_TYPE 5
50#define TRANSACTION_PREPARED_PACKET_TYPE 6
51#define LEAVING_MEMBERS_PACKET_TYPE 7
52#define RECOVERY_METADATA_PROCESSING_PACKET_TYPE 8
53#define ERROR_PACKET_TYPE 9 ///< Make the applier fail
54
55// Define the applier return error codes
56#define APPLIER_GTID_CHECK_TIMEOUT_ERROR -1
57#define APPLIER_RELAY_LOG_NOT_INITED -2
58#define APPLIER_THREAD_ABORTED -3
59
60extern char applier_module_channel_name[];
61
62/* Types of action packets used in the context of the applier module */
64 TERMINATION_PACKET = 0, // Packet for a termination action
65 SUSPENSION_PACKET, // Packet to signal something to suspend
66 CHECKPOINT_PACKET, // Packet to wait for queue consumption
67 ACTION_NUMBER = 3 // The number of actions
68};
69
70/**
71 @class Action_packet
72 A packet to control the applier in an event-oriented way.
73*/
74class Action_packet : public Packet {
75 public:
76 /**
77 Create a new action packet.
78 @param action the packet action
79 */
82
83 ~Action_packet() override = default;
84
86};
87
88/**
89 @class Recovery_metadata_processing_packets
90 A packet carrying recovery metadata processing information.
91*/
93 public:
94 /**
95 Create a new data packet with associated data.
96 */
99
100 ~Recovery_metadata_processing_packets() override = default;
101
102 /*
103 List of view IDs whose metadata has been received.
104 */
105 std::vector<std::string> m_view_id_to_be_deleted;
106
107 /* List of members that left the group. */
108 std::vector<Gcs_member_identifier> m_member_left_the_group;
109
110 /* A flag that indicates all the recovery metadata should be cleared. */
112};
113
114/**
115 @class Single_primary_action_packet
116 A packet to send new primary election related info to the applier
117*/
119 public:
121
122 /**
123 Create a new single primary action packet with associated data.
124
125 @param action_arg the packet action
126 */
128 : Packet(SINGLE_PRIMARY_PACKET_TYPE), action(action_arg) {}
129
130 ~Single_primary_action_packet() override = default;
131
133};
134
135/**
136 @class Queue_checkpoint_packet
137 A packet to wait for queue consumption
138*/
140 public:
141 /**
142 Create a new Queue_checkpoint_packet packet.
143 */
145 std::shared_ptr<Continuation> checkpoint_condition_arg)
147 checkpoint_condition(checkpoint_condition_arg) {}
148
150
151 private:
152 /**If we discard a packet */
153 std::shared_ptr<Continuation> checkpoint_condition;
154};
155
156/**
157 @class Transaction_prepared_action_packet
158 A packet to inform that a given member did prepare a given transaction.
159*/
161 public:
162 /**
163 Create a new transaction prepared action.
164
165 @param tsid the prepared transaction tsid
166 @param is_tsid_specified information on whether tsid is specified
167 @param gno the prepared transaction gno
168 @param gcs_member_id the member id that did prepare the
169 transaction
170 */
172 bool is_tsid_specified, rpl_gno gno,
173 const Gcs_member_identifier &gcs_member_id)
176 m_gno(gno),
177 m_gcs_member_id(gcs_member_id.get_member_id()) {
178 if (m_tsid_specified) {
179 m_tsid = tsid;
180 }
181 }
182
184
188
189 /// @brief tsid accessor
190 /// @return tsid const reference
191 const gr::Gtid_tsid &get_tsid() const { return m_tsid; }
192
193 /// @brief returns information on whether TSID is specified for this trx
194 /// @return information on whether TSID is specified for this trx
195 bool is_tsid_specified() const { return m_tsid_specified; }
196
197 private:
199};
200
201/**
202 @class Sync_before_execution_action_packet
203 A packet to request a synchronization point on the global message
204 order on a given member before transaction execution.
205*/
207 public:
208 /**
209 Create a new synchronization point request.
210
211 @param thread_id the thread that did the request
212 @param gcs_member_id the member id that did the request
213 */
215 my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
218 m_gcs_member_id(gcs_member_id.get_member_id()) {}
219
221
224};
225
226/**
227 @class Leaving_members_action_packet
228 A packet to inform pipeline listeners of leaving members,
229 this packet will be handled on the global message order,
230 that is, ordered with certification.
231*/
233 public:
234 /**
235 Create a new leaving members packet.
236
237 @param leaving_members the members that left the group
238 */
240 const std::vector<Gcs_member_identifier> &leaving_members)
242 m_leaving_members(leaving_members) {}
243
244 ~Leaving_members_action_packet() override = default;
245
246 const std::vector<Gcs_member_identifier> m_leaving_members;
247};
248
249/// @class Error_action_packet
250/// A packet to inform the applier it should fail.
251/// It should include a message about the failure
253 public:
254 /// Create a new error packet.
255 /// @param error_message the error that will make the applier stop
256 Error_action_packet(const char *error_message)
257 : Packet(ERROR_PACKET_TYPE), m_error_message(error_message) {}
258
259 ~Error_action_packet() override = default;
260
261 /// @brief Returns the error message for the failure
262 /// @return the error message pointer stored by this packet (not a copy)
263 const char *get_error_message() const { return m_error_message; }
264
265 private:
266 /// The error message for the failure process
267 const char *m_error_message;
268};
269
270typedef enum enum_applier_state {
275
277 public:
278 virtual ~Applier_module_interface() = default;
281 bool *abort_flag, bool wait_for_execution = true) = 0;
282 virtual void awake_applier_module() = 0;
285 double timeout, bool check_and_purge_partial_transactions) = 0;
287 std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
288 bool update_THD_status = true) = 0;
289 virtual bool get_retrieved_gtid_set(std::string &retrieved_set) = 0;
291 std::string &retrieved_set, double timeout,
292 bool update_THD_status = true) = 0;
293 virtual size_t get_message_queue_size() = 0;
295 virtual void add_suspension_packet() = 0;
296 virtual void add_packet(Packet *packet) = 0;
301 Single_primary_action_packet *packet) = 0;
308 virtual int handle(const uchar *data, ulong len,
310 std::list<Gcs_member_identifier> *online_members,
311 PSI_memory_key key) = 0;
314 virtual void run_flow_control_step() = 0;
317 std::shared_ptr<Continuation> checkpoint_condition) = 0;
320};
321
323 public:
325 ~Applier_module() override;
326
327 /**
328 Initializes and launches the applier thread
329
330 @return the operation status
331 @retval 0 OK
332 @retval !=0 Error
333 */
335
336 /**
337 * Return the local applier stats.
338 */
340
341 /**
342 Terminates the applier thread.
343
344 @return the operation status
345 @retval 0 OK
346 @retval !=0 A timeout occurred
347 */
349
350 /**
351 Is the applier marked for shutdown?
352
353 @return is the applier on shutdown
354 @retval 0 no
355 @retval !=0 yes
356 */
358 return (applier_aborted || applier_thd == nullptr || applier_thd->killed);
359 }
360
361 /**
362 Is the applier running?
363
364 @return applier running?
365 @retval 0 no
366 @retval !=0 yes
367 */
369
370 /**
371 Configure the applier pipeline according to the given configuration
372
373 @param[in] pipeline_type the chosen pipeline
374 @param[in] reset_logs if a reset happened in the server
375 @param[in] stop_timeout the timeout when waiting on shutdown
376 @param[in] group_sidno the group configured sidno
377 @param[in] gtid_assignment_block_size the group gtid assignment block size
378
379 @return the operation status
380 @retval 0 OK
381 @retval !=0 Error
382 */
383 int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs,
384 ulong stop_timeout, rpl_sidno group_sidno,
386
387 /**
388 Configure the applier pipeline handlers
389
390 @return the operation status
391 @retval 0 OK
392 @retval !=0 Error
393 */
395
396 /**
397 Purges the relay logs and restarts the applier thread.
398
399 @return the operation status
400 @retval 0 OK
401 @retval !=0 Error
402 */
404
405 /**
406 Runs the applier thread process, reading events and processing them.
407
408 @note When killed, the thread will finish handling the current packet, and
409 then die, ignoring all possible existing events in the incoming queue.
410
411 @return the operation status
412 @retval 0 OK
413 @retval !=0 Error
414 */
416
417 /**
418 Queues the packet coming from the reader for future application.
419
420 @param[in] data the packet data
421 @param[in] len the packet length
422 @param[in] consistency_level the transaction consistency level
423 @param[in] online_members the ONLINE members when the transaction
424 message was delivered
425 @param[in] key the memory instrument key
426
427 @return the operation status
428 @retval 0 OK
429 @retval !=0 Error on queue
430 */
431 int handle(const uchar *data, ulong len,
433 std::list<Gcs_member_identifier> *online_members,
434 PSI_memory_key key) override {
435 this->incoming->push(
436 new Data_packet(data, len, key, consistency_level, online_members));
437 return 0;
438 }
439
440 /**
441 Gives the pipeline an action for execution.
442
443 @param[in] action the action to be executed
444
445 @return the operation status
446 @retval 0 OK
447 @retval !=0 Error executing the action
448 */
450 return this->pipeline->handle_action(action);
451 }
452
453 /**
454 Injects an event into the pipeline and waits for its handling.
455
456 @param[in] pevent the event to be injected
457 @param[in] cont the object used to wait
458
459 @return the operation status
460 @retval 0 OK
461 @retval !=0 Error on queue
462 */
464
465 /**
466 Terminates the pipeline, shutting down the handlers and deleting them.
467
468 @note the pipeline will always be deleted even if an error occurs.
469
470 @return the operation status
471 @retval 0 OK
472 @retval !=0 Error on pipeline termination
473 */
475
476 /**
477 Sets the applier shutdown timeout.
478
479 @param[in] timeout the timeout
480 */
483
484 // Configure any thread based applier
487 pipeline->handle_action(conf_action);
488
489 delete conf_action;
490 }
491
492 /**
493 This method informs the applier module that an applying thread stopped
494 */
495 void inform_of_applier_stop(char *channel_name, bool aborted);
496
497 /**
498 Check whether to ignore applier errors during stop or not.
499 Errors put the members into ERROR state.
500 If errors are ignored member will stay in ONLINE state.
501 During clone, applier errors are ignored, since data will come from clone.
502
503 @param[in] ignore_errors if true ignore applier errors during stop
504 */
507 }
508
509 // Packet based interface methods
510
511 /**
512 Queues a packet that will eventually make the applier module suspend.
513 It pushes a new Action_packet(SUSPENSION_PACKET) onto the incoming queue.
514
515 @note This will happen only after all the previous packets are processed.
516 */
517 void add_suspension_packet() override {
518 this->incoming->push(new Action_packet(SUSPENSION_PACKET));
519 }
520
521 /**
522 Queues a packet that will make the applier module terminate its handling
523 process. Due to the blocking nature of the queue, this method is useful to
524 unblock the handling process on shutdown.
525
526 @note This will happen only after all the previous packets are processed.
527 */
529 this->incoming->push(new Action_packet(TERMINATION_PACKET));
530 }
531
532 /// @brief Generic add packet method
533 /// @param packet the packet to be queued in the applier
534 void add_packet(Packet *packet) override { incoming->push(packet); }
535
536 /**
537 Queues a view change packet into the applier.
538 These packets contain the new view id and they mark the exact frontier
539 between transactions from the old and new views.
540
541 @note This will happen only after all the previous packets are processed.
542
543 @param[in] packet The view change packet to be queued
544 */
546 incoming->push(packet);
547 }
548
550 Recovery_metadata_processing_packets *packet) override {
551 incoming->push(packet);
552 }
553
554 /**
555 Queues a single primary action packet into the applier.
556
557 @note This will happen only after all the previous packets are processed.
558
559 @param[in] packet The packet to be queued
560 */
562 Single_primary_action_packet *packet) override {
563 incoming->push(packet);
564 }
565
566 /**
567 Queues a transaction prepared action packet into the applier.
568
569 @note This will happen only after all the previous packets are processed.
570
571 @param[in] packet The packet to be queued
572 */
574 Transaction_prepared_action_packet *packet) override {
575 incoming->push(packet);
576 }
577
578 /**
579 Queues a synchronization before execution action packet into the applier.
580
581 @note This will happen only after all the previous packets are processed.
582
583 @param[in] packet The packet to be queued
584 */
586 Sync_before_execution_action_packet *packet) override {
587 incoming->push(packet);
588 }
589
590 /**
591 Queues a leaving members action packet into the applier.
592
593 @note This will happen only after all the previous packets are processed.
594
595 @param[in] packet The packet to be queued
596 */
598 Leaving_members_action_packet *packet) override {
599 incoming->push(packet);
600 }
601
602 /**
603 Queues a single packet that will enable certification on this member
604 */
606
607 /**
608 Awakes the applier module
609 */
610 void awake_applier_module() override {
612 suspended = false;
615 }
616
617 /**
618 Waits for the applier to suspend and apply all the transactions previous to
619 the suspend request.
620
621 @param abort_flag a pointer to a flag that the caller can use to
622 cancel the request.
623 @param wait_for_execution specify if the suspension waits for transactions
624 execution
625
626 @return the operation status
627 @retval 0 OK
628 @retval !=0 Error when accessing the applier module status
629 */
631 bool *abort_flag, bool wait_for_execution = true) override;
632
633 /**
634 Interrupts the current applier waiting process, either for its suspension
635 or for its wait for the consumption of pre-suspension events
636 */
637 void interrupt_applier_suspension_wait() override;
638
639 /**
640 Checks if the applier, and its workers when parallel applier is
641 enabled, has already consumed all relay log, that is, applier is
642 waiting for transactions to be queued.
643
644 @return the applier status
645 @retval true the applier is waiting
646 @retval false otherwise
647 */
649
650 /**
651 Waits for the execution of all events by the current SQL applier.
652 Due to the possible asynchronous nature of module's applier handler, this
653 method inquires the current handler to check if all transactions queued up
654 to this point are already executed.
655
656 If no handler exists, then it is assumed that transactions were processed.
657
658 @param timeout the time (seconds) after which the method returns if the
659 above condition was not satisfied
660 @param check_and_purge_partial_transactions
661 on successful executions, check and purge partial
662 transactions in the relay log
663
664 @return the operation status
665 @retval 0 All transactions were executed
666 @retval -1 A timeout occurred
667 @retval -2 An error occurred
668 */
670 double timeout, bool check_and_purge_partial_transactions) override;
671
672 /**
673 Waits for the execution of all current events by the current SQL
674 applier.
675
676 The current gtid retrieved set is extracted and a loop is executed until
677 these transactions are executed.
678
679 If the applier SQL thread stops, the method will return an error.
680
681 If no handler exists, then it is assumed that transactions were processed.
682
683 @param checkpoint_condition the class used to wait for the queue to be
684 consumed. Can be used to cancel the wait.
685 @param abort_flag a pointer to a flag that the caller can use to
686 cancel the request.
687 @param update_THD_status Shall the method update the THD stage
688
689
690 @return the operation status
691 @retval false All transactions were executed
692 @retval true An error occurred
693 */
695 std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
696 bool update_THD_status = true) override;
697
698 /**
699 Returns the retrieved gtid set for the applier channel
700
701 @param[out] retrieved_set the set in string format.
702
703 @retval true there was an error.
704 @retval false the operation has succeeded.
705 */
706 bool get_retrieved_gtid_set(std::string &retrieved_set) override;
707
708 /**
709 Waits for the execution of all events in the given set by the current SQL
710 applier. If no handler exists, then it is assumed that transactions were
711 processed.
712
713 @param retrieved_set the set in string format of transactions to wait for
714 @param timeout the time (seconds) after which the method returns if the
715 above condition was not satisfied
716 @param update_THD_status Shall the method update the THD stage
717
718 @return the operation status
719 @retval 0 All transactions were executed
720 @retval -1 A timeout occurred
721 @retval -2 An error occurred
722 */
723 int wait_for_applier_event_execution(std::string &retrieved_set,
724 double timeout,
725 bool update_THD_status = true) override;
726
727 /**
728 Returns the handler instance in the applier module responsible for
729 certification.
730
731 @note If new certification handlers appear, an interface must be created.
732
733 @return a pointer to the applier's certification handler.
734 @retval !=NULL The certification handler
735 @retval NULL No certification handler present
736 */
738
739 /**
740 Returns the applier module's queue size.
741
742 @return the size of the queue
743 */
744 size_t get_message_queue_size() override { return incoming->size(); }
745
748 return APPLIER_STATE_ON;
749 else if (suspended) /* purecov: inspected */
750 return APPLIER_STATE_OFF; /* purecov: inspected */
751 else
752 return APPLIER_ERROR; /* purecov: inspected */
753 }
754
756 override {
758 }
759
761 return &flow_control_module;
762 }
763
764 void run_flow_control_step() override {
766 }
767
769 std::shared_ptr<Continuation> checkpoint_condition) override;
770
771 private:
772 // Applier packet handlers
773
774 /**
775 Apply an action packet received by the applier.
776 It can be an order to suspend or terminate.
777
778 @param action_packet the received action packet
779
780 @return if the applier should terminate (with no associated error).
781 */
782 bool apply_action_packet(Action_packet *action_packet);
783
784 /**
785 Apply a View Change packet received by the applier.
786 It executes some certification operations and queues a View Change Event
787
788 @param view_change_packet the received view change packet
789 @param fde_evt the Format description event associated to the event
790 @param cont the applier Continuation Object
791
792 @return the operation status
793 @retval 0 OK
794 @retval !=0 Error when injecting event
795 */
796 int apply_view_change_packet(View_change_packet *view_change_packet,
798 Continuation *cont);
799
800 /**
801 Apply a Recovery metadata processing information received from the GCS.
802
803 @param metadata_processing_packet Information of member left the group
804
805 @return the operation status
806 @retval 0 OK
807 @retval !=0 Error when injecting event
808 */
810 Recovery_metadata_processing_packets *metadata_processing_packet);
811
812 /**
813 Apply a Data packet received by the applier.
814 It executes some certification operations and queues a View Change Event
815
816 @param data_packet the received data packet
817 @param fde_evt the Format description event associated to the event
818 @param cont the applier Continuation Object
819
820 @return the operation status
821 @retval 0 OK
822 @retval !=0 Error when injecting event
823 */
824 int apply_data_packet(Data_packet *data_packet,
826 Continuation *cont);
827
828 /**
829 Apply a single primary action packet received by the applier.
830
831 @param packet the received action packet
832
833 @return the operation status
834 @retval 0 OK
835 @retval !=0 Error when applying packet
836 */
838
839 /**
840 Apply a transaction prepared action packet received by the applier.
841
842 @param packet the received action packet
843
844 @return the operation status
845 @retval 0 OK
846 @retval !=0 Error when applying packet
847 */
850
851 /**
852 Apply a synchronization before execution action packet received
853 by the applier.
854
855 @param packet the received action packet
856
857 @return the operation status
858 @retval 0 OK
859 @retval !=0 Error when applying packet
860 */
863
864 /**
865 Apply a leaving members action packet received by the applier.
866
867 @param packet the received action packet
868
869 @return the operation status
870 @retval 0 OK
871 @retval !=0 Error when applying packet
872 */
875
876 /**
877 Suspends the applier module while transactions may still be queued in the
878 incoming queue.
879
880 @note if the proper condition is set, possible listeners can be awakened by
881 this method.
882 */
885
886 suspended = true;
888 __LINE__, 0, 0);
889
890 // Alert any interested party about the applier suspension
892
893 while (suspended) {
894 struct timespec abstime;
895 set_timespec(&abstime, 1);
897 }
899 __LINE__, 0, 0);
900
902 }
903
904 /**
905 Cleans the thread context for the applier thread
906 This includes such tasks as removing the thread from the global thread list
907 */
909
910 /**
911 Set the thread context for the applier thread.
912 This allows the thread to behave like a slave thread and perform
913 such tasks as queuing to a relay log.
914 */
916
917 /**
918 This method calculates the intersection of the given sets passed as a list
919 of strings.
920
921 @param[in] gtid_sets the vector containing the GTID sets to intersect
922 @param[out] output_set the final GTID calculated from the intersection
923
924 @return the operation status
925 @retval 0 all went fine
926 @retval !=0 error
927 */
928 int intersect_group_executed_sets(std::vector<std::string> &gtid_sets,
929 Gtid_set *output_set);
930
931 // applier thread variables
934
935 // configuration options
939
940 // run conditions and locks
943 /* Applier thread state */
945 /* Applier abort flag */
947 /* Applier error during execution */
949 /* Applier killed status */
951 /* Ignore applier errors during stop. */
953
954 // condition and lock used to suspend/awake the applier module
955 /* The lock for suspending/wait for the awake of the applier module */
957 /* The condition for suspending/wait for the awake of the applier module */
959 /* Suspend flag that informs if the applier is suspended */
961
962 /* The condition for signaling the applier suspension*/
964
965 /* The incoming event queue */
967
968 /* The applier pipeline for event execution */
970
971 /* Applier timeout on shutdown */
973
974 /* Applier channel observer to detect failures */
976
980};
981
982#endif /* APPLIER_INCLUDE */
#define ERROR_PACKET_TYPE
Make the applier fail.
Definition: applier.h:53
#define RECOVERY_METADATA_PROCESSING_PACKET_TYPE
Definition: applier.h:52
#define ACTION_PACKET_TYPE
Definition: applier.h:47
char applier_module_channel_name[]
Definition: applier.cc:49
#define LEAVING_MEMBERS_PACKET_TYPE
Definition: applier.h:51
enum_applier_state
Definition: applier.h:270
@ APPLIER_STATE_ON
Definition: applier.h:271
@ APPLIER_STATE_OFF
Definition: applier.h:272
@ APPLIER_ERROR
Definition: applier.h:273
#define TRANSACTION_PREPARED_PACKET_TYPE
Definition: applier.h:50
enum enum_applier_state Member_applier_state
#define SINGLE_PRIMARY_PACKET_TYPE
Definition: applier.h:48
#define SYNC_BEFORE_EXECUTION_PACKET_TYPE
Definition: applier.h:49
enum_packet_action
Definition: applier.h:63
@ TERMINATION_PACKET
Definition: applier.h:64
@ ACTION_NUMBER
Definition: applier.h:67
@ SUSPENSION_PACKET
Definition: applier.h:65
@ CHECKPOINT_PACKET
Definition: applier.h:66
A packet to control the applier in a event oriented way.
Definition: applier.h:74
Action_packet(enum_packet_action action)
Create a new action packet.
Definition: applier.h:80
enum_packet_action packet_action
Definition: applier.h:85
~Action_packet() override=default
Definition: applier_channel_state_observer.h:30
Definition: applier.h:276
virtual Pipeline_stats_member_collector * get_pipeline_stats_member_collector()=0
virtual bool get_retrieved_gtid_set(std::string &retrieved_set)=0
virtual ~Applier_module_interface()=default
virtual int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true)=0
virtual void add_packet(Packet *packet)=0
virtual size_t get_message_queue_size()=0
virtual Certification_handler * get_certification_handler()=0
virtual Flow_control_module * get_flow_control_module()=0
virtual int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions)=0
virtual int wait_for_applier_event_execution(std::string &retrieved_set, double timeout, bool update_THD_status=true)=0
virtual void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)=0
virtual void add_leaving_members_action_packet(Leaving_members_action_packet *packet)=0
virtual void awake_applier_module()=0
virtual int purge_applier_queue_and_restart_applier_module()=0
virtual Member_applier_state get_applier_status()=0
virtual int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members, PSI_memory_key key)=0
virtual void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)=0
virtual void add_suspension_packet()=0
virtual bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true)=0
virtual void add_metadata_processing_packet(Recovery_metadata_processing_packets *packet)=0
virtual void run_flow_control_step()=0
virtual void add_view_change_packet(View_change_packet *packet)=0
virtual bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition)=0
virtual void add_single_primary_action_packet(Single_primary_action_packet *packet)=0
virtual int handle_pipeline_action(Pipeline_action *action)=0
virtual void interrupt_applier_suspension_wait()=0
Definition: applier.h:322
~Applier_module() override
Definition: applier.cc:77
int apply_data_packet(Data_packet *data_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a Data packet received by the applier.
Definition: applier.cc:345
Certification_handler * get_certification_handler() override
Returns the handler instance in the applier module responsible for certification.
Definition: applier.cc:989
bool is_applier_thread_aborted()
Is the applier marked for shutdown?
Definition: applier.h:357
void add_termination_packet()
Queues a packet that will make the applier module terminate its handling process.
Definition: applier.h:528
Plugin_stage_monitor_handler stage_handler
Definition: applier.h:979
int applier_error
Definition: applier.h:948
void clean_applier_thread_context()
Cleans the thread context for the applier thread This includes such tasks as removing the thread from...
Definition: applier.cc:230
bool apply_action_packet(Action_packet *action_packet)
Apply an action packet received by the applier.
Definition: applier.cc:247
int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true) override
Waits for the applier to suspend and apply all the transactions previous to the suspend request.
Definition: applier.cc:845
int applier_thread_handle()
Runs the applier thread process, reading events and processing them.
Definition: applier.cc:444
Member_applier_state get_applier_status() override
Definition: applier.h:746
bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true) override
Waits for the execution of all current events by the current SQL applier.
Definition: applier.cc:967
int setup_pipeline_handlers()
Configure the applier pipeline handlers.
Definition: applier.cc:164
bool applier_killed_status
Definition: applier.h:950
void set_stop_wait_timeout(ulong timeout)
Sets the applier shutdown timeout.
Definition: applier.h:481
bool m_ignore_applier_errors_during_stop
Definition: applier.h:952
my_thread_handle applier_pthd
Definition: applier.h:932
int apply_single_primary_action_packet(Single_primary_action_packet *packet)
Apply a single primary action packet received by the applier.
Definition: applier.cc:403
void ignore_errors_during_stop(bool ignore_errors)
Check whether to ignore applier errors during stop or not.
Definition: applier.h:505
int purge_applier_queue_and_restart_applier_module() override
Purges the relay logs and restarts the applier thread.
Definition: applier.cc:121
mysql_mutex_t run_lock
Definition: applier.h:941
int apply_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)
Apply a transaction prepared action packet received by the applier.
Definition: applier.cc:422
bool suspended
Definition: applier.h:960
int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs, ulong stop_timeout, rpl_sidno group_sidno, ulonglong gtid_assignment_block_size)
Configure the applier pipeline according to the given configuration.
Definition: applier.cc:95
void add_suspension_packet() override
Queues a packet that will eventually make the applier module suspend.
Definition: applier.h:517
mysql_cond_t suspension_waiting_condition
Definition: applier.h:963
rpl_sidno group_replication_sidno
Definition: applier.h:937
int intersect_group_executed_sets(std::vector< std::string > &gtid_sets, Gtid_set *output_set)
This method calculates the intersection of the given sets passed as a list of strings.
Definition: applier.cc:997
bool is_running()
Is the applier running?
Definition: applier.h:368
void add_packet(Packet *packet) override
Generic add packet method.
Definition: applier.h:534
void add_single_primary_action_packet(Single_primary_action_packet *packet) override
Queues a single primary action packet into the applier.
Definition: applier.h:561
int apply_leaving_members_action_packet(Leaving_members_action_packet *packet)
Apply a leaving members action packet received by the applier.
Definition: applier.cc:437
void suspend_applier_module()
Suspends the applier module, being transactions still queued in the incoming queue.
Definition: applier.h:883
void set_applier_thread_context()
Set the thread context for the applier thread.
Definition: applier.cc:190
int initialize_applier_thread()
Initializes and launches the applier thread.
Definition: applier.cc:687
int terminate_applier_thread()
Terminates the applier thread.
Definition: applier.cc:739
size_t get_message_queue_size() override
Returns the applier module's queue size.
Definition: applier.h:744
bool reset_applier_logs
Definition: applier.h:936
void inform_of_applier_stop(char *channel_name, bool aborted)
This method informs the applier module that an applying thread stopped.
Definition: applier.cc:817
bool is_applier_thread_waiting()
Checks if the applier, and its workers when parallel applier is enabled, has already consumed all rel...
Definition: applier.cc:900
Flow_control_module flow_control_module
Definition: applier.h:978
void add_metadata_processing_packet(Recovery_metadata_processing_packets *packet) override
Definition: applier.h:549
int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions) override
Waits for the execution of all events by the current SQL applier.
Definition: applier.cc:912
int terminate_applier_pipeline()
Terminates the pipeline, shutting down the handlers and deleting them.
Definition: applier.cc:724
Event_handler * pipeline
Definition: applier.h:969
bool get_retrieved_gtid_set(std::string &retrieved_set) override
Returns the retrieved gtid set for the applier channel.
Definition: applier.cc:938
int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members, PSI_memory_key key) override
Queues the packet coming from the reader for future application.
Definition: applier.h:431
bool applier_aborted
Definition: applier.h:946
Pipeline_stats_member_collector * get_pipeline_stats_member_collector() override
Definition: applier.h:755
mysql_mutex_t suspend_lock
Definition: applier.h:956
int apply_view_change_packet(View_change_packet *view_change_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a View Change packet received by the applier.
Definition: applier.cc:269
void awake_applier_module() override
Awakes the applier module.
Definition: applier.h:610
Pipeline_member_stats * get_local_pipeline_stats()
Return the local applier stats.
Definition: applier.cc:1062
bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition) override
Definition: applier.cc:1056
int inject_event_into_pipeline(Pipeline_event *pevent, Continuation *cont)
Injects an event into the pipeline and waits for its handling.
Definition: applier.cc:236
thread_state applier_thd_state
Definition: applier.h:944
ulonglong gtid_assignment_block_size
Definition: applier.h:938
int apply_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)
Apply a synchronization before execution action packet received by the applier.
Definition: applier.cc:430
void add_view_change_packet(View_change_packet *packet) override
Queues a view change packet into the applier.
Definition: applier.h:545
void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet) override
Queues a synchronization before execution action packet into the applier.
Definition: applier.h:585
THD * applier_thd
Definition: applier.h:933
mysql_cond_t suspend_cond
Definition: applier.h:958
int handle_pipeline_action(Pipeline_action *action) override
Gives the pipeline an action for execution.
Definition: applier.h:449
Pipeline_stats_member_collector pipeline_stats_member_collector
Definition: applier.h:977
void run_flow_control_step() override
Definition: applier.h:764
mysql_cond_t run_cond
Definition: applier.h:942
virtual void queue_certification_enabling_packet()
Queues a single packet that will enable certification on this member.
Definition: applier.cc:1051
ulong stop_wait_timeout
Definition: applier.h:972
Applier_module()
Definition: applier.cc:58
int apply_metadata_processing_packet(Recovery_metadata_processing_packets *metadata_processing_packet)
Apply a Recovery metadata processing information received from the GCS.
Definition: applier.cc:331
void interrupt_applier_suspension_wait() override
Interrupts the current applier waiting process either for its suspension or its wait for the consum...
Definition: applier.cc:894
Flow_control_module * get_flow_control_module() override
Definition: applier.h:760
Applier_channel_state_observer * applier_channel_observer
Definition: applier.h:975
void add_leaving_members_action_packet(Leaving_members_action_packet *packet) override
Queues a leaving members action packet into the applier.
Definition: applier.h:597
Synchronized_queue< Packet * > * incoming
Definition: applier.h:966
void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet) override
Queues a transaction prepared action packet into the applier.
Definition: applier.h:573
Definition: certification_handler.h:32
Class used to wait on the execution of some action.
Definition: pipeline_interfaces.h:536
A wrapper for raw network packets.
Definition: pipeline_interfaces.h:76
A packet to inform the applier it should fail.
Definition: applier.h:252
const char * m_error_message
The error message for the failure process.
Definition: applier.h:267
~Error_action_packet() override=default
Error_action_packet(const char *error_message)
Create a new error packet.
Definition: applier.h:256
const char * get_error_message()
Returns the error message for the failure.
Definition: applier.h:263
Interface for the application of events, whether they are packets or log events.
Definition: pipeline_interfaces.h:663
virtual int handle_action(Pipeline_action *action)=0
Handling of an action as defined in the handler implementation.
The pipeline stats aggregator of all group members stats and flow control module.
Definition: pipeline_stats.h:608
void flow_control_step(Pipeline_stats_member_collector *)
Evaluate the information received in the last flow control period and adjust the system parameters ac...
Definition: pipeline_stats.cc:749
For binlog version 4.
Definition: log_event.h:1536
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:40
Represents a set of GTIDs.
Definition: rpl_gtid.h:1557
Action to configure existing applier handlers.
Definition: pipeline_handlers.h:105
A packet to inform pipeline listeners of leaving members, this packet will be handled on the global m...
Definition: applier.h:232
const std::vector< Gcs_member_identifier > m_leaving_members
Definition: applier.h:246
~Leaving_members_action_packet() override=default
Leaving_members_action_packet(const std::vector< Gcs_member_identifier > &leaving_members)
Create a new leaving members packet.
Definition: applier.h:239
A generic interface for different kinds of packets.
Definition: pipeline_interfaces.h:51
A wrapper for pipeline actions.
Definition: pipeline_interfaces.h:633
A wrapper for log events/packets.
Definition: pipeline_interfaces.h:167
Computed statistics per member.
Definition: pipeline_stats.h:433
The pipeline collector for the local member stats.
Definition: pipeline_stats.h:284
Definition: stage_monitor_handler.h:30
int set_stage(PSI_stage_key key, const char *file, int line, ulonglong estimated_work, ulonglong work_completed)
Set that a new stage is now in progress.
Definition: stage_monitor_handler.cc:79
A packet to wait for queue consumption.
Definition: applier.h:139
std::shared_ptr< Continuation > checkpoint_condition
If we discard a packet.
Definition: applier.h:153
void signal_checkpoint_reached()
Definition: applier.h:149
Queue_checkpoint_packet(std::shared_ptr< Continuation > checkpoint_condition_arg)
Create a new Queue_checkpoint_packet packet.
Definition: applier.h:144
A packet to send Metadata related processing.
Definition: applier.h:92
Recovery_metadata_processing_packets()
Create a new data packet with associated data.
Definition: applier.h:97
bool m_current_member_leaving_the_group
Definition: applier.h:111
std::vector< Gcs_member_identifier > m_member_left_the_group
Definition: applier.h:108
std::vector< std::string > m_view_id_to_be_deleted
Definition: applier.h:105
virtual ~Recovery_metadata_processing_packets() override=default
A packet to send new primary election related info to the applier.
Definition: applier.h:118
enum_action
Definition: applier.h:120
@ NEW_PRIMARY
Definition: applier.h:120
@ QUEUE_APPLIED
Definition: applier.h:120
Single_primary_action_packet(enum enum_action action_arg)
Create a new single primary action packet with associated data.
Definition: applier.h:127
~Single_primary_action_packet() override=default
enum enum_action action
Definition: applier.h:132
A packet to request a synchronization point on the global message order on a given member before tran...
Definition: applier.h:206
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:223
Sync_before_execution_action_packet(my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
Create a new synchronization point request.
Definition: applier.h:214
const my_thread_id m_thread_id
Definition: applier.h:222
~Sync_before_execution_action_packet() override=default
Definition: plugin_utils.h:182
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:36
std::atomic< killed_state > killed
Definition: sql_class.h:2745
A packet to inform that a given member did prepare a given transaction.
Definition: applier.h:160
const gr::Gtid_tsid & get_tsid() const
tsid accessor
Definition: applier.h:191
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:187
const rpl_gno m_gno
Definition: applier.h:186
~Transaction_prepared_action_packet() override=default
gr::Gtid_tsid m_tsid
Definition: applier.h:198
Transaction_prepared_action_packet(const gr::Gtid_tsid &tsid, bool is_tsid_specified, rpl_gno gno, const Gcs_member_identifier &gcs_member_id)
Create a new transaction prepared action.
Definition: applier.h:171
bool is_tsid_specified() const
Returns whether a TSID is specified for this transaction.
Definition: applier.h:195
const bool m_tsid_specified
Definition: applier.h:185
A packet to send view change related info to the applier.
Definition: pipeline_interfaces.h:116
Represents Transaction Source Identifier which is composed of source UUID and transaction tag.
Definition: tsid.h:44
#define mysql_cond_timedwait(C, M, T)
Definition: mysql_cond.h:51
#define mysql_mutex_lock(M)
Definition: mysql_mutex.h:50
#define mysql_mutex_unlock(M)
Definition: mysql_mutex.h:57
unsigned int PSI_memory_key
Instrumented memory key.
Definition: psi_memory_bits.h:49
Some integer typedefs for easier portability.
unsigned long long int ulonglong
Definition: my_inttypes.h:56
unsigned char uchar
Definition: my_inttypes.h:52
void set_timespec(struct timespec *abstime, Timeout_type sec)
Set the value of a timespec object to the current time plus a number of seconds using seconds.
Definition: my_systime.cc:84
static my_thread_id thread_id
Definition: my_thr_init.cc:63
uint32 my_thread_id
Definition: my_thread_local.h:34
int(* mysql_cond_broadcast)(mysql_cond_t *that, const char *src_file, unsigned int src_line)
Definition: mysql_cond_service.h:52
static bool ignore_errors
Definition: mysqlcheck.cc:62
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:498
Handler_pipeline_type
Definition: pipeline_factory.h:45
API for Group Replication plugin.
enum_group_replication_consistency_level
Definition: plugin_group_replication.h:35
PSI_stage_info info_GR_STAGE_module_suspending
Definition: plugin_psi.h:230
PSI_stage_info info_GR_STAGE_module_executing
Definition: plugin_psi.h:229
required string key
Definition: replication_asynchronous_connection_failover.proto:60
repeated Action action
Definition: replication_group_member_actions.proto:43
mysql::gtid::gno_t rpl_gno
GNO, the second (numeric) component of a GTID, is an alias of mysql::gtid::gno_t.
Definition: rpl_gtid.h:113
cs::index::rpl_sidno rpl_sidno
Type of SIDNO (source ID number, first component of GTID)
Definition: rpl_gtid.h:109
PSI_stage_key m_key
The registered stage key.
Definition: psi_stage_bits.h:76
Definition: my_thread_bits.h:58
An instrumented cond structure.
Definition: mysql_cond_bits.h:50
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:50
Definition: plugin_utils.h:48
bool is_running() const
Definition: plugin_utils.h:85