MySQL  8.0.27
Source Code Documentation
hash_join_iterator.h File Reference

An iterator for joining two inputs by using hashing to match rows from the inputs.

#include <stdio.h>
#include <cstdint>
#include <memory>
#include <vector>
#include "my_alloc.h"
#include "my_base.h"
#include "my_table_map.h"
#include "prealloced_array.h"
#include "sql/hash_join_buffer.h"
#include "sql/hash_join_chunk.h"
#include "sql/immutable_string.h"
#include "sql/item_cmpfunc.h"
#include "sql/join_type.h"
#include "sql/mem_root_array.h"
#include "sql/pack_rows.h"
#include "sql/row_iterator.h"
#include "sql_string.h"

Classes

struct  ChunkPair
 
class  HashJoinIterator
 

Detailed Description

An iterator for joining two inputs by using hashing to match rows from the inputs.

The iterator starts out by doing everything in-memory. If everything fits into memory, the joining algorithm for inner joins works like this:

1) Designate one input as the "build" input and one input as the "probe" input. Ideally, the smallest input measured in total size (not number of rows) should be designated as the build input.

2) Read all the rows from the build input into an in-memory hash table. The hash key used in the hash table is calculated from the join attributes, e.g., if we have the following query where "orders" is designated as the build input:

SELECT * FROM lineitem INNER JOIN orders ON orders.o_orderkey = lineitem.l_orderkey;

the hash value will be calculated from the values in the column orders.o_orderkey. Note that the optimizer recognizes implicit join conditions, so this also works for SQL statements like:

SELECT * FROM orders, lineitem WHERE orders.o_orderkey = lineitem.l_orderkey;

3) Then, we read the rows from the probe input, one by one. For each row, a hash key is calculated for the other side of the join (the probe input) using the join attribute (lineitem.l_orderkey in the above example) and the same hash function as in step 2. This hash key is used to do a lookup in the hash table, and for each match, an output row is produced. Note that the row from the probe input is already located in the table record buffers, and the matching row stored in the hash table is restored back to the record buffers where it originally came from. For details around how rows are stored and restored, see comments on pack_rows::StoreFromTableBuffers.
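The three steps above can be sketched in a few lines of standalone C++. This is only an illustration of the classic build/probe pattern, not the iterator's actual code: the Row struct and the function InMemoryHashJoin are hypothetical names, and a std::unordered_multimap stands in for the real hash table and its packed row storage.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical row type: a join key plus an opaque payload.
struct Row {
  std::int64_t key;
  std::string payload;
};

// Classic in-memory hash join for an inner equi-join.
// Returns (probe row, build row) pairs in probe-input order.
std::vector<std::pair<Row, Row>> InMemoryHashJoin(
    const std::vector<Row> &build, const std::vector<Row> &probe) {
  // Step 2: load the whole build input into a hash table keyed on the
  // join attribute. Duplicate keys are kept, since an inner join must
  // output every match.
  std::unordered_multimap<std::int64_t, const Row *> table;
  table.reserve(build.size());
  for (const Row &row : build) table.emplace(row.key, &row);

  // Step 3: read the probe input row by row and look up each key.
  std::vector<std::pair<Row, Row>> output;
  for (const Row &probe_row : probe) {
    auto range = table.equal_range(probe_row.key);
    for (auto it = range.first; it != range.second; ++it) {
      output.emplace_back(probe_row, *it->second);
    }
  }
  return output;
}
```

With "orders" as the build input and "lineitem" as the probe input, each lineitem row is emitted once per matching order, and the output follows the order of the probe input.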

The size of the in-memory hash table is controlled by the system variable join_buffer_size. If we run out of memory during step 2, we degrade into a hybrid hash join. The data already in memory is processed using regular hash join, and the remainder is processed using on-disk hash join. It works like this:

1) The rest of the rows in the build input that did not fit into the hash table are partitioned out into a given number of files, represented by HashJoinChunks. We create an equal number of chunk files for both the probe and build input. We determine which file to put a row in by calculating a hash from the join attribute like in step 2 above, but using a different hash function.

2) Then, we read the rows from the probe input, one by one. We look for a match in the hash table as described above, but the row is also written out to the chunk file on disk, since it might match a row from the build input that we've written to disk.

3) When the entire probe input is read, we run the "classic" hash join on each of the corresponding chunk file probe/build pairs. Since the rows are partitioned using the same hash function for probe and build inputs, we know that matching rows must be located in the same pair of chunk files.
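A minimal sketch of the partitioning idea follows. It is simplified on purpose: rows are reduced to their join keys, in-memory vectors stand in for the chunk files, and everything is partitioned (the real iterator first joins whatever already sits in the hash table). ChunkIndex, PartitionIntoChunks and JoinChunkPairs are hypothetical names, and the multiplicative hash is an arbitrary choice that only illustrates "a different hash function" than the one used inside the hash table.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>

using Key = std::int64_t;

// Hypothetical partitioning hash. The multiplier is an arbitrary odd
// constant; it just has to differ from the hash table's own hash.
std::size_t ChunkIndex(Key key, std::size_t num_chunks) {
  assert(num_chunks > 0);
  std::uint64_t h = static_cast<std::uint64_t>(key) * 0x9E3779B97F4A7C15ULL;
  return static_cast<std::size_t>(h >> 32) % num_chunks;
}

// Steps 1 and 2: route every row to a chunk chosen by its join key.
std::vector<std::vector<Key>> PartitionIntoChunks(const std::vector<Key> &rows,
                                                  std::size_t num_chunks) {
  std::vector<std::vector<Key>> chunks(num_chunks);
  for (Key key : rows) chunks[ChunkIndex(key, num_chunks)].push_back(key);
  return chunks;
}

// Step 3: run a classic hash join on each build/probe chunk pair.
// Equal keys always land in the same pair, so no match is missed.
std::vector<Key> JoinChunkPairs(const std::vector<Key> &build,
                                const std::vector<Key> &probe,
                                std::size_t num_chunks) {
  std::vector<std::vector<Key>> build_chunks =
      PartitionIntoChunks(build, num_chunks);
  std::vector<std::vector<Key>> probe_chunks =
      PartitionIntoChunks(probe, num_chunks);
  std::vector<Key> matches;
  for (std::size_t i = 0; i < num_chunks; ++i) {
    std::unordered_multiset<Key> table(build_chunks[i].begin(),
                                       build_chunks[i].end());
    for (Key key : probe_chunks[i]) {
      // One output entry per matching build row.
      for (std::size_t n = table.count(key); n > 0; --n) {
        matches.push_back(key);
      }
    }
  }
  return matches;
}
```

Note that the matches come out grouped by chunk rather than in probe order, which is the loss of ordering that spilling implies.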

The algorithm for semijoin is quite similar to inner joins:

1) Designate the inner table (i.e. the IN-side of a semijoin) as the build input. As semijoins only need the first matching row from the inner table, we do not store duplicate keys in the hash table.

2) Output all rows from the probe input where there is at least one matching row in the hash table. In case we have degraded into on-disk hash join, we write the probe row out to chunk file only if we did not find a matching row in the hash table.
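For the in-memory case, the two semijoin steps reduce to a set membership test. The sketch below assumes rows are represented by their join keys only; HashSemijoin is a hypothetical name, and std::unordered_set is used because it drops duplicate keys, mirroring step 1.

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// In-memory hash semijoin over bare join keys.
std::vector<std::int64_t> HashSemijoin(
    const std::vector<std::int64_t> &inner_keys,
    const std::vector<std::int64_t> &probe_keys) {
  // Step 1: build from the inner table. A set keeps one entry per key,
  // which is enough because only the existence of a match matters.
  std::unordered_set<std::int64_t> table(inner_keys.begin(),
                                         inner_keys.end());

  // Step 2: output each probe row that has at least one match.
  std::vector<std::int64_t> output;
  for (std::int64_t key : probe_keys) {
    if (table.count(key) != 0) output.push_back(key);
  }
  return output;
}
```

Each probe row appears at most once in the output, no matter how many inner rows share its key.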

The optimizer may set up semijoins with conditions that are not pure join conditions, but that must be attached to the hash join iterator anyway. Consider the following query and (slightly modified) execution plan:

SELECT c FROM t WHERE 1 IN (SELECT t.c = col1 FROM t1);

-> Hash semijoin (no condition), extra conditions: (1 = (t.c = t1.col1))
    -> Table scan on t
    -> Hash
        -> Table scan on t1

In this query, the optimizer has set up the condition (1 = (t.c = t1.col1)) as the semijoin condition. We cannot use this as a join condition, since hash join only supports equi-join conditions. Nor can we attach it as a filter after the join, as that would cause wrong results. Instead, we attach such conditions as "extra" conditions to the hash join iterator, which causes these notable behaviors:

a. If we have any extra conditions, we cannot reject duplicate keys in the hash table: the first row matching the join condition could fail the extra condition(s).

b. We can only output rows if all extra conditions pass. If any of the extra conditions fail, we must go to the next matching row in the hash table.

c. In case of on-disk hash join, we must write the probe row to disk after we have checked that there are no rows in the hash table that match any of the extra conditions.
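Behaviors a and b can be illustrated with a small in-memory sketch. It assumes, unlike the query above, that there is also an equi-join key to hash on, so that the interplay between key match and extra condition is visible. InnerRow, ProbeRow, ExtraCondition and HashSemijoinWithExtraCondition are hypothetical names, and the extra condition is modeled as an arbitrary predicate.

```cpp
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

struct InnerRow {
  std::int64_t key;
  int value;
};
struct ProbeRow {
  std::int64_t key;
  int value;
};

// Hypothetical stand-in for the "extra" conditions on the iterator.
using ExtraCondition = std::function<bool(const ProbeRow &, const InnerRow &)>;

std::vector<ProbeRow> HashSemijoinWithExtraCondition(
    const std::vector<InnerRow> &inner, const std::vector<ProbeRow> &probe,
    const ExtraCondition &extra) {
  // (a) Duplicate keys must be kept: the first row with a matching key
  // may fail the extra condition while a later one passes.
  std::unordered_multimap<std::int64_t, InnerRow> table;
  for (const InnerRow &row : inner) table.emplace(row.key, row);

  std::vector<ProbeRow> output;
  for (const ProbeRow &probe_row : probe) {
    auto range = table.equal_range(probe_row.key);
    for (auto it = range.first; it != range.second; ++it) {
      // (b) Only output when the extra condition passes; otherwise
      // move on to the next row with the same key.
      if (extra(probe_row, it->second)) {
        output.push_back(probe_row);
        break;  // Semijoin: one passing match is enough.
      }
    }
  }
  return output;
}
```

In the on-disk case (behavior c), the point where the inner loop finishes without a passing match is where the probe row would be written to its chunk file.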

If we are able to execute the hash join in memory (classic hash join), the output will be sorted the same as the left (probe) input. If we start spilling to disk, we lose any reasonable ordering properties.

Note that we still might end up in a case where a single chunk file from disk won't fit into memory. This is resolved by reading as much as possible into the hash table, and then reading the entire probe chunk file each time the hash table is reloaded. This might happen if we have a very skewed data set, for instance.

When we start spilling to disk, we allocate a maximum of "kMaxChunks" chunk files on disk for each of the two inputs. The reason for having an upper limit is to avoid running out of file descriptors.
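The effect of such a cap can be shown with a small calculation. The formula below is an assumption made for illustration only, not the iterator's actual estimate: it asks for enough chunks that each one should fit in a single hash table, then clamps the result to the limit. ChooseChunkCount and all of its parameters are hypothetical; max_chunks plays the role of kMaxChunks.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical chunk-count heuristic with an upper limit.
std::size_t ChooseChunkCount(std::size_t remaining_build_rows,
                             std::size_t rows_per_hash_table,
                             std::size_t max_chunks) {
  assert(rows_per_hash_table > 0 && max_chunks > 0);
  // Ceiling division: chunks needed so that each fits in one hash table.
  std::size_t wanted =
      (remaining_build_rows + rows_per_hash_table - 1) / rows_per_hash_table;
  // Always use at least one chunk, never more than the cap.
  return std::min(max_chunks, std::max<std::size_t>(wanted, 1));
}
```

Once the cap is hit, individual chunks may be larger than the hash table, which is the situation the preceding paragraph describes.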

There is also a flag we can set to avoid hash join spilling to disk regardless of the input size. If the flag is set, the join algorithm works like this:

1) Read as many rows as possible from the build input into an in-memory hash table.

2) When the hash table is full (we have reached the limit set by the system variable join_buffer_size), start reading from the beginning of the probe input, probing for matches in the hash table. Output a row for each match found.

3) When the probe input is empty, see if there are any remaining rows in the build input. If so, clear the in-memory hash table and go to step 1, continuing from the build input where we stopped the last time. If not, the join is done.
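The refill loop can be sketched as follows for an inner join. This is a simplified model, not the iterator's code: rows are reduced to their join keys, and the memory limit is expressed as a row count (max_rows_in_table) rather than as bytes of join_buffer_size. HashJoinWithRefill and RefillJoinResult are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>

using Key = std::int64_t;

struct RefillJoinResult {
  std::vector<Key> matches;     // One entry per (probe, build) match.
  std::size_t probe_scans = 0;  // How many times the probe input was read.
};

RefillJoinResult HashJoinWithRefill(const std::vector<Key> &build,
                                    const std::vector<Key> &probe,
                                    std::size_t max_rows_in_table) {
  assert(max_rows_in_table > 0);
  RefillJoinResult result;
  std::size_t next_build_row = 0;
  do {
    // Step 1: fill the hash table until it is full or the build input
    // is exhausted, continuing where the previous round stopped.
    std::unordered_multiset<Key> table;
    while (next_build_row < build.size() &&
           table.size() < max_rows_in_table) {
      table.insert(build[next_build_row++]);
    }
    // Step 2: scan the probe input from the beginning.
    ++result.probe_scans;
    for (Key key : probe) {
      for (std::size_t n = table.count(key); n > 0; --n) {
        result.matches.push_back(key);
      }
    }
    // Step 3: repeat while build rows remain.
  } while (next_build_row < build.size());
  return result;
}
```

The probe input is read once per refill, so the cost grows with the ratio between the build input size and the hash table capacity, while the first output rows are available after the first fill.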

Doing everything in memory can be beneficial in a few cases. Currently, it is used when we have a LIMIT without sorting or grouping in the query. The gain is that we start producing output rows a lot earlier than if we were to spill both inputs out to disk. It could also be beneficial if the build input almost fits in memory; it would likely be better to read the probe input twice instead of writing both inputs out to disk. However, we do not currently do any such cost based optimization.

There is a concept called "probe row saving" in the iterator. This is a technique that is enabled in two different scenarios: when a hash join build chunk does not fit entirely in memory and when hash join is not allowed to spill to disk. Common for these two scenarios is that a probe row will be read multiple times. For certain join types (semijoin), we must take care so that the same probe row is not sent to the client multiple times. Probe row saving takes care of this by doing the following:

  • If we realize that we are going to read the same probe row multiple times, we enable probe row saving.
  • When a probe row is read, we write the row out to a probe row saving write file, given that it matches certain conditions (for semijoin we only save unmatched probe rows).
  • After the probe input is consumed, we will swap the probe row saving write file and the probe row saving read file, making the write file available for writing again.
  • When we are to read the probe input again, we read the probe rows from the probe row saving read file. This ensures that we, e.g., do not output the same probe row twice for semijoin. Note that if the rows we read from the probe row saving read file will be read again (e.g., we have a big hash join build chunk that is many times bigger than the available hash table memory, causing us to process the chunk file in chunks), we will again write the rows to a new probe row saving write file. This reading from the read file and writing to a new write file continues until we know that we are seeing the probe rows for the last time.
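The following sketch shows why probe row saving keeps a semijoin correct when the hash table has to be refilled. It is a model under stated assumptions: rows are reduced to join keys, the hash table limit is a row count, and two std::vector objects stand in for the probe row saving read and write files. SemijoinWithProbeRowSaving is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>

using Key = std::int64_t;

std::vector<Key> SemijoinWithProbeRowSaving(const std::vector<Key> &inner,
                                            const std::vector<Key> &probe,
                                            std::size_t max_keys_in_table) {
  assert(max_keys_in_table > 0);
  std::vector<Key> output;
  // The first pass reads the real probe input; later passes read the
  // saved rows instead.
  std::vector<Key> read_file = probe;
  std::size_t next_inner_row = 0;
  do {
    std::unordered_set<Key> table;
    while (next_inner_row < inner.size() &&
           table.size() < max_keys_in_table) {
      table.insert(inner[next_inner_row++]);
    }
    // If no inner rows remain, the probe rows are seen for the last
    // time and nothing needs to be saved.
    const bool last_pass = next_inner_row == inner.size();

    std::vector<Key> write_file;
    for (Key key : read_file) {
      if (table.count(key) != 0) {
        output.push_back(key);  // Matched: emit once, never save.
      } else if (!last_pass) {
        write_file.push_back(key);  // Unmatched: may match a later refill.
      }
    }
    // Swap the files so the next pass reads only the saved rows.
    read_file.swap(write_file);
  } while (next_inner_row < inner.size());
  return output;
}
```

Without the saving step, a probe row that matched in one pass would be read and matched again in any later pass whose hash table contains the same key, and would be output twice.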

We use the same methods as on-disk hash join (HashJoinChunk) for reading and writing rows to files. Note that probe row saving is never enabled for inner joins, since we do want to output the same probe row multiple times if it matches multiple rows from the build input. There are some differences regarding when probe row saving is enabled, depending on the hash join type (see enum HashJoinType):

  • IN_MEMORY: Probe row saving is never activated, since the probe input is read only once.
  • SPILL_TO_DISK: If a build chunk file does not fit in memory (may happen with skewed data set), we will have to read the corresponding probe chunk multiple times. In this case, probe row saving is enabled as soon as we see that the build chunk does not fit in memory, and remains active until the entire build chunk is consumed. After the probe chunk is read once, we swap the probe row saving write file and probe row saving read file so that probe rows will be read from the probe row saving read file. Probe row saving is deactivated once we move to the next pair of chunk files.
  • IN_MEMORY_WITH_HASH_TABLE_REFILL: Probe row saving is activated when we see that the build input is too large to fit in memory. Once the probe iterator has been consumed once, we swap the probe row saving write file and probe row saving read file so that probe rows will be read from the probe row saving read file. As long as the build input is not fully consumed, we write probe rows from the read file out to a new write file, swapping these files for every hash table refill. Probe row saving is never deactivated in this hash join type.
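The activation rules in this list can be condensed into a small decision function. This is a standalone sketch: the HashJoinType enumerators reuse the names from the text, but the enum is redeclared here for illustration, and JoinKind, ProbeRowSavingNeeded and the build_fits_in_memory flag are hypothetical. For SPILL_TO_DISK the flag refers to the current build chunk, for IN_MEMORY_WITH_HASH_TABLE_REFILL to the whole build input.

```cpp
// Redeclared for illustration; mirrors the names used in the text.
enum class HashJoinType {
  IN_MEMORY,
  SPILL_TO_DISK,
  IN_MEMORY_WITH_HASH_TABLE_REFILL
};

// Hypothetical, simplified join classification.
enum class JoinKind { INNER, SEMI };

bool ProbeRowSavingNeeded(JoinKind kind, HashJoinType type,
                          bool build_fits_in_memory) {
  // Inner joins want every match, so rereading probe rows is harmless.
  if (kind == JoinKind::INNER) return false;
  switch (type) {
    case HashJoinType::IN_MEMORY:
      // The probe input is read only once.
      return false;
    case HashJoinType::SPILL_TO_DISK:
      // Needed only while an oversized build chunk is being processed.
      return !build_fits_in_memory;
    case HashJoinType::IN_MEMORY_WITH_HASH_TABLE_REFILL:
      // Needed as soon as the build input requires a refill.
      return !build_fits_in_memory;
  }
  return false;
}
```

The function does not capture deactivation: per the list above, SPILL_TO_DISK turns saving off again at the next chunk pair, while the refill type keeps it on.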

Note that we always write the entire row when writing to the probe row saving file. It would be possible to only write the match flag, but this is tricky as long as we have the hash join type IN_MEMORY_WITH_HASH_TABLE_REFILL. If we were to write only match flags in this hash join type, we would have to read the probe iterator multiple times. But there is no guarantee that rows will come in the same order when reading an iterator multiple times (e.g. NDB does not guarantee this), so it would require us to store match flags in a lookup structure using a row ID as the key. For this reason, we will reconsider writing only match flags if the hash join type IN_MEMORY_WITH_HASH_TABLE_REFILL goes away.