MySQL 8.3.0
Source Code Documentation
applier.h
Go to the documentation of this file.
1/* Copyright (c) 2014, 2023, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23#ifndef APPLIER_INCLUDE
24#define APPLIER_INCLUDE
25
28#include <list>
29#include <vector>
30
31#include "my_inttypes.h"
43#include "sql/sql_class.h"
44
45// Define the applier packet types
46#define ACTION_PACKET_TYPE 2
47#define SINGLE_PRIMARY_PACKET_TYPE 4
48#define SYNC_BEFORE_EXECUTION_PACKET_TYPE 5
49#define TRANSACTION_PREPARED_PACKET_TYPE 6
50#define LEAVING_MEMBERS_PACKET_TYPE 7
51#define RECOVERY_METADATA_PROCESSING_PACKET_TYPE 8
52#define ERROR_PACKET_TYPE 9 ///< Make the applier fail
53
54// Define the applier return error codes
55#define APPLIER_GTID_CHECK_TIMEOUT_ERROR -1
56#define APPLIER_RELAY_LOG_NOT_INITED -2
57#define APPLIER_THREAD_ABORTED -3
58
59extern char applier_module_channel_name[];
60
61/* Types of action packets used in the context of the applier module */
63 TERMINATION_PACKET = 0, // Packet for a termination action
64 SUSPENSION_PACKET, // Packet to signal something to suspend
65 CHECKPOINT_PACKET, // Packet to wait for queue consumption
66 ACTION_NUMBER = 3 // The number of actions
67};
68
69/**
70 @class Action_packet
71 A packet to control the applier in an event-oriented way.
72*/
73class Action_packet : public Packet {
74 public:
75 /**
76 Create a new action packet.
77 @param action the packet action
78 */
81
82 ~Action_packet() override = default;
83
85};
86
87/**
88 @class Recovery_metadata_processing_packets
89 A packet to send Metadata related processing.
90*/
92 public:
93 /**
94 Create a new data packet with associated data.
95 */
98
99 virtual ~Recovery_metadata_processing_packets() override = default;
100
101 /*
102 List of view-ids for which metadata has been received.
103 */
104 std::vector<std::string> m_view_id_to_be_deleted;
105
106 /* List of members that left the group. */
107 std::vector<Gcs_member_identifier> m_member_left_the_group;
108
109 /* A flag that indicates all the recovery metadata should be cleared. */
111};
112
113/**
114 @class Single_primary_action_packet
115 A packet to send new primary election related info to the applier
116*/
118 public:
120
121 /**
122 Create a new single primary action packet with associated data.
123
124 @param action_arg the packet action
125 */
127 : Packet(SINGLE_PRIMARY_PACKET_TYPE), action(action_arg) {}
128
129 ~Single_primary_action_packet() override = default;
130
132};
133
134/**
135 @class Queue_checkpoint_packet
136 A packet to wait for queue consumption
137*/
139 public:
140 /**
141 Create a new Queue_checkpoint_packet packet.
142 */
144 std::shared_ptr<Continuation> checkpoint_condition_arg)
146 checkpoint_condition(checkpoint_condition_arg) {}
147
149
150 private:
151 /**If we discard a packet */
152 std::shared_ptr<Continuation> checkpoint_condition;
153};
154
155/**
156 @class Transaction_prepared_action_packet
157 A packet to inform that a given member did prepare a given transaction.
158*/
160 public:
161 /**
162 Create a new transaction prepared action.
163
164 @param tsid the prepared transaction tsid
165 @param is_tsid_specified information on whether tsid is specified
166 @param gno the prepared transaction gno
167 @param gcs_member_id the member id that did prepare the
168 transaction
169 */
171 bool is_tsid_specified, rpl_gno gno,
172 const Gcs_member_identifier &gcs_member_id)
175 m_gno(gno),
176 m_gcs_member_id(gcs_member_id.get_member_id()) {
177 if (m_tsid_specified) {
178 m_tsid = tsid;
179 }
180 }
181
183
187
188 /// @brief tsid accessor
189 /// @return tsid const reference
190 const gr::Gtid_tsid &get_tsid() const { return m_tsid; }
191
192 /// @brief returns information on whether TSID is specified for this trx
193 /// @return information on whether TSID is specified for this trx
194 bool is_tsid_specified() const { return m_tsid_specified; }
195
196 private:
198};
199
200/**
201 @class Sync_before_execution_action_packet
202 A packet to request a synchronization point on the global message
203 order on a given member before transaction execution.
204*/
206 public:
207 /**
208 Create a new synchronization point request.
209
210 @param thread_id the thread that did the request
211 @param gcs_member_id the member id that did the request
212 */
214 my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
217 m_gcs_member_id(gcs_member_id.get_member_id()) {}
218
220
223};
224
225/**
226 @class Leaving_members_action_packet
227 A packet to inform pipeline listeners of leaving members,
228 this packet will be handled on the global message order,
229 that is, ordered with certification.
230*/
232 public:
233 /**
234 Create a new leaving members packet.
235
236 @param leaving_members the members that left the group
237 */
239 const std::vector<Gcs_member_identifier> &leaving_members)
241 m_leaving_members(leaving_members) {}
242
243 ~Leaving_members_action_packet() override = default;
244
245 const std::vector<Gcs_member_identifier> m_leaving_members;
246};
247
248/// @class Error_action_packet
249/// A packet to inform the applier it should fail.
250/// It should include a message about the failure
252 public:
253 /// Create a new error packet.
254 /// @param error_message the error that will make the applier stop
255 Error_action_packet(const char *error_message)
256 : Packet(ERROR_PACKET_TYPE), m_error_message(error_message) {}
257
258 ~Error_action_packet() override = default;
259
260 /// @brief Returns the error message for the failure
261 /// @return the error message
262 const char *get_error_message() { return m_error_message; }
263
264 private:
265 /// The error message for the failure process
266 const char *m_error_message;
267};
268
269typedef enum enum_applier_state {
274
276 public:
277 virtual ~Applier_module_interface() = default;
280 bool *abort_flag, bool wait_for_execution = true) = 0;
281 virtual void awake_applier_module() = 0;
284 double timeout, bool check_and_purge_partial_transactions) = 0;
286 std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
287 bool update_THD_status = true) = 0;
288 virtual bool get_retrieved_gtid_set(std::string &retrieved_set) = 0;
290 std::string &retrieved_set, double timeout,
291 bool update_THD_status = true) = 0;
292 virtual size_t get_message_queue_size() = 0;
294 virtual void add_suspension_packet() = 0;
295 virtual void add_packet(Packet *packet) = 0;
300 Single_primary_action_packet *packet) = 0;
307 virtual int handle(const uchar *data, ulong len,
309 std::list<Gcs_member_identifier> *online_members,
310 PSI_memory_key key) = 0;
313 virtual void run_flow_control_step() = 0;
316 std::shared_ptr<Continuation> checkpoint_condition) = 0;
319};
320
322 public:
324 ~Applier_module() override;
325
326 /**
327 Initializes and launches the applier thread
328
329 @return the operation status
330 @retval 0 OK
331 @retval !=0 Error
332 */
334
335 /**
336 * Return the local applier stats.
337 */
339
340 /**
341 Terminates the applier thread.
342
343 @return the operation status
344 @retval 0 OK
345 @retval !=0 A timeout occurred
346 */
348
349 /**
350 Is the applier marked for shutdown?
351
352 @return is the applier on shutdown
353 @retval 0 no
354 @retval !=0 yes
355 */
357 return (applier_aborted || applier_thd == nullptr || applier_thd->killed);
358 }
359
360 /**
361 Is the applier running?
362
363 @return applier running?
364 @retval 0 no
365 @retval !=0 yes
366 */
368
369 /**
370 Configure the applier pipeline according to the given configuration
371
372 @param[in] pipeline_type the chosen pipeline
373 @param[in] reset_logs if a reset happened in the server
374 @param[in] stop_timeout the timeout when waiting on shutdown
375 @param[in] group_sidno the group configured sidno
376 @param[in] gtid_assignment_block_size the group gtid assignment block size
377
378 @return the operation status
379 @retval 0 OK
380 @retval !=0 Error
381 */
382 int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs,
383 ulong stop_timeout, rpl_sidno group_sidno,
385
386 /**
387 Configure the applier pipeline handlers
388
389 @return the operation status
390 @retval 0 OK
391 @retval !=0 Error
392 */
394
395 /**
396 Purges the relay logs and restarts the applier thread.
397
398 @return the operation status
399 @retval 0 OK
400 @retval !=0 Error
401 */
403
404 /**
405 Runs the applier thread process, reading events and processing them.
406
407 @note When killed, the thread will finish handling the current packet, and
408 then die, ignoring all possible existing events in the incoming queue.
409
410 @return the operation status
411 @retval 0 OK
412 @retval !=0 Error
413 */
415
416 /**
417 Queues the packet coming from the reader for future application.
418
419 @param[in] data the packet data
420 @param[in] len the packet length
421 @param[in] consistency_level the transaction consistency level
422 @param[in] online_members the ONLINE members when the transaction
423 message was delivered
424 @param[in] key the memory instrument key
425
426 @return the operation status
427 @retval 0 OK
428 @retval !=0 Error on queue
429 */
430 int handle(const uchar *data, ulong len,
432 std::list<Gcs_member_identifier> *online_members,
433 PSI_memory_key key) override {
434 this->incoming->push(
435 new Data_packet(data, len, key, consistency_level, online_members));
436 return 0;
437 }
438
439 /**
440 Gives the pipeline an action for execution.
441
442 @param[in] action the action to be executed
443
444 @return the operation status
445 @retval 0 OK
446 @retval !=0 Error executing the action
447 */
449 return this->pipeline->handle_action(action);
450 }
451
452 /**
453 Injects an event into the pipeline and waits for its handling.
454
455 @param[in] pevent the event to be injected
456 @param[in] cont the object used to wait
457
458 @return the operation status
459 @retval 0 OK
460 @retval !=0 Error on queue
461 */
463
464 /**
465 Terminates the pipeline, shutting down the handlers and deleting them.
466
467 @note the pipeline will always be deleted even if an error occurs.
468
469 @return the operation status
470 @retval 0 OK
471 @retval !=0 Error on pipeline termination
472 */
474
475 /**
476 Sets the applier shutdown timeout.
477
478 @param[in] timeout the timeout
479 */
482
483 // Configure any thread based applier
486 pipeline->handle_action(conf_action);
487
488 delete conf_action;
489 }
490
491 /**
492 This method informs the applier module that an applying thread stopped
493 */
494 void inform_of_applier_stop(char *channel_name, bool aborted);
495
496 /**
497 Set whether applier errors are ignored during stop.
498 Errors put the members into ERROR state.
499 If errors are ignored member will stay in ONLINE state.
500 During clone, applier errors are ignored, since data will come from clone.
501
502 @param[in] ignore_errors if true ignore applier errors during stop
503 */
506 }
507
508 // Packet based interface methods
509
510 /**
511 Queues a packet that will eventually make the applier module suspend.
512 This will happen only after all the previous packets are processed.
513
514 @note This will happen only after all the previous packets are processed.
515 */
516 void add_suspension_packet() override {
517 this->incoming->push(new Action_packet(SUSPENSION_PACKET));
518 }
519
520 /**
521 Queues a packet that will make the applier module terminate its handling
522 process. Due to the blocking nature of the queue, this method is useful to
523 unblock the handling process on shutdown.
524
525 @note This will happen only after all the previous packets are processed.
526 */
528 this->incoming->push(new Action_packet(TERMINATION_PACKET));
529 }
530
531 /// @brief Generic add packet method
532 /// @param packet the packet to be queued in the applier
533 void add_packet(Packet *packet) override { incoming->push(packet); }
534
535 /**
536 Queues a view change packet into the applier.
537 These packets contain the new view id and they mark the exact frontier
538 between transactions from the old and new views.
539
540 @note This will happen only after all the previous packets are processed.
541
542 @param[in] packet The view change packet to be queued
543 */
545 incoming->push(packet);
546 }
547
549 Recovery_metadata_processing_packets *packet) override {
550 incoming->push(packet);
551 }
552
553 /**
554 Queues a single primary action packet into the applier.
555
556 @note This will happen only after all the previous packets are processed.
557
558 @param[in] packet The packet to be queued
559 */
561 Single_primary_action_packet *packet) override {
562 incoming->push(packet);
563 }
564
565 /**
566 Queues a transaction prepared action packet into the applier.
567
568 @note This will happen only after all the previous packets are processed.
569
570 @param[in] packet The packet to be queued
571 */
573 Transaction_prepared_action_packet *packet) override {
574 incoming->push(packet);
575 }
576
577 /**
578 Queues a synchronization before execution action packet into the applier.
579
580 @note This will happen only after all the previous packets are processed.
581
582 @param[in] packet The packet to be queued
583 */
585 Sync_before_execution_action_packet *packet) override {
586 incoming->push(packet);
587 }
588
589 /**
590 Queues a leaving members action packet into the applier.
591
592 @note This will happen only after all the previous packets are processed.
593
594 @param[in] packet The packet to be queued
595 */
597 Leaving_members_action_packet *packet) override {
598 incoming->push(packet);
599 }
600
601 /**
602 Queues a single packet that will enable certification on this member
603 */
605
606 /**
607 Awakes the applier module
608 */
609 void awake_applier_module() override {
611 suspended = false;
614 }
615
616 /**
617 Waits for the applier to suspend and apply all the transactions previous to
618 the suspend request.
619
620 @param abort_flag a pointer to a flag that the caller can use to
621 cancel the request.
622 @param wait_for_execution specify if the suspension waits for transactions
623 execution
624
625 @return the operation status
626 @retval 0 OK
627 @retval !=0 Error when accessing the applier module status
628 */
630 bool *abort_flag, bool wait_for_execution = true) override;
631
632 /**
633 Interrupts the current applier waiting process either for its suspension
634 or its wait for the consumption of pre-suspension events
635 */
636 void interrupt_applier_suspension_wait() override;
637
638 /**
639 Checks if the applier, and its workers when parallel applier is
640 enabled, has already consumed all relay log, that is, applier is
641 waiting for transactions to be queued.
642
643 @return the applier status
644 @retval true the applier is waiting
645 @retval false otherwise
646 */
648
649 /**
650 Waits for the execution of all events by the current SQL applier.
651 Due to the possible asynchronous nature of module's applier handler, this
652 method inquires the current handler to check if all transactions queued up
653 to this point are already executed.
654
655 If no handler exists, then it is assumed that transactions were processed.
656
657 @param timeout the time (seconds) after which the method returns if the
658 above condition was not satisfied
659 @param check_and_purge_partial_transactions
660 on successful executions, check and purge partial
661 transactions in the relay log
662
663 @return the operation status
664 @retval 0 All transactions were executed
665 @retval -1 A timeout occurred
666 @retval -2 An error occurred
667 */
669 double timeout, bool check_and_purge_partial_transactions) override;
670
671 /**
672 Waits for the execution of all current events by the current SQL
673 applier.
674
675 The current gtid retrieved set is extracted and a loop is executed until
676 these transactions are executed.
677
678 If the applier SQL thread stops, the method will return an error.
679
680 If no handler exists, then it is assumed that transactions were processed.
681
682 @param checkpoint_condition the class used to wait for the queue to be
683 consumed. Can be used to cancel the wait.
684 @param abort_flag a pointer to a flag that the caller can use to
685 cancel the request.
686 @param update_THD_status Shall the method update the THD stage
687
688
689 @return the operation status
690 @retval false All transactions were executed
691 @retval true An error occurred
692 */
694 std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
695 bool update_THD_status = true) override;
696
697 /**
698 Returns the retrieved gtid set for the applier channel
699
700 @param[out] retrieved_set the set in string format.
701
702 @retval true there was an error.
703 @retval false the operation has succeeded.
704 */
705 bool get_retrieved_gtid_set(std::string &retrieved_set) override;
706
707 /**
708 Waits for the execution of all events in the given set by the current SQL
709 applier. If no handler exists, then it is assumed that transactions were
710 processed.
711
712 @param retrieved_set the set in string format of transaction to wait for
713 @param timeout the time (seconds) after which the method returns if the
714 above condition was not satisfied
715 @param update_THD_status Shall the method update the THD stage
716
717 @return the operation status
718 @retval 0 All transactions were executed
719 @retval -1 A timeout occurred
720 @retval -2 An error occurred
721 */
722 int wait_for_applier_event_execution(std::string &retrieved_set,
723 double timeout,
724 bool update_THD_status = true) override;
725
726 /**
727 Returns the handler instance in the applier module responsible for
728 certification.
729
730 @note If new certification handlers appear, an interface must be created.
731
732 @return a pointer to the applier's certification handler.
733 @retval !=NULL The certification handler
734 @retval NULL No certification handler present
735 */
737
738 /**
739 Returns the applier module's queue size.
740
741 @return the size of the queue
742 */
743 size_t get_message_queue_size() override { return incoming->size(); }
744
747 return APPLIER_STATE_ON;
748 else if (suspended) /* purecov: inspected */
749 return APPLIER_STATE_OFF; /* purecov: inspected */
750 else
751 return APPLIER_ERROR; /* purecov: inspected */
752 }
753
755 override {
757 }
758
760 return &flow_control_module;
761 }
762
763 void run_flow_control_step() override {
765 }
766
768 std::shared_ptr<Continuation> checkpoint_condition) override;
769
770 private:
771 // Applier packet handlers
772
773 /**
774 Apply an action packet received by the applier.
775 It can be an order to suspend or terminate.
776
777 @param action_packet the received action packet
778
779 @return if the applier should terminate (with no associated error).
780 */
781 bool apply_action_packet(Action_packet *action_packet);
782
783 /**
784 Apply a View Change packet received by the applier.
785 It executes some certification operations and queues a View Change Event
786
787 @param view_change_packet the received view change packet
788 @param fde_evt the Format description event associated to the event
789 @param cont the applier Continuation Object
790
791 @return the operation status
792 @retval 0 OK
793 @retval !=0 Error when injecting event
794 */
795 int apply_view_change_packet(View_change_packet *view_change_packet,
797 Continuation *cont);
798
799 /**
800 Apply a Recovery metadata processing information received from the GCS.
801
802 @param metadata_processing_packet Information about members that left the group
803
804 @return the operation status
805 @retval 0 OK
806 @retval !=0 Error when injecting event
807 */
809 Recovery_metadata_processing_packets *metadata_processing_packet);
810
811 /**
812 Apply a Data packet received by the applier.
813 It executes some certification operations and queues a View Change Event
814
815 @param data_packet the received data packet
816 @param fde_evt the Format description event associated to the event
817 @param cont the applier Continuation Object
818
819 @return the operation status
820 @retval 0 OK
821 @retval !=0 Error when injecting event
822 */
823 int apply_data_packet(Data_packet *data_packet,
825 Continuation *cont);
826
827 /**
828 Apply a single primary action packet received by the applier.
829
830 @param packet the received action packet
831
832 @return the operation status
833 @retval 0 OK
834 @retval !=0 Error when applying packet
835 */
837
838 /**
839 Apply a transaction prepared action packet received by the applier.
840
841 @param packet the received action packet
842
843 @return the operation status
844 @retval 0 OK
845 @retval !=0 Error when applying packet
846 */
849
850 /**
851 Apply a synchronization before execution action packet received
852 by the applier.
853
854 @param packet the received action packet
855
856 @return the operation status
857 @retval 0 OK
858 @retval !=0 Error when applying packet
859 */
862
863 /**
864 Apply a leaving members action packet received by the applier.
865
866 @param packet the received action packet
867
868 @return the operation status
869 @retval 0 OK
870 @retval !=0 Error when applying packet
871 */
874
875 /**
876 Suspends the applier module while transactions may still be queued in the
877 incoming queue.
878
879 @note if the proper condition is set, possible listeners can be awakened by
880 this method.
881 */
884
885 suspended = true;
887 __LINE__, 0, 0);
888
889 // Alert any interested party about the applier suspension
891
892 while (suspended) {
894 }
896 __LINE__, 0, 0);
897
899 }
900
901 /**
902 Cleans the thread context for the applier thread
903 This includes such tasks as removing the thread from the global thread list
904 */
906
907 /**
908 Set the thread context for the applier thread.
909 This allows the thread to behave like a slave thread and perform
910 such tasks as queuing to a relay log.
911 */
913
914 /**
915 This method calculates the intersection of the given sets passed as a list
916 of strings.
917
918 @param[in] gtid_sets the vector containing the GTID sets to intersect
919 @param[out] output_set the final GTID calculated from the intersection
920
921 @return the operation status
922 @retval 0 all went fine
923 @retval !=0 error
924 */
925 int intersect_group_executed_sets(std::vector<std::string> &gtid_sets,
926 Gtid_set *output_set);
927
928 // applier thread variables
931
932 // configuration options
936
937 // run conditions and locks
940 /* Applier thread state */
942 /* Applier abort flag */
944 /* Applier error during execution */
946 /* Applier killed status */
948 /* Ignore applier errors during stop. */
950
951 // condition and lock used to suspend/awake the applier module
952 /* The lock for suspending/wait for the awake of the applier module */
954 /* The condition for suspending/wait for the awake of the applier module */
956 /* Suspend flag that informs if the applier is suspended */
958
959 /* The condition for signaling the applier suspension */
961
962 /* The incoming event queue */
964
965 /* The applier pipeline for event execution */
967
968 /* Applier timeout on shutdown */
970
971 /* Applier channel observer to detect failures */
973
977};
978
979#endif /* APPLIER_INCLUDE */
#define ERROR_PACKET_TYPE
Make the applier fail.
Definition: applier.h:52
#define RECOVERY_METADATA_PROCESSING_PACKET_TYPE
Definition: applier.h:51
#define ACTION_PACKET_TYPE
Definition: applier.h:46
char applier_module_channel_name[]
Definition: applier.cc:48
#define LEAVING_MEMBERS_PACKET_TYPE
Definition: applier.h:50
enum_applier_state
Definition: applier.h:269
@ APPLIER_STATE_ON
Definition: applier.h:270
@ APPLIER_STATE_OFF
Definition: applier.h:271
@ APPLIER_ERROR
Definition: applier.h:272
#define TRANSACTION_PREPARED_PACKET_TYPE
Definition: applier.h:49
enum enum_applier_state Member_applier_state
#define SINGLE_PRIMARY_PACKET_TYPE
Definition: applier.h:47
#define SYNC_BEFORE_EXECUTION_PACKET_TYPE
Definition: applier.h:48
enum_packet_action
Definition: applier.h:62
@ TERMINATION_PACKET
Definition: applier.h:63
@ ACTION_NUMBER
Definition: applier.h:66
@ SUSPENSION_PACKET
Definition: applier.h:64
@ CHECKPOINT_PACKET
Definition: applier.h:65
A packet to control the applier in an event-oriented way.
Definition: applier.h:73
Action_packet(enum_packet_action action)
Create a new action packet.
Definition: applier.h:79
enum_packet_action packet_action
Definition: applier.h:84
~Action_packet() override=default
Definition: applier_channel_state_observer.h:29
Definition: applier.h:275
virtual Pipeline_stats_member_collector * get_pipeline_stats_member_collector()=0
virtual bool get_retrieved_gtid_set(std::string &retrieved_set)=0
virtual ~Applier_module_interface()=default
virtual int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true)=0
virtual void add_packet(Packet *packet)=0
virtual size_t get_message_queue_size()=0
virtual Certification_handler * get_certification_handler()=0
virtual Flow_control_module * get_flow_control_module()=0
virtual int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions)=0
virtual int wait_for_applier_event_execution(std::string &retrieved_set, double timeout, bool update_THD_status=true)=0
virtual void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)=0
virtual void add_leaving_members_action_packet(Leaving_members_action_packet *packet)=0
virtual void awake_applier_module()=0
virtual int purge_applier_queue_and_restart_applier_module()=0
virtual Member_applier_state get_applier_status()=0
virtual int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members, PSI_memory_key key)=0
virtual void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)=0
virtual void add_suspension_packet()=0
virtual bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true)=0
virtual void add_metadata_processing_packet(Recovery_metadata_processing_packets *packet)=0
virtual void run_flow_control_step()=0
virtual void add_view_change_packet(View_change_packet *packet)=0
virtual bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition)=0
virtual void add_single_primary_action_packet(Single_primary_action_packet *packet)=0
virtual int handle_pipeline_action(Pipeline_action *action)=0
virtual void interrupt_applier_suspension_wait()=0
Definition: applier.h:321
~Applier_module() override
Definition: applier.cc:76
int apply_data_packet(Data_packet *data_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a Data packet received by the applier.
Definition: applier.cc:361
Certification_handler * get_certification_handler() override
Returns the handler instance in the applier module responsible for certification.
Definition: applier.cc:1003
bool is_applier_thread_aborted()
Is the applier marked for shutdown?
Definition: applier.h:356
void add_termination_packet()
Queues a packet that will make the applier module terminate its handling process.
Definition: applier.h:527
Plugin_stage_monitor_handler stage_handler
Definition: applier.h:976
int applier_error
Definition: applier.h:945
void clean_applier_thread_context()
Cleans the thread context for the applier thread This includes such tasks as removing the thread from...
Definition: applier.cc:229
bool apply_action_packet(Action_packet *action_packet)
Apply an action packet received by the applier.
Definition: applier.cc:246
int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true) override
Waits for the applier to suspend and apply all the transactions previous to the suspend request.
Definition: applier.cc:862
int applier_thread_handle()
Runs the applier thread process, reading events and processing them.
Definition: applier.cc:460
Member_applier_state get_applier_status() override
Definition: applier.h:745
bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true) override
Waits for the execution of all current events by part of the current SQL applier.
Definition: applier.cc:981
int setup_pipeline_handlers()
Configure the applier pipeline handlers.
Definition: applier.cc:163
bool applier_killed_status
Definition: applier.h:947
void set_stop_wait_timeout(ulong timeout)
Sets the applier shutdown timeout.
Definition: applier.h:480
bool m_ignore_applier_errors_during_stop
Definition: applier.h:949
my_thread_handle applier_pthd
Definition: applier.h:929
int apply_single_primary_action_packet(Single_primary_action_packet *packet)
Apply a single primary action packet received by the applier.
Definition: applier.cc:419
void ignore_errors_during_stop(bool ignore_errors)
Check whether to ignore applier errors during stop or not.
Definition: applier.h:504
int purge_applier_queue_and_restart_applier_module() override
Purges the relay logs and restarts the applier thread.
Definition: applier.cc:120
mysql_mutex_t run_lock
Definition: applier.h:938
int apply_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)
Apply a transaction prepared action packet received by the applier.
Definition: applier.cc:438
bool suspended
Definition: applier.h:957
int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs, ulong stop_timeout, rpl_sidno group_sidno, ulonglong gtid_assignment_block_size)
Configure the applier pipeline according to the given configuration.
Definition: applier.cc:94
void add_suspension_packet() override
Queues a packet that will eventually make the applier module suspend.
Definition: applier.h:516
mysql_cond_t suspension_waiting_condition
Definition: applier.h:960
rpl_sidno group_replication_sidno
Definition: applier.h:934
int intersect_group_executed_sets(std::vector< std::string > &gtid_sets, Gtid_set *output_set)
This method calculates the intersection of the given sets passed as a list of strings.
Definition: applier.cc:1011
bool is_running()
Is the applier running?
Definition: applier.h:367
void add_packet(Packet *packet) override
Generic add packet method.
Definition: applier.h:533
void add_single_primary_action_packet(Single_primary_action_packet *packet) override
Queues a single primary action packet into the applier.
Definition: applier.h:560
int apply_leaving_members_action_packet(Leaving_members_action_packet *packet)
Apply a leaving members action packet received by the applier.
Definition: applier.cc:453
void suspend_applier_module()
Suspends the applier module, being transactions still queued in the incoming queue.
Definition: applier.h:882
void set_applier_thread_context()
Set the thread context for the applier thread.
Definition: applier.cc:189
int initialize_applier_thread()
Initializes and launches the applier thread.
Definition: applier.cc:703
int terminate_applier_thread()
Terminates the applier thread.
Definition: applier.cc:756
size_t get_message_queue_size() override
Returns the applier module's queue size.
Definition: applier.h:743
bool reset_applier_logs
Definition: applier.h:933
void inform_of_applier_stop(char *channel_name, bool aborted)
This method informs the applier module that an applying thread stopped.
Definition: applier.cc:834
bool is_applier_thread_waiting()
Checks if the applier, and its workers when parallel applier is enabled, has already consumed all rel...
Definition: applier.cc:914
Flow_control_module flow_control_module
Definition: applier.h:975
void add_metadata_processing_packet(Recovery_metadata_processing_packets *packet) override
Definition: applier.h:548
int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions) override
Waits for the execution of all events by the current SQL applier.
Definition: applier.cc:926
int terminate_applier_pipeline()
Terminates the pipeline, shutting down the handlers and deleting them.
Definition: applier.cc:741
Event_handler * pipeline
Definition: applier.h:966
bool get_retrieved_gtid_set(std::string &retrieved_set) override
Returns the retrieved gtid set for the applier channel.
Definition: applier.cc:952
int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members, PSI_memory_key key) override
Queues the packet coming from the reader for future application.
Definition: applier.h:430
bool applier_aborted
Definition: applier.h:943
Pipeline_stats_member_collector * get_pipeline_stats_member_collector() override
Definition: applier.h:754
mysql_mutex_t suspend_lock
Definition: applier.h:953
int apply_view_change_packet(View_change_packet *view_change_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a View Change packet received by the applier.
Definition: applier.cc:268
void awake_applier_module() override
Awakes the applier module.
Definition: applier.h:609
Pipeline_member_stats * get_local_pipeline_stats()
Return the local applier stats.
Definition: applier.cc:1076
bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition) override
Definition: applier.cc:1070
int inject_event_into_pipeline(Pipeline_event *pevent, Continuation *cont)
Injects an event into the pipeline and waits for its handling.
Definition: applier.cc:235
thread_state applier_thd_state
Definition: applier.h:941
ulonglong gtid_assignment_block_size
Definition: applier.h:935
int apply_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)
Apply a synchronization before execution action packet received by the applier.
Definition: applier.cc:446
void add_view_change_packet(View_change_packet *packet) override
Queues a view change packet into the applier.
Definition: applier.h:544
void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet) override
Queues a synchronization before execution action packet into the applier.
Definition: applier.h:584
THD * applier_thd
Definition: applier.h:930
mysql_cond_t suspend_cond
Definition: applier.h:955
int handle_pipeline_action(Pipeline_action *action) override
Gives the pipeline an action for execution.
Definition: applier.h:448
Pipeline_stats_member_collector pipeline_stats_member_collector
Definition: applier.h:974
void run_flow_control_step() override
Definition: applier.h:763
mysql_cond_t run_cond
Definition: applier.h:939
virtual void queue_certification_enabling_packet()
Queues a single packet that will enable certification on this member.
Definition: applier.cc:1065
ulong stop_wait_timeout
Definition: applier.h:969
Applier_module()
Definition: applier.cc:57
int apply_metadata_processing_packet(Recovery_metadata_processing_packets *metadata_processing_packet)
Apply a Recovery metadata processing information received from the GCS.
Definition: applier.cc:347
void interrupt_applier_suspension_wait() override
Interrupts the current applier waiting process either for its suspension or its wait for the consum...
Definition: applier.cc:908
Flow_control_module * get_flow_control_module() override
Definition: applier.h:759
Applier_channel_state_observer * applier_channel_observer
Definition: applier.h:972
void add_leaving_members_action_packet(Leaving_members_action_packet *packet) override
Queues a leaving members action packet into the applier.
Definition: applier.h:596
Synchronized_queue< Packet * > * incoming
Definition: applier.h:963
void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet) override
Queues a transaction prepared action packet into the applier.
Definition: applier.h:572
Definition: certification_handler.h:31
Class used to wait on the execution of some action.
Definition: pipeline_interfaces.h:535
A wrapper for raw network packets.
Definition: pipeline_interfaces.h:75
A packet to inform the applier it should fail.
Definition: applier.h:251
const char * m_error_message
The error message for the failure process.
Definition: applier.h:266
~Error_action_packet() override=default
Error_action_packet(const char *error_message)
Create a new error packet.
Definition: applier.h:255
const char * get_error_message()
Returns the error message for the failure.
Definition: applier.h:262
Interface for the application of events, whether they are packets or log events.
Definition: pipeline_interfaces.h:660
virtual int handle_action(Pipeline_action *action)=0
Handling of an action as defined in the handler implementation.
The pipeline stats aggregator of all group members stats and flow control module.
Definition: pipeline_stats.h:607
void flow_control_step(Pipeline_stats_member_collector *)
Evaluate the information received in the last flow control period and adjust the system parameters ac...
Definition: pipeline_stats.cc:748
For binlog version 4.
Definition: log_event.h:1524
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:39
Represents a set of GTIDs.
Definition: rpl_gtid.h:1555
Action to configure existing applier handlers.
Definition: pipeline_handlers.h:104
A packet to inform pipeline listeners of leaving members, this packet will be handled on the global m...
Definition: applier.h:231
const std::vector< Gcs_member_identifier > m_leaving_members
Definition: applier.h:245
~Leaving_members_action_packet() override=default
Leaving_members_action_packet(const std::vector< Gcs_member_identifier > &leaving_members)
Create a new leaving members packet.
Definition: applier.h:238
A generic interface for different kinds of packets.
Definition: pipeline_interfaces.h:50
A wrapper for pipeline actions.
Definition: pipeline_interfaces.h:630
A wrapper for log events/packets.
Definition: pipeline_interfaces.h:166
Computed statistics per member.
Definition: pipeline_stats.h:432
The pipeline collector for the local member stats.
Definition: pipeline_stats.h:283
Definition: stage_monitor_handler.h:29
int set_stage(PSI_stage_key key, const char *file, int line, ulonglong estimated_work, ulonglong work_completed)
Set that a new stage is now in progress.
Definition: stage_monitor_handler.cc:78
A packet to wait for queue consumption.
Definition: applier.h:138
std::shared_ptr< Continuation > checkpoint_condition
If we discard a packet.
Definition: applier.h:152
void signal_checkpoint_reached()
Definition: applier.h:148
Queue_checkpoint_packet(std::shared_ptr< Continuation > checkpoint_condition_arg)
Create a new Queue_checkpoint_packet packet.
Definition: applier.h:143
A packet to send Metadata related processing.
Definition: applier.h:91
Recovery_metadata_processing_packets()
Create a new data packet with associated data.
Definition: applier.h:96
bool m_current_member_leaving_the_group
Definition: applier.h:110
std::vector< Gcs_member_identifier > m_member_left_the_group
Definition: applier.h:107
std::vector< std::string > m_view_id_to_be_deleted
Definition: applier.h:104
virtual ~Recovery_metadata_processing_packets() override=default
A packet to send new primary election related info to the applier.
Definition: applier.h:117
enum_action
Definition: applier.h:119
@ NEW_PRIMARY
Definition: applier.h:119
@ QUEUE_APPLIED
Definition: applier.h:119
Single_primary_action_packet(enum enum_action action_arg)
Create a new single primary action packet with associated data.
Definition: applier.h:126
~Single_primary_action_packet() override=default
enum enum_action action
Definition: applier.h:131
A packet to request a synchronization point on the global message order on a given member before tran...
Definition: applier.h:205
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:222
Sync_before_execution_action_packet(my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
Create a new synchronization point request.
Definition: applier.h:213
const my_thread_id m_thread_id
Definition: applier.h:221
~Sync_before_execution_action_packet() override=default
Definition: plugin_utils.h:181
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:35
std::atomic< killed_state > killed
Definition: sql_class.h:2667
A packet to inform that a given member did prepare a given transaction.
Definition: applier.h:159
const gr::Gtid_tsid & get_tsid() const
tsid accessor
Definition: applier.h:190
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:186
const rpl_gno m_gno
Definition: applier.h:185
~Transaction_prepared_action_packet() override=default
gr::Gtid_tsid m_tsid
Definition: applier.h:197
Transaction_prepared_action_packet(const gr::Gtid_tsid &tsid, bool is_tsid_specified, rpl_gno gno, const Gcs_member_identifier &gcs_member_id)
Create a new transaction prepared action.
Definition: applier.h:170
bool is_tsid_specified() const
returns information on whether TSID is specified for this trx
Definition: applier.h:194
const bool m_tsid_specified
Definition: applier.h:184
A packet to send view change related info to the applier.
Definition: pipeline_interfaces.h:115
Represents Transaction Source Identifier which is composed of source UUID and transaction tag.
Definition: tsid.h:46
#define mysql_cond_wait(C, M)
Definition: mysql_cond.h:47
#define mysql_mutex_lock(M)
Definition: mysql_mutex.h:49
#define mysql_mutex_unlock(M)
Definition: mysql_mutex.h:56
unsigned int PSI_memory_key
Instrumented memory key.
Definition: psi_memory_bits.h:48
Some integer typedefs for easier portability.
unsigned long long int ulonglong
Definition: my_inttypes.h:55
unsigned char uchar
Definition: my_inttypes.h:51
static my_thread_id thread_id
Definition: my_thr_init.cc:62
uint32 my_thread_id
Definition: my_thread_local.h:33
int(* mysql_cond_broadcast)(mysql_cond_t *that, const char *src_file, unsigned int src_line)
Definition: mysql_cond_service.h:51
static bool ignore_errors
Definition: mysqlcheck.cc:61
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:497
Handler_pipeline_type
Definition: pipeline_factory.h:44
API for Group Replication plugin.
enum_group_replication_consistency_level
Definition: plugin_group_replication.h:34
PSI_stage_info info_GR_STAGE_module_suspending
Definition: plugin_psi.h:229
PSI_stage_info info_GR_STAGE_module_executing
Definition: plugin_psi.h:228
required string key
Definition: replication_asynchronous_connection_failover.proto:59
repeated Action action
Definition: replication_group_member_actions.proto:42
mysql::gtid::gno_t rpl_gno
GNO, the second (numeric) component of a GTID, is an alias of mysql::gtid::gno_t.
Definition: rpl_gtid.h:111
cs::index::rpl_sidno rpl_sidno
Type of SIDNO (source ID number, first component of GTID)
Definition: rpl_gtid.h:107
PSI_stage_key m_key
The registered stage key.
Definition: psi_stage_bits.h:75
Definition: my_thread_bits.h:57
An instrumented cond structure.
Definition: mysql_cond_bits.h:49
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:49
Definition: plugin_utils.h:47
bool is_running() const
Definition: plugin_utils.h:84