MySQL  8.0.23
Source Code Documentation
applier.h
Go to the documentation of this file.
1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef APPLIER_INCLUDE
24 #define APPLIER_INCLUDE
25 
28 #include <list>
29 #include <vector>
30 
31 #include "my_inttypes.h"
43 #include "sql/sql_class.h"
44 
45 // Define the applier packet types
46 #define ACTION_PACKET_TYPE 2
47 #define VIEW_CHANGE_PACKET_TYPE 3
48 #define SINGLE_PRIMARY_PACKET_TYPE 4
49 #define SYNC_BEFORE_EXECUTION_PACKET_TYPE 5
50 #define TRANSACTION_PREPARED_PACKET_TYPE 6
51 #define LEAVING_MEMBERS_PACKET_TYPE 7
52 
53 // Define the applier return error codes
54 #define APPLIER_GTID_CHECK_TIMEOUT_ERROR -1
55 #define APPLIER_RELAY_LOG_NOT_INITED -2
56 #define APPLIER_THREAD_ABORTED -3
57 
58 extern char applier_module_channel_name[];
59 
60 /* Types of action packets used in the context of the applier module */
62  TERMINATION_PACKET = 0, // Packet for a termination action
63  SUSPENSION_PACKET, // Packet to signal something to suspend
64  CHECKPOINT_PACKET, // Packet to wait for queue consumption
65  ACTION_NUMBER = 3 // The number of actions
66 };
67 
68 /**
69  @class Action_packet
70  A packet to control the applier in a event oriented way.
71 */
72 class Action_packet : public Packet {
73  public:
74  /**
75  Create a new action packet.
76  @param action the packet action
77  */
80 
81  ~Action_packet() override {}
82 
84 };
85 
86 /**
87  @class View_change_packet
88  A packet to send view change related info to the applier
89 */
90 class View_change_packet : public Packet {
91  public:
92  /**
93  Create a new data packet with associated data.
94 
95  @param view_id_arg The view id associated to this view
96  */
97  View_change_packet(std::string &view_id_arg)
98  : Packet(VIEW_CHANGE_PACKET_TYPE), view_id(view_id_arg) {}
99 
100  ~View_change_packet() override {}
101 
102  std::string view_id;
103  std::vector<std::string> group_executed_set;
104 };
105 
106 /**
107  @class Single_primary_action_packet
108  A packet to send new primary election related info to the applier
109 */
111  public:
113 
114  /**
115  Create a new single primary action packet with associated data.
116 
117  @param action_arg the packet action
118  */
120  : Packet(SINGLE_PRIMARY_PACKET_TYPE), action(action_arg) {}
121 
123 
124  enum enum_action action;
125 };
126 
127 /**
128  @class Queue_checkpoint_packet
129  A packet to wait for queue consumption
130 */
132  public:
133  /**
134  Create a new Queue_checkpoint_packet packet.
135  */
137  std::shared_ptr<Continuation> checkpoint_condition_arg)
139  checkpoint_condition(checkpoint_condition_arg) {}
140 
142 
143  private:
144  /**If we discard a packet */
145  std::shared_ptr<Continuation> checkpoint_condition;
146 };
147 
148 /**
149  @class Transaction_prepared_action_packet
150  A packet to inform that a given member did prepare a given transaction.
151 */
153  public:
154  /**
155  Create a new transaction prepared action.
156 
157  @param sid the prepared transaction sid
158  @param gno the prepared transaction gno
159  @param gcs_member_id the member id that did prepare the
160  transaction
161  */
163  const Gcs_member_identifier &gcs_member_id)
165  m_sid_specified(sid != nullptr ? true : false),
166  m_gno(gno),
167  m_gcs_member_id(gcs_member_id.get_member_id()) {
168  if (sid != nullptr) {
169  m_sid.copy_from(*sid);
170  }
171  }
172 
174 
175  const bool m_sid_specified;
176  const rpl_gno m_gno;
178 
179  const rpl_sid *get_sid() { return m_sid_specified ? &m_sid : nullptr; }
180 
181  private:
183 };
184 
185 /**
186  @class Sync_before_execution_action_packet
187  A packet to request a synchronization point on the global message
188  order on a given member before transaction execution.
189 */
191  public:
192  /**
193  Create a new synchronization point request.
194 
195  @param thread_id the thread that did the request
196  @param gcs_member_id the member id that did the request
197  */
199  my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
202  m_gcs_member_id(gcs_member_id.get_member_id()) {}
203 
205 
208 };
209 
210 /**
211  @class Leaving_members_action_packet
212  A packet to inform pipeline listeners of leaving members,
213  this packet will be handled on the global message order,
214  that is, ordered with certification.
215 */
217  public:
218  /**
219  Create a new leaving members packet.
220 
221  @param leaving_members the members that left the group
222  */
224  const std::vector<Gcs_member_identifier> &leaving_members)
226  m_leaving_members(leaving_members) {}
227 
229 
230  const std::vector<Gcs_member_identifier> m_leaving_members;
231 };
232 
233 typedef enum enum_applier_state {
238 
240  public:
244  bool *abort_flag, bool wait_for_execution = true) = 0;
245  virtual void awake_applier_module() = 0;
248  double timeout, bool check_and_purge_partial_transactions) = 0;
250  std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
251  bool update_THD_status = true) = 0;
252  virtual bool get_retrieved_gtid_set(std::string &retrieved_set) = 0;
254  std::string &retrieved_set, double timeout,
255  bool update_THD_status = true) = 0;
256  virtual size_t get_message_queue_size() = 0;
258  virtual void add_suspension_packet() = 0;
259  virtual void add_view_change_packet(View_change_packet *packet) = 0;
261  Single_primary_action_packet *packet) = 0;
267  Leaving_members_action_packet *packet) = 0;
268  virtual int handle(const uchar *data, ulong len,
270  std::list<Gcs_member_identifier> *online_members) = 0;
271  virtual int handle_pipeline_action(Pipeline_action *action) = 0;
273  virtual void run_flow_control_step() = 0;
276  std::shared_ptr<Continuation> checkpoint_condition) = 0;
279 };
280 
282  public:
283  Applier_module();
284  ~Applier_module() override;
285 
286  /**
287  Initializes and launches the applier thread
288 
289  @return the operation status
290  @retval 0 OK
291  @retval !=0 Error
292  */
294 
295  /**
296  * Return the local applier stats.
297  */
299 
300  /**
301  Terminates the applier thread.
302 
303  @return the operation status
304  @retval 0 OK
305  @retval !=0 A timeout occurred
306  */
308 
309  /**
310  Is the applier marked for shutdown?
311 
312  @return is the applier on shutdown
313  @retval 0 no
314  @retval !=0 yes
315  */
317  return (applier_aborted || applier_thd->killed);
318  }
319 
320  /**
321  Is the applier running?
322 
323  @return applier running?
324  @retval 0 no
325  @retval !=0 yes
326  */
327  bool is_running() { return (applier_thd_state.is_running()); }
328 
329  /**
330  Configure the applier pipeline according to the given configuration
331 
332  @param[in] pipeline_type the chosen pipeline
333  @param[in] reset_logs if a reset happened in the server
334  @param[in] stop_timeout the timeout when waiting on shutdown
335  @param[in] group_sidno the group configured sidno
336  @param[in] gtid_assignment_block_size the group gtid assignment block size
337  @param[in] shared_stop_lock the lock used to block transactions
338 
339  @return the operation status
340  @retval 0 OK
341  @retval !=0 Error
342  */
343  int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs,
344  ulong stop_timeout, rpl_sidno group_sidno,
346  Shared_writelock *shared_stop_lock);
347 
348  /**
349  Configure the applier pipeline handlers
350 
351  @return the operation status
352  @retval 0 OK
353  @retval !=0 Error
354  */
356 
357  /**
358  Purges the relay logs and restarts the applier thread.
359 
360  @return the operation status
361  @retval 0 OK
362  @retval !=0 Error
363  */
365 
366  /**
367  Runs the applier thread process, reading events and processing them.
368 
369  @note When killed, the thread will finish handling the current packet, and
370  then die, ignoring all possible existing events in the incoming queue.
371 
372  @return the operation status
373  @retval 0 OK
374  @retval !=0 Error
375  */
376  int applier_thread_handle();
377 
378  /**
379  Queues the packet coming from the reader for future application.
380 
381  @param[in] data the packet data
382  @param[in] len the packet length
383  @param[in] consistency_level the transaction consistency level
384  @param[in] online_members the ONLINE members when the transaction
385  message was delivered
386 
387  @return the operation status
388  @retval 0 OK
389  @retval !=0 Error on queue
390  */
391  int handle(const uchar *data, ulong len,
393  std::list<Gcs_member_identifier> *online_members) override {
394  this->incoming->push(
395  new Data_packet(data, len, consistency_level, online_members));
396  return 0;
397  }
398 
399  /**
400  Gives the pipeline an action for execution.
401 
402  @param[in] action the action to be executed
403 
404  @return the operation status
405  @retval 0 OK
406  @retval !=0 Error executing the action
407  */
408  int handle_pipeline_action(Pipeline_action *action) override {
409  return this->pipeline->handle_action(action);
410  }
411 
412  /**
413  Injects an event into the pipeline and waits for its handling.
414 
415  @param[in] pevent the event to be injected
416  @param[in] cont the object used to wait
417 
418  @return the operation status
419  @retval 0 OK
420  @retval !=0 Error on queue
421  */
423 
424  /**
425  Terminates the pipeline, shutting down the handlers and deleting them.
426 
427  @note the pipeline will always be deleted even if an error occurs.
428 
429  @return the operation status
430  @retval 0 OK
431  @retval !=0 Error on pipeline termination
432  */
434 
435  /**
436  Sets the applier shutdown timeout.
437 
438  @param[in] timeout the timeout
439  */
440  void set_stop_wait_timeout(ulong timeout) {
441  stop_wait_timeout = timeout;
442 
443  // Configure any thread based applier
446  pipeline->handle_action(conf_action);
447 
448  delete conf_action;
449  }
450 
451  /**
452  This method informs the applier module that an applying thread stopped
453  */
454  void inform_of_applier_stop(char *channel_name, bool aborted);
455 
456  // Packet based interface methods
457 
458  /**
459  Queues a packet that will eventually make the applier module suspend.
460  This will happen only after all the previous packets are processed.
461 
462  @note This will happen only after all the previous packets are processed.
463  */
464  void add_suspension_packet() override {
466  }
467 
468  /**
469  Queues a packet that will make the applier module terminate it's handling
470  process. Due to the blocking nature of the queue, this method is useful to
471  unblock the handling process on shutdown.
472 
473  @note This will happen only after all the previous packets are processed.
474  */
477  }
478 
479  /**
480  Queues a view change packet into the applier.
481  This packets contain the new view id and they mark the exact frontier
482  between transactions from the old and new views.
483 
484  @note This will happen only after all the previous packets are processed.
485 
486  @param[in] packet The view change packet to be queued
487  */
489  incoming->push(packet);
490  }
491 
492  /**
493  Queues a single primary action packet into the applier.
494 
495  @note This will happen only after all the previous packets are processed.
496 
497  @param[in] packet The packet to be queued
498  */
500  Single_primary_action_packet *packet) override {
501  incoming->push(packet);
502  }
503 
504  /**
505  Queues a transaction prepared action packet into the applier.
506 
507  @note This will happen only after all the previous packets are processed.
508 
509  @param[in] packet The packet to be queued
510  */
512  Transaction_prepared_action_packet *packet) override {
513  incoming->push(packet);
514  }
515 
516  /**
517  Queues a synchronization before execution action packet into the applier.
518 
519  @note This will happen only after all the previous packets are processed.
520 
521  @param[in] packet The packet to be queued
522  */
524  Sync_before_execution_action_packet *packet) override {
525  incoming->push(packet);
526  }
527 
528  /**
529  Queues a leaving members action packet into the applier.
530 
531  @note This will happen only after all the previous packets are processed.
532 
533  @param[in] packet The packet to be queued
534  */
536  Leaving_members_action_packet *packet) override {
537  incoming->push(packet);
538  }
539 
540  /**
541  Queues a single a packet that will enable certification on this member
542  */
544 
545  /**
546  Awakes the applier module
547  */
548  void awake_applier_module() override {
550  suspended = false;
553  }
554 
555  /**
556  Waits for the applier to suspend and apply all the transactions previous to
557  the suspend request.
558 
559  @param abort_flag a pointer to a flag that the caller can use to
560  cancel the request.
561  @param wait_for_execution specify if the suspension waits for transactions
562  execution
563 
564  @return the operation status
565  @retval 0 OK
566  @retval !=0 Error when accessing the applier module status
567  */
569  bool *abort_flag, bool wait_for_execution = true) override;
570 
571  /**
572  Interrupts the current applier waiting process either for it's suspension
573  or it's wait for the consumption of pre suspension events
574  */
575  void interrupt_applier_suspension_wait() override;
576 
577  /**
578  Checks if the applier, and its workers when parallel applier is
579  enabled, has already consumed all relay log, that is, applier is
580  waiting for transactions to be queued.
581 
582  @return the applier status
583  @retval true the applier is waiting
584  @retval false otherwise
585  */
587 
588  /**
589  Waits for the execution of all events by part of the current SQL applier.
590  Due to the possible asynchronous nature of module's applier handler, this
591  method inquires the current handler to check if all transactions queued up
592  to this point are already executed.
593 
594  If no handler exists, then it is assumed that transactions were processed.
595 
596  @param timeout the time (seconds) after which the method returns if the
597  above condition was not satisfied
598  @param check_and_purge_partial_transactions
599  on successful executions, check and purge partial
600  transactions in the relay log
601 
602  @return the operation status
603  @retval 0 All transactions were executed
604  @retval -1 A timeout occurred
605  @retval -2 An error occurred
606  */
608  double timeout, bool check_and_purge_partial_transactions) override;
609 
610  /**
611  Waits for the execution of all current events by part of the current SQL
612  applier.
613 
614  The current gtid retrieved set is extracted and a loop is executed until
615  these transactions are executed.
616 
617  If the applier SQL thread stops, the method will return an error.
618 
619  If no handler exists, then it is assumed that transactions were processed.
620 
621  @param checkpoint_condition the class used to wait for the queue to be
622  consumed. Can be used to cancel the wait.
623  @param abort_flag a pointer to a flag that the caller can use to
624  cancel the request.
625  @param update_THD_status Shall the method update the THD stage
626 
627 
628  @return the operation status
629  @retval false All transactions were executed
630  @retval true An error occurred
631  */
633  std::shared_ptr<Continuation> checkpoint_condition, bool *abort_flag,
634  bool update_THD_status = true) override;
635 
636  /**
637  Returns the retrieved gtid set for the applier channel
638 
639  @param[out] retrieved_set the set in string format.
640 
641  @retval true there was an error.
642  @retval false the operation has succeeded.
643  */
644  bool get_retrieved_gtid_set(std::string &retrieved_set) override;
645 
646  /**
647  Waits for the execution of all events in the given set by the current SQL
648  applier. If no handler exists, then it is assumed that transactions were
649  processed.
650 
651  @param retrieved_set the set in string format of transaction to wait for
652  @param timeout the time (seconds) after which the method returns if the
653  above condition was not satisfied
654  @param update_THD_status Shall the method update the THD stage
655 
656  @return the operation status
657  @retval 0 All transactions were executed
658  @retval -1 A timeout occurred
659  @retval -2 An error occurred
660  */
661  int wait_for_applier_event_execution(std::string &retrieved_set,
662  double timeout,
663  bool update_THD_status = true) override;
664 
665  /**
666  Returns the handler instance in the applier module responsible for
667  certification.
668 
669  @note If new certification handlers appear, an interface must be created.
670 
671  @return a pointer to the applier's certification handler.
672  @retval !=NULL The certification handler
673  @retval NULL No certification handler present
674  */
676 
677  /**
678  Returns the applier module's queue size.
679 
680  @return the size of the queue
681  */
682  size_t get_message_queue_size() override { return incoming->size(); }
683 
686  return APPLIER_STATE_ON;
687  else if (suspended) /* purecov: inspected */
688  return APPLIER_STATE_OFF; /* purecov: inspected */
689  else
690  return APPLIER_ERROR; /* purecov: inspected */
691  }
692 
694  override {
696  }
697 
699  return &flow_control_module;
700  }
701 
702  void run_flow_control_step() override {
704  }
705 
707  std::shared_ptr<Continuation> checkpoint_condition) override;
708 
709  private:
710  // Applier packet handlers
711 
712  /**
713  Apply an action packet received by the applier.
714  It can be a order to suspend or terminate.
715 
716  @param action_packet the received action packet
717 
718  @return if the applier should terminate (with no associated error).
719  */
720  bool apply_action_packet(Action_packet *action_packet);
721 
722  /**
723  Apply a View Change packet received by the applier.
724  It executes some certification operations and queues a View Change Event
725 
726  @param view_change_packet the received view change packet
727  @param fde_evt the Format description event associated to the event
728  @param cont the applier Continuation Object
729 
730  @return the operation status
731  @retval 0 OK
732  @retval !=0 Error when injecting event
733  */
734  int apply_view_change_packet(View_change_packet *view_change_packet,
736  Continuation *cont);
737 
738  /**
739  Apply a Data packet received by the applier.
740  It executes some certification operations and queues a View Change Event
741 
742  @param data_packet the received data packet packet
743  @param fde_evt the Format description event associated to the event
744  @param cont the applier Continuation Object
745 
746  @return the operation status
747  @retval 0 OK
748  @retval !=0 Error when injecting event
749  */
750  int apply_data_packet(Data_packet *data_packet,
752  Continuation *cont);
753 
754  /**
755  Apply an single primary action packet received by the applier.
756 
757  @param packet the received action packet
758 
759  @return the operation status
760  @retval 0 OK
761  @retval !=0 Error when applying packet
762  */
764 
765  /**
766  Apply a transaction prepared action packet received by the applier.
767 
768  @param packet the received action packet
769 
770  @return the operation status
771  @retval 0 OK
772  @retval !=0 Error when applying packet
773  */
776 
777  /**
778  Apply a synchronization before execution action packet received
779  by the applier.
780 
781  @param packet the received action packet
782 
783  @return the operation status
784  @retval 0 OK
785  @retval !=0 Error when applying packet
786  */
789 
790  /**
791  Apply a leaving members action packet received by the applier.
792 
793  @param packet the received action packet
794 
795  @return the operation status
796  @retval 0 OK
797  @retval !=0 Error when applying packet
798  */
801 
802  /**
803  Suspends the applier module, being transactions still queued in the incoming
804  queue.
805 
806  @note if the proper condition is set, possible listeners can be awaken by
807  this method.
808  */
811 
812  suspended = true;
814  __LINE__, 0, 0);
815 
816  // Alert any interested party about the applier suspension
818 
819  while (suspended) {
821  }
823  __LINE__, 0, 0);
824 
826  }
827 
828  /**
829  Cleans the thread context for the applier thread
830  This includes such tasks as removing the thread from the global thread list
831  */
833 
834  /**
835  Set the thread context for the applier thread.
836  This allows the thread to behave like an slave thread and perform
837  such tasks as queuing to a relay log.
838  */
840 
841  /**
842  This method calculates the intersection of the given sets passed as a list
843  of strings.
844 
845  @param[in] gtid_sets the vector containing the GTID sets to intersect
846  @param[out] output_set the final GTID calculated from the intersection
847 
848  @return the operation status
849  @retval 0 all went fine
850  @retval !=0 error
851  */
852  int intersect_group_executed_sets(std::vector<std::string> &gtid_sets,
853  Gtid_set *output_set);
854 
855  // applier thread variables
858 
859  // configuration options
863 
864  // run conditions and locks
867  /* Applier thread state */
869  /* Applier abort flag */
871  /* Applier error during execution */
873  /* Applier killed status */
875 
876  // condition and lock used to suspend/awake the applier module
877  /* The lock for suspending/wait for the awake of the applier module */
879  /* The condition for suspending/wait for the awake of the applier module */
881  /* Suspend flag that informs if the applier is suspended */
882  bool suspended;
883  /* Suspend wait flag used when waiting for the applier to suspend */
885 
886  /* The condition for signaling the applier suspension*/
888 
889  /* The stop lock used when killing transaction/stopping server*/
891 
892  /* The incoming event queue */
894 
895  /* The applier pipeline for event execution */
897 
898  /* Applier timeout on shutdown */
900 
901  /* Applier channel observer to detect failures */
903 
907 };
908 
909 #endif /* APPLIER_INCLUDE */
Shared_writelock
Definition: plugin_utils.h:696
Single_primary_action_packet::NEW_PRIMARY
@ NEW_PRIMARY
Definition: applier.h:112
Applier_module_interface::add_sync_before_execution_action_packet
virtual void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)=0
SUSPENSION_PACKET
@ SUSPENSION_PACKET
Definition: applier.h:63
Applier_module::wait_for_applier_event_execution
int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions) override
Waits for the execution of all events by part of the current SQL applier.
Definition: applier.cc:822
Action_packet::packet_action
enum_packet_action packet_action
Definition: applier.h:83
Applier_module::suspension_waiting_condition
mysql_cond_t suspension_waiting_condition
Definition: applier.h:887
sql_class.h
THD
Definition: sql_class.h:807
applier_handler.h
Format_description_log_event
Definition: log_event.h:1613
Applier_module::applier_error
int applier_error
Definition: applier.h:872
Applier_module_interface::wait_for_applier_event_execution
virtual int wait_for_applier_event_execution(double timeout, bool check_and_purge_partial_transactions)=0
Applier_module::apply_transaction_prepared_action_packet
int apply_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)
Apply a transaction prepared action packet received by the applier.
Definition: applier.cc:387
Handler_applier_configuration_action
Definition: pipeline_handlers.h:104
Transaction_prepared_action_packet::Transaction_prepared_action_packet
Transaction_prepared_action_packet(const rpl_sid *sid, rpl_gno gno, const Gcs_member_identifier &gcs_member_id)
Create a new transaction prepared action.
Definition: applier.h:162
Applier_module::initialize_applier_thread
int initialize_applier_thread()
Initializes and launches the applier thread.
Definition: applier.cc:611
Applier_module::get_certification_handler
Certification_handler * get_certification_handler() override
Returns the handler instance in the applier module responsible for certification.
Definition: applier.cc:899
mysql_mutex_lock
#define mysql_mutex_lock(M)
Definition: mysql_mutex.h:49
Applier_module::setup_applier_module
int setup_applier_module(Handler_pipeline_type pipeline_type, bool reset_logs, ulong stop_timeout, rpl_sidno group_sidno, ulonglong gtid_assignment_block_size, Shared_writelock *shared_stop_lock)
Configure the applier pipeline according to the given configuration.
Definition: applier.cc:93
Pipeline_event
Definition: pipeline_interfaces.h:122
thread_id
static my_thread_id thread_id
Definition: my_thr_init.cc:62
Applier_module::applier_aborted
bool applier_aborted
Definition: applier.h:870
LEAVING_MEMBERS_PACKET_TYPE
#define LEAVING_MEMBERS_PACKET_TYPE
Definition: applier.h:51
Applier_module::set_applier_thread_context
void set_applier_thread_context()
Set the thread context for the applier thread.
Definition: applier.cc:191
Applier_module::apply_leaving_members_action_packet
int apply_leaving_members_action_packet(Leaving_members_action_packet *packet)
Apply a leaving members action packet received by the applier.
Definition: applier.cc:399
Applier_module_interface::wait_for_applier_complete_suspension
virtual int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true)=0
Transaction_prepared_action_packet::m_gcs_member_id
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:177
enum_packet_action
enum_packet_action
Definition: applier.h:61
Synchronized_queue::size
size_t size() override
Checks the queue size.
Definition: plugin_utils.h:235
Synchronized_queue< Packet * >
Single_primary_action_packet::QUEUE_APPLIED
@ QUEUE_APPLIED
Definition: applier.h:112
nullptr
Dialog Client Authentication nullptr
Definition: dialog.cc:353
PSI_stage_info_v1::m_key
PSI_stage_key m_key
The registered stage key.
Definition: psi_stage_bits.h:75
Applier_module::wait_for_applier_complete_suspension
int wait_for_applier_complete_suspension(bool *abort_flag, bool wait_for_execution=true) override
Waits for the applier to suspend and apply all the transactions previous to the suspend request.
Definition: applier.cc:759
certification_handler.h
Applier_module::suspend_applier_module
void suspend_applier_module()
Suspends the applier module, being transactions still queued in the incoming queue.
Definition: applier.h:809
Transaction_prepared_action_packet::m_gno
const rpl_gno m_gno
Definition: applier.h:176
Applier_module_interface::add_view_change_packet
virtual void add_view_change_packet(View_change_packet *packet)=0
Single_primary_action_packet::~Single_primary_action_packet
~Single_primary_action_packet() override
Definition: applier.h:122
Leaving_members_action_packet
Definition: applier.h:216
Action_packet::Action_packet
Action_packet(enum_packet_action action)
Create a new action packet.
Definition: applier.h:78
Applier_module::wait_for_current_events_execution
bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true) override
Waits for the execution of all current events by part of the current SQL applier.
Definition: applier.cc:877
Applier_module::queue_and_wait_on_queue_checkpoint
bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition) override
Definition: applier.cc:966
Applier_module::suspend_cond
mysql_cond_t suspend_cond
Definition: applier.h:880
stage_monitor_handler.h
SINGLE_PRIMARY_PACKET_TYPE
#define SINGLE_PRIMARY_PACKET_TYPE
Definition: applier.h:48
Applier_module::get_pipeline_stats_member_collector
Pipeline_stats_member_collector * get_pipeline_stats_member_collector() override
Definition: applier.h:693
Applier_module::purge_applier_queue_and_restart_applier_module
int purge_applier_queue_and_restart_applier_module() override
Purges the relay logs and restarts the applier thread.
Definition: applier.cc:122
mysql_mutex_t
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:49
pipeline_handlers.h
Applier_module_interface::get_applier_status
virtual Member_applier_state get_applier_status()=0
Applier_module::applier_pthd
my_thread_handle applier_pthd
Definition: applier.h:856
Applier_module::apply_sync_before_execution_action_packet
int apply_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet)
Apply a synchronization before execution action packet received by the applier.
Definition: applier.cc:393
ACTION_NUMBER
@ ACTION_NUMBER
Definition: applier.h:65
Leaving_members_action_packet::m_leaving_members
const std::vector< Gcs_member_identifier > m_leaving_members
Definition: applier.h:230
Leaving_members_action_packet::Leaving_members_action_packet
Leaving_members_action_packet(const std::vector< Gcs_member_identifier > &leaving_members)
Create a new leaving members packet.
Definition: applier.h:223
Applier_module_interface::get_retrieved_gtid_set
virtual bool get_retrieved_gtid_set(std::string &retrieved_set)=0
rpl_sidno
int rpl_sidno
Type of SIDNO (source ID number, first component of GTID)
Definition: rpl_gtid.h:93
Applier_module::add_suspension_packet
void add_suspension_packet() override
Queues a packet that will eventually make the applier module suspend.
Definition: applier.h:464
Queue_checkpoint_packet
Definition: applier.h:131
Queue_checkpoint_packet::checkpoint_condition
std::shared_ptr< Continuation > checkpoint_condition
If we discard a packet.
Definition: applier.h:145
APPLIER_STATE_ON
@ APPLIER_STATE_ON
Definition: applier.h:234
mysql_cond_broadcast
int(* mysql_cond_broadcast)(mysql_cond_t *that, const char *src_file, unsigned int src_line)
Definition: mysql_cond_service.h:51
true
#define true
Definition: config_static.h:44
Applier_module::stage_handler
Plugin_stage_monitor_handler stage_handler
Definition: applier.h:906
Applier_module::queue_certification_enabling_packet
virtual void queue_certification_enabling_packet()
Queues a single a packet that will enable certification on this member.
Definition: applier.cc:961
Event_handler
Definition: pipeline_interfaces.h:530
Applier_module::terminate_applier_thread
int terminate_applier_thread()
Terminates the applier thread.
Definition: applier.cc:664
Certification_handler
Definition: certification_handler.h:31
Sync_before_execution_action_packet::m_thread_id
const my_thread_id m_thread_id
Definition: applier.h:206
Flow_control_module::flow_control_step
void flow_control_step(Pipeline_stats_member_collector *)
Evaluate the information received in the last flow control period and adjust the system parameters ac...
Definition: pipeline_stats.cc:736
Applier_module::is_applier_thread_aborted
bool is_applier_thread_aborted()
Is the applier marked for shutdown?
Definition: applier.h:316
Applier_module::inject_event_into_pipeline
int inject_event_into_pipeline(Pipeline_event *pevent, Continuation *cont)
Injects an event into the pipeline and waits for its handling.
Definition: applier.cc:237
Plugin_stage_monitor_handler
Definition: stage_monitor_handler.h:29
Applier_module::is_applier_thread_waiting
bool is_applier_thread_waiting()
Checks if the applier, and its workers when parallel applier is enabled, has already consumed all rel...
Definition: applier.cc:810
Event_handler::handle_action
virtual int handle_action(Pipeline_action *action)=0
Handling of an action as defined in the handler implementation.
View_change_packet::~View_change_packet
~View_change_packet() override
Definition: applier.h:100
Plugin_stage_monitor_handler::set_stage
int set_stage(PSI_stage_key key, const char *file, int line, ulonglong estimated_work, ulonglong work_completed)
Set that a new stage is now in progress.
Definition: stage_monitor_handler.cc:78
Applier_module::shared_stop_write_lock
Shared_writelock * shared_stop_write_lock
Definition: applier.h:890
Applier_module::applier_thd_state
thread_state applier_thd_state
Definition: applier.h:868
Applier_module::set_stop_wait_timeout
void set_stop_wait_timeout(ulong timeout)
Sets the applier shutdown timeout.
Definition: applier.h:440
Transaction_prepared_action_packet::m_sid_specified
const bool m_sid_specified
Definition: applier.h:175
Applier_module::incoming
Synchronized_queue< Packet * > * incoming
Definition: applier.h:893
Applier_channel_state_observer
Definition: applier_channel_state_observer.h:29
Applier_module_interface::run_flow_control_step
virtual void run_flow_control_step()=0
Applier_module::pipeline
Event_handler * pipeline
Definition: applier.h:896
Applier_module::apply_view_change_packet
int apply_view_change_packet(View_change_packet *view_change_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a View Change packet received by the applier.
Definition: applier.cc:270
Applier_module::setup_pipeline_handlers
int setup_pipeline_handlers()
Configure the applier pipeline handlers.
Definition: applier.cc:165
Applier_module::suspended
bool suspended
Definition: applier.h:882
plugin_utils.h
Sync_before_execution_action_packet::Sync_before_execution_action_packet
Sync_before_execution_action_packet(my_thread_id thread_id, const Gcs_member_identifier &gcs_member_id)
Create a new synchronization point request.
Definition: applier.h:198
my_inttypes.h
Applier_module_interface
Definition: applier.h:239
Data_packet
Definition: pipeline_interfaces.h:70
Applier_module_interface::awake_applier_module
virtual void awake_applier_module()=0
Applier_module::add_sync_before_execution_action_packet
void add_sync_before_execution_action_packet(Sync_before_execution_action_packet *packet) override
Queues a synchronization before execution action packet into the applier.
Definition: applier.h:523
Applier_module::run_flow_control_step
void run_flow_control_step() override
Definition: applier.h:702
mysql_cond_wait
#define mysql_cond_wait(C, M)
Definition: mysql_cond.h:47
SYNC_BEFORE_EXECUTION_PACKET_TYPE
#define SYNC_BEFORE_EXECUTION_PACKET_TYPE
Definition: applier.h:49
Gcs_member_identifier
Definition: gcs_member_identifier.h:39
Applier_module::applier_killed_status
bool applier_killed_status
Definition: applier.h:874
Applier_module::awake_applier_module
void awake_applier_module() override
Awakes the applier module.
Definition: applier.h:548
Applier_module_interface::get_message_queue_size
virtual size_t get_message_queue_size()=0
Sync_before_execution_action_packet::m_gcs_member_id
const Gcs_member_identifier m_gcs_member_id
Definition: applier.h:207
Applier_module::applier_channel_observer
Applier_channel_state_observer * applier_channel_observer
Definition: applier.h:902
Applier_module::add_leaving_members_action_packet
void add_leaving_members_action_packet(Leaving_members_action_packet *packet) override
Queues a leaving members action packet into the applier.
Definition: applier.h:535
Applier_module
Definition: applier.h:281
Applier_module::add_view_change_packet
void add_view_change_packet(View_change_packet *packet) override
Queues a view change packet into the applier.
Definition: applier.h:488
Queue_checkpoint_packet::signal_checkpoint_reached
void signal_checkpoint_reached()
Definition: applier.h:141
Applier_module_interface::handle
virtual int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members)=0
rpl_gno
long long int rpl_gno
Type of GNO, the second (numeric) component of GTID.
Definition: rpl_gtid.h:99
Applier_module::stop_wait_timeout
ulong stop_wait_timeout
Definition: applier.h:899
Applier_module::suspend_lock
mysql_mutex_t suspend_lock
Definition: applier.h:878
Applier_module_interface::add_leaving_members_action_packet
virtual void add_leaving_members_action_packet(Leaving_members_action_packet *packet)=0
Queue_checkpoint_packet::Queue_checkpoint_packet
Queue_checkpoint_packet(std::shared_ptr< Continuation > checkpoint_condition_arg)
Create a new Queue_checkpoint_packet packet.
Definition: applier.h:136
THD::killed
std::atomic< killed_state > killed
Definition: sql_class.h:2400
Applier_module_interface::purge_applier_queue_and_restart_applier_module
virtual int purge_applier_queue_and_restart_applier_module()=0
View_change_packet::view_id
std::string view_id
Definition: applier.h:102
Applier_module::gtid_assignment_block_size
ulonglong gtid_assignment_block_size
Definition: applier.h:862
Applier_module::add_transaction_prepared_action_packet
void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet) override
Queues a transaction prepared action packet into the applier.
Definition: applier.h:511
enum_applier_state
enum_applier_state
Definition: applier.h:233
Applier_module::~Applier_module
~Applier_module() override
Definition: applier.cc:75
Applier_module::waiting_for_applier_suspension
bool waiting_for_applier_suspension
Definition: applier.h:884
Sync_before_execution_action_packet::~Sync_before_execution_action_packet
~Sync_before_execution_action_packet() override
Definition: applier.h:204
View_change_packet::group_executed_set
std::vector< std::string > group_executed_set
Definition: applier.h:103
uchar
unsigned char uchar
Definition: my_inttypes.h:51
pipeline_stats.h
Applier_module::handle
int handle(const uchar *data, ulong len, enum_group_replication_consistency_level consistency_level, std::list< Gcs_member_identifier > *online_members) override
Queues the packet coming from the reader for future application.
Definition: applier.h:391
TRANSACTION_PREPARED_PACKET_TYPE
#define TRANSACTION_PREPARED_PACKET_TYPE
Definition: applier.h:50
mysql_mutex_unlock
#define mysql_mutex_unlock(M)
Definition: mysql_mutex.h:56
Member_applier_state
enum enum_applier_state Member_applier_state
View_change_packet::View_change_packet
View_change_packet(std::string &view_id_arg)
Create a new data packet with associated data.
Definition: applier.h:97
Applier_module::add_termination_packet
void add_termination_packet()
Queues a packet that will make the applier module terminate it's handling process.
Definition: applier.h:475
my_thread_handle
Definition: my_thread_bits.h:51
Applier_module_interface::add_single_primary_action_packet
virtual void add_single_primary_action_packet(Single_primary_action_packet *packet)=0
Applier_module::group_replication_sidno
rpl_sidno group_replication_sidno
Definition: applier.h:861
thread_state
Definition: plugin_utils.h:42
Applier_module_interface::interrupt_applier_suspension_wait
virtual void interrupt_applier_suspension_wait()=0
Applier_module::interrupt_applier_suspension_wait
void interrupt_applier_suspension_wait() override
Interrupts the current applier waiting process either for it's suspension or it's wait for the consum...
Definition: applier.cc:804
Leaving_members_action_packet::~Leaving_members_action_packet
~Leaving_members_action_packet() override
Definition: applier.h:228
Pipeline_action
Definition: pipeline_interfaces.h:500
Applier_module::pipeline_stats_member_collector
Pipeline_stats_member_collector pipeline_stats_member_collector
Definition: applier.h:904
binary_log::Uuid::copy_from
void copy_from(const unsigned char *data)
Copies the given 16-byte data to this UUID.
Definition: uuid.h:64
Applier_module_interface::get_flow_control_module
virtual Flow_control_module * get_flow_control_module()=0
Action_packet::~Action_packet
~Action_packet() override
Definition: applier.h:81
Applier_module::apply_action_packet
bool apply_action_packet(Action_packet *action_packet)
Apply an action packet received by the applier.
Definition: applier.cc:248
my_thread_id
uint32 my_thread_id
Definition: my_thread_local.h:33
Single_primary_action_packet::enum_action
enum_action
Definition: applier.h:112
Gtid_set
Represents a set of GTIDs.
Definition: rpl_gtid.h:1433
Applier_module::get_retrieved_gtid_set
bool get_retrieved_gtid_set(std::string &retrieved_set) override
Returns the retrieved gtid set for the applier channel.
Definition: applier.cc:848
Single_primary_action_packet
Definition: applier.h:110
Applier_module_interface::add_suspension_packet
virtual void add_suspension_packet()=0
Single_primary_action_packet::Single_primary_action_packet
Single_primary_action_packet(enum enum_action action_arg)
Create a new single primary action packet with associated data.
Definition: applier.h:119
Transaction_prepared_action_packet
Definition: applier.h:152
Handler_pipeline_type
Handler_pipeline_type
Definition: pipeline_factory.h:44
View_change_packet
Definition: applier.h:90
applier_module_channel_name
char applier_module_channel_name[]
Definition: applier.cc:45
Sync_before_execution_action_packet
Definition: applier.h:190
ACTION_PACKET_TYPE
#define ACTION_PACKET_TYPE
Definition: applier.h:46
applier_channel_state_observer.h
VIEW_CHANGE_PACKET_TYPE
#define VIEW_CHANGE_PACKET_TYPE
Definition: applier.h:47
Packet
Definition: pipeline_interfaces.h:45
Applier_module_interface::add_transaction_prepared_action_packet
virtual void add_transaction_prepared_action_packet(Transaction_prepared_action_packet *packet)=0
TERMINATION_PACKET
@ TERMINATION_PACKET
Definition: applier.h:62
Pipeline_stats_member_collector
Definition: pipeline_stats.h:268
CHECKPOINT_PACKET
@ CHECKPOINT_PACKET
Definition: applier.h:64
gcs_operations.h
Applier_module::add_single_primary_action_packet
void add_single_primary_action_packet(Single_primary_action_packet *packet) override
Queues a single primary action packet into the applier.
Definition: applier.h:499
Applier_module::run_lock
mysql_mutex_t run_lock
Definition: applier.h:865
Applier_module::intersect_group_executed_sets
int intersect_group_executed_sets(std::vector< std::string > &gtid_sets, Gtid_set *output_set)
This method calculates the intersection of the given sets passed as a list of strings.
Definition: applier.cc:907
plugin_group_replication.h
Applier_module::inform_of_applier_stop
void inform_of_applier_stop(char *channel_name, bool aborted)
This method informs the applier module that an applying thread stopped.
Definition: applier.cc:742
Synchronized_queue::push
bool push(const T &value) override
Inserts an element in the queue.
Definition: plugin_utils.h:194
consistency_manager.h
APPLIER_ERROR
@ APPLIER_ERROR
Definition: applier.h:236
Applier_module::get_message_queue_size
size_t get_message_queue_size() override
Returns the applier module's queue size.
Definition: applier.h:682
Applier_module::flow_control_module
Flow_control_module flow_control_module
Definition: applier.h:905
Applier_module::run_cond
mysql_cond_t run_cond
Definition: applier.h:866
ulonglong
unsigned long long int ulonglong
Definition: my_inttypes.h:55
APPLIER_STATE_OFF
@ APPLIER_STATE_OFF
Definition: applier.h:235
Pipeline_member_stats
Definition: pipeline_stats.h:412
gcs_member_identifier.h
Applier_module::is_running
bool is_running()
Is the applier running?
Definition: applier.h:327
mysql_cond_t
An instrumented cond structure.
Definition: mysql_cond_bits.h:49
Applier_module_interface::get_certification_handler
virtual Certification_handler * get_certification_handler()=0
info_GR_STAGE_module_suspending
PSI_stage_info info_GR_STAGE_module_suspending
Definition: plugin_psi.h:207
binary_log::Uuid
Definition: uuid.h:60
enum_group_replication_consistency_level
enum_group_replication_consistency_level
Definition: plugin_group_replication.h:34
Transaction_prepared_action_packet::m_sid
rpl_sid m_sid
Definition: applier.h:182
Applier_module_interface::wait_for_applier_event_execution
virtual int wait_for_applier_event_execution(std::string &retrieved_set, double timeout, bool update_THD_status=true)=0
Continuation
Definition: pipeline_interfaces.h:405
Applier_module_interface::~Applier_module_interface
virtual ~Applier_module_interface()
Definition: applier.h:241
Flow_control_module
Definition: pipeline_stats.h:587
pipeline_factory.h
Applier_module::Applier_module
Applier_module()
Definition: applier.cc:54
Applier_module::handle_pipeline_action
int handle_pipeline_action(Pipeline_action *action) override
Gives the pipeline an action for execution.
Definition: applier.h:408
Applier_module::applier_thread_handle
int applier_thread_handle()
Runs the applier thread process, reading events and processing them.
Definition: applier.cc:405
Single_primary_action_packet::action
enum enum_action action
Definition: applier.h:124
info_GR_STAGE_module_executing
PSI_stage_info info_GR_STAGE_module_executing
Definition: plugin_psi.h:206
Applier_module::get_local_pipeline_stats
Pipeline_member_stats * get_local_pipeline_stats()
Return the local applier stats.
Definition: applier.cc:972
Applier_module::get_applier_status
Member_applier_state get_applier_status() override
Definition: applier.h:684
Applier_module_interface::handle_pipeline_action
virtual int handle_pipeline_action(Pipeline_action *action)=0
Applier_module::clean_applier_thread_context
void clean_applier_thread_context()
Cleans the thread context for the applier thread This includes such tasks as removing the thread from...
Definition: applier.cc:231
Action_packet
Definition: applier.h:72
Applier_module::get_flow_control_module
Flow_control_module * get_flow_control_module() override
Definition: applier.h:698
Transaction_prepared_action_packet::~Transaction_prepared_action_packet
~Transaction_prepared_action_packet() override
Definition: applier.h:173
Transaction_prepared_action_packet::get_sid
const rpl_sid * get_sid()
Definition: applier.h:179
Applier_module::reset_applier_logs
bool reset_applier_logs
Definition: applier.h:860
Applier_module::apply_data_packet
int apply_data_packet(Data_packet *data_packet, Format_description_log_event *fde_evt, Continuation *cont)
Apply a Data packet received by the applier.
Definition: applier.cc:328
Applier_module_interface::get_pipeline_stats_member_collector
virtual Pipeline_stats_member_collector * get_pipeline_stats_member_collector()=0
thread_state::is_running
bool is_running() const
Definition: plugin_utils.h:79
Applier_module_interface::queue_and_wait_on_queue_checkpoint
virtual bool queue_and_wait_on_queue_checkpoint(std::shared_ptr< Continuation > checkpoint_condition)=0
Applier_module_interface::wait_for_current_events_execution
virtual bool wait_for_current_events_execution(std::shared_ptr< Continuation > checkpoint_condition, bool *abort_flag, bool update_THD_status=true)=0
group_replication_priv.h
Applier_module::apply_single_primary_action_packet
int apply_single_primary_action_packet(Single_primary_action_packet *packet)
Apply an single primary action packet received by the applier.
Definition: applier.cc:368
Applier_module::terminate_applier_pipeline
int terminate_applier_pipeline()
Terminates the pipeline, shutting down the handlers and deleting them.
Definition: applier.cc:649
false
#define false
Definition: config_static.h:43
Applier_module::applier_thd
THD * applier_thd
Definition: applier.h:857