MySQL 9.1.0
Source Code Documentation
row0pread.h
Go to the documentation of this file.
1/*****************************************************************************
2
3Copyright (c) 2018, 2024, Oracle and/or its affiliates.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License, version 2.0, as published by the
7Free Software Foundation.
8
9This program is designed to work with certain software (including
10but not limited to OpenSSL) that is licensed under separate terms,
11as designated in a particular file or component or in included license
12documentation. The authors of MySQL hereby grant you an additional
13permission to link the program and your derivative works with the
14separately licensed software that they have either included with
15the program or referenced in the documentation.
16
17This program is distributed in the hope that it will be useful, but WITHOUT
18ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
19FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
20for more details.
21
22You should have received a copy of the GNU General Public License along with
23this program; if not, write to the Free Software Foundation, Inc.,
2451 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
26*****************************************************************************/
27
28/** @file include/row0pread.h
29Parallel read interface.
30
31Created 2018-01-27 by Sunny Bains. */
32
33#ifndef row0par_read_h
34#define row0par_read_h
35
36#include <functional>
37#include <vector>
38
39#include "os0thread-create.h"
40#include "row0sel.h"
41
42// Forward declarations
43struct trx_t;
44struct mtr_t;
45class PCursor;
46struct btr_pcur_t;
47struct buf_block_t;
48struct dict_table_t;
49
50#include "btr0cur.h"
51#include "db0err.h"
52#include "fil0fil.h"
53#include "os0event.h"
54#include "page0size.h"
55#include "rem0types.h"
56#include "ut0mpmcbq.h"
57
58/** The core idea is to find the left and right paths down the B+Tree. These
59paths correspond to the scan start and scan end search. Follow the links
60at the appropriate btree level from the left to right and split the scan
61on each of these sub-tree root nodes.
62
63If the user has set the maximum number of threads to use at say 4 threads
64and there are 5 sub-trees at the selected level then we will split the 5th
65sub-tree dynamically when it is ready for scan.
66
67We want to allow multiple parallel range scans on different indexes at the
68same time. To achieve this split out the scan context (Scan_ctx) from the
69execution context (Ctx). The Scan_ctx has the index and transaction
70information and the Ctx keeps track of the cursor for a specific thread
71during the scan.
72
73To start a scan we need to instantiate a Parallel_reader. A parallel reader
74can contain several Scan_ctx instances and a Scan_ctx can contain several
75Ctx instances. It's the Ctx instances that are eventually executed.
76
77This design allows for a single Parallel_reader to scan multiple indexes
78at once. Each index range scan has to be added via its add_scan() method.
79This functionality is required to handle parallel partition scans because
80partitions are separate indexes. This can be used to scan completely
81different indexes and tables by one instance of a Parallel_reader.
82
83To solve the imbalance problem we dynamically split the sub-trees as and
84when required. e.g., If you have 5 sub-trees to scan and 4 threads then
85it will tag the 5th sub-tree as "to_be_split" during phase I (add_scan()),
86the first thread that finishes scanning the first set of 4 partitions will
87then dynamically split the 5th sub-tree and add the newly created sub-trees
88to the execution context (Ctx) run queue in the Parallel_reader. As the
89other threads complete their sub-tree scans they will pick up more execution
90contexts (Ctx) from the Parallel_reader run queue and start scanning the
91sub-partitions as normal.
92
93Note: The Ctx instances are in a virtual list. Each Ctx instance has a
94range to scan. The start point of this range instance is the end point
95of the Ctx instance scanning values less than its start point. A Ctx
96will scan from [Start, End) rows. We use std::shared_ptr to manage the
97reference counting, this allows us to dispose of the Ctx instances
98without worrying about dangling pointers.
99
100NOTE: Secondary index scans are not supported currently. */
102 public:
/** Upper limit accepted for innodb-parallel-read-threads. */
static constexpr size_t MAX_THREADS = 256;

