MySQL 8.0.20
Source Code Documentation
rpl_rli_pdb.h
1 /* Copyright (c) 2011, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef RPL_RLI_PDB_H
24 #define RPL_RLI_PDB_H
25 
26 #include <stdarg.h>
27 #include <sys/types.h>
28 #include <time.h>
29 #include <atomic>
30 #include <tuple>
31 
33 #include "my_bitmap.h"
34 #include "my_compiler.h"
35 #include "my_dbug.h"
36 #include "my_inttypes.h"
37 #include "my_io.h"
38 #include "my_loglevel.h"
39 #include "my_psi_config.h"
43 #include "mysql/psi/psi_base.h"
45 #include "prealloced_array.h" // Prealloced_array
46 #include "sql/log_event.h" // Format_description_log_event
47 #include "sql/rpl_gtid.h"
48 #include "sql/rpl_mts_submode.h" // enum_mts_parallel_type
49 #include "sql/rpl_rli.h" // Relay_log_info
50 #include "sql/rpl_slave.h" // MTS_WORKER_UNDEF
51 #include "sql/sql_class.h"
52 #include "sql/system_variables.h"
53 
54 class Rpl_info_handler;
55 class Slave_worker;
56 struct TABLE;
57 
58 #ifndef DBUG_OFF
59 extern ulong w_rr;
60 #endif
61 /**
62  Legends running throughout the module:
63 
64  C - Coordinator
65  CP - checkpoint
66  W - Worker
67 
68  B-event event that Begins a group (a transaction)
69  T-event event that Terminates a group (a transaction)
70 */
71 
72 /* Assigned Partition Hash (APH) entry */
73 struct db_worker_hash_entry {
74  uint db_len;
75  const char *db;
76  Slave_worker *worker;
77  /*
78  The number of transactions pending on this database.
79  This should only be modified under the lock slave_worker_hash_lock.
80  */
81  long usage;
82  /*
83  The list of temp tables belonging to the @c db database is
84  attached to an assigned @c worker to become its thd->temporary_tables.
85  The list is updated with every DDL, incl. CREATE and DROP.
86  It is removed from the entry and merged into the coordinator's
87  thd->temporary_tables in case of events such as: slave stop, APH oversize.
88  */
89  TABLE *volatile temporary_tables;
90 
91  /* todo: relax concurrency to mimic record-level locking.
92  That is, to augment the entry with a mutex/cond pair
93  pthread_mutex_t
94  pthread_cond_t
95  timestamp updated_at; */
96 };
97 
98 bool init_hash_workers(Relay_log_info *rli);
99 void destroy_hash_workers(Relay_log_info *);
100 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
101  db_worker_hash_entry **ptr_entry,
102  bool need_temp_tables, Slave_worker *w);
103 Slave_worker *get_least_occupied_worker(Relay_log_info *rli,
104  Slave_worker_array *workers,
105  Log_event *ev);
106 
107 #define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
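The APH idea above (a hash entry that ties a database to the Worker currently applying changes to it, plus a usage counter of pending transactions) can be pictured with a small standalone sketch. Everything below is hypothetical and only mirrors the concept behind map_db_to_worker() / get_least_occupied_worker(); it is not the server's implementation.

#include <string>
#include <unordered_map>
#include <vector>

// Toy stand-ins for a Worker and an APH entry; not the real classes.
struct ToyWorker { int id; int load = 0; };

struct ToyAphEntry {
  ToyWorker *worker = nullptr;  // worker currently owning this db
  long usage = 0;               // transactions pending on this db
};

// Assign a db to a worker: reuse the current owner if any, otherwise pick
// the least loaded worker.  'workers' must be non-empty.
ToyWorker *assign_db(std::unordered_map<std::string, ToyAphEntry> &aph,
                     std::vector<ToyWorker> &workers, const std::string &db) {
  ToyAphEntry &e = aph[db];
  if (e.worker == nullptr) {
    ToyWorker *least = &workers[0];
    for (auto &w : workers)
      if (w.load < least->load) least = &w;
    e.worker = least;
  }
  e.usage++;         // one more pending transaction for this db
  e.worker->load++;  // and one more assignment for the chosen worker
  return e.worker;
}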
108 
109 struct Slave_job_group {
110  Slave_job_group() = default;
111 
112  /*
113  We need a custom copy constructor and assignment operator because std::atomic<T>
114  is not copy-constructible.
115  */
116  Slave_job_group(const Slave_job_group &other)
117  : group_master_log_name(other.group_master_log_name),
118  group_master_log_pos(other.group_master_log_pos),
119  group_relay_log_name(other.group_relay_log_name),
120  group_relay_log_pos(other.group_relay_log_pos),
121  worker_id(other.worker_id),
122  worker(other.worker),
123  total_seqno(other.total_seqno),
124  master_log_pos(other.master_log_pos),
125  checkpoint_seqno(other.checkpoint_seqno),
126  checkpoint_log_pos(other.checkpoint_log_pos),
127  checkpoint_log_name(other.checkpoint_log_name),
128  checkpoint_relay_log_pos(other.checkpoint_relay_log_pos),
129  checkpoint_relay_log_name(other.checkpoint_relay_log_name),
130  done(other.done.load()),
131  shifted(other.shifted),
132  ts(other.ts),
133 #ifndef DBUG_OFF
134  notified(other.notified),
135 #endif
136  last_committed(other.last_committed),
137  sequence_number(other.sequence_number),
138  new_fd_event(other.new_fd_event) {
139  }
140 
141  Slave_job_group &operator=(const Slave_job_group &other) {
142  group_master_log_name = other.group_master_log_name;
143  group_master_log_pos = other.group_master_log_pos;
144  group_relay_log_name = other.group_relay_log_name;
145  group_relay_log_pos = other.group_relay_log_pos;
146  worker_id = other.worker_id;
147  worker = other.worker;
148  total_seqno = other.total_seqno;
149  master_log_pos = other.master_log_pos;
150  checkpoint_seqno = other.checkpoint_seqno;
151  checkpoint_log_pos = other.checkpoint_log_pos;
152  checkpoint_log_name = other.checkpoint_log_name;
153  checkpoint_relay_log_pos = other.checkpoint_relay_log_pos;
154  checkpoint_relay_log_name = other.checkpoint_relay_log_name;
155  done.store(other.done.load());
156  shifted = other.shifted;
157  ts = other.ts;
158 #ifndef DBUG_OFF
159  notified = other.notified;
160 #endif
161  last_committed = other.last_committed;
162  sequence_number = other.sequence_number;
163  new_fd_event = other.new_fd_event;
164  return *this;
165  }
166 
167  char *group_master_log_name; // (actually redundant)
168  /*
169  T-event log_pos filled by Worker for CheckPoint (CP)
170  */
171  my_off_t group_master_log_pos;
172 
173  /*
174  When the relay-log name changes, Coordinator allocates and fills in the
175  new relay-log name; otherwise it fills in NULL.
176  Coordinator keeps track of whether each Worker has been notified of the
177  update, to make sure the routine runs once per change.
178 
179  W checks the value at commit and memorizes a non-NULL value.
180  Freeing it, unless NULL, is left to Coordinator at CP.
181  */
182  char *group_relay_log_name; // The value is the last seen relay-log name
183  my_off_t group_relay_log_pos;
184  ulong worker_id;
185  Slave_worker *worker;
186  ulonglong total_seqno;
187 
188  my_off_t master_log_pos; // B-event log_pos
189  /* checkpoint coordinates are reset by periodic and special (Rotate event) CPs */
190  uint checkpoint_seqno;
191  my_off_t checkpoint_log_pos; // T-event log_pos filled by W for CheckPoint
192  char *checkpoint_log_name;
193  my_off_t
194  checkpoint_relay_log_pos; // T-event log_pos filled by W for CheckPoint
195  char *checkpoint_relay_log_name;
196  std::atomic<int32> done; // Flag raised by W, read and reset by Coordinator
197  ulong shifted; // shift the last CP bitmap at receiving a new CP
198  time_t ts; // Group's timestamp used to update Seconds_behind_master
199 #ifndef DBUG_OFF
200  bool notified{false}; // to debug group_master_log_name change notification
201 #endif
202  /* Clock-based scheduler requirement: */
203  longlong last_committed; // commit parent timestamp
204  longlong sequence_number; // transaction's logical timestamp
205  /*
206  After Coordinator has seen a new FD event, it sets this member to
207  point to the new event, once per worker. Coordinator does so
208  when it schedules the first group following the FD event to a worker.
209  It checks the Slave_worker::fd_change_notified flag to decide whether
210  to do this or not.
211  When the worker executes the group, it replaces its currently
212  active FD with the new FD once it takes on the group's first event. It
213  checks this member and resets it after the FD replacement is done.
214 
215  The member is effectively lock-free: it is updated by Coordinator and
216  read by Worker without holding any mutex. That is still safe because
217  Slave_worker::jobs_lock acts as a synchronizer, so the Worker
218  cannot read any stale info.
219  The member is updated by Coordinator when it decides to which Worker
220  an event following a new FD is to be scheduled.
221  After Coordinator has chosen a Worker, it queues the event to it
222  while necessarily holding Slave_worker::jobs_lock. The Worker grabs
223  the mutex later when pulling the event from the queue and
224  releases the lock before reading from this member.
225 
226  This sequence of actions shows that the write operation always precedes
227  the read one, which ensures no stale FD info is passed to the
228  Worker.
229  */
230  Format_description_log_event *new_fd_event;
231  /*
232  Coordinator fills the struct with defaults and options at the start of
233  a group distribution.
234  */
235  void reset(my_off_t master_pos, ulonglong seqno) {
236  master_log_pos = master_pos;
237  group_master_log_pos = group_relay_log_pos = 0;
238  group_master_log_name = nullptr; // todo: remove
239  group_relay_log_name = nullptr;
240  worker_id = MTS_WORKER_UNDEF;
241  total_seqno = seqno;
242  checkpoint_log_name = nullptr;
243  checkpoint_log_pos = 0;
244  checkpoint_relay_log_name = nullptr;
245  checkpoint_relay_log_pos = 0;
246  checkpoint_seqno = (uint)-1;
247  done = 0;
248  ts = 0;
249 #ifndef DBUG_OFF
250  notified = false;
251 #endif
252  last_committed = SEQ_UNINIT;
253  sequence_number = SEQ_UNINIT;
254  new_fd_event = nullptr;
255  }
256 };
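The new_fd_event comment above relies on a publish/consume ordering in which an otherwise unsynchronized pointer is made safe by the mutex that protects the Worker's job queue. The standalone sketch below imitates that ordering with std::mutex and std::queue; all names are invented for illustration and none of it is the server's code.

#include <mutex>
#include <queue>

struct ToyFD { unsigned long version; };

struct ToyWorkerQueue {
  std::mutex jobs_lock;           // plays the role of Slave_worker::jobs_lock
  std::queue<int> jobs;           // queued group/event identifiers
  ToyFD *new_fd_event = nullptr;  // written before enqueue, read after dequeue
};

// Coordinator side: publish the FD pointer, then enqueue under the lock.
void coordinator_schedule(ToyWorkerQueue &wq, int job, ToyFD *fd) {
  if (fd != nullptr) wq.new_fd_event = fd;  // plain write ...
  std::lock_guard<std::mutex> guard(wq.jobs_lock);
  wq.jobs.push(job);  // ... ordered by the same lock the worker takes below
}

// Worker side: dequeue under the lock, then read the FD pointer.  Because the
// write happened before the enqueue, and the read happens after the dequeue,
// the worker can never observe a stale pointer for this job.
bool worker_take(ToyWorkerQueue &wq, int *job, ToyFD **fd) {
  {
    std::lock_guard<std::mutex> guard(wq.jobs_lock);
    if (wq.jobs.empty()) return false;
    *job = wq.jobs.front();
    wq.jobs.pop();
  }
  *fd = wq.new_fd_event;  // the real Worker would also reset the member here
  return true;
}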
257 
258 /**
259  The class defines a type of queue with a predefined max size that is
260  implemented using a circular memory buffer.
261  That is, items of the queue are accessed as indexed elements of
262  the array buffer in such a way that when the index value reaches
263  its maximum it wraps around to point to the first buffer element.
264 */
265 template <typename Element_type>
266 class circular_buffer_queue {
267  public:
268  Prealloced_array<Element_type, 1> m_Q;
269  ulong size; // the Size of the queue in terms of elements
270  ulong avail; // first Available index to append at (next to tail)
271  ulong entry; // the head index or the entry point to the queue.
272  volatile ulong len; // actual length
273  bool inited_queue;
274 
275  circular_buffer_queue(ulong max)
276  : m_Q(PSI_INSTRUMENT_ME),
277  size(max),
278  avail(0),
279  entry(max),
280  len(0),
281  inited_queue(false) {
282  if (!m_Q.reserve(size)) inited_queue = true;
283  m_Q.resize(size);
284  }
285  circular_buffer_queue() : m_Q(PSI_INSTRUMENT_ME), inited_queue(false) {}
286  ~circular_buffer_queue() {}
287 
288  /**
289  The content of the item being dequeued is copied to the location
290  pointed to by the argument.
291 
292  @param [out] item A pointer to the item being dequeued.
293  @return the queue's array index that the dequeued item was
294  located at, or
295  an error encoded as a value beyond the legal index range.
296  */
297  ulong de_queue(Element_type *item);
298  /**
299  Similar to de_queue but extracting happens from the tail side.
300 
301  @param [out] item A pointer to the item being dequeued.
302  @return the queue's array index that the dequeued item was
303  located at, or an error.
304  */
305  ulong de_tail(Element_type *item);
306 
307  /**
308  return the index where the arg item is located,
309  or an error encoded as a value beyond the legal range
310  [0, size) (the value `size' is excluded).
311  */
312  ulong en_queue(Element_type *item);
313  /**
314  return the value of the @c data member of the head of the queue.
315  */
316  Element_type *head_queue() {
317  if (empty()) return nullptr;
318  return &m_Q[entry];
319  }
320 
321  bool gt(ulong i, ulong k); // comparison of the ordering of two entities
322  /* index is within the valid range */
323  bool in(ulong k) {
324  return !empty() && (entry > avail ? (k >= entry || k < avail)
325  : (k >= entry && k < avail));
326  }
327  bool empty() { return entry == size; }
328  bool full() { return avail == size; }
329 };
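A minimal standalone model of the index conventions used by circular_buffer_queue: entry == size marks an empty queue, avail == size marks a full one, and both indexes wrap modulo size. The ToyRing type and its int payload below are invented for illustration; the real template reports errors as (ulong)-1, while this toy returns size.

#include <cassert>
#include <vector>

// Toy ring buffer mirroring circular_buffer_queue's index conventions.
struct ToyRing {
  std::vector<int> q;
  unsigned long size, avail = 0, entry, len = 0;

  explicit ToyRing(unsigned long max) : q(max), size(max), entry(max) {}

  bool empty() const { return entry == size; }  // sentinel: no head
  bool full() const { return avail == size; }   // sentinel: no free slot

  // Returns the slot used, or size as this toy's error/full indicator.
  unsigned long en_queue(int v) {
    if (full()) return size;
    unsigned long ret = avail;
    q[avail] = v;
    if (empty()) entry = avail;        // first element becomes the head
    avail = (avail + 1) % size;
    ++len;
    if (avail == entry) avail = size;  // became full: park avail at sentinel
    return ret;
  }

  unsigned long de_queue(int *v) {
    if (empty()) return size;
    unsigned long ret = entry;
    *v = q[entry];
    --len;
    if (full()) avail = entry;         // leaving the full state frees this slot
    entry = (entry + 1) % size;
    if (avail == entry) entry = size;  // became empty: park entry at sentinel
    return ret;
  }
};

int main() {
  ToyRing r(3);
  r.en_queue(10); r.en_queue(20); r.en_queue(30);
  assert(r.full());
  int v;
  r.de_queue(&v);  // head value comes out first, indexes wrap forward
  assert(v == 10 && !r.full());
  return 0;
}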
330 
331 /**
332  Group Assigned Queue whose first element identifies the first gap
333  in the committed sequence. The head of the queue is therefore next to
334  the low-water-mark.
335 */
336 class Slave_committed_queue : public circular_buffer_queue<Slave_job_group> {
337  public:
338  bool inited;
339 
340  /* master's Rot-ev exec */
341  void update_current_binlog(const char *post_rotate);
342 
343  /*
344  The last checkpoint time Low-Water-Mark
345  */
346  Slave_job_group lwm;
347 
348  /* last time processed indexes for each worker */
349  Prealloced_array<ulonglong, 1> last_done;
350 
351  /* the index of the group being assigned in GAQ */
352  ulong assigned_group_index;
353 
354  Slave_committed_queue(ulong max, uint n);
355 
356  ~Slave_committed_queue() {
357  if (inited) {
359  free_dynamic_items(); // free possibly left allocated strings in GAQ list
360  }
361  }
362 
363 #ifndef DBUG_OFF
364  bool count_done(Relay_log_info *rli);
365 #endif
366 
367  /* Checkpoint routine refreshes the queue */
368  ulong move_queue_head(Slave_worker_array *ws);
369  /* Method is for slave shutdown time cleanup */
370  void free_dynamic_items();
371  /*
372  returns a pointer to the Slave_job_group struct instance indexed by the arg
373  in the circular buffer dyn-array
374  */
375  Slave_job_group *get_job_group(ulong ind) {
376  DBUG_ASSERT(ind < size);
377  return &m_Q[ind];
378  }
379 
380  /**
381  Assigns @c assigned_group_index to the index of the enqueued item
382  and returns it.
383  */
384  ulong en_queue(Slave_job_group *item) {
385  return assigned_group_index =
386  circular_buffer_queue<Slave_job_group>::en_queue(item);
387  }
388 
389  /**
390  Dequeue from head.
391 
392  @param [out] item A pointer to the item being dequeued.
393  @return The queue's array index that the dequeued item was located at,
394  or an error encoded as a value beyond the legal index range.
395  */
396  ulong de_queue(Slave_job_group *item) {
397  return circular_buffer_queue<Slave_job_group>::de_queue(item);
398  }
399 
400  /**
401  Similar to de_queue() but removing an item from the tail side.
402 
403  @param [out] item A pointer to the item being dequeued.
404  @return the queue's array index that the dequeued item was
405  located at, or an error.
406  */
407  ulong de_tail(Slave_job_group *item) {
408  return circular_buffer_queue<Slave_job_group>::de_tail(item);
409  }
410 
411  ulong find_lwm(Slave_job_group **, ulong);
412 };
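move_queue_head() (defined in rpl_rli_pdb.cc) conceptually advances the GAQ head past every group whose done flag has been raised by a Worker, remembering the last retired group as the low-water mark lwm. A simplified, hypothetical model of that scan:

#include <deque>

// Toy group descriptor: only the fields needed for the scan.
struct ToyGroup {
  bool done = false;            // raised by a worker when the group commits
  unsigned long long seqno = 0;
};

// Advance the head while consecutive groups are done; return how many were
// retired and leave the last retired one in *lwm (the low-water mark).  The
// first group that is not done (if any) marks the gap the comment refers to.
unsigned long toy_move_queue_head(std::deque<ToyGroup> &gaq, ToyGroup *lwm) {
  unsigned long cnt = 0;
  while (!gaq.empty() && gaq.front().done) {
    *lwm = gaq.front();  // remember the last committed group
    gaq.pop_front();
    ++cnt;
  }
  return cnt;
}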
413 
414 /**
415  @return the index at which the arg item has been placed,
416  or an error.
417 */
418 template <typename Element_type>
419 ulong circular_buffer_queue<Element_type>::en_queue(Element_type *item) {
420  ulong ret;
421  if (avail == size) {
422  DBUG_ASSERT(avail == m_Q.size());
423  return (ulong)-1;
424  }
425 
426  // store
427 
428  ret = avail;
429  m_Q[avail] = *item;
430 
431  // pre-boundary cond
432  if (entry == size) entry = avail;
433 
434  avail = (avail + 1) % size;
435  len++;
436 
437  // post-boundary cond
438  if (avail == entry) avail = size;
439 
440  DBUG_ASSERT(avail == entry || len == (avail >= entry)
441  ? (avail - entry)
442  : (size + avail - entry));
443  DBUG_ASSERT(avail != entry);
444 
445  return ret;
446 }
447 
448 /**
449  Dequeue from head.
450 
451  @param [out] item A pointer to the item being dequeued.
452  @return the queue's array index that the dequeued item was
453  located at, or an error as a value outside the legal
454  [0, size) range (the value `size' is excluded).
455 */
456 template <typename Element_type>
457 ulong circular_buffer_queue<Element_type>::de_queue(Element_type *item) {
458  ulong ret;
459  if (entry == size) {
460  DBUG_ASSERT(len == 0);
461  return (ulong)-1;
462  }
463 
464  ret = entry;
465  *item = m_Q[entry];
466  len--;
467 
468  // pre boundary cond
469  if (avail == size) avail = entry;
470  entry = (entry + 1) % size;
471 
472  // post boundary cond
473  if (avail == entry) entry = size;
474 
475  DBUG_ASSERT(
476  entry == size ||
477  (len == (avail >= entry) ? (avail - entry) : (size + avail - entry)));
478  DBUG_ASSERT(avail != entry);
479 
480  return ret;
481 }
482 
483 template <typename Element_type>
484 ulong circular_buffer_queue<Element_type>::de_tail(Element_type *item) {
485  if (entry == size) {
486  DBUG_ASSERT(len == 0);
487  return (ulong)-1;
488  }
489 
490  avail = (entry + len - 1) % size;
491  *item = m_Q[avail];
492  len--;
493 
494  // post boundary cond
495  if (avail == entry) entry = size;
496 
497  DBUG_ASSERT(
498  entry == size ||
499  (len == (avail >= entry) ? (avail - entry) : (size + avail - entry)));
500  DBUG_ASSERT(avail != entry);
501 
502  return avail;
503 }
504 
505 class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item> {
506  public:
508  /*
509  Coordinator marks it with true; the Worker signals back when the queue
510  becomes available again
511  */
512  bool overfill;
513  ulonglong waited_overfill;
514 };
515 
516 class Slave_worker : public Relay_log_info {
517  public:
518  Slave_worker(Relay_log_info *rli,
519 #ifdef HAVE_PSI_INTERFACE
520  PSI_mutex_key *param_key_info_run_lock,
521  PSI_mutex_key *param_key_info_data_lock,
522  PSI_mutex_key *param_key_info_sleep_lock,
523  PSI_mutex_key *param_key_info_thd_lock,
524  PSI_mutex_key *param_key_info_data_cond,
525  PSI_mutex_key *param_key_info_start_cond,
526  PSI_mutex_key *param_key_info_stop_cond,
527  PSI_mutex_key *param_key_info_sleep_cond,
528 #endif
529  uint param_id, const char *param_channel);
530 
531  virtual ~Slave_worker();
532 
533  Slave_jobs_queue jobs; // assignment queue containing events to execute
534  mysql_mutex_t jobs_lock; // mutex for the jobs queue
535  mysql_cond_t jobs_cond; // condition variable for the jobs queue
536  Relay_log_info *c_rli; // pointer to Coordinator's rli
537 
538  Prealloced_array<db_worker_hash_entry *, SLAVE_INIT_DBS_IN_GROUP>
539  curr_group_exec_parts; // Current Group Executed Partitions
540 
541 #ifndef DBUG_OFF
542  bool curr_group_seen_sequence_number; // is set to true around starts_group()
543 #endif
544  ulong id; // numeric identifier of the Worker
545 
546  /*
547  Worker runtime statistics
548  */
549  // the index in GAQ of the last processed group by this Worker
550  volatile ulong last_group_done_index;
551  ulonglong
552  last_groups_assigned_index; // index of previous group assigned to worker
553  ulong wq_empty_waits; // how many times got idle
554  ulong events_done; // how many events (statements) processed
555  ulong groups_done; // how many groups (transactions) processed
556  volatile int curr_jobs; // number of active assignments
557  // number of partitions allocated to the worker at a point in time
558  long usage_partition;
559  // symmetric to rli->mts_end_group_sets_max_dbs
560  bool end_group_sets_max_dbs;
561 
562  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
563  volatile bool checkpoint_notified; // Coord sets and resets, W can read
564  volatile bool
565  master_log_change_notified; // Coord sets and resets, W can read
566  /*
567  The variable serves Coordinator as a memo to itself
568  to notify a Worker about the fact that a new FD has been read.
569  Normally, the value is true, to mean the Worker is notified.
570  When Coordinator reads a new FD it changes the value to false.
571  When Coordinator schedules the first event following the new FD to a
572  Worker, it propagates the new FD to the Worker through
573  Slave_job_group::new_fd_event. Afterwards Coordinator returns the value back
574  to the regular true, to denote things done. Worker will adapt to the new FD
575  once it takes on the first event of the marked group.
576  */
577  bool fd_change_notified;
578  ulong bitmap_shifted; // shift the last bitmap at receiving a new CP
579  // WQ current excess above the overrun level
580  long wq_overrun_cnt;
581  /*
582  number of events starting from which Worker queue is regarded as
583  close to full. The number of excessive events yields a weight factor
584  to compute Coordinator's nap.
585  */
586  ulong overrun_level;
587  /*
588  reverse to overrun: the number of events below which Worker is
589  considered underrunning
590  */
591  ulong underrun_level;
592  /*
593  Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
594  When WQ length drops below overrun the counter is reset.
595  */
596  ulong excess_cnt;
597  /*
598  Coordinates of the last CheckPoint (CP) this Worker has
599  acknowledged; part of its persistent data
600  */
601  char checkpoint_relay_log_name[FN_REFLEN];
602  ulonglong checkpoint_relay_log_pos;
603  char checkpoint_master_log_name[FN_REFLEN];
604  ulonglong checkpoint_master_log_pos;
605  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
606  MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
607  ulong
608  worker_checkpoint_seqno; // the most significant ON bit in group_executed
609  /* Initial value of FD-for-execution version until it gets known. */
610  ulong server_version;
611  enum en_running_state {
612  NOT_RUNNING = 0,
613  RUNNING = 1,
614  ERROR_LEAVING = 2, // is set by Worker
615  STOP = 3, // is set by Coordinator upon receiving STOP
616  STOP_ACCEPTED =
617  4 // is set by worker upon completing job when STOP SLAVE is issued
618  };
619 
620  /*
621  This function is used to make a copy of the worker object before we
622  destroy it on STOP SLAVE. This new object is then used to report the
623  worker status until the next START SLAVE, following which the new worker
624  objects will be used.
625  */
626  void copy_values_for_PFS(ulong worker_id, en_running_state running_status,
627  THD *worker_thd, const Error &last_error,
628  Gtid_monitoring_info *monitoring_info_arg);
629 
630  /*
631  The running status is guarded by jobs_lock mutex that a writer
632  Coordinator or Worker itself needs to hold when writing a new value.
633  */
634  en_running_state volatile running_status;
635  /*
636  exit_incremented indicates whether worker has contributed to max updated
637  index. By default it is set to false. When the worker contributes for the
638  first time this variable is set to true.
639  */
640  bool exit_incremented;
641 
642  int init_worker(Relay_log_info *, ulong);
643  int rli_init_info(bool);
644  int flush_info(bool force = false);
645  static size_t get_number_worker_fields();
646  /**
647  Sets bits for columns that are allowed to be `NULL`.
648 
649  @param nullable_fields the bitmap to hold the nullable fields.
650  */
651  static void set_nullable_fields(MY_BITMAP *nullable_fields);
652  void slave_worker_ends_group(Log_event *, int);
653  const char *get_master_log_name();
654  ulonglong get_master_log_pos() { return master_log_pos; }
655  ulonglong set_master_log_pos(ulong val) { return master_log_pos = val; }
656  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
657  /**
658  The method is a wrapper to provide a uniform interface with STS and is
659  to be called from Relay_log_info and Slave_worker pre_commit() methods.
660  */
661  bool commit_positions() {
662  DBUG_ASSERT(current_event);
663 
664  return commit_positions(
665  current_event, c_rli->gaq->get_job_group(current_event->mts_group_idx),
666  is_transactional());
667  }
668  /**
669  See the comments for the STS version of this method.
670  */
671  void post_commit(bool on_rollback) {
672  if (on_rollback) {
673  if (is_transactional())
674  rollback_positions(
675  c_rli->gaq->get_job_group(current_event->mts_group_idx));
676  } else if (!is_transactional())
677  commit_positions(current_event,
678  c_rli->gaq->get_job_group(current_event->mts_group_idx),
679  true);
680  }
681  /*
682  When commit fails, clear the bitmap for the executed worker group. Revert
683  the positions to the old positions that existed before commit, using the
684  checkpoint.
685 
686  @param Slave_job_group a pointer to Slave_job_group struct instance which
687  holds group master log pos, group relay log pos and checkpoint positions.
688  */
689  void rollback_positions(Slave_job_group *ptr_g);
690  bool reset_recovery_info();
691  /**
692  The method runs at Worker initialization, at runtime when
693  Coordinator supplied a new FD event for execution context, and at
694  the Worker pool shutdown.
695  Similarly to the Coordinator's
696  Relay_log_info::set_rli_description_event() the possibly existing
697  old FD is destroyed, carefully; each worker decrements
698  Format_description_log_event::atomic_usage_counter and when it is made
699  zero the destructor runs.
700  Unlike Coordinator's role, the usage counter of the new FD is *not*
701  incremented, see @c Log_event::get_slave_worker() where and why it's done
702  there.
703 
704  Notice that the method is also run by Coordinator for each Worker at MTS
705  shutdown time.
706 
707  Todo: consider merging the logic of this method with that of
708  Relay_log_info class.
709 
710  @param fdle pointer to a new Format_description_log_event
711 
712  @return 1 if an error was encountered, 0 otherwise.
713  */
714  int set_rli_description_event(Format_description_log_event *fdle) {
715  DBUG_TRACE;
716 
717  if (fdle) {
718  /*
719  When the master rotates its binary log, set gtid_next to
720  NOT_YET_DETERMINED. This tells the slave thread that:
721 
722  - If a Gtid_log_event is read subsequently, gtid_next will be set to the
723  given GTID (this is done in gtid_pre_statement_checks()).
724 
725  - If a statement is executed before any Gtid_log_event, then gtid_next
726  is set to anonymous (this is done in Gtid_log_event::do_apply_event()).
727 
728  It is important to not set GTID_NEXT=NOT_YET_DETERMINED in the middle of
729  a transaction. If that would happen when GTID_MODE=ON, the next
730  statement would fail because it implicitly sets GTID_NEXT=ANONYMOUS,
731  which is disallowed when GTID_MODE=ON. So then there would be no way to
732  end the transaction; any attempt to do so would result in this error.
733 
734  There are three possible states when reaching this execution flow point
735  (see further below for a more detailed explanation on each):
736 
737  - **No active transaction, and not in a group**: set `gtid_next` to
738  `NOT_YET_DETERMINED`.
739 
740  - **No active transaction, and in a group**: do nothing regarding
741  `gtid_next`.
742 
743  - **An active transaction exists**: impossible to set `gtid_next` and no
744  reason to process the `Format_description` event so, trigger an error.
745 
746  For the sake of correctness, let's define the meaning of having a
747  transaction "active" or "in a group".
748 
749  A transaction is "active" if either BEGIN was executed or autocommit=0
750  and a DML statement was executed (@see
751  THD::in_active_multi_stmt_transaction).
752 
753  A transaction is "in a group" if it is applied by the replication
754  applier, and the relay log position is between Gtid_log_event and the
755  committing event (@see Relay_log_info::is_in_group).
756 
757  The three different states explained further:
758 
759  **No active transaction, and not in a group**: It is normal to have
760  gtid_next=automatic/undefined and have a Format_description_log_event in
761  this condition. We are outside transaction context and should set
762  gtid_next to not_yet_determined.
763 
764  **No active transaction, and in a group**: Having
765  gtid_next=automatic/undefined in a group is impossible if master is 5.7
766  or later, because the group always starts with a Gtid_log_event or an
767  Anonymous_gtid_log_event, which will set gtid_next to anonymous or
768  gtid. But it is possible to have gtid_next=undefined when replicating
769  from a 5.6 master with gtid_mode=off, because it does not generate any
770  such event. And then, it is possible to have no active transaction in a
771  group if the master has logged a DDL as a User_var_log_event followed by
772  a Query_log_event. The User_var_log_event will start a group, but not
773  start an active transaction or change gtid_next. In this case, it is
774  possible that a Format_description_log_event occurs, if the group
775  (transaction) is broken on two relay logs, so that User_var_log_event
776  appears at the end of one relay log and Query_log_event at the beginning
777  of the next one. In such cases, we should not set gtid_next.
778 
779  **An active transaction exists**: It is possible to have
780  gtid_next=automatic/undefined in an active transaction, only if
781  gtid_next=automatic, which is only possible in a client connection using
782  gtid_next=automatic. In this scenario, there is no reason to execute a
783  Format_description_log_event. So we generate an error.
784  */
785  if (info_thd->variables.gtid_next.type == AUTOMATIC_GTID ||
786  info_thd->variables.gtid_next.type == UNDEFINED_GTID) {
787  bool in_active_multi_stmt =
788  info_thd->in_active_multi_stmt_transaction();
789 
790  if (!is_in_group() && !in_active_multi_stmt) {
791  DBUG_PRINT("info",
792  ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
793  info_thd->variables.gtid_next.set_not_yet_determined();
794  } else if (in_active_multi_stmt) {
795  my_error(ER_VARIABLE_NOT_SETTABLE_IN_TRANSACTION, MYF(0),
796  "gtid_next");
797  return 1;
798  }
799  }
800  adapt_to_master_version_updown(fdle->get_product_version(),
801  get_master_server_version());
802  }
803  if (rli_description_event) {
804  DBUG_ASSERT(rli_description_event->atomic_usage_counter > 0);
805 
806  if (--rli_description_event->atomic_usage_counter == 0) {
807  /* The FD being deleted by the Worker can't be the latest one */
808  DBUG_ASSERT(rli_description_event !=
809  c_rli->get_rli_description_event());
810 
811  delete rli_description_event;
812  }
813  }
814  rli_description_event = fdle;
815 
816  return 0;
817  }
818 
819  inline void reset_gaq_index() { gaq_index = c_rli->gaq->size; }
820  inline void set_gaq_index(ulong val) {
821  if (gaq_index == c_rli->gaq->size) gaq_index = val;
822  }
823 
824  int slave_worker_exec_event(Log_event *ev);
825 
826  /**
827  Checks if the transaction can be retried, and if not, reports an error.
828 
829  @param[in] thd The THD object of current thread.
830 
831  @returns std::tuple<bool, bool, uint> where each element has
832  the following meaning:
833 
834  first element of tuple is function return value and determines:
835  false if the transaction should be retried
836  true if the transaction should not be retried
837 
838  second element of tuple determines:
839  the function will set the value to true, in case the retry
840  should be "silent". Silent means that the caller should not
841  report it in performance_schema tables, write to the error log,
842  or sleep. Currently, silent is used by NDB only.
843 
844  third element of tuple determines:
845  If the caller should report any other error than that stored in
846  thd->get_stmt_da()->mysql_errno(), then this function will store
847  that error in this third element of the tuple.
848 
849  */
850  std::tuple<bool, bool, uint> check_and_report_end_of_retries(THD *thd);
851 
852  /**
853  It is called after an error happens. It checks if that is a temporary
854  error and if the transaction should be retried. Then it will retry the
855  transaction if it is allowed. Retry policy and logic is similar to
856  single-threaded slave.
857 
858  @param[in] start_relay_number The extension number of the relay log which
859  includes the first event of the transaction.
860  @param[in] start_relay_pos The offset of the transaction's first event.
861 
862  @param[in] end_relay_number The extension number of the relay log which
863  includes the last event it should retry.
864  @param[in] end_relay_pos The offset of the last event it should retry.
865 
866  @retval false if transaction succeeds (possibly after a number of retries)
867  @retval true if transaction fails
868  */
869  bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos,
870  uint end_relay_number, my_off_t end_relay_pos);
871 
872  bool set_info_search_keys(Rpl_info_handler *to);
873 
874  /**
875  Get coordinator's RLI. Especially used to get the rli from
876  a slave thread, like this: thd->rli_slave->get_c_rli();
877  thd could be a SQL thread or a worker thread.
878  */
879  virtual Relay_log_info *get_c_rli() { return c_rli; }
880 
881  /**
882  return an extension "for channel channel_name"
883  for error messages per channel
884  */
885  const char *get_for_channel_str(bool upper_case = false) const;
886 
887  longlong sequence_number() {
888  Slave_job_group *ptr_g = c_rli->gaq->get_job_group(gaq_index);
889  return ptr_g->sequence_number;
890  }
891 
892  /**
893  Return true if slave-preserve-commit-order is enabled and an
894  earlier transaction is waiting for a row-level lock held by this
895  transaction.
896  */
897  bool found_commit_order_deadlock();
898 
899  /**
900  Called when slave-preserve-commit-order is enabled, by the worker
901  processing an earlier transaction that waits on a row-level lock
902  held by this worker's transaction.
903  */
904  void report_commit_order_deadlock();
905 
906  /**
907  @return either the master server version as extracted from the last
908  installed Format_description_log_event, or, when none was
909  installed, the slave's own server version.
910  */
911  ulong get_master_server_version() {
912  return !get_rli_description_event()
913  ? server_version
914  : get_rli_description_event()->get_product_version();
915  }
916 
917  protected:
918  virtual void do_report(loglevel level, int err_code, const char *msg,
919  va_list v_args) const
920  MY_ATTRIBUTE((format(printf, 4, 0)));
921 
922  private:
923  ulong gaq_index; // GAQ index of the current assignment
924  ulonglong
925  master_log_pos; // event's cached log_pos for possible error report
926  void end_info();
927  bool read_info(Rpl_info_handler *from);
928  bool write_info(Rpl_info_handler *to);
929  std::atomic<bool> m_commit_order_deadlock;
930 
931  Slave_worker &operator=(const Slave_worker &info);
932  Slave_worker(const Slave_worker &info);
933  bool worker_sleep(ulong seconds);
934  bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos,
935  uint end_relay_number, my_off_t end_relay_pos);
936  void assign_partition_db(Log_event *ev);
937 
938  void reset_commit_order_deadlock();
939 
940  public:
941  /**
942  Returns an array with the expected column numbers of the primary key
943  fields of the table repository.
944  */
945  static const uint *get_table_pk_field_indexes();
946  /**
947  Returns the index of the Channel_name field of the table repository.
948  */
949  static uint get_channel_field_index();
950 };
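check_and_report_end_of_retries() reports its outcome through a three-element tuple that a caller would typically unpack with structured bindings. The fragment below is a self-contained toy with the same tuple contract, not code from the server; the function name, the attempt counter and the error value are all invented for illustration.

#include <cstdio>
#include <tuple>

// Toy stand-in with the same contract as check_and_report_end_of_retries():
// <stop retrying, be silent about it, overriding error code (0 = none)>.
std::tuple<bool, bool, unsigned> toy_check_end_of_retries(int attempts,
                                                          int max_attempts) {
  if (attempts < max_attempts) return {false, false, 0};  // keep retrying
  return {true, false, 1205 /* pretend "lock wait timeout" */};
}

int main() {
  for (int attempt = 1;; ++attempt) {
    auto [give_up, silent, err] = toy_check_end_of_retries(attempt, 3);
    if (!give_up) continue;  // the transaction would be re-applied here
    if (!silent)
      std::printf("giving up after %d attempts, error %u\n", attempt, err);
    break;
  }
  return 0;
}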
951 
952 void *head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
953 bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
954 bool set_max_updated_index_on_stop(Slave_worker *worker,
955  Slave_job_item *job_item);
956 
957 TABLE *mts_move_temp_table_to_entry(TABLE *, THD *, db_worker_hash_entry *);
958 TABLE *mts_move_temp_tables_to_thd(THD *, TABLE *);
959 // Auxiliary function
960 Slave_job_item *de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
961 
962 bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *w,
963  Relay_log_info *rli);
965 
966 inline Slave_worker *get_thd_worker(THD *thd) {
967  return static_cast<Slave_worker *>(thd->rli_slave);
968 }
969 
970 int slave_worker_exec_job_group(Slave_worker *w, Relay_log_info *rli);
971 
972 #endif