MySQL 9.1.0
Source Code Documentation
rpl_rli_pdb.h
Go to the documentation of this file.
1/* Copyright (c) 2011, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef RPL_RLI_PDB_H
25#define RPL_RLI_PDB_H
26
27#include <stdarg.h>
28#include <sys/types.h>
29#include <time.h>
30#include <atomic>
31#include <tuple>
32
33#include "my_bitmap.h"
34#include "my_compiler.h"
35#include "my_dbug.h"
36#include "my_inttypes.h"
37#include "my_io.h"
38#include "my_psi_config.h"
44#include "mysql/my_loglevel.h"
46#include "prealloced_array.h" // Prealloced_array
50#include "sql/log_event.h" // Format_description_log_event
51#include "sql/rpl_gtid.h"
52#include "sql/rpl_mta_submode.h" // enum_mts_parallel_type
53#include "sql/rpl_replica.h" // MTS_WORKER_UNDEF
54#include "sql/rpl_rli.h" // Relay_log_info
55#include "sql/sql_class.h"
57
59class Slave_worker;
60struct TABLE;
61
62#ifndef NDEBUG
63extern ulong w_rr;
64#endif
65/**
66 Legends running throughout the module:
67
68 C - Coordinator
69 CP - checkpoint
70 W - Worker
71
72 B-event event that Begins a group (a transaction)
73 T-event event that Terminates a group (a transaction)
74*/
75
76/* Assigned Partition Hash (APH) entry */
78 uint db_len;
79 const char *db;
81 /*
82 The number of transactions pending on this database.
83 This should only be modified under the lock slave_worker_hash_lock.
84 */
85 long usage;
86 /*
87 The list of temp tables belonging to the @c db database is
88 attached to an assigned @c worker to become its thd->temporary_tables.
89 The list is updated with every ddl incl CREATE, DROP.
90 It is removed from the entry and merged to the coordinator's
91 thd->temporary_tables in case of events: slave stops, APH oversize.
92 */
94
95 /* todo: relax concurrency to mimic record-level locking.
96 That is to augmenting the entry with mutex/cond pair
97 pthread_mutex_t
98 pthread_cond_t
99 timestamp updated_at; */
100};
101
104Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
105 db_worker_hash_entry **ptr_entry,
106 bool need_temp_tables, Slave_worker *w);
108 Slave_worker_array *workers,
109 Log_event *ev);
110
111#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
112
114 Slave_job_group() = default;
115
116 /*
117 We need a custom copy constructor and assign operator because std::atomic<T>
118 is not copy-constructible.
119 */
125 worker_id(other.worker_id),
126 worker(other.worker),
134 done(other.done.load()),
135 shifted(other.shifted),
136 ts(other.ts),
137#ifndef NDEBUG
138 notified(other.notified),
139#endif
143 }
144
150 worker_id = other.worker_id;
151 worker = other.worker;
152 total_seqno = other.total_seqno;
159 done.store(other.done.load());
160 shifted = other.shifted;
161 ts = other.ts;
162#ifndef NDEBUG
163 notified = other.notified;
164#endif
168 return *this;
169 }
170
171 char *group_master_log_name; // (actually redundant)
172 /*
173 T-event log_pos filled by Worker for CheckPoint (CP)
174 */
176
177 /*
178 When the relay-log name changes, a new relay-log name is allocated and
179 filled in; otherwise NULL is filled in.
180 Coordinator keeps track of whether each Worker has been notified of the
181 update, to make sure the routine runs once per change.
182
183 W checks the value at commit and memorizes a not-NULL.
184 Freeing unless NULL is left to Coordinator at CP.
185 */
186 char *group_relay_log_name; // The value is last seen relay-log
191
192 my_off_t master_log_pos; // B-event log_pos
193 /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
195 my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
198 checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
200 std::atomic<int32> done; // Flag raised by W, read and reset by Coordinator
201 ulong shifted; // shift the last CP bitmap at receiving a new CP
202 time_t ts; // Group's timestamp to update Seconds_behind_source
203#ifndef NDEBUG
204 bool notified{false}; // to debug group_master_log_name change notification
205#endif
206 /* Clock-based scheduler requirement: */
207 longlong last_committed; // commit parent timestamp
208 longlong sequence_number; // transaction's logical timestamp
209 /*
210 After Coordinator has seen a new FD event, it sets this member to
211 point to the new event, once per worker. Coordinator does so
212 when it schedules a first group following the FD event to a worker.
213 It checks Slave_worker::fd_change_notified flag to decide whether
214 to do this or not.
215 When the worker executes the group, it replaces its currently
216 active FD by the new FD once it takes on the group first event. It
217 checks this member and resets it after the FD replacement is done.
218
219 The member is kind of lock-free. It's updated by Coordinator and
220 read by Worker without holding any mutex. That's still safe thanks
221 to Slave_worker::jobs_lock that works as synchronizer, Worker
222 can't read any stale info.
223 The member is updated by Coordinator when it decides which Worker
224 an event following a new FD is to be scheduled.
225 After Coordinator has chosen a Worker, it queues the event to it
226 with necessarily taking Slave_worker::jobs_lock. The Worker grabs
227 the mutex lock later when pulling the event from the queue and
228 releases the lock before reading from this member.
229
230 This sequence of actions shows the write operation always precedes
231 the read one, and ensures no stale FD info is passed to the
232 Worker.
233 */
235 /*
236 Coordinator fills the struct with defaults and options at starting of
237 a group distribution.
238 */
242 group_master_log_name = nullptr; // todo: remove
243 group_relay_log_name = nullptr;
245 total_seqno = seqno;
246 checkpoint_log_name = nullptr;
250 checkpoint_seqno = (uint)-1;
251 done = 0;
252 ts = 0;
253#ifndef NDEBUG
254 notified = false;
255#endif
258 new_fd_event = nullptr;
259 }
260};
261
262/**
263 The class defines a type of queue with a predefined max capacity that is
264 implemented using the circular memory buffer.
265 That is items of the queue are accessed as indexed elements of
266 the array buffer in a way that when the index value reaches
267 a max value it wraps around to point to the first buffer element.
268*/
269template <typename Element_type>
271 public:
273 /**
274 The capacity and maximum length of the queue in terms of element.
275 */
276 size_t capacity;
277 /**
278 Its value modulo `capacity` is index of the element where the next element
279 will be enqueued. It's entry+length. It may be bigger than capacity, but
280 will be smaller than 2*capacity.
281 */
282 size_t avail;
283 /**
284 The head index of the queue. It is the index of the next element that will be
285 dequeued. It is less than capacity, so it is an actual index (in contrast
286 to `avail`) and does not need to be calculated modulo `capacity`.
287 */
288 size_t entry;
289 /**
290 Actual length. It can be read while not protected by any mutex.
291 */
292 std::atomic<size_t> len;
294
297 capacity(max),
298 avail(0),
299 entry(0),
300 len(0),
301 inited_queue(false) {
302 if (!m_Q.reserve(capacity)) inited_queue = true;
303 m_Q.resize(capacity);
304 }
307
308 /**
309 Content of the being dequeued item is copied to the arg-pointer
310 location.
311
312 @param [out] item A pointer to the being dequeued item.
313 @return true if an element was returned, false if the queue was empty.
314 */
315 bool de_queue(Element_type *item);
316 /**
317 Similar to de_queue but extracting happens from the tail side.
318
319 @param [out] item A pointer to the being dequeued item.
320 @return true if an element was returned, false if the queue was empty.
321 */
322 bool de_tail(Element_type *item);
323
324 /**
325 return the index where the arg item locates
326 or an error encoded as a value `circular_buffer_queue::error_result`.
327 */
328 size_t en_queue(Element_type *item);
329 /**
330 return a pointer to the head element of the queue, or nullptr if it is empty.
331 */
332 Element_type *head_queue() {
333 if (empty()) return nullptr;
334 return &m_Q[entry];
335 }
336
337 /* for an index i below capacity: true if i lies in [entry, avail), with the part of avail beyond capacity wrapped to the front */
338 bool in(size_t i) {
339 return (avail >= capacity) ? (entry <= i || i < avail - capacity)
340 : (entry <= i && i < avail);
341 }
342 size_t get_length() const { return len.load(std::memory_order_relaxed); }  // relaxed load of the atomic len
343 bool empty() const { return get_length() == 0; }  // true when get_length() is zero
344 bool full() const { return get_length() == capacity; }  // true when get_length() equals capacity
345
346 static constexpr size_t error_result = std::numeric_limits<size_t>::max();
347};
348
349/**
350 Group Assigned Queue whose first element identifies first gap
351 in committed sequence. The head of the queue is therefore next to
352 the low-water-mark.
353*/
354class Slave_committed_queue : public circular_buffer_queue<Slave_job_group> {
355 public:
356 bool inited;
357
358 /* master's Rot-ev exec */
359 void update_current_binlog(const char *post_rotate);
360
361 /*
362 The last checkpoint time Low-Water-Mark
363 */
365
366 /* last time processed indexes for each worker */
368
369 /* the being assigned group index in GAQ */
371
372 Slave_committed_queue(size_t max, uint n);
373
375 if (inited) {
377 free_dynamic_items(); // free possibly left allocated strings in GAQ list
378 }
379 }
380
381#ifndef NDEBUG
382 bool count_done(Relay_log_info *rli);
383#endif
384
385 /* Checkpoint routine refreshes the queue */
387 /* Method is for slave shutdown time cleanup */
388 void free_dynamic_items();
389 /*
390 returns a pointer to Slave_job_group struct instance as indexed by arg
391 in the circular buffer dyn-array
392 */
394 assert(ind < capacity);
395 return &m_Q[ind];
396 }
397
398 /**
399 Assigns @c assigned_group_index to an index of enqueued item
400 and returns it.
401 */
402 size_t en_queue(Slave_job_group *item) {
403 return assigned_group_index =
405 }
406
407 /**
408 Dequeue from head.
409
410 @param [out] item A pointer to the being dequeued item.
411 @return true if an element was returned, false if the queue was empty.
412 */
415 }
416
417 /**
418 Similar to de_queue() but removing an item from the tail side.
419
420 @param [out] item A pointer to the being dequeued item.
421 @return true if an element was returned, false if the queue was empty.
422 */
425 }
426
427 size_t find_lwm(Slave_job_group **, size_t);
428};
429
430/**
431 @return the index where the arg item has been located
432 or an error encoded as a value
433 `circular_buffer_queue::error_result`.
434*/
435template <typename Element_type>
437 if (full()) {
438 return error_result;
439 }
440
441 const auto ret = (avail++) % capacity;
442 m_Q[ret] = *item;
443 len++;
444 assert(len == avail - entry);
445 assert(entry < avail);
446
447 return ret;
448}
449
450/**
451 Dequeue from head.
452
453 @param [out] item A pointer to the being dequeued item.
454 @return true if an element was returned, false if the queue was empty.
455*/
456template <typename Element_type>
458 if (empty()) {
459 return false;
460 }
461 *item = m_Q[entry++];
462 len--;
463 assert(len == avail - entry);
464 assert(entry <= avail);
465
466 // The start of the queue has just returned to the first index. Normalize
467 // indexes so they are small again.
468 if (entry == capacity) {
469 entry = 0;
470 avail -= capacity;
471 assert(avail < capacity);
472 assert(avail == len);
473 }
474 return true;
475}
476
477template <typename Element_type>
479 if (empty()) {
480 return false;
481 }
482
483 assert(avail > entry);
484 *item = m_Q[(--avail) % capacity];
485 len--;
486 assert(len == avail - entry);
487 return true;
488}
489
490class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item> {
491 public:
493 /*
494 Coordinator marks with true, Worker signals back when the queue becomes
495 available again
496 */
499};
500
502 public:
505 PSI_mutex_key *param_key_info_run_lock,
506 PSI_mutex_key *param_key_info_data_lock,
507 PSI_mutex_key *param_key_info_sleep_lock,
508 PSI_mutex_key *param_key_info_thd_lock,
509 PSI_mutex_key *param_key_info_data_cond,
510 PSI_mutex_key *param_key_info_start_cond,
511 PSI_mutex_key *param_key_info_stop_cond,
512 PSI_mutex_key *param_key_info_sleep_cond,
513#endif
514 uint param_id, const char *param_channel);
515
516 ~Slave_worker() override;
517
518 Slave_jobs_queue jobs; // assignment queue containing events to execute
519 mysql_mutex_t jobs_lock; // mutex for the jobs queue
520 mysql_cond_t jobs_cond; // condition variable for the jobs queue
521 Relay_log_info *c_rli; // pointer to Coordinator's rli
522
524 curr_group_exec_parts; // Current Group Executed Partitions
525
526#ifndef NDEBUG
527 bool curr_group_seen_sequence_number; // is set to true about starts_group()
528#endif
529 ulong id; // numeric identifier of the Worker
530
531 /*
532 Worker runtime statistics
533 */
534 /// Number of transaction handled - incremented at slave_worker_ends_group
536
537 // the index in GAQ of the last processed group by this Worker
538 volatile ulong last_group_done_index;
540 last_groups_assigned_index; // index of previous group assigned to worker
541 std::atomic<int> curr_jobs; // number of active assignments
542 // number of partitions allocated to the worker at point in time
544 // symmetric to rli->mts_end_group_sets_max_dbs
546
547 volatile bool relay_log_change_notified; // Coord sets and resets, W can read
548 volatile bool checkpoint_notified; // Coord sets and resets, W can read
549 volatile bool
550 master_log_change_notified; // Coord sets and resets, W can read
551 /*
552 The variable serves to Coordinator as a memo to itself
553 to notify a Worker about the fact that a new FD has been read.
554 Normally, the value is true, to mean the Worker is notified.
555 When Coordinator reads a new FD it changes the value to false.
556 When Coordinator schedules to a Worker the first event following the new FD,
557 it propagates the new FD to the Worker through
558 Slave_job_group::new_fd_event. Afterwards Coordinator returns the value back
559 to the regular true, to denote things done. Worker will adapt to the new FD
560 once it takes on a first event of the marked group.
561 */
563 ulong bitmap_shifted; // shift the last bitmap at receiving new CP
564
565 private:
566 /// @brief worker statistics
568 /// @brief Placeholder for stats when metric collection is disabled
570
571 /// @brief Is worker metric collection enabled
573
574 public:
575 /// @brief Sets the metric collection as on or off
576 /// This should be done at the worker start
577 /// @param status if metrics are enabled or not
580 }
581
582 /// @brief gets a reference to the worker statistics.
583 /// @return a reference to the worker statistics.
585
586 /// @brief Copies data and sets the metric collection flag
587 /// @param other the instance to be copied
592 }
593
594 /// The number of events applied in an ongoing transaction, used to collect
595 /// statistics when the transaction ends.
597
598 /// True if this transaction occurred after the _metrics breakpoint_ in the
599 /// relay log.
600 ///
601 /// @see Applier_metrics_interface::is_received_initialized.
603
604 /// Update per-event worker metrics.
605 ///
606 /// This includes:
607 ///
608 /// - APPLYING_TRANSACTION_APPLIED_SIZE_BYTES
609 ///
610 /// - The number of events in the transaction. This is an internal
611 /// counter, not directly user visible, but used to increment
612 /// EVENTS_COMMITTED_COUNT at commit time. This is set to 1 for the GTID
613 /// event, and incremented for all other events. It does not need to be reset
614 /// at rollback, since the value is only used at commits, and the next GTID
615 /// event will reset the value before the next commit.
618 event.common_header->data_written);
620 event.get_type_code()))
622 else
624 }
625
626 // WQ current excess above the overrun level
628 /*
629 number of events starting from which Worker queue is regarded as
630 close to full. The number of the excessive events yields a weight factor
631 to compute Coordinator's nap.
632 */
634 /*
635 reverse to overrun: the number of events below which Worker is
636 considered under-running
637 */
639 /*
640 Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
641 When WQ length is dropped below overrun the counter is reset.
642 */
644 /*
645 Coordinates of the last CheckPoint (CP) this Worker has
646 acknowledged; part of is persistent data
647 */
652 MY_BITMAP group_executed; // bitmap describes groups executed after last CP
653 MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
654 ulong
655 worker_checkpoint_seqno; // the most significant ON bit in group_executed
656 /* Initial value of FD-for-execution version until it gets known. */
661 ERROR_LEAVING = 2, // is set by Worker
662 STOP = 3, // is set by Coordinator upon receiving STOP
664 4 // is set by worker upon completing job when STOP REPLICA is issued
665 };
666
667 /*
668 This function is used to make a copy of the worker object before we
669 destroy it on STOP REPLICA. This new object is then used to report the
670 worker status until next START REPLICA following which the new worker
671 objects will be used.
672 */
674 THD *worker_thd, const Error &last_error,
675 Gtid_monitoring_info *monitoring_info_arg);
676
677 /*
678 The running status is guarded by jobs_lock mutex that a writer
679 Coordinator or Worker itself needs to hold when writing a new value.
680 */
682 /*
683 exit_incremented indicates whether worker has contributed to max updated
684 index. By default it is set to false. When the worker contributes for the
685 first time this variable is set to true.
686 */
688
689 int init_worker(Relay_log_info *, ulong);
690 int rli_init_info(bool);
691 int flush_info(bool force = false);
692 static size_t get_number_worker_fields();
693 /**
694 Sets bits for columns that are allowed to be `NULL`.
695
696 @param nullable_fields the bitmap to hold the nullable fields.
697 */
698 static void set_nullable_fields(MY_BITMAP *nullable_fields);
700 const char *get_master_log_name();
702 ulonglong set_master_log_pos(ulong val) { return master_log_pos = val; }  // stores val in master_log_pos and returns the stored value
703 bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
704 /**
705 The method is a wrapper to provide uniform interface with STS and is
706 to be called from Relay_log_info and Slave_worker pre_commit() methods.
707 */
708 bool commit_positions() override {
709 assert(current_event);
710
711 return commit_positions(
714 }
715 /**
716 See the comments for STS version of this method.
717 */
718 void post_commit(bool on_rollback) override {
719 if (on_rollback) {
720 if (is_transactional())
723 } else if (!is_transactional())
726 true);
727 }
728 /*
729 When commit fails clear bitmap for executed worker group. Revert back the
730 positions to the old positions that existed before commit using the
731 checkpoint.
732
733 @param Slave_job_group a pointer to Slave_job_group struct instance which
734 holds group master log pos, group relay log pos and checkpoint positions.
735 */
737 bool reset_recovery_info();
738 /**
739 The method runs at Worker initialization, at runtime when
740 Coordinator supplied a new FD event for execution context, and at
741 the Worker pool shutdown.
742 Similarly to the Coordinator's
743 Relay_log_info::set_rli_description_event() the possibly existing
744 old FD is destroyed, carefully; each worker decrements
745 Format_description_log_event::atomic_usage_counter and when it is made
746 zero the destructor runs.
747 Unlike Coordinator's role, the usage counter of the new FD is *not*
748 incremented, see @c Log_event::get_slave_worker() where and why it's done
749 there.
750
751 Notice, the method is run as well by Coordinator per each Worker at MTS
752 shutdown time.
753
754 Todo: consider to merge logics of the method with that of
755 Relay_log_info class.
756
757 @param fdle pointer to a new Format_description_log_event
758
759 @return 1 if an error was encountered, 0 otherwise.
760 */
763
764 if (fdle) {
765 /*
766 When the master rotates its binary log, set gtid_next to
767 NOT_YET_DETERMINED. This tells the slave thread that:
768
769 - If a Gtid_log_event is read subsequently, gtid_next will be set to the
770 given GTID (this is done in gtid_pre_statement_checks()).
771
772 - If a statement is executed before any Gtid_log_event, then gtid_next
773 is set to anonymous (this is done in Gtid_log_event::do_apply_event()).
774
775 It is important to not set GTID_NEXT=NOT_YET_DETERMINED in the middle of
776 a transaction. If that would happen when GTID_MODE=ON, the next
777 statement would fail because it implicitly sets GTID_NEXT=ANONYMOUS,
778 which is disallowed when GTID_MODE=ON. So then there would be no way to
779 end the transaction; any attempt to do so would result in this error.
780
781 There are three possible states when reaching this execution flow point
782 (see further below for a more detailed explanation on each):
783
784 - **No active transaction, and not in a group**: set `gtid_next` to
785 `NOT_YET_DETERMINED`.
786
787 - **No active transaction, and in a group**: do nothing regarding
788 `gtid_next`.
789
790 - **An active transaction exists**: impossible to set `gtid_next` and no
791 reason to process the `Format_description` event so, trigger an error.
792
793 For the sake of correctness, let's define the meaning of having a
794 transaction "active" or "in a group".
795
796 A transaction is "active" if either BEGIN was executed or autocommit=0
797 and a DML statement was executed (@see
798 THD::in_active_multi_stmt_transaction).
799
800 A transaction is "in a group" if it is applied by the replication
801 applier, and the relay log position is between Gtid_log_event and the
802 committing event (@see Relay_log_info::is_in_group).
803
804 The three different states explained further:
805
806 **No active transaction, and not in a group**: It is normal to have
807 gtid_next=automatic/undefined and have a Format_description_log_event in
808 this condition. We are outside transaction context and should set
809 gtid_next to not_yet_determined.
810
811 **No active transaction, and in a group**: Having
812 gtid_next=automatic/undefined in a group is impossible if master is 5.7
813 or later, because the group always starts with a Gtid_log_event or an
814 Anonymous_gtid_log_event, which will set gtid_next to anonymous or
815 gtid. But it is possible to have gtid_next=undefined when replicating
816 from a 5.6 master with gtid_mode=off, because it does not generate any
817 such event. And then, it is possible to have no active transaction in a
818 group if the master has logged a DDL as a User_var_log_event followed by
819 a Query_log_event. The User_var_log_event will start a group, but not
820 start an active transaction or change gtid_next. In this case, it is
821 possible that a Format_description_log_event occurs, if the group
822 (transaction) is broken on two relay logs, so that User_var_log_event
823 appears at the end of one relay log and Query_log_event at the beginning
824 of the next one. In such cases, we should not set gtid_next.
825
826 **An active transaction exists**: It is possible to have
827 gtid_next=automatic/undefined in an active transaction, only if
828 gtid_next=automatic, which is only possible in a client connection using
829 gtid_next=automatic. In this scenario, there is no reason to execute a
830 Format_description_log_event. So we generate an error.
831 */
832 if (info_thd->variables.gtid_next.is_automatic() ||
833 info_thd->variables.gtid_next.is_undefined()) {
834 bool in_active_multi_stmt =
836
837 if (!is_in_group() && !in_active_multi_stmt) {
838 DBUG_PRINT("info",
839 ("Setting gtid_next.type to NOT_YET_DETERMINED_GTID"));
840 info_thd->variables.gtid_next.set_not_yet_determined();
841 } else if (in_active_multi_stmt) {
842 my_error(ER_VARIABLE_NOT_SETTABLE_IN_TRANSACTION, MYF(0),
843 "gtid_next");
844 return 1;
845 }
846 }
849 }
852
854 /* The being deleted by Worker FD can't be the latest one */
856
858 }
859 }
861
862 return 0;
863 }
864
866 inline void set_gaq_index(ulong val) {  // takes val only while gaq_index still equals the GAQ capacity; otherwise gaq_index is left unchanged
867 if (gaq_index == c_rli->gaq->capacity) gaq_index = val;  // NOTE(review): capacity presumably serves as the "not yet assigned" marker - confirm
868 }
869
871
872 /**
873 Make the necessary changes to both the `Slave_worker` and current
874 `Log_event` objects, before retrying to apply the transaction.
875
876 Since the event is going to be re-read from the relay-log file, there
877 may be actions needed to be taken to reset the state both of `this`
878 instance, as well as of the current `Log_event` being processed.
879
880 @param event The `Log_event` object currently being processed.
881 */
883
884 /**
885 Checks if the transaction can be retried, and if not, reports an error.
886
887 @param[in] thd The THD object of current thread.
888
889 @returns std::tuple<bool, bool, uint> where each element has
890 following meaning:
891
892 first element of tuple is function return value and determines:
893 false if the transaction should be retried
894 true if the transaction should not be retried
895
896 second element of tuple determines:
897 the function will set the value to true, in case the retry
898 should be "silent". Silent means that the caller should not
899 report it in performance_schema tables, write to the error log,
900 or sleep. Currently, silent is used by NDB only.
901
902 third element of tuple determines:
903 If the caller should report any other error than that stored in
904 thd->get_stmt_da()->mysql_errno(), then this function will store
905 that error in this third element of the tuple.
906
907 */
908 std::tuple<bool, bool, uint> check_and_report_end_of_retries(THD *thd);
909
910 /**
911 It is called after an error happens. It checks if that is a temporary
912 error and if the transaction should be retried. Then it will retry the
913 transaction if it is allowed. Retry policy and logic is similar to
914 single-threaded slave.
915
916 @param[in] start_relay_pos The offset of the transaction's first event.
917 @param[in] start_event_relay_log_name The name of the relay log which
918 includes the first event of the transaction.
919
920 @param[in] end_relay_pos The offset of the last event it should retry.
921 @param[in] end_event_relay_log_name The name of the relay log which
922 includes the last event it should retry.
923
924 @retval false if transaction succeeds (possibly after a number of retries)
925 @retval true if transaction fails
926 */
927 bool retry_transaction(my_off_t start_relay_pos,
928 const char *start_event_relay_log_name,
929 my_off_t end_relay_pos,
930 const char *end_event_relay_log_name);
931
932 bool set_info_search_keys(Rpl_info_handler *to) override;
933
934 /**
935 Get coordinator's RLI. Especially used to get the rli from
936 a slave thread, like this: thd->rli_slave->get_c_rli();
937 thd could be a SQL thread or a worker thread.
938 */
939 Relay_log_info *get_c_rli() override { return c_rli; }
940
941 /**
942 return an extension "for channel channel_name"
943 for error messages per channel
944 */
945 const char *get_for_channel_str(bool upper_case = false) const override;
946
949 return ptr_g->sequence_number;
950 }
951
952 /**
953 Return true if replica-preserve-commit-order is enabled and an
954 earlier transaction is waiting for a row-level lock held by this
955 transaction.
956 */
958
959 /**
960 Called when replica-preserve-commit-order is enabled, by the worker
961 processing an earlier transaction that waits on a row-level lock
962 held by this worker's transaction.
963 */
965
966 /**
967 @return either the master server version as extracted from the last
968 installed Format_description_log_event, or when it was not
969 installed then the slave own server version.
970 */
975 }
976
977 protected:
978 void do_report(loglevel level, int err_code, const char *msg,
979 va_list v_args) const override
980 MY_ATTRIBUTE((format(printf, 4, 0)));
981
982 void do_report(loglevel level, int err_code,
983 const Gtid_specification *gtid_next, const char *msg,
984 va_list v_args) const override
985 MY_ATTRIBUTE((format(printf, 5, 0)));
986
987 private:
988 ulong gaq_index; // GAQ index of the current assignment
989 ulonglong master_log_pos; // event's cached log_pos for possible error report
990 void end_info();
991 bool read_info(Rpl_info_handler *from) override;
992 bool write_info(Rpl_info_handler *to) override;
993 std::atomic<bool> m_commit_order_deadlock;
994
995 /// This flag indicates whether positions were already modified during the
996 /// event processing, if yes, positions are not updated in the
997 /// slave_worker_ends_group function
999
1002 bool worker_sleep(ulong seconds);
1003 bool read_and_apply_events(my_off_t start_relay_pos,
1004 const char *start_event_relay_log_name,
1005 my_off_t end_relay_pos,
1006 const char *end_event_relay_log_name);
1008
1009 public:
1010 /**
1011 Set the flag that signals a deadlock to false
1012 */
1014
1015 /**
1016 Returns an array with the expected column numbers of the primary key
1017 fields of the table repository.
1018 */
1019 static const uint *get_table_pk_field_indexes();
1020 /**
1021 Returns the index of the Channel_name field of the table repository.
1022 */
1023 static uint get_channel_field_index();
1024};
1025
1028 Slave_job_item *job_item);
1029
1032// Auxiliary function
1034
1036 Relay_log_info *rli);
1037
1038inline Slave_worker *get_thd_worker(const THD *thd) {  // returns thd->rli_slave viewed as a Slave_worker
1039 return static_cast<Slave_worker *>(thd->rli_slave);  // static_cast performs no runtime type check
1040}
1041
1043
1044#endif
Contains the classes representing events occurring in the replication stream.
Class representing an error.
Definition: error.h:48
For binlog version 4.
Definition: log_event.h:1536
std::atomic< int32 > atomic_usage_counter
Definition: log_event.h:1554
Stores information to monitor a transaction during the different replication stages.
Definition: rpl_gtid.h:1413
This is the abstract base class for binary log events.
Definition: log_event.h:538
ulong mts_group_idx
Index in rli->gaq array to indicate a group that this event is purging.
Definition: log_event.h:693
Definition: rpl_rli.h:206
Format_description_log_event * get_rli_description_event() const
Return the current Format_description_log_event.
Definition: rpl_rli.h:1731
bool is_in_group() const
A group is defined as the entire range of events that constitute a transaction or auto-committed stat...
Definition: rpl_rli.h:1520
Log_event * current_event
Reference to being applied event.
Definition: rpl_rli.h:2009
Slave_committed_queue * gaq
Definition: rpl_rli.h:1208
ulong adapt_to_master_version_updown(ulong master_version, ulong current_version)
The method compares two supplied versions and carries out down- or up- grade customization of executi...
Definition: rpl_rli.cc:2612
Format_description_log_event * rli_description_event
Definition: rpl_rli.h:1807
Definition: rpl_info_handler.h:58
bool is_transactional() const
Definition: rpl_info.h:107
THD * info_thd
Definition: rpl_info.h:78
Group Assigned Queue whose first element identifies first gap in committed sequence.
Definition: rpl_rli_pdb.h:354
void free_dynamic_items()
Method should be executed at slave system stop to cleanup dynamically allocated items that remained a...
Definition: rpl_rli_pdb.cc:1519
bool de_queue(Slave_job_group *item)
Dequeue from head.
Definition: rpl_rli_pdb.h:413
Slave_committed_queue(size_t max, uint n)
Definition: rpl_rli_pdb.cc:1318
bool inited
Definition: rpl_rli_pdb.h:356
bool count_done(Relay_log_info *rli)
Definition: rpl_rli_pdb.cc:1336
ulong assigned_group_index
Definition: rpl_rli_pdb.h:370
bool de_tail(Slave_job_group *item)
Similar to de_queue() but removing an item from the tail side.
Definition: rpl_rli_pdb.h:423
size_t move_queue_head(Slave_worker_array *ws)
The queue is processed from the head item by item to purge items representing committed groups.
Definition: rpl_rli_pdb.cc:1380
Prealloced_array< ulonglong, 1 > last_done
Definition: rpl_rli_pdb.h:367
Slave_job_group * get_job_group(size_t ind)
Definition: rpl_rli_pdb.h:393
~Slave_committed_queue()
Definition: rpl_rli_pdb.h:374
void update_current_binlog(const char *post_rotate)
size_t find_lwm(Slave_job_group **, size_t)
Finds low-water mark of committed jobs in GAQ.
Definition: rpl_rli_pdb.cc:1482
size_t en_queue(Slave_job_group *item)
Assigns assigned_group_index to an index of enqueued item and returns it.
Definition: rpl_rli_pdb.h:402
Slave_job_group lwm
Definition: rpl_rli_pdb.h:364
Definition: rpl_rli_pdb.h:490
Slave_jobs_queue()
Definition: rpl_rli_pdb.h:492
bool overfill
Definition: rpl_rli_pdb.h:497
ulonglong waited_overfill
Definition: rpl_rli_pdb.h:498
Error const & last_error() const
Definition: rpl_reporting.h:143
Definition: rpl_rli_pdb.h:501
ulonglong get_master_log_pos()
Definition: rpl_rli_pdb.h:701
ulong underrun_level
Definition: rpl_rli_pdb.h:638
char checkpoint_relay_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:648
Prealloced_array< db_worker_hash_entry *, SLAVE_INIT_DBS_IN_GROUP > curr_group_exec_parts
Definition: rpl_rli_pdb.h:524
ulong excess_cnt
Definition: rpl_rli_pdb.h:643
bool m_is_worker_metric_collection_enabled
Is worker metric collection enabled.
Definition: rpl_rli_pdb.h:572
bool read_and_apply_events(my_off_t start_relay_pos, const char *start_event_relay_log_name, my_off_t end_relay_pos, const char *end_event_relay_log_name)
Read events from relay logs and apply them.
Definition: rpl_rli_pdb.cc:1963
bool curr_group_seen_sequence_number
Definition: rpl_rli_pdb.h:527
void reset_gaq_index()
Definition: rpl_rli_pdb.h:865
ulong overrun_level
Definition: rpl_rli_pdb.h:633
bool commit_positions() override
The method is a wrapper to provide uniform interface with STS and is to be called from Relay_log_info...
Definition: rpl_rli_pdb.h:708
ulong bitmap_shifted
Definition: rpl_rli_pdb.h:563
longlong sequence_number()
Definition: rpl_rli_pdb.h:947
void reset_commit_order_deadlock()
Set the flag that signals a deadlock to false.
Definition: rpl_rli_pdb.cc:1787
int flush_info(bool force=false)
Definition: rpl_rli_pdb.cc:479
int set_rli_description_event(Format_description_log_event *fdle) override
The method runs at Worker initialization, at runtime when Coordinator supplied a new FD event for exe...
Definition: rpl_rli_pdb.h:761
ulonglong checkpoint_master_log_pos
Definition: rpl_rli_pdb.h:651
Slave_jobs_queue jobs
Definition: rpl_rli_pdb.h:518
volatile bool relay_log_change_notified
Definition: rpl_rli_pdb.h:547
MY_BITMAP group_shifted
Definition: rpl_rli_pdb.h:653
ulong worker_checkpoint_seqno
Definition: rpl_rli_pdb.h:655
bool worker_sleep(ulong seconds)
Sleep for a given amount of seconds or until killed.
Definition: rpl_rli_pdb.cc:1765
void copy_values_for_PFS(ulong worker_id, en_running_state running_status, THD *worker_thd, const Error &last_error, Gtid_monitoring_info *monitoring_info_arg)
Definition: rpl_rli_pdb.cc:562
std::tuple< bool, bool, uint > check_and_report_end_of_retries(THD *thd)
Checks if the transaction can be retried, and if not, reports an error.
Definition: rpl_rli_pdb.cc:1812
ulong gaq_index
Definition: rpl_rli_pdb.h:988
void post_commit(bool on_rollback) override
See the comments for STS version of this method.
Definition: rpl_rli_pdb.h:718
void do_report(loglevel level, int err_code, const char *msg, va_list v_args) const override
Definition: rpl_rli_pdb.cc:1537
const char * get_for_channel_str(bool upper_case=false) const override
return an extension "for channel channel_name" for error messages per channel
Definition: rpl_rli_pdb.cc:2722
void report_commit_order_deadlock()
Called when replica-preserve-commit-order is enabled, by the worker processing an earlier transaction...
Definition: rpl_rli_pdb.cc:1795
Slave_worker(const Slave_worker &info)
std::atomic< int > curr_jobs
Definition: rpl_rli_pdb.h:541
Slave_worker(Relay_log_info *rli, PSI_mutex_key *param_key_info_run_lock, PSI_mutex_key *param_key_info_data_lock, PSI_mutex_key *param_key_info_sleep_lock, PSI_mutex_key *param_key_info_thd_lock, PSI_mutex_key *param_key_info_data_cond, PSI_mutex_key *param_key_info_start_cond, PSI_mutex_key *param_key_info_stop_cond, PSI_mutex_key *param_key_info_sleep_cond, uint param_id, const char *param_channel)
Definition: rpl_rli_pdb.cc:246
void set_worker_metric_collection_status(bool status)
Sets the metric collection as on or off. This should be done at the worker start.
Definition: rpl_rli_pdb.h:578
bool write_info(Rpl_info_handler *to) override
Definition: rpl_rli_pdb.cc:584
int init_worker(Relay_log_info *, ulong)
Method is executed by Coordinator at Worker startup time to initialize members partly with values supp...
Definition: rpl_rli_pdb.cc:318
static uint get_channel_field_index()
Returns the index of the Channel_name field of the table repository.
Definition: rpl_rli_pdb.cc:2730
bool read_info(Rpl_info_handler *from) override
Definition: rpl_rli_pdb.cc:510
void assign_partition_db(Log_event *ev)
Definition: rpl_rli_pdb.cc:2084
const char * get_master_log_name()
Definition: rpl_rli_pdb.cc:634
static void set_nullable_fields(MY_BITMAP *nullable_fields)
Sets bits for columns that are allowed to be NULL.
Definition: rpl_rli_pdb.cc:628
cs::apply::instruments::Dummy_worker_metrics m_disabled_worker_metrics
Placeholder for stats when metric collection is disabled.
Definition: rpl_rli_pdb.h:569
bool set_info_search_keys(Rpl_info_handler *to) override
To search in the slave repositories, each slave info object (mi, rli or worker) should use a primary ...
Definition: rpl_rli_pdb.cc:573
ulong server_version
Definition: rpl_rli_pdb.h:657
ulong id
Definition: rpl_rli_pdb.h:529
int64_t m_events_applied_in_transaction
The number of events applied in an ongoing transaction, used to collect statistics when the transacti...
Definition: rpl_rli_pdb.h:596
void prepare_for_retry(Log_event &event)
Make the necessary changes to both the Slave_worker and current Log_event objects,...
Definition: rpl_rli_pdb.cc:1801
Slave_worker & operator=(const Slave_worker &info)
ulonglong master_log_pos
Definition: rpl_rli_pdb.h:989
int slave_worker_exec_event(Log_event *ev)
MTS worker main routine.
Definition: rpl_rli_pdb.cc:1674
ulong get_master_server_version()
Definition: rpl_rli_pdb.h:971
bool retry_transaction(my_off_t start_relay_pos, const char *start_event_relay_log_name, my_off_t end_relay_pos, const char *end_event_relay_log_name)
It is called after an error happens.
Definition: rpl_rli_pdb.cc:1869
bool end_group_sets_max_dbs
Definition: rpl_rli_pdb.h:545
bool exit_incremented
Definition: rpl_rli_pdb.h:687
volatile ulong last_group_done_index
Definition: rpl_rli_pdb.h:538
Relay_log_info * c_rli
Definition: rpl_rli_pdb.h:521
bool found_commit_order_deadlock()
Return true if replica-preserve-commit-order is enabled and an earlier transaction is waiting for a r...
Definition: rpl_rli_pdb.cc:1791
Relay_log_info * get_c_rli() override
Get coordinator's RLI.
Definition: rpl_rli_pdb.h:939
bool reset_recovery_info()
Clean up a part of Worker info table that is regarded in gaps collecting at recovery.
Definition: rpl_rli_pdb.cc:615
mysql_mutex_t jobs_lock
Definition: rpl_rli_pdb.h:519
en_running_state
Definition: rpl_rli_pdb.h:658
@ NOT_RUNNING
Definition: rpl_rli_pdb.h:659
@ STOP_ACCEPTED
Definition: rpl_rli_pdb.h:663
@ RUNNING
Definition: rpl_rli_pdb.h:660
@ STOP
Definition: rpl_rli_pdb.h:662
@ ERROR_LEAVING
Definition: rpl_rli_pdb.h:661
void rollback_positions(Slave_job_group *ptr_g)
Definition: rpl_rli_pdb.cc:722
long wq_overrun_cnt
Definition: rpl_rli_pdb.h:627
static const uint * get_table_pk_field_indexes()
Returns an array with the expected column numbers of the primary key fields of the table repository.
Definition: rpl_rli_pdb.cc:2726
void slave_worker_ends_group(Log_event *, int)
Deallocation routine to cancel out a few effects of map_db_to_worker().
Definition: rpl_rli_pdb.cc:1146
ulonglong set_master_log_pos(ulong val)
Definition: rpl_rli_pdb.h:702
bool m_flag_positions_committed
This flag indicates whether positions were already modified during the event processing,...
Definition: rpl_rli_pdb.h:998
std::atomic< bool > m_commit_order_deadlock
Definition: rpl_rli_pdb.h:993
long usage_partition
Definition: rpl_rli_pdb.h:543
void set_gaq_index(ulong val)
Definition: rpl_rli_pdb.h:866
void increment_worker_metrics_for_event(const Log_event &event)
Update per-event worker metrics.
Definition: rpl_rli_pdb.h:616
char checkpoint_master_log_name[FN_REFLEN]
Definition: rpl_rli_pdb.h:650
ulonglong last_groups_assigned_index
Definition: rpl_rli_pdb.h:540
void end_info()
Definition: rpl_rli_pdb.cc:465
cs::apply::instruments::Worker_metrics & get_worker_metrics()
gets a reference to the worker statistics.
Definition: rpl_rli_pdb.cc:2732
en_running_state volatile running_status
Definition: rpl_rli_pdb.h:681
void copy_worker_metrics(Slave_worker *other)
Copies data and sets the metric collection flag.
Definition: rpl_rli_pdb.h:588
ulonglong checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:649
bool fd_change_notified
Definition: rpl_rli_pdb.h:562
volatile bool checkpoint_notified
Definition: rpl_rli_pdb.h:548
bool m_is_after_metrics_breakpoint
True if this transaction occurred after the metrics breakpoint in the relay log.
Definition: rpl_rli_pdb.h:602
mysql_cond_t jobs_cond
Definition: rpl_rli_pdb.h:520
cs::apply::instruments::Mta_worker_metrics m_worker_metrics
worker statistics
Definition: rpl_rli_pdb.h:567
static size_t get_number_worker_fields()
Definition: rpl_rli_pdb.cc:624
~Slave_worker() override
Definition: rpl_rli_pdb.cc:288
MY_BITMAP group_executed
Definition: rpl_rli_pdb.h:652
volatile bool master_log_change_notified
Definition: rpl_rli_pdb.h:550
int rli_init_info(bool)
A part of Slave worker initializer that provides a minimum context for MTS recovery.
Definition: rpl_rli_pdb.cc:418
ulong transactions_handled
Number of transactions handled - incremented at slave_worker_ends_group.
Definition: rpl_rli_pdb.h:535
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:36
Relay_log_info * rli_slave
Definition: sql_class.h:1077
System_variables variables
Definition: sql_lexer_thd.h:64
bool in_active_multi_stmt_transaction() const
true if the session is in a multi-statement transaction mode (
Definition: sql_class.h:3253
The class defines a type of queue with a predefined max capacity that is implemented using the circul...
Definition: rpl_rli_pdb.h:270
bool in(size_t i)
Definition: rpl_rli_pdb.h:338
bool empty() const
Definition: rpl_rli_pdb.h:343
static constexpr size_t error_result
Definition: rpl_rli_pdb.h:346
std::atomic< size_t > len
Actual length.
Definition: rpl_rli_pdb.h:292
size_t en_queue(Element_type *item)
Return the index where the arg item is located or an error encoded as a value circular_buffer_queue::err...
Definition: rpl_rli_pdb.h:436
size_t capacity
The capacity and maximum length of the queue in terms of element.
Definition: rpl_rli_pdb.h:276
size_t get_length() const
Definition: rpl_rli_pdb.h:342
circular_buffer_queue()
Definition: rpl_rli_pdb.h:305
Element_type * head_queue()
return the value of data member of the head of the queue.
Definition: rpl_rli_pdb.h:332
size_t avail
Its value modulo capacity is index of the element where the next element will be enqueued.
Definition: rpl_rli_pdb.h:282
circular_buffer_queue(size_t max)
Definition: rpl_rli_pdb.h:295
bool de_queue(Element_type *item)
Content of the item being dequeued is copied to the arg-pointer location.
Definition: rpl_rli_pdb.h:457
size_t entry
The head index of the queue.
Definition: rpl_rli_pdb.h:288
bool full() const
Definition: rpl_rli_pdb.h:344
bool inited_queue
Definition: rpl_rli_pdb.h:293
Prealloced_array< Element_type, 1 > m_Q
Definition: rpl_rli_pdb.h:272
bool de_tail(Element_type *item)
Similar to de_queue but extracting happens from the tail side.
Definition: rpl_rli_pdb.h:478
~circular_buffer_queue()=default
Class that intends to be a dummy end point for worker metrics.
Definition: dummy_worker_metrics.h:33
This class contains metrics related to transaction execution in replica MTA workers.
Definition: mta_worker_metrics.h:33
void copy_stats_from(const Mta_worker_metrics &other)
Copies stats from the given object into this one.
Definition: mta_worker_metrics.cc:82
Abstract class for classes that contain metrics related to transaction execution in applier workers.
Definition: worker_metrics.h:33
virtual void inc_transaction_ongoing_progress_size(int64_t amount)=0
increment the executed size of the ongoing transaction.
unsigned long get_product_version() const
This method is used to find out the version of server that originated the current FD instance.
Definition: control_events.cpp:167
const int64_t SEQ_UNINIT
Uninitialized timestamp value (for either last committed or sequence number).
Definition: binlog_event.h:150
void my_error(int nr, myf MyFlags,...)
Fill in and print a previously registered error message.
Definition: my_error.cc:216
#define MTS_WORKER_UNDEF
Definition: rpl_replica.h:91
unsigned int PSI_mutex_key
Instrumented mutex key.
Definition: psi_mutex_bits.h:52
static constexpr unsigned PSI_INSTRUMENT_ME
Definition: psi_bits.h:43
Binary log event definitions.
Header for compiler-dependent features.
#define DBUG_PRINT(keyword, arglist)
Definition: my_dbug.h:181
#define DBUG_TRACE
Definition: my_dbug.h:146
Some integer typedefs for easier portability.
unsigned long long int ulonglong
Definition: my_inttypes.h:56
ulonglong my_off_t
Definition: my_inttypes.h:72
long long int longlong
Definition: my_inttypes.h:55
#define MYF(v)
Definition: my_inttypes.h:97
Common #defines and includes for file and socket I/O.
#define FN_REFLEN
Definition: my_io.h:83
Definition of the global "loglevel" enumeration.
loglevel
Definition: my_loglevel.h:41
void my_free(void *ptr)
Frees the memory pointed by the ptr.
Definition: my_memory.cc:81
Defines various enable/disable and HAVE_ macros related to the performance schema instrumentation sys...
#define HAVE_PSI_INTERFACE
Definition: my_psi_config.h:39
Instrumentation helpers for conditions.
ABI for instrumented mutexes.
struct MasterPos master_pos
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:308
bool empty(const Histogram &histogram)
Return true if 'histogram' was built on an empty table.
Definition: histogram.h:693
Performance schema instrumentation interface.
Instrumentation helpers for mutexes.
required uint32 status
Definition: replication_asynchronous_connection_failover.proto:61
required string event
Definition: replication_group_member_actions.proto:32
enum_mts_parallel_type
Definition: rpl_mta_submode.h:47
TABLE * mts_move_temp_tables_to_thd(THD *, TABLE *)
Relocation of the list of temporary tables to thd->temporary_tables.
Definition: rpl_rli_pdb.cc:820
Slave_worker * get_least_occupied_worker(Relay_log_info *rli, Slave_worker_array *workers, Log_event *ev)
Get the least occupied worker.
Definition: rpl_rli_pdb.cc:1130
bool set_max_updated_index_on_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Definition: rpl_rli_pdb.cc:172
bool init_hash_workers(Relay_log_info *rli)
Definition: rpl_rli_pdb.cc:753
void destroy_hash_workers(Relay_log_info *)
Definition: rpl_rli_pdb.cc:764
Slave_worker * get_thd_worker(const THD *thd)
Definition: rpl_rli_pdb.h:1038
ulong w_rr
Definition: rpl_rli_pdb.cc:81
int slave_worker_exec_job_group(Slave_worker *w, Relay_log_info *rli)
apply one job group.
Definition: rpl_rli_pdb.cc:2453
TABLE * mts_move_temp_table_to_entry(TABLE *, THD *, db_worker_hash_entry *)
Relocating temporary table reference into entry's table list head.
Definition: rpl_rli_pdb.cc:786
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item)
This function is called by both coordinator and workers.
Definition: rpl_rli_pdb.cc:108
Slave_worker * map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w)
The function produces a reference to the struct of a Worker that has been or will be engaged to proce...
Definition: rpl_rli_pdb.cc:915
bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli)
Coordinator enqueues a job item into a Worker private queue.
Definition: rpl_rli_pdb.cc:2109
This struct represents a specification of a GTID for a statement to be executed: either "AUTOMATIC",...
Definition: rpl_gtid.h:3999
Definition: my_bitmap.h:43
Definition: rpl_rli_pdb.h:113
Slave_job_group()=default
my_off_t master_log_pos
Definition: rpl_rli_pdb.h:192
my_off_t checkpoint_log_pos
Definition: rpl_rli_pdb.h:195
Format_description_log_event * new_fd_event
Definition: rpl_rli_pdb.h:234
Slave_job_group(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:120
bool notified
Definition: rpl_rli_pdb.h:204
time_t ts
Definition: rpl_rli_pdb.h:202
longlong last_committed
Definition: rpl_rli_pdb.h:207
std::atomic< int32 > done
Definition: rpl_rli_pdb.h:200
Slave_job_group & operator=(const Slave_job_group &other)
Definition: rpl_rli_pdb.h:145
Slave_worker * worker
Definition: rpl_rli_pdb.h:189
my_off_t checkpoint_relay_log_pos
Definition: rpl_rli_pdb.h:198
ulong worker_id
Definition: rpl_rli_pdb.h:188
uint checkpoint_seqno
Definition: rpl_rli_pdb.h:194
void reset(my_off_t master_pos, ulonglong seqno)
Definition: rpl_rli_pdb.h:239
my_off_t group_master_log_pos
Definition: rpl_rli_pdb.h:175
char * group_master_log_name
Definition: rpl_rli_pdb.h:171
char * checkpoint_log_name
Definition: rpl_rli_pdb.h:196
ulong shifted
Definition: rpl_rli_pdb.h:201
longlong sequence_number
Definition: rpl_rli_pdb.h:208
char * checkpoint_relay_log_name
Definition: rpl_rli_pdb.h:199
char * group_relay_log_name
Definition: rpl_rli_pdb.h:186
ulonglong total_seqno
Definition: rpl_rli_pdb.h:190
my_off_t group_relay_log_pos
Definition: rpl_rli_pdb.h:187
Definition: table.h:1421
Definition: completion_hash.h:35
Legends running throughout the module:
Definition: rpl_rli_pdb.h:77
TABLE *volatile temporary_tables
Definition: rpl_rli_pdb.h:93
uint db_len
Definition: rpl_rli_pdb.h:78
long usage
Definition: rpl_rli_pdb.h:85
const char * db
Definition: rpl_rli_pdb.h:79
Slave_worker * worker
Definition: rpl_rli_pdb.h:80
static bool is_any_gtid_event(const Log_event_type &type)
Helps to identify any GTID event - returns true for GTID_LOG_EVENT, GTID_TAGGED_LOG_EVENT and ANONYMO...
Definition: binlog_event.h:391
An instrumented cond structure.
Definition: mysql_cond_bits.h:50
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:50
Definition: rpl_rli.h:84
double seconds()
Definition: task.cc:310
Include file for Sun RPC to compile out of the box.
int n
Definition: xcom_base.cc:509