/** Maximum number of parallel read threads reserved for data load, so that
at least this many threads always remain available for data load. */
static constexpr size_t MAX_RESERVED_THREADS = 16;
109
110 /** Maximum total number of parallel read threads that can be
111 spawned. */
113
114 using Links = std::vector<page_no_t, ut::allocator<page_no_t>>;
115
116 // Forward declaration.
117 class Ctx;
118 class Scan_ctx;
119 struct Thread_ctx;
120
121 /** Scan state. */
122 enum class State : uint8_t {
123 /** Unknown state. */
124 UNKNOWN,
125
126 /** Start/Finish thread state. */
127 THREAD,
128
129 /** Start/Finish Ctx state. */
130 CTX,
131
132 /** Start/Finish page read. */
133 PAGE
134 };
135
136 /** Callback to initialise callers state. */
137 using Start = std::function<dberr_t(Thread_ctx *thread_ctx)>;
138
139 /** Callback to finalise callers state. */
140 using Finish = std::function<dberr_t(Thread_ctx *thread_ctx)>;
141
142 /** Callback to process the rows. */
143 using F = std::function<dberr_t(const Ctx *)>;
144
145 /** Specifies the range from where to start the scan and where to end it. */
146 struct Scan_range {
147 /** Default constructor. */
149
150 /** Copy constructor.
151 @param[in] scan_range Instance to copy from. */
152 Scan_range(const Scan_range &scan_range) = default;
153
154 /** Constructor.
155 @param[in] start Start key
156 @param[in] end End key. */
158 : m_start(start), m_end(end) {}
159
160 /** Start of the scan, can be nullptr for -infinity. */
162
163 /** End of the scan, can be null for +infinity. */
164 const dtuple_t *m_end{};
165
166 /** Convert the instance to a string representation. */
167 [[nodiscard]] std::string to_string() const;
168 };
169
170 /** Scan (Scan_ctx) configuration. */
171 struct Config {
172 /** Constructor.
173 @param[in] scan_range Range to scan.
174 @param[in] index Cluster index to scan.
175 @param[in] read_level Btree level from which records need to be read.
176 @param[in] partition_id Partition id if the index to be scanned
177 belongs to a partitioned table. */
178 Config(const Scan_range &scan_range, dict_index_t *index,
179 size_t read_level = 0,
180 size_t partition_id = std::numeric_limits<size_t>::max())
181 : m_scan_range(scan_range),
182 m_index(index),
185 m_read_level(read_level),
186 m_partition_id(partition_id) {}
187
188 /** Copy constructor.
189 @param[in] config Instance to copy from. */
190 Config(const Config &config)
191
192 = default;
193
194 /** Range to scan. */
196
197 /** (Cluster) Index in table to scan. */
199
200 /** Row format of table. */
201 const bool m_is_compact{};
202
203 /** Tablespace page size. */
205
206 /** Btree level from which records need to be read. */
207 size_t m_read_level{0};
208
209 /** Partition id if the index to be scanned belongs to a partitioned table,
210 else std::numeric_limits<size_t>::max(). */
211 size_t m_partition_id{std::numeric_limits<size_t>::max()};
212 };
213
214 /** Thread related context information. */
215 struct Thread_ctx {
216 /** Constructor.
217 @param[in] id Thread ID */
218 explicit Thread_ctx(size_t id) noexcept : m_thread_id(id) {}
219
220 /** Destructor. */
221 ~Thread_ctx() noexcept {
222 ut_a(m_callback_ctx == nullptr);
223
224 if (m_blob_heap != nullptr) {
226 }
227 }
228
229 /** Set thread related callback information.
230 @param[in] ctx callback context */
231 template <typename T>
232 void set_callback_ctx(T *ctx) noexcept {
233 ut_ad(m_callback_ctx == nullptr || ctx == nullptr);
234 m_callback_ctx = ctx;
235 }
236
237 /** Get the thread related callback information/
238 @return return context. */
239 template <typename T>
240 T *get_callback_ctx() noexcept {
241 return static_cast<T *>(m_callback_ctx);
242 }
243
244 /** Create BLOB heap. */
245 void create_blob_heap() noexcept {
246 ut_a(m_blob_heap == nullptr);
248 }
249
250 /** @return the worker thread state. */
251 State get_state() const noexcept { return m_state; }
252
253 /** Save current position, commit any active mtr. */
254 void savepoint() noexcept;
255
256 /** Restore saved position and resume scan.
257 @return DB_SUCCESS or error code. */
258 [[nodiscard]] dberr_t restore_from_savepoint() noexcept;
259
260 /** Thread ID. */
261 size_t m_thread_id{std::numeric_limits<size_t>::max()};
262
263 /** Callback information related to the thread.
264 @note Needs to be created and destroyed by the callback itself. */
266
267 /** BLOB heap per thread. */
269
270 /** Worker thread state. */
272
273 /** Current persistent cursor. */
275
276 Thread_ctx(Thread_ctx &&) = delete;
277 Thread_ctx(const Thread_ctx &) = delete;
279 Thread_ctx &operator=(const Thread_ctx &) = delete;
280 };
281
282 /** Constructor.
283 @param[in] max_threads Maximum number of threads to use. */
284 explicit Parallel_reader(size_t max_threads);
285
286 /** Destructor. */
288
/** Check how many threads are available for parallel reads.
@param[in] n_required    Number of threads required.
@param[in] use_reserved  true if reserved threads need to be considered
                         while checking for availability of threads.
@return number of threads available. */
[[nodiscard]] static size_t available_threads(size_t n_required,
                                              bool use_reserved);
