MySQL 8.0.32
Source Code Documentation
rpl_rli_pdb.h
Go to the documentation of this file.
1/* Copyright (c) 2011, 2022, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23#ifndef RPL_RLI_PDB_H
24#define RPL_RLI_PDB_H
25
26#include <stdarg.h>
27#include <sys/types.h>
28#include <time.h>
29#include <atomic>
30#include <tuple>
31
33#include "my_bitmap.h"
34#include "my_compiler.h"
35#include "my_dbug.h"
36#include "my_inttypes.h"
37#include "my_io.h"
38#include "my_loglevel.h"
39#include "my_psi_config.h"
45#include "prealloced_array.h" // Prealloced_array
46#include "sql/log_event.h" // Format_description_log_event
47#include "sql/rpl_gtid.h"
48#include "sql/rpl_mta_submode.h" // enum_mts_parallel_type
49#include "sql/rpl_replica.h" // MTS_WORKER_UNDEF
50#include "sql/rpl_rli.h" // Relay_log_info
51#include "sql/sql_class.h"
53
55class Slave_worker;
56struct TABLE;
57
58#ifndef NDEBUG
59extern ulong w_rr;
60#endif
61/**
62 Legends running throughout the module:
63
64 C - Coordinator
65 CP - checkpoint
66 W - Worker
67
68 B-event event that Begins a group (a transaction)
69 T-event event that Terminates a group (a transaction)
70*/
71
72/* Assigned Partition Hash (APH) entry */
75 const char *db;
77 /*
78 The number of transactions pending on this database.
79 This should only be modified under the lock slave_worker_hash_lock.
80 */
81 long usage;
82 /*
83 The list of temp tables belonging to @ db database is
84 attached to an assigned @c worker to become its thd->temporary_tables.
85 The list is updated with every ddl incl CREATE, DROP.
86 It is removed from the entry and merged to the coordinator's
87 thd->temporary_tables in case of events: slave stops, APH oversize.
88 */
90
91 /* todo: relax concurrency to mimic record-level locking.
92 That is, augment the entry with a mutex/cond pair
93 pthread_mutex_t
94 pthread_cond_t
95 timestamp updated_at; */
96};
97
100Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
101 db_worker_hash_entry **ptr_entry,
102 bool need_temp_tables, Slave_worker *w);
104 Slave_worker_array *workers,
105 Log_event *ev);
106
107#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
108
110 Slave_job_group() = default;
111
112 /*
113 We need a custom copy constructor and assignment operator because std::atomic<T>
114 is not copy-constructible.
115 */
121 worker_id(other.worker_id),
122 worker(other.worker),
130 done(other.done.load()),
131 shifted(other.shifted),
132 ts(other.ts),
133#ifndef NDEBUG
134 notified(other.notified),
135#endif
139 }
140
146 worker_id = other.worker_id;
147 worker = other.worker;
148 total_seqno = other.total_seqno;
155 done.store(other.done.load());
156 shifted = other.shifted;
157 ts = other.ts;
158#ifndef NDEBUG
159 notified = other.notified;
160#endif
164 return *this;
165 }
166
167 char *group_master_log_name; // (actually redundant)
168 /*
169 T-event log_pos filled by Worker for CheckPoint (CP)
170 */
172
173 /*
174 When relay-log name changes allocates and fill in a new name of relay-log,
175 otherwise it fills in NULL.
176 Coordinator keeps track of each Worker has been notified on the updating
177 to make sure the routine runs once per change.
178
179 W checks the value at commit and memorizes a not-NULL.
180 Freeing unless NULL is left to Coordinator at CP.
181 */
182 char *group_relay_log_name; // The value is last seen relay-log
187
188 my_off_t master_log_pos; // B-event log_pos
189 /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
191 my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
194 checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
196 std::atomic<int32> done; // Flag raised by W, read and reset by Coordinator
197 ulong shifted; // shift the last CP bitmap at receiving a new CP
198 time_t ts; // Group's timestamp to update Seconds_behind_master
199#ifndef NDEBUG
200 bool notified{false}; // to debug group_master_log_name change notification
201#endif
202 /* Clock-based scheduler requirement: */
203 longlong last_committed; // commit parent timestamp
204 longlong sequence_number; // transaction's logical timestamp
205 /*
206 After Coordinator has seen a new FD event, it sets this member to
207 point to the new event, once per worker. Coordinator does so
208 when it schedules a first group following the FD event to a worker.
209 It checks Slave_worker::fd_change_notified flag to decide whether
210 to do this or not.
211 When the worker executes the group, it replaces its currently
212 active FD by the new FD once it takes on the group first event. It
213 checks this member and resets it after the FD replacement is done.
214
215 The member is kind of lock-free. It's updated by Coordinator and
216 read by Worker without holding any mutex. That's still safe thanks
217 to Slave_worker::jobs_lock that works as synchronizer, Worker
218 can't read any stale info.
219 The member is updated by Coordinator when it decides which Worker
220 an event following a new FD is to be scheduled.
221 After Coordinator has chosen a Worker, it queues the event to it
222 with necessarily taking Slave_worker::jobs_lock. The Worker grabs
223 the mutex lock later at pulling the event from the queue and
224 releases the lock before reading from this member.
225
226 This sequence of actions shows the write operation always precedes
227 the read one, and ensures no stale FD info is passed to the
228 Worker.
229 */
231 /*
232 Coordinator fills the struct with defaults and options at starting of
233 a group distribution.
234 */
238 group_master_log_name = nullptr; // todo: remove
239 group_relay_log_name = nullptr;
241 total_seqno = seqno;
242 checkpoint_log_name = nullptr;
247 done = 0;
248 ts = 0;
249#ifndef NDEBUG
250 notified = false;
251#endif
254 new_fd_event = nullptr;
255 }
256};
257
258/**
259 The class defines a type of queue with a predefined max capacity that is
260 implemented using the circular memory buffer.
261 That is items of the queue are accessed as indexed elements of
262 the array buffer in a way that when the index value reaches
263 a max value it wraps around to point to the first buffer element.
264*/
265template <typename Element_type>
267 public:
269 /**
270 The capacity and maximum length of the queue in terms of element.
271 */
272 size_t capacity;
273 /**
274 Its value modulo `capacity` is index of the element where the next element
275 will be enqueued. It's entry+length. It may be bigger than capacity, but
276 will be smaller than 2*capacity.
277 */
278 size_t avail;
279 /**
280 The head index of the queue. It is an index of next element that will be
281 dequeued. It is less than capacity, so it is an actual index (in contrast
282 to `avail`) and does not need to be calculated modulo `capacity`.
283 */
284 size_t entry;
285 /**
286 Actual length. It can be read while not protected by any mutex.
287 */
288 std::atomic<size_t> len;
290
293 capacity(max),
294 avail(0),
295 entry(0),
296 len(0),
297 inited_queue(false) {
298 if (!m_Q.reserve(capacity)) inited_queue = true;
299 m_Q.resize(capacity);
300 }
303
304 /**
305 Content of the being dequeued item is copied to the arg-pointer
306 location.
307
308 @param [out] item A pointer to the being dequeued item.
309 @return true if an element was returned, false if the queue was empty.
310 */
311 bool de_queue(Element_type *item);
312 /**
313 Similar to de_queue but extracting happens from the tail side.
314
315 @param [out] item A pointer to the being dequeued item.
316 @return true if an element was returned, false if the queue was empty.
317 */
318 bool de_tail(Element_type *item);
319
320 /**
321 return the index where the arg item is located
322 or an error encoded as a value `circular_buffer_queue::error_result`.
323 */
324 size_t en_queue(Element_type *item);
325 /**
326 return the value of @c data member of the head of the queue.
327 */
328 Element_type *head_queue() {
329 if (empty()) return nullptr;
330 return &m_Q[entry];
331 }
332
333 /* index is within the valid range */
334 bool in(size_t i) {
335 return (avail >= capacity) ? (entry <= i || i < avail - capacity)
336 : (entry <= i && i < avail);
337 }
338 size_t get_length() const { return len.load(std::memory_order_relaxed); }
339 bool empty() const { return get_length() == 0; }
340 bool full() const { return get_length() == capacity; }
341
342 static constexpr size_t error_result = std::numeric_limits<size_t>::max();
343};
344
345/**
346 Group Assigned Queue whose first element identifies first gap
347 in committed sequence. The head of the queue is therefore next to
348 the low-water-mark.
349*/
350class Slave_committed_queue : public circular_buffer_queue<Slave_job_group> {
351 public:
352 bool inited;
353
354 /* master's Rot-ev exec */
355 void update_current_binlog(const char *post_rotate);
356
357 /*
358 The last checkpoint time Low-Water-Mark
359 */
361
362 /* last time processed indexes for each worker */
364
365 /* the being assigned group index in GAQ */
367
368 Slave_committed_queue(size_t max, uint n);
369
371 if (inited) {
373 free_dynamic_items(); // free possibly left allocated strings in GAQ list
374 }
375 }
376
377#ifndef NDEBUG
378 bool count_done(Relay_log_info *rli);
379#endif
380
381 /* Checkpoint routine refreshes the queue */
383 /* Method is for slave shutdown time cleanup */
384 void free_dynamic_items();
385 /*
386 returns a pointer to Slave_job_group struct instance as indexed by arg
387 in the circular buffer dyn-array
388 */
390 assert(ind < capacity);
391 return &m_Q[ind];
392 }
393
394 /**
395 Assigns @c assigned_group_index to an index of enqueued item
396 and returns it.
397 */
398 size_t en_queue(Slave_job_group *item) {
399 return assigned_group_index =
401 }
402
403 /**
404 Dequeue from head.
405
406 @param [out] item A pointer to the being dequeued item.
407 @return true if an element was returned, false if the queue was empty.
408 */
411 }
412
413 /**
414 Similar to de_queue() but removing an item from the tail side.
415
416 @param [out] item A pointer to the being dequeued item.
417 @return true if an element was returned, false if the queue was empty.
418 */
421 }
422
423 size_t find_lwm(Slave_job_group **, size_t);
424};
425
426/**
427 @return the index where the arg item has been located
428 or an error encoded as a value
429 `circular_buffer_queue::error_result`.
430*/
431template <typename Element_type>
433 if (full()) {
434 return error_result;
435 }
436
437 const auto ret = (avail++) % capacity;
438 m_Q[ret] = *item;
439 len++;
440 assert(len == avail - entry);
441 assert(entry < avail);
442
443 return ret;
444}
445
446/**
447 Dequeue from head.
448
449 @param [out] item A pointer to the being dequeued item.
450 @return true if an element was returned, false if the queue was empty.
451*/
452template <typename Element_type>
454 if (empty()) {
455 return false;
456 }
457 *item = m_Q[entry++];
458 len--;
459 assert(len == avail - entry);
460 assert(entry <= avail);
461
462 // The start of the queue has just returned to the first index. Normalize
463 // indexes so they are small again.
464 if (entry == capacity) {
465 entry = 0;
466 avail -= capacity;
467 assert(avail < capacity);
468 assert(avail == len);
469 }
470 return true;
471}
472
473template <typename Element_type>
475 if (empty()) {
476 return false;
477 }
478
479 assert(avail > entry);
480 *item = m_Q[(--avail) % capacity];
481 len--;
482 assert(len == avail - entry);
483 return true;
484}
485
486class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item> {
487 public:
489 /*
490 Coordinator marks with true, Worker signals back at queue back to
491 available
492 */
495};
496
498 public:
501 PSI_mutex_key *param_key_info_run_lock,
502 PSI_mutex_key *param_key_info_data_lock,
503 PSI_mutex_key *param_key_info_sleep_lock,
504 PSI_mutex_key *param_key_info_thd_lock,
505 PSI_mutex_key *param_key_info_data_cond,
506 PSI_mutex_key *param_key_info_start_cond,
507 PSI_mutex_key *param_key_info_stop_cond,
508 PSI_mutex_key *param_key_info_sleep_cond,
509#endif
510 uint param_id, const char *param_channel);
511
512 ~Slave_worker() override;
513
514 Slave_jobs_queue jobs; // assignment queue containing events to execute
515 mysql_mutex_t jobs_lock; // mutex for the jobs queue
516 mysql_cond_t jobs_cond; // condition variable for the jobs queue
517 Relay_log_info *c_rli; // pointer to Coordinator's rli
518
520 curr_group_exec_parts; // Current Group Executed Partitions
521
522#ifndef NDEBUG
523 bool curr_group_seen_sequence_number; // is set to true about starts_group()
524#endif
525 ulong id; // numeric identifier of the Worker
526
527 /*
528 Worker runtime statistics
529 */
530 // the index in GAQ of the last processed group by this Worker
531 volatile ulong last_group_done_index;
533 last_groups_assigned_index; // index of previous group assigned to worker
534 ulong wq_empty_waits; // how many times got idle
535 ulong events_done; // how many events (statements) processed
536 ulong groups_done; // how many groups (transactions) processed
537 volatile int curr_jobs; // number of active assignments
538 // number of partitions allocated to the worker at point in time
540 // symmetric to rli->mts_end_group_sets_max_dbs
542
543 volatile bool relay_log_change_notified; // Coord sets and resets, W can read
544 volatile bool checkpoint_notified; // Coord sets and resets, W can read
545 volatile bool
546 master_log_change_notified; // Coord sets and resets, W can read
547 /*
548 The variable serves to Coordinator as a memo to itself
549 to notify a Worker about the fact that a new FD has been read.
550 Normally, the value is true, to mean the Worker is notified.
551 When Coordinator reads a new FD it changes the value to false.
552 When Coordinator schedules to a Worker the first event following the new FD,
553 it propagates the new FD to the Worker through
554 Slave_job_group::new_fd_event. Afterwards Coordinator returns the value back
555 to the regular true, to denote things done. Worker will adapt to the new FD
556 once it takes on a first event of the marked group.
557 */
559 ulong bitmap_shifted; // shift the last bitmap at receiving new CP
560 // WQ current excess above the overrun level
562 /*
563 number of events starting from which Worker queue is regarded as
564 close to full. The number of the excessive events yields a weight factor
565 to compute Coordinator's nap.
566 */
568 /*
569 reverse to overrun: the number of events below which Worker is
570 considered under-running
571 */
573 /*
574 Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
575 When the WQ length drops below the overrun level the counter is reset.
576 */
578 /*
579 Coordinates of the last CheckPoint (CP) this Worker has
580 acknowledged; part of its persistent data
581 */
586 MY_BITMAP group_executed; // bitmap describes groups executed after last CP
587 MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
588 ulong
589 worker_checkpoint_seqno; // the most significant ON bit in group_executed
590 /* Initial value of FD-for-execution version until it gets known. */
595 ERROR_LEAVING = 2, // is set by Worker
596 STOP = 3, // is set by Coordinator upon receiving STOP
598 4 // is set by worker upon completing job when STOP SLAVE is issued
599 };
600
601 /*
602 This function is used to make a copy of the worker object before we
603 destroy it on STOP SLAVE. This new object is then used to report the
604 worker status until next START SLAVE following which the new worker objects
605 will be used.
606 */
608 THD *worker_thd, const Error &last_error,
609 Gtid_monitoring_info *monitoring_info_arg);
610
611 /*
612 The running status is guarded by jobs_lock mutex that a writer
613 Coordinator or Worker itself needs to hold when write a new value.
614 */
616 /*
617 exit_incremented indicates whether worker has contributed to max updated
618 index. By default it is set to false. When the worker contributes for the
619 first time this variable is set to true.
620 */
622
623 int init_worker(Relay_log_info *, ulong);
624 int rli_init_info(bool);
625 int flush_info(bool force = false);
626 static size_t get_number_worker_fields();
627 /**
628 Sets bits for columns that are allowed to be `NULL`.
629
630 @param nullable_fields the bitmap to hold the nullable fields.
631 */
632 static void set_nullable_fields(MY_BITMAP *nullable_fields);
634 const char *get_master_log_name();
636 ulonglong set_master_log_pos(ulong val) { return master_log_pos = val; }
637 bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
638 /**
639 The method is a wrapper to provide uniform interface with STS and is
640 to be called from Relay_log_info and Slave_worker pre_commit() methods.
641 */
642 bool commit_positions() override {
643 assert(current_event);
644
645 return commit_positions(
648 }
649 /**
650 See the comments for STS version of this method.
651 */
652 void post_commit(bool on_rollback) override {
653 if (on_rollback) {
654 if (is_transactional())
657 } else if (!is_transactional())
660 true);
661 }
662 /*
663 When commit fails clear bitmap for executed worker group. Revert back the
664 positions to the old positions that existed before commit using the
665 checkpoint.
666
667 @param Slave_job_group a pointer to Slave_job_group struct instance which
668 holds group master log pos, group relay log pos and checkpoint positions.
669 */
671 bool reset_recovery_info();
672 /**
673 The method runs at Worker initialization, at runtime when
674 Coordinator supplied a new FD event for execution context, and at
675 the Worker pool shutdown.
676 Similarly to the Coordinator's
677 Relay_log_info::set_rli_description_event() the possibly existing
678 old FD is destroyed, carefully; each worker decrements
679 Format_description_log_event::atomic_usage_counter and when it is made
680 zero the destructor runs.
681 Unlike the Coordinator's role, the usage counter of the new FD is *not*
682 incremented, see @c Log_event::get_slave_worker() where and why it's done
683 there.
684
685 Notice, the method is run as well by Coordinator per each Worker at MTS
686 shutdown time.
687
688 Todo: consider to merge logics of the method with that of
689 Relay_log_info class.
690
691 @param fdle pointer to a new Format_description_log_event
692
693 @return 1 if an error was encountered, 0 otherwise.
694 */
697
698 if (fdle) {
699 /*
700 When the master rotates its binary log, set gtid_next to
701 NOT_YET_DETERMINED. This tells the slave thread that:
702
703 - If a Gtid_log_event is read subsequently, gtid_next will be set to the
704 given GTID (this is done in gtid_pre_statement_checks()).
705
706 - If a statement is executed before any Gtid_log_event, then gtid_next
707 is set to anonymous (this is done in Gtid_log_event::do_apply_event()).
708
709 It is important to not set GTID_NEXT=NOT_YET_DETERMINED in the middle of
710 a transaction. If that would happen when GTID_MODE=ON, the next
711 statement would fail because it implicitly sets GTID_NEXT=ANONYMOUS,
712 which is disallowed when GTID_MODE=ON. So then there would be no way to
713 end the transaction; any attempt to do so would result in this error.
714
715 There are three possible states when reaching this execution flow point
716 (see further below for a more detailed explanation on each):
717
718 - **No active transaction, and not in a group**: set `gtid_next` to
719 `NOT_YET_DETERMINED`.
720
721 - **No active transaction, and in a group**: do nothing regarding
722 `gtid_next`.
723
724 - **An active transaction exists**: impossible to set `gtid_next` and no
725 reason to process the `Format_description` event so, trigger an error.
726
727 For the sake of correctness, let's define the meaning of having a
728 transaction "active" or "in a group".
729
730 A transaction is "active" if either BEGIN was executed or autocommit=0
731 and a DML statement was executed (@see
732 THD::in_active_multi_stmt_transaction).
733
734 A transaction is "in a group" if it is applied by the replication
735 applier, and the relay log position is between Gtid_log_event and the
736 committing event (@see Relay_log_info::is_in_group).
737
738 The three different states explained further:
739
740 **No active transaction, and not in a group**: It is normal to have
741 gtid_next=automatic/undefined and have a Format_description_log_event in
742 this condition. We are outside transaction context and should set
743 gtid_next to not_yet_determined.
744
745 **No active transaction, and in a group**: Having
746 gtid_next=automatic/undefined in a group is impossible if master is 5.7
747 or later, because the group always starts with a Gtid_log_event or an
748 Anonymous_gtid_log_event, which will set gtid_next to anonymous or
749 gtid. But it is possible to have gtid_next=undefined when replicating
750 from a 5.6 master with gtid_mode=off, because it does not generate any
751 such event. And then, it is possible to have no active transaction in a
752 group if the master has logged a DDL as a User_var_log_event followed by
753 a Query_log_event. The User_var_log_event will start a group, but not
754 start an active transaction or change gtid_next. In this case, it is
755 possible that a Format_description_log_event occurs, if the group
756 (transaction) is broken on two relay logs, so that User_var_log_event
757 appears at the end of one relay log and Query_log_event at the beginning
758 of the next one. In such cases, we should not set gtid_next.
759
760 **An active transaction exists**: It is possible to have
761 gtid_next=automatic/undefined in an active transaction, only if
762 gtid_next=automatic, which is only possible in a client connection using
763 gtid_next=automatic. In this scenario, there is no reason to execute a
764 Format_description_log_event. So we generate an error.
765 */
766 if (info_thd->variables.gtid_next.type == AUTOMATIC_GTID ||
767 info_thd->variables.gtid_next.type == UNDEFINED_GTID) {
768 bool in_active_multi_stmt =
770
771 if (!is_in_group() && !in_active_multi_stmt) {
772 DBUG_PRINT("info",
773 ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
774 info_thd->variables.gtid_next.set_not_yet_determined();
775 } else if (in_active_multi_stmt) {
776 my_error(ER_VARIABLE_NOT_SETTABLE_IN_TRANSACTION, MYF(0),
777 "gtid_next");
778 return 1;
779 }
780 }
783 }
786
788 /* The being deleted by Worker FD can't be the latest one */
790
792 }
793 }
795
796 return 0;
797 }
798
800 inline void set_gaq_index(ulong val) {
801 if (gaq_index == c_rli->gaq->capacity) gaq_index = val;
802 }
803
805
806 /**
807 Make the necessary changes to both the `Slave_worker` and current
808 `Log_event` objects, before retrying to apply the transaction.
809
810 Since the event is going to be re-read from the relay-log file, there
811 may be actions needed to be taken to reset the state both of `this`
812 instance, as well as of the current `Log_event` being processed.
813
814 @param event The `Log_event` object currently being processed.
815 */
817
818 /**
819 Checks if the transaction can be retried, and if not, reports an error.
820
821 @param[in] thd The THD object of current thread.
822
823 @returns std::tuple<bool, bool, uint> where each element has
824 following meaning:
825
826 first element of tuple is function return value and determines:
827 false if the transaction should be retried
828 true if the transaction should not be retried
829
830 second element of tuple determines:
831 the function will set the value to true, in case the retry
832 should be "silent". Silent means that the caller should not
833 report it in performance_schema tables, write to the error log,
834 or sleep. Currently, silent is used by NDB only.
835
836 third element of tuple determines:
837 If the caller should report any other error than that stored in
838 thd->get_stmt_da()->mysql_errno(), then this function will store
839 that error in this third element of the tuple.
840
841 */
842 std::tuple<bool, bool, uint> check_and_report_end_of_retries(THD *thd);
843
844 /**
845 It is called after an error happens. It checks if that is an temporary
846 error and if the transaction should be retried. Then it will retry the
847 transaction if it is allowed. Retry policy and logic is similar to
848 single-threaded slave.
849
850 @param[in] start_relay_number The extension number of the relay log which
851 includes the first event of the transaction.
852 @param[in] start_relay_pos The offset of the transaction's first event.
853
854 @param[in] end_relay_number The extension number of the relay log which
855 includes the last event it should retry.
856 @param[in] end_relay_pos The offset of the last event it should retry.
857
858 @retval false if transaction succeeds (possibly after a number of retries)
859 @retval true if transaction fails
860 */
861 bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos,
862 uint end_relay_number, my_off_t end_relay_pos);
863
864 bool set_info_search_keys(Rpl_info_handler *to) override;
865
866 /**
867 Get coordinator's RLI. Especially used to get the rli from
868 a slave thread, like this: thd->rli_slave->get_c_rli();
869 thd could be a SQL thread or a worker thread.
870 */
871 Relay_log_info *get_c_rli() override { return c_rli; }
872
873 /**
874 return an extension "for channel channel_name"
875 for error messages per channel
876 */
877 const char *get_for_channel_str(bool upper_case = false) const override;
878
881 return ptr_g->sequence_number;
882 }
883
884 /**
885 Return true if replica-preserve-commit-order is enabled and an
886 earlier transaction is waiting for a row-level lock held by this
887 transaction.
888 */
890
891 /**
892 Called when replica-preserve-commit-order is enabled, by the worker
893 processing an earlier transaction that waits on a row-level lock
894 held by this worker's transaction.
895 */
897
898 /**
899 @return either the master server version as extracted from the last
900 installed Format_description_log_event, or when it was not
901 installed then the slave own server version.
902 */
907 }
908
909 protected:
910 void do_report(loglevel level, int err_code, const char *msg,
911 va_list v_args) const override
912 MY_ATTRIBUTE((format(printf, 4, 0)));
913
914 private:
915 ulong gaq_index; // GAQ index of the current assignment
916 ulonglong master_log_pos; // event's cached log_pos for possible error report
917 void end_info();
918 bool read_info(Rpl_info_handler *from) override;
919 bool write_info(Rpl_info_handler *to) override;
920 std::atomic<bool> m_commit_order_deadlock;
921
922 /// This flag indicates whether positions were already modified during the
923 /// event processing, if yes, positions are not updated in the
924 /// slave_worker_ends_group function
926
929 bool worker_sleep(ulong seconds);
930 bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos,
931 uint end_relay_number, my_off_t end_relay_pos);
933
935
936 public:
937 /**
938 Returns an array with the expected column numbers of the primary key
939 fields of the table repository.
940 */
941 static const uint *get_table_pk_field_indexes();
942 /**
943 Returns the index of the Channel_name field of the table repository.
944 */
946};
947
950 Slave_job_item *job_item);
951
954// Auxiliary function
956
958 Relay_log_info *rli);
959
961 return static_cast<Slave_worker *>(thd->rli_slave);
962}
963
965
966#endif
Contains the classes representing events occurring in the replication stream.
Class representing an error.
Definition: error.h:47
For binlog version 4.
Definition: log_event.h:1501
std::atomic< int32 > atomic_usage_counter
Definition: log_event.h:1519
Stores information to monitor a transaction during the different replication stages.
Definition: rpl_gtid.h:1310
This is the abstract base class for binary log events.
Definition: log_event.h:547
ulong mts_group_idx
Index in rli->gaq array to indicate a group that this event is purging.
Definition: log_event.h:702
Definition: rpl_rli.h:201
Format_description_log_event * get_rli_description_event() const
Return the current Format_description_log_event.
Definition: rpl_rli.h:1725
bool is_in_group() const
A group is defined as the entire range of events that constitute a transaction or auto-committed stat...
Definition: rpl_rli.h:1497
Log_event * current_event
Reference to being applied event.
Definition: rpl_rli.h:1995
Slave_committed_queue * gaq
Definition: rpl_rli.h:1168
ulong adapt_to_master_version_updown(ulong master_version, ulong current_version)
The method compares two supplied versions and carries out down- or up- grade customization of executi...
Definition: rpl_rli.cc:2585
Format_description_log_event * rli_description_event
Definition: rpl_rli.h:1801
Definition: rpl_info_handler.h:57
bool is_transactional() const
Definition: rpl_info.h:110
THD * info_thd
Definition: rpl_info.h:77
Group Assigned Queue whose first element identifies first gap in committed sequence.
Definition: rpl_rli_pdb.h:350
void free_dynamic_items()
Method should be executed at slave system stop to cleanup dynamically allocated items that remained a...
Definition: rpl_rli_pdb.cc:1506
bool de_queue(Slave_job_group *item)
Dequeue from head.
Definition: rpl_rli_pdb.h:409
Slave_committed_queue(size_t max, uint n)
Definition: rpl_rli_pdb.cc:1305
bool inited
Definition: rpl_rli_pdb.h:352
bool count_done(Relay_log_info *rli)
Definition: rpl_rli_pdb.cc:1323
ulong assigned_group_index
Definition: rpl_rli_pdb.h:366
bool de_tail(Slave_job_group *item)
Similar to de_queue() but removing an item from the tail side.
Definition: rpl_rli_pdb.h:419
size_t move_queue_head(Slave_worker_array *ws)
The queue is processed from the head item by item to purge items representing committed groups.
Definition: rpl_rli_pdb.cc:1367
Prealloced_array< ulonglong, 1 > last_done
Definition: rpl_rli_pdb.h:363
Slave_job_group * get_job_group(size_t ind)
Definition: rpl_rli_pdb.h:389
~Slave_committed_queue()
Definition: rpl_rli_pdb.h:370
void update_current_binlog(const char *post_rotate)
size_t find_lwm(Slave_job_group **, size_t)
Finds low-water mark of committed jobs in GAQ.
Definition: rpl_rli_pdb.cc:1469
size_t en_queue(Slave_job_group *item)
Assigns assigned_group_index to an index of enqueued item and returns it.
Definition: rpl_rli_pdb.h:398
Slave_job_group lwm
Definition: rpl_rli_pdb.h:360
Definition: rpl_rli_pdb.h:486
Slave_jobs_queue()
Definition: rpl_rli_pdb.h:488
bool overfill
Definition: rpl_rli_pdb.h:493
ulonglong waited_overfill
Definition: rpl_rli_pdb.h:494
Error const & last_error() const
Definition: rpl_reporting.h:136
Definition: rpl_rli_pdb.h:497
ulonglong get_master_log_pos()
Definition: rpl_rli_pdb.h:635
ulong underrun_level
Definition: rpl_rli_pdb.h:572
char checkpoint_relay_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:582
Prealloced_array< db_worker_hash_entry *, SLAVE_INIT_DBS_IN_GROUP > curr_group_exec_parts
Definition: rpl_rli_pdb.h:520
ulong excess_cnt
Definition: rpl_rli_pdb.h:577
bool curr_group_seen_sequence_number
Definition: rpl_rli_pdb.h:523
bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
Read events from relay logs and apply them.
Definition: rpl_rli_pdb.cc:1942
void reset_gaq_index()
Definition: rpl_rli_pdb.h:799
ulong overrun_level
Definition: rpl_rli_pdb.h:567
bool commit_positions() override
The method is a wrapper to provide uniform interface with STS and is to be called from Relay_log_info...
Definition: rpl_rli_pdb.h:642
ulong bitmap_shifted
Definition: rpl_rli_pdb.h:559
longlong sequence_number()
Definition: rpl_rli_pdb.h:879
void reset_commit_order_deadlock()
Definition: rpl_rli_pdb.cc:1767
int flush_info(bool force=false)
Definition: rpl_rli_pdb.cc:470
int set_rli_description_event(Format_description_log_event *fdle) override
The method runs at Worker initialization, at runtime when Coordinator supplied a new FD event for exe...
Definition: rpl_rli_pdb.h:695
ulonglong checkpoint_master_log_pos
Definition: rpl_rli_pdb.h:585
Slave_jobs_queue jobs
Definition: rpl_rli_pdb.h:514
volatile bool relay_log_change_notified
Definition: rpl_rli_pdb.h:543
MY_BITMAP group_shifted
Definition: rpl_rli_pdb.h:587
ulong worker_checkpoint_seqno
Definition: rpl_rli_pdb.h:589
bool worker_sleep(ulong seconds)
Sleep for a given amount of seconds or until killed.
Definition: rpl_rli_pdb.cc:1745
void copy_values_for_PFS(ulong worker_id, en_running_state running_status, THD *worker_thd, const Error &last_error, Gtid_monitoring_info *monitoring_info_arg)
Definition: rpl_rli_pdb.cc:553
std::tuple< bool, bool, uint > check_and_report_end_of_retries(THD *thd)
Checks if the transaction can be retried, and if not, reports an error.
Definition: rpl_rli_pdb.cc:1791
ulong gaq_index
Definition: rpl_rli_pdb.h:915
void post_commit(bool on_rollback) override
See the comments for STS version of this method.
Definition: rpl_rli_pdb.h:652
void do_report(loglevel level, int err_code, const char *msg, va_list v_args) const override
Definition: rpl_rli_pdb.cc:1524
const char * get_for_channel_str(bool upper_case=false) const override
return an extension "for channel channel_name" for error messages per channel
Definition: rpl_rli_pdb.cc:2562
void report_commit_order_deadlock()
Called when replica-preserve-commit-order is enabled, by the worker processing an earlier transaction...
Definition: rpl_rli_pdb.cc:1775
Slave_worker(const Slave_worker &info)
Slave_worker(Relay_log_info *rli, PSI_mutex_key *param_key_info_run_lock, PSI_mutex_key *param_key_info_data_lock, PSI_mutex_key *param_key_info_sleep_lock, PSI_mutex_key *param_key_info_thd_lock, PSI_mutex_key *param_key_info_data_cond, PSI_mutex_key *param_key_info_start_cond, PSI_mutex_key *param_key_info_stop_cond, PSI_mutex_key *param_key_info_sleep_cond, uint param_id, const char *param_channel)
Definition: rpl_rli_pdb.cc:242
bool write_info(Rpl_info_handler *to) override
Definition: rpl_rli_pdb.cc:575
int init_worker(Relay_log_info *, ulong)
Method is executed by Coordinator at Worker startup time to initialize members parly with values supp...
Definition: rpl_rli_pdb.cc:314
static uint get_channel_field_index()
Returns the index of the Channel_name field of the table repository.
Definition: rpl_rli_pdb.cc:2570
bool read_info(Rpl_info_handler *from) override
Definition: rpl_rli_pdb.cc:501
void assign_partition_db(Log_event *ev)
Definition: rpl_rli_pdb.cc:2055
const char * get_master_log_name()
Definition: rpl_rli_pdb.cc:625
static void set_nullable_fields(MY_BITMAP *nullable_fields)
Sets bits for columns that are allowed to be NULL.
Definition: rpl_rli_pdb.cc:619
bool set_info_search_keys(Rpl_info_handler *to) override
To search in the slave repositories, each slave info object (mi, rli or worker) should use a primary ...
Definition: rpl_rli_pdb.cc:564
ulong server_version
Definition: rpl_rli_pdb.h:591
volatile int curr_jobs
Definition: rpl_rli_pdb.h:537
ulong id
Definition: rpl_rli_pdb.h:525
void prepare_for_retry(Log_event &event)
Make the necessary changes to both the Slave_worker and current Log_event objects,...
Definition: rpl_rli_pdb.cc:1781
Slave_worker & operator=(const Slave_worker &info)
ulonglong master_log_pos
Definition: rpl_rli_pdb.h:916
int slave_worker_exec_event(Log_event *ev)
MTS worker main routine.
Definition: rpl_rli_pdb.cc:1654
ulong get_master_server_version()
Definition: rpl_rli_pdb.h:903
bool end_group_sets_max_dbs
Definition: rpl_rli_pdb.h:541
ulong events_done
Definition: rpl_rli_pdb.h:535
bool exit_incremented
Definition: rpl_rli_pdb.h:621
volatile ulong last_group_done_index
Definition: rpl_rli_pdb.h:531
Relay_log_info * c_rli
Definition: rpl_rli_pdb.h:517
bool found_commit_order_deadlock()
Return true if replica-preserve-commit-order is enabled and an earlier transaction is waiting for a r...
Definition: rpl_rli_pdb.cc:1771
Relay_log_info * get_c_rli() override
Get coordinator's RLI.
Definition: rpl_rli_pdb.h:871
bool reset_recovery_info()
Clean up a part of Worker info table that is regarded in in gaps collecting at recovery.
Definition: rpl_rli_pdb.cc:606
mysql_mutex_t jobs_lock
Definition: rpl_rli_pdb.h:515
en_running_state
Definition: rpl_rli_pdb.h:592
@ NOT_RUNNING
Definition: rpl_rli_pdb.h:593
@ STOP_ACCEPTED
Definition: rpl_rli_pdb.h:597
@ RUNNING
Definition: rpl_rli_pdb.h:594
@ STOP
Definition: rpl_rli_pdb.h:596
@ ERROR_LEAVING
Definition: rpl_rli_pdb.h:595
void rollback_positions(Slave_job_group *ptr_g)
Definition: rpl_rli_pdb.cc:709
long wq_overrun_cnt
Definition: rpl_rli_pdb.h:561
static const uint * get_table_pk_field_indexes()
Returns an array with the expected column numbers of the primary key fields of the table repository.
Definition: rpl_rli_pdb.cc:2566
void slave_worker_ends_group(Log_event *, int)
Deallocation routine to cancel out few effects of map_db_to_worker().
Definition: rpl_rli_pdb.cc:1133
ulonglong set_master_log_pos(ulong val)
Definition: rpl_rli_pdb.h:636
bool m_flag_positions_committed
This flag indicates whether positions were already modified during the event processing,...
Definition: rpl_rli_pdb.h:925
std::atomic< bool > m_commit_order_deadlock
Definition: rpl_rli_pdb.h:920
bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
It is called after an error happens.
Definition: rpl_rli_pdb.cc:1848
long usage_partition
Definition: rpl_rli_pdb.h:539
void set_gaq_index(ulong val)
Definition: rpl_rli_pdb.h:800
char checkpoint_master_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:584
ulonglong last_groups_assigned_index
Definition: rpl_rli_pdb.h:533
void end_info()
Definition: rpl_rli_pdb.cc:456
en_running_state volatile running_status
Definition: rpl_rli_pdb.h:615
ulong wq_empty_waits
Definition: rpl_rli_pdb.h:534
ulonglong checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:583
bool fd_change_notified
Definition: rpl_rli_pdb.h:558
volatile bool checkpoint_notified
Definition: rpl_rli_pdb.h:544
ulong groups_done
Definition: rpl_rli_pdb.h:536
mysql_cond_t jobs_cond
Definition: rpl_rli_pdb.h:516
static size_t get_number_worker_fields()
Definition: rpl_rli_pdb.cc:615
~Slave_worker() override
Definition: rpl_rli_pdb.cc:284
MY_BITMAP group_executed
Definition: rpl_rli_pdb.h:586
volatile bool master_log_change_notified
Definition: rpl_rli_pdb.h:546
int rli_init_info(bool)
A part of Slave worker initializer that provides a minimum context for MTS recovery.
Definition: rpl_rli_pdb.cc:409
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:33
Relay_log_info * rli_slave
Definition: sql_class.h:1065
System_variables variables
Definition: sql_lexer_thd.h:61
bool in_active_multi_stmt_transaction() const
true if the session is in a multi-statement transaction mode (
Definition: sql_class.h:3167
unsigned long get_product_version() const
This method is used to find out the version of server that originated the current FD instance.
Definition: control_events.cpp:157
The class defines a type of queue with a predefined max capacity that is implemented using the circul...
Definition: rpl_rli_pdb.h:266
bool in(size_t i)
Definition: rpl_rli_pdb.h:334
bool empty() const
Definition: rpl_rli_pdb.h:339
static constexpr size_t error_result
Definition: rpl_rli_pdb.h:342
std::atomic< size_t > len
Actual length.
Definition: rpl_rli_pdb.h:288
size_t en_queue(Element_type *item)
return the index where the arg item locates or an error encoded as a value circular_buffer_queue::err...
Definition: rpl_rli_pdb.h:432
size_t capacity
The capacity and maximum length of the queue in terms of element.
Definition: rpl_rli_pdb.h:272
size_t get_length() const
Definition: rpl_rli_pdb.h:338
circular_buffer_queue()
Definition: rpl_rli_pdb.h:301
Element_type * head_queue()
return the value of data member of the head of the queue.
Definition: rpl_rli_pdb.h:328
size_t avail
Its value modulo capacity is index of the element where the next element will be enqueued.
Definition: rpl_rli_pdb.h:278
circular_buffer_queue(size_t max)
Definition: rpl_rli_pdb.h:291
bool de_queue(Element_type *item)
Content of the being dequeued item is copied to the arg-pointer location.
Definition: rpl_rli_pdb.h:453
size_t entry
The head index of the queue.
Definition: rpl_rli_pdb.h:284
bool full() const
Definition: rpl_rli_pdb.h:340
bool inited_queue
Definition: rpl_rli_pdb.h:289
Prealloced_array< Element_type, 1 > m_Q
Definition: rpl_rli_pdb.h:268
bool de_tail(Element_type *item)
Similar to de_queue but extracting happens from the tail side.
Definition: rpl_rli_pdb.h:474
~circular_buffer_queue()=default
void my_error(int nr, myf MyFlags,...)
Fill in and print a previously registered error message.
Definition: my_error.cc:215
#define MTS_WORKER_UNDEF
Definition: rpl_replica.h:89
const int64_t SEQ_UNINIT
Uninitialized timestamp value (for either last committed or sequence number).
Definition: binlog_event.h:146
unsigned int PSI_mutex_key
Instrumented mutex key.
Definition: psi_mutex_bits.h:51
static constexpr unsigned PSI_INSTRUMENT_ME
Definition: psi_bits.h:42
Binary log event definitions.
Header for compiler-dependent features.
#define DBUG_PRINT(keyword, arglist)
Definition: my_dbug.h:180
#define DBUG_TRACE
Definition: my_dbug.h:145
Some integer typedefs for easier portability.
unsigned long long int ulonglong
Definition: my_inttypes.h:55
ulonglong my_off_t
Definition: my_inttypes.h:71
long long int longlong
Definition: my_inttypes.h:54
#define MYF(v)
Definition: my_inttypes.h:96
Common #defines and includes for file and socket I/O.
#define FN_REFLEN
Definition: my_io.h:82
Definition of the global "loglevel" enumeration.
loglevel
Definition: my_loglevel.h:40
void my_free(void *ptr)
Frees the memory pointed by the ptr.
Definition: my_memory.cc:80
Defines various enable/disable and HAVE_ macros related to the performance schema instrumentation sys...
#define HAVE_PSI_INTERFACE
Definition: my_psi_config.h:38
Instrumentation helpers for conditions.
ABI for instrumented mutexes.
Log info(cout, "NOTE")
struct MasterPos master_pos
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:307
Performance schema instrumentation interface.
Instrumentation helpers for mutexes.
required string event
Definition: replication_group_member_actions.proto:31
@ UNDEFINED_GTID
GTID_NEXT is set to this state after a transaction with GTID_NEXT=='UUID:NUMBER' is committed.
Definition: rpl_gtid.h:3742
@ AUTOMATIC_GTID
Specifies that the GTID has not been generated yet; it will be generated on commit.
Definition: rpl_gtid.h:3690
enum_mts_parallel_type
Definition: rpl_mta_submode.h:46
TABLE * mts_move_temp_tables_to_thd(THD *, TABLE *)
Relocation of the list of temporary tables to thd->temporary_tables.
Definition: rpl_rli_pdb.cc:807
Slave_worker * get_least_occupied_worker(Relay_log_info *rli, Slave_worker_array *workers, Log_event *ev)
Get the least occupied worker.
Definition: rpl_rli_pdb.cc:1117
bool set_max_updated_index_on_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Definition: rpl_rli_pdb.cc:168
bool init_hash_workers(Relay_log_info *rli)
Definition: rpl_rli_pdb.cc:740
void destroy_hash_workers(Relay_log_info *)
Definition: rpl_rli_pdb.cc:751
ulong w_rr
Definition: rpl_rli_pdb.cc:77
int slave_worker_exec_job_group(Slave_worker *w, Relay_log_info *rli)
apply one job group.
Definition: rpl_rli_pdb.cc:2395
Slave_worker * get_thd_worker(THD *thd)
Definition: rpl_rli_pdb.h:960
TABLE * mts_move_temp_table_to_entry(TABLE *, THD *, db_worker_hash_entry *)
Relocating temporary table reference into entry's table list head.
Definition: rpl_rli_pdb.cc:773
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Definition: rpl_rli_pdb.cc:104
Slave_worker * map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w)
The function produces a reference to the struct of a Worker that has been or will be engaged to proce...
Definition: rpl_rli_pdb.cc:902
bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli)
Coordinator enqueues a job item into a Worker private queue.
Definition: rpl_rli_pdb.cc:2080
Definition: my_bitmap.h:42
Definition: rpl_rli_pdb.h:109
Slave_job_group()=default
my_off_t master_log_pos
Definition: rpl_rli_pdb.h:188
my_off_t checkpoint_log_pos
Definition: rpl_rli_pdb.h:191
Format_description_log_event * new_fd_event
Definition: rpl_rli_pdb.h:230
Slave_job_group(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:116
bool notified
Definition: rpl_rli_pdb.h:200
time_t ts
Definition: rpl_rli_pdb.h:198
longlong last_committed
Definition: rpl_rli_pdb.h:203
std::atomic< int32 > done
Definition: rpl_rli_pdb.h:196
Slave_job_group & operator=(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:141
Slave_worker * worker
Definition: rpl_rli_pdb.h:185
my_off_t checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:194
ulong worker_id
Definition: rpl_rli_pdb.h:184
uint checkpoint_seqno
Definition: rpl_rli_pdb.h:190
void reset(my_off_t master_pos, ulonglong seqno)
Definition: rpl_rli_pdb.h:235
my_off_t group_master_log_pos
Definition: rpl_rli_pdb.h:171
char * group_master_log_name
Definition: rpl_rli_pdb.h:167
char * checkpoint_log_name
Definition: rpl_rli_pdb.h:192
ulong shifted
Definition: rpl_rli_pdb.h:197
longlong sequence_number
Definition: rpl_rli_pdb.h:204
char * checkpoint_relay_log_name
Definition: rpl_rli_pdb.h:195
char * group_relay_log_name
Definition: rpl_rli_pdb.h:182
ulonglong total_seqno
Definition: rpl_rli_pdb.h:186
my_off_t group_relay_log_pos
Definition: rpl_rli_pdb.h:183
Definition: table.h:1395
Definition: completion_hash.h:34
Legends running throughout the module:
Definition: rpl_rli_pdb.h:73
TABLE *volatile temporary_tables
Definition: rpl_rli_pdb.h:89
uint db_len
Definition: rpl_rli_pdb.h:74
long usage
Definition: rpl_rli_pdb.h:81
const char * db
Definition: rpl_rli_pdb.h:75
Slave_worker * worker
Definition: rpl_rli_pdb.h:76
An instrumented cond structure.
Definition: mysql_cond_bits.h:49
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:49
Definition: rpl_rli.h:80
double seconds()
Definition: task.cc:309
Include file for Sun RPC to compile out of the box.
unsigned int uint
Definition: uca-dump.cc:29
int n
Definition: xcom_base.cc:508