MySQL  8.0.27
Source Code Documentation
Go to the documentation of this file.
1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  GNU General Public License, version 2.0, for more details.
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23 #ifndef RPL_RLI_PDB_H
24 #define RPL_RLI_PDB_H
26 #include <stdarg.h>
27 #include <sys/types.h>
28 #include <time.h>
29 #include <atomic>
30 #include <tuple>
33 #include "my_bitmap.h"
34 #include "my_compiler.h"
35 #include "my_dbug.h"
36 #include "my_inttypes.h"
37 #include "my_io.h"
38 #include "my_loglevel.h"
39 #include "my_psi_config.h"
45 #include "prealloced_array.h" // Prealloced_array
46 #include "sql/log_event.h" // Format_description_log_event
47 #include "sql/rpl_gtid.h"
48 #include "sql/rpl_mta_submode.h" // enum_mts_parallel_type
49 #include "sql/rpl_replica.h" // MTS_WORKER_UNDEF
50 #include "sql/rpl_rli.h" // Relay_log_info
51 #include "sql/sql_class.h"
52 #include "sql/system_variables.h"
54 class Rpl_info_handler;
55 class Slave_worker;
56 struct TABLE;
58 #ifndef NDEBUG
59 extern ulong w_rr;
60 #endif
61 /**
62  Legends running throughout the module:
64  C - Coordinator
65  CP - checkpoint
66  W - Worker
68  B-event event that Begins a group (a transaction)
69  T-event event that Terminates a group (a transaction)
70 */
72 /* Assigned Partition Hash (APH) entry */
// Describes one database partition: its name (`db`) and the number of
// transactions pending on it (`usage`).
// NOTE(review): lines of this definition were dropped when this listing was
// extracted, including the opening `struct ... {` line (presumably
// `struct db_worker_hash_entry`, the type map_db_to_worker() takes through
// `ptr_entry`) and the member that the temporary-tables comment below
// documents. Restore them from the upstream header before compiling.
75  const char *db;
77  /*
78  The number of transactions pending on this database.
79  This should only be modified under the lock slave_worker_hash_lock.
80  */
81  long usage;
82  /*
83  The list of temp tables belonging to the @c db database is
84  attached to an assigned @c worker to become its thd->temporary_tables.
85  The list is updated with every DDL, including CREATE and DROP.
86  It is removed from the entry and merged to the coordinator's
87  thd->temporary_tables in case of events: slave stops, APH oversize.
88  */
91  /* todo: relax concurrency to mimic record-level locking.
92  That is, augment the entry with a mutex/cond pair
93  pthread_mutex_t
94  pthread_cond_t
95  timestamp updated_at; */
96 };
// NOTE(review): the doc comment of map_db_to_worker() and the first line of
// the declaration that follows it (the one whose visible parameters are
// `workers` and `ev`) were dropped when this listing was extracted.
// Maps database `dbname` to a Slave_worker. The APH entry involved is
// presumably passed back through `ptr_entry` -- confirm in the definition.
100 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
101  db_worker_hash_entry **ptr_entry,
102  bool need_temp_tables, Slave_worker *w);
104  Slave_worker_array *workers,
105  Log_event *ev);
// In-place capacity of Slave_worker::curr_group_exec_parts (CGEP, "Current
// Group Executed Partitions"), a Prealloced_array of APH entry pointers.
107 #define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
// NOTE(review): this extracted listing drops several lines of the
// Slave_job_group definition: the opening `struct ... {` line, the signature
// lines of the copy constructor, the copy-assignment operator and reset(),
// part of their member lists, and the declarations of members they use
// (worker, total_seqno, checkpoint_seqno, checkpoint_log_name,
// checkpoint_relay_log_name, new_fd_event, ...). Restore them from the
// upstream header before compiling.
110  Slave_job_group() = default;
112  /*
113  We need a custom copy constructor and assignment operator because
114  std::atomic<T> is not copy-constructible.
115  */
  // Copy constructor (signature dropped): member-wise copy; the atomic
  // `done` flag is copied through load().
121  worker_id(other.worker_id),
122  worker(other.worker),
123  total_seqno(other.total_seqno),
130  done(other.done.load()),
131  shifted(other.shifted),
132  ts(other.ts),
133 #ifndef NDEBUG
134  notified(other.notified),
135 #endif
138  new_fd_event(other.new_fd_event) {
139  }
  // Copy-assignment operator body (signature dropped): member-wise copy.
146  worker_id = other.worker_id;
147  worker = other.worker;
148  total_seqno = other.total_seqno;
156  shifted = other.shifted;
157  ts = other.ts;
158 #ifndef NDEBUG
159  notified = other.notified;
160 #endif
163  new_fd_event = other.new_fd_event;
164  return *this;
165  }
167  char *group_master_log_name; // (actually redundant)
168  /*
169  T-event log_pos filled by Worker for CheckPoint (CP)
170  */
173  /*
174  When the relay-log name changes, allocates and fills in a new name of the
175  relay-log, otherwise it fills in NULL.
176  Coordinator keeps track of whether each Worker has been notified of the
177  update to make sure the routine runs once per change.
179  W checks the value at commit and memorizes a not-NULL one.
180  Freeing unless NULL is left to Coordinator at CP.
181  */
182  char *group_relay_log_name; // The value is last seen relay-log
184  ulong worker_id;
188  my_off_t master_log_pos; // B-event log_pos
189  /* checkpoint coordinates are reset by periodical and special (Rotate event) CP:s */
191  my_off_t checkpoint_log_pos; // T-event log_pos filled by W for CheckPoint
193  my_off_t
194  checkpoint_relay_log_pos; // T-event log_pos filled by W for CheckPoint
196  std::atomic<int32> done; // Flag raised by W, read and reset by Coordinator
197  ulong shifted; // shift the last CP bitmap at receiving a new CP
198  time_t ts; // Group's timestamp to update Seconds_behind_master
199 #ifndef NDEBUG
200  bool notified{false}; // to debug group_master_log_name change notification
201 #endif
202  /* Clock-based scheduler requirement: */
203  longlong last_committed; // commit parent timestamp
204  longlong sequence_number; // transaction's logical timestamp
205  /*
206  After Coordinator has seen a new FD event, it sets this member to
207  point to the new event, once per worker. Coordinator does so
208  when it schedules a first group following the FD event to a worker.
209  It checks Slave_worker::fd_change_notified flag to decide whether
210  to do this or not.
211  When the worker executes the group, it replaces its currently
212  active FD by the new FD once it takes on the group first event. It
213  checks this member and resets it after the FD replacement is done.
215  The member is kind of lock-free. It's updated by Coordinator and
216  read by Worker without holding any mutex. That's still safe thanks
217  to Slave_worker::jobs_lock that works as synchronizer, Worker
218  can't read any stale info.
219  The member is updated by Coordinator when it decides to which Worker
220  an event following a new FD is to be scheduled.
221  After Coordinator has chosen a Worker, it queues the event to it
222  with necessarily taking Slave_worker::jobs_lock. The Worker grabs
223  the mutex lock later at pulling the event from the queue and
224  releases the lock before reading this member.
226  This sequence of actions shows the write operation always precedes
227  the read one, and ensures no stale FD info is passed to the
228  Worker.
229  */
231  /*
232  Coordinator fills the struct with defaults and options at starting of
233  a group distribution.
234  */
  // reset() body (signature dropped from this listing; `seqno` below is
  // presumably one of its parameters).