296
297 /** Release the parallel read threads. */
298 static void release_threads(size_t n_threads) {
299 const auto SEQ_CST = std::memory_order_seq_cst;
300 auto active = s_active_threads.fetch_sub(n_threads, SEQ_CST);
301 ut_a(active >= n_threads);
302 }
303
304 /** Add scan context.
305 @param[in,out] trx Covering transaction.
306 @param[in] config Scan condfiguration.
307 @param[in] f Callback function.
308 (default is 0 which is leaf level)
309 @return error. */
310 [[nodiscard]] dberr_t add_scan(trx_t *trx, const Config &config, F &&f);
311
312 /** Wait for the join of threads spawned by the parallel reader. */
313 void join() {
314 for (auto &t : m_parallel_read_threads) {
315 t.wait();
316 }
317 }
318
319 /** Get the error stored in the global error state.
320 @return global error state. */
321 [[nodiscard]] dberr_t get_error_state() const { return m_err; }
322
323 /** @return true if the tree is empty, else false. */
324 [[nodiscard]] bool is_tree_empty() const {
325 return m_ctx_id.load(std::memory_order_relaxed) == 0;
326 }
327
328 /** Set the callback that must be called before any processing.
329 @param[in] f Call before first row is processed.*/
330 void set_start_callback(Start &&f) { m_start_callback = std::move(f); }
331
332 /** Set the callback that must be called after all processing.
333 @param[in] f Call after last row is processed.*/
334 void set_finish_callback(Finish &&f) { m_finish_callback = std::move(f); }
335
336 /** Spawn the threads to do the parallel read for the specified range.
337 Don't wait for the spawned to threads to complete.
338 @param[in] n_threads number of threads that *need* to be spawned
339 @return DB_SUCCESS or error code. */
340 [[nodiscard]] dberr_t spawn(size_t n_threads) noexcept;
341
342 /** Start the threads to do the parallel read for the specified range.
343 @param[in] n_threads Number of threads to use for the scan.
344 @return DB_SUCCESS or error code. */
345 [[nodiscard]] dberr_t run(size_t n_threads);
346
347 /** @return the configured max threads size. */
348 [[nodiscard]] size_t max_threads() const { return m_max_threads; }
349
350 /** @return true if in error state. */
351 [[nodiscard]] bool is_error_set() const {
352 return m_err.load(std::memory_order_relaxed) != DB_SUCCESS;
353 }
354
355 /** Set the error state.
356 @param[in] err Error state to set to. */
358 m_err.store(err, std::memory_order_relaxed);
359 }
360
361 /** Set the number of threads to be spawned.
362 @param[in] n_threads number of threads to be spawned. */
363 void set_n_threads(size_t n_threads) {
364 ut_ad(n_threads <= m_max_threads);
365 m_n_threads = n_threads;
366 }
367
368 // Disable copying.
373
374 private:
375 /** Release unused threads back to the pool.
376 @param[in] unused_threads Number of threads to "release". */
377 void release_unused_threads(size_t unused_threads) {
378 ut_a(m_max_threads >= unused_threads);
379 release_threads(unused_threads);
380 }
381
382 /** Add an execution context to the run queue.
383 @param[in] ctx Execution context to add to the queue. */
384 void enqueue(std::shared_ptr<Ctx> ctx);
385
386 /** Fetch the next job execute.
387 @return job to execute or nullptr. */
388 [[nodiscard]] std::shared_ptr<Ctx> dequeue();
389
390 /** @return true if job queue is empty. */
391 [[nodiscard]] bool is_queue_empty() const;
392
393 /** Poll for requests and execute.
394 @param[in] thread_ctx thread related context information */
395 void worker(Thread_ctx *thread_ctx);
396
397 /** Create the threads and do a parallel read across the partitions. */
398 void parallel_read();
399
400 /** @return true if tasks are still executing. */
401 [[nodiscard]] bool is_active() const {
402 return m_n_completed.load(std::memory_order_relaxed) <
403 m_ctx_id.load(std::memory_order_relaxed);
404 }
405
406 private:
407 // clang-format off
408 using Ctxs =
409 std::list<std::shared_ptr<Ctx>,
411
412 using Scan_ctxs =
413 std::list<std::shared_ptr<Scan_ctx>,
415
416 // clang-format on
417
418 /** Maximum number of worker threads to use. */
420
421 /** Number of worker threads that will be spawned. */
422 size_t m_n_threads{0};
423
424 /** Mutex protecting m_ctxs. */
425 mutable ib_mutex_t m_mutex;
426
427 /** Contexts that must be executed. */
429
430 /** Scan contexts. */
432
433 /** For signalling worker threads about events. */
435
436 /** Value returned by previous call of os_event_reset() on m_event. */
437 uint64_t m_sig_count;
438
439 /** Counter for allocating scan context IDs. */
441
442 /** Context ID. Monotonically increasing ID. */
443 std::atomic_size_t m_ctx_id{};
444
445 /** Total tasks executed so far. */
446 std::atomic_size_t m_n_completed{};
447
448 /** Callback at start (before processing any rows). */
450
451 /** Callback at end (after processing all rows). */
453
454 /** Error during parallel read. */
455 std::atomic<dberr_t> m_err{DB_SUCCESS};
456
457 /** List of threads used for parallel_read purpose. */
458 std::vector<IB_thread, ut::allocator<IB_thread>> m_parallel_read_threads;
459
460 /** Number of threads currently doing parallel reads. */
461 static std::atomic_size_t s_active_threads;
462
463 /** If the caller wants to wait for the parallel_read to finish its run */
464 bool m_sync;
465
466 /** Context information related to each parallel reader thread. */
467 std::vector<Thread_ctx *, ut::allocator<Thread_ctx *>> m_thread_ctxs;
468};
469
470/** Parallel reader context. */
472 public:
473 /** Constructor.
474 @param[in] reader Parallel reader that owns this context.
475 @param[in] id ID of this scan context.
476 @param[in] trx Transaction covering the scan.
477 @param[in] config Range scan config.
478 @param[in] f Callback function. */
479 Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx,
480 const Parallel_reader::Config &config, F &&f);
481
482 /** Destructor. */
483 ~Scan_ctx() = default;
484
485 private:
486 /** Boundary of the range to scan. */
487 struct Iter {
488 /** Destructor. */
489 ~Iter();
490
491 /** Heap used to allocate m_rec, m_tuple and m_pcur. */
493
494 /** m_rec column offsets. */
495 const ulint *m_offsets{};
496
497 /** Start scanning from this key. Raw data of the row. */
498 const rec_t *m_rec{};
499
500 /** Tuple representation inside m_rec, for two Iter instances in a range
501 m_tuple will be [first->m_tuple, second->m_tuple). */
503
504 /** Persistent cursor.*/
506 };
507
508 /** mtr_t savepoint. */
509 using Savepoint = std::pair<ulint, buf_block_t *>;
510
511 /** For releasing the S latches after processing the blocks. */
512 using Savepoints = std::vector<Savepoint, ut::allocator<Savepoint>>;
513
514 /** The first cursor should read up to the second cursor [f, s). */
515 using Range = std::pair<std::shared_ptr<Iter>, std::shared_ptr<Iter>>;
516
517 using Ranges = std::vector<Range, ut::allocator<Range>>;
518
519 /** @return the scan context ID. */
520 [[nodiscard]] size_t id() const { return m_id; }
521
522 /** Set the error state.
523 @param[in] err Error state to set to. */
525 m_err.store(err, std::memory_order_relaxed);
526 }
527
528 /** @return true if in error state. */
529 [[nodiscard]] bool is_error_set() const {
530 return m_err.load(std::memory_order_relaxed) != DB_SUCCESS;
531 }
532
533 /** Fetch a block from the buffer pool and acquire an S latch on it.
534 @param[in] page_id Page ID.
535 @param[in,out] mtr Mini-transaction covering the fetch.
536 @param[in] line Line from where called.
537 @return the block fetched from the buffer pool. */
538 [[nodiscard]] buf_block_t *block_get_s_latched(const page_id_t &page_id,
539 mtr_t *mtr, size_t line) const;
540
541 /** Partition the B+Tree for parallel read.
542 @param[in] scan_range Range for partitioning.
543 @param[in,out] ranges Ranges to scan.
544 @param[in] split_level Sub-range required level (0 == root).
545 @return the partition scan ranges. */
546 dberr_t partition(const Scan_range &scan_range, Ranges &ranges,
547 size_t split_level);
548
549 /** Find the page number of the node that contains the search key. If the
550 key is null then we assume -infinity.
551 @param[in] block Page to look in.
552 @param[in] key Key of the first record in the range.
553 @return the left child page number. */
554 [[nodiscard]] page_no_t search(const buf_block_t *block,
555 const dtuple_t *key) const;
556
557 /** Traverse from given sub-tree page number to start of the scan range
558 from the given page number.
559 @param[in] page_no Page number of sub-tree.
560 @param[in,out] mtr Mini-transaction.
561 @param[in] key Key of the first record in the range.
562 @param[in,out] savepoints Blocks S latched and accessed.
563 @return the leaf node page cursor. */
564 [[nodiscard]] page_cur_t start_range(page_no_t page_no, mtr_t *mtr,
565 const dtuple_t *key,
566 Savepoints &savepoints) const;
567
568 /** Create and add the range to the scan ranges.
569 @param[in,out] ranges Ranges to scan.
570 @param[in,out] leaf_page_cursor Leaf page cursor on which to create the
571 persistent cursor.
572 @param[in,out] mtr Mini-transaction */
573 void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor,
574 mtr_t *mtr) const;
575
576 /** Find the subtrees to scan in a block.
577 @param[in] scan_range Partition based on this scan range.
578 @param[in] page_no Page to partition at if at required level.
579 @param[in] depth Sub-range current level.
580 @param[in] split_level Sub-range starting level (0 == root).
581 @param[in,out] ranges Ranges to scan.
582 @param[in,out] mtr Mini-transaction */
583 dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no,
584 size_t depth, const size_t split_level, Ranges &ranges,
585 mtr_t *mtr);
586
587 /** Build a dtuple_t from rec_t.
588 @param[in] rec Build the dtuple from this record.
589 @param[in,out] iter Build in this iterator. */
590 void copy_row(const rec_t *rec, Iter *iter) const;
591
592 /** Create the persistent cursor that will be used to traverse the
593 partition and position on the the start row.
594 @param[in] page_cursor Current page cursor
595 @param[in] mtr Mini-transaction covering the read.
596 @return Start iterator. */
597 [[nodiscard]] std::shared_ptr<Iter> create_persistent_cursor(
598 const page_cur_t &page_cursor, mtr_t *mtr) const;
599
600 /** Build an old version of the row if required.
601 @param[in,out] rec Current row read from the index. This can
602 be modified by this method if an older version
603 needs to be built.
604 @param[in,out] offsets Same as above but pertains to the rec offsets
605 @param[in,out] heap Heap to use if a previous version needs to be
606 built from the undo log.
607 @param[in,out] mtr Mini-transaction covering the read.
608 @return true if row is visible to the transaction. */
609 [[nodiscard]] bool check_visibility(const rec_t *&rec, ulint *&offsets,
610 mem_heap_t *&heap, mtr_t *mtr);
611
612 /** Create an execution context for a range and add it to
613 the Parallel_reader's run queue.
614 @param[in] range Range for which to create the context.
615 @param[in] split true if the sub-tree should be split further.
616 @return DB_SUCCESS or error code. */
617 [[nodiscard]] dberr_t create_context(const Range &range, bool split);
618
619 /** Create the execution contexts based on the ranges.
620 @param[in] ranges Ranges for which to create the contexts.
621 @return DB_SUCCESS or error code. */
622 [[nodiscard]] dberr_t create_contexts(const Ranges &ranges);
623
624 /** @return the maximum number of threads configured. */
625 [[nodiscard]] size_t max_threads() const { return m_reader->max_threads(); }
626
627 /** Release unused threads back to the pool.
628 @param[in] unused_threads Number of threads to "release". */
629 void release_threads(size_t unused_threads) {
630 m_reader->release_threads(unused_threads);
631 }
632
633 /** S lock the index. */
634 void index_s_lock();
635
636 /** S unlock the index. */
637 void index_s_unlock();
638
639 /** @return true if at least one thread owns the S latch on the index. */
640 bool index_s_own() const {
641 return m_s_locks.load(std::memory_order_acquire) > 0;
642 }
643
644 private:
646
647 /** Context ID. */
648 size_t m_id{std::numeric_limits<size_t>::max()};
649
650 /** Parallel scan configuration. */
652
653 /** Covering transaction. */
654 const trx_t *m_trx{};
655
656 /** Callback function. */
658
659 /** Depth of the Btree. */
660 size_t m_depth{};
661
662 /** The parallel reader. */
664
665 /** Error during parallel read. */
666 mutable std::atomic<dberr_t> m_err{DB_SUCCESS};
667
668 /** Number of threads that have S locked the index. */
669 std::atomic_size_t m_s_locks{};
670
671 friend class Parallel_reader;
672
673 Scan_ctx(Scan_ctx &&) = delete;
674 Scan_ctx(const Scan_ctx &) = delete;
676 Scan_ctx &operator=(const Scan_ctx &) = delete;
677};
678
679/** Parallel reader execution context. */
681 public:
682 /** Constructor.
683 @param[in] id Thread ID.
684 @param[in] scan_ctx Scan context.
685 @param[in] range Range that the thread has to read. */
686 Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
687 : m_id(id), m_range(range), m_scan_ctx(scan_ctx) {}
688
689 /** Destructor. */
690 ~Ctx() = default;
691
692 public:
693 /** @return the context ID. */
694 [[nodiscard]] size_t id() const { return m_id; }
695
696 /** The scan ID of the scan context this belongs to. */
697 [[nodiscard]] size_t scan_id() const { return m_scan_ctx->id(); }
698
699 /** @return the covering transaction. */
700 [[nodiscard]] const trx_t *trx() const { return m_scan_ctx->m_trx; }
701
702 /** @return the index being scanned. */
703 [[nodiscard]] const dict_index_t *index() const {
705 }
706
707 /** @return ID of the thread processing this context */
708 [[nodiscard]] size_t thread_id() const { return m_thread_ctx->m_thread_id; }
709
710 /** @return the thread context of the reader thread. */
711 [[nodiscard]] Thread_ctx *thread_ctx() const { return m_thread_ctx; }
712
713 /** @return the partition id of the index.
714 @note this is std::numeric_limits<size_t>::max() if the index does not
715 belong to a partition. */
716 [[nodiscard]] size_t partition_id() const {
718 }
719
720 /** Build an old version of the row if required.
721 @param[in,out] rec Current row read from the index. This can
722 be modified by this method if an older version
723 needs to be built.
724 @param[in,out] offsets Same as above but pertains to the rec offsets
725 @param[in,out] heap Heap to use if a previous version needs to be
726 built from the undo log.
727 @param[in,out] mtr Mini-transaction covering the read.
728 @return true if row is visible to the transaction. */
729 bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap,
730 mtr_t *mtr) {
731 return m_scan_ctx->check_visibility(rec, offsets, heap, mtr);
732 }
733
734 private:
735 /** Traverse the pages by key order.
736 @return DB_SUCCESS or error code. */
737 [[nodiscard]] dberr_t traverse();
738
739 /** Traverse the records in a node.
740 @param[in] pcursor persistent b-tree cursor
741 @param[in] mtr mtr
742 @return error */
743 dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr);
744
745 /** Move to the next node in the specified level.
746 @param[in] pcursor persistent b-tree cursor
747 @return success */
748 bool move_to_next_node(PCursor *pcursor);
749
750 /** Split the context into sub-ranges and add them to the execution queue.
751 @return DB_SUCCESS or error code. */
752 [[nodiscard]] dberr_t split();
753
754 /** @return true if in error state. */
755 [[nodiscard]] bool is_error_set() const {
757 }
758
759 private:
760 /** Context ID. */
761 size_t m_id{std::numeric_limits<size_t>::max()};
762
763 /** If true then split the context at the block level. */
764 bool m_split{};
765
766 /** Range to read in this context. */
768
769 /** Scanner context. */
771
772 public:
773 /** Context information related to executing thread ID. */
775
776 /** Current block. */
778
779 /** Current row. */
780 const rec_t *m_rec{};
781
782 /** Number of pages traversed by the context. */
783 size_t m_n_pages{};
784
785 /** True if m_rec is the first record in the page. */
786 bool m_first_rec{true};
787
789
790 /** Start of a new range to scan. */
791 bool m_start{};
792
793 friend class Parallel_reader;
794};
795
796#endif /* !row0par_read_h */
uint32_t page_no_t
Page number.
Definition: api0api.h:46
The index tree cursor.
Persistent cursor wrapper around btr_pcur_t.
Definition: row0pread.cc:208
Parallel reader execution context.
Definition: row0pread.h:680
bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.h:729
Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
Constructor.
Definition: row0pread.h:686
size_t id() const
Definition: row0pread.h:694
size_t m_n_pages
Number of pages traversed by the context.
Definition: row0pread.h:783
~Ctx()=default
Destructor.
bool is_error_set() const
Definition: row0pread.h:755
dberr_t split()
Split the context into sub-ranges and add them to the execution queue.
Definition: row0pread.cc:148
Scan_ctx::Range m_range
Range to read in this context.
Definition: row0pread.h:767
Scan_ctx * m_scan_ctx
Scanner context.
Definition: row0pread.h:770
bool m_start
Start of a new range to scan.
Definition: row0pread.h:791
const trx_t * trx() const
Definition: row0pread.h:700
bool m_split
If true then split the context at the block level.
Definition: row0pread.h:764
dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr)
Traverse the records in a node.
Definition: row0pread.cc:649
size_t thread_id() const
Definition: row0pread.h:708
bool m_first_rec
True if m_rec is the first record in the page.
Definition: row0pread.h:786
Thread_ctx * thread_ctx() const
Definition: row0pread.h:711
size_t scan_id() const
The scan ID of the scan context this belongs to.
Definition: row0pread.h:697
const buf_block_t * m_block
Current block.
Definition: row0pread.h:777
ulint * m_offsets
Definition: row0pread.h:788
bool move_to_next_node(PCursor *pcursor)
Move to the next node in the specified level.
Definition: row0pread.cc:598
size_t m_id
Context ID.
Definition: row0pread.h:761
const dict_index_t * index() const
Definition: row0pread.h:703
size_t partition_id() const
Definition: row0pread.h:716
Thread_ctx * m_thread_ctx
Context information related to executing thread ID.
Definition: row0pread.h:774
const rec_t * m_rec
Current row.
Definition: row0pread.h:780
dberr_t traverse()
Traverse the pages by key order.
Definition: row0pread.cc:614
Parallel reader context.
Definition: row0pread.h:471
std::atomic_size_t m_s_locks
Number of threads that have S locked the index.
Definition: row0pread.h:669
dberr_t partition(const Scan_range &scan_range, Ranges &ranges, size_t split_level)
Partition the B+Tree for parallel read.
Definition: row0pread.cc:1184
page_cur_t start_range(page_no_t page_no, mtr_t *mtr, const dtuple_t *key, Savepoints &savepoints) const
Traverse from given sub-tree page number to start of the scan range from the given page number.
Definition: row0pread.cc:972
std::pair< ulint, buf_block_t * > Savepoint
mtr_t savepoint.
Definition: row0pread.h:509
size_t id() const
Definition: row0pread.h:520
Scan_ctx & operator=(Scan_ctx &&)=delete
void index_s_lock()
S lock the index.
Definition: row0pread.cc:132
std::shared_ptr< Iter > create_persistent_cursor(const page_cur_t &page_cursor, mtr_t *mtr) const
Create the persistent cursor that will be used to traverse the partition and position on the the star...
Definition: row0pread.cc:551
size_t m_id
Context ID.
Definition: row0pread.h:648
F m_f
Callback function.
Definition: row0pread.h:657
bool index_s_own() const
Definition: row0pread.h:640
dberr_t create_context(const Range &range, bool split)
Create an execution context for a range and add it to the Parallel_reader's run queue.
Definition: row0pread.cc:1219
void index_s_unlock()
S unlock the index.
Definition: row0pread.cc:140
dberr_t create_contexts(const Ranges &ranges)
Create the execution contexts based on the ranges.
Definition: row0pread.cc:1244
void release_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:629
page_no_t search(const buf_block_t *block, const dtuple_t *key) const
Find the page number of the node that contains the search key.
Definition: row0pread.cc:934
size_t max_threads() const
Definition: row0pread.h:625
Config m_config
Parallel scan configuration.
Definition: row0pread.h:651
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:524
void copy_row(const rec_t *rec, Iter *iter) const
Build a dtuple_t from rec_t.
Definition: row0pread.cc:521
~Scan_ctx()=default
Destructor.
std::vector< Savepoint, ut::allocator< Savepoint > > Savepoints
For releasing the S latches after processing the blocks.
Definition: row0pread.h:512
size_t m_depth
Depth of the Btree.
Definition: row0pread.h:660
std::pair< std::shared_ptr< Iter >, std::shared_ptr< Iter > > Range
The first cursor should read up to the second cursor [f, s).
Definition: row0pread.h:515
void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor, mtr_t *mtr) const
Create and add the range to the scan ranges.
Definition: row0pread.cc:1012
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:666
Scan_ctx & operator=(const Scan_ctx &)=delete
std::vector< Range, ut::allocator< Range > > Ranges
Definition: row0pread.h:517
bool is_error_set() const
Definition: row0pread.h:529
const trx_t * m_trx
Covering transaction.
Definition: row0pread.h:654
buf_block_t * block_get_s_latched(const page_id_t &page_id, mtr_t *mtr, size_t line) const
Fetch a block from the buffer pool and acquire an S latch on it.
Definition: row0pread.cc:319
Scan_ctx(Scan_ctx &&)=delete
Scan_ctx(const Scan_ctx &)=delete
Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx, const Parallel_reader::Config &config, F &&f)
Constructor.
Definition: row0pread.cc:201
Parallel_reader * m_reader
The parallel reader.
Definition: row0pread.h:663
dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no, size_t depth, const size_t split_level, Ranges &ranges, mtr_t *mtr)
Find the subtrees to scan in a block.
Definition: row0pread.cc:1028
bool check_visibility(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.cc:466
The core idea is to find the left and right paths down the B+Tree. These paths correspond to the scan ...
Definition: row0pread.h:101
size_t max_threads() const
Definition: row0pread.h:348
std::function< dberr_t(Thread_ctx *thread_ctx)> Finish
Callback to finalise the caller's state.
Definition: row0pread.h:140
void parallel_read()
Create the threads and do a parallel read across the partitions.
Definition: row0pread.cc:1276
std::atomic_size_t m_n_completed
Total tasks executed so far.
Definition: row0pread.h:446
std::vector< IB_thread, ut::allocator< IB_thread > > m_parallel_read_threads
List of threads used for parallel_read purposes.
Definition: row0pread.h:458
constexpr static size_t MAX_THREADS
Maximum value for innodb-parallel-read-threads.
Definition: row0pread.h:104
Parallel_reader & operator=(Parallel_reader &&)=delete
bool m_sync
If the caller wants to wait for the parallel_read to finish its run.
Definition: row0pread.h:464
bool is_queue_empty() const
Definition: row0pread.cc:808
dberr_t run(size_t n_threads)
Start the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1353
std::vector< Thread_ctx *, ut::allocator< Thread_ctx * > > m_thread_ctxs
Context information related to each parallel reader thread.
Definition: row0pread.h:467
std::vector< page_no_t, ut::allocator< page_no_t > > Links
Definition: row0pread.h:114
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:455
bool is_active() const
Definition: row0pread.h:401
size_t m_n_threads
Number of worker threads that will be spawned.
Definition: row0pread.h:422
uint64_t m_sig_count
Value returned by previous call of os_event_reset() on m_event.
Definition: row0pread.h:437
dberr_t get_error_state() const
Get the error stored in the global error state.
Definition: row0pread.h:321
Parallel_reader(const Parallel_reader &)=delete
size_t m_max_threads
Maximum number of worker threads to use.
Definition: row0pread.h:419
std::shared_ptr< Ctx > dequeue()
Fetch the next job to execute.
Definition: row0pread.cc:792
void release_unused_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:377
Start m_start_callback
Callback at start (before processing any rows).
Definition: row0pread.h:449
os_event_t m_event
For signalling worker threads about events.
Definition: row0pread.h:434
Parallel_reader & operator=(const Parallel_reader &)=delete
void set_start_callback(Start &&f)
Set the callback that must be called before any processing.
Definition: row0pread.h:330
bool is_error_set() const
Definition: row0pread.h:351
Scan_ctxs m_scan_ctxs
Scan contexts.
Definition: row0pread.h:431
Parallel_reader(size_t max_threads)
Constructor.
Definition: row0pread.cc:188
constexpr static size_t MAX_RESERVED_THREADS
Maximum value for reserved parallel read threads for data load so that at least this many threads are...
Definition: row0pread.h:108
dberr_t spawn(size_t n_threads) noexcept
Spawn the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1338
ib_mutex_t m_mutex
Mutex protecting m_ctxs.
Definition: row0pread.h:425
void set_finish_callback(Finish &&f)
Set the callback that must be called after all processing.
Definition: row0pread.h:334
void worker(Thread_ctx *thread_ctx)
Poll for requests and execute.
Definition: row0pread.cc:815
void join()
Wait for the join of threads spawned by the parallel reader.
Definition: row0pread.h:313
~Parallel_reader()
Destructor.
Definition: row0pread.cc:90
std::list< std::shared_ptr< Scan_ctx >, ut::allocator< std::shared_ptr< Scan_ctx > > > Scan_ctxs
Definition: row0pread.h:414
Finish m_finish_callback
Callback at end (after processing all rows).
Definition: row0pread.h:452
size_t m_scan_ctx_id
Counter for allocating scan context IDs.
Definition: row0pread.h:440
static std::atomic_size_t s_active_threads
Number of threads currently doing parallel reads.
Definition: row0pread.h:461
static void release_threads(size_t n_threads)
Release the parallel read threads.
Definition: row0pread.h:298
constexpr static size_t MAX_TOTAL_THREADS
Maximum value for at most number of parallel read threads that can be spawned.
Definition: row0pread.h:112
void enqueue(std::shared_ptr< Ctx > ctx)
Add an execution context to the run queue.
Definition: row0pread.cc:786
void set_n_threads(size_t n_threads)
Set the number of threads to be spawned.
Definition: row0pread.h:363
static size_t available_threads(size_t n_required, bool use_reserved)
Check how many threads are available for parallel reads.
Definition: row0pread.cc:103
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:357
std::list< std::shared_ptr< Ctx >, ut::allocator< std::shared_ptr< Ctx > > > Ctxs
Definition: row0pread.h:410
State
Scan state.
Definition: row0pread.h:122
@ PAGE
Start/Finish page read.
@ CTX
Start/Finish Ctx state.
@ THREAD
Start/Finish thread state.
@ UNKNOWN
Unknown state.
dberr_t add_scan(trx_t *trx, const Config &config, F &&f)
Add scan context.
Definition: row0pread.cc:1393
bool is_tree_empty() const
Definition: row0pread.h:324
std::atomic_size_t m_ctx_id
Context ID.
Definition: row0pread.h:443
std::function< dberr_t(Thread_ctx *thread_ctx)> Start
Callback to initialise the caller's state.
Definition: row0pread.h:137
Parallel_reader(const Parallel_reader &&)=delete
std::function< dberr_t(const Ctx *)> F
Callback to process the rows.
Definition: row0pread.h:143
Ctxs m_ctxs
Contexts that must be executed.
Definition: row0pread.h:428
Page identifier.
Definition: buf0types.h:207
Page size descriptor.
Definition: page0size.h:50
Allocator that allows std::* containers to manage their memory through ut::malloc* and ut::free libra...
Definition: ut0new.h:2183
Global error codes for the database.
dberr_t
Definition: db0err.h:39
@ DB_SUCCESS
Definition: db0err.h:43
uint32_t dict_tf_to_fsp_flags(uint32_t table_flags)
Convert a 32 bit integer table flags to the 32 bit FSP Flags.
Definition: dict0dict.cc:4999
static bool dict_table_is_comp(const dict_table_t *table)
Check whether the table uses the compact page format.
The low-level file system.
static int flags[50]
Definition: hp_test1.cc:40
static void start(mysql_harness::PluginFuncEnv *env)
Definition: http_auth_backend_plugin.cc:180
static void mem_heap_free(mem_heap_t *heap)
Frees the space occupied by a memory heap.
static mem_heap_t * mem_heap_create(ulint size, ut::Location loc, ulint type=MEM_HEAP_DYNAMIC)
Creates a memory heap.
static PFS_engine_table_share_proxy table
Definition: pfs.cc:61
static Value err()
Create a Value object that represents an error condition.
Definition: json_binary.cc:908
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
The interface to the operating system condition variables.
The interface to the threading wrapper.
A class describing a page size.
Record manager global types.
byte rec_t
Definition: rem0types.h:41
required string key
Definition: replication_asynchronous_connection_failover.proto:60
Select.
Scan (Scan_ctx) configuration.
Definition: row0pread.h:171
size_t m_partition_id
Partition id if the index to be scanned belongs to a partitioned table, else std::numeric_limits<size...
Definition: row0pread.h:211
Config(const Scan_range &scan_range, dict_index_t *index, size_t read_level=0, size_t partition_id=std::numeric_limits< size_t >::max())
Constructor.
Definition: row0pread.h:178
const bool m_is_compact
Row format of table.
Definition: row0pread.h:201
size_t m_read_level
Btree level from which records need to be read.
Definition: row0pread.h:207
Config(const Config &config)=default
Copy constructor.
dict_index_t * m_index
(Cluster) Index in table to scan.
Definition: row0pread.h:198
const Scan_range m_scan_range
Range to scan.
Definition: row0pread.h:195
const page_size_t m_page_size
Tablespace page size.
Definition: row0pread.h:204
Boundary of the range to scan.
Definition: row0pread.h:487
const dtuple_t * m_tuple
Tuple representation inside m_rec, for two Iter instances in a range m_tuple will be [first->m_tuple,...
Definition: row0pread.h:502
btr_pcur_t * m_pcur
Persistent cursor.
Definition: row0pread.h:505
const rec_t * m_rec
Start scanning from this key.
Definition: row0pread.h:498
mem_heap_t * m_heap
Heap used to allocate m_rec, m_tuple and m_pcur.
Definition: row0pread.h:492
~Iter()
Destructor.
Definition: row0pread.cc:75
const ulint * m_offsets
m_rec column offsets.
Definition: row0pread.h:495
Specifies the range from where to start the scan and where to end it.
Definition: row0pread.h:146
Scan_range()
Default constructor.
Definition: row0pread.h:148
Scan_range(const Scan_range &scan_range)=default
Copy constructor.
const dtuple_t * m_end
End of the scan, can be nullptr for +infinity.
Definition: row0pread.h:164
const dtuple_t * m_start
Start of the scan, can be nullptr for -infinity.
Definition: row0pread.h:161
std::string to_string() const
Convert the instance to a string representation.
Definition: row0pread.cc:57
Scan_range(const dtuple_t *start, const dtuple_t *end)
Constructor.
Definition: row0pread.h:157
Thread related context information.
Definition: row0pread.h:215
size_t m_thread_id
Thread ID.
Definition: row0pread.h:261
Thread_ctx & operator=(Thread_ctx &&)=delete
void create_blob_heap() noexcept
Create BLOB heap.
Definition: row0pread.h:245
Thread_ctx(const Thread_ctx &)=delete
mem_heap_t * m_blob_heap
BLOB heap per thread.
Definition: row0pread.h:268
void * m_callback_ctx
Callback information related to the thread.
Definition: row0pread.h:265
Thread_ctx(size_t id) noexcept
Constructor.
Definition: row0pread.h:218
void savepoint() noexcept
Save current position, commit any active mtr.
Definition: row0pread.cc:420
PCursor * m_pcursor
Current persistent cursor.
Definition: row0pread.h:274
Thread_ctx(Thread_ctx &&)=delete
State get_state() const noexcept
Definition: row0pread.h:251
Thread_ctx & operator=(const Thread_ctx &)=delete
State m_state
Worker thread state.
Definition: row0pread.h:271
void set_callback_ctx(T *ctx) noexcept
Set thread related callback information.
Definition: row0pread.h:232
dberr_t restore_from_savepoint() noexcept
Restore saved position and resume scan.
Definition: row0pread.cc:414
T * get_callback_ctx() noexcept
Get the thread related callback information.
Definition: row0pread.h:240
~Thread_ctx() noexcept
Destructor.
Definition: row0pread.h:221
Definition: btr0pcur.h:99
The buffer control block structure.
Definition: buf0buf.h:1747
Data structure for an index.
Definition: dict0mem.h:1041
Data structure for a database table.
Definition: dict0mem.h:1904
Structure for an SQL data tuple of fields (logical record)
Definition: data0data.h:696
The info structure stored at the beginning of a heap block.
Definition: mem0mem.h:302
Mini-transaction handle and buffer.
Definition: mtr0mtr.h:177
InnoDB condition variable.
Definition: os0event.cc:63
Index page cursor.
Definition: page0cur.h:311
Definition: gen_lex_token.cc:149
Definition: trx0trx.h:675
#define UNIV_PAGE_SIZE
The universal page size of the database.
Definition: univ.i:294
unsigned long int ulint
Definition: univ.i:406
#define UT_LOCATION_HERE
Definition: ut0core.h:73
#define ut_ad(EXPR)
Debug assertion.
Definition: ut0dbg.h:105
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:93