WL#2241: Hash join

Affects: Server-8.0   —   Status: Complete

This worklog aims to implement hash join as a way of executing equi-joins in MySQL.

Example:

 CREATE TABLE t1 (col1 INT);
 CREATE TABLE t2 (col1 INT);
 ... insert data ...
 SELECT * FROM t1 JOIN t2 ON (t1.col1 = t2.col1);
 SELECT * FROM t1 JOIN t2 ON (t1.col1 = t2.col1 + 2);

The above queries should be able to execute using hash join.

F-1: If the join condition contains at least one equi-join condition, the server should execute the join using hash join.

F-2: Hash join must accept a join with no join conditions (Cartesian product).

F-3: Hash join must respect the system variable "join_buffer_size". That is, hash join should not use more memory per join than the value of "join_buffer_size".

F-4: The server must accept/recognize the two new optimizer hints "HASH_JOIN" and "NO_HASH_JOIN".

F-5: The server must accept/recognize the new optimizer switch "hash_join". If the value is "off", the optimizer must not use hash join as the join algorithm.

NF-1: A query executed using hash join should run at least as fast as the same query executed with block nested loop.

How does hash join work

This worklog aims to implement hash join for inner joins, with spill to disk if the data does not fit in the given memory. We will first explain how in-memory hash join works, and then explain the on-disk hash join. The descriptions below will assume a simple equi-join between two tables, such as this:

 CREATE TABLE t1 (foo INT);
 CREATE TABLE t2 (bar INT);
 ...
 SELECT * FROM t1 JOIN t2 on (t1.foo = t2.bar);

Classic hash join

The classic hash join algorithm consists of two phases: the build phase and the probe phase.

  1. One of the two tables to be joined is designated as the "build" table. This table is read into an in-memory hash table where the hash value is calculated from the equi-join attribute. This is also known as the build phase. Let us say that we have the following query:

     SELECT * FROM t1 JOIN t2 on (t1.foo = t2.bar);
    
    If "t1" is designated as the build table, the in-memory hash table is created using the value from "t1.foo" as the hash key.
  2. The other table in the join (the one that isn't designated as the build input) is designated as the probe input. After the build phase is complete, we read a single row from the probe input. A hash key is calculated using the join attribute ("t2.bar" in the above case), and this hash key is used to perform a lookup in the in-memory hash table. For each match found, a result row is produced. This is done for every row in the probe input, and this phase is called the probe phase (a sketch of both phases follows this list).
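
To illustrate the two phases, here is a minimal, self-contained sketch using std::unordered_multimap (the hash table later chosen in this worklog). The int-valued rows and the function itself are illustrative only; the server hashes packed rows, not plain integers:

  #include <unordered_map>
  #include <utility>
  #include <vector>

  // Join two "tables" of integers on t1.foo = t2.bar.
  std::vector<std::pair<int, int>> ClassicHashJoin(
      const std::vector<int> &t1_foo,    // build input: values of t1.foo
      const std::vector<int> &t2_bar) {  // probe input: values of t2.bar
    // Build phase: load every row from the build input into a hash table
    // keyed on the join attribute.
    std::unordered_multimap<int, int> hash_table;
    for (int foo : t1_foo) hash_table.emplace(foo, foo);

    // Probe phase: for each probe row, look up all matches and emit one
    // result row per match.
    std::vector<std::pair<int, int>> result;
    for (int bar : t2_bar) {
      auto range = hash_table.equal_range(bar);
      for (auto it = range.first; it != range.second; ++it)
        result.emplace_back(it->second, bar);
    }
    return result;
  }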

This way, the join is completed by scanning both inputs only once, but it requires that the entire build input fits in memory. If the build input doesn't fit in memory, this can be handled as follows:

  1. Read as much as possible from the build input into main memory
  2. Run the entire probe phase reading the whole probe input
  3. Clear the in-memory hash table
  4. Jump back to step one as long as there is more data in the build input.

This is clearly not ideal, since we risk reading the probe input many times. This is where the on-disk hash join can be applied:

On-disk hash join

  1. Start out by partitioning both the probe table and the build table out to several smaller files on disk. The number of partitions should be calculated so that a single partition file from the build table can be fully loaded into the in-memory hash table. Which partition/file a row goes to is decided by a hash function on the join attribute, just as in the classic hash join (a sketch of this partitioning step follows this list).

  2. When all rows are partitioned out to smaller files on disk, load the first partition from the build partitions into the in-memory hash table, and run the probe phase by scanning the first partition from the probe partitions. As the rows are distributed using the same hash function in both the build and probe table, we know that matching rows will be located in the same corresponding partitions/files. Note that it's _very_ important to choose a different hash function for the hash table than the hash function used to partition the tables. If we use the same hash function for both the hash table and when distributing rows to the files on disk, we will end up with a _very_ bad hash table.

  3. When the first pair of partitions has been processed, clear the in-memory hash table and load the second partition file from the build table. Do the probe phase using the second partition from the probe table. Repeat until all partitions are processed.
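
As a simplified sketch of the partitioning step, assume the partitions are plain in-memory buffers and each row is a plain string that also serves as its join attribute; the real implementation writes to IO_CACHE-backed chunk files and hashes packed rows with xxHash64, and the seed below is made up:

  #include <cstddef>
  #include <functional>
  #include <string>
  #include <vector>

  // Decide which partition a row belongs to, based on its join attribute.
  // Note: this must NOT be the same hash function (or seed) that is later
  // used for the in-memory hash table.
  static size_t ChunkIndex(const std::string &join_attribute,
                           size_t num_chunks) {
    constexpr size_t kChunkSeed = 0x9e3779b9;  // illustrative seed
    return (std::hash<std::string>{}(join_attribute) ^ kChunkSeed) % num_chunks;
  }

  // Distribute all rows of one input (build or probe) into num_chunks
  // partitions.
  std::vector<std::vector<std::string>> PartitionInput(
      const std::vector<std::string> &rows, size_t num_chunks) {
    std::vector<std::vector<std::string>> chunks(num_chunks);
    for (const std::string &row : rows)
      chunks[ChunkIndex(row, num_chunks)].push_back(row);
    return chunks;
  }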

Note that if we have a skewed data set, a single partition from the build table might not fit into memory. In this case, the algorithm does the following:

  1. Read as much as possible from the partition into main memory
  2. Run the entire probe phase reading the whole probe partition
  3. Clear the in-memory hash table
  4. Jump back to step one as long as there is more data in the build partition.

Hash join in MySQL

As mentioned, this worklog aims to implement hash join for inner joins where applicable.

When will it be used

As the first step, we will simply replace Block Nested Loop with hash join whenever possible. In the case of a simple two-table join, hash join can be used if we have at least one equi-join condition. Consider the below queries:

  CREATE TABLE t1 (t1_1 INT, t1_2 INT);
  CREATE TABLE t2 (t2_1 INT, t2_2 INT);

  1) SELECT * FROM t1 JOIN t2 ON (t1.t1_1 = t2.t2_1);
  2) SELECT * FROM t1 JOIN t2 ON (t1.t1_1 = t2.t2_1 AND t1.t1_2 = t2.t2_2);
  3) SELECT * FROM t1 JOIN t2 ON (t1.t1_1 = t2.t2_1 AND t2.t2_2 > 43);
  4) SELECT * FROM t1 JOIN t2 ON (t1.t1_1 + t1.t1_2 = t2.t2_1);
  5) SELECT * FROM t1 JOIN t2 ON (FLOOR(t1.t1_1 + t1.t1_2) = CEIL(t2.t2_1 + t2.t2_2));

In all of the queries above, hash join can be used to execute the joins because:

  • Each query has at least one equi-join condition between the two tables.
  • Each side of the join condition refers only to its corresponding table. As queries 4 and 5 show, each side of the join condition doesn't have to be a simple column; it can be a more complex expression, as long as the expression only contains columns from its corresponding table.

In a multi-table join, the same conditions must hold. Consider the following query and its possible query plan:

  CREATE TABLE t1 (col1 INT, col2 INT);
  CREATE TABLE t2 (col1 INT, col2 INT);
  CREATE TABLE t3 (col1 INT, col2 INT);

  SELECT * FROM
    t1
  JOIN
    t2 ON (t1.col1 = t2.col1)
  JOIN
    t3 ON (t2.col1 = t3.col1 AND t1.col2 = t3.col2);


   (HJ) - t2.col1 = t3.col1 AND t1.col2 = t3.col2
    /\
   /  \
  |   t3
  |
 (HJ) - t1.col1 = t2.col1
  /\  
 /  \
t1  t2

Here we have a three-table join where each join is processed using a hash join. In both join nodes, hash join can be used since one side of the join condition refers to "earlier" (left) tables while the other side refers only to the right table.

Any extra conditions that aren't equi-join conditions are attached as filters after the join. As an example, here is a query that contains a non-equi-join condition, together with a possible execution plan:

  CREATE TABLE t1 (col1 INT, col2 INT);
  CREATE TABLE t2 (col1 INT, col2 INT);
  CREATE TABLE t3 (col1 INT, col2 INT);

  SELECT * FROM
    t1
  JOIN
    t2 ON (t1.col1 = t2.col1 AND t1.col2 < t2.col2)
  JOIN
    t3 ON (t2.col1 = t3.col1);


    (HJ) - t2.col1 = t3.col1
     /\
    /  \
   |   t3
   |
(filter) - t1.col2 < t2.col2
   |
  (HJ) - t1.col1 = t2.col1
   /\  
  /  \
 t1  t2

When will it NOT be used

If a join doesn't contain any equi-join condition, hash join will not be used. In those cases, it will fall back to block nested loop using the old executor. As an example, the below query will not be able to use hash join as the join algorithm:

 SELECT * FROM t1 JOIN t2 ON (t1.col1 < t2.col1);

Also note that the below query will not be able to execute using hash join since the join between t2 and t3 doesn't have an equi-join condition. Thus, the entire query will be executed using the old executor where hash join isn't available:

 SELECT * FROM t1
   JOIN t2 ON (t1.col1 = t2.col1)
   JOIN t3 ON (t2.col1 < t3.col1);

User visible changes

There are mainly three things users can do to affect whether or not hash join is used, and _how_ hash join is used: the optimizer hint, the optimizer switch, and "join_buffer_size". With the optimizer hint and the optimizer switch, the user can control whether or not hash join can be used.
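
For example (NO_HASH_JOIN and the "hash_join" switch are the names introduced by F-4 and F-5; the hint uses the usual optimizer hint comment syntax):

 SELECT /*+ NO_HASH_JOIN(t1, t2) */ * FROM t1 JOIN t2 ON (t1.col1 = t2.col1);
 SET optimizer_switch = 'hash_join=off';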

If hash join has been selected as the join algorithm, join_buffer_size affects _how_ the join is executed. If the build input (the left side of the join) fits into the given join buffer size, the entire join is executed in memory. However, if the build input does not fit into the given join buffer size, the build and probe input will be partitioned out to smaller chunk files on disk. This is also known as the GRACE hash join, and is a more efficient way of executing hash join on large inputs. But note that what we aim to implement is a hybrid hash join where we try to do everything in memory; GRACE hash join as described in the literature spills to disk right away.
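
For instance, a session that wants to keep a larger build input in memory can simply raise the variable (the value below is only an example):

 SET SESSION join_buffer_size = 128 * 1024 * 1024;  -- 128 MiB per join buffer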

In order to see whether or not hash join is chosen as the join algorithm, the user can use EXPLAIN FORMAT=tree to see the execution plan.

mysql> EXPLAIN FORMAT=tree SELECT * FROM t1, t2 WHERE t1.col1 = t2.col1;
+-----------------------------------------------------------------------------------------+
| EXPLAIN                                                                                 |
+-----------------------------------------------------------------------------------------+
| -> Inner hash join (t2.col1 = t1.col1)
    -> Table scan on t1
    -> Table scan on t2
 |
+-----------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

Neither FORMAT=traditional nor FORMAT=JSON will show whether hash join will be used, since we only know for sure whether hash join actually will be used when the iterator tree is created. That is too late for FORMAT=[traditional|JSON].

Performance schema

All data that is allocated for the hash table in the hash join iterator is instrumented using performance schema. The user can inspect the memory usage by looking at the event name "memory/sql/hash_join" in the memory_summary_* tables:

mysql> select * from memory_summary_global_by_event_name where event_name like "%hash_join%"\G
*************************** 1. row ***************************
                  EVENT_NAME: memory/sql/hash_join
                 COUNT_ALLOC: 139
                  COUNT_FREE: 139
   SUM_NUMBER_OF_BYTES_ALLOC: 2577684
    SUM_NUMBER_OF_BYTES_FREE: 2577684
              LOW_COUNT_USED: 0
          CURRENT_COUNT_USED: 0
             HIGH_COUNT_USED: 27
    LOW_NUMBER_OF_BYTES_USED: 0
CURRENT_NUMBER_OF_BYTES_USED: 0
   HIGH_NUMBER_OF_BYTES_USED: 358580
1 row in set (0.00 sec)

If a hash join starts to spill out to disk, the file usage can be inspected by looking at the various file_* tables in performance schema. All files created by hash join have the event name "wait/io/file/sql/hash_join":

mysql> select * from file_summary_by_event_name where event_name like "%hash%"\G
*************************** 1. row ***************************
               EVENT_NAME: wait/io/file/sql/hash_join
               COUNT_STAR: 90
           SUM_TIMER_WAIT: 900042640
           MIN_TIMER_WAIT: 0
           AVG_TIMER_WAIT: 10000205
           MAX_TIMER_WAIT: 267632555
               COUNT_READ: 35
           SUM_TIMER_READ: 79219890
           MIN_TIMER_READ: 0
           AVG_TIMER_READ: 2263240
           MAX_TIMER_READ: 10019380
 SUM_NUMBER_OF_BYTES_READ: 271578
              COUNT_WRITE: 35
          SUM_TIMER_WRITE: 516922895
          MIN_TIMER_WRITE: 0
          AVG_TIMER_WRITE: 14769175
          MAX_TIMER_WRITE: 267632555
SUM_NUMBER_OF_BYTES_WRITE: 271578
               COUNT_MISC: 20
           SUM_TIMER_MISC: 303899855
           MIN_TIMER_MISC: 0
           AVG_TIMER_MISC: 15194860
           MAX_TIMER_MISC: 63246820
1 row in set (0.00 sec)

Ideally, hash join shouldn't spill out to disk. One way to prevent it from happening is to increase the variable join_buffer_size, so that the left table fits in memory.

It's also important to know that since hash join may create files on disk, the limit imposed by the variable "max_open_files" may cause the join to fail if hash join creates too many files. Each hash join iterator will create at most 128 files, but with a multi-table join it's possible to reach the limit of max_open_files. There are mainly two ways to avoid reaching this limit:

  1. Ensure that you have a big enough join_buffer_size so that the hash join doesn't spill to disk
  2. Increase the variable max_open_files.

Hashing

As the name "hash join" implies, hashing is a core part of the join algorithm. This section will describe the hashing strategy used.

The build input from the join will be loaded into a hash table. In this worklog, we have chosen to use std::unordered_multimap as the hash table implementation. It's a general purpose hash table that is available on all platforms, and matches our requirements:

  • Must support multiple values with the same key
  • Must support hash-based lookup
  • Must be available on all platforms

The hash table is wrapped in a row buffer class ("HashJoinBuffer") that contains rows from one or more tables. The key in the hash table is retrieved from the join condition. Consider the following trivial example:

 SELECT t1.data FROM t1 JOIN t2 ON (t1.key = t2.key);

Let us say that the table "t1" is stored in the row buffer. The hash table key in this case will be the value found in "t1.key", since that is the join condition that belongs to "t1". If we have multiple equalities, they will be concatenated together in order to form the hash table key. The hash table key is a simple structure that consists of a data pointer and a length, and we rely on xxHash64 as the hash function. xxHash64 was chosen due to its speed and hash quality (ref. https://github.com/rurban/smhasher).
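
As a rough sketch of this, assume the packed values of all join conditions for one row are already available as byte strings; the concatenate-then-hash step could then look like the following, using the upstream xxHash64 API (XXH64() from xxhash.h). The function name and key layout here are illustrative, not the actual server code:

  #include <cstdint>
  #include <string>
  #include <vector>
  #include "xxhash.h"

  // Concatenate the packed values of every equi-join condition into one
  // byte buffer and hash it with xxHash64 using the given seed.
  uint64_t HashCompositeKey(const std::vector<std::string> &packed_values,
                            uint64_t seed) {
    std::string key;
    for (const std::string &value : packed_values) key += value;
    return XXH64(key.data(), key.size(), seed);
  }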

xxHash64 expects an input seed, which gives us two options: always use a fixed seed, or generate a new random seed for each HashJoinIterator. For simplicity, we have chosen to use a fixed seed. Note that we need two seeds: one when calculating the hash for the hash table, and a different one when calculating which chunk file on disk a row should be written to in case of on-disk hash join.

In order to store a row in the row buffer, we implement the class "BufferRow". A BufferRow can hold a single row from a table, or possibly a joined row that consists of data from multiple tables. In general, a BufferRow holds three things:

1) Null-flags for all the columns it holds.
2) The row-id from each table. This is only stored if QEP_TAB::keep_current_rowid is set.
3) The actual data from the columns.

The class exposes two main functions: "StoreFromTableBuffers" and "LoadIntoTableBuffers". The former stores the row that lies in the tables' record buffers (record[0]) into the BufferRow, and the latter restores the data in the BufferRow back to the tables' record buffers. The columns to be stored in the BufferRow are determined by the read_set of each table. For each column marked in the read_set, we use Field::pack() to create a format suitable for storage. A MEM_ROOT for allocation must be provided when storing a row, and in most cases the MEM_ROOT in the row buffer should be provided. Note that in order to create a BufferRow, the row to be stored is expected to lie in the table's record buffer. The BufferRow can later be restored into the table's record buffer, and we use Field::unpack() to put the data back into the record buffer.
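
Purely as an illustration (the exact byte layout is an assumption, not something this worklog specifies), a stored row for a join over (t1, t2) could look like this:

  // [ null flags for all columns marked in the read_sets ]   e.g. 1-2 bytes
  // [ row-id of t1 ][ row-id of t2 ]    only if keep_current_rowid is set
  // [ Field::pack() output for each column marked in the read_sets ]
  //
  // The whole block is a single contiguous buffer, so the BufferRow itself
  // only needs a data pointer and a length.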

Some basic profiling done on an Intel i7 (Haswell) with GCC 8.2 shows that the majority of the time is spent actually constructing the hash table. A future improvement would be to replace std::unordered_multimap with a more performant hash table, as the literature suggests that there are better alternatives (see for instance https://probablydance.com/2017/02/26/i-wrote-the-fastest-hashtable/).

Partitioning and buffer size

The size of the row buffer is controlled by the adjustable variable "join_buffer_size". If the row buffer becomes full during the build phase, we partition the rest of the build input out to several smaller files on disk. We try to determine the number of files such that any file on disk from the build input can be read completely into the row buffer without running out of memory. The formula for finding the number of files is the following:

  // Get the estimated number of rows produced by the join.
  const size_t rows_produced_by_join = qep_tab->position()->prefix_rowcount;

  // Get how many rows we managed to put in the row buffer, and reduce it by a
  // given factor.
  const double reduction_factor = 0.9;
  const size_t rows_in_hash_table = row_buffer.Rows() * reduction_factor;

  // Estimate how many rows remain in the build input.
  const size_t remaining_rows = rows_produced_by_join - rows_in_hash_table;

  // Finally, the number of files needed is the number of remaining rows divided
  // by the number of rows the row buffer can hold.
  const size_t files_needed = remaining_rows / rows_in_hash_table;

Note that we apply a "reduction factor" to the number of rows in the hash table. This is a small safeguard to help ensure that we don't end up in a situation where we can almost fit an entire file on disk into the hash table, but a few rows are missing. If that happens, we must scan the corresponding probe file twice. We would much rather end up with one extra pair of files on disk than risk reading a probe file twice.
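
To make the formula concrete, here is a worked example with made-up numbers:

  // Say the join is estimated to produce 1 000 000 rows, and the row buffer
  // held 100 000 rows before it filled up:
  //
  //   rows_in_hash_table = 100 000 * 0.9       =  90 000
  //   remaining_rows     = 1 000 000 - 90 000  = 910 000
  //   files_needed       = 910 000 / 90 000    =  10    (integer division)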

New classes and functions

This section will explain the most important new classes and functions that are added in this worklog.

HashJoinRowBuffer

The HashJoinRowBuffer is a class that takes care of storing rows (BufferRows) in an in-memory hash table. The HashJoinRowBuffer holds the rows read from the build input, and the key inserted into the hash table is extracted from the join condition. Consider the following join:

SELECT * FROM t1 INNER JOIN t2 ON (t1.col1 = t2.col1);

If t1 is designated as the build input, the keys in the hash table are the values from t1.col1. The keys are hashed using xxHash64, and note that the hash table must support multiple rows with the same key. All of the data in the row buffer is allocated on a MEM_ROOT, and the MEM_ROOT is maintained internally by the row buffer class.

The HashJoinRowBuffer has the following interface:

  • HashJoinRowBuffer(const TableCollection &tables, size_t max_mem_available);
    • Construct a row buffer that will hold the data given by "tables", and at most "max_mem_available" bytes of data.
  • bool Init(std::uint32_t hash_seed);
    • Initialize the row buffer with the given seed to be used in the xxHash64 hashing.
  • void Clear(std::uint32_t hash_seed);
    • Clears the row buffer of all data.
  • StoreRowResult StoreRow(const std::vector<Item_func_eq *> &join_conditions);
    • Store the row that currently lies in the tables' record buffers, where the key is extracted from the given join conditions.
  • void LoadRange(const Key &key);
    • Prepare the row buffer for reading, by loading all rows that match the given key.
  • bool Next();
    • Get the next matching row in the row buffer.
  • BufferRow *GetCurrentRow() const;
    • Return a pointer to the current matching row
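
A sketch of how the probe phase might drive this interface (the variable names, and the assumption that Next() returns false when there are no more matches, are illustrative and not part of the interface above):

  // Probe one row: look up every build row with the same key and restore
  // each match into the tables' record buffers.
  row_buffer.LoadRange(probe_key);
  while (row_buffer.Next()) {
    BufferRow *match = row_buffer.GetCurrentRow();
    match->LoadIntoTableBuffers(build_tables);
    // ... evaluate any remaining filters and emit the joined row ...
  }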

BufferRow

A BufferRow is a class that holds all data for a row, where one row consists of data from one or more tables. It simply consists of a data pointer and the data length, and the memory is allocated as a contiguous block. This makes it very easy to dump a BufferRow out to a file when necessary. Under the hood, this class uses the method Field::pack() to get the data in a format suitable for storage, and Field::unpack() to restore the data back to the field.

The class has the following interface:

  • bool StoreFromTableBuffers(const TableCollection &tables, MEM_ROOT *mem_root);
    • Takes the row that currently lies in the tables' record buffers and stores it in this object. The data is allocated on the supplied MEM_ROOT.
  • void LoadIntoTableBuffers(const TableCollection &tables);
    • Takes the data in this object and puts it back into the tables' record buffers.
  • const uchar *data() const;
    • Returns a pointer to the data
  • size_t data_length() const;
    • Returns the length of the data.

HashJoinChunk

A HashJoinChunk is a class that represents a file on disk, where rows can be stored. Internally, the class uses the IO_CACHE structure to write and read to/from disk. IO_CACHE holds a small internal buffer that is used to buffer up data before flushing to disk, and this buffer size is adjustable through the Init()-function of the HashJoinChunk class.

The class has the following interface:

  • bool Init(size_t file_buffer_size);
    • Initialize the chunk file, and set the IO_CACHE with a buffer size of "file_buffer_size"
  • ha_rows num_rows() const;
    • Return the number of rows that this chunk file holds
  • bool WriteRowToChunk(const hash_join_buffer::TableCollection &tables);
    • Write the row that lies in the tables' record buffer out to this chunk file.
  • void PositionFile(ha_rows row_index);
    • Position the chunk file for read at the given row index.
  • bool PutNextRowInTableBuffers(const hash_join_buffer::TableCollection &tables);
    • Take the row that the chunk file is positioned at and put it back to the tables' record buffer. The file position is advanced by one row.
  • bool PrepareForRead();
    • Flush all the file contents to disk. This must be called after we are done writing out data to the chunk file.
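
A sketch of one write-then-read cycle for a single chunk file (error handling and the surrounding loop over all partitions are omitted; "tables" is assumed to be the TableCollection the chunk belongs to):

  HashJoinChunk chunk;
  chunk.Init(/*file_buffer_size=*/4096);

  // Write phase: each row must lie in the tables' record buffers when
  // WriteRowToChunk() is called.
  chunk.WriteRowToChunk(tables);
  // ... repeat for every row that hashes to this partition ...

  chunk.PrepareForRead();  // flush buffered data to disk before reading
  chunk.PositionFile(0);   // position the file at the first row
  for (ha_rows i = 0; i < chunk.num_rows(); ++i) {
    chunk.PutNextRowInTableBuffers(tables);  // row is back in record buffers
    // ... e.g. insert the row into the in-memory hash table ...
  }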

HashJoinIterator

We will implement one new iterator, the HashJoinIterator. It takes two iterators as its inputs: the left input and the right input. The most important thing to note is that the build phase (building the hash table, and possibly writing the build input to chunk files on disk) happens in Init() of the iterator. Also, the left input is always used as the build input in the hash join.

The iterator constructor determines which tables and columns it will use, and stores those in internal structures. Which columns the iterator actually needs is determined by the read_set of each table. If the read_set is completely blank, the entire table is filtered out as well.
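
Putting the pieces together, the overall flow of the iterator can be sketched as follows (pseudocode in comments, not the actual member function bodies):

  // Init():
  //   - Init() the build (left) input and read its rows into the
  //     HashJoinRowBuffer.
  //   - If the row buffer fills up, create chunk files on disk and partition
  //     the remaining build rows out to them, and later all probe rows as
  //     well.
  //
  // Read():
  //   - Read the next row from the probe (right) input, or from the current
  //     probe chunk file when operating on disk.
  //   - Look up matching rows in the row buffer and return one joined row
  //     per call; when a pair of chunks is exhausted, clear the row buffer
  //     and load the next build chunk.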

Adding the hash join iterator to the iterator execution engine

Hash join will only be available with the new iterator execution engine. When the optimizer is building the iterator tree (in create_iterators()) and a BNL is encountered, we call JOIN_CACHE::can_be_replaced_with_hash_join() to see if we can replace the BNL with a hash join. The function can_be_replaced_with_hash_join() looks at the condition that is attached to the table (QEP_TAB) and sees if it can find any equi-join conditions. If it can find at least one, the function returns true and the optimizer continues to build the iterator tree. If it can't find any equi-join conditions, we fall back to the old executor. Also note that if there aren't any conditions attached to the table, we can still execute hash join; the join condition is simply "true".

When joins are connected together (in ConnectJoins()), we take all the equi-join conditions and send them to the hash join iterator. Any join conditions that cannot be handled by the hash join iterator (i.e. the non-equi-join conditions) are attached as filters after the hash join iterator.