238  group_master_log_name = nullptr; // todo: remove
239  group_relay_log_name = nullptr;
241  total_seqno = seqno;
242  checkpoint_log_name = nullptr;
243  checkpoint_log_pos = 0;
244  checkpoint_relay_log_name = nullptr;
  // (uint)-1 is the largest uint value; presumably an "unset" marker -- confirm.
246  checkpoint_seqno = (uint)-1;
247  done = 0;
248  ts = 0;
249 #ifndef NDEBUG
250  notified = false;
251 #endif
254  new_fd_event = nullptr;
255  }
256 };
258 /**
259  The class defines a type of queue with a predefined max size that is
260  implemented using a circular memory buffer.
261  That is, items of the queue are accessed as indexed elements of
262  the array buffer in a way that when the index value reaches
263  a max value it wraps around to point to the first buffer element.

  Sentinel encoding used by the members below: `entry == size` means the
  queue is empty (see empty()) and `avail == size` means it is full
  (see full()). Otherwise `entry` is the head slot and `avail` is the next
  free slot.
264 */
265 template <typename Element_type>
// NOTE(review): lines of this class were dropped when this listing was
// extracted (the `class circular_buffer_queue {` line, the declarations of
// the buffer `m_Q` and of the `inited_queue` flag, and the constructor's
// signature, which takes `max`). Restore them from the upstream header
// before compiling.
267  public:
269  ulong size; // the size of the queue in number of elements
270  ulong avail; // first Available index to append at (next to tail)
271  ulong entry; // the head index or the entry point to the queue.
  // NOTE(review): `volatile` provides neither atomicity nor inter-thread
  // ordering; if len is read by another thread without a common lock it
  // needs std::atomic instead -- confirm how callers synchronize access.
272  volatile ulong len; // actual length
  // Constructor: the queue starts empty (entry == size == max, avail == 0).
277  size(max),
278  avail(0),
279  entry(max),
280  len(0),
281  inited_queue(false) {
  // inited_queue is set when reserve() returns false, which in MySQL's
  // Prealloced_array convention signals success -- confirm.
282  if (!m_Q.reserve(size)) inited_queue = true;
283  m_Q.resize(size);
284  }
288  /**
289  Content of the item being dequeued is copied to the arg-pointer
290  location.
292  @param [out] item A pointer to the being dequeued item.
293  @return the queue's array index that the de-queued item
294  located at, or
295  an error encoded as a value beyond the legacy index range.
296  */
297  ulong de_queue(Element_type *item);
298  /**
299  Similar to de_queue but extracting happens from the tail side.
301  @param [out] item A pointer to the being dequeued item.
302  @return the queue's array index that the de-queued item
303  located at, or an error.
304  */
305  ulong de_tail(Element_type *item);
307  /**
308  return the index where the arg item is located
309  or an error encoded as a value beyond the legacy range
310  [0, size) (value `size' is excluded).
311  */
312  ulong en_queue(Element_type *item);
313  /**
314  return the value of @c data member of the head of the queue.
  Returns nullptr when the queue is empty.
315  */
316  Element_type *head_queue() {
317  if (empty()) return nullptr;
318  return &m_Q[entry];
319  }
321  bool gt(ulong i, ulong k); // comparison of ordering of two entities
322  /* index is within the valid range */
  /*
    Returns true when index k refers to an occupied slot. The occupied range
    is [entry, avail), wrapping past the end of the buffer when entry > avail.
    A full queue is encoded as avail == size, and then every index below size
    is occupied. That case has to be tested explicitly; otherwise the wrapped
    part [0, entry) of a full queue would be reported as out of range.
  */
323  bool in(ulong k) {
324  if (empty()) return false;
  if (full()) return k < size;
325  return entry > avail ? (k >= entry || k < avail)
  : (k >= entry && k < avail);
326  }
327  bool empty() { return entry == size; }
328  bool full() { return avail == size; }
329 };
331 /**
332  Group Assigned Queue (GAQ) whose first element identifies the first gap
333  in the committed sequence. The head of the queue is therefore next to
334  the low-water-mark.
335 */
336 class Slave_committed_queue : public circular_buffer_queue<Slave_job_group> {
// NOTE(review): this extracted listing drops several lines of the class: the
// declarations of `lwm`, `last_done` and `assigned_group_index`, the
// declaration after the "Checkpoint routine" comment, the signature lines
// of two inline members, and parts of the en_queue/de_queue/de_tail wrapper
// bodies. Restore them from the upstream header before compiling.
337  public:
338  bool inited;
340  /* master's Rot-ev exec */
341  void update_current_binlog(const char *post_rotate);
343  /*
344  The last checkpoint time Low-Water-Mark
345  */
  // (declaration dropped: `Slave_job_group lwm`)
348  /* last time processed indexes for each worker */
  // (declaration dropped: `Prealloced_array<ulonglong, 1> last_done`)
351  /* the being assigned group index in GAQ */
  // (declaration dropped: `ulong assigned_group_index`)
354  Slave_committed_queue(ulong max, uint n);
  // Body of an inline member whose signature was dropped (presumably the
  // destructor): frees leftover dynamic items only if the queue was inited.
357  if (inited) {
359  free_dynamic_items(); // free possibly left allocated strings in GAQ list
360  }
361  }
363 #ifndef NDEBUG
364  bool count_done(Relay_log_info *rli);
365 #endif
367  /* Checkpoint routine refreshes the queue */
  // (declaration dropped: presumably `ulong move_queue_head(Slave_worker_array *ws)`)
369  /* Method is for slave shutdown time cleanup */
370  void free_dynamic_items();
371  /*
372  returns a pointer to Slave_job_group struct instance as indexed by arg
373  in the circular buffer dyn-array
374  */
  // (signature dropped: `Slave_job_group *get_job_group(ulong ind)`)
