MySQL 8.0.31
Source Code Documentation
row0pread.h
Go to the documentation of this file.
1/*****************************************************************************
2
3Copyright (c) 2018, 2022, Oracle and/or its affiliates.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License, version 2.0, as published by the
7Free Software Foundation.
8
9This program is also distributed with certain software (including but not
10limited to OpenSSL) that is licensed under separate terms, as designated in a
11particular file or component or in included license documentation. The authors
12of MySQL hereby grant you an additional permission to link the program and
13your derivative works with the separately licensed software that they have
14included with MySQL.
15
16This program is distributed in the hope that it will be useful, but WITHOUT
17ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19for more details.
20
21You should have received a copy of the GNU General Public License along with
22this program; if not, write to the Free Software Foundation, Inc.,
2351 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25*****************************************************************************/
26
27/** @file include/row0pread.h
28Parallel read interface.
29
30Created 2018-01-27 by Sunny Bains. */
31
32#ifndef row0par_read_h
33#define row0par_read_h
34
35#include <functional>
36#include <vector>
37
38#include "os0thread-create.h"
39#include "row0sel.h"
40
41// Forward declarations
42struct trx_t;
43struct mtr_t;
44class PCursor;
45struct btr_pcur_t;
46struct buf_block_t;
47struct dict_table_t;
48
49#include "btr0cur.h"
50#include "db0err.h"
51#include "fil0fil.h"
52#include "os0event.h"
53#include "page0size.h"
54#include "rem0types.h"
55#include "ut0mpmcbq.h"
56
57/** The core idea is to find the left and right paths down the B+Tree. These
58paths correspond to the scan start and scan end search. Follow the links
59at the appropriate btree level from the left to right and split the scan
60on each of these sub-tree root nodes.
61
62If the user has set the maximum number of threads to use at say 4 threads
63and there are 5 sub-trees at the selected level then we will split the 5th
64sub-tree dynamically when it is ready for scan.
65
66We want to allow multiple parallel range scans on different indexes at the
67same time. To achieve this split out the scan context (Scan_ctx) from the
68execution context (Ctx). The Scan_ctx has the index and transaction
69information and the Ctx keeps track of the cursor for a specific thread
70during the scan.
71
72To start a scan we need to instantiate a Parallel_reader. A parallel reader
73can contain several Scan_ctx instances and a Scan_ctx can contain several
74 Ctx instances. It's the Ctx instances that are eventually executed.
75
76This design allows for a single Parallel_reader to scan multiple indexes
77at once. Each index range scan has to be added via its add_scan() method.
78This functionality is required to handle parallel partition scans because
79partitions are separate indexes. This can be used to scan completely
80different indexes and tables by one instance of a Parallel_reader.
81
82To solve the imbalance problem we dynamically split the sub-trees as and
83when required. e.g., If you have 5 sub-trees to scan and 4 threads then
84it will tag the 5th sub-tree as "to_be_split" during phase I (add_scan()),
85the first thread that finishes scanning the first set of 4 partitions will
86then dynamically split the 5th sub-tree and add the newly created sub-trees
87to the execution context (Ctx) run queue in the Parallel_reader. As the
88other threads complete their sub-tree scans they will pick up more execution
89contexts (Ctx) from the Parallel_reader run queue and start scanning the
90sub-partitions as normal.
91
92Note: The Ctx instances are in a virtual list. Each Ctx instance has a
93range to scan. The start point of this range instance is the end point
94of the Ctx instance scanning values less than its start point. A Ctx
95will scan from [Start, End) rows. We use std::shared_ptr to manage the
96reference counting, this allows us to dispose of the Ctx instances
97without worrying about dangling pointers.
98
99NOTE: Secondary index scans are not supported currently. */
101 public:
102 /** Maximum value for innodb-parallel-read-threads. */
103 constexpr static size_t MAX_THREADS{256};
104
105 /** Maximum value for reserved parallel read threads for data load so that
106 at least this many threads are always available for data load. */
107 constexpr static size_t MAX_RESERVED_THREADS{16};
108
109 /** Maximum value for at most number of parallel read threads that can be
110 spawned. */
112
113 using Links = std::vector<page_no_t, ut::allocator<page_no_t>>;
114
115 // Forward declaration.
116 class Ctx;
117 class Scan_ctx;
118 struct Thread_ctx;
119
120 /** Scan state. */
121 enum class State : uint8_t {
122 /** Unknown state. */
123 UNKNOWN,
124
125 /** Start/Finish thread state. */
126 THREAD,
127
128 /** Start/Finish Ctx state. */
129 CTX,
130
131 /** Start/Finish page read. */
132 PAGE
133 };
134
135 /** Callback to initialise callers state. */
136 using Start = std::function<dberr_t(Thread_ctx *thread_ctx)>;
137
138 /** Callback to finalise callers state. */
139 using Finish = std::function<dberr_t(Thread_ctx *thread_ctx)>;
140
141 /** Callback to process the rows. */
142 using F = std::function<dberr_t(const Ctx *)>;
143
144 /** Specifies the range from where to start the scan and where to end it. */
145 struct Scan_range {
146 /** Default constructor. */
148
149 /** Copy constructor.
150 @param[in] scan_range Instance to copy from. */
151 Scan_range(const Scan_range &scan_range) = default;
152
153 /** Constructor.
154 @param[in] start Start key
155 @param[in] end End key. */
157 : m_start(start), m_end(end) {}
158
159 /** Start of the scan, can be nullptr for -infinity. */
161
162 /** End of the scan, can be null for +infinity. */
163 const dtuple_t *m_end{};
164
165 /** Convert the instance to a string representation. */
166 [[nodiscard]] std::string to_string() const;
167 };
168
169 /** Scan (Scan_ctx) configuration. */
170 struct Config {
171 /** Constructor.
172 @param[in] scan_range Range to scan.
173 @param[in] index Cluster index to scan.
174 @param[in] read_level Btree level from which records need to be read.
175 @param[in] partition_id Partition id if the index to be scanned
176 belongs to a partitioned table. */
177 Config(const Scan_range &scan_range, dict_index_t *index,
178 size_t read_level = 0,
179 size_t partition_id = std::numeric_limits<size_t>::max())
180 : m_scan_range(scan_range),
181 m_index(index),
182 m_is_compact(dict_table_is_comp(index->table)),
183 m_page_size(dict_tf_to_fsp_flags(index->table->flags)),
184 m_read_level(read_level),
185 m_partition_id(partition_id) {}
186
187 /** Copy constructor.
188 @param[in] config Instance to copy from. */
189 Config(const Config &config)
190
191 = default;
192
193 /** Range to scan. */
195
196 /** (Cluster) Index in table to scan. */
198
199 /** Row format of table. */
200 const bool m_is_compact{};
201
202 /** Tablespace page size. */
204
205 /** Btree level from which records need to be read. */
206 size_t m_read_level{0};
207
208 /** Partition id if the index to be scanned belongs to a partitioned table,
209 else std::numeric_limits<size_t>::max(). */
210 size_t m_partition_id{std::numeric_limits<size_t>::max()};
211 };
212
213 /** Thread related context information. */
214 struct Thread_ctx {
215 /** Constructor.
216 @param[in] id Thread ID */
217 explicit Thread_ctx(size_t id) noexcept : m_thread_id(id) {}
218
219 /** Destructor. */
220 ~Thread_ctx() noexcept {
221 ut_a(m_callback_ctx == nullptr);
222
223 if (m_blob_heap != nullptr) {
225 }
226 }
227
228 /** Set thread related callback information.
229 @param[in] ctx callback context */
230 template <typename T>
231 void set_callback_ctx(T *ctx) noexcept {
232 ut_ad(m_callback_ctx == nullptr || ctx == nullptr);
233 m_callback_ctx = ctx;
234 }
235
236 /** Get the thread related callback information.
237 @return callback context. */
238 template <typename T>
239 T *get_callback_ctx() noexcept {
240 return static_cast<T *>(m_callback_ctx);
241 }
242
243 /** Create BLOB heap. */
244 void create_blob_heap() noexcept {
245 ut_a(m_blob_heap == nullptr);
247 }
248
249 /** @return the worker thread state. */
250 State get_state() const noexcept { return m_state; }
251
252 /** Save current position, commit any active mtr. */
253 void savepoint() noexcept;
254
255 /** Restore saved position and resume scan.
256 @return DB_SUCCESS or error code. */
257 [[nodiscard]] dberr_t restore_from_savepoint() noexcept;
258
259 /** Thread ID. */
260 size_t m_thread_id{std::numeric_limits<size_t>::max()};
261
262 /** Callback information related to the thread.
263 @note Needs to be created and destroyed by the callback itself. */
265
266 /** BLOB heap per thread. */
268
269 /** Worker thread state. */
271
272 /** Current persistent cursor. */
274
275 Thread_ctx(Thread_ctx &&) = delete;
276 Thread_ctx(const Thread_ctx &) = delete;
278 Thread_ctx &operator=(const Thread_ctx &) = delete;
279 };
280
281 /** Constructor.
282 @param[in] max_threads Maximum number of threads to use. */
283 explicit Parallel_reader(size_t max_threads);
284
285 /** Destructor. */
287
288 /** Check how many threads are available for parallel reads.
289 @param[in] n_required Number of threads required.
290 @param[in] use_reserved true if reserved threads needs to be considered
291 while checking for availability of threads
292 @return number of threads available. */
293 [[nodiscard]] static size_t available_threads(size_t n_required,
294 bool use_reserved);
295
296 /** Release the parallel read threads. */
297 static void release_threads(size_t n_threads) {
298 const auto SEQ_CST = std::memory_order_seq_cst;
299 auto active = s_active_threads.fetch_sub(n_threads, SEQ_CST);
300 ut_a(active >= n_threads);
301 }
302
303 /** Add scan context.
304 @param[in,out] trx Covering transaction.
305 @param[in] config Scan configuration.
306 @param[in] f Callback function.
307 (default is 0 which is leaf level)
308 @return error. */
309 [[nodiscard]] dberr_t add_scan(trx_t *trx, const Config &config, F &&f);
310
311 /** Wait for the join of threads spawned by the parallel reader. */
312 void join() {
313 for (auto &t : m_parallel_read_threads) {
314 t.wait();
315 }
316 }
317
318 /** Get the error stored in the global error state.
319 @return global error state. */
320 [[nodiscard]] dberr_t get_error_state() const { return m_err; }
321
322 /** @return true if the tree is empty, else false. */
323 [[nodiscard]] bool is_tree_empty() const {
324 return m_ctx_id.load(std::memory_order_relaxed) == 0;
325 }
326
327 /** Set the callback that must be called before any processing.
328 @param[in] f Call before first row is processed.*/
329 void set_start_callback(Start &&f) { m_start_callback = std::move(f); }
330
331 /** Set the callback that must be called after all processing.
332 @param[in] f Call after last row is processed.*/
333 void set_finish_callback(Finish &&f) { m_finish_callback = std::move(f); }
334
335 /** Spawn the threads to do the parallel read for the specified range.
336 Don't wait for the spawned threads to complete.
337 @param[in] n_threads number of threads that *need* to be spawned
338 @return DB_SUCCESS or error code. */
339 [[nodiscard]] dberr_t spawn(size_t n_threads) noexcept;
340
341 /** Start the threads to do the parallel read for the specified range.
342 @param[in] n_threads Number of threads to use for the scan.
343 @return DB_SUCCESS or error code. */
344 [[nodiscard]] dberr_t run(size_t n_threads);
345
346 /** @return the configured max threads size. */
347 [[nodiscard]] size_t max_threads() const { return m_max_threads; }
348
349 /** @return true if in error state. */
350 [[nodiscard]] bool is_error_set() const {
351 return m_err.load(std::memory_order_relaxed) != DB_SUCCESS;
352 }
353
354 /** Set the error state.
355 @param[in] err Error state to set to. */
357 m_err.store(err, std::memory_order_relaxed);
358 }
359
360 /** Set the number of threads to be spawned.
361 @param[in] n_threads number of threads to be spawned. */
362 void set_n_threads(size_t n_threads) {
363 ut_ad(n_threads <= m_max_threads);
364 m_n_threads = n_threads;
365 }
366
367 // Disable copying.
372
373 private:
374 /** Release unused threads back to the pool.
375 @param[in] unused_threads Number of threads to "release". */
376 void release_unused_threads(size_t unused_threads) {
377 ut_a(m_max_threads >= unused_threads);
378 release_threads(unused_threads);
379 }
380
381 /** Add an execution context to the run queue.
382 @param[in] ctx Execution context to add to the queue. */
383 void enqueue(std::shared_ptr<Ctx> ctx);
384
385 /** Fetch the next job execute.
386 @return job to execute or nullptr. */
387 [[nodiscard]] std::shared_ptr<Ctx> dequeue();
388
389 /** @return true if job queue is empty. */
390 [[nodiscard]] bool is_queue_empty() const;
391
392 /** Poll for requests and execute.
393 @param[in] thread_ctx thread related context information */
394 void worker(Thread_ctx *thread_ctx);
395
396 /** Create the threads and do a parallel read across the partitions. */
397 void parallel_read();
398
399 /** @return true if tasks are still executing. */
400 [[nodiscard]] bool is_active() const {
401 return m_n_completed.load(std::memory_order_relaxed) <
402 m_ctx_id.load(std::memory_order_relaxed);
403 }
404
405 private:
406 // clang-format off
407 using Ctxs =
408 std::list<std::shared_ptr<Ctx>,
410
411 using Scan_ctxs =
412 std::list<std::shared_ptr<Scan_ctx>,
414
415 // clang-format on
416
417 /** Maximum number of worker threads to use. */
419
420 /** Number of worker threads that will be spawned. */
421 size_t m_n_threads{0};
422
423 /** Mutex protecting m_ctxs. */
424 mutable ib_mutex_t m_mutex;
425
426 /** Contexts that must be executed. */
428
429 /** Scan contexts. */
431
432 /** For signalling worker threads about events. */
434
435 /** Value returned by previous call of os_event_reset() on m_event. */
436 uint64_t m_sig_count;
437
438 /** Counter for allocating scan context IDs. */
440
441 /** Context ID. Monotonically increasing ID. */
442 std::atomic_size_t m_ctx_id{};
443
444 /** Total tasks executed so far. */
445 std::atomic_size_t m_n_completed{};
446
447 /** Callback at start (before processing any rows). */
449
450 /** Callback at end (after processing all rows). */
452
453 /** Error during parallel read. */
454 std::atomic<dberr_t> m_err{DB_SUCCESS};
455
456 /** List of threads used for parallel_read purpose. */
457 std::vector<IB_thread, ut::allocator<IB_thread>> m_parallel_read_threads;
458
459 /** Number of threads currently doing parallel reads. */
460 static std::atomic_size_t s_active_threads;
461
462 /** If the caller wants to wait for the parallel_read to finish its run. */
463 bool m_sync;
464
465 /** Context information related to each parallel reader thread. */
466 std::vector<Thread_ctx *, ut::allocator<Thread_ctx *>> m_thread_ctxs;
467};
468
469/** Parallel reader context. */
471 public:
472 /** Constructor.
473 @param[in] reader Parallel reader that owns this context.
474 @param[in] id ID of this scan context.
475 @param[in] trx Transaction covering the scan.
476 @param[in] config Range scan config.
477 @param[in] f Callback function. */
478 Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx,
479 const Parallel_reader::Config &config, F &&f);
480
481 /** Destructor. */
482 ~Scan_ctx() = default;
483
484 private:
485 /** Boundary of the range to scan. */
486 struct Iter {
487 /** Destructor. */
488 ~Iter();
489
490 /** Heap used to allocate m_rec, m_tuple and m_pcur. */
492
493 /** m_rec column offsets. */
494 const ulint *m_offsets{};
495
496 /** Start scanning from this key. Raw data of the row. */
497 const rec_t *m_rec{};
498
499 /** Tuple representation inside m_rec, for two Iter instances in a range
500 m_tuple will be [first->m_tuple, second->m_tuple). */
502
503 /** Persistent cursor.*/
505 };
506
507 /** mtr_t savepoint. */
508 using Savepoint = std::pair<ulint, buf_block_t *>;
509
510 /** For releasing the S latches after processing the blocks. */
511 using Savepoints = std::vector<Savepoint, ut::allocator<Savepoint>>;
512
513 /** The first cursor should read up to the second cursor [f, s). */
514 using Range = std::pair<std::shared_ptr<Iter>, std::shared_ptr<Iter>>;
515
516 using Ranges = std::vector<Range, ut::allocator<Range>>;
517
518 /** @return the scan context ID. */
519 [[nodiscard]] size_t id() const { return m_id; }
520
521 /** Set the error state.
522 @param[in] err Error state to set to. */
524 m_err.store(err, std::memory_order_relaxed);
525 }
526
527 /** @return true if in error state. */
528 [[nodiscard]] bool is_error_set() const {
529 return m_err.load(std::memory_order_relaxed) != DB_SUCCESS;
530 }
531
532 /** Fetch a block from the buffer pool and acquire an S latch on it.
533 @param[in] page_id Page ID.
534 @param[in,out] mtr Mini-transaction covering the fetch.
535 @param[in] line Line from where called.
536 @return the block fetched from the buffer pool. */
537 [[nodiscard]] buf_block_t *block_get_s_latched(const page_id_t &page_id,
538 mtr_t *mtr, size_t line) const;
539
540 /** Partition the B+Tree for parallel read.
541 @param[in] scan_range Range for partitioning.
542 @param[in,out] ranges Ranges to scan.
543 @param[in] split_level Sub-range required level (0 == root).
544 @return the partition scan ranges. */
545 dberr_t partition(const Scan_range &scan_range, Ranges &ranges,
546 size_t split_level);
547
548 /** Find the page number of the node that contains the search key. If the
549 key is null then we assume -infinity.
550 @param[in] block Page to look in.
551 @param[in] key Key of the first record in the range.
552 @return the left child page number. */
553 [[nodiscard]] page_no_t search(const buf_block_t *block,
554 const dtuple_t *key) const;
555
556 /** Traverse from given sub-tree page number to start of the scan range
557 from the given page number.
558 @param[in] page_no Page number of sub-tree.
559 @param[in,out] mtr Mini-transaction.
560 @param[in] key Key of the first record in the range.
561 @param[in,out] savepoints Blocks S latched and accessed.
562 @return the leaf node page cursor. */
563 [[nodiscard]] page_cur_t start_range(page_no_t page_no, mtr_t *mtr,
564 const dtuple_t *key,
565 Savepoints &savepoints) const;
566
567 /** Create and add the range to the scan ranges.
568 @param[in,out] ranges Ranges to scan.
569 @param[in,out] leaf_page_cursor Leaf page cursor on which to create the
570 persistent cursor.
571 @param[in,out] mtr Mini-transaction */
572 void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor,
573 mtr_t *mtr) const;
574
575 /** Find the subtrees to scan in a block.
576 @param[in] scan_range Partition based on this scan range.
577 @param[in] page_no Page to partition at if at required level.
578 @param[in] depth Sub-range current level.
579 @param[in] split_level Sub-range starting level (0 == root).
580 @param[in,out] ranges Ranges to scan.
581 @param[in,out] mtr Mini-transaction */
582 dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no,
583 size_t depth, const size_t split_level, Ranges &ranges,
584 mtr_t *mtr);
585
586 /** Build a dtuple_t from rec_t.
587 @param[in] rec Build the dtuple from this record.
588 @param[in,out] iter Build in this iterator. */
589 void copy_row(const rec_t *rec, Iter *iter) const;
590
591 /** Create the persistent cursor that will be used to traverse the
592 partition and position on the start row.
593 @param[in] page_cursor Current page cursor
594 @param[in] mtr Mini-transaction covering the read.
595 @return Start iterator. */
596 [[nodiscard]] std::shared_ptr<Iter> create_persistent_cursor(
597 const page_cur_t &page_cursor, mtr_t *mtr) const;
598
599 /** Build an old version of the row if required.
600 @param[in,out] rec Current row read from the index. This can
601 be modified by this method if an older version
602 needs to be built.
603 @param[in,out] offsets Same as above but pertains to the rec offsets
604 @param[in,out] heap Heap to use if a previous version needs to be
605 built from the undo log.
606 @param[in,out] mtr Mini-transaction covering the read.
607 @return true if row is visible to the transaction. */
608 [[nodiscard]] bool check_visibility(const rec_t *&rec, ulint *&offsets,
609 mem_heap_t *&heap, mtr_t *mtr);
610
611 /** Create an execution context for a range and add it to
612 the Parallel_reader's run queue.
613 @param[in] range Range for which to create the context.
614 @param[in] split true if the sub-tree should be split further.
615 @return DB_SUCCESS or error code. */
616 [[nodiscard]] dberr_t create_context(const Range &range, bool split);
617
618 /** Create the execution contexts based on the ranges.
619 @param[in] ranges Ranges for which to create the contexts.
620 @return DB_SUCCESS or error code. */
621 [[nodiscard]] dberr_t create_contexts(const Ranges &ranges);
622
623 /** @return the maximum number of threads configured. */
624 [[nodiscard]] size_t max_threads() const { return m_reader->max_threads(); }
625
626 /** Release unused threads back to the pool.
627 @param[in] unused_threads Number of threads to "release". */
628 void release_threads(size_t unused_threads) {
629 m_reader->release_threads(unused_threads);
630 }
631
632 /** S lock the index. */
633 void index_s_lock();
634
635 /** S unlock the index. */
636 void index_s_unlock();
637
638 /** @return true if at least one thread owns the S latch on the index. */
639 bool index_s_own() const {
640 return m_s_locks.load(std::memory_order_acquire) > 0;
641 }
642
643 private:
645
646 /** Context ID. */
647 size_t m_id{std::numeric_limits<size_t>::max()};
648
649 /** Parallel scan configuration. */
651
652 /** Covering transaction. */
653 const trx_t *m_trx{};
654
655 /** Callback function. */
657
658 /** Depth of the Btree. */
659 size_t m_depth{};
660
661 /** The parallel reader. */
663
664 /** Error during parallel read. */
665 mutable std::atomic<dberr_t> m_err{DB_SUCCESS};
666
667 /** Number of threads that have S locked the index. */
668 std::atomic_size_t m_s_locks{};
669
670 friend class Parallel_reader;
671
672 Scan_ctx(Scan_ctx &&) = delete;
673 Scan_ctx(const Scan_ctx &) = delete;
675 Scan_ctx &operator=(const Scan_ctx &) = delete;
676};
677
678/** Parallel reader execution context. */
680 public:
681 /** Constructor.
682 @param[in] id Thread ID.
683 @param[in] scan_ctx Scan context.
684 @param[in] range Range that the thread has to read. */
685 Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
686 : m_id(id), m_range(range), m_scan_ctx(scan_ctx) {}
687
688 /** Destructor. */
689 ~Ctx() = default;
690
691 public:
692 /** @return the context ID. */
693 [[nodiscard]] size_t id() const { return m_id; }
694
695 /** The scan ID of the scan context this belongs to. */
696 [[nodiscard]] size_t scan_id() const { return m_scan_ctx->id(); }
697
698 /** @return the covering transaction. */
699 [[nodiscard]] const trx_t *trx() const { return m_scan_ctx->m_trx; }
700
701 /** @return the index being scanned. */
702 [[nodiscard]] const dict_index_t *index() const {
704 }
705
706 /** @return ID of the thread processing this context */
707 [[nodiscard]] size_t thread_id() const { return m_thread_ctx->m_thread_id; }
708
709 /** @return the thread context of the reader thread. */
710 [[nodiscard]] Thread_ctx *thread_ctx() const { return m_thread_ctx; }
711
712 /** @return the partition id of the index.
713 @note this is std::numeric_limits<size_t>::max() if the index does not
714 belong to a partition. */
715 [[nodiscard]] size_t partition_id() const {
717 }
718
719 /** Build an old version of the row if required.
720 @param[in,out] rec Current row read from the index. This can
721 be modified by this method if an older version
722 needs to be built.
723 @param[in,out] offsets Same as above but pertains to the rec offsets
724 @param[in,out] heap Heap to use if a previous version needs to be
725 built from the undo log.
726 @param[in,out] mtr Mini-transaction covering the read.
727 @return true if row is visible to the transaction. */
728 bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap,
729 mtr_t *mtr) {
730 return m_scan_ctx->check_visibility(rec, offsets, heap, mtr);
731 }
732
733 private:
734 /** Traverse the pages by key order.
735 @return DB_SUCCESS or error code. */
736 [[nodiscard]] dberr_t traverse();
737
738 /** Traverse the records in a node.
739 @param[in] pcursor persistent b-tree cursor
740 @param[in] mtr mtr
741 @return error */
742 dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr);
743
744 /** Move to the next node in the specified level.
745 @param[in] pcursor persistent b-tree cursor
746 @return success */
747 bool move_to_next_node(PCursor *pcursor);
748
749 /** Split the context into sub-ranges and add them to the execution queue.
750 @return DB_SUCCESS or error code. */
751 [[nodiscard]] dberr_t split();
752
753 /** @return true if in error state. */
754 [[nodiscard]] bool is_error_set() const {
756 }
757
758 private:
759 /** Context ID. */
760 size_t m_id{std::numeric_limits<size_t>::max()};
761
762 /** If true then split the context at the block level. */
763 bool m_split{};
764
765 /** Range to read in this context. */
767
768 /** Scanner context. */
770
771 public:
772 /** Context information related to executing thread ID. */
774
775 /** Current block. */
777
778 /** Current row. */
779 const rec_t *m_rec{};
780
781 /** Number of pages traversed by the context. */
782 size_t m_n_pages{};
783
784 /** True if m_rec is the first record in the page. */
785 bool m_first_rec{true};
786
788
789 /** Start of a new range to scan. */
790 bool m_start{};
791
792 friend class Parallel_reader;
793};
794
795#endif /* !row0par_read_h */
uint32_t page_no_t
Page number.
Definition: api0api.h:48
The index tree cursor.
Persistent cursor wrapper around btr_pcur_t.
Definition: row0pread.cc:207
Parallel reader execution context.
Definition: row0pread.h:679
bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.h:728
Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
Constructor.
Definition: row0pread.h:685
size_t id() const
Definition: row0pread.h:693
size_t m_n_pages
Number of pages traversed by the context.
Definition: row0pread.h:782
~Ctx()=default
Destructor.
bool is_error_set() const
Definition: row0pread.h:754
dberr_t split()
Split the context into sub-ranges and add them to the execution queue.
Definition: row0pread.cc:147
Scan_ctx::Range m_range
Range to read in this context.
Definition: row0pread.h:766
Scan_ctx * m_scan_ctx
Scanner context.
Definition: row0pread.h:769
bool m_start
Start of a new range to scan.
Definition: row0pread.h:790
const trx_t * trx() const
Definition: row0pread.h:699
bool m_split
If true then split the context at the block level.
Definition: row0pread.h:763
dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr)
Traverse the records in a node.
Definition: row0pread.cc:648
size_t thread_id() const
Definition: row0pread.h:707
bool m_first_rec
True if m_rec is the first record in the page.
Definition: row0pread.h:785
Thread_ctx * thread_ctx() const
Definition: row0pread.h:710
size_t scan_id() const
The scan ID of the scan context this belongs to.
Definition: row0pread.h:696
const buf_block_t * m_block
Current block.
Definition: row0pread.h:776
ulint * m_offsets
Definition: row0pread.h:787
bool move_to_next_node(PCursor *pcursor)
Move to the next node in the specified level.
Definition: row0pread.cc:597
size_t m_id
Context ID.
Definition: row0pread.h:760
const dict_index_t * index() const
Definition: row0pread.h:702
size_t partition_id() const
Definition: row0pread.h:715
Thread_ctx * m_thread_ctx
Context information related to executing thread ID.
Definition: row0pread.h:773
const rec_t * m_rec
Current row.
Definition: row0pread.h:779
dberr_t traverse()
Traverse the pages by key order.
Definition: row0pread.cc:613
Parallel reader context.
Definition: row0pread.h:470
std::atomic_size_t m_s_locks
Number of threads that have S locked the index.
Definition: row0pread.h:668
dberr_t partition(const Scan_range &scan_range, Ranges &ranges, size_t split_level)
Partition the B+Tree for parallel read.
Definition: row0pread.cc:1183
page_cur_t start_range(page_no_t page_no, mtr_t *mtr, const dtuple_t *key, Savepoints &savepoints) const
Traverse from given sub-tree page number to start of the scan range from the given page number.
Definition: row0pread.cc:971
std::pair< ulint, buf_block_t * > Savepoint
mtr_t savepoint.
Definition: row0pread.h:508
size_t id() const
Definition: row0pread.h:519
Scan_ctx & operator=(Scan_ctx &&)=delete
void index_s_lock()
S lock the index.
Definition: row0pread.cc:131
std::shared_ptr< Iter > create_persistent_cursor(const page_cur_t &page_cursor, mtr_t *mtr) const
Create the persistent cursor that will be used to traverse the partition and position on the star...
Definition: row0pread.cc:550
size_t m_id
Context ID.
Definition: row0pread.h:647
F m_f
Callback function.
Definition: row0pread.h:656
bool index_s_own() const
Definition: row0pread.h:639
dberr_t create_context(const Range &range, bool split)
Create an execution context for a range and add it to the Parallel_reader's run queue.
Definition: row0pread.cc:1218
void index_s_unlock()
S unlock the index.
Definition: row0pread.cc:139
dberr_t create_contexts(const Ranges &ranges)
Create the execution contexts based on the ranges.
Definition: row0pread.cc:1243
void release_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:628
page_no_t search(const buf_block_t *block, const dtuple_t *key) const
Find the page number of the node that contains the search key.
Definition: row0pread.cc:933
size_t max_threads() const
Definition: row0pread.h:624
Config m_config
Parallel scan configuration.
Definition: row0pread.h:650
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:523
void copy_row(const rec_t *rec, Iter *iter) const
Build a dtuple_t from rec_t.
Definition: row0pread.cc:520
~Scan_ctx()=default
Destructor.
std::vector< Savepoint, ut::allocator< Savepoint > > Savepoints
For releasing the S latches after processing the blocks.
Definition: row0pread.h:511
size_t m_depth
Depth of the Btree.
Definition: row0pread.h:659
std::pair< std::shared_ptr< Iter >, std::shared_ptr< Iter > > Range
The first cursor should read up to the second cursor [f, s).
Definition: row0pread.h:514
void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor, mtr_t *mtr) const
Create and add the range to the scan ranges.
Definition: row0pread.cc:1011
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:665
Scan_ctx & operator=(const Scan_ctx &)=delete
std::vector< Range, ut::allocator< Range > > Ranges
Definition: row0pread.h:516
bool is_error_set() const
Definition: row0pread.h:528
const trx_t * m_trx
Covering transaction.
Definition: row0pread.h:653
buf_block_t * block_get_s_latched(const page_id_t &page_id, mtr_t *mtr, size_t line) const
Fetch a block from the buffer pool and acquire an S latch on it.
Definition: row0pread.cc:318
Scan_ctx(Scan_ctx &&)=delete
Scan_ctx(const Scan_ctx &)=delete
Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx, const Parallel_reader::Config &config, F &&f)
Constructor.
Definition: row0pread.cc:200
Parallel_reader * m_reader
The parallel reader.
Definition: row0pread.h:662
dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no, size_t depth, const size_t split_level, Ranges &ranges, mtr_t *mtr)
Find the subtrees to scan in a block.
Definition: row0pread.cc:1027
bool check_visibility(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.cc:465
The core idea is to find the left and right paths down the B+Tree. These paths correspond to the scan ...
Definition: row0pread.h:100
size_t max_threads() const
Definition: row0pread.h:347
std::function< dberr_t(Thread_ctx *thread_ctx)> Finish
Callback to finalise callers state.
Definition: row0pread.h:139
void parallel_read()
Create the threads and do a parallel read across the partitions.
Definition: row0pread.cc:1275
std::atomic_size_t m_n_completed
Total tasks executed so far.
Definition: row0pread.h:445
std::vector< IB_thread, ut::allocator< IB_thread > > m_parallel_read_threads
List of threads used for parallel_read purpose.
Definition: row0pread.h:457
constexpr static size_t MAX_THREADS
Maximum value for innodb-parallel-read-threads.
Definition: row0pread.h:103
Parallel_reader & operator=(Parallel_reader &&)=delete
bool m_sync
If the caller wants to wait for the parallel_read to finish its run.
Definition: row0pread.h:463
bool is_queue_empty() const
Definition: row0pread.cc:807
dberr_t run(size_t n_threads)
Start the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1352
std::vector< Thread_ctx *, ut::allocator< Thread_ctx * > > m_thread_ctxs
Context information related to each parallel reader thread.
Definition: row0pread.h:466
std::vector< page_no_t, ut::allocator< page_no_t > > Links
Definition: row0pread.h:113
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:454
bool is_active() const
Definition: row0pread.h:400
size_t m_n_threads
Number of worker threads that will be spawned.
Definition: row0pread.h:421
uint64_t m_sig_count
Value returned by previous call of os_event_reset() on m_event.
Definition: row0pread.h:436
dberr_t get_error_state() const
Get the error stored in the global error state.
Definition: row0pread.h:320
Parallel_reader(const Parallel_reader &)=delete
size_t m_max_threads
Maximum number of worker threads to use.
Definition: row0pread.h:418
std::shared_ptr< Ctx > dequeue()
Fetch the next job to execute.
Definition: row0pread.cc:791
void release_unused_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:376
Start m_start_callback
Callback at start (before processing any rows).
Definition: row0pread.h:448
os_event_t m_event
For signalling worker threads about events.
Definition: row0pread.h:433
Parallel_reader & operator=(const Parallel_reader &)=delete
void set_start_callback(Start &&f)
Set the callback that must be called before any processing.
Definition: row0pread.h:329
bool is_error_set() const
Definition: row0pread.h:350
Scan_ctxs m_scan_ctxs
Scan contexts.
Definition: row0pread.h:430
Parallel_reader(size_t max_threads)
Constructor.
Definition: row0pread.cc:187
constexpr static size_t MAX_RESERVED_THREADS
Maximum value for reserved parallel read threads for data load so that at least this many threads are...
Definition: row0pread.h:107
dberr_t spawn(size_t n_threads) noexcept
Spawn the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1337
ib_mutex_t m_mutex
Mutex protecting m_ctxs.
Definition: row0pread.h:424
void set_finish_callback(Finish &&f)
Set the callback that must be called after all processing.
Definition: row0pread.h:333
void worker(Thread_ctx *thread_ctx)
Poll for requests and execute.
Definition: row0pread.cc:814
void join()
Wait for the join of threads spawned by the parallel reader.
Definition: row0pread.h:312
~Parallel_reader()
Destructor.
Definition: row0pread.cc:89
std::list< std::shared_ptr< Scan_ctx >, ut::allocator< std::shared_ptr< Scan_ctx > > > Scan_ctxs
Definition: row0pread.h:413
Finish m_finish_callback
Callback at end (after processing all rows).
Definition: row0pread.h:451
size_t m_scan_ctx_id
Counter for allocating scan context IDs.
Definition: row0pread.h:439
static std::atomic_size_t s_active_threads
Number of threads currently doing parallel reads.
Definition: row0pread.h:460
static void release_threads(size_t n_threads)
Release the parallel read threads.
Definition: row0pread.h:297
constexpr static size_t MAX_TOTAL_THREADS
Maximum value for at most number of parallel read threads that can be spawned.
Definition: row0pread.h:111
void enqueue(std::shared_ptr< Ctx > ctx)
Add an execution context to the run queue.
Definition: row0pread.cc:785
void set_n_threads(size_t n_threads)
Set the number of threads to be spawned.
Definition: row0pread.h:362
static size_t available_threads(size_t n_required, bool use_reserved)
Check how many threads are available for parallel reads.
Definition: row0pread.cc:102
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:356
std::list< std::shared_ptr< Ctx >, ut::allocator< std::shared_ptr< Ctx > > > Ctxs
Definition: row0pread.h:409
State
Scan state.
Definition: row0pread.h:121
@ PAGE
Start/Finish page read.
@ CTX
Start/Finish Ctx state.
@ THREAD
Start/Finish thread state.
@ UNKNOWN
Unknown state.
dberr_t add_scan(trx_t *trx, const Config &config, F &&f)
Add scan context.
Definition: row0pread.cc:1392
bool is_tree_empty() const
Definition: row0pread.h:323
std::atomic_size_t m_ctx_id
Context ID.
Definition: row0pread.h:442
std::function< dberr_t(Thread_ctx *thread_ctx)> Start
Callback to initialise callers state.
Definition: row0pread.h:136
Parallel_reader(const Parallel_reader &&)=delete
std::function< dberr_t(const Ctx *)> F
Callback to process the rows.
Definition: row0pread.h:142
Ctxs m_ctxs
Contexts that must be executed.
Definition: row0pread.h:427
Page identifier.
Definition: buf0types.h:206
Page size descriptor.
Definition: page0size.h:49
Allocator that allows std::* containers to manage their memory through ut::malloc* and ut::free libra...
Definition: ut0new.h:2179
Global error codes for the database.
dberr_t
Definition: db0err.h:38
@ DB_SUCCESS
Definition: db0err.h:42
uint32_t dict_tf_to_fsp_flags(uint32_t table_flags)
Convert a 32 bit integer table flags to the 32 bit FSP Flags.
Definition: dict0dict.cc:5021
static bool dict_table_is_comp(const dict_table_t *table)
Check whether the table uses the compact page format.
The low-level file system.
static int flags[50]
Definition: hp_test1.cc:39
static void start(mysql_harness::PluginFuncEnv *env)
Definition: http_auth_backend_plugin.cc:176
static void mem_heap_free(mem_heap_t *heap)
Frees the space occupied by a memory heap.
static mem_heap_t * mem_heap_create(ulint size, ut::Location loc, ulint type=MEM_HEAP_DYNAMIC)
Creates a memory heap.
static Value err()
Create a Value object that represents an error condition.
Definition: json_binary.cc:909
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:191
The interface to the operating system condition variables.
The interface to the threading wrapper.
A class describing a page size.
Record manager global types.
byte rec_t
Definition: rem0types.h:40
required string key
Definition: replication_asynchronous_connection_failover.proto:59
Select.
Scan (Scan_ctx) configuration.
Definition: row0pread.h:170
size_t m_partition_id
Partition id if the index to be scanned belongs to a partitioned table, else std::numeric_limits<size...
Definition: row0pread.h:210
Config(const Scan_range &scan_range, dict_index_t *index, size_t read_level=0, size_t partition_id=std::numeric_limits< size_t >::max())
Constructor.
Definition: row0pread.h:177
const bool m_is_compact
Row format of table.
Definition: row0pread.h:200
size_t m_read_level
Btree level from which records need to be read.
Definition: row0pread.h:206
Config(const Config &config)=default
Copy constructor.
dict_index_t * m_index
(Cluster) Index in table to scan.
Definition: row0pread.h:197
const Scan_range m_scan_range
Range to scan.
Definition: row0pread.h:194
const page_size_t m_page_size
Tablespace page size.
Definition: row0pread.h:203
Boundary of the range to scan.
Definition: row0pread.h:486
const dtuple_t * m_tuple
Tuple representation inside m_rec, for two Iter instances in a range m_tuple will be [first->m_tuple,...
Definition: row0pread.h:501
btr_pcur_t * m_pcur
Persistent cursor.
Definition: row0pread.h:504
const rec_t * m_rec
Start scanning from this key.
Definition: row0pread.h:497
mem_heap_t * m_heap
Heap used to allocate m_rec, m_tuple and m_pcur.
Definition: row0pread.h:491
~Iter()
Destructor.
Definition: row0pread.cc:74
const ulint * m_offsets
m_rec column offsets.
Definition: row0pread.h:494
Specifies the range from where to start the scan and where to end it.
Definition: row0pread.h:145
Scan_range()
Default constructor.
Definition: row0pread.h:147
Scan_range(const Scan_range &scan_range)=default
Copy constructor.
const dtuple_t * m_end
End of the scan, can be null for +infinity.
Definition: row0pread.h:163
const dtuple_t * m_start
Start of the scan, can be nullptr for -infinity.
Definition: row0pread.h:160
std::string to_string() const
Convert the instance to a string representation.
Definition: row0pread.cc:56
Scan_range(const dtuple_t *start, const dtuple_t *end)
Constructor.
Definition: row0pread.h:156
Thread related context information.
Definition: row0pread.h:214
size_t m_thread_id
Thread ID.
Definition: row0pread.h:260
Thread_ctx & operator=(Thread_ctx &&)=delete
void create_blob_heap() noexcept
Create BLOB heap.
Definition: row0pread.h:244
Thread_ctx(const Thread_ctx &)=delete
mem_heap_t * m_blob_heap
BLOB heap per thread.
Definition: row0pread.h:267
void * m_callback_ctx
Callback information related to the thread.
Definition: row0pread.h:264
Thread_ctx(size_t id) noexcept
Constructor.
Definition: row0pread.h:217
void savepoint() noexcept
Save current position, commit any active mtr.
Definition: row0pread.cc:419
PCursor * m_pcursor
Current persistent cursor.
Definition: row0pread.h:273
Thread_ctx(Thread_ctx &&)=delete
State get_state() const noexcept
Definition: row0pread.h:250
Thread_ctx & operator=(const Thread_ctx &)=delete
State m_state
Worker thread state.
Definition: row0pread.h:270
void set_callback_ctx(T *ctx) noexcept
Set thread related callback information.
Definition: row0pread.h:231
dberr_t restore_from_savepoint() noexcept
Restore saved position and resume scan.
Definition: row0pread.cc:413
T * get_callback_ctx() noexcept
Get the thread related callback information.
Definition: row0pread.h:239
~Thread_ctx() noexcept
Destructor.
Definition: row0pread.h:220
Definition: btr0pcur.h:98
The buffer control block structure.
Definition: buf0buf.h:1664
Data structure for an index.
Definition: dict0mem.h:1021
Data structure for a database table.
Definition: dict0mem.h:1884
Structure for an SQL data tuple of fields (logical record)
Definition: data0data.h:681
The info structure stored at the beginning of a heap block.
Definition: mem0mem.h:299
Mini-transaction handle and buffer.
Definition: mtr0mtr.h:181
InnoDB condition variable.
Definition: os0event.cc:62
Index page cursor.
Definition: page0cur.h:310
Definition: gen_lex_token.cc:151
Definition: trx0trx.h:680
#define UNIV_PAGE_SIZE
The universal page size of the database.
Definition: univ.i:295
unsigned long int ulint
Definition: univ.i:407
#define UT_LOCATION_HERE
Definition: ut0core.h:46
#define ut_ad(EXPR)
Debug assertion.
Definition: ut0dbg.h:68
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:56