MySQL 8.4.2
Source Code Documentation
|
Contains spill state for set operations' use of in-memory hash map. More...
Classes | |
struct | CountPair |
For a given chunk file pair {HF, IF}, the count of rows in each chunk respectively. More... | |
Public Types | |
enum class | ReadingState : uint8_t { SS_NONE , SS_READING_LEFT_HF , SS_READING_LEFT_IF , SS_COPY_OPERAND_N_TO_IF , SS_READING_RIGHT_HF , SS_READING_RIGHT_IF , SS_FLUSH_REST_OF_LEFT_IFS } |
using | hash_map_type = ankerl::unordered_dense::segmented_map< ImmutableStringWithLength, LinkedImmutableString, ImmutableStringHasher > |
Public Member Functions | |
SpillState (THD *thd, MEM_ROOT *mem_root) | |
bool | spill () |
Inquire spill handling state. More... | |
bool | simulated_secondary_overflow (bool *spill) |
void | set_secondary_overflow () |
bool | secondary_overflow () const |
Getter, cf. comment for m_secondary_overflow . More... | |
void | secondary_overflow_handling_done () |
ReadingState | read_state () |
bool | init (const Operand &left_operand, hash_map_type *hash_map, size_t rows_in_hash_table, size_t read_rows_before_dedup, MEM_ROOT *hash_map_mem_root, TABLE *t) |
Initialize the spill to disk processing state with some variables. More... | |
int | read_next_row (const Operand *current_operand) |
Given current state of spill processing, return the next row up for inserting into or matching against the hash map. More... | |
int | read_next_row_secondary_overflow () |
Given current state of secondary overflow processing, return the next row up for inserting into or matching against the index in the result table (we no longer use hashing, having fallen back on de-duplicating via index in resulting output table. More... | |
bool | write_HF (THD *thd, size_t set, size_t chunk_idx, const Operands &operands, ha_rows *stored_rows) |
Used to write a complete (or incomplete in the case of secondary overflow) HF chunk to the materialized tmp table. More... | |
bool | write_completed_HFs (THD *thd, const Operands &operands, ha_rows *stored_rows) |
Write the contents of the final generation of HD chunks to the materialized table which will hold the result set of the set operation. More... | |
bool | write_partially_completed_HFs (THD *thd, const Operands &operands, ha_rows *stored_rows) |
Write the contents of the HD chunks that were completed when a secondary memory overflow has occurred. More... | |
bool | save_offending_row () |
THD * | thd () |
size_t | current_chunk_file_set () const |
size_t | current_chunk_idx () const |
Static Public Member Functions | |
static void | reset_hash_map (hash_map_type *hash_map) |
Private Types | |
using | SetCounts = Mem_root_array< CountPair > |
For a chunk file pair, an array of counts indexed by m_current_chunk_file_set. More... | |
Private Member Functions | |
void | switch_to_HF () |
void | switch_to_IF () |
bool | compute_chunk_file_sets (const Operand *current_operand) |
Compute sizing of and set aside space for the on-disk chunks and their associated in-memory structures, based on the row estimate taken from Operand::m_estimated_output_rows. More... | |
bool | initialize_first_HF_chunk_files () |
bool | spread_hash_map_to_HF_chunk_files () |
The initial hash map that overflowed will be spread over the determined number of chunk files, cf. More... | |
bool | save_operand_to_IF_chunk_files (const Operand *current_operand) |
bool | save_rest_of_operand_to_IF_chunk_files (const Operand *current_operand) |
bool | reset_for_spill_handling () |
bool | append_hash_map_to_HF () |
We are done processing a {HF, IF} chunk pair. More... | |
size_t | hash_to_chunk_index (uint64_t hash) const |
Static Private Member Functions | |
static size_t | chunk_index_to_set (size_t chunk_index) |
static size_t | chunk_offset (size_t chunk_index) |
Private Attributes | |
size_t | m_simulated_set_idx {std::numeric_limits<size_t>::max()} |
size_t | m_simulated_chunk_idx {std::numeric_limits<size_t>::max()} |
size_t | m_simulated_row_no {std::numeric_limits<size_t>::max()} |
THD * | m_thd |
ReadingState | m_spill_read_state {ReadingState::SS_NONE} |
If not SS_NONE, we have detected an overflow in the in-memory hash map while reading the left(-most) operand of an INTERSECT or EXCEPT operation and are ready for reading next row from an operand (left or right). More... | |
bool | m_secondary_overflow {false} |
If true, we have seen memory overflow also during spill handling. More... | |
TABLE * | m_materialized_table {nullptr} |
The materialized table we are eventualy writing the result of the set operation to. More... | |
pack_rows::TableCollection | m_table_collection |
Cached value for {m_materialized_table}. More... | |
hash_map_type * | m_hash_map {nullptr} |
The in-memory hash map that overflowed. More... | |
size_t | m_hash_seed {0} |
Modify for each operator in a N-ary set operation to avoid initial chunks filling up right away due to row order in previous operation. More... | |
size_t | m_rows_in_hash_map {0} |
At the time of overflow: how many rows from left operand are in hash map after deduplication. More... | |
size_t | m_read_rows_before_dedup {0} |
At the time of overflow: how many rows have we read from left operand. More... | |
MEM_ROOT * | m_hash_map_mem_root {nullptr} |
The mem_root of m_hash_map. We need it for reusing its space. More... | |
size_t | m_num_chunks {0} |
The number of chunks needed after rounding up to nearest power of two. More... | |
size_t | m_no_of_chunk_file_sets {0} |
The number of chunk file sets needed to process all m_num_chunks chunks. More... | |
size_t | m_current_chunk_file_set {0} |
The current chunk under processing. 0-based. More... | |
struct anonymous_namespace{composite_iterators.cc}::SpillState:: { ... } | m_offending_row |
Keeps the row that was just read from the left operand when we discovered that we were out of space in the in-memory hash map. More... | |
String | m_row_buffer |
Temporary space for (de)serializing a row. More... | |
Mem_root_array< ChunkPair > | m_chunk_files |
Array to hold the list of chunk files on disk in case we degrade into on-disk set EXCEPT/INTERSECT. More... | |
size_t | m_current_chunk_idx {0} |
The index of the chunk pair being read, incremented before use. More... | |
size_t | m_current_row_in_chunk {0} |
The current row no (1-based) in a chunk being read, incremented before use. More... | |
HashJoinChunk | m_remaining_input |
Used if m_no_of_chunk_file_sets > 1 so we can replay input rows from operands over sets 1..S-1, i.e. More... | |
Mem_root_array< SetCounts > | m_row_counts |
A matrix of counters keeping track of how many rows have been stashed away in the chunk files for each set in each chunk file of the current generation. More... | |
Static Private Attributes | |
static constexpr uint32_t | m_magic_prime = 4391 |
Contains spill state for set operations' use of in-memory hash map.
If we encounter a situation in which the hash map for set operations overflows allowed memory, we initiate a spill to disk procedure. This class encapsulates state using during this procedure. Spill to disk starts with a call to handle_hash_map_full
.
We built a mechanism with an in-memory hash map which can spill gracefully to disk if the volume of rows gets large and still perform well. In the presence of wrong table cardinality information, we may not be able to complete the spill to disk procedure (if we still run out of memory when hashing chunks, see below). If so, we fall back on de-duplicating using the non-unique key of the output (materialized) result table.
The spill code is partially based on code developed for hash join: e.g. we reuse packing/unpacking functions like
StoreFromTableBuffersRaw (pack_rows.h) LoadImmutableStringIntoTableBuffers (hash_join_buffer.h)
and furthermore, the ankerl::unordered_dense library, and the chunk file abstraction.
Definitions: A' - set of rows from operand 1 of set operation that fits in the in-memory hash map, deduplicated, with counters A - set of rows from operand 1 before deduplication B - non-deduplicated set of rows from operand 1 that didn't fit C = A + B - total set of rows in operand one; not known a priori, but we use the statistics for an estimate. M - (aka. m_num_chunks) total number of chunk files the tertiary hash distributes the rows to. Multiple of 2, as used for hash join. N - (aka. HashJoinIterator::kMaxChunks) the max number of HF and IF files that may be open at one time. May be smaller than M. S = ceiling(M/N) (aka. m_no_of_chunk_file_sets) - number of sets of open files we need s - the set of chunk files opened (aka. m_chunk_files), sets are enumerated from 0..S-1, cf. m_current_chunk_file_set. n - number of operands in set operation REMAININGINPUT (aka. m_remaining_input) - tmp file needed if S > 1. MATERIALIZEDTABLE (aka. m_materialized_table) - output for EXCEPT/INTERSECT algorithm primary hash - MySQL record hash, aka. calc_row_hash(m_materialized_table) secondary hash - the hash function used by ankerl::unordered_dense for the in-memory hash map based on primary hash tertiary hash - hash function for distributing rows to chunk files, cf. MY_XXH64 based on primary hash ============ !In-memory ! Two kinds of tmp chunk files, HF and IF !hash map ! HF: already Hashed and de-duplicated rows File ! A' rows ! IF: Input File (not yet de-duplicated rows) !==========! | !---------! !----------------! | ! B ! ! REMAININGINPUT ! | !---------! !----------------! | | ↓ tertiary hash → 0:M-1 ↓ +--------+------------\ +--------+------------\ ↓ ↓ ↓ ↓ ↓ ↓ !----! !----! !------! !----! !----! !------! !HF_0! !HF_1! .. !HF_M-1! !IF_0! !IF_1! .. !IF_M-1! !----! !----! !------! !----! !----! !------! ↑ ↑ N N !-------------------! !----------! !----------! ! MATERIALIZEDTABLE ! ! operand-2! .. ! operand-n! !-------------------! !----------! !----------! If M > N, we cannot have open all chunk files at the same time, so in each chunk file we have this structure: +-------+ | | rows from set 0 +-------+ : +-------+ | | rows from set S-1 +-------+ If we need more M than N, M will be a multiple of N as well as a multiple of 2, since N is also chosen a multiple of two (currently 128). So, the physical tmp file contains several logical chunk files. For the HF chunks, we in addition have several generations of these: each round of processing appends a new generation (more updated) version of the chunks. For a 2 operand set operation, we have three generations: 1. the initial row sets from the in-memory hash map (A' spread over M chunks) 2. updated sets with the rest of the left operand (C deduplicated and spread over M chunks) 3. updated sets after we have processed the right operand We keep track of the read and write positions on the tmp files, cf. methods HashJoinChunk::SetAppend and HashJoinChunk::ContinueRead. This enables reading back rows from the generation last written, and the writing of a new generation at the tail of the chunk file. More set operands than two adds further generations, one for each extra operand. * Algorithm 1. The in-memory hash map can hit its memory limit when we read the left set operand (block) after having read A rows, resulting in A' rows in in-memory hash map. If we do not hit the limit, we are done, no spill to disk is required. Note: Spill can never happen when we read operand 2..n since operand 1 of INTERSECT and EXCEPT determines the maximum rows in the result set and hence the maximal size of the in-memory hash map. So, we will have established the spill-over storage *before* reading of operands 2..n starts. 2. Before looking at operand 2..n, we need to finish processing the remaining rows in the left operand, cf. the details below: 3. When we hit limit, we: Determine number N of chunk files based on the estimated number of rows in operand 1 (the left operand). As mentioned, if number of chunks needed (M) > maxOpenFiles, we still allow this but will keep open only a subset s at any one time, presuming worst case of no deduplication, i.e. A'==A. In this case, M == N * S, but M can be as low as 2 (M << N). This is performed in the method `compute_chunk_file_sets' and `initialize_first_HF_chunk_files'. 3.a) For all file sets s in 1..S: - rehash with tertiary hash and write A' to files HF-{0..N-1} all rows in in-mem hash map. Save the computed primary hash value in the hash column, so we do not need to compute it over again when we read HF-k into hash map again. This is done in method `spread_hash_map_to_HF_chunk_files'. HF chunk file sets are now in generation one. - When s contains hash for offending row, write the offending row |A|+1 that did't fit the in-memory hash map to IF-k in s. (aka. m_offending_row) Note these rows (A') have been de-duplicated down to A' and counters set accordingly. 3.b) For all file sets s in 1..S: 3.b.1) read the rest of the left input (or re-read them via REMAININGINPUT if s>1), hash and write to destination file IF-k the rows which, based on its tertiary hash value, have index k in the current set. If s is the first file set AND S>1 and row didn't go to a file in s, also save input row to file REMAININGINPUT since we need it for another file set (since we cannot replay the source). See method `save_rest_of_operand_to_IF_chunk_files' and `reset_for_spill_handling'. At this point we have the rest of the input rows B (that that have not been matched against HFs) in IF-{0..N-1}. HF rows already are unique and have set operation counters already set based on first part of input rows that did fit in memory (so we have no need to "remember" that part of input except as initialized counters): only the remaining input rows (from operand 1) are of concern to us now. From here on, the logic is driven from the read_next_row. The set counter logic is still handled by process_row_hash. Most of the machinery for reading, writing and switching chunk files are driven by a state machine from read_next_row, (almost) invisible to process_row_hash, except for a simplified handling when we re-enter HF rows into the hash map ready to process operand 2..n, cf. call to `load_HF_row_into_hash_map': these rows have already been de-duplicated and the hash table will not grow in size compared to operand one (intersect and except can't increase result set size), so we can use a shorter logic path. 3.c) For each s in 1..S do For each pair of {HF-k, IF-k} in s do 3.c.1) Read HF-k into hash map: optimization: use saved hash value Cf. ReadingState::SS_READING_LEFT_HF 3.c.2) Read rows from IF-k, continuing hash processing of operand one. Cf. ReadingState::SS_READING_LEFT_IF. If hash map overflows here, we recover by changing to de-duplicating via the tmp table (we re-initialize it with a non-unique index on the hash field in the row in handle_hash_map_full). This overflow means we cannot fit even 1/M-th of set of unique rows in input set of operand 1 in memory). If row estimates are reasonably good, it should not happen. For details on secondary overflow recovery, see handle_hash_map_full and comments in materialize_hash_map, and logic in read_next_row_secondary_overflow. 3.c.3) We are done with pair {HF-k, IF-k}, append hash map to HF-k and empty in-memory hash map, cf. `append_hash_map_to_HF'. We are done with operand 1, and we have min(M,N) HF files with unique rows (incl counters) on disk in one or more sets, in generation two. 4.a) For each operand 2..n do 4.a.0) Empty all IFs and REMAININGINPUT. For each s in S do 4.a.1) Read input operand (from block or REMAININGINPUT if s>1), hash to IF-k, and write. If s==1 AND S>1 also save input row to file REMAININGINPUT since we need them for the next file set s, cf. save_operand_to_IF_chunk_files. 4.a.2) Similar to same as 3.c, except with right side counter logic cf. states ReadingState::SS_READING_RIGHT_{HF,IF}. 5) We now have min(N,M) HF files with unique rows sets (incl set logic counters) on disk (generation three), all operands have been processed. For each HF-k read it and write to MATERIALIZEDTABLE.
using anonymous_namespace{composite_iterators.cc}::SpillState::hash_map_type = ankerl::unordered_dense::segmented_map< ImmutableStringWithLength, LinkedImmutableString, ImmutableStringHasher> |
|
private |
For a chunk file pair, an array of counts indexed by m_current_chunk_file_set.
|
strong |
|
inline |
|
private |
We are done processing a {HF, IF} chunk pair.
The results are in the in-memory hash map, which we now append to the current HF chunk file, i.e. m_chunk_files[offset].build_chunk; clear the in-memory hash map, and make the HF chunk file ready for reading of what we now append.
|
inlinestaticprivate |
|
inlinestaticprivate |
|
private |
Compute sizing of and set aside space for the on-disk chunks and their associated in-memory structures, based on the row estimate taken from Operand::m_estimated_output_rows.
Also save away the offending row (the one that we read, but we couldn't put into the hash map) so that we can write it to an IF chunk later.
This could be 1 too high, if we managed to insert key but not value, but never mind.
Set aside space for current generation of chunk row counters. This is a two dimensional matrix. Each element is allocated in initialize_first_HF_chunk_files
|
inline |
|
inline |
|
inlineprivate |
hash modulo m_num_chunks optimized calculation
bool SpillState::init | ( | const Operand & | left_operand, |
hash_map_type * | hash_map, | ||
size_t | rows_in_hash_table, | ||
size_t | read_rows_before_dedup, | ||
MEM_ROOT * | hash_map_mem_root, | ||
TABLE * | t | ||
) |
Initialize the spill to disk processing state with some variables.
left_operand | the left-most operand in a N-ary set operation |
hash_map | the in-memory hash map that overflowed, causing the spill to disk |
rows_in_hash_table | the number of rows in the hash map |
read_rows_before_dedup | the number of rows read from the left operand before de-duplicating into the hash map |
hash_map_mem_root | the mem_root used for allocating space for the hash map's keys and values |
t | the materialized table that receive the result set of the set operation |
|
private |
Initialize REMAININGINPUT tmp file for replay of input rows for chunk file sets 2..S
int SpillState::read_next_row | ( | const Operand * | current_operand | ) |
Given current state of spill processing, return the next row up for inserting into or matching against the hash map.
current_operand | the operand (query block) we are currently reading from |
0 | OK |
-1 | End of records |
1 | Error |
int SpillState::read_next_row_secondary_overflow | ( | ) |
Given current state of secondary overflow processing, return the next row up for inserting into or matching against the index in the result table (we no longer use hashing, having fallen back on de-duplicating via index in resulting output table.
First, return the row which caused the overflow as row #1. Next, we read the rest of the IF rows of the current chunk we were processing when the secondary overflow occured. Finally, we read all remaining left side IF chunks, if any, which haven't been matched with their corresponding HF chunk, i.e. we do not need to read IF files that have already been matched up with their corresponding HF chunk files prior to the secondary overflow, if any.
Processing of right operand(s) will proceed as for non-hashed de-duplication (similarly to what is done for UNION), and is not handled here. Cf. secondary_overflow_handling_done which completes secondary overflow handling and reverts to normal non hashed de-duplication for operands 2..n.
0 | OK |
-1 | End of records |
1 | Error |
|
inline |
|
private |
|
inlinestatic |
bool SpillState::save_offending_row | ( | ) |
|
private |
Prepare to read from REMAININGINPUT
If we have more than one chunk file set, we need the input rows (that we couldn't write to set 0) again for writing to the next sets, so save in REMAININGINPUT
If we have more than one chunk file set, we need the input rows (that we couldn't write to set 0) again for writing to the next sets, so save in REMAININGINPUT
Rewind all IF chunk files and possibly REMAININGINPUT.
|
inlineprivate |
|
inline |
Getter, cf. comment for m_secondary_overflow
.
|
inline |
|
inline |
bool SpillState::simulated_secondary_overflow | ( | bool * | spill | ) |
|
inline |
Inquire spill handling state.
|
private |
The initial hash map that overflowed will be spread over the determined number of chunk files, cf.
Spread the contents of the hash map over the HF files.
initialize_next_HF_chunk_files
All rows for chunk file set 0 precede all rows for chunk file set 1 etc, so we can later retrieve all rows belonging to each file set by scanning only a section of each chunk file.
TODO: this matrix of counts is allocated on normal execution MEM_ROOT
. If this space usage is seen as excessive, we could get rid of it by instead putting sentinel rows in a chunk at the start of each new chunk file set. That way, we can know when we have read all rows belonging to a chunk file set (instead of relying on this counter matrix).
Reset number at end of each set, so we can determine number of rows for each set in a chunk file, cf. m_row_counts above.
|
inlineprivate |
|
inlineprivate |
|
inline |
bool SpillState::write_completed_HFs | ( | THD * | thd, |
const Operands & | operands, | ||
ha_rows * | stored_rows | ||
) |
Write the contents of the final generation of HD chunks to the materialized table which will hold the result set of the set operation.
TODO: avoid materializing more rows than required if LIMIT is present TODO: stream rows as soon as final generation of a HF chunk file is ready?
thd | Current session state | |
operands | The operands of the set operation | |
[out] | stored_rows | Will be incremenented with the number of produced rows |
bool SpillState::write_HF | ( | THD * | thd, |
size_t | set, | ||
size_t | chunk_idx, | ||
const Operands & | operands, | ||
ha_rows * | stored_rows | ||
) |
Used to write a complete (or incomplete in the case of secondary overflow) HF chunk to the materialized tmp table.
Will handle spill to disk if needed.
thd | Session state | |
set | The set for which to write a chunk | |
chunk_idx | The chunk for which to write rows | |
operands | The operands of the set operation | |
[out] | stored_rows | Incremented with the number of row written from the specified chunk to the materialized tmp table |
bool SpillState::write_partially_completed_HFs | ( | THD * | thd, |
const Operands & | operands, | ||
ha_rows * | stored_rows | ||
) |
Write the contents of the HD chunks that were completed when a secondary memory overflow has occurred.
In the general case it is a mix of 1. and 2. generation HF chunks.
thd | Current session state | |
operands | The operands of the set operation | |
[out] | stored_rows | Will be updated with the written rows |
String anonymous_namespace{composite_iterators.cc}::SpillState::m_buffer |
|
private |
Array to hold the list of chunk files on disk in case we degrade into on-disk set EXCEPT/INTERSECT.
Maximally kMaxChunks can be open and used at one time.
size_t anonymous_namespace{composite_iterators.cc}::SpillState::m_chunk_offset {0} |
|
private |
The current chunk under processing. 0-based.
|
private |
The index of the chunk pair being read, incremented before use.
|
private |
The current row no (1-based) in a chunk being read, incremented before use.
|
private |
The in-memory hash map that overflowed.
We will use it also during spill phase, so we need a pointer to it.
|
private |
The mem_root of m_hash_map. We need it for reusing its space.
|
private |
Modify for each operator in a N-ary set operation to avoid initial chunks filling up right away due to row order in previous operation.
|
staticconstexprprivate |
|
private |
The materialized table we are eventualy writing the result of the set operation to.
|
private |
The number of chunk file sets needed to process all m_num_chunks chunks.
|
private |
The number of chunks needed after rounding up to nearest power of two.
It may be larger thank HashJoinIterator::kMaxChunks in which case m_no_of_chunk_file_sets > 1.
struct anonymous_namespace{composite_iterators.cc} { ... } anonymous_namespace{composite_iterators.cc}::SpillState::m_offending_row |
Keeps the row that was just read from the left operand when we discovered that we were out of space in the in-memory hash map.
Save it for writing it to IF-k.
|
private |
At the time of overflow: how many rows have we read from left operand.
|
private |
Used if m_no_of_chunk_file_sets > 1 so we can replay input rows from operands over sets 1..S-1, i.e.
not used for rows from set 0. Not used if we only have one chunk file set.
|
private |
Temporary space for (de)serializing a row.
Cf also m_offending_row.m_buffer for a similar dedicated space.
|
private |
A matrix of counters keeping track of how many rows have been stashed away in the chunk files for each set in each chunk file of the current generation.
Used to allow us to read back the correct set of rows from each chunk given the current m_current_chunk_file_set. It is indexed thus: m_row_counts[ chunk index ][ set index ]
|
private |
At the time of overflow: how many rows from left operand are in hash map after deduplication.
|
private |
If true, we have seen memory overflow also during spill handling.
This is because a HF chunk won't fit in memory, i.e. the computation we made to ensure it would fit, was not sufficient to make it so. This can be because table cardinality statistics is not up to date, or data density is very skewed. In this case we fall back on using tmp table unique key for de-duplicating.
size_t anonymous_namespace{composite_iterators.cc}::SpillState::m_set {0} |
|
private |
|
private |
|
private |
|
private |
If not SS_NONE, we have detected an overflow in the in-memory hash map while reading the left(-most) operand of an INTERSECT or EXCEPT operation and are ready for reading next row from an operand (left or right).
|
private |
Cached value for {m_materialized_table}.
|
private |
bool anonymous_namespace{composite_iterators.cc}::SpillState::m_unsaved {true} |