376  assert(ind < size);
377  return &m_Q[ind];
378  }
380  /**
381  Assigns @c assigned_group_index to the index of the enqueued item
382  and returns it.
383  */
384  ulong en_queue(Slave_job_group *item) {
385  return assigned_group_index =
  // (line dropped: the base-class en_queue() call whose result is assigned)
387  }
389  /**
390  Dequeue from head.
392  @param [out] item A pointer to the being dequeued item.
393  @return The queue's array index that the de-queued item located at,
394  or an error encoded as a value beyond the legacy index range.
395  */
396  ulong de_queue(Slave_job_group *item) {
  // (line dropped: the body of this wrapper)
398  }
400  /**
401  Similar to de_queue() but removing an item from the tail side.
403  @param [out] item A pointer to the being dequeued item.
404  @return the queue's array index that the de-queued item
405  located at, or an error.
406  */
407  ulong de_tail(Slave_job_group *item) {
  // (line dropped: the body of this wrapper)
409  }
411  ulong find_lwm(Slave_job_group **, ulong);
412 };
414 /**
415  Appends a copy of *item at the tail of the queue, i.e. into slot `avail`.

  @param item  pointer to the element to copy into the queue.
  @return the index where the arg item has been located, or (ulong)-1 --
416  a value outside the [0, size) index range -- when the queue is full.
417 */
418 template <typename Element_type>
// NOTE(review): the definition's signature line was dropped when this listing
// was extracted; presumably it is
// `ulong circular_buffer_queue<Element_type>::en_queue(Element_type *item) {`.
420  ulong ret;
421  if (avail == size) {
422  assert(avail == m_Q.size());
423  return (ulong)-1;
424  }
426  // store
428  ret = avail;
429  m_Q[avail] = *item;
431  // pre-boundary cond: an empty queue gets its head at the new element
432  if (entry == size) entry = avail;
434  avail = (avail + 1) % size;
435  len++;
437  // post-boundary cond: reaching the head means the queue is now full,
  // which is encoded as avail == size
438  if (avail == entry) avail = size;
  // Invariant: unless the queue is full, len is the ring distance from entry
  // to avail. The conditional expression must be parenthesized as a whole.
  // Otherwise `==` and `||` bind tighter than `?:`, and the assertion merely
  // tested that the selected difference was non-zero. A full queue is exempt
  // because there len == size while avail - entry == size - entry. The former
  // `avail == entry` disjunct could never hold after the line above.
440  assert(avail == size ||
441  len == ((avail >= entry) ? (avail - entry) : (size + avail - entry)));
442  assert(avail != entry);
444  return ret;
445 }
447 /**
448  Dequeue from head.
450  @param [out] item Receives a copy of the head element.
451  @return the queue's array index that the de-queued item was
452  located at, or (ulong)-1 -- a value outside the legacy
453  [0, size) (value `size' is excluded) range -- when the queue is empty.
454 */
455 template <typename Element_type>
// NOTE(review): the definition's signature line was dropped when this listing
// was extracted; presumably it is
// `ulong circular_buffer_queue<Element_type>::de_queue(Element_type *item) {`.
457  ulong ret;
458  if (entry == size) {
459  assert(len == 0);
460  return (ulong)-1;
461  }
463  ret = entry;
464  *item = m_Q[entry];
465  len--;
467  // pre boundary cond: a full queue (avail == size) gets a free slot at the
  // old head position
468  if (avail == size) avail = entry;
469  entry = (entry + 1) % size;
471  // post boundary cond: reaching avail means the queue is now empty,
  // which is encoded as entry == size
472  if (avail == entry) entry = size;
  // Invariant: unless the queue is empty, len is the ring distance from entry
  // to avail. The conditional expression must be parenthesized as a whole.
  // Otherwise `==` binds tighter than `?:`, and the assertion merely tested
  // that the selected difference was non-zero.
474  assert(entry == size ||
475  len == ((avail >= entry) ? (avail - entry) : (size + avail - entry)));
476  assert(avail != entry);
478  return ret;
479 }
/**
  Removes the last element of the queue, located at (entry + len - 1) % size.

  @param [out] item Receives a copy of the removed tail element.
  @return the array index the removed item was located at (which also
          becomes the new `avail`), or (ulong)-1 when the queue is empty.
*/
481 template <typename Element_type>
// NOTE(review): the definition's signature line was dropped when this listing
// was extracted; presumably it is
// `ulong circular_buffer_queue<Element_type>::de_tail(Element_type *item) {`.
483  if (entry == size) {
484  assert(len == 0);
485  return (ulong)-1;
486  }
488  avail = (entry + len - 1) % size;
489  *item = m_Q[avail];
490  len--;
492  // post boundary cond: removing the only element empties the queue,
  // which is encoded as entry == size
493  if (avail == entry) entry = size;
  // Invariant: unless the queue is empty, len is the ring distance from entry
  // to avail. The conditional expression must be parenthesized as a whole.
  // Otherwise `==` binds tighter than `?:`, and the assertion merely tested
  // that the selected difference was non-zero.
495  assert(entry == size ||
496  len == ((avail >= entry) ? (avail - entry) : (size + avail - entry)));
497  assert(avail != entry);
499  return avail;
500 }
502 class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item> {
503  public:
// NOTE(review): a line after `public:` (presumably the constructor) and the
// `ulonglong waited_overfill` member were dropped when this listing was
// extracted.
505  /*
506  Coordinator sets this to true; the Worker signals back once the queue
507  becomes available again.
508  */
509  bool overfill;
511 };
513 class Slave_worker : public Relay_log_info {
// NOTE(review): this extracted listing is not compilable as-is. Many lines of
// the class were dropped, among them the first line of the constructor
// declaration and the `#if` that pairs with the `#endif` below, several
// member declarations, some method signatures and parts of method bodies.
// Restore them from the upstream header before editing the code.
514  public:
  // Constructor (its first line is missing): the PSI key parameters are only
  // compiled in under the preprocessor condition closed by `#endif` below.
517  PSI_mutex_key *param_key_info_run_lock,
518  PSI_mutex_key *param_key_info_data_lock,
519  PSI_mutex_key *param_key_info_sleep_lock,
520  PSI_mutex_key *param_key_info_thd_lock,
521  PSI_mutex_key *param_key_info_data_cond,
522  PSI_mutex_key *param_key_info_start_cond,
523  PSI_mutex_key *param_key_info_stop_cond,
524  PSI_mutex_key *param_key_info_sleep_cond,
525 #endif
526  uint param_id, const char *param_channel);
528  ~Slave_worker() override;
530  Slave_jobs_queue jobs; // assignment queue containing events to execute
531  mysql_mutex_t jobs_lock; // mutex for the jobs queue
532  mysql_cond_t jobs_cond; // condition variable for the jobs queue
533  Relay_log_info *c_rli; // pointer to Coordinator's rli
  // (type line dropped: Prealloced_array<db_worker_hash_entry *,
  // SLAVE_INIT_DBS_IN_GROUP>)
