MySQL  8.0.18
Source Code Documentation
rpl_rli_pdb.h
Go to the documentation of this file.
1 /* Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef RPL_RLI_PDB_H
24 #define RPL_RLI_PDB_H
25 
26 #include <stdarg.h>
27 #include <sys/types.h>
28 #include <time.h>
29 #include <atomic>
30 
32 #include "my_bitmap.h"
33 #include "my_compiler.h"
34 #include "my_dbug.h"
35 #include "my_inttypes.h"
36 #include "my_io.h"
37 #include "my_loglevel.h"
38 #include "my_psi_config.h"
42 #include "mysql/psi/psi_base.h"
44 #include "prealloced_array.h" // Prealloced_array
45 #include "sql/log_event.h" // Format_description_log_event
46 #include "sql/rpl_gtid.h"
47 #include "sql/rpl_mts_submode.h" // enum_mts_parallel_type
48 #include "sql/rpl_rli.h" // Relay_log_info
49 #include "sql/rpl_slave.h" // MTS_WORKER_UNDEF
50 #include "sql/sql_class.h"
51 #include "sql/system_variables.h"
52 
53 class Rpl_info_handler;
54 class Slave_worker;
55 struct TABLE;
56 
57 #ifndef DBUG_OFF
58 extern ulong w_rr;
59 #endif
60 /**
61  Legends running throughout the module:
62 
63  C - Coordinator
64  CP - checkpoint
65  W - Worker
66 
67  B-event event that Begins a group (a transaction)
68  T-event event that Terminates a group (a transaction)
69 */
70 
71 /* Assigned Partition Hash (APH) entry */
74  const char *db;
76  /*
77  The number of transaction pending on this database.
78  This should only be modified under the lock slave_worker_hash_lock.
79  */
80  long usage;
81  /*
82  The list of temp tables belonging to the @c db database is
83  attached to an assigned @c worker to become its thd->temporary_tables.
84  The list is updated with every DDL, including CREATE and DROP.
85  It is removed from the entry and merged to the coordinator's
86  thd->temporary_tables in case of events: slave stops, APH oversize.
87  */
89 
90  /* todo: relax concurrency to mimic record-level locking.
91  That is, augment the entry with a mutex/cond pair
92  pthread_mutex_t
93  pthread_cond_t
94  timestamp updated_at; */
95 };
96 
99 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
100  db_worker_hash_entry **ptr_entry,
101  bool need_temp_tables, Slave_worker *w);
103  Slave_worker_array *workers,
104  Log_event *ev);
105 
106 #define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
107 
110 
111  /*
112  We need a custom copy constructor and assign operator because std::atomic<T>
113  is not copy-constructible.
114  */
120  worker_id(other.worker_id),
121  worker(other.worker),
122  total_seqno(other.total_seqno),
129  done(other.done.load()),
130  shifted(other.shifted),
131  ts(other.ts),
132 #ifndef DBUG_OFF
133  notified(other.notified),
134 #endif
137  new_fd_event(other.new_fd_event) {
138  }
139 
145  worker_id = other.worker_id;
146  worker = other.worker;
147  total_seqno = other.total_seqno;
154  done.store(other.done.load());
155  shifted = other.shifted;
156  ts = other.ts;
157 #ifndef DBUG_OFF
158  notified = other.notified;
159 #endif
162  new_fd_event = other.new_fd_event;
163  return *this;
164  }
165 
166  char *group_master_log_name; // (actually redundant)
167  /*
168  T-event log_pos filled by Worker for CheckPoint (CP)
169  */
171 
172  /*
173  When the relay-log name changes, a new relay-log name is allocated and
174  filled in; otherwise NULL is filled in.
175  Coordinator keeps track of whether each Worker has been notified of the
176  update to make sure the routine runs once per change.
177 
178  W checks the value at commit and memorizes a non-NULL one.
179  Freeing a non-NULL value is left to Coordinator at CP.
180  */
181  char *group_relay_log_name; // The value is last seen relay-log
186 
187  my_off_t master_log_pos; // B-event log_pos
188  /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
190  my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
192  my_off_t
193  checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
195  std::atomic<int32> done; // Flag raised by W, read and reset by Coordinator
196  ulong shifted; // shift the last CP bitmap at receiving a new CP
197  time_t ts; // Group's timestampt to update Seconds_behind_master
198 #ifndef DBUG_OFF
199  bool notified{false}; // to debug group_master_log_name change notification
200 #endif
201  /* Clock-based scheduler requirement: */
202  longlong last_committed; // commit parent timestamp
203  longlong sequence_number; // transaction's logical timestamp
204  /*
205  After Coordinator has seen a new FD event, it sets this member to
206  point to the new event, once per worker. Coordinator does so
207  when it schedules a first group following the FD event to a worker.
208  It checks Slave_worker::fd_change_notified flag to decide whether
209  to do this or not.
210  When the worker executes the group, it replaces its currently
211  active FD by the new FD once it takes on the group first event. It
212  checks this member and resets it after the FD replacement is done.
213 
214  The member is kind of lock-free. It's updated by Coordinator and
215  read by Worker without holding any mutex. That's still safe thanks
216  to Slave_worker::jobs_lock that works as synchronizer, Worker
217  can't read any stale info.
218  The member is updated by Coordinator when it decides which Worker
219  an event following a new FD is to be scheduled.
220  After Coordinator has chosen a Worker, it queues the event to it
221  with necessarily taking Slave_worker::jobs_lock. The Worker grabs
222  the mutex lock later at pulling the event from the queue and
223  releases the lock before reading this member.
224 
225  This sequence of actions shows the write operation always precedes
226  the read one, and ensures no stale FD info is passed to the
227  Worker.
228  */
230  /*
231  Coordinator fills the struct with defaults and options at starting of
232  a group distribution.
233  */
237  group_master_log_name = nullptr; // todo: remove
238  group_relay_log_name = nullptr;
240  total_seqno = seqno;
241  checkpoint_log_name = nullptr;
242  checkpoint_log_pos = 0;
243  checkpoint_relay_log_name = nullptr;
245  checkpoint_seqno = (uint)-1;
246  done = 0;
247  ts = 0;
248 #ifndef DBUG_OFF
249  notified = false;
250 #endif
253  new_fd_event = nullptr;
254  }
255 };
256 
257 /**
258  The class defines a type of queue with a predefined max size that is
259  implemented using the circular memory buffer.
260  That is items of the queue are accessed as indexed elements of
261  the array buffer in a way that when the index value reaches
262  a max value it wraps around to point to the first buffer element.
263 */
264 template <typename Element_type>
266  public:
268  ulong size; // the Size of the queue in terms of element
269  ulong avail; // first Available index to append at (next to tail)
270  ulong entry; // the head index or the entry point to the queue.
271  volatile ulong len; // actual length
273 
276  size(max),
277  avail(0),
278  entry(max),
279  len(0),
281  if (!m_Q.reserve(size)) inited_queue = true;
282  m_Q.resize(size);
283  }
286 
287  /**
288  Content of the being dequeued item is copied to the arg-pointer
289  location.
290 
291  @param [out] item A pointer to the being dequeued item.
292  @return the queue's array index that the de-queued item
293  located at, or
294  an error encoded in beyond the index legacy range.
295  */
296  ulong de_queue(Element_type *item);
297  /**
298  Similar to de_queue but extracting happens from the tail side.
299 
300  @param [out] item A pointer to the being dequeued item.
301  @return the queue's array index that the de-queued item
302  located at, or an error.
303  */
304  ulong de_tail(Element_type *item);
305 
306  /**
307  return the index where the arg item locates
308  or an error encoded as a value in beyond of the legacy range
309  [0, size) (value `size' is excluded).
310  */
311  ulong en_queue(Element_type *item);
312  /**
313  Peek at the head of the queue without removing it.
     @return a pointer to the element stored at index @c entry of @c m_Q,
     or nullptr when empty() reports that the queue holds no element.
314  */
315  Element_type *head_queue() {
316  if (empty()) return nullptr;
317  return &m_Q[entry];  // entry is the head index of the queue
318  }
319 
320  bool gt(ulong i, ulong k); // comparision of ordering of two entities
321  /*
     Tells whether index k lies in the occupied part of the buffer, that is
     in [entry, avail), taking the wrapped-around case (entry > avail) into
     account. Always false for an empty queue.
     NOTE(review): full() means avail == size, so for a full queue with
     entry > 0 the second branch rejects indexes below entry - confirm
     that this is intended.
     */
