MySQL 8.3.0
Source Code Documentation
Go to the documentation of this file.
1/* Copyright (c) 2011, 2023, Oracle and/or its affiliates.
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 GNU General Public License, version 2.0, for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23#ifndef RPL_RLI_PDB_H
24#define RPL_RLI_PDB_H
26#include <stdarg.h>
27#include <sys/types.h>
28#include <time.h>
29#include <atomic>
30#include <tuple>
32#include "my_bitmap.h"
33#include "my_compiler.h"
34#include "my_dbug.h"
35#include "my_inttypes.h"
36#include "my_io.h"
37#include "my_psi_config.h"
43#include "mysql/my_loglevel.h"
45#include "prealloced_array.h" // Prealloced_array
46#include "sql/log_event.h" // Format_description_log_event
47#include "sql/rpl_gtid.h"
48#include "sql/rpl_mta_submode.h" // enum_mts_parallel_type
49#include "sql/rpl_replica.h" // MTS_WORKER_UNDEF
50#include "sql/rpl_rli.h" // Relay_log_info
51#include "sql/sql_class.h"
55class Slave_worker;
56struct TABLE;
58#ifndef NDEBUG
59extern ulong w_rr;
62 Legends running throughout the module:
64 C - Coordinator
65 CP - checkpoint
66 W - Worker
68 B-event event that Begins a group (a transaction)
69 T-event event that Terminates a group (a transaction)
72/* Assigned Partition Hash (APH) entry */
74 uint db_len;
75 const char *db;
77 /*
78 The number of transaction pending on this database.
79 This should only be modified under the lock slave_worker_hash_lock.
80 */
81 long usage;
82 /*
83 The list of temp tables belonging to @ db database is
84 attached to an assigned @c worker to become its thd->temporary_tables.
85 The list is updated with every ddl incl CREATE, DROP.
86 It is removed from the entry and merged to the coordinator's
87 thd->temporary_tables in case of events: slave stops, APH oversize.
88 */
91 /* todo: relax concurrency to mimic record-level locking.
92 That is to augmenting the entry with mutex/cond pair
93 pthread_mutex_t
94 pthread_cond_t
95 timestamp updated_at; */
100Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
101 db_worker_hash_entry **ptr_entry,
102 bool need_temp_tables, Slave_worker *w);
104 Slave_worker_array *workers,
105 Log_event *ev);
107#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
110 Slave_job_group() = default;
112 /*
113 We need a custom copy constructor and assign operator because std::atomic<T>
114 is not copy-constructible.
115 */
121 worker_id(other.worker_id),
122 worker(other.worker),
130 done(other.done.load()),
131 shifted(other.shifted),
132 ts(other.ts),
133#ifndef NDEBUG
134 notified(other.notified),
139 }
146 worker_id = other.worker_id;
147 worker = other.worker;
148 total_seqno = other.total_seqno;
156 shifted = other.shifted;
157 ts = other.ts;
158#ifndef NDEBUG
159 notified = other.notified;
164 return *this;
165 }
167 char *group_master_log_name; // (actually redundant)
168 /*
169 T-event log_pos filled by Worker for CheckPoint (CP)
170 */
173 /*
174 When relay-log name changes allocates and fill in a new name of relay-log,
175 otherwise it fills in NULL.
176 Coordinator keeps track of whether each Worker has been notified of the change
177 to make sure the routine runs once per change.
179 W checks the value at commit and memorizes a not-NULL.
180 Freeing unless NULL is left to Coordinator at CP.
181 */
182 char *group_relay_log_name; // The value is last seen relay-log
188 my_off_t master_log_pos; // B-event log_pos
189 /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
191 my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
194 checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
196 std::atomic<int32> done; // Flag raised by W, read and reset by Coordinator
197 ulong shifted; // shift the last CP bitmap at receiving a new CP
198 time_t ts; // Group's timestamp to update Seconds_behind_master
199#ifndef NDEBUG
200 bool notified{false}; // to debug group_master_log_name change notification
202 /* Clock-based scheduler requirement: */
203 longlong last_committed; // commit parent timestamp
204 longlong sequence_number; // transaction's logical timestamp
205 /*
206 After Coordinator has seen a new FD event, it sets this member to
207 point to the new event, once per worker. Coordinator does so
208 when it schedules a first group following the FD event to a worker.
209 It checks Slave_worker::fd_change_notified flag to decide whether
210 to do this or not.
211 When the worker executes the group, it replaces its currently
212 active FD by the new FD once it takes on the group first event. It
213 checks this member and resets it after the FD replacement is done.
215 The member is kind of lock-free. It's updated by Coordinator and
216 read by Worker without holding any mutex. That's still safe thanks
217 to Slave_worker::jobs_lock that works as synchronizer, Worker
218 can't read any stale info.
219 The member is updated by Coordinator when it decides which Worker
220 an event following a new FD is to be scheduled.
221 After Coordinator has chosen a Worker, it queues the event to it
222 with necessarily taking Slave_worker::jobs_lock. The Worker grabs
223 the mutex lock later at pulling the event from the queue and
224 releases the lock before to read from this member.
226 This sequence of actions shows the write operation always precedes
227 the read one, and ensures no stale FD info is passed to the
228 Worker.
229 */
231 /*
232 Coordinator fills the struct with defaults and options at starting of
233 a group distribution.
234 */
238 group_master_log_name = nullptr; // todo: remove
239 group_relay_log_name = nullptr;
241 total_seqno = seqno;
242 checkpoint_log_name = nullptr;
246 checkpoint_seqno = (uint)-1;
247 done = 0;
248 ts = 0;
249#ifndef NDEBUG
250 notified = false;
254 new_fd_event = nullptr;
255 }
259 The class defines a type of queue with a predefined max capacity that is
260 implemented using the circular memory buffer.
261 That is items of the queue are accessed as indexed elements of
262 the array buffer in a way that when the index value reaches
263 a max value it wraps around to point to the first buffer element.
265template <typename Element_type>
267 public:
269 /**
270 The capacity and maximum length of the queue in terms of element.
271 */
272 size_t capacity;
273 /**
274 Its value modulo `capacity` is index of the element where the next element
275 will be enqueued. It's entry+length. It may be bigger than capacity, but
276 will be smaller than 2*capacity.
277 */
278 size_t avail;
279 /**
280 The head index of the queue. It is an index of next element that will be
281 dequeued. It is less than capacity, so it is an actual index (in contrast
282 to `avail`), does not need to be calculated modulo `capacity`.
283 */
284 size_t entry;
285 /**
286 Actual length. It can be read while not protected by any mutex.
287 */
288 std::atomic<size_t> len;
293 capacity(max),
294 avail(0),
295 entry(0),
296 len(0),
297 inited_queue(false) {
298 if (!m_Q.reserve(capacity)) inited_queue = true;
299 m_Q.resize(capacity);
300 }
304 /**
305 Content of the being dequeued item is copied to the arg-pointer
306 location.
308 @param [out] item A pointer to the being dequeued item.
309 @return true if an element was returned, false if the queue was empty.
310 */
311 bool de_queue(Element_type *item);
312 /**
313 Similar to de_queue but extracting happens from the tail side.
315 @param [out] item A pointer to the being dequeued item.
316 @return true if an element was returned, false if the queue was empty.
317 */
318 bool de_tail(Element_type *item);
320 /**
321 return the index where the arg item locates
322 or an error encoded as a value `circular_buffer_queue::error_result`.
323 */
324 size_t en_queue(Element_type *item);
325 /**
326 return the value of @c data member of the head of the queue.
327 */
328 Element_type *head_queue() {
329 if (empty()) return nullptr;
330 return &m_Q[entry];
331 }
333 /* index is within the valid range */
334 bool in(size_t i) {
335 return (avail >= capacity) ? (entry <= i || i < avail - capacity)
336 : (entry <= i && i < avail);
337 }
338 size_t get_length() const { return len.load(std::memory_order_relaxed); }
339 bool empty() const { return get_length() == 0; }
340 bool full() const { return get_length() == capacity; }
342 static constexpr size_t error_result = std::numeric_limits<size_t>::max();
346 Group Assigned Queue whose first element identifies first gap
347 in committed sequence. The head of the queue is therefore next to
348 the low-water-mark.
350class Slave_committed_queue : public circular_buffer_queue<Slave_job_group> {
351 public:
352 bool inited;
354 /* master's Rot-ev exec */
355 void update_current_binlog(const char *post_rotate);
357 /*
358 The last checkpoint time Low-Water-Mark
359 */
362 /* last time processed indexes for each worker */
365 /* the being assigned group index in GAQ */
368 Slave_committed_queue(size_t max, uint n);
371 if (inited) {
373 free_dynamic_items(); // free possibly left allocated strings in GAQ list
374 }
375 }
377#ifndef NDEBUG
378 bool count_done(Relay_log_info *rli);
381 /* Checkpoint routine refreshes the queue */
383 /* Method is for slave shutdown time cleanup */
384 void free_dynamic_items();
385 /*
386 returns a pointer to Slave_job_group struct instance as indexed by arg
387 in the circular buffer dyn-array
388 */
390 assert(ind < capacity);
391 return &m_Q[ind];
392 }
394 /**
395 Assigns @c assigned_group_index to an index of enqueued item
396 and returns it.
397 */
398 size_t en_queue(Slave_job_group *item) {
399 return assigned_group_index =
401 }
403 /**
404 Dequeue from head.
406 @param [out] item A pointer to the being dequeued item.
407 @return true if an element was returned, false if the queue was empty.
408 */
411 }
413 /**
414 Similar to de_queue() but removing an item from the tail side.
416 @param [out] item A pointer to the being dequeued item.
417 @return true if an element was returned, false if the queue was empty.
418 */
421 }
423 size_t find_lwm(Slave_job_group **, size_t);
427 @return the index where the arg item has been located
428 or an error encoded as a value
429 `circular_buffer_queue::error_result`.
431template <typename Element_type>
433 if (full()) {
434 return error_result;
435 }
437 const auto ret = (avail++) % capacity;
438 m_Q[ret] = *item;
439 len++;
440 assert(len == avail - entry);
441 assert(entry < avail);
443 return ret;
447 Dequeue from head.
449 @param [out] item A pointer to the being dequeued item.
450 @return true if an element was returned, false if the queue was empty.
452template <typename Element_type>
454 if (empty()) {
455 return false;
456 }
457 *item = m_Q[entry++];
458 len--;
459 assert(len == avail - entry);
460 assert(entry <= avail);
462 // The start of the queue just have returned to the first index. Normalize
463 // indexes so they are small again.
464 if (entry == capacity) {
465 entry = 0;
466 avail -= capacity;
467 assert(avail < capacity);
468 assert(avail == len);
469 }
470 return true;
473template <typename Element_type>
475 if (empty()) {
476 return false;
477 }
479 assert(avail > entry);
480 *item = m_Q[(--avail) % capacity];
481 len--;
482 assert(len == avail - entry);
483 return true;
486class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item> {
487 public:
489 /*
490 Coordinator marks with true, Worker signals back at queue back to
491 available
492 */
498 public:
501 PSI_mutex_key *param_key_info_run_lock,
502 PSI_mutex_key *param_key_info_data_lock,
503 PSI_mutex_key *param_key_info_sleep_lock,
504 PSI_mutex_key *param_key_info_thd_lock,
505 PSI_mutex_key *param_key_info_data_cond,
506 PSI_mutex_key *param_key_info_start_cond,
507 PSI_mutex_key *param_key_info_stop_cond,
508 PSI_mutex_key *param_key_info_sleep_cond,
510 uint param_id, const char *param_channel);
512 ~Slave_worker() override;
514 Slave_jobs_queue jobs; // assignment queue containing events to execute
515 mysql_mutex_t jobs_lock; // mutex for the jobs queue
516 mysql_cond_t jobs_cond; // condition variable for the jobs queue
517 Relay_log_info *c_rli; // pointer to Coordinator's rli
520 curr_group_exec_parts; // Current Group Executed Partitions
522#ifndef NDEBUG
523 bool curr_group_seen_sequence_number; // is set to true about starts_group()
525 ulong id; // numeric identifier of the Worker
527 /*
528 Worker runtime statistics
529 */
530 // the index in GAQ of the last processed group by this Worker
531 volatile ulong last_group_done_index;
533 last_groups_assigned_index; // index of previous group assigned to worker
534 ulong wq_empty_waits; // how many times got idle
535 ulong events_done; // how many events (statements) processed
536 ulong groups_done; // how many groups (transactions) processed
537 std::atomic<int> curr_jobs; // number of active assignments
538 // number of partitions allocated to the worker at point in time
540 // symmetric to rli->mts_end_group_sets_max_dbs
543 volatile bool relay_log_change_notified; // Coord sets and resets, W can read
544 volatile bool checkpoint_notified; // Coord sets and resets, W can read
545 volatile bool
546 master_log_change_notified; // Coord sets and resets, W can read
547 /*
548 The variable serves to Coordinator as a memo to itself
549 to notify a Worker about the fact that a new FD has been read.
550 Normally, the value is true, to mean the Worker is notified.
551 When Coordinator reads a new FD it changes the value to false.
552 When Coordinator schedules to a Worker the first event following the new FD,
553 it propagates the new FD to the Worker through
554 Slave_job_group::new_fd_event. Afterwards Coordinator returns the value back
555 to the regular true, to denote things done. Worker will adapt to the new FD
556 once it takes on a first event of the marked group.
557 */
559 ulong bitmap_shifted; // shift the last bitmap at receiving new CP
560 // WQ current excess above the overrun level
562 /*
563 number of events starting from which Worker queue is regarded as
564 close to full. The number of the excessive events yields a weight factor
565 to compute Coordinator's nap.
566 */
568 /*
569 reverse to overrun: the number of events below which Worker is
570 considered under-running
571 */
573 /*
574 Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
575 When WQ length is dropped below overrun the counter is reset.
576 */
578 /*
579 Coordinates of the last CheckPoint (CP) this Worker has
580 acknowledged; part of is persistent data
581 */
586 MY_BITMAP group_executed; // bitmap describes groups executed after last CP
587 MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
588 ulong
589 worker_checkpoint_seqno; // the most significant ON bit in group_executed
590 /* Initial value of FD-for-execution version until it gets known. */
595 ERROR_LEAVING = 2, // is set by Worker
596 STOP = 3, // is set by Coordinator upon receiving STOP
598 4 // is set by worker upon completing job when STOP SLAVE is issued
599 };
601 /*
602 This function is used to make a copy of the worker object before we
603 destroy it on STOP SLAVE. This new object is then used to report the
604 worker status until next START SLAVE following which the new worker objects
605 will be used.
606 */
608 THD *worker_thd, const Error &last_error,
609 Gtid_monitoring_info *monitoring_info_arg);
611 /*
612 The running status is guarded by jobs_lock mutex that a writer
613 Coordinator or Worker itself needs to hold when write a new value.
614 */
616 /*
617 exit_incremented indicates whether worker has contributed to max updated
618 index. By default it is set to false. When the worker contributes for the
619 first time this variable is set to true.
620 */
623 int init_worker(Relay_log_info *, ulong);
624 int rli_init_info(bool);
625 int flush_info(bool force = false);
626 static size_t get_number_worker_fields();
627 /**
628 Sets bits for columns that are allowed to be `NULL`.
630 @param nullable_fields the bitmap to hold the nullable fields.
631 */
632 static void set_nullable_fields(MY_BITMAP *nullable_fields);
634 const char *get_master_log_name();
636 ulonglong set_master_log_pos(ulong val) { return master_log_pos = val; }
637 bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
638 /**
639 The method is a wrapper to provide uniform interface with STS and is
640 to be called from Relay_log_info and Slave_worker pre_commit() methods.
641 */
642 bool commit_positions() override {
643 assert(current_event);
645 return commit_positions(
648 }
649 /**
650 See the comments for STS version of this method.
651 */
652 void post_commit(bool on_rollback) override {
653 if (on_rollback) {
654 if (is_transactional())
657 } else if (!is_transactional())
660 true);
661 }
662 /*
663 When commit fails clear bitmap for executed worker group. Revert back the
664 positions to the old positions that existed before commit using the
665 checkpoint.
667 @param Slave_job_group a pointer to Slave_job_group struct instance which
668 holds group master log pos, group relay log pos and checkpoint positions.
669 */
671 bool reset_recovery_info();
672 /**
673 The method runs at Worker initialization, at runtime when
674 Coordinator supplied a new FD event for execution context, and at
675 the Worker pool shutdown.
676 Similarly to the Coordinator's
677 Relay_log_info::set_rli_description_event() the possibly existing
678 old FD is destroyed, carefully; each worker decrements
679 Format_description_log_event::atomic_usage_counter and when it is made
680 zero the destructor runs.
681 Unlike to Coordinator's role, the usage counter of the new FD is *not*
682 incremented, see @c Log_event::get_slave_worker() where and why it's done
683 there.
685 Notice, the method is run as well by Coordinator per each Worker at MTS
686 shutdown time.
688 Todo: consider to merge logics of the method with that of
689 Relay_log_info class.
691 @param fdle pointer to a new Format_description_log_event
693 @return 1 if an error was encountered, 0 otherwise.
694 */
698 if (fdle) {
699 /*
700 When the master rotates its binary log, set gtid_next to
701 NOT_YET_DETERMINED. This tells the slave thread that:
703 - If a Gtid_log_event is read subsequently, gtid_next will be set to the
704 given GTID (this is done in gtid_pre_statement_checks()).
706 - If a statement is executed before any Gtid_log_event, then gtid_next
707 is set to anonymous (this is done in Gtid_log_event::do_apply_event().
709 It is important to not set GTID_NEXT=NOT_YET_DETERMINED in the middle of
710 a transaction. If that would happen when GTID_MODE=ON, the next
711 statement would fail because it implicitly sets GTID_NEXT=ANONYMOUS,
712 which is disallowed when GTID_MODE=ON. So then there would be no way to
713 end the transaction; any attempt to do so would result in this error.
715 There are three possible states when reaching this execution flow point
716 (see further below for a more detailed explanation on each):
718 - **No active transaction, and not in a group**: set `gtid_next` to
721 - **No active transaction, and in a group**: do nothing regarding
722 `gtid_next`.
724 - **An active transaction exists**: impossible to set `gtid_next` and no
725 reason to process the `Format_description` event so, trigger an error.
727 For the sake of correctness, let's defined the meaning of having a
728 transaction "active" or "in a group".
730 A transaction is "active" if either BEGIN was executed or autocommit=0
731 and a DML statement was executed (@see
732 THD::in_active_multi_stmt_transaction).
734 A transaction is "in a group" if it is applied by the replication
735 applier, and the relay log position is between Gtid_log_event and the
736 committing event (@see Relay_log_info::is_in_group).
738 The three different states explained further:
740 **No active transaction, and not in a group**: It is normal to have
741 gtid_next=automatic/undefined and have a Format_description_log_event in
742 this condition. We are outside transaction context and should set
743 gtid_next to not_yet_determined.
745 **No active transaction, and in a group**: Having
746 gtid_next=automatic/undefined in a group is impossible if master is 5.7
747 or later, because the group always starts with a Gtid_log_event or an
748 Anonymous_gtid_log_event, which will set gtid_next to anonymous or
749 gtid. But it is possible to have gtid_next=undefined when replicating
750 from a 5.6 master with gtid_mode=off, because it does not generate any
751 such event. And then, it is possible to have no active transaction in a
752 group if the master has logged a DDL as a User_var_log_event followed by
753 a Query_log_event. The User_var_log_event will start a group, but not
754 start an active transaction or change gtid_next. In this case, it is
755 possible that a Format_description_log_event occurs, if the group
756 (transaction) is broken on two relay logs, so that User_var_log_event
757 appears at the end of one relay log and Query_log_event at the beginning
758 of the next one. In such cases, we should not set gtid_next.
760 **An active transaction exists**: It is possible to have
761 gtid_next=automatic/undefined in an active transaction, only if
762 gtid_next=automatic, which is only possible in a client connection using
763 gtid_next=automatic. In this scenario, there is no reason to execute a
764 Format_description_log_event. So we generate an error.
765 */
766 if (info_thd->variables.gtid_next.is_automatic() ||
767 info_thd->variables.gtid_next.is_undefined()) {
768 bool in_active_multi_stmt =
771 if (!is_in_group() && !in_active_multi_stmt) {
772 DBUG_PRINT("info",
773 ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
774 info_thd->variables.gtid_next.set_not_yet_determined();
775 } else if (in_active_multi_stmt) {
777 "gtid_next");
778 return 1;
779 }
780 }
783 }
788 /* The being deleted by Worker FD can't be the latest one */
792 }
793 }
796 return 0;
797 }
800 inline void set_gaq_index(ulong val) {
801 if (gaq_index == c_rli->gaq->capacity) gaq_index = val;
802 }
806 /**
807 Make the necessary changes to both the `Slave_worker` and current
808 `Log_event` objects, before retrying to apply the transaction.
810 Since the event is going to be re-read from the relay-log file, there
811 may be actions needed to be taken to reset the state both of `this`
812 instance, as well as of the current `Log_event` being processed.
814 @param event The `Log_event` object currently being processed.
815 */
818 /**
819 Checks if the transaction can be retried, and if not, reports an error.
821 @param[in] thd The THD object of current thread.
823 @returns std::tuple<bool, bool, uint> where each element has
824 following meaning:
826 first element of tuple is function return value and determines:
827 false if the transaction should be retried
828 true if the transaction should not be retried
830 second element of tuple determines:
831 the function will set the value to true, in case the retry
832 should be "silent". Silent means that the caller should not
833 report it in performance_schema tables, write to the error log,
834 or sleep. Currently, silent is used by NDB only.
836 third element of tuple determines:
837 If the caller should report any other error than that stored in
838 thd->get_stmt_da()->mysql_errno(), then this function will store
839 that error in this third element of the tuple.
841 */
842 std::tuple<bool, bool, uint> check_and_report_end_of_retries(THD *thd);
844 /**
845 It is called after an error happens. It checks if that is a temporary
846 error and if the transaction should be retried. Then it will retry the
847 transaction if it is allowed. Retry policy and logic is similar to
848 single-threaded slave.
850 @param[in] start_relay_number The extension number of the relay log which
851 includes the first event of the transaction.
852 @param[in] start_relay_pos The offset of the transaction's first event.
854 @param[in] end_relay_number The extension number of the relay log which
855 includes the last event it should retry.
856 @param[in] end_relay_pos The offset of the last event it should retry.
858 @retval false if transaction succeeds (possibly after a number of retries)
859 @retval true if transaction fails
860 */
861 bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos,
862 uint end_relay_number, my_off_t end_relay_pos);
864 bool set_info_search_keys(Rpl_info_handler *to) override;
866 /**
867 Get coordinator's RLI. Especially used get the rli from
868 a slave thread, like this: thd->rli_slave->get_c_rli();
869 thd could be a SQL thread or a worker thread.
870 */
871 Relay_log_info *get_c_rli() override { return c_rli; }
873 /**
874 return an extension "for channel channel_name"
875 for error messages per channel
876 */
877 const char *get_for_channel_str(bool upper_case = false) const override;
881 return ptr_g->sequence_number;
882 }
884 /**
885 Return true if replica-preserve-commit-order is enabled and an
886 earlier transaction is waiting for a row-level lock held by this
887 transaction.
888 */
891 /**
892 Called when replica-preserve-commit-order is enabled, by the worker
893 processing an earlier transaction that waits on a row-level lock
894 held by this worker's transaction.
895 */
898 /**
899 @return either the master server version as extracted from the last
900 installed Format_description_log_event, or when it was not
901 installed then the slave own server version.
902 */
907 }
909 protected:
910 void do_report(loglevel level, int err_code, const char *msg,
911 va_list v_args) const override
912 MY_ATTRIBUTE((format(printf, 4, 0)));
914 void do_report(loglevel level, int err_code,
915 const Gtid_specification *gtid_next, const char *msg,
916 va_list v_args) const override
917 MY_ATTRIBUTE((format(printf, 5, 0)));
919 private:
920 ulong gaq_index; // GAQ index of the current assignment
921 ulonglong master_log_pos; // event's cached log_pos for possible error report
922 void end_info();
923 bool read_info(Rpl_info_handler *from) override;
924 bool write_info(Rpl_info_handler *to) override;
925 std::atomic<bool> m_commit_order_deadlock;
927 /// This flag indicates whether positions were already modified during the
928 /// event processing, if yes, positions are not updated in the
929 /// slave_worker_ends_group function
934 bool worker_sleep(ulong seconds);
935 bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos,
936 uint end_relay_number, my_off_t end_relay_pos);
941 public:
942 /**
943 Returns an array with the expected column numbers of the primary key
944 fields of the table repository.
945 */
946 static const uint *get_table_pk_field_indexes();
947 /**
948 Returns the index of the Channel_name field of the table repository.
949 */
950 static uint get_channel_field_index();
955 Slave_job_item *job_item);
959// Auxiliary function
963 Relay_log_info *rli);
965inline Slave_worker *get_thd_worker(const THD *thd) {
966 return static_cast<Slave_worker *>(thd->rli_slave);
Contains the classes representing events occurring in the replication stream.
Class representing an error.
Definition: error.h:47
For binlog version 4.
Definition: log_event.h:1524
std::atomic< int32 > atomic_usage_counter
Definition: log_event.h:1542
Stores information to monitor a transaction during the different replication stages.
Definition: rpl_gtid.h:1411
This is the abstract base class for binary log events.
Definition: log_event.h:537
ulong mts_group_idx
Index in rli->gaq array to indicate a group that this event is purging.
Definition: log_event.h:692
Definition: rpl_rli.h:202
Format_description_log_event * get_rli_description_event() const
Return the current Format_description_log_event.
Definition: rpl_rli.h:1760
bool is_in_group() const
A group is defined as the entire range of events that constitute a transaction or auto-committed stat...
Definition: rpl_rli.h:1532
Log_event * current_event
Reference to being applied event.
Definition: rpl_rli.h:2038
Slave_committed_queue * gaq
Definition: rpl_rli.h:1199
ulong adapt_to_master_version_updown(ulong master_version, ulong current_version)
The method compares two supplied versions and carries out down- or up- grade customization of executi...
Format_description_log_event * rli_description_event
Definition: rpl_rli.h:1836
Definition: rpl_info_handler.h:57
bool is_transactional() const
Definition: rpl_info.h:106
THD * info_thd
Definition: rpl_info.h:77
Group Assigned Queue whose first element identifies first gap in committed sequence.
Definition: rpl_rli_pdb.h:350
void free_dynamic_items()
Method should be executed at slave system stop to cleanup dynamically allocated items that remained a...
bool de_queue(Slave_job_group *item)
Dequeue from head.
Definition: rpl_rli_pdb.h:409
Slave_committed_queue(size_t max, uint n)
bool inited
Definition: rpl_rli_pdb.h:352
bool count_done(Relay_log_info *rli)
ulong assigned_group_index
Definition: rpl_rli_pdb.h:366
bool de_tail(Slave_job_group *item)
Similar to de_queue() but removing an item from the tail side.
Definition: rpl_rli_pdb.h:419
size_t move_queue_head(Slave_worker_array *ws)
The queue is processed from the head item by item to purge items representing committed groups.
Prealloced_array< ulonglong, 1 > last_done
Definition: rpl_rli_pdb.h:363
Slave_job_group * get_job_group(size_t ind)
Definition: rpl_rli_pdb.h:389
Definition: rpl_rli_pdb.h:370
void update_current_binlog(const char *post_rotate)
size_t find_lwm(Slave_job_group **, size_t)
Finds low-water mark of committed jobs in GAQ.
size_t en_queue(Slave_job_group *item)
Assigns assigned_group_index to an index of enqueued item and returns it.
Definition: rpl_rli_pdb.h:398
Slave_job_group lwm
Definition: rpl_rli_pdb.h:360
Definition: rpl_rli_pdb.h:486
Definition: rpl_rli_pdb.h:488
bool overfill
Definition: rpl_rli_pdb.h:493
ulonglong waited_overfill
Definition: rpl_rli_pdb.h:494
Error const & last_error() const
Definition: rpl_reporting.h:142
Definition: rpl_rli_pdb.h:497
ulonglong get_master_log_pos()
Definition: rpl_rli_pdb.h:635
ulong underrun_level
Definition: rpl_rli_pdb.h:572
char checkpoint_relay_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:582
Prealloced_array< db_worker_hash_entry *, SLAVE_INIT_DBS_IN_GROUP > curr_group_exec_parts
Definition: rpl_rli_pdb.h:520
ulong excess_cnt
Definition: rpl_rli_pdb.h:577
bool curr_group_seen_sequence_number
Definition: rpl_rli_pdb.h:523
bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
Read events from relay logs and apply them.
void reset_gaq_index()
Definition: rpl_rli_pdb.h:799
ulong overrun_level
Definition: rpl_rli_pdb.h:567
bool commit_positions() override
The method is a wrapper to provide uniform interface with STS and is to be called from Relay_log_info...
Definition: rpl_rli_pdb.h:642
ulong bitmap_shifted
Definition: rpl_rli_pdb.h:559
longlong sequence_number()
Definition: rpl_rli_pdb.h:879
void reset_commit_order_deadlock()
int flush_info(bool force=false)
int set_rli_description_event(Format_description_log_event *fdle) override
The method runs at Worker initialization, at runtime when Coordinator supplied a new FD event for exe...
Definition: rpl_rli_pdb.h:695
ulonglong checkpoint_master_log_pos
Definition: rpl_rli_pdb.h:585
Slave_jobs_queue jobs
Definition: rpl_rli_pdb.h:514
volatile bool relay_log_change_notified
Definition: rpl_rli_pdb.h:543
MY_BITMAP group_shifted
Definition: rpl_rli_pdb.h:587
ulong worker_checkpoint_seqno
Definition: rpl_rli_pdb.h:589
bool worker_sleep(ulong seconds)
Sleep for a given amount of seconds or until killed.
void copy_values_for_PFS(ulong worker_id, en_running_state running_status, THD *worker_thd, const Error &last_error, Gtid_monitoring_info *monitoring_info_arg)
std::tuple< bool, bool, uint > check_and_report_end_of_retries(THD *thd)
Checks if the transaction can be retried, and if not, reports an error.
ulong gaq_index
Definition: rpl_rli_pdb.h:920
void post_commit(bool on_rollback) override
See the comments for STS version of this method.
Definition: rpl_rli_pdb.h:652
void do_report(loglevel level, int err_code, const char *msg, va_list v_args) const override
const char * get_for_channel_str(bool upper_case=false) const override
return an extension "for channel channel_name" for error messages per channel
void report_commit_order_deadlock()
Called when replica-preserve-commit-order is enabled, by the worker processing an earlier transaction...
Slave_worker(const Slave_worker &info)
std::atomic< int > curr_jobs
Definition: rpl_rli_pdb.h:537
Slave_worker(Relay_log_info *rli, PSI_mutex_key *param_key_info_run_lock, PSI_mutex_key *param_key_info_data_lock, PSI_mutex_key *param_key_info_sleep_lock, PSI_mutex_key *param_key_info_thd_lock, PSI_mutex_key *param_key_info_data_cond, PSI_mutex_key *param_key_info_start_cond, PSI_mutex_key *param_key_info_stop_cond, PSI_mutex_key *param_key_info_sleep_cond, uint param_id, const char *param_channel)
bool write_info(Rpl_info_handler *to) override
int init_worker(Relay_log_info *, ulong)
Method is executed by Coordinator at Worker startup time to initialize members partly with values supp...
static uint get_channel_field_index()
Returns the index of the Channel_name field of the table repository.
bool read_info(Rpl_info_handler *from) override
void assign_partition_db(Log_event *ev)
const char * get_master_log_name()
static void set_nullable_fields(MY_BITMAP *nullable_fields)
Sets bits for columns that are allowed to be NULL.
bool set_info_search_keys(Rpl_info_handler *to) override
To search in the slave repositories, each slave info object (mi, rli or worker) should use a primary ...
ulong server_version
Definition: rpl_rli_pdb.h:591
ulong id
Definition: rpl_rli_pdb.h:525
void prepare_for_retry(Log_event &event)
Make the necessary changes to both the Slave_worker and current Log_event objects,...
Slave_worker & operator=(const Slave_worker &info)
ulonglong master_log_pos
Definition: rpl_rli_pdb.h:921
int slave_worker_exec_event(Log_event *ev)
MTS worker main routine.
ulong get_master_server_version()
Definition: rpl_rli_pdb.h:903
bool end_group_sets_max_dbs
Definition: rpl_rli_pdb.h:541
ulong events_done
Definition: rpl_rli_pdb.h:535
bool exit_incremented
Definition: rpl_rli_pdb.h:621
volatile ulong last_group_done_index
Definition: rpl_rli_pdb.h:531
Relay_log_info * c_rli
Definition: rpl_rli_pdb.h:517
bool found_commit_order_deadlock()
Return true if replica-preserve-commit-order is enabled and an earlier transaction is waiting for a r...
Relay_log_info * get_c_rli() override
Get coordinator's RLI.
Definition: rpl_rli_pdb.h:871
bool reset_recovery_info()
Clean up a part of Worker info table that is regarded in gaps collecting at recovery.
mysql_mutex_t jobs_lock
Definition: rpl_rli_pdb.h:515
Definition: rpl_rli_pdb.h:592
Definition: rpl_rli_pdb.h:593
Definition: rpl_rli_pdb.h:597
Definition: rpl_rli_pdb.h:594
Definition: rpl_rli_pdb.h:596
Definition: rpl_rli_pdb.h:595
void rollback_positions(Slave_job_group *ptr_g)
long wq_overrun_cnt
Definition: rpl_rli_pdb.h:561
static const uint * get_table_pk_field_indexes()
Returns an array with the expected column numbers of the primary key fields of the table repository.
void slave_worker_ends_group(Log_event *, int)
Deallocation routine to cancel out few effects of map_db_to_worker().
ulonglong set_master_log_pos(ulong val)
Definition: rpl_rli_pdb.h:636
bool m_flag_positions_committed
This flag indicates whether positions were already modified during the event processing,...
Definition: rpl_rli_pdb.h:930
std::atomic< bool > m_commit_order_deadlock
Definition: rpl_rli_pdb.h:925
bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos, uint end_relay_number, my_off_t end_relay_pos)
It is called after an error happens.
long usage_partition
Definition: rpl_rli_pdb.h:539
void set_gaq_index(ulong val)
Definition: rpl_rli_pdb.h:800
char checkpoint_master_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:584
ulonglong last_groups_assigned_index
Definition: rpl_rli_pdb.h:533
void end_info()
en_running_state volatile running_status
Definition: rpl_rli_pdb.h:615
ulong wq_empty_waits
Definition: rpl_rli_pdb.h:534
ulonglong checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:583
bool fd_change_notified
Definition: rpl_rli_pdb.h:558
volatile bool checkpoint_notified
Definition: rpl_rli_pdb.h:544
ulong groups_done
Definition: rpl_rli_pdb.h:536
mysql_cond_t jobs_cond
Definition: rpl_rli_pdb.h:516
static size_t get_number_worker_fields()
~Slave_worker() override
MY_BITMAP group_executed
Definition: rpl_rli_pdb.h:586
volatile bool master_log_change_notified
Definition: rpl_rli_pdb.h:546
int rli_init_info(bool)
A part of Slave worker initializer that provides a minimum context for MTS recovery.
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:35
Relay_log_info * rli_slave
Definition: sql_class.h:1052
System_variables variables
Definition: sql_lexer_thd.h:63
bool in_active_multi_stmt_transaction() const
true if the session is in a multi-statement transaction mode (
Definition: sql_class.h:3175
The class defines a type of queue with a predefined max capacity that is implemented using the circul...
Definition: rpl_rli_pdb.h:266
bool in(size_t i)
Definition: rpl_rli_pdb.h:334
bool empty() const
Definition: rpl_rli_pdb.h:339
static constexpr size_t error_result
Definition: rpl_rli_pdb.h:342
std::atomic< size_t > len
Actual length.
Definition: rpl_rli_pdb.h:288
size_t en_queue(Element_type *item)
return the index where the arg item locates or an error encoded as a value circular_buffer_queue::err...
Definition: rpl_rli_pdb.h:432
size_t capacity
The capacity and maximum length of the queue in terms of element.
Definition: rpl_rli_pdb.h:272
size_t get_length() const
Definition: rpl_rli_pdb.h:338
Definition: rpl_rli_pdb.h:301
Element_type * head_queue()
return the value of data member of the head of the queue.
Definition: rpl_rli_pdb.h:328
size_t avail
Its value modulo capacity is index of the element where the next element will be enqueued.
Definition: rpl_rli_pdb.h:278
circular_buffer_queue(size_t max)
Definition: rpl_rli_pdb.h:291
bool de_queue(Element_type *item)
Content of the being dequeued item is copied to the arg-pointer location.
Definition: rpl_rli_pdb.h:453
size_t entry
The head index of the queue.
Definition: rpl_rli_pdb.h:284
bool full() const
Definition: rpl_rli_pdb.h:340
bool inited_queue
Definition: rpl_rli_pdb.h:289
Prealloced_array< Element_type, 1 > m_Q
Definition: rpl_rli_pdb.h:268
bool de_tail(Element_type *item)
Similar to de_queue but extracting happens from the tail side.
Definition: rpl_rli_pdb.h:474
unsigned long get_product_version() const
This method is used to find out the version of server that originated the current FD instance.
Definition: control_events.cpp:165
const int64_t SEQ_UNINIT
Uninitialized timestamp value (for either last committed or sequence number).
Definition: binlog_event.h:157
void my_error(int nr, myf MyFlags,...)
Fill in and print a previously registered error message.
Definition: rpl_replica.h:95
unsigned int PSI_mutex_key
Instrumented mutex key.
Definition: psi_mutex_bits.h:51
static constexpr unsigned PSI_INSTRUMENT_ME
Definition: psi_bits.h:42
Binary log event definitions.
Header for compiler-dependent features.
#define DBUG_PRINT(keyword, arglist)
Definition: my_dbug.h:180
#define DBUG_TRACE
Definition: my_dbug.h:145
Some integer typedefs for easier portability.
unsigned long long int ulonglong
Definition: my_inttypes.h:55
ulonglong my_off_t
Definition: my_inttypes.h:71
long long int longlong
Definition: my_inttypes.h:54
#define MYF(v)
Definition: my_inttypes.h:96
Common #defines and includes for file and socket I/O.
#define FN_REFLEN
Definition: my_io.h:82
Definition of the global "loglevel" enumeration.
Definition: my_loglevel.h:40
void my_free(void *ptr)
Frees the memory pointed by the ptr.
Defines various enable/disable and HAVE_ macros related to the performance schema instrumentation sys...
Definition: my_psi_config.h:38
Instrumentation helpers for conditions.
ABI for instrumented mutexes.
struct MasterPos master_pos
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
bool empty(const Histogram &histogram)
Return true if 'histogram' was built on an empty table.
Definition: histogram.h:672
Performance schema instrumentation interface.
Instrumentation helpers for mutexes.
required string event
Definition: replication_group_member_actions.proto:31
Definition: rpl_mta_submode.h:46
TABLE * mts_move_temp_tables_to_thd(THD *, TABLE *)
Relocation of the list of temporary tables to thd->temporary_tables.
Slave_worker * get_least_occupied_worker(Relay_log_info *rli, Slave_worker_array *workers, Log_event *ev)
Get the least occupied worker.
bool set_max_updated_index_on_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
bool init_hash_workers(Relay_log_info *rli)
void destroy_hash_workers(Relay_log_info *)
Slave_worker * get_thd_worker(const THD *thd)
Definition: rpl_rli_pdb.h:965
ulong w_rr
int slave_worker_exec_job_group(Slave_worker *w, Relay_log_info *rli)
apply one job group.
TABLE * mts_move_temp_table_to_entry(TABLE *, THD *, db_worker_hash_entry *)
Relocating temporary table reference into entry's table list head.
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Slave_worker * map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w)
The function produces a reference to the struct of a Worker that has been or will be engaged to proce...
bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli)
Coordinator enqueues a job item into a Worker private queue.
This struct represents a specification of a GTID for a statement to be executed: either "AUTOMATIC",...
Definition: rpl_gtid.h:3971
Definition: my_bitmap.h:42
Definition: rpl_rli_pdb.h:109
my_off_t master_log_pos
Definition: rpl_rli_pdb.h:188
my_off_t checkpoint_log_pos
Definition: rpl_rli_pdb.h:191
Format_description_log_event * new_fd_event
Definition: rpl_rli_pdb.h:230
Slave_job_group(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:116
bool notified
Definition: rpl_rli_pdb.h:200
time_t ts
Definition: rpl_rli_pdb.h:198
longlong last_committed
Definition: rpl_rli_pdb.h:203
std::atomic< int32 > done
Definition: rpl_rli_pdb.h:196
Slave_job_group & operator=(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:141
Slave_worker * worker
Definition: rpl_rli_pdb.h:185
my_off_t checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:194
ulong worker_id
Definition: rpl_rli_pdb.h:184
uint checkpoint_seqno
Definition: rpl_rli_pdb.h:190
void reset(my_off_t master_pos, ulonglong seqno)
Definition: rpl_rli_pdb.h:235
my_off_t group_master_log_pos
Definition: rpl_rli_pdb.h:171
char * group_master_log_name
Definition: rpl_rli_pdb.h:167
char * checkpoint_log_name
Definition: rpl_rli_pdb.h:192
ulong shifted
Definition: rpl_rli_pdb.h:197
longlong sequence_number
Definition: rpl_rli_pdb.h:204
char * checkpoint_relay_log_name
Definition: rpl_rli_pdb.h:195
char * group_relay_log_name
Definition: rpl_rli_pdb.h:182
ulonglong total_seqno
Definition: rpl_rli_pdb.h:186
my_off_t group_relay_log_pos
Definition: rpl_rli_pdb.h:183
Definition: table.h:1403
Definition: completion_hash.h:34
Legends running throughout the module:
Definition: rpl_rli_pdb.h:73
TABLE *volatile temporary_tables
Definition: rpl_rli_pdb.h:89
uint db_len
Definition: rpl_rli_pdb.h:74
long usage
Definition: rpl_rli_pdb.h:81
const char * db
Definition: rpl_rli_pdb.h:75
Slave_worker * worker
Definition: rpl_rli_pdb.h:76
An instrumented cond structure.
Definition: mysql_cond_bits.h:49
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:49
Definition: rpl_rli.h:81
double seconds()
Include file for Sun RPC to compile out of the box.
int n