WL#12168: Add Partition Information into the Binary Log

Affects: Server-8.0   —   Status: Complete   —   Priority: Medium

Motivation
----------
Splitting a large table into small pieces may lead to better performance if only
part of the data is accessed because less information is retrieved or updated.

Taking these possible optimizations into account, replication will write table
partition information into the binary log and make it visible through tools
such as "mysqlbinlog".

In particular, an external tool may need to take InnoDB partitions into account
to support change propagation on partitioned tables.


User Stories
------------
1. As Replication Developer, I want to have the necessary information in the
   binary log to change replication and speed up the applier by updating the
   different partitions concurrently.

2. As Replication Developer, I want to have the necessary information in the
   binary log to create filters to replicate only a sub-set of the partitioned 
   table.

3. As Replication User, I want to have the necessary information in the binary
   log to create my own tools to exploit partitioned data. For example, we can
   offload the partitioned table to a secondary engine, with support for change
   propagation, efficient dropping/truncating of partitions, and possibly fast
   queries on individual partitions.

References
----------

https://dev.mysql.com/doc/refman/8.0/en/partitioning-types.html

Functional Requirements
-----------------------

FR1. If a table has been partitioned, the binary log must contain partition
information when replication runs in row-based replication (RBR) mode. The
partition information will be present for INSERT/UPDATE/DELETE operations on
the table.

FR2. Backwards compatibility must be preserved, meaning that cross-version
replication must not be affected by this work.

FR3. The mysqlbinlog utility shall print the partitioning information, such as
     the partition_id (of both the before-image and the after-image in the case
     of an update).

Non-Functional Requirements
---------------------------

NFR1 - The solution should try to minimize the space footprint and performance
drawbacks that the extra information will generate in the binary log and impact

Changes to the Interface Specification
--------------------------------------

I-1: No new files.
I-2: No new syntax will be added.
I-3: No new commands.
I-4: No new tools.
I-5: No impact on existing functionality.

High-Level Specification
------------------------

The external tool stays consistent with the running server by using
replication. To replicate row changes to the right partition or sub-partition,
it needs the partition information to be stored in the binlog file.

The partition information is exposed to the server layer through the table
structure, which is available to the binary log infrastructure.

So we should:
1) Extend the ROWS_EVENT to contain the partition and sub-partition
   information, if there is any.

2) Change the prepare row event logic to create a new row event when the table
   is the same but the partition or sub-partition is different.

3) Log only one TABLE_MAP_EVENT for any INSERT/UPDATE/DELETE event affecting
   more than one partition.

4) Create one INSERT/UPDATE/DELETE event per affected partition: if a query
   affects N different partitions, N such events are created. For example, if
   an UPDATE affects 2 partitions, 1 Table_map_event and 2 Update_log_event
   entries are logged in the binary log.

5) For an UPDATE event, include the partition_id of both the source and the
   target partition in the binary log.
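
The event-splitting rule in points 2) to 4) can be illustrated with a minimal,
self-contained sketch. It is not the server code: RowChange, RowsEventSketch and
group_rows_by_partition are hypothetical names, and the sketch assumes that a
new event is started only when the partition differs from that of the event
currently being filled. Under that assumption, rows that alternate between
partitions would start a new event at every switch, so the event count equals N
only when the rows for each partition arrive together.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one row change headed for the binary log.
struct RowChange {
  std::uint16_t partition_id;  // partition the changed row belongs to
};

// Hypothetical stand-in for one INSERT/UPDATE/DELETE rows event.
struct RowsEventSketch {
  std::uint16_t partition_id;  // all rows in the event share this partition
  std::size_t row_count;       // number of rows packed into the event
};

// Packs rows into events in arrival order, starting a new event whenever the
// partition of the incoming row differs from that of the current event.
std::vector<RowsEventSketch> group_rows_by_partition(
    const std::vector<RowChange> &rows) {
  std::vector<RowsEventSketch> events;
  for (const RowChange &row : rows) {
    if (events.empty() || events.back().partition_id != row.partition_id) {
      events.push_back({row.partition_id, 0});
    }
    ++events.back().row_count;
  }
  return events;
}
```

For a statement that touches two rows in partition 0 followed by three rows in
partition 1, this yields two events, matching the "1 Table_map_event and 2 rows
events" example above.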

A new class Extra_row_info is introduced inside the Rows_event class.
Extra_row_info contains the extra row data from NDB (extra_row_ndb_info) and
the partition information in the form of integer variables.

At any given time a Rows_event can carry both, one, or none of ndb_info and
partition_info. If both ndb_info and partition_info are present, they are
stored in the order shown below.

        +----------+--------------------------------------+
        |type_code |        extra_row_ndb_info            |
        +----------+----------------+-------+-------------+
        | NDB      |Len of ndb_info |Format |ndb_data     |
        | 1 byte   |1 byte          |1 byte |len - 2 byte |
        +----------+----------------+-------+-------------+

        In case of INSERT/DELETE
        +-----------+----------------+
        | type_code | partition_info |
        +-----------+----------------+
        |   PART    |  partition_id  |
        | (1 byte)  |     2 byte     |
        +-----------+----------------+

        In case of UPDATE
        +-----------+------------------------------------+
        | type_code |        partition_info              |
        +-----------+--------------+---------------------+
        |   PART    | partition_id | source_partition_id |
        | (1 byte)  |    2 byte    |       2 byte        |
        +-----------+--------------+---------------------+

This is the format for any information stored as extra_row_info. type_code is
not a part of the class Extra_row_info, as it is a constant value used only at
the time of serializing and decoding the event.
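
As an illustration of the PART layouts above, the following sketch serializes a
partition block into a byte vector. It is a simplified stand-in, not the
server's encoder: encode_partition_block and append_u16_le are hypothetical
helpers, the type code value 1 is taken from enum_extra_row_info_typecode, and
little-endian byte order for the 2-byte ids is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Type code of the partition block (PART = 1 in enum_extra_row_info_typecode).
const std::uint8_t kPartTypeCode = 1;

// Appends a 2-byte value, low byte first (byte order is an assumption).
void append_u16_le(std::vector<std::uint8_t> &out, std::uint16_t value) {
  out.push_back(static_cast<std::uint8_t>(value & 0xFF));
  out.push_back(static_cast<std::uint8_t>(value >> 8));
}

// Hypothetical helper: builds the PART block.
//   INSERT/DELETE: type_code (1 byte) + partition_id (2 bytes)
//   UPDATE:        type_code (1 byte) + partition_id (2 bytes)
//                  + source_partition_id (2 bytes)
// source_partition_id is ignored unless is_update is true.
std::vector<std::uint8_t> encode_partition_block(
    std::uint16_t partition_id, bool is_update,
    std::uint16_t source_partition_id) {
  std::vector<std::uint8_t> out;
  out.push_back(kPartTypeCode);
  append_u16_le(out, partition_id);
  if (is_update) {
    append_u16_le(out, source_partition_id);
  }
  return out;
}
```

With these assumptions an INSERT or DELETE block is 3 bytes long and an UPDATE
block is 5 bytes long, which is the per-event space cost of the partition
information.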

Range of partition_id:
---------------------

1) Only partition and no sub_partition

   If there are N partitions then the partition_id will range from
   [0, N-1].

2) Both partition and sub_partition

   If there are N partitions and M sub_partitions per partition, then the
   partition_id will range from [0, M*N - 1].
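
The text above gives only the range of partition_id, not how a (partition,
sub-partition) pair maps to a single id. One mapping that is consistent with
the range [0, M*N - 1] is sketched below. It assumes that the sub-partitions of
each partition are numbered consecutively; flat_partition_id is a hypothetical
helper, not a server function.

```cpp
#include <cassert>
#include <cstdint>

// Assumed mapping: the sub-partitions of partition `part` occupy the ids
// [part * num_subparts, part * num_subparts + num_subparts - 1].
// For a table without sub-partitions, pass subpart = 0 and num_subparts = 1,
// which gives ids in [0, N-1].
std::uint32_t flat_partition_id(std::uint32_t part, std::uint32_t subpart,
                                std::uint32_t num_subparts) {
  assert(num_subparts > 0 && subpart < num_subparts);
  return part * num_subparts + subpart;
}
```

For N = 4 partitions with M = 3 sub-partitions each, the first sub-partition of
the first partition maps to 0 and the last sub-partition of the last partition
maps to 11, which is M*N - 1.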

UPGRADE/DOWNGRADE and CROSS-VERSION REPLICATION
-----------------------------------------------
NEW -> OLD shall work, even when NEW generates partition information.

OLD -> NEW shall work, even when NEW expects partition information.

Extending Rows_event
--------------------

A new class Extra_row_info is introduced inside the Rows_event class.
Extra_row_info contains the extra row data from NDB (extra_row_ndb_info) and
the partition information in the form of the integer variables partition_id and
source_partition_id.

diff --git a/libbinlogevents/include/rows_event.h b/libbinlogevents/include/rows_event.h
index f3732d5..524b6ab 100644
--- a/libbinlogevents/include/rows_event.h
+++ b/libbinlogevents/include/rows_event.h
@@ -890,13 +911,73 @@ class Rows_event : public Binary_log_event {
+  class Extra_row_info {
+   private:
+    static const int UNDEFINED{INT32_MAX};
+    /** partition_id for a row in a partitioned table */
+    int m_partition_id;
+    /**
+      It is the partition_id of the source partition in case
+      of Update_event, the target's partition_id is m_partition_id.
+      This variable is used only in case of Update_event.
+    */
+    int m_source_partition_id;
+    /** The extra row info provided by NDB */
+    unsigned char *m_extra_row_ndb_info;
+
+   public:
+    Extra_row_info()
+        : m_partition_id(UNDEFINED),
+          m_source_partition_id(UNDEFINED),
+          m_extra_row_ndb_info(nullptr) {}
+
+    Extra_row_info(const Extra_row_info &) = delete;

Two new typecodes are introduced, which are used while encoding and decoding
the information inside the container Extra_row_info.

+/**
+  This is the typecode defined for the different elements present in
+  the container Extra_row_info, this is different from the format information
+  stored inside extra_row_ndb_info at EXTRA_ROW_INFO_FORMAT_OFFSET.
+*/
+enum class enum_extra_row_info_typecode { NDB = 0, PART = 1};

The encoding happens inside the Rows_log_event constructor that takes a THD
argument.

diff --git a/sql/log_event.cc b/sql/log_event.cc
index ac51167..481f390 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc

+int get_part_id(partition_info *part_info) {
+  uint32_t part_id = binary_log::Rows_event::Extra_row_info::UNDEFINED;
+  longlong func_value;
+  if (part_info != nullptr) {
+    part_info->get_partition_id(part_info, &part_id, &func_value);
+  }
+  return static_cast<int>(part_id);
+}
+
@@ -7335,7 +7347,36 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg,
                   /* Set extra row data to a known value */
                   extra_row_info = set_extra_data(extra_data););
 #endif
+  partition_info *part_info = tbl_arg->part_info;
+  if (part_info != nullptr) {
+    auto part_id = get_part_id(part_info);
+    if (part_id != binary_log::Rows_event::Extra_row_info::UNDEFINED) {
+      m_extra_row_info.set_partition_id(part_id);
+    }
+  }
+  /* Copy Extra ndb data from thd into new event */
+  uint8 extra_row_ndb_info_len = 0;
+  if (extra_row_ndb_info) {
     /* Copy Extra data from thd into new event */
-    uint8 extra_data_len = extra_row_info[EXTRA_ROW_INFO_LEN_OFFSET];
-    assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
-
-    m_extra_row_data =
-        (uchar *)my_malloc(key_memory_log_event, extra_data_len, MYF(MY_WME));
+    extra_row_ndb_info_len = extra_row_ndb_info[EXTRA_ROW_INFO_LEN_OFFSET];
+    DBUG_ASSERT(extra_row_ndb_info_len >= EXTRA_ROW_INFO_HEADER_LENGTH);

-    if (likely(m_extra_row_data != NULL)) {
-      memcpy(m_extra_row_data, extra_row_info, extra_data_len);
-    }
+    m_extra_row_info.set_ndb_info(extra_row_ndb_info, extra_row_ndb_info_len);
   }

During decoding, the type code written in the buffer is read first and is then
used to decode the different blocks of the Extra_row_info container.

--- a/libbinlogevents/src/rows_event.cpp
+++ b/libbinlogevents/src/rows_event.cpp
@@ -390,6 +391,8 @@ Rows_event::Rows_event(const char *buf, const Format_description_event *fde)

+        case enum_extra_row_info_typecode::PART: {
+          uint32_t part_id_placeholder = 0;
+          READER_TRY_SET(part_id_placeholder, read<uint16_t>);
+          m_extra_row_info.set_partition_id(part_id_placeholder);
+          if (event_type == UPDATE_ROWS_EVENT ||
+              event_type == UPDATE_ROWS_EVENT_V1 ||
+              event_type == PARTIAL_UPDATE_ROWS_EVENT) {
+            READER_TRY_SET(part_id_placeholder, read<uint16_t>);
+            m_extra_row_info.set_source_partition_id(part_id_placeholder);
+          }
+          break;
+        }
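
The decoding step can also be shown as a self-contained sketch that works on a
plain byte buffer instead of the server's event reader. PartitionInfoSketch,
read_u16_le and decode_partition_block are hypothetical names, and the
little-endian byte order is an assumption. As in the snippet above, the source
partition id is read only for update events.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical holder for the decoded partition information.
struct PartitionInfoSketch {
  std::uint16_t partition_id;
  std::uint16_t source_partition_id;  // meaningful only when has_source is set
  bool has_source;
};

// Reads a 2-byte value stored low byte first (byte order is an assumption).
std::uint16_t read_u16_le(const std::vector<std::uint8_t> &buf,
                          std::size_t pos) {
  assert(pos + 2 <= buf.size());
  return static_cast<std::uint16_t>(buf[pos] | (buf[pos + 1] << 8));
}

// Decodes a PART block: type_code, partition_id and, for updates only,
// source_partition_id. Returns false if the buffer is too short or does not
// start with the PART type code (1).
bool decode_partition_block(const std::vector<std::uint8_t> &buf,
                            bool is_update, PartitionInfoSketch *out) {
  const std::uint8_t kPartTypeCode = 1;
  const std::size_t needed = is_update ? 5 : 3;
  if (buf.size() < needed || buf[0] != kPartTypeCode) return false;
  out->partition_id = read_u16_le(buf, 1);
  out->has_source = is_update;
  out->source_partition_id = is_update ? read_u16_le(buf, 3) : 0;
  return true;
}
```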

How a separate event is generated for each partition
----------------------------------------------------

The change is made in the method binlog_prepare_pending_rows_event():

diff --git a/sql/binlog.cc b/sql/binlog.cc
index 1254521..6827afa 100644
--- a/sql/binlog.cc
+++ b/sql/binlog.cc
 template <class RowsEventT>
 Rows_log_event *THD::binlog_prepare_pending_rows_event(
     TABLE *table, uint32 serv_id, size_t needed, bool is_transactional,
-    RowsEventT *hint MY_ATTRIBUTE((unused)),
-    const unsigned char *extra_row_info) {
+    const unsigned char *extra_row_info, uint32 source_part_id) {
@@ -10361,16 +10361,9 @@ Rows_log_event *THD::binlog_prepare_pending_rows_event(
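+    partition_info *part_info = table->part_info;
+    /* Declared outside the if-block so it is in scope for the comparison */
+    int part_id = binary_log::Rows_event::Extra_row_info::UNDEFINED;
+    if (part_info != nullptr) {
+      part_id = get_part_id(part_info);
+    }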
+
   Rows_log_event *pending = binlog_get_pending_rows_event(is_transactional);

   if (unlikely(pending && !pending->is_valid())) DBUG_RETURN(NULL);
@@ -10381,8 +10396,8 @@ Rows_log_event *THD::binlog_prepare_pending_rows_event(
       pending->get_general_type_code() != general_type_code ||
       pending->get_data_size() + needed > binlog_row_event_max_size ||
       pending->read_write_bitmaps_cmp(table) == false ||
-      !binlog_row_event_extra_data_eq(pending->get_extra_row_data(),
-                                      extra_row_info)) {
+      !(pending->m_extra_row_info.compare_extra_row_info(
+          extra_row_info, part_id, source_part_id))) {


This means that if the data in the extra_row_info container of the last cached
event is not the same as the current ndb_info + partition_info, a new
Rows_log_event is created. For two different partitions this value differs,
and hence one row event per partition is created.
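
The comparison described above can be sketched in isolation as follows. This is
not the signature of compare_extra_row_info in the server: ExtraRowInfoSketch
and same_extra_row_info are hypothetical, the NDB data is modelled as a byte
vector that is empty when absent, and kUndefinedPartition mirrors the UNDEFINED
value (INT32_MAX) of Extra_row_info.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Mirrors Extra_row_info::UNDEFINED from the class shown earlier.
const int kUndefinedPartition = INT32_MAX;

// Hypothetical, simplified view of the data held by Extra_row_info.
struct ExtraRowInfoSketch {
  int partition_id;
  int source_partition_id;              // only set for update events
  std::vector<unsigned char> ndb_info;  // empty when there is no NDB data
};

// Returns true when the pending event can take the incoming row, that is,
// when partition ids and NDB data all match. A false result means a new rows
// event has to be started.
bool same_extra_row_info(const ExtraRowInfoSketch &pending,
                         const ExtraRowInfoSketch &incoming) {
  return pending.partition_id == incoming.partition_id &&
         pending.source_partition_id == incoming.source_partition_id &&
         pending.ndb_info == incoming.ndb_info;
}
```

In this sketch a row for partition 1 compared against a pending event for
partition 0 returns false, which corresponds to the condition that triggers a
new Rows_log_event in the diff above.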