322  bool in(ulong k) {
323  return !empty() && (entry > avail ? (k >= entry || k < avail)
324  : (k >= entry && k < avail));
325  }
326  bool empty() { return entry == size; }  // entry holds the out-of-range value size while there is no head element
327  bool full() { return avail == size; }  // avail holds the out-of-range value size while there is no slot left to append at
328 };
329 
330 /**
331  Group Assigned Queue whose first element identifies first gap
332  in committed sequence. The head of the queue is therefore next to
333  the low-water-mark.
334 */
335 class Slave_committed_queue : public circular_buffer_queue<Slave_job_group> {
336  public:
337  bool inited;
338 
339  /* master's Rot-ev exec */
340  void update_current_binlog(const char *post_rotate);
341 
342  /*
343  The last checkpoint time Low-Water-Mark
344  */
346 
347  /* last time processed indexes for each worker */
349 
350  /* the being assigned group index in GAQ */
352 
354 
356  if (inited) {
358  free_dynamic_items(); // free possibly left allocated strings in GAQ list
359  }
360  }
361 
362 #ifndef DBUG_OFF
363  bool count_done(Relay_log_info *rli);
364 #endif
365 
366  /* Checkpoint routine refreshes the queue */
368  /* Method is for slave shutdown time cleanup */
369  void free_dynamic_items();
370  /*
371  returns a pointer to Slave_job_group struct instance as indexed by arg
372  in the circular buffer dyn-array
373  */
375  DBUG_ASSERT(ind < size);
376  return &m_Q[ind];
377  }
378 
379  /**
380  Assigns the index of the enqueued item to @c assigned_group_index
381  and returns it.
382  */
384  return assigned_group_index =
386  }
387 
388  /**
389  Dequeue from head.
390 
391  @param [out] item A pointer to the being dequeued item.
392  @return The queue's array index that the de-queued item located at,
393  or an error encoded in beyond the index legacy range.
394  */
397  }
398 
399  /**
400  Similar to de_queue() but removing an item from the tail side.
401 
402  @param [out] item A pointer to the being dequeued item.
403  @return the queue's array index that the de-queued item
404  located at, or an error.
405  */
408  }
409 
411 };
412 
413 /**
414  Append a copy of the arg item at index avail.
415  @return the index where the arg item has been stored, or (ulong)-1
     when the queue is full (avail == size).

     NOTE(review): in the first DBUG_ASSERT near the end, `?:` binds looser
     than `||` and `==`, so the asserted value is one of the two differences
     rather than a comparison of len against it. The parenthesisation looks
     unintended; confirm before relying on this check.
416 */
417 template <typename Element_type>
419  ulong ret;
420  if (avail == size) {
421  DBUG_ASSERT(avail == m_Q.size());
422  return (ulong)-1;
423  }
424 
425  // store the item in the first available slot
426 
427  ret = avail;
428  m_Q[avail] = *item;
429 
430  // pre-boundary cond: the queue was empty, the new item becomes the head
431  if (entry == size) entry = avail;
432 
433  avail = (avail + 1) % size;
434  len++;
435 
436  // post-boundary cond: avail caught up with the head, mark the queue full
437  if (avail == entry) avail = size;
438 
439  DBUG_ASSERT(avail == entry || len == (avail >= entry)
440  ? (avail - entry)
441  : (size + avail - entry));
442  DBUG_ASSERT(avail != entry);
443 
444  return ret;
445 }
446 
447 /**
448  Dequeue from head.
449 
450  @param [out] item Location that the dequeued element is copied to.
451  @return the queue's array index that the de-queued item was
452  located at, or (ulong)-1, a value outside the legacy
453  [0, size) (value `size' is excluded) range, when the queue is empty.

     NOTE(review): in the multi-line DBUG_ASSERT near the end, `?:` binds
     looser than `==`, so len is compared with the boolean (avail >= entry)
     and the asserted value is one of the two differences. The
     parenthesisation looks unintended; confirm before relying on this check.
454 */
455 template <typename Element_type>
457  ulong ret;
458  if (entry == size) {
459  DBUG_ASSERT(len == 0);
460  return (ulong)-1;
461  }
462 
463  ret = entry;
464  *item = m_Q[entry];
465  len--;
466 
467  // pre boundary cond: the queue was full, the freed slot is the next to append at
468  if (avail == size) avail = entry;
469  entry = (entry + 1) % size;
470 
471  // post boundary cond: the head caught up with avail, mark the queue empty
472  if (avail == entry) entry = size;
473 
474  DBUG_ASSERT(
475  entry == size ||
476  (len == (avail >= entry) ? (avail - entry) : (size + avail - entry)));
477  DBUG_ASSERT(avail != entry);
478 
479  return ret;
480 }
481 
/*
  Remove the element at the tail side of the queue and copy it to *item.
  Returns the index the element was taken from, or (ulong)-1 when the queue
  is empty (entry == size). On success avail is left pointing at the slot
  that has just been vacated.

  NOTE(review): in the multi-line DBUG_ASSERT near the end, `?:` binds
  looser than `==`, so len is compared with the boolean (avail >= entry)
  and the asserted value is one of the two differences. The
  parenthesisation looks unintended; confirm before relying on this check.
*/
482 template <typename Element_type>
484  if (entry == size) {
485  DBUG_ASSERT(len == 0);
486  return (ulong)-1;
487  }
488 
489  avail = (entry + len - 1) % size;  // index of the last element counted from the head
490  *item = m_Q[avail];
491  len--;
492 
493  // post boundary cond: the removed element was the head itself, mark the queue empty
494  if (avail == entry) entry = size;
495 
496  DBUG_ASSERT(
497  entry == size ||
498  (len == (avail >= entry) ? (avail - entry) : (size + avail - entry)));
499  DBUG_ASSERT(avail != entry);
500 
501  return avail;
502 }
503 
504 class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item> {
505  public:
507  /*
508  Coordinator marks with true, Worker signals back at queue back to
509  available
510  */
511  bool overfill;
513 };
514 
515 class Slave_worker : public Relay_log_info {
516  public:
518 #ifdef HAVE_PSI_INTERFACE
519  PSI_mutex_key *param_key_info_run_lock,
520  PSI_mutex_key *param_key_info_data_lock,
521  PSI_mutex_key *param_key_info_sleep_lock,
522  PSI_mutex_key *param_key_info_thd_lock,
523  PSI_mutex_key *param_key_info_data_cond,
524  PSI_mutex_key *param_key_info_start_cond,
525  PSI_mutex_key *param_key_info_stop_cond,
526  PSI_mutex_key *param_key_info_sleep_cond,
527 #endif
528  uint param_id, const char *param_channel);
529 
530  virtual ~Slave_worker();
531 
532  Slave_jobs_queue jobs; // assignment queue containing events to execute
533  mysql_mutex_t jobs_lock; // mutex for the jobs queue
534  mysql_cond_t jobs_cond; // condition variable for the jobs queue
535  Relay_log_info *c_rli; // pointer to Coordinator's rli
536 
538  curr_group_exec_parts; // Current Group Executed Partitions
539 
540 #ifndef DBUG_OFF
541  bool curr_group_seen_sequence_number; // is set to true about starts_group()
542 #endif
543  ulong id; // numberic identifier of the Worker
544 
545  /*
546  Worker runtime statistics
547  */
548  // the index in GAQ of the last processed group by this Worker
550  ulonglong
551  last_groups_assigned_index; // index of previous group assigned to worker
552  ulong wq_empty_waits; // how many times got idle
553  ulong events_done; // how many events (statements) processed
554  ulong groups_done; // how many groups (transactions) processed
555  volatile int curr_jobs; // number of active assignments
556  // number of partitions allocated to the worker at point in time
558  // symmetric to rli->mts_end_group_sets_max_dbs
560 
561  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
562  volatile bool checkpoint_notified; // Coord sets and resets, W can read
563  volatile bool
564  master_log_change_notified; // Coord sets and resets, W can read
565  /*
566  The variable serves to Coordinator as a memo to itself
567  to notify a Worker about the fact that a new FD has been read.
568  Normally, the value is true, to mean the Worker is notified.
569  When Coordinator reads a new FD it changes the value to false.
570  When Coordinator schedules to a Worker the first event following the new FD,
571  it propagates the new FD to the Worker through
572  Slave_job_group::new_fd_event. Afterwards Coordinator returns the value back
573  to the regular true, to denote things done. Worker will adapt to the new FD
574  once it takes on a first event of the marked group.
575  */
577  ulong bitmap_shifted; // shift the last bitmap at receiving new CP
578  // WQ current excess above the overrun level
580  /*
581  number of events starting from which Worker queue is regarded as
582  close to full. The number of the excessive events yields a weight factor
583  to compute Coordinator's nap.
584  */
586  /*
587  reverse to overrun: the number of events below which Worker is
588  considered underruning
589  */
591  /*
592  Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
593  When WQ length is dropped below overrun the counter is reset.
594  */
596  /*
597  Coordinates of the last CheckPoint (CP) this Worker has
598  acknowledged; part of it is persistent data
599  */
604  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
605  MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
606  ulong
607  worker_checkpoint_seqno; // the most significant ON bit in group_executed
608  /* Initial value of FD-for-execution version until it gets known. */
612  RUNNING = 1,
613  ERROR_LEAVING = 2, // is set by Worker
614  STOP = 3, // is set by Coordinator upon reciving STOP
616  4 // is set by worker upon completing job when STOP SLAVE is issued
617  };
618 
619  /*
620  This function is used to make a copy of the worker object before we
621  destroy it on STOP SLAVE. This new object is then used to report the
622  worker status until the next START SLAVE, after which the new worker
623  objects will be used.
624  */
626  THD *worker_thd, const Error &last_error,
627  Gtid_monitoring_info *monitoring_info_arg);
628 
629  /*
630  The running status is guarded by jobs_lock mutex that a writer
631  Coordinator or Worker itself needs to hold when write a new value.
632  */
634  /*
635  exit_incremented indicates whether the worker has contributed to the max
636  updated index. By default it is set to false. When the worker contributes
637  for the first time this variable is set to true.
638  */
640 
642  int rli_init_info(bool);
643  int flush_info(bool force = false);
644  static size_t get_number_worker_fields();
645  /**
646  Sets bits for columns that are allowed to be `NULL`.
647 
648  @param nullable_fields the bitmap to hold the nullable fields.
649  */
650  static void set_nullable_fields(MY_BITMAP *nullable_fields);
651  void slave_worker_ends_group(Log_event *, int);
652  const char *get_master_log_name();
655  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
656  /**
657  The method is a wrapper to provide uniform interface with STS and is
658  to be called from Relay_log_info and Slave_worker pre_commit() methods.
659  */
662 
663  return commit_positions(
665  is_transactional());
666  }
667  /**
668  See the comments for STS version of this method.
669  */
670  void post_commit(bool on_rollback) {
671  if (on_rollback) {
672  if (is_transactional())
675  } else if (!is_transactional())
678  true);
679  }
680  /*
681  When commit fails clear bitmap for executed worker group. Revert back the
682  positions to the old positions that existed before commit using the
683  checkpoint.
684 
685  @param Slave_job_group a pointer to Slave_job_group struct instance which
686  holds group master log pos, group relay log pos and checkpoint positions.
687  */
688  void rollback_positions(Slave_job_group *ptr_g);
689  bool reset_recovery_info();
690  /**
691  The method runs at Worker initialization, at runtime when
692  Coordinator supplied a new FD event for execution context, and at
693  the Worker pool shutdown.
694  Similarly to the Coordinator's
695  Relay_log_info::set_rli_description_event() the possibly existing
696  old FD is destroyed, carefully; each worker decrements
697  Format_description_log_event::atomic_usage_counter and when it is made
698  zero the destructor runs.
699  Unlike in the Coordinator's role, the usage counter of the new FD is *not*
700  incremented, see @c Log_event::get_slave_worker() where and why it's done
701  there.
702 
703  Notice, the method is run as well by Coordinator per each Worker at MTS
704  shutdown time.
705 
706  Todo: consider to merge logics of the method with that of
707  Relay_log_info class.
708 
709  @param fdle pointer to a new Format_description_log_event
710 
711  @return 1 if an error was encountered, 0 otherwise.
712  */
714  DBUG_TRACE;
715 
716  if (fdle) {
717  /*
718  When the master rotates its binary log, set gtid_next to
719  NOT_YET_DETERMINED. This tells the slave thread that:
720 
721  - If a Gtid_log_event is read subsequently, gtid_next will be set to the
722  given GTID (this is done in gtid_pre_statement_checks()).
723 
724  - If a statement is executed before any Gtid_log_event, then gtid_next
725  is set to anonymous (this is done in Gtid_log_event::do_apply_event()).
726 
727  It is important to not set GTID_NEXT=NOT_YET_DETERMINED in the middle of
728  a transaction. If that would happen when GTID_MODE=ON, the next
729  statement would fail because it implicitly sets GTID_NEXT=ANONYMOUS,
730  which is disallowed when GTID_MODE=ON. So then there would be no way to
731  end the transaction; any attempt to do so would result in this error.
732 
733  There are three possible states when reaching this execution flow point
734  (see further below for a more detailed explanation on each):
735 
736  - **No active transaction, and not in a group**: set `gtid_next` to
737  `NOT_YET_DETERMINED`.
738 
739  - **No active transaction, and in a group**: do nothing regarding
740  `gtid_next`.
741 
742  - **An active transaction exists**: impossible to set `gtid_next` and no
743  reason to process the `Format_description` event so, trigger an error.
744 
745  For the sake of correctness, let's define the meaning of having a
746  transaction "active" or "in a group".
747 
748  A transaction is "active" if either BEGIN was executed or autocommit=0
749  and a DML statement was executed (@see
750  THD::in_active_multi_stmt_transaction).
751 
752  A transaction is "in a group" if it is applied by the replication
753  applier, and the relay log position is between Gtid_log_event and the
754  committing event (@see Relay_log_info::is_in_group).
755 
756  The three different states explained further:
757 
758  **No active transaction, and not in a group**: It is normal to have
759  gtid_next=automatic/undefined and have a Format_description_log_event in
760  this condition. We are outside transaction context and should set
761  gtid_next to not_yet_determined.
762 
763  **No active transaction, and in a group**: Having
764  gtid_next=automatic/undefined in a group is impossible if master is 5.7
765  or later, because the group always starts with a Gtid_log_event or an
766  Anonymous_gtid_log_event, which will set gtid_next to anonymous or
767  gtid. But it is possible to have gtid_next=undefined when replicating
768  from a 5.6 master with gtid_mode=off, because it does not generate any
769  such event. And then, it is possible to have no active transaction in a
770  group if the master has logged a DDL as a User_var_log_event followed by
771  a Query_log_event. The User_var_log_event will start a group, but not
772  start an active transaction or change gtid_next. In this case, it is
773  possible that a Format_description_log_event occurs, if the group
774  (transaction) is broken on two relay logs, so that User_var_log_event
775  appears at the end of one relay log and Query_log_event at the beginning
776  of the next one. In such cases, we should not set gtid_next.
777 
778  **An active transaction exists**: It is possible to have
779  gtid_next=automatic/undefined in an active transaction, only if
780  gtid_next=automatic, which is only possible in a client connection using
781  gtid_next=automatic. In this scenario, there is no reason to execute a
782  Format_description_log_event. So we generate an error.
783  */
786  bool in_active_multi_stmt =
788 
789  if (!is_in_group() && !in_active_multi_stmt) {
790  DBUG_PRINT("info",
791  ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
793  } else if (in_active_multi_stmt) {
794  my_error(ER_VARIABLE_NOT_SETTABLE_IN_TRANSACTION, MYF(0),
795  "gtid_next");
796  return 1;
797  }
798  }
801  }
802  if (rli_description_event) {
804 
806  /* The being deleted by Worker FD can't be the latest one */
809 
810  delete rli_description_event;
811  }
812  }
813  rli_description_event = fdle;
814 
815  return 0;
816  }
817 
818  inline void reset_gaq_index() { gaq_index = c_rli->gaq->size; }  // gaq->size is the "not set" value that set_gaq_index() tests for
819  inline void set_gaq_index(ulong val) {
     // Take val only while gaq_index still equals c_rli->gaq->size (the value
     // reset_gaq_index() stores); an index that is already set is kept.
820  if (gaq_index == c_rli->gaq->size) gaq_index = val;
821  }
822 
824  bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos,
825  uint end_relay_number, my_off_t end_relay_pos);
826 
828 
829  /**
830  Get the Coordinator's RLI. Especially useful to get the rli from
831  a slave thread, like this: thd->rli_slave->get_c_rli();
832  thd could be a SQL thread or a worker thread.
     @return the @c c_rli member of this Worker.
833  */
834  virtual Relay_log_info *get_c_rli() { return c_rli; }
835 
836  /**
837  return an extension "for channel channel_name"
838  for error messages per channel
839  */
840  const char *get_for_channel_str(bool upper_case = false) const;
841 
844  return ptr_g->sequence_number;
845  }
846 
849  /**
850  @return either the master server version as extracted from the last
851  installed Format_description_log_event, or when it was not
852  installed then the slave own server version.
853  */
855  return !get_rli_description_event()
858  }
859 
860  protected:
861  virtual void do_report(loglevel level, int err_code, const char *msg,
862  va_list v_args) const
863  MY_ATTRIBUTE((format(printf, 4, 0)));
864 
865  private:
866  ulong gaq_index; // GAQ index of the current assignment
867  ulonglong
868  master_log_pos; // event's cached log_pos for possibile error report
869  void end_info();
870  bool read_info(Rpl_info_handler *from);
871  bool write_info(Rpl_info_handler *to);
873 
876  bool worker_sleep(ulong seconds);
877  bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos,
878  uint end_relay_number, my_off_t end_relay_pos);
879  void assign_partition_db(Log_event *ev);
880 
882 
883  public:
884  /**
885  Returns an array with the expected column numbers of the primary key
886  fields of the table repository.
887  */
888  static const uint *get_table_pk_field_indexes();
889  /**
890  Returns the index of the Channel_name field of the table repository.
891  */
892  static uint get_channel_field_index();
893 };
894 
895 void *head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
896 bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
898  Slave_job_item *job_item);
899 
902 // Auxiliary function
904 
906  Relay_log_info *rli);
908 
910  return static_cast<Slave_worker *>(thd->rli_slave);
911 }
912 
914 
915 #endif
Format_description_log_event * get_rli_description_event() const
Return the current Format_description_log_event.
Definition: rpl_rli.h:1536
char checkpoint_relay_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:600
unsigned long long int ulonglong
Definition: my_inttypes.h:55
char * checkpoint_relay_log_name
Definition: rpl_rli_pdb.h:194
bool init_hash_workers(Relay_log_info *rli)
Definition: rpl_rli_pdb.cc:719
char checkpoint_master_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:602
ulong wq_empty_waits
Definition: rpl_rli_pdb.h:552
void rollback_positions(Slave_job_group *ptr_g)
Definition: rpl_rli_pdb.cc:688
ulonglong checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:601
long wq_overrun_cnt
Definition: rpl_rli_pdb.h:579
Slave_committed_queue(ulong max, uint n)
Definition: rpl_rli_pdb.cc:1288
bool reset_recovery_info()
Clean up a part of Worker info table that is regarded in gaps collecting at recovery.
Definition: rpl_rli_pdb.cc:586
volatile bool checkpoint_notified
Definition: rpl_rli_pdb.h:562
void resize(size_t n, const Element_type &val=Element_type())
Resizes the container so that it contains n elements.
Definition: prealloced_array.h:536
virtual Relay_log_info * get_c_rli()
Get coordinator's RLI.
Definition: rpl_rli_pdb.h:834
void end_info()
Definition: rpl_rli_pdb.cc:443
bool set_info_search_keys(Rpl_info_handler *to)
To search in the slave repositories, each slave info object (mi, rli or worker) should use a primary ...
Definition: rpl_rli_pdb.cc:544
Class representing an error.
Definition: error.h:47
time_t ts
Definition: rpl_rli_pdb.h:197
enum_mts_parallel_type
Definition: rpl_mts_submode.h:44
bool found_order_commit_deadlock()
Definition: rpl_rli_pdb.h:847
mysql_mutex_t jobs_lock
Definition: rpl_rli_pdb.h:533
ulong assigned_group_index
Definition: rpl_rli_pdb.h:351
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:306
~circular_buffer_queue()
Definition: rpl_rli_pdb.h:285
static void set_nullable_fields(MY_BITMAP *nullable_fields)
Sets bits for columns that are allowed to be NULL.
Definition: rpl_rli_pdb.cc:599
my_off_t group_master_log_pos
Definition: rpl_rli_pdb.h:170
bool is_transactional() const
Definition: rpl_info.h:110
void slave_worker_ends_group(Log_event *, int)
Deallocation routine to cancel out few effects of map_db_to_worker().
Definition: rpl_rli_pdb.cc:1097
struct _entry entry
const char * db
Definition: rpl_rli_pdb.h:74
ulong excess_cnt
Definition: rpl_rli_pdb.h:595
An instrumented cond structure.
Definition: mysql_cond_bits.h:49
Slave_worker * worker
Definition: rpl_rli_pdb.h:184
Definition: rpl_info_handler.h:56
bool count_done(Relay_log_info *rli)
Definition: rpl_rli_pdb.cc:1306
Include file for Sun RPC to compile out of the box.
Some integer typedefs for easier portability.
void my_error(int nr, myf MyFlags,...)
Fill in and print a previously registered error message.
Definition: my_error.cc:215
void set_gaq_index(ulong val)
Definition: rpl_rli_pdb.h:819
TABLE * mts_move_temp_table_to_entry(TABLE *, THD *, db_worker_hash_entry *)
Relocating temporary table reference into entry's table list head.
Definition: rpl_rli_pdb.cc:752
Slave_job_group()
Definition: rpl_rli_pdb.h:109
char * checkpoint_log_name
Definition: rpl_rli_pdb.h:191
bool set_max_updated_index_on_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Definition: rpl_rli_pdb.cc:167
void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
return the value of data member of the head of the queue.
Definition: rpl_rli_pdb.cc:2047
Contains the classes representing events occurring in the replication stream.
struct MasterPos master_pos
void post_commit(bool on_rollback)
See the comments for STS version of this method.
Definition: rpl_rli_pdb.h:670
long usage_partition
Definition: rpl_rli_pdb.h:557
ulong de_queue(Element_type *item)
Content of the being dequeued item is copied to the arg-pointer location.
Definition: rpl_rli_pdb.h:456
ulong size
Definition: rpl_rli_pdb.h:268
Slave_worker * get_least_occupied_worker(Relay_log_info *rli, Slave_worker_array *workers, Log_event *ev)
Get the least occupied worker.
Definition: rpl_rli_pdb.cc:1081
volatile ulong last_group_done_index
Definition: rpl_rli_pdb.h:549
ulong avail
Definition: rpl_rli_pdb.h:269
TABLE *volatile temporary_tables
Definition: rpl_rli_pdb.h:88
struct System_variables variables
Definition: sql_class.h:957
Definition: rpl_rli_pdb.h:612
For binlog version 4.
Definition: log_event.h:1458
bool read_info(Rpl_info_handler *from)
Definition: rpl_rli_pdb.cc:481
ulong de_tail(Element_type *item)
Similar to de_queue but extracting happens from the tail side.
Definition: rpl_rli_pdb.h:483
GTID_NEXT is set to this state after a transaction with GTID_NEXT=='UUID:NUMBER' is committed...
Definition: rpl_gtid.h:3583
volatile int curr_jobs
Definition: rpl_rli_pdb.h:555
void reset(my_off_t master_pos, ulonglong seqno)
Definition: rpl_rli_pdb.h:234
void set_not_yet_determined()
Set the type to NOT_YET_DETERMINED_GTID.
Definition: rpl_gtid.h:3642
Slave_worker * map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w)
The function produces a reference to the struct of a Worker that has been or will be engaged to proce...
Definition: rpl_rli_pdb.cc:881
ulonglong total_seqno
Definition: rpl_rli_pdb.h:185
bool inited_queue
Definition: rpl_rli_pdb.h:272
static size_t get_number_worker_fields()
Definition: rpl_rli_pdb.cc:595
void my_free(void *ptr)
Frees the memory pointed by the ptr.
Definition: my_memory.cc:81
ulong en_queue(Slave_job_group *item)
Assigns assigned_group_index to an index of the enqueued item and returns it.
Definition: rpl_rli_pdb.h:383
bool commit_positions()
The method is a wrapper to provide uniform interface with STS and is to be called from Relay_log_info...
Definition: rpl_rli_pdb.h:660
MY_BITMAP group_executed
Definition: rpl_rli_pdb.h:604
long usage
Definition: rpl_rli_pdb.h:80
ulong entry
Definition: rpl_rli_pdb.h:270
THD * info_thd
Definition: rpl_info.h:77
Instrumentation helpers for conditions.
bool m_order_commit_deadlock
Definition: rpl_rli_pdb.h:872
#define PSI_INSTRUMENT_ME
Definition: psi_base.h:44
Definition: table.h:1301
unsigned int PSI_mutex_key
Instrumented mutex key.
Definition: psi_mutex_bits.h:49
void assign_partition_db(Log_event *ev)
Definition: rpl_rli_pdb.cc:2004
Group Assigned Queue whose first element identifies first gap in committed sequence.
Definition: rpl_rli_pdb.h:335
Performance schema instrumentation interface.
void destroy_hash_workers(Relay_log_info *)
Definition: rpl_rli_pdb.cc:730
ulong groups_done
Definition: rpl_rli_pdb.h:554
ulonglong get_master_log_pos()
Definition: rpl_rli_pdb.h:653
circular_buffer_queue()
Definition: rpl_rli_pdb.h:284
#define DBUG_PRINT(keyword, arglist)
Definition: my_dbug.h:179
longlong sequence_number()
Definition: rpl_rli_pdb.h:842
Slave_worker * worker
Definition: rpl_rli_pdb.h:75
A typesafe replacement for DYNAMIC_ARRAY.
Definition: prealloced_array.h:66
#define DBUG_ASSERT(A)
Definition: my_dbug.h:197
longlong sequence_number
Definition: rpl_rli_pdb.h:203
Legends running throughout the module:
Definition: rpl_rli_pdb.h:72
Definition: rpl_rli_pdb.h:614
ulong server_version
Definition: rpl_rli_pdb.h:609
std::atomic< int32 > atomic_usage_counter
Definition: log_event.h:1477
const char * get_master_log_name()
Definition: rpl_rli_pdb.cc:605
unsigned long get_product_version() const
This method is used to find out the version of server that originated the current FD instance...
Definition: control_events.cpp:176
Definition: rpl_rli.h:164
void report_order_commit_deadlock()
Definition: rpl_rli_pdb.h:848
en_running_state volatile running_status
Definition: rpl_rli_pdb.h:633
Gtid_specification gtid_next
Definition: system_variables.h:316
Slave_job_group(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:115
const char * get_for_channel_str(bool upper_case=false) const
return an extension "for channel channel_name" for error messages per channel
Definition: rpl_rli_pdb.cc:2573
Slave_job_group * get_job_group(ulong ind)
Definition: rpl_rli_pdb.h:374
char * group_relay_log_name
Definition: rpl_rli_pdb.h:181
Slave_worker & operator=(const Slave_worker &info)
Definition: my_bitmap.h:42
Slave_job_group lwm
Definition: rpl_rli_pdb.h:345
ulonglong waited_overfill
Definition: rpl_rli_pdb.h:512
Prealloced_array< Element_type, 1 > m_Q
Definition: rpl_rli_pdb.h:267
bool full()
Definition: rpl_rli_pdb.h:327
my_off_t checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:193
This is the abstract base class for binary log events.
Definition: log_event.h:517
ulonglong checkpoint_master_log_pos
Definition: rpl_rli_pdb.h:603
Definition: rpl_rli_pdb.h:613
bool end_group_sets_max_dbs
Definition: rpl_rli_pdb.h:559
Header for compiler-dependent features.
ulong shifted
Definition: rpl_rli_pdb.h:196
bool empty()
Definition: rpl_rli_pdb.h:326
Slave_jobs_queue jobs
Definition: rpl_rli_pdb.h:532
Defines various enable/disable and HAVE_ macros related to the performance schema instrumentation sys...
bool in_active_multi_stmt_transaction() const
true if the session is in a multi-statement transaction mode (
Definition: sql_class.h:2768
int slave_worker_exec_job_group(Slave_worker *w, Relay_log_info *rli)
apply one job group.
Definition: rpl_rli_pdb.cc:2409
unsigned int uint
Definition: uca-dump.cc:29
Definition: rpl_rli_pdb.h:611
#define FN_REFLEN
Definition: my_io.h:82
int rli_init_info(bool)
A part of Slave worker initializer that provides a minimum context for MTS recovery.
Definition: rpl_rli_pdb.cc:396
long long int longlong
Definition: my_inttypes.h:54
#define MYF(v)
Definition: my_inttypes.h:114
volatile ulong len
Definition: rpl_rli_pdb.h:271
Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
return a job item through a struct whose pointer is supplied via argument.
Definition: rpl_rli_pdb.cc:2063
bool exit_incremented
Definition: rpl_rli_pdb.h:639
bool fd_change_notified
Definition: rpl_rli_pdb.h:576
longlong last_committed
Definition: rpl_rli_pdb.h:202
char msg[1024]
Definition: test_sql_9_sessions.cc:281
bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
It is called after an error happens.
Definition: rpl_rli_pdb.cc:1774
#define MTS_WORKER_UNDEF
Definition: rpl_slave.h:81
mysql_cond_t jobs_cond
Definition: rpl_rli_pdb.h:534
int slave_worker_exec_event(Log_event *ev)
MTS worker main routine.
Definition: rpl_rli_pdb.cc:1644
Slave_jobs_queue()
Definition: rpl_rli_pdb.h:506
ulong gaq_index
Definition: rpl_rli_pdb.h:866
int set_rli_description_event(Format_description_log_event *fdle)
The method runs at Worker initialization, at runtime when Coordinator supplied a new FD event for exec...
Definition: rpl_rli_pdb.h:713
Instrumentation helpers for mutexes.
ulong worker_id
Definition: rpl_rli_pdb.h:183
bool is_in_group() const
A group is defined as the entire range of events that constitute a transaction or auto-committed stat...
Definition: rpl_rli.h:1350
void reset_order_commit_deadlock()
Definition: rpl_rli_pdb.h:881
Error const & last_error() const
Definition: rpl_reporting.h:136
DBUG_TRACE
Definition: do_ctype.cc:46
ulong en_queue(Element_type *item)
return the index where the arg item is located, or an error encoded as a value beyond the legacy ra...
Definition: rpl_rli_pdb.h:418
bool inited
Definition: rpl_rli_pdb.h:337
bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
Read events from relay logs and apply them.
Definition: rpl_rli_pdb.cc:1898
#define HAVE_PSI_INTERFACE
Definition: my_psi_config.h:38
int n
Definition: xcom_base.c:425
Definition of the global "loglevel" enumeration.
Binary log event definitions.
const int64_t SEQ_UNINIT
Uninitialized timestamp value (for either last committed or sequence number).
Definition: binlog_event.h:137
virtual ~Slave_worker()
Definition: rpl_rli_pdb.cc:278
double seconds()
Definition: task.c:298
ulong overrun_level
Definition: rpl_rli_pdb.h:585
MY_BITMAP group_shifted
Definition: rpl_rli_pdb.h:605
my_off_t master_log_pos
Definition: rpl_rli_pdb.h:187
unsigned long my_off_t
Definition: my_inttypes.h:77
Slave_job_group & operator=(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:140
Stores information to monitor a transaction during the different replication stages.
Definition: rpl_gtid.h:1161
bool curr_group_seen_sequence_number
Definition: rpl_rli_pdb.h:541
bool worker_sleep(ulong seconds)
Sleep for a given amount of seconds or until killed.
Definition: rpl_rli_pdb.cc:1736
void update_current_binlog(const char *post_rotate)
std::atomic< int32 > done
Definition: rpl_rli_pdb.h:195
ulong mts_group_idx
Index in rli->gaq array to indicate a group that this event is purging.
Definition: log_event.h:672
volatile bool relay_log_change_notified
Definition: rpl_rli_pdb.h:561
enum_gtid_type type
The type of this GTID.
Definition: rpl_gtid.h:3623
Element_type * head_queue()
return the value of data member of the head of the queue.
Definition: rpl_rli_pdb.h:315
Specifies that the GTID has not been generated yet; it will be generated on commit.
Definition: rpl_gtid.h:3531
bool write_info(Rpl_info_handler *to)
Definition: rpl_rli_pdb.cc:555
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:49
bool in(ulong k)
Definition: rpl_rli_pdb.h:322
ulong id
Definition: rpl_rli_pdb.h:543
~Slave_committed_queue()
Definition: rpl_rli_pdb.h:355
Definition: rpl_rli_pdb.h:515
ulong underrun_level
Definition: rpl_rli_pdb.h:590
ulong find_lwm(Slave_job_group **, ulong)
Finds low-water mark of committed jobs in GAQ.
Definition: rpl_rli_pdb.cc:1454
Format_description_log_event * rli_description_event
Definition: rpl_rli.h:1612
ulong worker_checkpoint_seqno
Definition: rpl_rli_pdb.h:607
ulonglong last_groups_assigned_index
Definition: rpl_rli_pdb.h:551
ulonglong master_log_pos
Definition: rpl_rli_pdb.h:868
ulonglong set_master_log_pos(ulong val)
Definition: rpl_rli_pdb.h:654
Slave_committed_queue * gaq
Definition: rpl_rli.h:1016
virtual void do_report(loglevel level, int err_code, const char *msg, va_list v_args) const
Definition: rpl_rli_pdb.cc:1514
ABI for instrumented mutexes.
void free_dynamic_items()
Method should be executed at slave system stop to cleanup dynamically allocated items that remained a...
Definition: rpl_rli_pdb.cc:1493
static const uint * get_table_pk_field_indexes()
Returns an array with the expected column numbers of the primary key fields of the table repository...
Definition: rpl_rli_pdb.cc:2577
Common #defines and includes for file and socket I/O.
Definition: rpl_rli_pdb.h:108
Slave_worker(Relay_log_info *rli, PSI_mutex_key *param_key_info_run_lock, PSI_mutex_key *param_key_info_data_lock, PSI_mutex_key *param_key_info_sleep_lock, PSI_mutex_key *param_key_info_thd_lock, PSI_mutex_key *param_key_info_data_cond, PSI_mutex_key *param_key_info_start_cond, PSI_mutex_key *param_key_info_stop_cond, PSI_mutex_key *param_key_info_sleep_cond, uint param_id, const char *param_channel)
Definition: rpl_rli_pdb.cc:236
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Definition: rpl_rli_pdb.cc:103
loglevel
Definition: my_loglevel.h:32
char * group_master_log_name
Definition: rpl_rli_pdb.h:166
bool overfill
Definition: rpl_rli_pdb.h:511
my_off_t checkpoint_log_pos
Definition: rpl_rli_pdb.h:190
Definition: rpl_rli.h:87
volatile bool master_log_change_notified
Definition: rpl_rli_pdb.h:564
my_off_t group_relay_log_pos
Definition: rpl_rli_pdb.h:182
Log_event * current_event
Reference to the event being applied.
Definition: rpl_rli.h:1767
Format_description_log_event * new_fd_event
Definition: rpl_rli_pdb.h:229
ulong move_queue_head(Slave_worker_array *ws)
The queue is processed from the head item by item to purge items representing committed groups...
Definition: rpl_rli_pdb.cc:1349
Log info(cout, "NOTE")
ulong get_master_server_version()
Definition: rpl_rli_pdb.h:854
bool notified
Definition: rpl_rli_pdb.h:199
ulong adapt_to_master_version_updown(ulong master_version, ulong current_version)
The method compares two supplied versions and carries out down- or up- grade customization of executi...
Definition: rpl_rli.cc:2310
uint checkpoint_seqno
Definition: rpl_rli_pdb.h:189
static uint get_channel_field_index()
Returns the index of the Channel_name field of the table repository.
Definition: rpl_rli_pdb.cc:2581
int init_worker(Relay_log_info *, ulong)
Method is executed by Coordinator at Worker startup time to initialize members partly with values supp...
Definition: rpl_rli_pdb.cc:308
The class defines a type of queue with a predefined max size that is implemented using the circular m...
Definition: rpl_rli_pdb.h:265
void copy_values_for_PFS(ulong worker_id, en_running_state running_status, THD *worker_thd, const Error &last_error, Gtid_monitoring_info *monitoring_info_arg)
Definition: rpl_rli_pdb.cc:533
Prealloced_array< db_worker_hash_entry *, SLAVE_INIT_DBS_IN_GROUP > curr_group_exec_parts
Definition: rpl_rli_pdb.h:538
Relay_log_info * c_rli
Definition: rpl_rli_pdb.h:535
bool reserve(size_t n)
Reserves space for array elements.
Definition: prealloced_array.h:242
unsigned long ulong
Definition: my_inttypes.h:48
ulong events_done
Definition: rpl_rli_pdb.h:553
TABLE * mts_move_temp_tables_to_thd(THD *, TABLE *)
Relocation of the list of temporary tables to thd->temporary_tables.
Definition: rpl_rli_pdb.cc:786
Definition: completion_hash.h:34
#define false
Definition: config_static.h:43
ulong de_queue(Slave_job_group *item)
Dequeue from head.
Definition: rpl_rli_pdb.h:395
Prealloced_array< ulonglong, 1 > last_done
Definition: rpl_rli_pdb.h:348
bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli)
Coordinator enqueues a job item into a Worker private queue.
Definition: rpl_rli_pdb.cc:2097
int flush_info(bool force=false)
Definition: rpl_rli_pdb.cc:457
circular_buffer_queue(ulong max)
Definition: rpl_rli_pdb.h:274
ulong bitmap_shifted
Definition: rpl_rli_pdb.h:577
ulong de_tail(Slave_job_group *item)
Similar to de_queue() but removing an item from the tail side.
Definition: rpl_rli_pdb.h:406
Slave_worker * get_thd_worker(THD *thd)
Definition: rpl_rli_pdb.h:909
bool gt(ulong i, ulong k)
two-index comparison to determine which of the two is ordered first.
Definition: rpl_rli_pdb.cc:1273
Definition: rpl_rli_pdb.h:504
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_class.h:778
en_running_state
Definition: rpl_rli_pdb.h:610
uint db_len
Definition: rpl_rli_pdb.h:73
Relay_log_info * rli_slave
Definition: sql_class.h:900
void reset_gaq_index()
Definition: rpl_rli_pdb.h:818
ulong w_rr
Definition: rpl_rli_pdb.cc:76
Definition: rpl_rli_pdb.h:615