MySQL 8.0.19
Source Code Documentation
row0pread.h
1 /*****************************************************************************
2 
3 Copyright (c) 2018, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 
25 *****************************************************************************/
26 
27 /** @file include/row0pread.h
28 Parallel read interface.
29 
30 Created 2018-01-27 by Sunny Bains. */
31 
32 #ifndef row0par_read_h
33 #define row0par_read_h
34 
35 #include <functional>
36 #include <vector>
37 
38 #include "os0thread-create.h"
39 #include "row0sel.h"
40 #include "univ.i"
41 
42 // Forward declarations
43 struct trx_t;
44 struct mtr_t;
45 struct btr_pcur_t;
46 struct buf_block_t;
47 struct dict_table_t;
48 
49 #include "btr0cur.h"
50 #include "db0err.h"
51 #include "fil0fil.h"
52 #include "os0event.h"
53 #include "page0size.h"
54 #include "rem0types.h"
55 #include "ut0mpmcbq.h"
56 
57 /** The core idea is to find the left and right paths down the B+Tree. These
58 paths correspond to the scan start and scan end searches. Follow the links
59 at the appropriate btree level from left to right and split the scan
60 on each of these sub-tree root nodes.
61 
62 If the user has set the maximum number of threads to use to, say, 4, and
63 there are 5 sub-trees at the selected level, then we will split the 5th
64 sub-tree dynamically when it is ready for scan.
65 
66 We want to allow multiple parallel range scans on different indexes at the
67 same time. To achieve this we split out the scan context (Scan_ctx) from the
68 execution context (Ctx). The Scan_ctx has the index and transaction
69 information and the Ctx keeps track of the cursor for a specific thread
70 during the scan.
71 
72 To start a scan we need to instantiate a Parallel_reader. A parallel reader
73 can contain several Scan_ctx instances and a Scan_ctx can contain several
74 Ctx instances. It's the Ctx instances that are eventually executed.
75 
76 The Parallel_reader will start one thread per Scan_ctx to service read ahead
77 requests. Currently, the read ahead is a physical read ahead, i.e., it will
78 read one extent at a time.
79 
80 This design allows for a single Parallel_reader to scan multiple indexes
81 at once. Each index range scan has to be added via its add_scan() method.
82 This functionality is required to handle parallel partition scans because
83 partitions are separate indexes. This can be used to scan completely
84 different indexes and tables by one instance of a Parallel_reader.
85 
86 To solve the imbalance problem we dynamically split the sub-trees as and
87 when required. E.g., if there are 5 sub-trees to scan and 4 threads, then
88 the 5th sub-tree is tagged as "to_be_split" during phase I (add_scan()).
89 The first thread that finishes scanning the first set of 4 partitions will
90 then dynamically split the 5th sub-tree and add the newly created sub-trees
91 to the execution context (Ctx) run queue in the Parallel_reader. As the
92 other threads complete their sub-tree scans they will pick up more execution
93 contexts (Ctx) from the Parallel_reader run queue and start scanning the
94 sub-partitions as normal.
95 
96 Note: The Ctx instances are in a virtual list. Each Ctx instance has a
97 range to scan. The start point of this range instance is the end point
98 of the Ctx instance that scans the values less than its start point. A Ctx
99 scans rows in the half-open range [Start, End). We use std::shared_ptr to
100 manage the reference counting; this allows us to dispose of the Ctx
101 instances without worrying about dangling pointers.
102 
103 NOTE: Secondary index scans are not supported currently. */
104 class Parallel_reader {
105  public:
106  /** Maximum value for innodb-parallel-read-threads. */
107  constexpr static size_t MAX_THREADS{256};
108 
109  using Links = std::vector<page_no_t, ut_allocator<page_no_t>>;
110 
111  // Forward declaration.
112  class Ctx;
113  class Scan_ctx;
114 
115  /** Callback to initialise the caller's state. */
116  using Start = std::function<dberr_t(size_t thread_id)>;
117 
118  /** Callback to finalise the caller's state. */
119  using Finish = std::function<dberr_t(size_t thread_id)>;
120 
121  /** Callback to process the rows. */
122  using F = std::function<dberr_t(const Ctx *)>;
123 
124  /** Specifies the range from where to start the scan and where to end it. */
125  struct Scan_range {
126  /** Default constructor. */
127  Scan_range() : m_start(nullptr), m_end(nullptr) {}
128 
129  /** Copy constructor.
130  @param[in] scan_range Instance to copy from. */
131  Scan_range(const Scan_range &scan_range)
132  : m_start(scan_range.m_start), m_end(scan_range.m_end) {}
133 
134  /** Constructor.
135  @param[in] start Start key
136  @param[in] end End key. */
137  Scan_range(const dtuple_t *start, const dtuple_t *end)
138  : m_start(start), m_end(end) {}
139 
140  /** Start of the scan, can be nullptr for -infinity. */
141  const dtuple_t *m_start{};
142 
143  /** End of the scan, can be nullptr for +infinity. */
144  const dtuple_t *m_end{};
145 
146  /** Convert the instance to a string representation. */
147  std::string to_string() const MY_ATTRIBUTE((warn_unused_result));
148  };
149 
150  /** Scan (Scan_ctx) configuration. */
151  struct Config {
152  /** Constructor.
153  @param[in] scan_range Range to scan.
154  @param[in] index Cluster index to scan.
155  @param[in] read_level Btree level from which records need to be read.
156  */
157  Config(const Scan_range &scan_range, dict_index_t *index,
158  size_t read_level = 0)
159  : m_scan_range(scan_range),
160  m_index(index),
161  m_is_compact(dict_table_is_comp(index->table)),
162  m_page_size(dict_tf_to_fsp_flags(index->table->flags)),
163  m_read_level(read_level) {}
164 
165  /** Copy constructor.
166  @param[in] config Instance to copy from. */
167  Config(const Config &config)
168  : m_scan_range(config.m_scan_range),
169  m_index(config.m_index),
170  m_is_compact(config.m_is_compact),
171  m_page_size(config.m_page_size),
172  m_read_level(config.m_read_level) {}
173 
174  /** Range to scan. */
175  const Scan_range m_scan_range;
176 
177  /** (Cluster) Index in table to scan. */
178  dict_index_t *m_index{};
179 
180  /** Row format of table. */
181  const bool m_is_compact{};
182 
183  /** Tablespace page size. */
184  const page_size_t m_page_size;
185 
186  /** If true then enable separate read ahead threads. */
187  bool m_read_ahead{true};
188 
189  /** Btree level from which records need to be read. */
190  size_t m_read_level{0};
191  };
192 
193  /** Constructor.
194  @param[in] max_threads Maximum number of threads to use.
195  @param[in] sync true if the read is synchronous */
196  explicit Parallel_reader(size_t max_threads, bool sync = true);
197 
198  /** Destructor. */
199  ~Parallel_reader();
200 
201  /** Check how many threads are available for parallel reads.
202  @param[in] n_required Number of threads required.
203  @return number of threads available. */
204  static size_t available_threads(size_t n_required)
205  MY_ATTRIBUTE((warn_unused_result));
206 
207  /** Release the parallel read threads. */
208  static void release_threads(size_t n_threads) {
209  const auto RELAXED = std::memory_order_relaxed;
210  auto active = s_active_threads.fetch_sub(n_threads, RELAXED);
211  ut_a(active >= n_threads);
212  }
213 
214  /** Add scan context.
215  @param[in,out] trx Covering transaction.
216  @param[in] config Scan configuration.
217  @param[in] f Callback function.
219  @return error. */
220  dberr_t add_scan(trx_t *trx, const Config &config, F &&f)
221  MY_ATTRIBUTE((warn_unused_result));
222 
223  /** Wait for the join of threads spawned by the parallel reader. */
224  void join() {
225  for (auto &t : m_parallel_read_threads) {
226  t.wait();
227  }
228 
229  for (auto &t : m_read_ahead_threads) {
230  t.wait();
231  }
232  }
233 
234  /** Get the error stored in the global error state.
235  @return global error state. */
236  dberr_t get_error_state() const MY_ATTRIBUTE((warn_unused_result)) {
237  return (m_err);
238  }
239 
240  /** @return true if the tree is empty, else false. */
241  bool is_tree_empty() const MY_ATTRIBUTE((warn_unused_result)) {
242  return (m_ctx_id.load(std::memory_order_relaxed) == 0);
243  }
244 
245  /** Set the callback that must be called before any processing.
246  @param[in] f Call before first row is processed.*/
247  void set_start_callback(Start &&f) { m_start_callback = std::move(f); }
248 
249  /** Set the callback that must be called after all processing.
250  @param[in] f Call after last row is processed.*/
251  void set_finish_callback(Finish &&f) { m_finish_callback = std::move(f); }
252 
253  /** Start the threads to do the parallel read for the specified range.
254  @return DB_SUCCESS or error code. */
255  dberr_t run() MY_ATTRIBUTE((warn_unused_result));
256 
257  /** @return the configured max threads size. */
258  size_t max_threads() const MY_ATTRIBUTE((warn_unused_result));
259 
260  /** @return true if in error state. */
261  bool is_error_set() const MY_ATTRIBUTE((warn_unused_result)) {
262  return (m_err.load(std::memory_order_relaxed) != DB_SUCCESS);
263  }
264 
265  /** Set the error state.
266  @param[in] err Error state to set to. */
267  void set_error_state(dberr_t err) {
268  m_err.store(err, std::memory_order_relaxed);
269  }
270 
271  // Disable copying.
272  Parallel_reader(const Parallel_reader &) = delete;
273  Parallel_reader(const Parallel_reader &&) = delete;
274  Parallel_reader &operator=(Parallel_reader &&) = delete;
275  Parallel_reader &operator=(const Parallel_reader &) = delete;
276 
277  private:
278  /** Release unused threads back to the pool.
279  @param[in] unused_threads Number of threads to "release". */
280  void release_unused_threads(size_t unused_threads) {
281  ut_a(m_max_threads >= unused_threads);
282  release_threads(unused_threads);
283  m_max_threads -= unused_threads;
284  }
285 
286  /** Add an execution context to the run queue.
287  @param[in] ctx Execution context to add to the queue. */
288  void enqueue(std::shared_ptr<Ctx> ctx);
289 
290  /** Fetch the next job to execute.
291  @return job to execute or nullptr. */
292  std::shared_ptr<Ctx> dequeue() MY_ATTRIBUTE((warn_unused_result));
293 
294  /** @return true if job queue is empty. */
295  bool is_queue_empty() const MY_ATTRIBUTE((warn_unused_result));
296 
297  /** Poll for requests and execute.
298  @param[in] id Thread ID */
299  void worker(size_t id);
300 
301  /** Create the threads and do a parallel read across the partitions. */
302  void parallel_read();
303 
304  /** @return true if tasks are still executing. */
305  bool is_active() const MY_ATTRIBUTE((warn_unused_result)) {
306  return (m_n_completed.load(std::memory_order_relaxed) <
307  m_ctx_id.load(std::memory_order_relaxed));
308  }
309 
310  /** @return true if the read-ahead request queue is empty. */
311  bool read_ahead_queue_empty() const MY_ATTRIBUTE((warn_unused_result)) {
312  return (m_submitted.load(std::memory_order_relaxed) ==
313  m_consumed.load(std::memory_order_relaxed));
314  }
315 
316  /** Read ahead thread.
317  @param[in] n_pages Read ahead batch size. */
318  void read_ahead_worker(page_no_t n_pages);
319 
320  /** Start the read ahead worker threads. */
321  void read_ahead();
322 
323  private:
324  /** Read ahead request. */
325  struct Read_ahead_request {
326  /** Default constructor. */
327  Read_ahead_request() : m_scan_ctx(), m_page_no(FIL_NULL) {}
328  /** Constructor.
329  @param[in] scan_ctx Scan context requesting the read ahead.
330  @param[in] page_no Start page number of read ahead. */
331  Read_ahead_request(const Scan_ctx *scan_ctx, page_no_t page_no)
332  : m_scan_ctx(scan_ctx), m_page_no(page_no) {}
333 
334  /** Scan context requesting the read ahead. */
335  const Scan_ctx *m_scan_ctx{};
336 
337  /** Starting page number. */
338  page_no_t m_page_no{};
339  };
340 
341  // clang-format off
342  using Ctxs =
343  std::list<std::shared_ptr<Ctx>,
344  ut_allocator<std::shared_ptr<Ctx>>>;
345 
346  using Scan_ctxs =
347  std::list<std::shared_ptr<Scan_ctx>,
348  ut_allocator<std::shared_ptr<Scan_ctx>>>;
349 
350  /** Read ahead queue. */
351  using Read_ahead_queue = mpmc_bq<Read_ahead_request>;
352  // clang-format on
353 
354  /** Maximum number of worker threads to use. */
355  size_t m_max_threads{};
356 
357  /** Mutex protecting m_ctxs. */
358  mutable ib_mutex_t m_mutex;
359 
360  /** Contexts that must be executed. */
361  Ctxs m_ctxs{};
362 
363  /** Scan contexts. */
364  Scan_ctxs m_scan_ctxs{};
365 
366  /** For signalling worker threads about events. */
367  os_event_t m_event{};
368 
369  /** Counter for allocating scan context IDs. */
370  size_t m_scan_ctx_id{};
371 
372  /** Context ID. Monotonically increasing ID. */
373  std::atomic_size_t m_ctx_id{};
374 
375  /** Total tasks executed so far. */
376  std::atomic_size_t m_n_completed{};
377 
378  /** Callback at start (before processing any rows). */
379  Start m_start_callback{};
380 
381  /** Callback at end (after processing all rows). */
382  Finish m_finish_callback{};
383 
384  /** Read ahead queue. */
385  Read_ahead_queue m_read_aheadq;
386 
387  /** Number of read ahead requests submitted. */
388  std::atomic<uint64_t> m_submitted{0};
389 
390  /** Number of read ahead requests processed. */
391  std::atomic<uint64_t> m_consumed{0};
392 
393  /** Error during parallel read. */
394  std::atomic<dberr_t> m_err{DB_SUCCESS};
395 
396  /** List of threads used for read_ahead purpose. */
397  std::vector<IB_thread> m_read_ahead_threads;
398 
399  /** List of threads used for parallel_read purpose. */
400  std::vector<IB_thread> m_parallel_read_threads;
401 
402  /** Number of threads currently doing parallel reads. */
403  static std::atomic_size_t s_active_threads;
404 
405  /** If the caller wants to wait for the parallel_read to finish its run. */
406  bool m_sync;
407 
408  friend class Ctx;
409  friend class Scan_ctx;
410 };
411 
412 /** Parallel reader context. */
413 class Parallel_reader::Scan_ctx {
414  private:
415  /** Constructor.
416  @param[in] reader Parallel reader that owns this context.
417  @param[in] id ID of this scan context.
418  @param[in] trx Transaction covering the scan.
419  @param[in] config Range scan config.
420  @param[in] f Callback function. */
421  Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx,
422  const Parallel_reader::Config &config, F &&f);
423 
424  public:
425  /** Destructor. */
426  ~Scan_ctx();
427 
428  private:
429  /** Boundary of the range to scan. */
430  struct Iter {
431  /** Destructor. */
432  ~Iter();
433 
434  /** Heap used to allocate m_rec, m_tuple and m_pcur. */
435  mem_heap_t *m_heap{};
436 
437  /** m_rec column offsets. */
438  const ulint *m_offsets{};
439 
440  /** Start scanning from this key. Raw data of the row. */
441  const rec_t *m_rec{};
442 
443  /** Tuple representation inside m_rec, for two Iter instances in a range
444  m_tuple will be [first->m_tuple, second->m_tuple). */
445  const dtuple_t *m_tuple{};
446 
447  /** Number of externally stored columns. */
448  ulint m_n_ext{ULINT_UNDEFINED};
449 
450  /** Persistent cursor.*/
451  btr_pcur_t *m_pcur{};
452  };
453 
454  /** mtr_t savepoint. */
455  using Savepoint = std::pair<ulint, buf_block_t *>;
456 
457  /** For releasing the S latches after processing the blocks. */
458  using Savepoints = std::vector<Savepoint, ut_allocator<Savepoint>>;
459 
460  /** The first cursor should read up to the second cursor [f, s). */
461  using Range = std::pair<std::shared_ptr<Iter>, std::shared_ptr<Iter>>;
462 
463  using Ranges = std::vector<Range, ut_allocator<Range>>;
464 
465  /** @return the scan context ID. */
466  size_t id() const MY_ATTRIBUTE((warn_unused_result)) { return (m_id); }
467 
468  /** Set the error state.
469  @param[in] err Error state to set to. */
470  void set_error_state(dberr_t err) {
471  m_err.store(err, std::memory_order_relaxed);
472  }
473 
474  /** @return true if in error state. */
475  bool is_error_set() const MY_ATTRIBUTE((warn_unused_result)) {
476  return (m_err.load(std::memory_order_relaxed) != DB_SUCCESS);
477  }
478 
479  /** Fetch a block from the buffer pool and acquire an S latch on it.
480  @param[in] page_id Page ID.
481  @param[in,out] mtr Mini transaction covering the fetch.
482  @param[in] line Line from where called.
483  @return the block fetched from the buffer pool. */
484  buf_block_t *block_get_s_latched(const page_id_t &page_id, mtr_t *mtr,
485  int line) const
486  MY_ATTRIBUTE((warn_unused_result));
487 
488  /** Partition the B+Tree for parallel read.
489  @param[in] scan_range Range for partitioning.
490  @param[in,out] ranges Ranges to scan.
491  @param[in] split_level Sub-range required level (0 == root).
492  @return the partition scan ranges. */
493  dberr_t partition(const Scan_range &scan_range, Ranges &ranges,
494  size_t split_level);
495 
496  /** Find the page number of the node that contains the search key. If the
497  key is null then we assume -infinity.
498  @param[in] block Page to look in.
499  @param[in] key Key of the first record in the range.
500  @return the left child page number. */
501  page_no_t search(const buf_block_t *block, const dtuple_t *key) const
502  MY_ATTRIBUTE((warn_unused_result));
503 
504  /** Traverse from given sub-tree page number to start of the scan range
505  from the given page number.
506  @param[in] page_no Page number of sub-tree.
507  @param[in,out] mtr Mini-transaction.
508  @param[in] key Key of the first record in the range.
509  @param[in,out] savepoints Blocks S latched and accessed.
510  @return the leaf node page cursor. */
511  page_cur_t start_range(page_no_t page_no, mtr_t *mtr, const dtuple_t *key,
512  Savepoints &savepoints) const
513  MY_ATTRIBUTE((warn_unused_result));
514 
515  /** Create and add the range to the scan ranges.
516  @param[in,out] ranges Ranges to scan.
517  @param[in,out] leaf_page_cursor Leaf page cursor on which to create the
518  persistent cursor.
519  @param[in,out] mtr Mini-transaction */
520  void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor,
521  mtr_t *mtr) const;
522 
523  /** Find the subtrees to scan in a block.
524  @param[in] scan_range Partition based on this scan range.
525  @param[in] page_no Page to partition at if at required level.
526  @param[in] depth Sub-range current level.
527  @param[in] split_level Sub-range starting level (0 == root).
528  @param[in,out] ranges Ranges to scan.
529  @param[in,out] mtr Mini-transaction */
530  dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no,
531  size_t depth, const size_t split_level, Ranges &ranges,
532  mtr_t *mtr);
533 
534  /** Build a dtuple_t from rec_t.
535  @param[in] rec Build the dtuple from this record.
536  @param[in,out] iter Build in this iterator. */
537  void copy_row(const rec_t *rec, Iter *iter) const;
538 
539  /** Create the persistent cursor that will be used to traverse the
540  partition and position on the start row.
541  @param[in] page_cursor Current page cursor
542  @param[in] mtr Mini-transaction covering the read.
543  @return Start iterator. */
544  std::shared_ptr<Iter> create_persistent_cursor(const page_cur_t &page_cursor,
545  mtr_t *mtr) const
546  MY_ATTRIBUTE((warn_unused_result));
547 
548  /** Build an old version of the row if required.
549  @param[in,out] rec Current row read from the index. This can
550  be modified by this method if an older version
551  needs to be built.
552  @param[in,out] offsets Same as above but pertains to the rec offsets
553  @param[in,out] heap Heap to use if a previous version needs to be
554  built from the undo log.
555  @param[in,out] mtr Mini transaction covering the read.
556  @return true if row is visible to the transaction. */
557  bool check_visibility(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap,
558  mtr_t *mtr) MY_ATTRIBUTE((warn_unused_result));
559 
560  /** Read ahead from this page number.
561  @param[in] page_no Start read ahead page number. */
562  void submit_read_ahead(page_no_t page_no) {
563  ut_ad(page_no != FIL_NULL);
565 
566  Read_ahead_request read_ahead_request(this, page_no);
567 
568  while (!m_reader->m_read_aheadq.enqueue(read_ahead_request)) {
569  UT_RELAX_CPU();
570  }
571 
572  m_reader->m_submitted.fetch_add(1, std::memory_order_relaxed);
573  }
574 
575  /** Create an execution context for a range and add it to
576  the Parallel_reader's run queue.
577  @param[in] range Range for which to create the context.
578  @param[in] split true if the sub-tree should be split further.
579  @return DB_SUCCESS or error code. */
580  dberr_t create_context(const Range &range, bool split)
581  MY_ATTRIBUTE((warn_unused_result));
582 
583  /** Create the execution contexts based on the ranges.
584  @param[in] ranges Ranges for which to create the contexts.
585  @return DB_SUCCESS or error code. */
586  dberr_t create_contexts(const Ranges &ranges)
587  MY_ATTRIBUTE((warn_unused_result));
588 
589  /** @return the maximum number of threads configured. */
590  size_t max_threads() const MY_ATTRIBUTE((warn_unused_result)) {
591  return (m_reader->m_max_threads);
592  }
593 
594  /** Release unused threads back to the pool.
595  @param[in] unused_threads Number of threads to "release". */
596  void release_threads(size_t unused_threads) {
597  m_reader->release_threads(unused_threads);
598  }
599 
600  /** S lock the index. */
601  void index_s_lock();
602 
603  /** S unlock the index. */
604  void index_s_unlock();
605 
606  /** @return true if at least one thread owns the S latch on the index. */
607  bool index_s_own() const {
608  return (m_s_locks.load(std::memory_order_acquire) > 0);
609  }
610 
611  private:
613 
614  /** Context ID. */
615  size_t m_id{std::numeric_limits<size_t>::max()};
616 
617  /** Parallel scan configuration. */
618  Config m_config;
619 
620  /** Covering transaction. */
621  const trx_t *m_trx{};
622 
623  /** Callback function. */
624  F m_f;
625 
626  /** Depth of the Btree. */
627  size_t m_depth{};
628 
629  /** The parallel reader. */
630  Parallel_reader *m_reader{};
631 
632  /** Error during parallel read. */
633  mutable std::atomic<dberr_t> m_err{DB_SUCCESS};
634 
635  /** Number of threads that have S locked the index. */
636  std::atomic_size_t m_s_locks{};
637 
638  friend class Parallel_reader;
639  friend class Parallel_reader::Ctx;
640 
641  Scan_ctx(Scan_ctx &&) = delete;
642  Scan_ctx(const Scan_ctx &) = delete;
643  Scan_ctx &operator=(Scan_ctx &&) = delete;
644  Scan_ctx &operator=(const Scan_ctx &) = delete;
645 };
646 
647 class PCursor;
648 
649 /** Parallel reader execution context. */
650 class Parallel_reader::Ctx {
651  private:
652  /** Constructor.
653  @param[in] id Thread ID.
654  @param[in] scan_ctx Scan context.
655  @param[in] range Range that the thread has to read. */
656  Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
657  : m_id(id), m_range(range), m_scan_ctx(scan_ctx) {}
658 
659  public:
660  /** Destructor. */
661  ~Ctx();
662 
663  public:
664  /** @return the context ID. */
665  size_t id() const MY_ATTRIBUTE((warn_unused_result)) { return (m_id); }
666 
667  /** @return the scan ID of the scan context this belongs to. */
668  size_t scan_id() const MY_ATTRIBUTE((warn_unused_result)) {
669  return (m_scan_ctx->id());
670  }
671 
672  /** @return the covering transaction. */
673  const trx_t *trx() const MY_ATTRIBUTE((warn_unused_result)) {
674  return (m_scan_ctx->m_trx);
675  }
676 
677  /** @return the index being scanned. */
678  const dict_index_t *index() const MY_ATTRIBUTE((warn_unused_result)) {
679  return (m_scan_ctx->m_config.m_index);
680  }
681 
682  private:
683  /** Traverse the pages by key order.
684  @return DB_SUCCESS or error code. */
685  dberr_t traverse() MY_ATTRIBUTE((warn_unused_result));
686 
687  /** Traverse the records in a node.
688  @param[in] pcursor persistent b-tree cursor
689  @param[in] mtr mtr
690  @return DB_SUCCESS or error code. */
691  dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr);
692 
693  /** Move to the next node in the specified level.
694  @param[in] pcursor persistent b-tree cursor
695  @param[in] mtr mtr
696  @return true on success. */
697  bool move_to_next_node(PCursor *pcursor, mtr_t *mtr);
698 
699  /** Split the context into sub-ranges and add them to the execution queue.
700  @return DB_SUCCESS or error code. */
701  dberr_t split() MY_ATTRIBUTE((warn_unused_result));
702 
703  private:
704  /** Context ID. */
705  size_t m_id{std::numeric_limits<size_t>::max()};
706 
707  /** If true then split the context at the block level. */
708  bool m_split{};
709 
710  /** Range to read in this context. */
711  Scan_ctx::Range m_range{};
712 
713  /** Scanner context. */
714  Scan_ctx *m_scan_ctx{};
715 
716  public:
717  /** Current executing thread ID. */
718  size_t m_thread_id{std::numeric_limits<size_t>::max()};
719 
720  /** Current block. */
721  const buf_block_t *m_block{};
722 
723  /** Current row. */
724  const rec_t *m_rec{};
725 
726  /** True if m_rec is the first record in the page. */
727  bool m_first_rec{true};
728 
729  ulint *m_offsets{};
730 
731  /** Start of a new range to scan. */
732  bool m_start{};
733 
734  friend class Parallel_reader;
735 };
736 
737 #endif /* !row0par_read_h */
Parallel_reader::Scan_ctx::create_ranges
dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no, size_t depth, const size_t split_level, Ranges &ranges, mtr_t *mtr)
Find the subtrees to scan in a block.
Definition: row0pread.cc:799
page_no_t
uint32 page_no_t
Page number.
Definition: api0api.h:57
Parallel_reader::Scan_ctx::index_s_lock
void index_s_lock()
S lock the index.
Definition: row0pread.cc:124
Parallel_reader::join
void join()
Wait for the join of threads spawned by the parallel reader.
Definition: row0pread.h:224
Parallel_reader::Ctx::m_block
const buf_block_t * m_block
Current block.
Definition: row0pread.h:721
Parallel_reader::Ctx::m_range
Scan_ctx::Range m_range
Range to read in this context.
Definition: row0pread.h:711
Parallel_reader::is_queue_empty
bool is_queue_empty() const
Definition: row0pread.cc:619
Parallel_reader::Scan_ctx::m_trx
const trx_t * m_trx
Covering transaction.
Definition: row0pread.h:621
mpmc_bq::enqueue
bool enqueue(T const &data)
Enqueue an element.
Definition: ut0mpmcbq.h:83
Parallel_reader::dequeue
std::shared_ptr< Ctx > dequeue()
Fetch the next job execute.
Definition: row0pread.cc:603
Parallel_reader::worker
void worker(size_t id)
Poll for requests and execute.
Definition: row0pread.cc:626
dtuple_t
Structure for an SQL data tuple of fields (logical record)
Definition: data0data.h:716
Parallel_reader::Scan_ctx::m_depth
size_t m_depth
Depth of the Btree.
Definition: row0pread.h:627
btr_pcur_t
Definition: btr0pcur.h:177
Parallel_reader::Ctx::m_id
size_t m_id
Context ID.
Definition: row0pread.h:705
Parallel_reader::Scan_ctx::check_visibility
bool check_visibility(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.cc:331
thread_id
static my_thread_id thread_id
Definition: my_thr_init.cc:62
Parallel_reader::release_threads
static void release_threads(size_t n_threads)
Release the parallel read threads.
Definition: row0pread.h:208
ut_a
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:53
Parallel_reader::Scan_ctx::m_err
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:633
ut_ad
#define ut_ad(EXPR)
Debug assertion.
Definition: ut0dbg.h:65
Parallel_reader::Scan_ctx::Iter::m_offsets
const ulint * m_offsets
m_rec column offsets.
Definition: row0pread.h:438
Parallel_reader::Scan_ctx::operator=
Scan_ctx & operator=(Scan_ctx &&)=delete
dict_tf_to_fsp_flags
uint32_t dict_tf_to_fsp_flags(uint32_t table_flags)
Convert a 32 bit integer table flags to the 32 bit FSP Flags.
Definition: dict0dict.cc:4883
Parallel_reader::Scan_ctx::Iter::m_tuple
const dtuple_t * m_tuple
Tuple representation inside m_rec, for two Iter instances in a range m_tuple will be [first->m_tuple,...
Definition: row0pread.h:445
Parallel_reader::add_scan
dberr_t add_scan(trx_t *trx, const Config &config, F &&f)
Add scan context.
Definition: row0pread.cc:1147
Parallel_reader::Start
std::function< dberr_t(size_t thread_id)> Start
Callback to initialise callers state.
Definition: row0pread.h:116
dict_index_t
Data structure for an index.
Definition: dict0mem.h:869
Parallel_reader::Scan_ctx::create_persistent_cursor
std::shared_ptr< Iter > create_persistent_cursor(const page_cur_t &page_cursor, mtr_t *mtr) const
Create the persistent cursor that will be used to traverse the partition and position on the the star...
Definition: row0pread.cc:423
Parallel_reader::Config::m_index
dict_index_t * m_index
(Cluster) Index in table to scan.
Definition: row0pread.h:178
Parallel_reader::parallel_read
void parallel_read()
Create the threads and do a parallel read across the partitions.
Definition: row0pread.cc:1098
mtr_t
Mini-transaction handle and buffer.
Definition: mtr0mtr.h:169
Parallel_reader::Scan_ctx::m_id
size_t m_id
Context ID.
Definition: row0pread.h:615
config
Definition: default_engine.h:52
Parallel_reader::m_mutex
ib_mutex_t m_mutex
Mutex protecting m_ctxs.
Definition: row0pread.h:358
Parallel_reader::m_max_threads
size_t m_max_threads
Maximum number of worker threads to use.
Definition: row0pread.h:355
os_event
InnoDB condition variable.
Definition: os0event.cc:66
Parallel_reader::Scan_ctx::Iter::m_pcur
btr_pcur_t * m_pcur
Persistent cursor.
Definition: row0pread.h:451
Parallel_reader::Scan_ctx::set_error_state
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:470
Parallel_reader::Ctx::m_thread_id
size_t m_thread_id
Current executing thread ID.
Definition: row0pread.h:718
btr0cur.h
page_cur_t
Index page cursor.
Definition: page0cur.h:315
Parallel_reader::m_n_completed
std::atomic_size_t m_n_completed
Total tasks executed so far.
Definition: row0pread.h:376
Parallel_reader::Config
Scan (Scan_ctx) configuration.
Definition: row0pread.h:151
Parallel_reader::Scan_range::m_start
const dtuple_t * m_start
Start of the scan, can be nullptr for -infinity.
Definition: row0pread.h:141
Parallel_reader::Ctx::index
const dict_index_t * index() const
Definition: row0pread.h:678
Parallel_reader::Scan_ctx::partition
dberr_t partition(const Scan_range &scan_range, Ranges &ranges, size_t split_level)
Partition the B+Tree for parallel read.
Definition: row0pread.cc:955
Parallel_reader::Ctx::traverse
dberr_t traverse()
Traverse the pages by key order.
Definition: row0pread.cc:498
Parallel_reader::m_sync
bool m_sync
If the caller wants to wait for the parallel_read to finish it's run.
Definition: row0pread.h:406
Parallel_reader::Scan_ctx::m_reader
Parallel_reader * m_reader
The parallel reader.
Definition: row0pread.h:630
Parallel_reader::Ctx::split
dberr_t split()
Split the context into sub-ranges and add them to the execution queue.
Definition: row0pread.cc:140
Parallel_reader::Scan_ctx::Iter::m_rec
const rec_t * m_rec
Start scanning from this key.
Definition: row0pread.h:441
Parallel_reader::Ctx::~Ctx
~Ctx()
Destructor.
Definition: row0pread.cc:94
Parallel_reader::Scan_ctx::Scan_ctx
Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx, const Parallel_reader::Config &config, F &&f)
Constructor.
Definition: row0pread.cc:187
Parallel_reader::run
dberr_t run()
Start the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1124
dberr_t
dberr_t
Definition: db0err.h:38
Parallel_reader::m_scan_ctx_id
size_t m_scan_ctx_id
Counter for allocating scan context IDs.
Definition: row0pread.h:370
Parallel_reader::Ctx::scan_id
size_t scan_id() const
The scan ID of the scan context this belongs to.
Definition: row0pread.h:668
dict_table_t
Data structure for a database table.
Definition: dict0mem.h:1510
buf_block_t
The buffer control block structure.
Definition: buf0buf.h:1318
Parallel_reader::read_ahead_queue_empty
bool read_ahead_queue_empty() const
Definition: row0pread.h:311
Parallel_reader::Ctxs
std::list< std::shared_ptr< Ctx >, ut_allocator< std::shared_ptr< Ctx > >> Ctxs
Definition: row0pread.h:344
Parallel_reader::Scan_ctx::start_range
page_cur_t start_range(page_no_t page_no, mtr_t *mtr, const dtuple_t *key, Savepoints &savepoints) const
Traverse from the given sub-tree page number to the start of the scan range.
Definition: row0pread.cc:737
Parallel_reader::Scan_ctx::m_config
Config m_config
Parallel scan configuration.
Definition: row0pread.h:618
key
static const char * key
Definition: suite_stubs.c:14
Parallel_reader::set_finish_callback
void set_finish_callback(Finish &&f)
Set the callback that must be called after all processing.
Definition: row0pread.h:251
Parallel_reader::m_scan_ctxs
Scan_ctxs m_scan_ctxs
Scan contexts.
Definition: row0pread.h:364
json_binary::err
static Value err()
Create a Value object that represents an error condition.
Definition: json_binary.cc:908
Parallel_reader::enqueue
void enqueue(std::shared_ptr< Ctx > ctx)
Add an execution context to the run queue.
Definition: row0pread.cc:597
rec_t
byte rec_t
Definition: rem0types.h:39
mem_block_info_t
The info structure stored at the beginning of a heap block.
Definition: mem0mem.h:337
Parallel_reader::Ctx::m_offsets
ulint * m_offsets
Definition: row0pread.h:729
Parallel_reader::read_ahead_worker
void read_ahead_worker(page_no_t n_pages)
Read ahead thread.
Definition: row0pread.cc:1045
Parallel_reader::Scan_ctx::create_contexts
dberr_t create_contexts(const Ranges &ranges)
Create the execution contexts based on the ranges.
Definition: row0pread.cc:1014
ut0mpmcbq.h
Parallel_reader::m_ctxs
Ctxs m_ctxs
Contexts that must be executed.
Definition: row0pread.h:361
Parallel_reader::~Parallel_reader
~Parallel_reader()
Destructor.
Definition: row0pread.cc:98
Parallel_reader::Scan_range::Scan_range
Scan_range(const Scan_range &scan_range)
Copy constructor.
Definition: row0pread.h:131
Parallel_reader::Scan_ctx::Iter
Boundary of the range to scan.
Definition: row0pread.h:430
Parallel_reader::s_active_threads
static std::atomic_size_t s_active_threads
Number of threads currently doing parallel reads.
Definition: row0pread.h:403
Parallel_reader::Config::m_page_size
const page_size_t m_page_size
Tablespace page size.
Definition: row0pread.h:184
Parallel_reader::Scan_range
Specifies the range from where to start the scan and where to end it.
Definition: row0pread.h:125
Parallel_reader::Ctx::Ctx
Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
Constructor.
Definition: row0pread.h:656
Parallel_reader::Scan_ctx::m_f
F m_f
Callback function.
Definition: row0pread.h:624
Parallel_reader::Scan_ctx::Savepoints
std::vector< Savepoint, ut_allocator< Savepoint > > Savepoints
For releasing the S latches after processing the blocks.
Definition: row0pread.h:458
Parallel_reader::Config::m_read_ahead
bool m_read_ahead
If true then enable separate read ahead threads.
Definition: row0pread.h:187
Parallel_reader::F
std::function< dberr_t(const Ctx *)> F
Callback to process the rows.
Definition: row0pread.h:122
Parallel_reader::Scan_ctx::create_range
void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor, mtr_t *mtr) const
Create and add the range to the scan ranges.
Definition: row0pread.cc:783
Parallel_reader::m_read_aheadq
Read_ahead_queue m_read_aheadq
Read ahead queue.
Definition: row0pread.h:385
Parallel_reader::Read_ahead_request::Read_ahead_request
Read_ahead_request()
Default constructor.
Definition: row0pread.h:327
page_size_t
Page size descriptor.
Definition: page0size.h:50
page0size.h
Parallel_reader::available_threads
static size_t available_threads(size_t n_required)
Check how many threads are available for parallel reads.
Definition: row0pread.cc:104
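available_threads() draws from a global budget tracked by the atomic s_active_threads, capped by MAX_THREADS, with release_unused_threads() returning the surplus. A hedged sketch of that reservation pattern with a compare-and-swap loop (kMaxThreads, g_active_threads, and the function bodies are illustrative assumptions, not the InnoDB implementation):

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>

// Hypothetical analogues of MAX_THREADS and s_active_threads.
constexpr size_t kMaxThreads = 256;
std::atomic<size_t> g_active_threads{0};

// Grant up to n_required threads from the remaining budget; a CAS loop
// makes the check-and-reserve step atomic against concurrent callers.
size_t available_threads(size_t n_required) {
  size_t active = g_active_threads.load();
  for (;;) {
    size_t grant = std::min(n_required, kMaxThreads - active);
    if (grant == 0) return 0;  // budget exhausted
    if (g_active_threads.compare_exchange_weak(active, active + grant)) {
      return grant;
    }
    // CAS failed: `active` was reloaded; recompute the grant and retry.
  }
}

// Return threads that were reserved but not used back to the pool.
void release_threads(size_t n) { g_active_threads -= n; }
```

The CAS loop matters because two scans may call available_threads() concurrently; a plain load-then-store could hand out the same budget twice.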
Parallel_reader::read_ahead
void read_ahead()
Start the read ahead worker threads.
Definition: row0pread.cc:1080
Parallel_reader::Ctx::m_split
bool m_split
If true then split the context at the block level.
Definition: row0pread.h:708
Parallel_reader::Ctx::m_start
bool m_start
Start of a new range to scan.
Definition: row0pread.h:732
Parallel_reader::MAX_THREADS
constexpr static size_t MAX_THREADS
Maximum value for innodb-parallel-read-threads.
Definition: row0pread.h:107
Parallel_reader::Scan_ctx::submit_read_ahead
void submit_read_ahead(page_no_t page_no)
Read ahead from this page number.
Definition: row0pread.h:562
DB_SUCCESS
@ DB_SUCCESS
Definition: db0err.h:42
Parallel_reader::set_error_state
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:267
os0thread-create.h
Parallel_reader::Ctx::id
size_t id() const
Definition: row0pread.h:665
Parallel_reader::Ctx::trx
const trx_t * trx() const
Definition: row0pread.h:673
Parallel_reader::set_start_callback
void set_start_callback(Start &&f)
Set the callback that must be called before any processing.
Definition: row0pread.h:247
Parallel_reader::Finish
std::function< dberr_t(size_t thread_id)> Finish
Callback to finalise the caller's state.
Definition: row0pread.h:119
UT_RELAX_CPU
#define UT_RELAX_CPU()
Definition: ut0ut.h:119
rules_table_service::end
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:188
Parallel_reader::m_parallel_read_threads
std::vector< IB_thread > m_parallel_read_threads
List of threads used for parallel read purposes.
Definition: row0pread.h:400
Parallel_reader::Scan_ctx::id
size_t id() const
Definition: row0pread.h:466
Parallel_reader::operator=
Parallel_reader & operator=(Parallel_reader &&)=delete
Parallel_reader::Scan_ctx::max_threads
size_t max_threads() const
Definition: row0pread.h:590
Parallel_reader::Scan_ctx::Range
std::pair< std::shared_ptr< Iter >, std::shared_ptr< Iter > > Range
The first cursor should read up to the second cursor [f, s).
Definition: row0pread.h:461
Parallel_reader::m_submitted
std::atomic< uint64_t > m_submitted
Number of read ahead requests submitted.
Definition: row0pread.h:388
Parallel_reader::Config::Config
Config(const Config &config)
Copy constructor.
Definition: row0pread.h:167
Parallel_reader::Scan_range::m_end
const dtuple_t * m_end
End of the scan, can be nullptr for +infinity.
Definition: row0pread.h:144
dict_table_is_comp
UNIV_INLINE ibool dict_table_is_comp(const dict_table_t *table)
Check whether the table uses the compact page format.
Parallel_reader::get_error_state
dberr_t get_error_state() const
Get the error stored in the global error state.
Definition: row0pread.h:236
page_id_t
Page identifier.
Definition: buf0types.h:153
Parallel_reader::Config::Config
Config(const Scan_range &scan_range, dict_index_t *index, size_t read_level=0)
Constructor.
Definition: row0pread.h:157
Parallel_reader::is_tree_empty
bool is_tree_empty() const
Definition: row0pread.h:241
Parallel_reader::max_threads
size_t max_threads() const
mpmc_bq< Read_ahead_request >
Parallel_reader::Config::m_scan_range
const Scan_range m_scan_range
Range to scan.
Definition: row0pread.h:175
Parallel_reader::Scan_range::Scan_range
Scan_range(const dtuple_t *start, const dtuple_t *end)
Constructor.
Definition: row0pread.h:137
Parallel_reader::m_read_ahead_threads
std::vector< IB_thread > m_read_ahead_threads
List of threads used for read_ahead purpose.
Definition: row0pread.h:397
db0err.h
Parallel_reader::Ctx::m_rec
const rec_t * m_rec
Current row.
Definition: row0pread.h:724
Parallel_reader::Scan_ctx::block_get_s_latched
buf_block_t * block_get_s_latched(const page_id_t &page_id, mtr_t *mtr, int line) const
Fetch a block from the buffer pool and acquire an S latch on it.
Definition: row0pread.cc:242
os0event.h
Parallel_reader::Scan_ctx::index_s_unlock
void index_s_unlock()
S unlock the index.
Definition: row0pread.cc:132
Parallel_reader::Scan_ctx::Iter::m_n_ext
ulint m_n_ext
Number of externally stored columns.
Definition: row0pread.h:448
Parallel_reader::Scan_ctx
Parallel reader context.
Definition: row0pread.h:413
Parallel_reader::Ctx::m_scan_ctx
Scan_ctx * m_scan_ctx
Scanner context.
Definition: row0pread.h:714
Parallel_reader::Scan_ctx::Iter::m_heap
mem_heap_t * m_heap
Heap used to allocate m_rec, m_tuple and m_pcur.
Definition: row0pread.h:435
PCursor
Persistent cursor wrapper around btr_pcur_t.
Definition: row0pread.cc:194
Parallel_reader::Read_ahead_request
Read ahead request.
Definition: row0pread.h:325
FIL_NULL
constexpr page_no_t FIL_NULL
'null' (undefined) page offset in the context of file spaces
Definition: fil0fil.h:884
Parallel_reader::m_consumed
std::atomic< uint64_t > m_consumed
Number of read ahead requests processed.
Definition: row0pread.h:391
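m_submitted and m_consumed bracket the read-ahead pipeline: scan threads enqueue page numbers into m_read_aheadq and bump m_submitted, the read-ahead worker dequeues, issues the read, and bumps m_consumed, and the queue is logically drained once the two counters meet (read_ahead_queue_empty). A simplified mutex-based sketch of that bookkeeping (ReadAheadQueue and page_no_t are stand-ins; the real queue is the lock-free mpmc_bq):

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>
#include <queue>

using page_no_t = uint32_t;  // assumed alias for illustration

// Producers submit page numbers; the worker consumes them one at a
// time. submitted == consumed means every request has been handled.
struct ReadAheadQueue {
  std::mutex m;
  std::queue<page_no_t> q;
  std::atomic<uint64_t> submitted{0};
  std::atomic<uint64_t> consumed{0};

  void submit(page_no_t page_no) {
    std::lock_guard<std::mutex> g(m);
    q.push(page_no);
    ++submitted;
  }

  // Returns true and sets `out` if a request was consumed.
  bool consume_one(page_no_t &out) {
    std::lock_guard<std::mutex> g(m);
    if (q.empty()) return false;
    out = q.front();
    q.pop();
    ++consumed;
    return true;
  }

  bool empty() const { return submitted == consumed; }
};
```

Tracking the two counters separately, rather than polling the queue itself, lets the worker decide cheaply when it can exit without locking the queue.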
fil0fil.h
Parallel_reader::m_ctx_id
std::atomic_size_t m_ctx_id
Context ID.
Definition: row0pread.h:373
Parallel_reader
The core idea is to find the left and right paths down the B+Tree. These paths correspond to the scan ...
Definition: row0pread.h:104
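The left and right paths down the B+Tree bound the keyspace, which Scan_ctx::partition() then splits into disjoint half-open ranges [f, s), one execution context per range. An illustrative flat-array analogue of that splitting (the function and its even-split policy are assumptions for exposition; the real partitioning follows page boundaries at a chosen split level):

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Split the keyspace [0, n_rows) into up to n_ranges roughly equal
// half-open sub-ranges [f, s), mirroring how the reader turns B+Tree
// subtree boundaries into disjoint scan ranges.
std::vector<std::pair<size_t, size_t>> partition(size_t n_rows,
                                                 size_t n_ranges) {
  std::vector<std::pair<size_t, size_t>> ranges;
  if (n_ranges == 0 || n_rows == 0) return ranges;
  size_t chunk = n_rows / n_ranges;
  size_t rem = n_rows % n_ranges;  // spread the remainder over the front
  size_t begin = 0;
  for (size_t i = 0; i < n_ranges && begin < n_rows; ++i) {
    size_t end = begin + chunk + (i < rem ? 1 : 0);
    ranges.emplace_back(begin, end);  // half-open, like Range [f, s)
    begin = end;
  }
  return ranges;
}
```

Half-open ranges compose without overlap: each context reads up to, but not including, its end boundary, so adjacent contexts never visit the same row twice.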
Parallel_reader::Ctx::move_to_next_node
bool move_to_next_node(PCursor *pcursor, mtr_t *mtr)
Move to the next node in the specified level.
Definition: row0pread.cc:469
Parallel_reader::Scan_ctx::search
page_no_t search(const buf_block_t *block, const dtuple_t *key) const
Find the page number of the node that contains the search key.
Definition: row0pread.cc:700
Parallel_reader::Ctx::traverse_recs
dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr)
Traverse the records in a node.
Definition: row0pread.cc:529
Parallel_reader::Scan_ctx::copy_row
void copy_row(const rec_t *rec, Iter *iter) const
Build a dtuple_t from rec_t.
Definition: row0pread.cc:393
Parallel_reader::Scan_ctx::Iter::~Iter
~Iter()
Destructor.
Definition: row0pread.cc:79
start
static void start(PluginFuncEnv *env)
Definition: http_server_plugin.cc:572
Parallel_reader::is_error_set
bool is_error_set() const
Definition: row0pread.h:261
Parallel_reader::m_start_callback
Start m_start_callback
Callback at start (before processing any rows).
Definition: row0pread.h:379
Parallel_reader::Scan_ctx::Savepoint
std::pair< ulint, buf_block_t * > Savepoint
mtr_t savepoint.
Definition: row0pread.h:455
Parallel_reader::Scan_ctx::~Scan_ctx
~Scan_ctx()
Destructor.
Definition: row0pread.cc:96
Parallel_reader::Scan_ctxs
std::list< std::shared_ptr< Scan_ctx >, ut_allocator< std::shared_ptr< Scan_ctx > >> Scan_ctxs
Definition: row0pread.h:348
Parallel_reader::m_event
os_event_t m_event
For signalling worker threads about events.
Definition: row0pread.h:367
Parallel_reader::release_unused_threads
void release_unused_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:280
Parallel_reader::Read_ahead_request::Read_ahead_request
Read_ahead_request(Scan_ctx *scan_ctx, page_no_t page_no)
Constructor.
Definition: row0pread.h:331
Parallel_reader::Parallel_reader
Parallel_reader(size_t max_threads, bool sync=true)
Constructor.
Definition: row0pread.cc:175
row0sel.h
Parallel_reader::Scan_ctx::Ranges
std::vector< Range, ut_allocator< Range > > Ranges
Definition: row0pread.h:463
ut_allocator
Allocator class for allocating memory from inside std::* containers.
Definition: ut0new.h:567
Parallel_reader::Links
std::vector< page_no_t, ut_allocator< page_no_t > > Links
Definition: row0pread.h:109
Parallel_reader::is_active
bool is_active() const
Definition: row0pread.h:305
Parallel_reader::Scan_ctx::m_s_locks
std::atomic_size_t m_s_locks
Number of threads that have S locked the index.
Definition: row0pread.h:636
Parallel_reader::Scan_ctx::create_context
dberr_t create_context(const Range &range, bool split)
Create an execution context for a range and add it to the Parallel_reader's run queue.
Definition: row0pread.cc:989
index
char * index(const char *, int c)
Definition: mysql.cc:2875
Parallel_reader::Scan_ctx::release_threads
void release_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:596
Parallel_reader::m_finish_callback
Finish m_finish_callback
Callback at end (after processing all rows).
Definition: row0pread.h:382
Parallel_reader::Scan_ctx::index_s_own
bool index_s_own() const
Definition: row0pread.h:607
trx_t
Definition: trx0trx.h:780
Parallel_reader::Scan_ctx::is_error_set
bool is_error_set() const
Definition: row0pread.h:475
Parallel_reader::Ctx
Parallel reader execution context.
Definition: row0pread.h:650
Parallel_reader::Scan_range::to_string
std::string to_string() const
Convert the instance to a string representation.
Definition: row0pread.cc:61
Parallel_reader::m_err
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:394
flags
static int flags[50]
Definition: hp_test1.cc:39
Parallel_reader::Ctx::m_first_rec
bool m_first_rec
True if m_rec is the first record in the page.
Definition: row0pread.h:727
Parallel_reader::Scan_range::Scan_range
Scan_range()
Default constructor.
Definition: row0pread.h:127
rem0types.h