114 using Links = std::vector<page_no_t, ut::allocator<page_no_t>>;
167 [[nodiscard]] std::string
to_string()
const;
179 size_t read_level = 0,
180 size_t partition_id = std::numeric_limits<size_t>::max())
231 template <
typename T>
239 template <
typename T>
299 const auto SEQ_CST = std::memory_order_seq_cst;
301 ut_a(active >= n_threads);
325 return m_ctx_id.load(std::memory_order_relaxed) == 0;
358 m_err.store(
err, std::memory_order_relaxed);
384 void enqueue(std::shared_ptr<Ctx> ctx);
388 [[nodiscard]] std::shared_ptr<Ctx>
dequeue();
395 void worker(Thread_ctx *thread_ctx);
403 m_ctx_id.load(std::memory_order_relaxed);
409 std::list<std::shared_ptr<Ctx>,
413 std::list<std::shared_ptr<Scan_ctx>,
512 using Savepoints = std::vector<Savepoint, ut::allocator<Savepoint>>;
515 using Range = std::pair<std::shared_ptr<Iter>, std::shared_ptr<Iter>>;
517 using Ranges = std::vector<Range, ut::allocator<Range>>;
520 [[nodiscard]]
size_t id()
const {
return m_id; }
525 m_err.store(
err, std::memory_order_relaxed);
539 mtr_t *mtr,
size_t line)
const;
584 size_t depth,
const size_t split_level,
Ranges &ranges,
641 return m_s_locks.load(std::memory_order_acquire) > 0;
648 size_t m_id{std::numeric_limits<size_t>::max()};
694 [[nodiscard]]
size_t id()
const {
return m_id; }
761 size_t m_id{std::numeric_limits<size_t>::max()};
uint32_t page_no_t
Page number.
Definition: api0api.h:46
Persistent cursor wrapper around btr_pcur_t.
Definition: row0pread.cc:208
Parallel reader execution context.
Definition: row0pread.h:680
bool is_rec_visible(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.h:729
Ctx(size_t id, Scan_ctx *scan_ctx, const Scan_ctx::Range &range)
Constructor.
Definition: row0pread.h:686
size_t id() const
Definition: row0pread.h:694
size_t m_n_pages
Number of pages traversed by the context.
Definition: row0pread.h:783
~Ctx()=default
Destructor.
bool is_error_set() const
Definition: row0pread.h:755
dberr_t split()
Split the context into sub-ranges and add them to the execution queue.
Definition: row0pread.cc:148
Scan_ctx::Range m_range
Range to read in this context.
Definition: row0pread.h:767
Scan_ctx * m_scan_ctx
Scanner context.
Definition: row0pread.h:770
bool m_start
Start of a new range to scan.
Definition: row0pread.h:791
const trx_t * trx() const
Definition: row0pread.h:700
bool m_split
If true then split the context at the block level.
Definition: row0pread.h:764
dberr_t traverse_recs(PCursor *pcursor, mtr_t *mtr)
Traverse the records in a node.
Definition: row0pread.cc:649
size_t thread_id() const
Definition: row0pread.h:708
bool m_first_rec
True if m_rec is the first record in the page.
Definition: row0pread.h:786
Thread_ctx * thread_ctx() const
Definition: row0pread.h:711
size_t scan_id() const
The scan ID of the scan context this belongs to.
Definition: row0pread.h:697
const buf_block_t * m_block
Current block.
Definition: row0pread.h:777
ulint * m_offsets
Definition: row0pread.h:788
bool move_to_next_node(PCursor *pcursor)
Move to the next node in the specified level.
Definition: row0pread.cc:598
size_t m_id
Context ID.
Definition: row0pread.h:761
const dict_index_t * index() const
Definition: row0pread.h:703
size_t partition_id() const
Definition: row0pread.h:716
Thread_ctx * m_thread_ctx
Context information related to executing thread ID.
Definition: row0pread.h:774
const rec_t * m_rec
Current row.
Definition: row0pread.h:780
dberr_t traverse()
Traverse the pages by key order.
Definition: row0pread.cc:614
Parallel reader context.
Definition: row0pread.h:471
std::atomic_size_t m_s_locks
Number of threads that have S locked the index.
Definition: row0pread.h:669
dberr_t partition(const Scan_range &scan_range, Ranges &ranges, size_t split_level)
Partition the B+Tree for parallel read.
Definition: row0pread.cc:1184
page_cur_t start_range(page_no_t page_no, mtr_t *mtr, const dtuple_t *key, Savepoints &savepoints) const
Traverse from given sub-tree page number to start of the scan range from the given page number.
Definition: row0pread.cc:972
std::pair< ulint, buf_block_t * > Savepoint
mtr_t savepoint.
Definition: row0pread.h:509
size_t id() const
Definition: row0pread.h:520
Scan_ctx & operator=(Scan_ctx &&)=delete
void index_s_lock()
S lock the index.
Definition: row0pread.cc:132
std::shared_ptr< Iter > create_persistent_cursor(const page_cur_t &page_cursor, mtr_t *mtr) const
Create the persistent cursor that will be used to traverse the partition and position on the star...
Definition: row0pread.cc:551
size_t m_id
Context ID.
Definition: row0pread.h:648
F m_f
Callback function.
Definition: row0pread.h:657
bool index_s_own() const
Definition: row0pread.h:640
dberr_t create_context(const Range &range, bool split)
Create an execution context for a range and add it to the Parallel_reader's run queue.
Definition: row0pread.cc:1219
void index_s_unlock()
S unlock the index.
Definition: row0pread.cc:140
dberr_t create_contexts(const Ranges &ranges)
Create the execution contexts based on the ranges.
Definition: row0pread.cc:1244
void release_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:629
page_no_t search(const buf_block_t *block, const dtuple_t *key) const
Find the page number of the node that contains the search key.
Definition: row0pread.cc:934
size_t max_threads() const
Definition: row0pread.h:625
Config m_config
Parallel scan configuration.
Definition: row0pread.h:651
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:524
void copy_row(const rec_t *rec, Iter *iter) const
Build a dtuple_t from rec_t.
Definition: row0pread.cc:521
~Scan_ctx()=default
Destructor.
std::vector< Savepoint, ut::allocator< Savepoint > > Savepoints
For releasing the S latches after processing the blocks.
Definition: row0pread.h:512
size_t m_depth
Depth of the Btree.
Definition: row0pread.h:660
std::pair< std::shared_ptr< Iter >, std::shared_ptr< Iter > > Range
The first cursor should read up to the second cursor [f, s).
Definition: row0pread.h:515
void create_range(Ranges &ranges, page_cur_t &leaf_page_cursor, mtr_t *mtr) const
Create and add the range to the scan ranges.
Definition: row0pread.cc:1012
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:666
Scan_ctx & operator=(const Scan_ctx &)=delete
std::vector< Range, ut::allocator< Range > > Ranges
Definition: row0pread.h:517
bool is_error_set() const
Definition: row0pread.h:529
const trx_t * m_trx
Covering transaction.
Definition: row0pread.h:654
buf_block_t * block_get_s_latched(const page_id_t &page_id, mtr_t *mtr, size_t line) const
Fetch a block from the buffer pool and acquire an S latch on it.
Definition: row0pread.cc:319
Scan_ctx(Scan_ctx &&)=delete
Scan_ctx(const Scan_ctx &)=delete
Scan_ctx(Parallel_reader *reader, size_t id, trx_t *trx, const Parallel_reader::Config &config, F &&f)
Constructor.
Definition: row0pread.cc:201
Parallel_reader * m_reader
The parallel reader.
Definition: row0pread.h:663
dberr_t create_ranges(const Scan_range &scan_range, page_no_t page_no, size_t depth, const size_t split_level, Ranges &ranges, mtr_t *mtr)
Find the subtrees to scan in a block.
Definition: row0pread.cc:1028
bool check_visibility(const rec_t *&rec, ulint *&offsets, mem_heap_t *&heap, mtr_t *mtr)
Build an old version of the row if required.
Definition: row0pread.cc:466
The core idea is to find the left and right paths down the B+Tree. These paths correspond to the scan ...
Definition: row0pread.h:101
size_t max_threads() const
Definition: row0pread.h:348
std::function< dberr_t(Thread_ctx *thread_ctx)> Finish
Callback to finalise the caller's state.
Definition: row0pread.h:140
void parallel_read()
Create the threads and do a parallel read across the partitions.
Definition: row0pread.cc:1276
std::atomic_size_t m_n_completed
Total tasks executed so far.
Definition: row0pread.h:446
std::vector< IB_thread, ut::allocator< IB_thread > > m_parallel_read_threads
List of threads used for parallel_read purposes.
Definition: row0pread.h:458
constexpr static size_t MAX_THREADS
Maximum value for innodb-parallel-read-threads.
Definition: row0pread.h:104
Parallel_reader & operator=(Parallel_reader &&)=delete
bool m_sync
If the caller wants to wait for the parallel_read to finish its run.
Definition: row0pread.h:464
bool is_queue_empty() const
Definition: row0pread.cc:808
dberr_t run(size_t n_threads)
Start the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1353
std::vector< Thread_ctx *, ut::allocator< Thread_ctx * > > m_thread_ctxs
Context information related to each parallel reader thread.
Definition: row0pread.h:467
std::vector< page_no_t, ut::allocator< page_no_t > > Links
Definition: row0pread.h:114
std::atomic< dberr_t > m_err
Error during parallel read.
Definition: row0pread.h:455
bool is_active() const
Definition: row0pread.h:401
size_t m_n_threads
Number of worker threads that will be spawned.
Definition: row0pread.h:422
uint64_t m_sig_count
Value returned by previous call of os_event_reset() on m_event.
Definition: row0pread.h:437
dberr_t get_error_state() const
Get the error stored in the global error state.
Definition: row0pread.h:321
Parallel_reader(const Parallel_reader &)=delete
size_t m_max_threads
Maximum number of worker threads to use.
Definition: row0pread.h:419
std::shared_ptr< Ctx > dequeue()
Fetch the next job to execute.
Definition: row0pread.cc:792
void release_unused_threads(size_t unused_threads)
Release unused threads back to the pool.
Definition: row0pread.h:377
Start m_start_callback
Callback at start (before processing any rows).
Definition: row0pread.h:449
os_event_t m_event
For signalling worker threads about events.
Definition: row0pread.h:434
Parallel_reader & operator=(const Parallel_reader &)=delete
void set_start_callback(Start &&f)
Set the callback that must be called before any processing.
Definition: row0pread.h:330
bool is_error_set() const
Definition: row0pread.h:351
Scan_ctxs m_scan_ctxs
Scan contexts.
Definition: row0pread.h:431
Parallel_reader(size_t max_threads)
Constructor.
Definition: row0pread.cc:188
constexpr static size_t MAX_RESERVED_THREADS
Maximum value for reserved parallel read threads for data load so that at least this many threads are...
Definition: row0pread.h:108
dberr_t spawn(size_t n_threads) noexcept
Spawn the threads to do the parallel read for the specified range.
Definition: row0pread.cc:1338
ib_mutex_t m_mutex
Mutex protecting m_ctxs.
Definition: row0pread.h:425
void set_finish_callback(Finish &&f)
Set the callback that must be called after all processing.
Definition: row0pread.h:334
void worker(Thread_ctx *thread_ctx)
Poll for requests and execute.
Definition: row0pread.cc:815
void join()
Wait for the join of threads spawned by the parallel reader.
Definition: row0pread.h:313
~Parallel_reader()
Destructor.
Definition: row0pread.cc:90
std::list< std::shared_ptr< Scan_ctx >, ut::allocator< std::shared_ptr< Scan_ctx > > > Scan_ctxs
Definition: row0pread.h:414
Finish m_finish_callback
Callback at end (after processing all rows).
Definition: row0pread.h:452
size_t m_scan_ctx_id
Counter for allocating scan context IDs.
Definition: row0pread.h:440
static std::atomic_size_t s_active_threads
Number of threads currently doing parallel reads.
Definition: row0pread.h:461
static void release_threads(size_t n_threads)
Release the parallel read threads.
Definition: row0pread.h:298
constexpr static size_t MAX_TOTAL_THREADS
Maximum total number of parallel read threads that can be spawned.
Definition: row0pread.h:112
void enqueue(std::shared_ptr< Ctx > ctx)
Add an execution context to the run queue.
Definition: row0pread.cc:786
void set_n_threads(size_t n_threads)
Set the number of threads to be spawned.
Definition: row0pread.h:363
static size_t available_threads(size_t n_required, bool use_reserved)
Check how many threads are available for parallel reads.
Definition: row0pread.cc:103
void set_error_state(dberr_t err)
Set the error state.
Definition: row0pread.h:357
std::list< std::shared_ptr< Ctx >, ut::allocator< std::shared_ptr< Ctx > > > Ctxs
Definition: row0pread.h:410
State
Scan state.
Definition: row0pread.h:122
@ PAGE
Start/Finish page read.
@ CTX
Start/Finish Ctx state.
@ THREAD
Start/Finish thread state.
dberr_t add_scan(trx_t *trx, const Config &config, F &&f)
Add scan context.
Definition: row0pread.cc:1393
bool is_tree_empty() const
Definition: row0pread.h:324
std::atomic_size_t m_ctx_id
Context ID.
Definition: row0pread.h:443
std::function< dberr_t(Thread_ctx *thread_ctx)> Start
Callback to initialise the caller's state.
Definition: row0pread.h:137
Parallel_reader(const Parallel_reader &&)=delete
std::function< dberr_t(const Ctx *)> F
Callback to process the rows.
Definition: row0pread.h:143
Ctxs m_ctxs
Contexts that must be executed.
Definition: row0pread.h:428
Page identifier.
Definition: buf0types.h:207
Page size descriptor.
Definition: page0size.h:50
Allocator that allows std::* containers to manage their memory through ut::malloc* and ut::free libra...
Definition: ut0new.h:2182
Global error codes for the database.
dberr_t
Definition: db0err.h:39
@ DB_SUCCESS
Definition: db0err.h:43
uint32_t dict_tf_to_fsp_flags(uint32_t table_flags)
Convert a 32 bit integer table flags to the 32 bit FSP Flags.
Definition: dict0dict.cc:4999
static bool dict_table_is_comp(const dict_table_t *table)
Check whether the table uses the compact page format.
The low-level file system.
static int flags[50]
Definition: hp_test1.cc:40
static void start(mysql_harness::PluginFuncEnv *env)
Definition: http_auth_backend_plugin.cc:180
static void mem_heap_free(mem_heap_t *heap)
Frees the space occupied by a memory heap.
static mem_heap_t * mem_heap_create(ulint size, ut::Location loc, ulint type=MEM_HEAP_DYNAMIC)
Creates a memory heap.
static PFS_engine_table_share_proxy table
Definition: pfs.cc:61
static Value err()
Create a Value object that represents an error condition.
Definition: json_binary.cc:927
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
The interface to the operating system condition variables.
The interface to the threading wrapper.
A class describing a page size.
Record manager global types.
byte rec_t
Definition: rem0types.h:41
required string key
Definition: replication_asynchronous_connection_failover.proto:60
Scan (Scan_ctx) configuration.
Definition: row0pread.h:171
size_t m_partition_id
Partition id if the index to be scanned belongs to a partitioned table, else std::numeric_limits<size...
Definition: row0pread.h:211
Config(const Scan_range &scan_range, dict_index_t *index, size_t read_level=0, size_t partition_id=std::numeric_limits< size_t >::max())
Constructor.
Definition: row0pread.h:178
const bool m_is_compact
Row format of table.
Definition: row0pread.h:201
size_t m_read_level
Btree level from which records need to be read.
Definition: row0pread.h:207
Config(const Config &config)=default
Copy constructor.
dict_index_t * m_index
(Cluster) Index in table to scan.
Definition: row0pread.h:198
const Scan_range m_scan_range
Range to scan.
Definition: row0pread.h:195
const page_size_t m_page_size
Tablespace page size.
Definition: row0pread.h:204
Boundary of the range to scan.
Definition: row0pread.h:487
const dtuple_t * m_tuple
Tuple representation inside m_rec, for two Iter instances in a range m_tuple will be [first->m_tuple,...
Definition: row0pread.h:502
btr_pcur_t * m_pcur
Persistent cursor.
Definition: row0pread.h:505
const rec_t * m_rec
Start scanning from this key.
Definition: row0pread.h:498
mem_heap_t * m_heap
Heap used to allocate m_rec, m_tuple and m_pcur.
Definition: row0pread.h:492
~Iter()
Destructor.
Definition: row0pread.cc:75
const ulint * m_offsets
m_rec column offsets.
Definition: row0pread.h:495
Specifies the range from where to start the scan and where to end it.
Definition: row0pread.h:146
Scan_range()
Default constructor.
Definition: row0pread.h:148
Scan_range(const Scan_range &scan_range)=default
Copy constructor.
const dtuple_t * m_end
End of the scan, can be null for +infinity.
Definition: row0pread.h:164
const dtuple_t * m_start
Start of the scan, can be nullptr for -infinity.
Definition: row0pread.h:161
std::string to_string() const
Convert the instance to a string representation.
Definition: row0pread.cc:57
Scan_range(const dtuple_t *start, const dtuple_t *end)
Constructor.
Definition: row0pread.h:157
Thread related context information.
Definition: row0pread.h:215
size_t m_thread_id
Thread ID.
Definition: row0pread.h:261
Thread_ctx & operator=(Thread_ctx &&)=delete
void create_blob_heap() noexcept
Create BLOB heap.
Definition: row0pread.h:245
Thread_ctx(const Thread_ctx &)=delete
mem_heap_t * m_blob_heap
BLOB heap per thread.
Definition: row0pread.h:268
void * m_callback_ctx
Callback information related to the thread.
Definition: row0pread.h:265
Thread_ctx(size_t id) noexcept
Constructor.
Definition: row0pread.h:218
void savepoint() noexcept
Save current position, commit any active mtr.
Definition: row0pread.cc:420
PCursor * m_pcursor
Current persistent cursor.
Definition: row0pread.h:274
Thread_ctx(Thread_ctx &&)=delete
State get_state() const noexcept
Definition: row0pread.h:251
Thread_ctx & operator=(const Thread_ctx &)=delete
State m_state
Worker thread state.
Definition: row0pread.h:271
void set_callback_ctx(T *ctx) noexcept
Set thread related callback information.
Definition: row0pread.h:232
dberr_t restore_from_savepoint() noexcept
Restore saved position and resume scan.
Definition: row0pread.cc:414
T * get_callback_ctx() noexcept
Get the thread related callback information.
Definition: row0pread.h:240
~Thread_ctx() noexcept
Destructor.
Definition: row0pread.h:221
Definition: btr0pcur.h:99
The buffer control block structure.
Definition: buf0buf.h:1747
Data structure for an index.
Definition: dict0mem.h:1046
Data structure for a database table.
Definition: dict0mem.h:1909
Structure for an SQL data tuple of fields (logical record)
Definition: data0data.h:694
The info structure stored at the beginning of a heap block.
Definition: mem0mem.h:302
Mini-transaction handle and buffer.
Definition: mtr0mtr.h:177
InnoDB condition variable.
Definition: os0event.cc:63
Index page cursor.
Definition: page0cur.h:311
Definition: gen_lex_token.cc:149
Definition: trx0trx.h:675
#define UNIV_PAGE_SIZE
The universal page size of the database.
Definition: univ.i:294
unsigned long int ulint
Definition: univ.i:406
#define UT_LOCATION_HERE
Definition: ut0core.h:73
#define ut_ad(EXPR)
Debug assertion.
Definition: ut0dbg.h:105
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:93