536  curr_group_exec_parts; // Current Group Executed Partitions
538 #ifndef NDEBUG
539  bool curr_group_seen_sequence_number; // is set to true about starts_group()
540 #endif
541  ulong id; // numeric identifier of the Worker
543  /*
544  Worker runtime statistics
545  */
546  // the index in GAQ of the last processed group by this Worker
547  volatile ulong last_group_done_index;
548  ulonglong
549  last_groups_assigned_index; // index of previous group assigned to worker
550  ulong wq_empty_waits; // how many times got idle
551  ulong events_done; // how many events (statements) processed
552  ulong groups_done; // how many groups (transactions) processed
553  volatile int curr_jobs; // number of active assignments
554  // number of partitions allocated to the worker at point in time
556  // symmetric to rli->mts_end_group_sets_max_dbs
  // NOTE(review): the three flags below are written by the Coordinator and
  // read by the Worker (per their comments). `volatile` provides neither
  // atomicity nor inter-thread ordering. Unless every access happens under a
  // common mutex, std::atomic<bool> is the correct type -- confirm.
559  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
560  volatile bool checkpoint_notified; // Coord sets and resets, W can read
561  volatile bool
562  master_log_change_notified; // Coord sets and resets, W can read
563  /*
564  The variable serves to Coordinator as a memo to itself
565  to notify a Worker about the fact that a new FD has been read.
566  Normally, the value is true, to mean the Worker is notified.
567  When Coordinator reads a new FD it changes the value to false.
568  When Coordinator schedules to a Worker the first event following the new FD,
569  it propagates the new FD to the Worker through
570  Slave_job_group::new_fd_event. Afterwards Coordinator returns the value back
571  to the regular true, to denote things done. Worker will adapt to the new FD
572  once it takes on a first event of the marked group.
573  */
  // (declaration dropped: presumably `bool fd_change_notified`)
575  ulong bitmap_shifted; // shift the last bitmap at receiving new CP
576  // WQ current excess above the overrun level
578  /*
579  number of events starting from which Worker queue is regarded as
580  close to full. The number of the excessive events yields a weight factor
581  to compute Coordinator's nap.
582  */
  // (declaration dropped: `ulong overrun_level`)
584  /*
585  reverse to overrun: the number of events below which Worker is
586  considered underrunning
587  */
  // (declaration dropped: `ulong underrun_level`)
589  /*
590  Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
591  When WQ length is dropped below overrun the counter is reset.
592  */
593  ulong excess_cnt;
594  /*
595  Coordinates of the last CheckPoint (CP) this Worker has
596  acknowledged; part of the persistent data
597  */
  // (declarations dropped: the checkpoint coordinates, e.g.
  // `checkpoint_relay_log_name[FN_REFLEN]` and `checkpoint_master_log_pos`)
602  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
603  MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
604  ulong
605  worker_checkpoint_seqno; // the most significant ON bit in group_executed
606  /* Initial value of FD-for-execution version until it gets known. */
  // (dropped: the member documented above and the opening of the
  // running-state enum; its first enumerator presumably has value 0)
610  RUNNING = 1,
611  ERROR_LEAVING = 2, // is set by Worker
612  STOP = 3, // is set by Coordinator upon receiving STOP
  // (dropped: the name of the enumerator whose value is 4)
614  4 // is set by worker upon completing job when STOP SLAVE is issued
615  };
617  /*
618  This function is used to make a copy of the worker object before we
619  destroy it on STOP SLAVE. This new object is then used to report the
620  worker status until next START SLAVE following which the new worker objects
621  will be used.
622  */
  // (dropped: the first line of this method's declaration)
624  THD *worker_thd, const Error &last_error,
625  Gtid_monitoring_info *monitoring_info_arg);
627  /*
628  The running status is guarded by jobs_lock mutex that a writer
629  Coordinator or Worker itself needs to hold when writing a new value.
630  */
  // (declaration dropped: the running status member)
632  /*
633  exit_incremented indicates whether worker has contributed to max updated
634  index. By default it is set to false. When the worker contributes for the
635  first time this variable is set to true.
636  */
  // (declaration dropped: `exit_incremented`)
639  int init_worker(Relay_log_info *, ulong);
640  int rli_init_info(bool);
641  int flush_info(bool force = false);
642  static size_t get_number_worker_fields();
643  /**
644  Sets bits for columns that are allowed to be `NULL`.
646  @param nullable_fields the bitmap to hold the nullable fields.
647  */
648  static void set_nullable_fields(MY_BITMAP *nullable_fields);
649  void slave_worker_ends_group(Log_event *, int);
650  const char *get_master_log_name();
  // (declaration dropped: `ulonglong get_master_log_pos()`)
  // NOTE(review): `val` is a ulong while master_log_pos is a ulonglong; where
  // ulong is 32 bits, positions above 4 GiB would be truncated on the way in.
  // Consider taking ulonglong.
652  ulonglong set_master_log_pos(ulong val) { return master_log_pos = val; }
653  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
654  /**
655  The method is a wrapper to provide uniform interface with STS and is
656  to be called from Relay_log_info and Slave_worker pre_commit() methods.
657  */
658  bool commit_positions() override {
659  assert(current_event);
661  return commit_positions(
  // (lines dropped from this listing: the leading arguments of this call)
663  is_transactional());
664  }
665  /**
666  See the comments for STS version of this method.
667  */
668  void post_commit(bool on_rollback) override {
  // (the statements guarded by the conditions below were partly dropped
  // from this listing)
669  if (on_rollback) {
670  if (is_transactional())
673  } else if (!is_transactional())
676  true);
677  }
678  /*
679  When commit fails clear bitmap for executed worker group. Revert back the
680  positions to the old positions that existed before commit using the
681  checkpoint.
683  @param ptr_g a pointer to Slave_job_group struct instance which
684  holds group master log pos, group relay log pos and checkpoint positions.
685  */
686  void rollback_positions(Slave_job_group *ptr_g);
687  bool reset_recovery_info();
688  /**
689  The method runs at Worker initialization, at runtime when
690  Coordinator supplied a new FD event for execution context, and at
691  the Worker pool shutdown.
692  Similarly to the Coordinator's
693  Relay_log_info::set_rli_description_event() the possibly existing
694  old FD is destroyed, carefully; each worker decrements
695  Format_description_log_event::atomic_usage_counter and when it is made
696  zero the destructor runs.
697  Unlike the Coordinator's role, the usage counter of the new FD is *not*
698  incremented, see @c Log_event::get_slave_worker() where and why it's done
699  there.
701  Notice, the method is run as well by Coordinator per each Worker at MTS
702  shutdown time.
704  Todo: consider to merge logics of the method with that of
705  Relay_log_info class.
707  @param fdle pointer to a new Format_description_log_event
709  @return 1 if an error was encountered, 0 otherwise.
710  */
  // (signature dropped: `int set_rli_description_event(
  // Format_description_log_event *fdle) override`)
