MySQL 9.0.0
Source Code Documentation
Parallel_reader Class Reference

The core idea is to find the left and right paths down the B+Tree. These paths correspond to the scan start and scan end search. More...

#include <row0pread.h>

Classes

struct  Config
 Scan (Scan_ctx) configuration. More...
 
class  Ctx
 Parallel reader execution context. More...
 
class  Scan_ctx
 Parallel reader context. More...
 
struct  Scan_range
 Specifies the range from where to start the scan and where to end it. More...
 
struct  Thread_ctx
 Thread related context information. More...
 

Public Types

enum class  State : uint8_t { UNKNOWN , THREAD , CTX , PAGE }
 Scan state. More...
 
using Links = std::vector< page_no_t, ut::allocator< page_no_t > >
 
using Start = std::function< dberr_t(Thread_ctx *thread_ctx)>
 Callback to initialise caller's state. More...
 
using Finish = std::function< dberr_t(Thread_ctx *thread_ctx)>
 Callback to finalise caller's state. More...
 
using F = std::function< dberr_t(const Ctx *)>
 Callback to process the rows. More...
 

Public Member Functions

 Parallel_reader (size_t max_threads)
 Constructor. More...
 
 ~Parallel_reader ()
 Destructor. More...
 
dberr_t add_scan (trx_t *trx, const Config &config, F &&f)
 Add scan context. More...
 
void join ()
 Wait for the join of threads spawned by the parallel reader. More...
 
dberr_t get_error_state () const
 Get the error stored in the global error state. More...
 
bool is_tree_empty () const
 
void set_start_callback (Start &&f)
 Set the callback that must be called before any processing. More...
 
void set_finish_callback (Finish &&f)
 Set the callback that must be called after all processing. More...
 
dberr_t spawn (size_t n_threads) noexcept
 Spawn the threads to do the parallel read for the specified range. More...
 
dberr_t run (size_t n_threads)
 Start the threads to do the parallel read for the specified range. More...
 
size_t max_threads () const
 
bool is_error_set () const
 
void set_error_state (dberr_t err)
 Set the error state. More...
 
void set_n_threads (size_t n_threads)
 Set the number of threads to be spawned. More...
 
 Parallel_reader (const Parallel_reader &)=delete
 
 Parallel_reader (const Parallel_reader &&)=delete
 
Parallel_reader & operator= (Parallel_reader &&)=delete
 
Parallel_reader & operator= (const Parallel_reader &)=delete
 

Static Public Member Functions

static size_t available_threads (size_t n_required, bool use_reserved)
 Check how many threads are available for parallel reads. More...
 
static void release_threads (size_t n_threads)
 Release the parallel read threads. More...
 

Static Public Attributes

constexpr static size_t MAX_THREADS {256}
 Maximum value for innodb-parallel-read-threads. More...
 
constexpr static size_t MAX_RESERVED_THREADS {16}
 Maximum value for reserved parallel read threads for data load so that at least this many threads are always available for data load. More...
 
constexpr static size_t MAX_TOTAL_THREADS {MAX_THREADS + MAX_RESERVED_THREADS}
 Maximum total number of parallel read threads that can be spawned. More...
 

Private Types

using Ctxs = std::list< std::shared_ptr< Ctx >, ut::allocator< std::shared_ptr< Ctx > > >
 
using Scan_ctxs = std::list< std::shared_ptr< Scan_ctx >, ut::allocator< std::shared_ptr< Scan_ctx > > >
 

Private Member Functions

void release_unused_threads (size_t unused_threads)
 Release unused threads back to the pool. More...
 
void enqueue (std::shared_ptr< Ctx > ctx)
 Add an execution context to the run queue. More...
 
std::shared_ptr< Ctx > dequeue ()
 Fetch the next job to execute. More...
 
bool is_queue_empty () const
 
void worker (Thread_ctx *thread_ctx)
 Poll for requests and execute. More...
 
void parallel_read ()
 Create the threads and do a parallel read across the partitions. More...
 
bool is_active () const
 

Private Attributes

size_t m_max_threads {}
 Maximum number of worker threads to use. More...
 
size_t m_n_threads {0}
 Number of worker threads that will be spawned. More...
 
ib_mutex_t m_mutex
 Mutex protecting m_ctxs. More...
 
Ctxs m_ctxs {}
 Contexts that must be executed. More...
 
Scan_ctxs m_scan_ctxs {}
 Scan contexts. More...
 
os_event_t m_event {}
 For signalling worker threads about events. More...
 
uint64_t m_sig_count
 Value returned by previous call of os_event_reset() on m_event. More...
 
size_t m_scan_ctx_id {}
 Counter for allocating scan context IDs. More...
 
std::atomic_size_t m_ctx_id {}
 Context ID. More...
 
std::atomic_size_t m_n_completed {}
 Total tasks executed so far. More...
 
Start m_start_callback {}
 Callback at start (before processing any rows). More...
 
Finish m_finish_callback {}
 Callback at end (after processing all rows). More...
 
std::atomic< dberr_t > m_err {DB_SUCCESS}
 Error during parallel read. More...
 
std::vector< IB_thread, ut::allocator< IB_thread > > m_parallel_read_threads
 List of threads used for parallel_read purpose. More...
 
bool m_sync
 If the caller wants to wait for the parallel_read to finish its run. More...
 
std::vector< Thread_ctx *, ut::allocator< Thread_ctx * > > m_thread_ctxs
 Context information related to each parallel reader thread. More...
 

Static Private Attributes

static std::atomic_size_t s_active_threads {}
 Number of threads currently doing parallel reads. More...
 

Detailed Description

The core idea is to find the left and right paths down the B+Tree. These paths correspond to the scan start and scan end search.

Follow the links at the appropriate btree level from the left to right and split the scan on each of these sub-tree root nodes.
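The link-following step can be sketched on a toy tree. This is only an illustration under stated assumptions: `Toy_page`, `Toy_tree`, `collect_links()` and `make_toy_tree()` are hypothetical names, pages are held in a hash map instead of the buffer pool, and each page simply stores its leftmost child and right sibling.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Toy stand-ins for illustration only; none of these are InnoDB types.
using toy_page_no = std::uint32_t;
constexpr toy_page_no TOY_NULL = UINT32_MAX;

struct Toy_page {
  std::size_t level;           // 0 is the leaf level
  toy_page_no leftmost_child;  // TOY_NULL on leaf pages
  toy_page_no right_sibling;   // TOY_NULL on the rightmost page of a level
};

using Toy_tree = std::unordered_map<toy_page_no, Toy_page>;

// Descend the leftmost path until `level` is reached, then follow the
// sibling links left to right. Each collected page is one sub-tree root.
std::vector<toy_page_no> collect_links(const Toy_tree &tree, toy_page_no root,
                                       std::size_t level) {
  assert(tree.count(root) == 1);
  toy_page_no page_no = root;
  while (tree.at(page_no).level > level) {
    page_no = tree.at(page_no).leftmost_child;
  }
  std::vector<toy_page_no> links;
  for (; page_no != TOY_NULL; page_no = tree.at(page_no).right_sibling) {
    links.push_back(page_no);
  }
  return links;
}

// A three-level toy tree: root 1, internal pages 2-3, leaf pages 4-7.
Toy_tree make_toy_tree() {
  return Toy_tree{
      {1, {2, 2, TOY_NULL}},
      {2, {1, 4, 3}},
      {3, {1, 6, TOY_NULL}},
      {4, {0, TOY_NULL, 5}},
      {5, {0, TOY_NULL, 6}},
      {6, {0, TOY_NULL, 7}},
      {7, {0, TOY_NULL, TOY_NULL}},
  };
}
```

On this toy tree, asking for level 1 yields pages 2 and 3, so the scan would be split into two sub-tree scans.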

If the user has set the maximum number of threads to, say, 4 and there are 5 sub-trees at the selected level, then the 5th sub-tree is split dynamically when it is ready to be scanned.

We want to allow multiple parallel range scans on different indexes at the same time. To achieve this, the scan context (Scan_ctx) is split out from the execution context (Ctx). The Scan_ctx has the index and transaction information and the Ctx keeps track of the cursor for a specific thread during the scan.

To start a scan we need to instantiate a Parallel_reader. A parallel reader can contain several Scan_ctx instances and a Scan_ctx can contain several Ctx instances. It's the Ctx instances that are eventually executed.
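The containment relationship can be pictured with a few toy types. This is a minimal sketch, not the real class layout: `Toy_reader`, `Toy_scan_ctx`, `Toy_ctx` and this two-argument `add_scan()` are hypothetical, and an index name stands in for the index and transaction information.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <utility>

// Toy types for illustration; the real classes carry far more state.
struct Toy_scan_ctx {
  std::size_t id;
  std::string index_name;  // stands in for index and transaction info
};

struct Toy_ctx {
  std::size_t id;
  std::shared_ptr<Toy_scan_ctx> scan_ctx;  // scan this context belongs to
};

struct Toy_reader {
  std::list<std::shared_ptr<Toy_scan_ctx>> scan_ctxs;
  std::list<std::shared_ptr<Toy_ctx>> ctxs;  // what actually gets executed
  std::size_t next_scan_ctx_id{0};
  std::size_t next_ctx_id{0};

  // Register one index scan and create `n_ranges` execution contexts for it.
  void add_scan(std::string index_name, std::size_t n_ranges) {
    assert(n_ranges > 0);
    auto scan_ctx = std::make_shared<Toy_scan_ctx>(
        Toy_scan_ctx{next_scan_ctx_id++, std::move(index_name)});
    scan_ctxs.push_back(scan_ctx);
    for (std::size_t i = 0; i < n_ranges; ++i) {
      ctxs.push_back(
          std::make_shared<Toy_ctx>(Toy_ctx{next_ctx_id++, scan_ctx}));
    }
  }
};
```

Adding two scans with 3 and 2 ranges leaves the toy reader with 2 scan contexts and 5 execution contexts, each pointing back at the scan it belongs to.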

This design allows for a single Parallel_reader to scan multiple indexes at once. Each index range scan has to be added via its add_scan() method. This functionality is required to handle parallel partition scans because partitions are separate indexes. This can be used to scan completely different indexes and tables by one instance of a Parallel_reader.

To solve the imbalance problem we dynamically split the sub-trees as and when required. For example, if you have 5 sub-trees to scan and 4 threads, the 5th sub-tree is tagged as "to_be_split" during phase I (add_scan()). The first thread that finishes scanning one of the first 4 sub-trees then dynamically splits the 5th sub-tree and adds the newly created sub-trees to the execution context (Ctx) run queue in the Parallel_reader. As the other threads complete their sub-tree scans, they pick up more execution contexts (Ctx) from the Parallel_reader run queue and start scanning the sub-partitions as normal.
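The two phases can be simulated on a single thread to see how the run queue evolves. This is a sketch under assumptions: `Toy_task`, `plan()` and `drain()` are hypothetical names, the queue is FIFO, and a tagged sub-tree is split into a fixed `fanout` of equal row ranges, whereas the real split follows the B+Tree structure.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy task: scan rows in [begin, end). `split` mirrors the "to_be_split" tag.
struct Toy_task {
  std::size_t begin;
  std::size_t end;
  bool split;
};

// Phase I: one task per sub-tree; sub-trees beyond `max_threads` are tagged.
std::deque<Toy_task> plan(std::size_t n_subtrees, std::size_t rows_per_subtree,
                          std::size_t max_threads) {
  std::deque<Toy_task> queue;
  for (std::size_t i = 0; i < n_subtrees; ++i) {
    queue.push_back(
        {i * rows_per_subtree, (i + 1) * rows_per_subtree, i >= max_threads});
  }
  return queue;
}

// Phase II: whoever dequeues a tagged task splits it into `fanout` pieces and
// enqueues them; untagged tasks are "scanned". Returns rows scanned per task.
std::vector<std::size_t> drain(std::deque<Toy_task> queue, std::size_t fanout) {
  assert(fanout > 0);
  std::vector<std::size_t> scanned;
  while (!queue.empty()) {
    const Toy_task task = queue.front();
    queue.pop_front();
    if (!task.split) {
      scanned.push_back(task.end - task.begin);
      continue;
    }
    const std::size_t step = (task.end - task.begin) / fanout;
    for (std::size_t i = 0; i < fanout; ++i) {
      const std::size_t begin = task.begin + i * step;
      const std::size_t end = (i + 1 == fanout) ? task.end : begin + step;
      queue.push_back({begin, end, false});
    }
  }
  return scanned;
}
```

With 5 sub-trees of 100 rows, 4 threads and a fanout of 4, the simulation performs 8 scans: four of 100 rows followed by four of 25 rows, so the tail of the work is spread across workers instead of landing on one.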

Note: The Ctx instances are in a virtual list. Each Ctx instance has a range to scan. The start point of this range is the end point of the Ctx instance scanning values less than its start point. A Ctx scans the rows in [Start, End). We use std::shared_ptr to manage the reference counting; this allows us to dispose of the Ctx instances without worrying about dangling pointers.
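The half-open, chained ranges can be illustrated with integer keys. This is a sketch only: `Toy_range`, `make_ranges()` and `owners()` are hypothetical, and real range boundaries are positions in the B+Tree, not integers.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct Toy_range {
  int start;  // inclusive
  int end;    // exclusive
  bool contains(int key) const { return start <= key && key < end; }
};

// Build adjacent ranges from sorted boundaries:
// {0, 10, 20} gives [0, 10) and [10, 20).
std::vector<std::shared_ptr<Toy_range>> make_ranges(
    const std::vector<int> &boundaries) {
  std::vector<std::shared_ptr<Toy_range>> ranges;
  for (std::size_t i = 0; i + 1 < boundaries.size(); ++i) {
    assert(boundaries[i] < boundaries[i + 1]);
    ranges.push_back(std::make_shared<Toy_range>(
        Toy_range{boundaries[i], boundaries[i + 1]}));
  }
  return ranges;
}

// Count how many ranges claim `key`; with half-open ranges this is 0 or 1.
std::size_t owners(const std::vector<std::shared_ptr<Toy_range>> &ranges,
                   int key) {
  std::size_t n = 0;
  for (const auto &r : ranges) n += r->contains(key) ? 1 : 0;
  return n;
}
```

Because each end is exclusive and equals the next start, a boundary key such as 10 is claimed by exactly one range. A worker that holds its own `std::shared_ptr` copy can keep using its range after the container that created it has been cleared.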

NOTE: Secondary index scans are not supported currently.

Member Typedef Documentation

◆ Ctxs

using Parallel_reader::Ctxs = std::list<std::shared_ptr<Ctx>, ut::allocator<std::shared_ptr<Ctx> >>
private

◆ F

using Parallel_reader::F = std::function<dberr_t(const Ctx *)>

Callback to process the rows.

◆ Finish

using Parallel_reader::Finish = std::function<dberr_t(Thread_ctx *thread_ctx)>

Callback to finalise caller's state.

◆ Links

using Parallel_reader::Links = std::vector<page_no_t, ut::allocator<page_no_t> >

◆ Scan_ctxs

using Parallel_reader::Scan_ctxs = std::list<std::shared_ptr<Scan_ctx>, ut::allocator<std::shared_ptr<Scan_ctx> >>
private

◆ Start

using Parallel_reader::Start = std::function<dberr_t(Thread_ctx *thread_ctx)>

Callback to initialise caller's state.

Member Enumeration Documentation

◆ State

enum class Parallel_reader::State : uint8_t
strong

Scan state.

Enumerator
UNKNOWN 

Unknown state.

THREAD 

Start/Finish thread state.

CTX 

Start/Finish Ctx state.

PAGE 

Start/Finish page read.

Constructor & Destructor Documentation

◆ Parallel_reader() [1/3]

Parallel_reader::Parallel_reader ( size_t  max_threads)
explicit

Constructor.

Parameters
[in] max_threads  Maximum number of threads to use.

◆ ~Parallel_reader()

Parallel_reader::~Parallel_reader ( )

Destructor.

◆ Parallel_reader() [2/3]

Parallel_reader::Parallel_reader ( const Parallel_reader & )
delete

◆ Parallel_reader() [3/3]

Parallel_reader::Parallel_reader ( const Parallel_reader &&  )
delete

Member Function Documentation

◆ add_scan()

dberr_t Parallel_reader::add_scan ( trx_t * trx,
const Config & config,
Parallel_reader::F && f
)

Add scan context.

Parameters
[in,out] trx  Covering transaction.
[in] config  Scan configuration.
[in] f  Callback function. (default is 0 which is leaf level)
Returns
error.

◆ available_threads()

size_t Parallel_reader::available_threads ( size_t  n_required,
bool  use_reserved 
)
static

Check how many threads are available for parallel reads.

Parameters
[in] n_required  Number of threads required.
[in] use_reserved  true if reserved threads need to be considered while checking for availability of threads.
Returns
number of threads available.
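One plausible way to implement such a reservation is an optimistic add on an atomic counter, followed by giving back whatever exceeds the limit. The sketch below is an assumption about the technique, not a copy of the InnoDB code: `toy_active`, `toy_available_threads()` and `toy_release_threads()` are hypothetical stand-ins for s_active_threads, available_threads() and release_threads().

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

constexpr std::size_t TOY_MAX_THREADS{256};
constexpr std::size_t TOY_MAX_RESERVED_THREADS{16};

// Process-wide count of threads currently reserved for parallel reads.
std::atomic_size_t toy_active{0};

// Reserve up to `n_required` threads and return how many were granted.
std::size_t toy_available_threads(std::size_t n_required, bool use_reserved) {
  const std::size_t limit =
      TOY_MAX_THREADS + (use_reserved ? TOY_MAX_RESERVED_THREADS : 0);
  // Optimistically claim everything, then hand back the excess.
  const std::size_t active = toy_active.fetch_add(n_required);
  if (active < limit) {
    const std::size_t available = limit - active;
    if (n_required <= available) return n_required;
    toy_active.fetch_sub(n_required - available);
    return available;
  }
  toy_active.fetch_sub(n_required);
  return 0;
}

// Return previously granted threads to the pool.
void toy_release_threads(std::size_t n_threads) {
  assert(toy_active.load() >= n_threads);
  toy_active.fetch_sub(n_threads);
}
```

In this sketch a caller that passes `use_reserved = true` can still obtain threads after ordinary callers have exhausted the first 256, which matches the stated purpose of keeping a reserve for data load.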

◆ dequeue()

std::shared_ptr< Parallel_reader::Ctx > Parallel_reader::dequeue ( )
private

Fetch the next job to execute.

Returns
job to execute or nullptr.

◆ enqueue()

void Parallel_reader::enqueue ( std::shared_ptr< Ctx > ctx)
private

Add an execution context to the run queue.

Parameters
[in] ctx  Execution context to add to the queue.
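The dequeue()/enqueue() pair behaves like a mutex-protected job queue in which an empty queue yields nullptr. A minimal sketch follows, assuming FIFO order and using `std::mutex` in place of ib_mutex_t; `Toy_ctx` and `Toy_run_queue` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>
#include <utility>

struct Toy_ctx {
  std::size_t id;
};

class Toy_run_queue {
 public:
  // Add an execution context to the back of the queue.
  void enqueue(std::shared_ptr<Toy_ctx> ctx) {
    assert(ctx != nullptr);
    std::lock_guard<std::mutex> guard(m_mutex);
    m_ctxs.push_back(std::move(ctx));
  }

  // Fetch the next job to execute, or nullptr if the queue is empty.
  std::shared_ptr<Toy_ctx> dequeue() {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (m_ctxs.empty()) return nullptr;
    auto ctx = std::move(m_ctxs.front());
    m_ctxs.pop_front();
    return ctx;
  }

  bool is_empty() const {
    std::lock_guard<std::mutex> guard(m_mutex);
    return m_ctxs.empty();
  }

 private:
  mutable std::mutex m_mutex;  // protects m_ctxs
  std::list<std::shared_ptr<Toy_ctx>> m_ctxs;
};
```

The mutex is declared mutable so that the const `is_empty()` can still lock it, which mirrors the mutable qualifier on m_mutex in this class.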

◆ get_error_state()

dberr_t Parallel_reader::get_error_state ( ) const
inline

Get the error stored in the global error state.

Returns
global error state.

◆ is_active()

bool Parallel_reader::is_active ( ) const
inline private
Returns
true if tasks are still executing.

◆ is_error_set()

bool Parallel_reader::is_error_set ( ) const
inline
Returns
true if in error state.

◆ is_queue_empty()

bool Parallel_reader::is_queue_empty ( ) const
private
Returns
true if job queue is empty.

◆ is_tree_empty()

bool Parallel_reader::is_tree_empty ( ) const
inline
Returns
true if the tree is empty, else false.

◆ join()

void Parallel_reader::join ( )
inline

Wait for the join of threads spawned by the parallel reader.

◆ max_threads()

size_t Parallel_reader::max_threads ( ) const
inline
Returns
the configured max threads size.

◆ operator=() [1/2]

Parallel_reader & Parallel_reader::operator= ( const Parallel_reader & )
delete

◆ operator=() [2/2]

Parallel_reader & Parallel_reader::operator= ( Parallel_reader &&  )
delete

◆ parallel_read()

void Parallel_reader::parallel_read ( )
private

Create the threads and do a parallel read across the partitions.

◆ release_threads()

static void Parallel_reader::release_threads ( size_t  n_threads)
inline static

Release the parallel read threads.

◆ release_unused_threads()

void Parallel_reader::release_unused_threads ( size_t  unused_threads)
inline private

Release unused threads back to the pool.

Parameters
[in] unused_threads  Number of threads to "release".

◆ run()

dberr_t Parallel_reader::run ( size_t  n_threads)

Start the threads to do the parallel read for the specified range.

Parameters
[in] n_threads  Number of threads to use for the scan.
Returns
DB_SUCCESS or error code.

◆ set_error_state()

void Parallel_reader::set_error_state ( dberr_t  err)
inline

Set the error state.

Parameters
[in] err  Error state to set to.
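A shared error state of this kind can be sketched with a single atomic value that workers consult before taking more work. The names `Toy_err`, `Toy_error_state` and `process()` are hypothetical, and the stop-on-first-error loop is an assumption about how callers might use the state, not documented behaviour.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

enum class Toy_err { SUCCESS, INTERRUPTED, OUT_OF_MEMORY };

class Toy_error_state {
 public:
  void set(Toy_err err) { m_err.store(err); }
  Toy_err get() const { return m_err.load(); }
  bool is_set() const { return get() != Toy_err::SUCCESS; }

 private:
  std::atomic<Toy_err> m_err{Toy_err::SUCCESS};
};

// A worker-style loop: each entry of `results` is the outcome of one job.
// The loop stops taking new jobs once any error has been recorded.
std::size_t process(const std::vector<Toy_err> &results,
                    Toy_error_state &state) {
  std::size_t done = 0;
  for (Toy_err result : results) {
    if (state.is_set()) break;
    if (result != Toy_err::SUCCESS) state.set(result);
    ++done;
  }
  return done;
}
```

Because the state is atomic, any thread can record or read it without taking the queue mutex.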

◆ set_finish_callback()

void Parallel_reader::set_finish_callback ( Finish &&  f)
inline

Set the callback that must be called after all processing.

Parameters
[in] f  Call after last row is processed.

◆ set_n_threads()

void Parallel_reader::set_n_threads ( size_t  n_threads)
inline

Set the number of threads to be spawned.

Parameters
[in] n_threads  Number of threads to be spawned.

◆ set_start_callback()

void Parallel_reader::set_start_callback ( Start &&  f)
inline

Set the callback that must be called before any processing.

Parameters
[in] f  Call before first row is processed.
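The intended call order of the start, row and finish callbacks can be sketched for a single worker. This is an illustration under assumptions: `Toy_thread_ctx`, `Toy_row_fn` and `toy_worker()` are hypothetical, the row callback here takes a plain `int` instead of a `const Ctx *`, and calling the finish callback even after an error is a choice made for this sketch only.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

enum class Toy_err { SUCCESS, ERROR };

struct Toy_thread_ctx {
  std::size_t thread_id;
  std::vector<int> buffer;  // caller-owned per-thread state
};

using Toy_start = std::function<Toy_err(Toy_thread_ctx *)>;
using Toy_finish = std::function<Toy_err(Toy_thread_ctx *)>;
using Toy_row_fn = std::function<Toy_err(Toy_thread_ctx *, int)>;

// Call order for one worker in this sketch: start, every row, finish.
// The first error stops row processing and is what the worker returns.
Toy_err toy_worker(Toy_thread_ctx *ctx, const std::vector<int> &rows,
                   const Toy_start &start, const Toy_row_fn &on_row,
                   const Toy_finish &finish) {
  assert(ctx != nullptr);
  Toy_err err = start ? start(ctx) : Toy_err::SUCCESS;
  for (std::size_t i = 0; err == Toy_err::SUCCESS && i < rows.size(); ++i) {
    err = on_row(ctx, rows[i]);
  }
  if (finish) {
    const Toy_err finish_err = finish(ctx);
    if (err == Toy_err::SUCCESS) err = finish_err;
  }
  return err;
}
```

A caller would typically use the start callback to allocate per-thread buffers in the thread context and the finish callback to flush or free them.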

◆ spawn()

dberr_t Parallel_reader::spawn ( size_t  n_threads)
noexcept

Spawn the threads to do the parallel read for the specified range.

Don't wait for the spawned threads to complete.

Parameters
[in] n_threads  Number of threads that need to be spawned.
Returns
DB_SUCCESS or error code.

◆ worker()

void Parallel_reader::worker ( Parallel_reader::Thread_ctx * thread_ctx)
private

Poll for requests and execute.

Parameters
[in] thread_ctx  Thread related context information.

Member Data Documentation

◆ m_ctx_id

std::atomic_size_t Parallel_reader::m_ctx_id {}
private

Context ID.

Monotonically increasing ID.

◆ m_ctxs

Ctxs Parallel_reader::m_ctxs {}
private

Contexts that must be executed.

◆ m_err

std::atomic<dberr_t> Parallel_reader::m_err {DB_SUCCESS}
private

Error during parallel read.

◆ m_event

os_event_t Parallel_reader::m_event {}
private

For signalling worker threads about events.

◆ m_finish_callback

Finish Parallel_reader::m_finish_callback {}
private

Callback at end (after processing all rows).

◆ m_max_threads

size_t Parallel_reader::m_max_threads {}
private

Maximum number of worker threads to use.

◆ m_mutex

ib_mutex_t Parallel_reader::m_mutex
mutable private

Mutex protecting m_ctxs.

◆ m_n_completed

std::atomic_size_t Parallel_reader::m_n_completed {}
private

Total tasks executed so far.

◆ m_n_threads

size_t Parallel_reader::m_n_threads {0}
private

Number of worker threads that will be spawned.

◆ m_parallel_read_threads

std::vector<IB_thread, ut::allocator<IB_thread> > Parallel_reader::m_parallel_read_threads
private

List of threads used for parallel_read purpose.

◆ m_scan_ctx_id

size_t Parallel_reader::m_scan_ctx_id {}
private

Counter for allocating scan context IDs.

◆ m_scan_ctxs

Scan_ctxs Parallel_reader::m_scan_ctxs {}
private

Scan contexts.

◆ m_sig_count

uint64_t Parallel_reader::m_sig_count
private

Value returned by previous call of os_event_reset() on m_event.

◆ m_start_callback

Start Parallel_reader::m_start_callback {}
private

Callback at start (before processing any rows).

◆ m_sync

bool Parallel_reader::m_sync
private

If the caller wants to wait for the parallel_read to finish its run.

◆ m_thread_ctxs

std::vector<Thread_ctx *, ut::allocator<Thread_ctx *> > Parallel_reader::m_thread_ctxs
private

Context information related to each parallel reader thread.

◆ MAX_RESERVED_THREADS

constexpr static size_t Parallel_reader::MAX_RESERVED_THREADS {16}
static constexpr

Maximum value for reserved parallel read threads for data load so that at least this many threads are always available for data load.

◆ MAX_THREADS

constexpr static size_t Parallel_reader::MAX_THREADS {256}
static constexpr

Maximum value for innodb-parallel-read-threads.

◆ MAX_TOTAL_THREADS

constexpr static size_t Parallel_reader::MAX_TOTAL_THREADS {MAX_THREADS + MAX_RESERVED_THREADS}
static constexpr

Maximum total number of parallel read threads that can be spawned.

◆ s_active_threads

std::atomic_size_t Parallel_reader::s_active_threads {}
static private

Number of threads currently doing parallel reads.

