MySQL 9.2.0
Source Code Documentation
row0pread.h
Go to the documentation of this file.
1/*****************************************************************************
2
3Copyright (c) 2018, 2024, Oracle and/or its affiliates.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License, version 2.0, as published by the
7Free Software Foundation.
8
9This program is designed to work with certain software (including
10but not limited to OpenSSL) that is licensed under separate terms,
11as designated in a particular file or component or in included license
12documentation. The authors of MySQL hereby grant you an additional
13permission to link the program and your derivative works with the
14separately licensed software that they have either included with
15the program or referenced in the documentation.
16
17This program is distributed in the hope that it will be useful, but WITHOUT
18ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
19FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
20for more details.
21
22You should have received a copy of the GNU General Public License along with
23this program; if not, write to the Free Software Foundation, Inc.,
2451 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
26*****************************************************************************/
27
28/** @file include/row0pread.h
29Parallel read interface.
30
31Created 2018-01-27 by Sunny Bains. */
32
33#ifndef row0par_read_h
34#define row0par_read_h
35
36#include <functional>
37#include <vector>
38
39#include "os0thread-create.h"
40#include "row0sel.h"
41
42// Forward declarations
43struct trx_t;
44struct mtr_t;
45class PCursor;
46struct btr_pcur_t;
47struct buf_block_t;
48struct dict_table_t;
49
50#include "btr0cur.h"
51#include "db0err.h"
52#include "fil0fil.h"
53#include "os0event.h"
54#include "page0size.h"
55#include "rem0types.h"
56#include "ut0mpmcbq.h"
57
58/** The core idea is to find the left and right paths down the B+Tree. These
59paths correspond to the scan start and scan end search. Follow the links
60at the appropriate btree level from the left to right and split the scan
61on each of these sub-tree root nodes.
62
63If the user has set the maximum number of threads to use at say 4 threads
64and there are 5 sub-trees at the selected level then we will split the 5th
65sub-tree dynamically when it is ready for scan.
66
67We want to allow multiple parallel range scans on different indexes at the
68same time. To achieve this split out the scan context (Scan_ctx) from the
69execution context (Ctx). The Scan_ctx has the index and transaction
70information and the Ctx keeps track of the cursor for a specific thread
71during the scan.
72
73To start a scan we need to instantiate a Parallel_reader. A parallel reader
74can contain several Scan_ctx instances and a Scan_ctx can contain several
75 Ctx instances. It's the Ctx instances that are eventually executed.
76
77This design allows for a single Parallel_reader to scan multiple indexes
78at once. Each index range scan has to be added via its add_scan() method.
79This functionality is required to handle parallel partition scans because
80partitions are separate indexes. This can be used to scan completely
81different indexes and tables by one instance of a Parallel_reader.
82
83To solve the imbalance problem we dynamically split the sub-trees as and
84when required. e.g., If you have 5 sub-trees to scan and 4 threads then
85it will tag the 5th sub-tree as "to_be_split" during phase I (add_scan()),
86the first thread that finishes scanning the first set of 4 partitions will
87then dynamically split the 5th sub-tree and add the newly created sub-trees
88to the execution context (Ctx) run queue in the Parallel_reader. As the
89other threads complete their sub-tree scans they will pick up more execution
90contexts (Ctx) from the Parallel_reader run queue and start scanning the
91sub-partitions as normal.
92
93Note: The Ctx instances are in a virtual list. Each Ctx instance has a
94range to scan. The start point of this range instance is the end point
95of the Ctx instance scanning values less than its start point. A Ctx
96will scan from [Start, End) rows. We use std::shared_ptr to manage the
97reference counting, this allows us to dispose of the Ctx instances
98without worrying about dangling pointers.
99
100NOTE: Secondary index scans are not supported currently. */
102 public:
103 /** Maximum value for innodb-parallel-read-threads. */
104 constexpr static size_t MAX_THREADS{256};
105
106 /** Maximum value for reserved parallel read threads for data load so that
107 at least this many threads are always available for data load. */
108 constexpr static size_t MAX_RESERVED_THREADS{16};
109
110 /** Maximum value for at most number of parallel read threads that can be
111 spawned. */
113
114 using Links = std::vector<page_no_t, ut::allocator<page_no_t>>;
115
116 // Forward declaration.
117 class Ctx;
118 class Scan_ctx;
119 struct Thread_ctx;
120
121 /** Scan state. */
122 enum class State : uint8_t {
123 /** Unknown state. */
124 UNKNOWN,
125
126 /** Start/Finish thread state. */
127 THREAD,
128
129 /** Start/Finish Ctx state. */
130 CTX,
131
132 /** Start/Finish page read. */
133 PAGE
134 };
135
136 /** Callback to initialise callers state. */
137 using Start = std::function<dberr_t(Thread_ctx *thread_ctx)>;
138
139 /** Callback to finalise callers state. */
140 using Finish = std::function<dberr_t(Thread_ctx *thread_ctx)>;
141
142 /** Callback to process the rows. */
143 using F = std::function<dberr_t(const Ctx *)>;
144
145 /** Specifies the range from where to start the scan and where to end it. */
146 struct Scan_range {
147 /** Default constructor. */
149
150 /** Copy constructor.
151 @param[in] scan_range Instance to copy from. */
152 Scan_range(const Scan_range &scan_range) = default;
153
154 /** Constructor.
155 @param[in] start Start key
156 @param[in] end End key. */
158 : m_start(start), m_end(end) {}
159
160 /** Start of the scan, can be nullptr for -infinity. */
162
163 /** End of the scan, can be null for +infinity. */
164 const dtuple_t *m_end{};
165
166 /** Convert the instance to a string representation. */
167 [[nodiscard]] std::string to_string() const;
168 };
169
170 /** Scan (Scan_ctx) configuration. */
171 struct Config {
172 /** Constructor.
173 @param[in] scan_range Range to scan.
174 @param[in] index Cluster index to scan.
175 @param[in] read_level Btree level from which records need to be read.
176 @param[in] partition_id Partition id if the index to be scanned
177 belongs to a partitioned table. */
178 Config(const Scan_range &scan_range, dict_index_t *index,
179 size_t read_level = 0,
180 size_t partition_id = std::numeric_limits<size_t>::max())
181 : m_scan_range(scan_range),
182 m_index(index),
185 m_read_level(read_level),
186 m_partition_id(partition_id) {}
187
188 /** Copy constructor.
189 @param[in] config Instance to copy from. */
190 Config(const Config &config)
191
192 = default;
193
194 /** Range to scan. */
196
197 /** (Cluster) Index in table to scan. */
199
200 /** Row format of table. */
201 const bool m_is_compact{};
202
203 /** Tablespace page size. */
205
206 /** Btree level from which records need to be read. */
207 size_t m_read_level{0};
208
209 /** Partition id if the index to be scanned belongs to a partitioned table,
210 else std::numeric_limits<size_t>::max(). */
211 size_t m_partition_id{std::numeric_limits<size_t>::max()};
212 };
213
214 /** Thread related context information. */
215 struct Thread_ctx {
216 /** Constructor.
217 @param[in] id Thread ID */
218 explicit Thread_ctx(size_t id) noexcept : m_thread_id(id) {}
219
220 /** Destructor. */
221 ~Thread_ctx() noexcept {
222 ut_a(m_callback_ctx == nullptr);
223
224 if (m_blob_heap != nullptr) {
226 }
227 }
228
229 /** Set thread related callback information.
230 @param[in] ctx callback context */
231 template <typename T>
232 void set_callback_ctx(T *ctx) noexcept {
233 ut_ad(m_callback_ctx == nullptr || ctx == nullptr);
234 m_callback_ctx = ctx;
235 }
236
237 /** Get the thread related callback information.
238 @return return context. */
239 template <typename T>
240 T *get_callback_ctx() noexcept {
241 return static_cast<T *>(m_callback_ctx);
242 }
243
244 /** Create BLOB heap. */
245 void create_blob_heap() noexcept {
246 ut_a(m_blob_heap == nullptr);
248 }
249
250 /** @return the worker thread state. */
251 State get_state() const noexcept { return m_state; }
252
253 /** @see PCursor::save_current_user_record_as_last_processed */
255
256 /** @see PCursor::restore_to_last_processed_user_record */
258
259 /** @see PCursor::save_previous_user_record_as_last_processed */
261
262 /** @see PCursor::restore_to_first_unprocessed */
263 void restore_to_first_unprocessed() noexcept;
264
265 /** Thread ID. */
266 size_t m_thread_id{std::numeric_limits<size_t>::max()};
267
268 /** Callback information related to the thread.
269 @note Needs to be created and destroyed by the callback itself. */
271
272 /** BLOB heap per thread. */
274
275 /** Worker thread state. */
277
278 /** Current persistent cursor. */
280
281 Thread_ctx(Thread_ctx &&) = delete;
282 Thread_ctx(const Thread_ctx &) = delete;
284 Thread_ctx &operator=(const Thread_ctx &) = delete;
285 };
286
287 /** Constructor.
288 @param[in] max_threads Maximum number of threads to use. */
289 explicit Parallel_reader(size_t max_threads);
290
291 /** Destructor. */
293
294 /** Check how many threads are available for parallel reads.
295 @param[in] n_required Number of threads required.
296 @param[in] use_reserved true if reserved threads needs to be considered
297 while checking for availability of threads
298 @return number of threads available. */
299 [[nodiscard]] static size_t available_threads(size_t n_required,
300 bool use_reserved);
301
302 /** Release the parallel read threads.
Returns n_threads to the pool by decrementing the shared counter of
threads currently doing parallel reads (s_active_threads).
@param[in] n_threads Number of threads to release; must not exceed the
count currently recorded in s_active_threads (asserted below). */
303 static void release_threads(size_t n_threads) {
304 const auto SEQ_CST = std::memory_order_seq_cst;
305 auto active = s_active_threads.fetch_sub(n_threads, SEQ_CST);
306 ut_a(active >= n_threads);
307 }
308
309 /** Add scan context.
310 @param[in,out] trx Covering transaction.
311 @param[in] config Scan configuration.
312 @param[in] f Callback function.
313 (default is 0 which is leaf level)
314 @return error. */
315 [[nodiscard]] dberr_t add_scan(trx_t *trx, const Config &config, F &&f);
316
317 /** Wait for the join of threads spawned by the parallel reader. */
318 void join() {
319 for (auto &t : m_parallel_read_threads) {
320 t.wait();
321 }
322 }
323
324 /** Get the error stored in the global error state.
325 @return global error state. */
326 [[nodiscard]] dberr_t get_error_state() const { return m_err; }
327
328 /** @return true if the tree is empty, else false. */
329 [[nodiscard]] bool is_tree_empty() const {
330 return m_ctx_id.load(std::memory_order_relaxed) == 0;
331 }
332
333 /** Set the callback that must be called before any processing.
334 @param[in] f Call before first row is processed.*/
335 void set_start_callback(Start &&f) { m_start_callback = std::move(f); }
336
337 /** Set the callback that must be called after all processing.
338 @param[in] f Call after last row is processed.*/
339 void set_finish_callback(Finish &&f) { m_finish_callback = std::move(f); }
340
341 /** Spawn the threads to do the parallel read for the specified range.
342 Don't wait for the spawned threads to complete.
343 @param[in] n_threads number of threads that *need* to be spawned
344 @return DB_SUCCESS or error code. */
345 [[nodiscard]] dberr_t spawn(size_t n_threads) noexcept;
346
347 /** Start the threads to do the parallel read for the specified range.
348 @param[in] n_threads Number of threads to use for the scan.
349 @return DB_SUCCESS or error code. */
350 [[nodiscard]] dberr_t run(size_t n_threads);
351
352 /** @return the configured max threads size. */
353 [[nodiscard]] size_t max_threads() const { return m_max_threads; }
354
355 /** @return true if in error state. */
356 [[nodiscard]] bool is_error_set() const {
357 return m_err.load(std::memory_order_relaxed) != DB_SUCCESS;
358 }
359
360 /** Set the error state.
361 @param[in] err Error state to set to. */
363 m_err.store(err, std::memory_order_relaxed);
364 }
365
366 /** Set the number of threads to be spawned.
367 @param[in] n_threads number of threads to be spawned. */
368 void set_n_threads(size_t n_threads) {
369 ut_ad(n_threads <= m_max_threads);
370 m_n_threads = n_threads;
371 }
372
373 // Disable copying.
378
379 private:
380 /** Release unused threads back to the pool.
381 @param[in] unused_threads Number of threads to "release". */
382 void release_unused_threads(size_t unused_threads) {
383 ut_a(m_max_threads >= unused_threads);
384 release_threads(unused_threads);
385 }
386
387 /** Add an execution context to the run queue.
388 @param[in] ctx Execution context to add to the queue. */
389 void enqueue(std::shared_ptr<Ctx> ctx);
390
391 /** Fetch the next job execute.
392 @return job to execute or nullptr. */
393 [[nodiscard]] std::shared_ptr<Ctx> dequeue();
394
395 /** @return true if job queue is empty. */
396 [[nodiscard]] bool is_queue_empty() const;
397
398 /** Poll for requests and execute.
399 @param[in] thread_ctx thread related context information */
400 void worker(Thread_ctx *thread_ctx);
401
402 /** Create the threads and do a parallel read across the partitions. */
403 void parallel_read();
404
405 /** @return true if tasks are still executing.
Note: m_ctx_id is a monotonically increasing counter (used here as the
number of contexts created so far) and m_n_completed counts tasks finished;
the read is considered active while completions still trail creations. */
406 [[nodiscard]] bool is_active() const {
407 return m_n_completed.load(std::memory_order_relaxed) <
408 m_ctx_id.load(std::memory_order_relaxed);
409 }
410
411 private:
412 // clang-format off
413 using Ctxs =
414 std::list<std::shared_ptr<Ctx>,
416
417 using Scan_ctxs =
418 std::list<std::shared_ptr<Scan_ctx>,
420
421 // clang-format on
422
423 /** Maximum number of worker threads to use. */
425
426 /** Number of worker threads that will be spawned. */
427 size_t m_n_threads{0};
428
429 /** Mutex protecting m_ctxs. */
430 mutable ib_mutex_t m_mutex;
431
432 /** Contexts that must be executed. */
434
435 /** Scan contexts. */
437
438 /** For signalling worker threads about events. */
440
441 /** Value returned by previous call of os_event_reset() on m_event. */
442 uint64_t m_sig_count;
443
444 /** Counter for allocating scan context IDs. */
446
447 /** Context ID. Monotonically increasing ID. */
448 std::atomic_size_t m_ctx_id{};
449
450 /** Total tasks executed so far. */
451 std::atomic_size_t m_n_completed{};
452
453 /** Callback at start (before processing any rows). */
455
456 /** Callback at end (after processing all rows). */
458
459 /** Error during parallel read. */
460 std::atomic<dberr_t> m_err{DB_SUCCESS};
461
462 /** List of threads used for parallel_read purpose. */
463 std::vector<IB_thread, ut::allocator<IB_thread>> m_parallel_read_threads;
464
465 /** Number of threads currently doing parallel reads. */
466 static std::atomic_size_t s_active_threads;
467
468 /** If the caller wants to wait for the parallel_read to finish its run */
469 bool m_sync;
470
471 /** Context information related to each parallel reader thread. */
472 std::vector<Thread_ctx *, ut::allocator<Thread_ctx *>> m_thread_ctxs;
473};
474
475/** Parallel reader context. */
477 public:
478 /** Constructor.
479 @param[in] reader Parallel reader that owns this context.
480 @param[in] id ID of this scan context.
481 @param[in] trx Transaction covering the scan.
482 @param[in] config Range scan config.
483 @param[in] f Callback function. */
484 Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx,
485 const Parallel_reader::Config &config, F &&f);
486
487 /** Destructor. */
488 ~Scan_ctx() = default;
489
490 private:
491 /** Boundary of the range to scan. */
492 struct Iter {
493 /** Destructor. */
494 ~Iter();
495
496 /** Heap used to allocate m_rec, m_tuple and m_pcur. */
498
499 /** m_rec column offsets. */
500 const ulint *m_offsets{};
501
502 /** Start scanning from this key. Raw data of the row. */
503 const rec_t *m_rec{};
504
505 /** Tuple representation inside m_rec, for two Iter instances in a range
506 m_tuple will be [first->m_tuple, second->m_tuple). */
508
509 /** Persistent cursor.*/
511 };
512
513 /** mtr_t savepoint. */
514 using Savepoint = std::pair<ulint, buf_block_t *>;
515
516 /** For releasing the S latches after processing the blocks. */
517 using Savepoints = std::vector<Savepoint, ut::allocator<Savepoint>>;
518
519 /** The first cursor should read up to the second cursor [f, s). */
520 using Range = std::pair<std::shared_ptr<Iter>, std::shared_ptr<Iter>>;
521
522 using Ranges = std::vector<Range, ut::allocator<Range>>;
523
524 /** @return the scan context ID. */
525 [[nodiscard]] size_t id() const { return m_id; }
526
527 /** Set the error state.
528 @param[in] err Error state to set to. */
530 m_err.store(err, std::memory_order_relaxed);
531 }
532
533 /** @return true if in error state. */
534 [[nodiscard]] bool is_error_set() const {
535 return m_err.load(std::memory_order_relaxed) != DB_SUCCESS;
536 }
537
538 /** Fetch a block from the buffer pool and acquire an S latch on it.
539 @param[in] page_id Page ID.
540 @param[in,out] mtr Mini-transaction covering the fetch.
541 @param[in] line Line from where called.
542 @return the block fetched from the buffer pool. */
543 [[nodiscard]] buf_block_t *block_get_s_latched(const page_id_t &page_id,
544 mtr_t *mtr, size_t line) const;
545
546 /** Partition the B+Tree for parallel read.
547 @param[in] scan_range Range for partitioning.
548 @param[in,out] ranges Ranges to scan.
549 @param[in] split_level Sub-range required level (0 == root).
550 @return the partition scan ranges. */
551 dberr_t partition(const Scan_range &scan_range, Ranges &ranges,
552 size_t split_level);
553
554 /** Find the page number of the node that contains the search key. If the
555 key is null then we assume -infinity.
556 @param[in] block Page to look in.
557 @param[in] key Key of the first record in the range.
558 @return the left child page number. */
559 [[nodiscard]] page_no_t search(const buf_block_t *block,
560 const dtuple_t *key) const;
561
562 /** Traverse from given sub-tree page number to start of the scan range
563 from the given page number.
564 @param[in] page_no Page number of sub-tree.
565 @param[in,out] mtr Mini-transaction.
566 @param[in] key Key of the first record in the range.
567 @param[in,out] savepoints Blocks S latched and accessed.
568 @return the leaf node page cursor. */
569 [[nodiscard]] page_cur_t start_range(page_no_t page_no, mtr_t *mtr,
570 const dtuple_t *key,
571 Savepoints &savepoints) const;
572
573 /** Create and add the range to the scan ranges.
574 @param[in,out] ranges Ranges to scan.
575 @param[in,out] leaf_page_cursor Leaf page cursor on which to create the
576 persistent cursor.
577 @param[in,out] mtr Mini-transaction */
578 void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor,
579 mtr_t *mtr) const;
580
581 /** Find the subtrees to scan in a block.
582 @param[in] scan_range Partition based on this scan range.
583 @param[in] page_no Page to partition at if at required level.
584 @param[in] depth Sub-range current level.
585 @param[in] split_level Sub-range starting level (0 == root).
586 @param[in,out] ranges Ranges to scan.
587 @param[in,out] mtr Mini-transaction */
588 dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no,
589 size_t depth, const size_t split_level, Ranges &ranges,
590 mtr_t *mtr);
591
592 /** Build a dtuple_t from rec_t.
593 @param[in] rec Build the dtuple from this record.
594 @param[in,out] iter Build in this iterator. */
595 void copy_row(const rec_t *rec, Iter *iter) const;
596
597 /** Create the persistent cursor that will be used to traverse the
598 partition and position on the start row.
599 @param[in] page_cursor Current page cursor
600 @param[in] mtr Mini-transaction covering the read.
601 @return Start iterator. */
602 [[nodiscard]] std::shared_ptr<Iter> create_persistent_cursor(
603 const page_cur_t &page_cursor, mtr_t *mtr) const;
604
605 /** Build an old version of the row if required.
606 @param[in,out] rec Current row read from the index. This can
607 be modified by this method if an older version
608 needs to be built.
609 @param[in,out] offsets Same as above but pertains to the rec offsets
610 @param[in,out] heap Heap to use if a previous version needs to be
611 built from the undo log.
612 @param[in,out] mtr Mini-transaction covering the read.
613 @return true if row is visible to the transaction. */
614 [[nodiscard]] bool check_visibility(const rec_t *&rec, ulint *&offsets,
615 mem_heap_t *&heap, mtr_t *mtr);
616
617 /** Create an execution context for a range and add it to
618 the Parallel_reader's run queue.
619 @param[in] range Range for which to create the context.
620 @param[in] split true if the sub-tree should be split further.
621 @return DB_SUCCESS or error code. */
622 [[nodiscard]] dberr_t create_context(const Range &range, bool split);
623
624 /** Create the execution contexts based on the ranges.
625 @param[in] ranges Ranges for which to create the contexts.
626 @return DB_SUCCESS or error code. */
627 [[nodiscard]] dberr_t create_contexts(const Ranges &ranges);
628
629 /** @return the maximum number of threads configured. */
630 [[nodiscard]] size_t max_threads() const { return m_reader->max_threads(); }
631
632 /** Release unused threads back to the pool.
633 @param[in] unused_threads Number of threads to "release". */
634 void release_threads(size_t unused_threads) {
635 m_reader->release_threads(unused_threads);
636 }
637
638 /** S lock the index. */
639 void index_s_lock();
640
641 /** S unlock the index. */
642 void index_s_unlock();
643
644 /** @return true if at least one thread owns the S latch on the index. */
645 bool index_s_own() const {
646 return m_s_locks.load(std::memory_order_acquire) > 0;
647 }
648
649 private:
651
652 /** Context ID. */
653 size_t m_id{std::numeric_limits<size_t>::max()};
654
655 /** Parallel scan configuration. */
657
658 /** Covering transaction. */
659 const trx_t *m_trx{};
660
661 /** Callback function. */
663
664 /** Depth of the Btree. */
665 size_t m_depth{};
666
667 /** The parallel reader. */
669
670 /** Error during parallel read. */
671 mutable std::atomic<dberr_t> m_err{DB_SUCCESS};
672
673 /** Number of threads that have S locked the index. */
674 std::atomic_size_t m_s_locks{};
675
676 friend class Parallel_reader;
677
678 Scan_ctx(Scan_ctx &&) = delete;
679 Scan_ctx(const Scan_ctx &) = delete;
681 Scan_ctx &operator=(const Scan_ctx &) = delete;
682};
683
684/** Parallel reader execution context. */
686 public:
687 /** Constructor.
688 @param[in] id Thread ID.
689 @param[in] scan_ctx Scan context.
690 @param[in] range Range that the thread has to read. */
691 Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
692 : m_id(id), m_range(range), m_scan_ctx(scan_ctx) {}
693
694 /** Destructor. */
695 ~Ctx() = default;
696
697 public:
698 /** @return the context ID. */
699 [[nodiscard]] size_t id() const { return m_id; }
700
701 /** The scan ID of the scan context this belongs to. */
702 [[nodiscard]] size_t scan_id() const { return m_scan_ctx->id(); }
703
704 /** @return the covering transaction. */
705 [[nodiscard]] const trx_t *trx() const { return m_scan_ctx->m_trx; }
706
707 /** @return the index being scanned. */
708 [[nodiscard]] const dict_index_t *index() const {
710 }
711
712 /** @return ID of the thread processing this context */
713 [[nodiscard]] size_t thread_id() const { return m_thread_ctx->m_thread_id; }
714
715 /** @return the thread context of the reader thread. */
716 [[nodiscard]] Thread_ctx *thread_ctx() const { return m_thread_ctx; }
717
718 /** @return the partition id of the index.
719 @note this is std::numeric_limits<size_t>::max() if the index does not
720 belong to a partition. */
721 [[nodiscard]] size_t partition_id() const {
723 }
724
725 /** Build an old version of the row if required.
726 @param[in,out] rec Current row read from the index. This can
727 be modified by this method if an older version
728 needs to be built.
729 @param[in,out] offsets Same as above but pertains to the rec offsets
730 @param[in,out] heap Heap to use if a previous version needs to be
731 built from the undo log.
732 @param[in,out] mtr Mini-transaction covering the read.
733 @return true if row is visible to the transaction. */
734 bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap,
735 mtr_t *mtr) {
736 return m_scan_ctx->check_visibility(rec, offsets, heap, mtr);
737 }
738
739 private:
740 /** Traverse the pages by key order.
741 @return DB_SUCCESS or error code. */
742 [[nodiscard]] dberr_t traverse();
743
744 /** Traverse the records in a node.
745 @param[in] pcursor persistent b-tree cursor
746 @param[in] mtr mtr
747 @return error */
748 dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr);
749
750 /** Move to the next node in the specified level.
751 @param[in] pcursor persistent b-tree cursor
752 @return success */
753 bool move_to_next_node(PCursor *pcursor);
754
755 /** Split the context into sub-ranges and add them to the execution queue.
756 @return DB_SUCCESS or error code. */
757 [[nodiscard]] dberr_t split();
758
759 /** @return true if in error state. */
760 [[nodiscard]] bool is_error_set() const {
762 }
763
764 private:
765 /** Context ID. */
766 size_t m_id{std::numeric_limits<size_t>::max()};
767
768 /** If true then split the context at the block level. */
769 bool m_split{};
770
771 /** Range to read in this context. */
773
774 /** Scanner context. */
776
777 public:
778 /** Context information related to executing thread ID. */
780
781 /** Current block. */
783
784 /** Current row. */
785 const rec_t *m_rec{};
786
787 /** Number of pages traversed by the context. */
788 size_t m_n_pages{};
789
790 /** True if m_rec is the first record in the page. */
791 bool m_first_rec{true};
792
794
795 /** Start of a new range to scan. */
796 bool m_start{};
797
798 friend class Parallel_reader;
799};
800
801#endif /* !row0par_read_h */
uint32_t page_no_t
Page number.
Definition: api0api.h:46
The index tree cursor.
Persistent cursor wrapper around btr_pcur_t.
Definition: row0pread.cc:208
Parallel reader execution context.
Definition: row0pread.h:685
bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.h:734
Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
Constructor.
Definition: row0pread.h:691
size_t id() const
Definition: row0pread.h:699
size_t m_n_pages
Number of pages traversed by the context.
Definition: row0pread.h:788
~Ctx()=default
Destructor.
bool is_error_set() const
Definition: row0pread.h:760
dberr_t split()
Split the context into sub-ranges and add them to the execution queue.
Definition: row0pread.cc:148
Scan_ctx::Range m_range
Range to read in this context.
Definition: row0pread.h:772
Scan_ctx * m_scan_ctx
Scanner context.
Definition: row0pread.h:775
bool m_start
Start of a new range to scan.
Definition: row0pread.h:796
const trx_t * trx() const
Definition: row0pread.h:705
bool m_split
If true then split the context at the block level.
Definition: row0pread.h:769
dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr)
Traverse the records in a node.
Definition: row0pread.cc:774
size_t thread_id() const
Definition: row0pread.h:713
bool m_first_rec
True if m_rec is the first record in the page.
Definition: row0pread.h:791
Thread_ctx * thread_ctx() const
Definition: row0pread.h:716
size_t scan_id() const
The scan ID of the scan context this belongs to.
Definition: row0pread.h:702
const buf_block_t * m_block
Current block.
Definition: row0pread.h:782
ulint * m_offsets
Definition: row0pread.h:793
bool move_to_next_node(PCursor *pcursor)
Move to the next node in the specified level.
Definition: row0pread.cc:723
size_t m_id
Context ID.
Definition: row0pread.h:766
const dict_index_t * index() const
Definition: row0pread.h:708
size_t partition_id() const
Definition: row0pread.h:721
Thread_ctx * m_thread_ctx
Context information related to executing thread ID.
Definition: row0pread.h:779
const rec_t * m_rec
Current row.
Definition: row0pread.h:785
dberr_t traverse()
Traverse the pages by key order.
Definition: row0pread.cc:739
Parallel reader context.
Definition: row0pread.h:476
std::atomic_size_t m_s_locks
Number of threads that have S locked the index.
Definition: row0pread.h:674
dberr_t partition(const Scan_range &scan_range, Ranges &ranges, size_t split_level)
Partition the B+Tree for parallel read.
Definition: row0pread.cc:1309
page_cur_t start_range(page_no_t page_no, mtr_t *mtr, const dtuple_t *key, Savepoints &savepoints) const
Traverse from given sub-tree page number to start of the scan range from the given page number.
Definition: row0pread.cc:1097
std::pair< ulint, buf_block_t * > Savepoint
mtr_t savepoint.
Definition: row0pread.h:514
size_t id() const
Definition: row0pread.h:525
Scan_ctx & operator=(Scan_ctx &&)=delete
void index_s_lock()
S lock the index.
Definition: row0pread.cc:132
std::shared_ptr< Iter > create_persistent_cursor(const page_cur_t &page_cursor, mtr_t *mtr) const
Create the persistent cursor that will be used to traverse the partition and position on the the star...
Definition: row0pread.cc:676
size_t m_id
Context ID.
Definition: row0pread.h:653
F m_f
Callback function.
Definition: row0pread.h:662
bool index_s_own() const
Definition: row0pread.h:645
dberr_t create_context(const Range &range, bool split)
Create an execution context for a range and add it to the Parallel_reader's run queue.
Definition: row0pread.cc:1344
void index_s_unlock()
S unlock the index.
Definition: row0pread.cc:140
dberr_t create_contexts(const Ranges &ranges)
Create the execution contexts based on the ranges.
Definition: row0pread.cc:1369
void release_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:634
page_no_t search(const buf_block_t *block, const dtuple_t *key) const
Find the page number of the node that contains the search key.
Definition: row0pread.cc:1059
size_t max_threads() const
Definition: row0pread.h:630
Config m_config
Parallel scan configuration.
Definition: row0pread.h:656
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:529
void copy_row(const rec_t *rec, Iter *iter) const
Build a dtuple_t from rec_t.
Definition: row0pread.cc:646
~Scan_ctx()=default
Destructor.
std::vector< Savepoint, ut::allocator< Savepoint > > Savepoints
For releasing the S latches after processing the blocks.
Definition: row0pread.h:517
size_t m_depth
Depth of the Btree.
Definition: row0pread.h:665
std::pair< std::shared_ptr< Iter >, std::shared_ptr< Iter > > Range
The first cursor should read up to the second cursor [f, s).
Definition: row0pread.h:520
void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor, mtr_t *mtr) const
Create and add the range to the scan ranges.
Definition: row0pread.cc:1137
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:671
Scan_ctx & operator=(const Scan_ctx &)=delete
std::vector< Range, ut::allocator< Range > > Ranges
Definition: row0pread.h:522
bool is_error_set() const
Definition: row0pread.h:534
const trx_t * m_trx
Covering transaction.
Definition: row0pread.h:659
buf_block_t * block_get_s_latched(const page_id_t &page_id, mtr_t *mtr, size_t line) const
Fetch a block from the buffer pool and acquire an S latch on it.
Definition: row0pread.cc:354
Scan_ctx(Scan_ctx &&)=delete
Scan_ctx(const Scan_ctx &)=delete
Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx, const Parallel_reader::Config &config, F &&f)
Constructor.
Definition: row0pread.cc:201
Parallel_reader * m_reader
The parallel reader.
Definition: row0pread.h:668
dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no, size_t depth, const size_t split_level, Ranges &ranges, mtr_t *mtr)
Find the subtrees to scan in a block.
Definition: row0pread.cc:1153
bool check_visibility(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.cc:591
The core idea is to find the left and right paths down the B+Tree.These paths correspond to the scan ...
Definition: row0pread.h:101
size_t max_threads() const
Definition: row0pread.h:353
std::function< dberr_t(Thread_ctx *thread_ctx)> Finish
Callback to finalise callers state.
Definition: row0pread.h:140
void parallel_read()
Create the threads and do a parallel read across the partitions.
Definition: row0pread.cc:1401
std::atomic_size_t m_n_completed
Total tasks executed so far.
Definition: row0pread.h:451
std::vector< IB_thread, ut::allocator< IB_thread > > m_parallel_read_threads
List of threads used for parallel_read purposes.
Definition: row0pread.h:463
constexpr static size_t MAX_THREADS
Maximum value for innodb-parallel-read-threads.
Definition: row0pread.h:104
Parallel_reader & operator=(Parallel_reader &&)=delete
bool m_sync
If the caller wants to wait for the parallel_read to finish its run.
Definition: row0pread.h:469
bool is_queue_empty() const
Definition: row0pread.cc:933
dberr_t run(size_t n_threads)
Start the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1478
std::vector< Thread_ctx *, ut::allocator< Thread_ctx * > > m_thread_ctxs
Context information related to each parallel reader thread.
Definition: row0pread.h:472
std::vector< page_no_t, ut::allocator< page_no_t > > Links
Definition: row0pread.h:114
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:460
bool is_active() const
Definition: row0pread.h:406
size_t m_n_threads
Number of worker threads that will be spawned.
Definition: row0pread.h:427
uint64_t m_sig_count
Value returned by previous call of os_event_reset() on m_event.
Definition: row0pread.h:442
dberr_t get_error_state() const
Get the error stored in the global error state.
Definition: row0pread.h:326
Parallel_reader(const Parallel_reader &)=delete
size_t m_max_threads
Maximum number of worker threads to use.
Definition: row0pread.h:424
std::shared_ptr< Ctx > dequeue()
Fetch the next job to execute.
Definition: row0pread.cc:917
void release_unused_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:382
Start m_start_callback
Callback at start (before processing any rows).
Definition: row0pread.h:454
os_event_t m_event
For signalling worker threads about events.
Definition: row0pread.h:439
Parallel_reader & operator=(const Parallel_reader &)=delete
void set_start_callback(Start &&f)
Set the callback that must be called before any processing.
Definition: row0pread.h:335
bool is_error_set() const
Definition: row0pread.h:356
Scan_ctxs m_scan_ctxs
Scan contexts.
Definition: row0pread.h:436
Parallel_reader(size_t max_threads)
Constructor.
Definition: row0pread.cc:188
constexpr static size_t MAX_RESERVED_THREADS
Maximum value for reserved parallel read threads for data load so that at least this many threads are...
Definition: row0pread.h:108
dberr_t spawn(size_t n_threads) noexcept
Spawn the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1463
ib_mutex_t m_mutex
Mutex protecting m_ctxs.
Definition: row0pread.h:430
void set_finish_callback(Finish &&f)
Set the callback that must be called after all processing.
Definition: row0pread.h:339
void worker(Thread_ctx *thread_ctx)
Poll for requests and execute.
Definition: row0pread.cc:940
void join()
Wait for the join of threads spawned by the parallel reader.
Definition: row0pread.h:318
~Parallel_reader()
Destructor.
Definition: row0pread.cc:90
std::list< std::shared_ptr< Scan_ctx >, ut::allocator< std::shared_ptr< Scan_ctx > > > Scan_ctxs
Definition: row0pread.h:419
Finish m_finish_callback
Callback at end (after processing all rows).
Definition: row0pread.h:457
size_t m_scan_ctx_id
Counter for allocating scan context IDs.
Definition: row0pread.h:445
static std::atomic_size_t s_active_threads
Number of threads currently doing parallel reads.
Definition: row0pread.h:466
static void release_threads(size_t n_threads)
Release the parallel read threads.
Definition: row0pread.h:303
constexpr static size_t MAX_TOTAL_THREADS
Maximum value for at most number of parallel read threads that can be spawned.
Definition: row0pread.h:112
void enqueue(std::shared_ptr< Ctx > ctx)
Add an execution context to the run queue.
Definition: row0pread.cc:911
void set_n_threads(size_t n_threads)
Set the number of threads to be spawned.
Definition: row0pread.h:368
static size_t available_threads(size_t n_required, bool use_reserved)
Check how many threads are available for parallel reads.
Definition: row0pread.cc:103
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:362
std::list< std::shared_ptr< Ctx >, ut::allocator< std::shared_ptr< Ctx > > > Ctxs
Definition: row0pread.h:415
State
Scan state.
Definition: row0pread.h:122
@ PAGE
Start/Finish page read.
@ CTX
Start/Finish Ctx state.
@ THREAD
Start/Finish thread state.
@ UNKNOWN
Unknown state.
dberr_t add_scan(trx_t *trx, const Config &config, F &&f)
Add scan context.
Definition: row0pread.cc:1518
bool is_tree_empty() const
Definition: row0pread.h:329
std::atomic_size_t m_ctx_id
Context ID.
Definition: row0pread.h:448
std::function< dberr_t(Thread_ctx *thread_ctx)> Start
Callback to initialise callers state.
Definition: row0pread.h:137
Parallel_reader(const Parallel_reader &&)=delete
std::function< dberr_t(const Ctx *)> F
Callback to process the rows.
Definition: row0pread.h:143
Ctxs m_ctxs
Contexts that must be executed.
Definition: row0pread.h:433
Page identifier.
Definition: buf0types.h:207
Page size descriptor.
Definition: page0size.h:50
Allocator that allows std::* containers to manage their memory through ut::malloc* and ut::free libra...
Definition: ut0new.h:2183
Global error codes for the database.
dberr_t
Definition: db0err.h:39
@ DB_SUCCESS
Definition: db0err.h:43
uint32_t dict_tf_to_fsp_flags(uint32_t table_flags)
Convert a 32 bit integer table flags to the 32 bit FSP Flags.
Definition: dict0dict.cc:4999
static bool dict_table_is_comp(const dict_table_t *table)
Check whether the table uses the compact page format.
The low-level file system.
static int flags[50]
Definition: hp_test1.cc:40
static void start(mysql_harness::PluginFuncEnv *env)
Definition: http_auth_backend_plugin.cc:180
static void mem_heap_free(mem_heap_t *heap)
Frees the space occupied by a memory heap.
static mem_heap_t * mem_heap_create(ulint size, ut::Location loc, ulint type=MEM_HEAP_DYNAMIC)
Creates a memory heap.
static PFS_engine_table_share_proxy table
Definition: pfs.cc:61
static Value err()
Create a Value object that represents an error condition.
Definition: json_binary.cc:908
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
The interface to the operating system condition variables.
The interface to the threading wrapper.
A class describing a page size.
Record manager global types.
byte rec_t
Definition: rem0types.h:41
required string key
Definition: replication_asynchronous_connection_failover.proto:60
Select.
Scan (Scan_ctx) configuration.
Definition: row0pread.h:171
size_t m_partition_id
Partition id if the index to be scanned belongs to a partitioned table, else std::numeric_limits<size...
Definition: row0pread.h:211
Config(const Scan_range &scan_range, dict_index_t *index, size_t read_level=0, size_t partition_id=std::numeric_limits< size_t >::max())
Constructor.
Definition: row0pread.h:178
const bool m_is_compact
Row format of table.
Definition: row0pread.h:201
size_t m_read_level
Btree level from which records need to be read.
Definition: row0pread.h:207
Config(const Config &config)=default
Copy constructor.
dict_index_t * m_index
(Cluster) Index in table to scan.
Definition: row0pread.h:198
const Scan_range m_scan_range
Range to scan.
Definition: row0pread.h:195
const page_size_t m_page_size
Tablespace page size.
Definition: row0pread.h:204
Boundary of the range to scan.
Definition: row0pread.h:492
const dtuple_t * m_tuple
Tuple representation inside m_rec, for two Iter instances in a range m_tuple will be [first->m_tuple,...
Definition: row0pread.h:507
btr_pcur_t * m_pcur
Persistent cursor.
Definition: row0pread.h:510
const rec_t * m_rec
Start scanning from this key.
Definition: row0pread.h:503
mem_heap_t * m_heap
Heap used to allocate m_rec, m_tuple and m_pcur.
Definition: row0pread.h:497
~Iter()
Destructor.
Definition: row0pread.cc:75
const ulint * m_offsets
m_rec column offsets.
Definition: row0pread.h:500
Specifies the range from where to start the scan and where to end it.
Definition: row0pread.h:146
Scan_range()
Default constructor.
Definition: row0pread.h:148
Scan_range(const Scan_range &scan_range)=default
Copy constructor.
const dtuple_t * m_end
End of the scan, can be null for +infinity.
Definition: row0pread.h:164
const dtuple_t * m_start
Start of the scan, can be nullptr for -infinity.
Definition: row0pread.h:161
std::string to_string() const
Convert the instance to a string representation.
Definition: row0pread.cc:57
Scan_range(const dtuple_t *start, const dtuple_t *end)
Constructor.
Definition: row0pread.h:157
Thread related context information.
Definition: row0pread.h:215
size_t m_thread_id
Thread ID.
Definition: row0pread.h:266
Thread_ctx & operator=(Thread_ctx &&)=delete
void create_blob_heap() noexcept
Create BLOB heap.
Definition: row0pread.h:245
Thread_ctx(const Thread_ctx &)=delete
mem_heap_t * m_blob_heap
BLOB heap per thread.
Definition: row0pread.h:273
void * m_callback_ctx
Callback information related to the thread.
Definition: row0pread.h:270
Thread_ctx(size_t id) noexcept
Constructor.
Definition: row0pread.h:218
void restore_to_first_unprocessed() noexcept
Definition: row0pread.cc:523
PCursor * m_pcursor
Current persistent cursor.
Definition: row0pread.h:279
void restore_to_last_processed_user_record() noexcept
Definition: row0pread.cc:514
Thread_ctx(Thread_ctx &&)=delete
State get_state() const noexcept
Definition: row0pread.h:251
Thread_ctx & operator=(const Thread_ctx &)=delete
State m_state
Worker thread state.
Definition: row0pread.h:276
void save_current_user_record_as_last_processed() noexcept
Definition: row0pread.cc:509
void set_callback_ctx(T *ctx) noexcept
Set thread related callback information.
Definition: row0pread.h:232
void save_previous_user_record_as_last_processed() noexcept
Definition: row0pread.cc:519
T * get_callback_ctx() noexcept
Get the thread related callback information.
Definition: row0pread.h:240
~Thread_ctx() noexcept
Destructor.
Definition: row0pread.h:221
Definition: btr0pcur.h:99
The buffer control block structure.
Definition: buf0buf.h:1746
Data structure for an index.
Definition: dict0mem.h:1041
Data structure for a database table.
Definition: dict0mem.h:1913
Structure for an SQL data tuple of fields (logical record)
Definition: data0data.h:696
The info structure stored at the beginning of a heap block.
Definition: mem0mem.h:302
Mini-transaction handle and buffer.
Definition: mtr0mtr.h:177
InnoDB condition variable.
Definition: os0event.cc:63
Index page cursor.
Definition: page0cur.h:311
Definition: gen_lex_token.cc:149
Definition: trx0trx.h:675
#define UNIV_PAGE_SIZE
The universal page size of the database.
Definition: univ.i:294
unsigned long int ulint
Definition: univ.i:406
#define UT_LOCATION_HERE
Definition: ut0core.h:73
#define ut_ad(EXPR)
Debug assertion.
Definition: ut0dbg.h:105
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:93