714  if (fdle) {
715  /*
716  When the master rotates its binary log, set gtid_next to
717  NOT_YET_DETERMINED. This tells the slave thread that:
719  - If a Gtid_log_event is read subsequently, gtid_next will be set to the
720  given GTID (this is done in gtid_pre_statement_checks()).
722  - If a statement is executed before any Gtid_log_event, then gtid_next
723  is set to anonymous (this is done in Gtid_log_event::do_apply_event()).
725  It is important to not set GTID_NEXT=NOT_YET_DETERMINED in the middle of
726  a transaction. If that would happen when GTID_MODE=ON, the next
727  statement would fail because it implicitly sets GTID_NEXT=ANONYMOUS,
728  which is disallowed when GTID_MODE=ON. So then there would be no way to
729  end the transaction; any attempt to do so would result in this error.
731  There are three possible states when reaching this execution flow point
732  (see further below for a more detailed explanation on each):
734  - **No active transaction, and not in a group**: set `gtid_next` to
  `NOT_YET_DETERMINED`.
737  - **No active transaction, and in a group**: do nothing regarding
738  `gtid_next`.
740  - **An active transaction exists**: impossible to set `gtid_next` and no
741  reason to process the `Format_description` event so, trigger an error.
743  For the sake of correctness, let's define the meaning of having a
744  transaction "active" or "in a group".
746  A transaction is "active" if either BEGIN was executed or autocommit=0
747  and a DML statement was executed (@see
748  THD::in_active_multi_stmt_transaction).
750  A transaction is "in a group" if it is applied by the replication
751  applier, and the relay log position is between Gtid_log_event and the
752  committing event (@see Relay_log_info::is_in_group).
754  The three different states explained further:
756  **No active transaction, and not in a group**: It is normal to have
757  gtid_next=automatic/undefined and have a Format_description_log_event in
758  this condition. We are outside transaction context and should set
759  gtid_next to not_yet_determined.
761  **No active transaction, and in a group**: Having
762  gtid_next=automatic/undefined in a group is impossible if master is 5.7
763  or later, because the group always starts with a Gtid_log_event or an
764  Anonymous_gtid_log_event, which will set gtid_next to anonymous or
765  gtid. But it is possible to have gtid_next=undefined when replicating
766  from a 5.6 master with gtid_mode=off, because it does not generate any
767  such event. And then, it is possible to have no active transaction in a
768  group if the master has logged a DDL as a User_var_log_event followed by
769  a Query_log_event. The User_var_log_event will start a group, but not
770  start an active transaction or change gtid_next. In this case, it is
771  possible that a Format_description_log_event occurs, if the group
772  (transaction) is split across two relay logs, so that User_var_log_event
773  appears at the end of one relay log and Query_log_event at the beginning
774  of the next one. In such cases, we should not set gtid_next.
776  **An active transaction exists**: It is possible to have
777  gtid_next=automatic/undefined in an active transaction, only if
778  gtid_next=automatic, which is only possible in a client connection using
779  gtid_next=automatic. In this scenario, there is no reason to execute a
780  Format_description_log_event. So we generate an error.
781  */
  // (several statements of this branch are dropped from this listing)
784  bool in_active_multi_stmt =
787  if (!is_in_group() && !in_active_multi_stmt) {
788  DBUG_PRINT("info",
789  ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
791  } else if (in_active_multi_stmt) {
793  "gtid_next");
794  return 1;
795  }
796  }
799  }
800  if (rli_description_event) {
804  /* The FD being deleted by the Worker can't be the latest one */
807  delete rli_description_event;
808  }
809  }
810  rli_description_event = fdle;
812  return 0;
813  }
  // Marks gaq_index as "unset" by storing the GAQ size (one past the last
  // valid GAQ index).
815  inline void reset_gaq_index() { gaq_index = c_rli->gaq->size; }
  // Records `val` only when gaq_index is currently "unset".
816  inline void set_gaq_index(ulong val) {
817  if (gaq_index == c_rli->gaq->size) gaq_index = val;
818  }
822  /**
823  Make the necessary changes to both the `Slave_worker` and current
824  `Log_event` objects, before retrying to apply the transaction.
826  Since the event is going to be re-read from the relay-log file, there
827  may be actions needed to be taken to reset the state both of `this`
828  instance, as well as of the current `Log_event` being processed.
830  @param event The `Log_event` object currently being processed.
831  */
  // (declaration dropped from this listing)
834  /**
835  Checks if the transaction can be retried, and if not, reports an error.
837  @param[in] thd The THD object of current thread.
839  @returns std::tuple<bool, bool, uint> where each element has
840  following meaning:
842  first element of tuple is function return value and determines:
843  false if the transaction should be retried
844  true if the transaction should not be retried
846  second element of tuple determines:
847  the function will set the value to true, in case the retry
848  should be "silent". Silent means that the caller should not
849  report it in performance_schema tables, write to the error log,
850  or sleep. Currently, silent is used by NDB only.
852  third element of tuple determines:
853  If the caller should report any other error than that stored in
854  thd->get_stmt_da()->mysql_errno(), then this function will store
855  that error in this third element of the tuple.
857  */
858  std::tuple<bool, bool, uint> check_and_report_end_of_retries(THD *thd);
860  /**
861  It is called after an error happens. It checks if that is a temporary
862  error and if the transaction should be retried. Then it will retry the
863  transaction if it is allowed. Retry policy and logic is similar to
864  single-threaded slave.
866  @param[in] start_relay_number The extension number of the relay log which
867  includes the first event of the transaction.
868  @param[in] start_relay_pos The offset of the transaction's first event.
870  @param[in] end_relay_number The extension number of the relay log which
871  includes the last event it should retry.
872  @param[in] end_relay_pos The offset of the last event it should retry.
874  @retval false if transaction succeeds (possibly after a number of retries)
875  @retval true if transaction fails
876  */
877  bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos,
878  uint end_relay_number, my_off_t end_relay_pos);
880  bool set_info_search_keys(Rpl_info_handler *to) override;
882  /**
883  Get coordinator's RLI. Especially used to get the rli from
884  a slave thread, like this: thd->rli_slave->get_c_rli();
885  thd could be a SQL thread or a worker thread.
886  */
887  Relay_log_info *get_c_rli() override { return c_rli; }
889  /**
890  return an extension "for channel channel_name"
891  for error messages per channel
892  */
893  const char *get_for_channel_str(bool upper_case = false) const override;
  // (signature and leading lines dropped: `longlong sequence_number()`,
  // which reads the sequence number of the current GAQ group `ptr_g`)
897  return ptr_g->sequence_number;
898  }
900  /**
901  Return true if replica-preserve-commit-order is enabled and an
902  earlier transaction is waiting for a row-level lock held by this
903  transaction.
904  */
  // (declaration dropped from this listing)
907  /**
908  Called when replica-preserve-commit-order is enabled, by the worker
909  processing an earlier transaction that waits on a row-level lock
910  held by this worker's transaction.
911  */
  // (declaration dropped from this listing)
914  /**
915  @return either the master server version as extracted from the last
916  installed Format_description_log_event, or when it was not
917  installed then the slave own server version.
918  */
  // (signature and part of the returned expression dropped from this listing)
920  return !get_rli_description_event()
923  }
925  protected:
926  void do_report(loglevel level, int err_code, const char *msg,
927  va_list v_args) const override
928  MY_ATTRIBUTE((format(printf, 4, 0)));
930  private:
931  ulong gaq_index; // GAQ index of the current assignment
932  ulonglong
933  master_log_pos; // event's cached log_pos for possible error report
934  void end_info();
935  bool read_info(Rpl_info_handler *from) override;
936  bool write_info(Rpl_info_handler *to) override;
937  std::atomic<bool> m_commit_order_deadlock;
941  bool worker_sleep(ulong seconds);
942  bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos,
943  uint end_relay_number, my_off_t end_relay_pos);
944  void assign_partition_db(Log_event *ev);
948  public:
949  /**
950  Returns an array with the expected column numbers of the primary key
951  fields of the table repository.
952  */
953  static const uint *get_table_pk_field_indexes();
954  /**
955  Returns the index of the Channel_name field of the table repository.
956  */
957  static uint get_channel_field_index();
958 };
960 void *head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
961 bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
// NOTE(review): this extracted listing drops the first line of the next
// declaration, most of the declaration(s) after "Auxiliary function", and the
// signature of the inline helper whose body follows.
963  Slave_job_item *job_item);
967 // Auxiliary function
971  Relay_log_info *rli);
// Returns thd->rli_slave downcast to Slave_worker. static_cast performs no
// run-time type check, so this is only valid when thd->rli_slave really is a
// Slave_worker (presumably only on Worker threads; any check that enforces
// this is not visible in this listing -- confirm).
975  return static_cast<Slave_worker *>(thd->rli_slave);
976 }
980 #endif
Contains the classes representing events occurring in the replication stream.
Class representing an error.
Definition: error.h:47
For binlog version 4.
Definition: log_event.h:1613
std::atomic< int32 > atomic_usage_counter
Definition: log_event.h:1631
Stores information to monitor a transaction during the different replication stages.
Definition: rpl_gtid.h:1291
This is the abstract base class for binary log events.
Definition: log_event.h:659
ulong mts_group_idx
Index in rli->gaq array to indicate a group that this event is purging.
Definition: log_event.h:814
bool reserve(size_t n)
Reserves space for array elements.
Definition: prealloced_array.h:285
void resize(size_t n, const Element_type &val=Element_type())
Resizes the container so that it contains n elements.
Definition: prealloced_array.h:582
Definition: rpl_rli.h:200
bool is_in_group() const
A group is defined as the entire range of events that constitute a transaction or auto-committed stat...
Definition: rpl_rli.h:1491
Format_description_log_event * get_rli_description_event() const
Return the current Format_description_log_event.
Definition: rpl_rli.h:1719
Log_event * current_event
Reference to being applied event.
Definition: rpl_rli.h:1989
Slave_committed_queue * gaq
Definition: rpl_rli.h:1162
ulong adapt_to_master_version_updown(ulong master_version, ulong current_version)
The method compares two supplied versions and carries out down- or up- grade customization of executi...
Format_description_log_event * rli_description_event
Definition: rpl_rli.h:1795
Definition: rpl_info_handler.h:57
bool is_transactional() const
Definition: rpl_info.h:110
THD * info_thd
Definition: rpl_info.h:77
Group Assigned Queue whose first element identifies first gap in committed sequence.
Definition: rpl_rli_pdb.h:336
void free_dynamic_items()
Method should be executed at slave system stop to cleanup dynamically allocated items that remained a...
bool inited
Definition: rpl_rli_pdb.h:338
ulong de_queue(Slave_job_group *item)
Dequeue from head.
Definition: rpl_rli_pdb.h:396
bool count_done(Relay_log_info *rli)
Slave_committed_queue(ulong max, uint n)
Slave_job_group * get_job_group(ulong ind)
Definition: rpl_rli_pdb.h:375
ulong en_queue(Slave_job_group *item)
Assigns assigned_group_index to the index of the enqueued item and returns it.
Definition: rpl_rli_pdb.h:384
ulong assigned_group_index
Definition: rpl_rli_pdb.h:352
Prealloced_array< ulonglong, 1 > last_done
Definition: rpl_rli_pdb.h:349
ulong move_queue_head(Slave_worker_array *ws)
The queue is processed from the head item by item to purge items representing committed groups.
Definition: rpl_rli_pdb.h:356
void update_current_binlog(const char *post_rotate)
ulong de_tail(Slave_job_group *item)
Similar to de_queue() but removing an item from the tail side.
Definition: rpl_rli_pdb.h:407
Slave_job_group lwm
Definition: rpl_rli_pdb.h:346
ulong find_lwm(Slave_job_group **, ulong)
Finds low-water mark of committed jobs in GAQ.
Definition: rpl_rli_pdb.h:502
Definition: rpl_rli_pdb.h:504
bool overfill
Definition: rpl_rli_pdb.h:509
ulonglong waited_overfill
Definition: rpl_rli_pdb.h:510
Error const & last_error() const
Definition: rpl_reporting.h:136
Definition: rpl_rli_pdb.h:513
ulonglong get_master_log_pos()
Definition: rpl_rli_pdb.h:651
ulong underrun_level
Definition: rpl_rli_pdb.h:588
char checkpoint_relay_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:598
Prealloced_array< db_worker_hash_entry *, SLAVE_INIT_DBS_IN_GROUP > curr_group_exec_parts
Definition: rpl_rli_pdb.h:536
ulong excess_cnt
Definition: rpl_rli_pdb.h:593
bool curr_group_seen_sequence_number
Definition: rpl_rli_pdb.h:539
bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
Read events from relay logs and apply them.
void reset_gaq_index()
Definition: rpl_rli_pdb.h:815
ulong overrun_level
Definition: rpl_rli_pdb.h:583
bool commit_positions() override
The method is a wrapper to provide uniform interface with STS and is to be called from Relay_log_info...
Definition: rpl_rli_pdb.h:658
ulong bitmap_shifted
Definition: rpl_rli_pdb.h:575
longlong sequence_number()
Definition: rpl_rli_pdb.h:895
void reset_commit_order_deadlock()
int flush_info(bool force=false)
int set_rli_description_event(Format_description_log_event *fdle) override
The method runs at Worker initalization, at runtime when Coordinator supplied a new FD event for exec...
Definition: rpl_rli_pdb.h:711
ulonglong checkpoint_master_log_pos
Definition: rpl_rli_pdb.h:601
Slave_jobs_queue jobs
Definition: rpl_rli_pdb.h:530
volatile bool relay_log_change_notified
Definition: rpl_rli_pdb.h:559
MY_BITMAP group_shifted
Definition: rpl_rli_pdb.h:603
ulong worker_checkpoint_seqno
Definition: rpl_rli_pdb.h:605
bool worker_sleep(ulong seconds)
Sleep for a given amount of seconds or until killed.
void copy_values_for_PFS(ulong worker_id, en_running_state running_status, THD *worker_thd, const Error &last_error, Gtid_monitoring_info *monitoring_info_arg)
std::tuple< bool, bool, uint > check_and_report_end_of_retries(THD *thd)
Checks if the transaction can be retried, and if not, reports an error.
ulong gaq_index
Definition: rpl_rli_pdb.h:931
void post_commit(bool on_rollback) override
See the comments for STS version of this method.
Definition: rpl_rli_pdb.h:668
void do_report(loglevel level, int err_code, const char *msg, va_list v_args) const override
const char * get_for_channel_str(bool upper_case=false) const override
return an extension "for channel channel_name" for error messages per channel
void report_commit_order_deadlock()
Called when replica-preserve-commit-order is enabled, by the worker processing an earlier transaction...
Slave_worker(const Slave_worker &info)
Slave_worker(Relay_log_info *rli, PSI_mutex_key *param_key_info_run_lock, PSI_mutex_key *param_key_info_data_lock, PSI_mutex_key *param_key_info_sleep_lock, PSI_mutex_key *param_key_info_thd_lock, PSI_mutex_key *param_key_info_data_cond, PSI_mutex_key *param_key_info_start_cond, PSI_mutex_key *param_key_info_stop_cond, PSI_mutex_key *param_key_info_sleep_cond, uint param_id, const char *param_channel)
bool write_info(Rpl_info_handler *to) override
int init_worker(Relay_log_info *, ulong)
Method is executed by Coordinator at Worker startup time to initialize members parly with values supp...
static uint get_channel_field_index()
Returns the index of the Channel_name field of the table repository.
bool read_info(Rpl_info_handler *from) override
void assign_partition_db(Log_event *ev)
const char * get_master_log_name()
static void set_nullable_fields(MY_BITMAP *nullable_fields)
Sets bits for columns that are allowed to be NULL.
bool set_info_search_keys(Rpl_info_handler *to) override
To search in the slave repositories, each slave info object (mi, rli or worker) should use a primary ...
ulong server_version
Definition: rpl_rli_pdb.h:607
volatile int curr_jobs
Definition: rpl_rli_pdb.h:553
ulong id
Definition: rpl_rli_pdb.h:541
Relay_log_info * get_c_rli() override
Get coordinator's RLI.
Definition: rpl_rli_pdb.h:887
void prepare_for_retry(Log_event &event)
Make the necessary changes to both the Slave_worker and current Log_event objects,...
ulonglong master_log_pos
Definition: rpl_rli_pdb.h:933
int slave_worker_exec_event(Log_event *ev)
MTS worker main routine.
ulong get_master_server_version()
Definition: rpl_rli_pdb.h:919
bool end_group_sets_max_dbs
Definition: rpl_rli_pdb.h:557
ulong events_done
Definition: rpl_rli_pdb.h:551
bool exit_incremented
Definition: rpl_rli_pdb.h:637
volatile ulong last_group_done_index
Definition: rpl_rli_pdb.h:547
Relay_log_info * c_rli
Definition: rpl_rli_pdb.h:533
bool found_commit_order_deadlock()
Return true if replica-preserve-commit-order is enabled and an earlier transaction is waiting for a r...
bool reset_recovery_info()
Clean up a part of Worker info table that is regarded in in gaps collecting at recovery.
Slave_worker & operator=(const Slave_worker &info)
mysql_mutex_t jobs_lock
Definition: rpl_rli_pdb.h:531
Definition: rpl_rli_pdb.h:608
Definition: rpl_rli_pdb.h:609
Definition: rpl_rli_pdb.h:613
Definition: rpl_rli_pdb.h:610
Definition: rpl_rli_pdb.h:612
Definition: rpl_rli_pdb.h:611
void rollback_positions(Slave_job_group *ptr_g)
long wq_overrun_cnt
Definition: rpl_rli_pdb.h:577
static const uint * get_table_pk_field_indexes()
Returns an array with the expected column numbers of the primary key fields of the table repository.
void slave_worker_ends_group(Log_event *, int)
Deallocation routine to cancel out few effects of map_db_to_worker().
ulonglong set_master_log_pos(ulong val)
Definition: rpl_rli_pdb.h:652
std::atomic< bool > m_commit_order_deadlock
Definition: rpl_rli_pdb.h:937
bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
It is called after an error happens.
long usage_partition
Definition: rpl_rli_pdb.h:555
void set_gaq_index(ulong val)
Definition: rpl_rli_pdb.h:816
char checkpoint_master_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:600
ulonglong last_groups_assigned_index
Definition: rpl_rli_pdb.h:549
void end_info()
en_running_state volatile running_status
Definition: rpl_rli_pdb.h:631
ulong wq_empty_waits
Definition: rpl_rli_pdb.h:550
ulonglong checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:599
bool fd_change_notified
Definition: rpl_rli_pdb.h:574
volatile bool checkpoint_notified
Definition: rpl_rli_pdb.h:560
ulong groups_done
Definition: rpl_rli_pdb.h:552
mysql_cond_t jobs_cond
Definition: rpl_rli_pdb.h:532
static size_t get_number_worker_fields()
~Slave_worker() override
MY_BITMAP group_executed
Definition: rpl_rli_pdb.h:602
volatile bool master_log_change_notified
Definition: rpl_rli_pdb.h:562
int rli_init_info(bool)
A part of Slave worker iitializer that provides a minimum context for MTS recovery.
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_class.h:821
Relay_log_info * rli_slave
Definition: sql_class.h:949
bool in_active_multi_stmt_transaction() const
true if the session is in a multi-statement transaction mode (
Definition: sql_class.h:2955
struct System_variables variables
Definition: sql_class.h:1006
unsigned long get_product_version() const
This method is used to find out the version of server that originated the current FD instance.
Definition: control_events.cpp:180
The class defines a type of queue with a predefined max size that is implemented using the circular m...
Definition: rpl_rli_pdb.h:266
ulong entry
Definition: rpl_rli_pdb.h:271
ulong en_queue(Element_type *item)
return the index where the arg item locates or an error encoded as a value in beyond of the legacy ra...
Definition: rpl_rli_pdb.h:419
bool full()
Definition: rpl_rli_pdb.h:328
ulong avail
Definition: rpl_rli_pdb.h:270
circular_buffer_queue(ulong max)
Definition: rpl_rli_pdb.h:275
bool gt(ulong i, ulong k)
two index comparision to determine which of the two is ordered first.
ulong size
Definition: rpl_rli_pdb.h:269
Definition: rpl_rli_pdb.h:285
Element_type * head_queue()
return the value of data member of the head of the queue.
Definition: rpl_rli_pdb.h:316
ulong de_queue(Element_type *item)
Content of the being dequeued item is copied to the arg-pointer location.
Definition: rpl_rli_pdb.h:456
bool in(ulong k)
Definition: rpl_rli_pdb.h:323
volatile ulong len
Definition: rpl_rli_pdb.h:272
bool inited_queue
Definition: rpl_rli_pdb.h:273
Prealloced_array< Element_type, 1 > m_Q
Definition: rpl_rli_pdb.h:268
ulong de_tail(Element_type *item)
Similar to de_queue but extracting happens from the tail side.
Definition: rpl_rli_pdb.h:482
bool empty()
Definition: rpl_rli_pdb.h:327
struct _entry entry
void my_error(int nr, myf MyFlags,...)
Fill in and print a previously registered error message.
Definition: rpl_replica.h:88
const int64_t SEQ_UNINIT
Uninitialized timestamp value (for either last committed or sequence number).
Definition: binlog_event.h:138
unsigned int PSI_mutex_key
Instrumented mutex key.
Definition: psi_mutex_bits.h:51
static constexpr unsigned PSI_INSTRUMENT_ME
Definition: psi_bits.h:42
Binary log event definitions.
Header for compiler-dependent features.
#define DBUG_PRINT(keyword, arglist)
Definition: my_dbug.h:168
Some integer typedefs for easier portability.
unsigned long long int ulonglong
Definition: my_inttypes.h:55
ulonglong my_off_t
Definition: my_inttypes.h:71
long long int longlong
Definition: my_inttypes.h:54
#define MYF(v)
Definition: my_inttypes.h:96
Common #defines and includes for file and socket I/O.
#define FN_REFLEN
Definition: my_io.h:82
Definition of the global "loglevel" enumeration.
Definition: my_loglevel.h:40
void my_free(void *ptr)
Frees the memory pointed by the ptr.
Defines various enable/disable and HAVE_ macros related to the performance schema instrumentation sys...
Definition: my_psi_config.h:38
Instrumentation helpers for conditions.
ABI for instrumented mutexes.
Log info(cout, "NOTE")
struct MasterPos master_pos
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Performance schema instrumentation interface.
Instrumentation helpers for mutexes.
required string event
Definition: replication_group_member_actions.proto:31
GTID_NEXT is set to this state after a transaction with GTID_NEXT=='UUID:NUMBER' is committed.
Definition: rpl_gtid.h:3717
Specifies that the GTID has not been generated yet; it will be generated on commit.
Definition: rpl_gtid.h:3665
Definition: rpl_mta_submode.h:46
Slave_worker * get_thd_worker(THD *thd)
Definition: rpl_rli_pdb.h:974
bool set_max_updated_index_on_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Slave_worker * get_least_occupied_worker(Relay_log_info *rli, Slave_worker_array *workers, Log_event *ev)
Get the least occupied worker.
bool init_hash_workers(Relay_log_info *rli)
void destroy_hash_workers(Relay_log_info *)
ulong w_rr
int slave_worker_exec_job_group(Slave_worker *w, Relay_log_info *rli)
apply one job group.
TABLE * mts_move_temp_table_to_entry(TABLE *, THD *, db_worker_hash_entry *)
Relocating temporary table reference into entry's table list head.
Slave_worker * map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w)
The function produces a reference to the struct of a Worker that has been or will be engaged to proce...
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
TABLE * mts_move_temp_tables_to_thd(THD *, TABLE *)
Relocation of the list of temporary tables to thd->temporary_tables.
bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli)
Coordinator enqueues a job item into a Worker private queue.
Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
return a job item through a struct which point is supplied via argument.
void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
return the value of data member of the head of the queue.
void set_not_yet_determined()
Definition: rpl_gtid.h:3786
enum_gtid_type type
The type of this GTID.
Definition: rpl_gtid.h:3767
Definition: my_bitmap.h:41
Definition: rpl_rli_pdb.h:109
my_off_t master_log_pos
Definition: rpl_rli_pdb.h:188
my_off_t checkpoint_log_pos
Definition: rpl_rli_pdb.h:191
Format_description_log_event * new_fd_event
Definition: rpl_rli_pdb.h:230
Slave_job_group(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:116
bool notified
Definition: rpl_rli_pdb.h:200
time_t ts
Definition: rpl_rli_pdb.h:198
longlong last_committed
Definition: rpl_rli_pdb.h:203
std::atomic< int32 > done
Definition: rpl_rli_pdb.h:196
Slave_worker * worker
Definition: rpl_rli_pdb.h:185
my_off_t checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:194
ulong worker_id
Definition: rpl_rli_pdb.h:184
uint checkpoint_seqno
Definition: rpl_rli_pdb.h:190
void reset(my_off_t master_pos, ulonglong seqno)
Definition: rpl_rli_pdb.h:235
my_off_t group_master_log_pos
Definition: rpl_rli_pdb.h:171
char * group_master_log_name
Definition: rpl_rli_pdb.h:167
char * checkpoint_log_name
Definition: rpl_rli_pdb.h:192
ulong shifted
Definition: rpl_rli_pdb.h:197
longlong sequence_number
Definition: rpl_rli_pdb.h:204
char * checkpoint_relay_log_name
Definition: rpl_rli_pdb.h:195
Slave_job_group & operator=(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:141
char * group_relay_log_name
Definition: rpl_rli_pdb.h:182
ulonglong total_seqno
Definition: rpl_rli_pdb.h:186
my_off_t group_relay_log_pos
Definition: rpl_rli_pdb.h:183
Gtid_specification gtid_next
Definition: system_variables.h:323
Definition: table.h:1394
Definition: completion_hash.h:34
Legends running throughout the module:
Definition: rpl_rli_pdb.h:73
TABLE *volatile temporary_tables
Definition: rpl_rli_pdb.h:89
uint db_len
Definition: rpl_rli_pdb.h:74
long usage
Definition: rpl_rli_pdb.h:81
const char * db
Definition: rpl_rli_pdb.h:75
Slave_worker * worker
Definition: rpl_rli_pdb.h:76
An instrumented cond structure.
Definition: mysql_cond_bits.h:49
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:49
Definition: rpl_rli.h:79
double seconds()
Include file for Sun RPC to compile out of the box.
unsigned int uint
int n