WL#3549: Binlog: compression

Affects: Server-8.0   —   Status: Complete

EXECUTIVE SUMMARY

This worklog implements binary log compression using the ZSTD compression algorithm. Compression is done per transaction. Compressed transactions remain compressed when they are replicated between masters and slaves. This means that both the storage and the network footprint of binary logs are reduced.

USER/DEV STORIES

  • As a MySQL user I want to be able to tell the server to compress the binary logs, so I save disk space.
  • As a MySQL user I want to be able to tell the server to compress binary logs before sending them out to the slaves so that I save network bandwidth.
  • As a MySQL user I want to be able to tell the server to compress binary logs so that I reduce the amount of storage space used to store my backups (MEB and binary log archives).

SCOPE

The work described in this document does:

  • design and implement per transaction compression.
  • design and implement encoding and decoding of compressed transactions in libbinlogevents
  • design and implement per transaction decompression in mysqlbinlog
  • design and implement per transaction decompression in server commands that inspect transactions in the binary log, such as SHOW BINLOG EVENTS
  • implement support for ZSTD compression only.

The work described in this document does not:

  • design, implement or do any work on other tools such as MEB w.r.t. compression.
  • design, implement or do any other work to change, optimize or refactor the binary log format.

LIMITS

  • Non-transactional engines are not supported.
  • Transactions that end up in incident events are not compressed.
  • Compressed transactions cannot exceed (MAX_ALLOWED_PACKET - bytes used for event header) bytes of compressed payload.
  • Compresses only ROW based format transactions.

THIRD PARTY LIBRARIES

  • ZSTD (1.3.3)

REFERENCES

  • BUG#46435: compress binary files
  • BUG#48396: Compressed Binlog
  • BUG#80770: Compression of row-based event
  • BUG#90785: Caching compressed binlog events for reuse...
    • The work on this worklog addresses the compression part, but not the caching one. Regardless, the motivation for the cache was to avoid compressing the same event multiple times (one per dump thread). This worklog addresses that part. In addition, the cache also avoids IO. This patch does not do that. However, if the compressed events are read from disk, there is a good chance that the data is already cached in the OS buffer. Therefore, on average the penalty should be the one of issuing a system call.
  • Add option to compress events in the binary log
  • Add option to specify compression library
    • This adds support for ZSTD in the server.

Functional Requirements

  • FR1. A new boolean system variable SHALL control whether transactions are compressed or not in the binary log. The variable is named binlog_transaction_compression. If set it SHALL enable compression, otherwise no compression is performed. If compression is enabled, a transaction SHALL be compressed using ZSTD while finalizing the capture cache, i.e., right before committing.

  • FR2. binlog_transaction_compression SHALL NOT be modifiable inside a transaction context. If the user tries to modify its value while inside a transaction, an error SHALL be emitted:

    • ER_VARIABLE_NOT_SETTABLE_IN_TRANSACTION
  • FR3. binlog_transaction_compression SHALL NOT be modifiable unless the user has been granted one of the following privileges: SUPER, SYSTEM_VARIABLES_ADMIN or SESSION_VARIABLES_ADMIN. Otherwise, an error SHALL be emitted:

    • ER_SPECIFIC_ACCESS_DENIED_ERROR
  • FR4. Stats for compressed and uncompressed transactions SHALL be recorded by the receiver thread and exposed in the performance schema table binary_log_transaction_compression_stats under the log type: RELAY. Uncompressed transactions SHALL present compression type: NONE. Compressed transactions SHALL present compression type: ZSTD.

  • FR5. Stats for compressed and uncompressed transactions SHALL be recorded by the session and applier threads and exposed in the performance schema table binary_log_transaction_compression_stats under the log type: BINARY, but only if the binary log is enabled. Uncompressed transactions SHALL present compression type: NONE. Compressed transactions SHALL present compression type: ZSTD.

  • FR6. Truncating binary_log_transaction_compression_stats SHALL remove the entries from the performance schema table. No DML is allowed on this table. If a DML is attempted, an error SHALL be emitted:

    • ER_WRONG_PERFSCHEMA_USAGE
  • FR7. binary_log_transaction_compression_stats SHALL NOT present any records if the binary log is disabled.

  • FR8. Records in binary_log_transaction_compression_stats SHALL be volatile. If the server restarts, previous records are lost.

  • FR9. mysqlbinlog SHALL print out, as comments, the compression algorithm used, the compressed size and the uncompressed size if the --verbose switch is used.

  • FR10. mysqlbinlog SHALL uncompress the contents of a Transaction_payload log event and print decompressed events one-by-one.

  • FR11. SHOW BINLOG EVENTS SHALL uncompress the contents of a Transaction_payload log event and print the decompressed events one-by-one.

  • FR12. SHOW RELAYLOG EVENTS SHALL uncompress the contents of a Transaction_payload log event and print the decompressed events one-by-one.

  • FR13. Compressed transactions size in bytes MUST be smaller than the decompressed transaction size.

  • FR14. A transaction containing incident events SHALL NOT be compressed, even if compression is enabled.

  • FR15. A transaction that modifies both non-transactional and transactional tables SHALL NOT be compressed, even if compression is enabled.

  • FR16. Control events SHALL NOT be compressed if compression is enabled. The full list of control events is depicted in the HLS section.

  • FR17. Compressed transactions SHALL NOT be decompressed by the sender thread before sending them to the secondary/slave.

  • FR18. Compressed transactions SHALL NOT be decompressed by the receiver thread when receiving them from the primary/master.

  • FR19. Events inside a Transaction_payload event SHALL all exhibit the same end_log_pos, namely that of the enclosing Transaction_payload event.

  • FR20. Replicating a Transaction_payload event from a NEW master to an OLD slave SHALL result in a replication error:

    • ER_SLAVE_CORRUPT_EVENT
  • FR21. Compressed transactions SHALL NOT be compressed on the slave if the applier threads are running with binlog_transaction_compression = OFF.

  • FR22. Uncompressed transactions SHALL NOT be compressed by the replication applier threads if they are running with binlog_transaction_compression = OFF.

  • FR23. Uncompressed transactions SHALL be compressed by the replication applier threads if they are running with binlog_transaction_compression = ON.

  • FR24. Compressed transactions SHALL be compressed again by the replication applier threads if they are running with binlog_transaction_compression = ON.
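
FR21 to FR24 amount to one rule: whether the applier writes a transaction compressed to its own binary log depends only on the applier's binlog_transaction_compression setting, not on how the transaction arrived. The following is a minimal sketch of that rule. The enum and function names are hypothetical and not part of the server, and the other restrictions (incident events, non-transactional changes) are left out.

```cpp
// Hypothetical helper, not server code: illustrates FR21 to FR24.
enum class Compression { NONE, ZSTD };

// The compression state of the incoming transaction is deliberately
// ignored; only the applier's own setting decides the outcome.
inline Compression applier_output_compression(
    Compression /*incoming*/, bool binlog_transaction_compression) {
  return binlog_transaction_compression ? Compression::ZSTD
                                        : Compression::NONE;
}
```

For example, a transaction received compressed is written uncompressed when the applier runs with the variable OFF (FR21), and an uncompressed one is written compressed when it runs with the variable ON (FR23).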

Non-functional requirements

  • NFR1. With compression enabled, the throughput of a sysbench WO workload on the master SHALL NOT be affected more than 3% when compared to the same setup without compression.

  • NFR2. With compression enabled, the transaction latency of sysbench WO workload on the master SHALL NOT increase more than 3%.

  • NFR3. With compression enabled, when running sysbench WO workload, the applier SHALL NOT degrade throughput by more than 3% when compared to the same setup without compression.

  • NFR4. With compression enabled, when running sysbench WO workload, the applier SHALL NOT be slower applying the relay-log by more than 3% when compared to the same setup without compression.

  • NFR5. Events of a compressed transaction (except for the GTID event) SHALL NOT be checksummed.

  • NFR6. Binary log recovery SHALL handle compressed transactions as well and thus there SHALL NOT be any difference when compression is used w.r.t. recoverability.

  • NFR7. Compressed transactions are sent to Group Replication compressed as well. This will be transparent and SHALL NOT affect the functionality of Group Replication.

  • NFR8. Compressed transactions are sent as a single event (Transaction_payload event). Therefore, the slave SHALL observe the SLAVE_MAX_ALLOWED_PACKET limit. If this limit is exceeded, then the existing error for that situation SHALL be emitted:

    • ER_RPL_LOG_ENTRY_EXCEEDS_SLAVE_MAX_ALLOWED_PACKET
  • NFR9. When semi-sync is enabled on the master and slave, the receiver thread SHALL send an acknowledgment when it receives the Transaction_payload event for the transaction.

  • NFR10. The Transaction_payload event SHALL be recognized by the transaction boundary parser on the receiver and no error SHALL be emitted if it follows a GTID event.

  • NFR11. Regarding rollbacks on the applier, there SHALL be no difference between a compressed transaction and uncompressed transaction if there are errors while applying it.

  • NFR12. There SHALL NOT be any difference in the behavior of MASTER_POS_WAIT when dealing with compressed transactions.

High Level Specification

Summary of the Approach

  • Compression algorithm used is ZSTD.
  • We compress transaction by transaction.
  • Control events are not compressed.
  • DML and DDL related events are compressed, as they are part of the transaction.
  • The transaction id event - GTID or ANONYMOUS - is not compressed, only the payload of the transaction. I.e., the remaining events.
  • Transactions are compressed before being written to the binary log files on disk.
  • Transactions are replicated compressed, hence there is no need to recompress them after reading them from the binary log file (e.g., when sending them out to the slave or to the group in GR).
  • The receiver writes compressed transactions to the relay log.
  • Transactions remain compressed in the relay log.
  • The applier decompresses transactions only when applying them.
  • Compression stats can be fetched from a performance schema table.

Capture Details

COMPRESSION ALGORITHM. In this worklog we implement support for ZSTD compression. However, the low level design is open ended so that additional algorithms can be added later if needed (e.g., zlib or lz4). In addition, compression and decompression, as well as encoding and decoding of the compression envelope, is implemented in libbinlogevents, so that it is easier to decode/encode offline and extend support for additional compression formats.

COMPRESSION PIPELINE. We compress when finalizing the capture caches. This means that we do not have to bookkeep transaction savepoints and recompress everything once there is a rollback to a specific savepoint. This also means that we can compress in parallel. Each thread compresses its own cache before entering the binlog group commit pipeline.

  • POSSIBLE OPTIMIZATION. Note though that depending on performance hit, we may revisit this later on as a sub-worklog or a performance optimization for this work. The alternative would be to compress on-the-fly, while we capture changes. But then we need to cater for transaction savepoints.

    There are side-effects when compressing while finalizing the caches:

  • GCS COMPRESSION VS BINARY LOG TRANSACTION COMPRESSION: If the binary log transactions are already compressed when passed on to GR, then GCS compression is less interesting, since it will effectively compress mostly the GCS headers and not so much the transaction body.

UNCOMPRESSED EVENTS. The following control events are never part of a transaction payload, therefore, they are never compressed. That is OK, since these events are mostly rare or carry metadata for the compression work itself.

   UNKNOWN_EVENT = 0,
   START_EVENT_V3 = 1,
   STOP_EVENT = 3,
   ROTATE_EVENT = 4,
   SLAVE_EVENT = 7,
   FORMAT_DESCRIPTION_EVENT = 15,
   INCIDENT_EVENT = 26,
   HEARTBEAT_LOG_EVENT = 27,
   IGNORABLE_LOG_EVENT = 28,
   GTID_LOG_EVENT = 33,
   ANONYMOUS_GTID_LOG_EVENT = 34,
   PREVIOUS_GTIDS_LOG_EVENT = 35,
   TRANSACTION_CONTEXT_EVENT = 36,
   VIEW_CHANGE_EVENT = 37,

COMPRESSED EVENTS. The following events are compressed as part of the transaction payload.

  QUERY_EVENT = 2,
  INTVAR_EVENT = 5,
  APPEND_BLOCK_EVENT = 9,
  DELETE_FILE_EVENT = 11,
  RAND_EVENT = 13,
  USER_VAR_EVENT = 14,
  XID_EVENT = 16,
  BEGIN_LOAD_QUERY_EVENT = 17,
  EXECUTE_LOAD_QUERY_EVENT = 18,
  TABLE_MAP_EVENT = 19,
  WRITE_ROWS_EVENT_V1 = 23,
  UPDATE_ROWS_EVENT_V1 = 24,
  DELETE_ROWS_EVENT_V1 = 25,
  IGNORABLE_LOG_EVENT = 28,
  ROWS_QUERY_LOG_EVENT = 29,
  WRITE_ROWS_EVENT = 30,
  UPDATE_ROWS_EVENT = 31,
  DELETE_ROWS_EVENT = 32,
  XA_PREPARE_LOG_EVENT = 38,
  PARTIAL_UPDATE_ROWS_EVENT = 39,

COMPRESSED TRANSACTIONS ENVELOPE. Compressed transaction payload is added to a new event which becomes the container of such bytes. This new event is:

 TRANSACTION_PAYLOAD_EVENT = 40,

EVENT CHECKSUMS. Only the payload (Transaction_payload_event) is checksummed, if the checksum option is enabled (which it is by default). The events inside of it are not checksummed, nor do they contain the extra bytes - e.g., padding - that would be used to store a checksum. Regular, uncompressed transactions will have all their events checksummed.

INCIDENT EVENTS. If the capture cache contains incident events, then compression shall be automatically skipped.

NON-TRANSACTIONAL EVENTS. If any non-transactional event was captured, then compression shall be automatically skipped.
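
The capture-side conditions above, together with the size limits from the LIMITS section and FR13, can be sketched as two checks: one before compressing and one after. This is a minimal sketch under stated assumptions. The struct, the function names and the parameter names are hypothetical, and what the server does when the second check fails is not specified by this sketch.

```cpp
#include <cstddef>

// Hypothetical summary of a finalized capture cache. The type and field
// names are illustrative and do not come from the server source.
struct CacheSummary {
  bool has_incident_event;
  bool has_non_transactional_event;
  std::size_t uncompressed_bytes;
};

// Checked before compressing: incident events or non-transactional
// changes in the cache cause compression to be skipped.
inline bool may_compress(bool compression_enabled, const CacheSummary &cache) {
  return compression_enabled && !cache.has_incident_event &&
         !cache.has_non_transactional_event;
}

// Checked after compressing: the result must be strictly smaller than the
// input and must fit in one packet next to the event header.
inline bool compressed_size_ok(const CacheSummary &cache,
                               std::size_t compressed_bytes,
                               std::size_t max_allowed_packet,
                               std::size_t event_header_bytes) {
  if (event_header_bytes > max_allowed_packet) return false;
  return compressed_bytes < cache.uncompressed_bytes &&
         compressed_bytes <= max_allowed_packet - event_header_bytes;
}
```

Splitting the decision in two keeps the cheap checks ahead of the compression call, so a cache that can never be compressed costs no compression work.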

Persistence Details

BINARY LOG. Compressed transactions are persisted in the binary log. They are persisted as a Transaction_payload_event.

RECOVERY. The binary log is used as the global recovery log in MySQL. As such, its contents need to be inspected during recovery in order to roll forward any transaction that was prepared in the storage engines and written to the binary log, but never committed before the server shut down involuntarily. To support this, the Transaction_payload_event shall be decompressed and decoded, and its events inspected. In other words, the XIDs saved in the log will be collected from inside the Transaction_payload_event. The recovery procedure is then the same as with uncompressed transactions.
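
The XID collection described above can be sketched as a scan that looks one level into each payload event. The event model below is a hypothetical simplification for illustration only: real events are decoded from bytes, and the `inner` vector stands in for the result of decompressing and decoding a Transaction_payload_event.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical, simplified event model; not the libbinlogevents classes.
enum class EventType { XID, TRANSACTION_PAYLOAD, OTHER };

struct Event {
  EventType type;
  std::uint64_t xid;         // meaningful for XID events only
  std::vector<Event> inner;  // decoded contents, payload events only
};

// Collects XIDs from plain XID events and from XID events found inside
// transaction payload events, in log order.
inline std::vector<std::uint64_t> collect_xids(const std::vector<Event> &log) {
  std::vector<std::uint64_t> xids;
  for (const Event &ev : log) {
    if (ev.type == EventType::XID) {
      xids.push_back(ev.xid);
    } else if (ev.type == EventType::TRANSACTION_PAYLOAD) {
      for (const Event &in : ev.inner) {
        if (in.type == EventType::XID) xids.push_back(in.xid);
      }
    }
  }
  return xids;
}
```

Once the XIDs are collected this way, the rest of recovery does not need to know whether a transaction was stored compressed.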

Propagation Details

MASTER SENDER THREAD. The dump thread is not modified at all. It shall read regular events and send them out to the slave. This means that a full compressed transaction is sent in one go, because the Transaction_payload_event is handled as a regular event as well.

PROTOCOL COMPRESSION. Note that the communication protocol between master and slaves may have compression turned on at the protocol level. If sending out compressed transactions already, the protocol will have little opportunity to compress further.

GROUP REPLICATION SENDER. Group replication does not inspect the contents of transactions and as such it will receive the Transaction_payload_event as a stream of bytes and just send them out as they are.

GROUP REPLICATION COMPRESSION. Note that the group communication system between members of a group may have compression turned on. If sending out compressed transactions already, the communication protocol will have little opportunity to compress further.

MAX_ALLOWED_PACKET AND PAYLOAD FRAGMENTATION. In group replication, we now have support for large messages. These are split in smaller chunks when sending them out and the full packet is assembled back when receiving. For regular async replication, we observe the MAX_ALLOWED_PACKET limit.

  • IMPORTANT. Note though that with compression, the dump thread sends full transactions as a single packet also in asynchronous replication (the same behavior exists for GR). Thus large transactions (even compressed) may hit the limit.

  • POSSIBLE OPTIMIZATION. There is a potential optimization which would be to chain multiple payload events, each with its own set of complete and compressed events. This would result in a frag layer. If needed we can prepare for this and do it in a follow up worklog. The payload event has to be designed with this in mind.

SEMISYNC REPLICATION. The receiver thread acknowledges when it receives the Transaction_payload_log_event.

Receiver Details

DECOMPRESSION. The receiver thread does NOT decompress or do anything to the contents of the compressed transactions. It will just queue Transaction_payload_events as a regular event.

TRANSACTION BOUNDARY PARSER. The transaction boundary parser needs to be adjusted to reflect the new state transition when the receiver handles a Transaction_payload_event. This means that the following transition has to happen when a Transaction_payload_event comes in: EVENT_PARSER_GTID -> EVENT_PARSER_NONE.
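
A minimal sketch of this transition as a state function follows. Only the EVENT_PARSER_GTID -> EVENT_PARSER_NONE step on a payload event is taken from the text. The reduced set of states, the error transitions and the function name are assumptions made for illustration, and the real parser tracks more states and event types.

```cpp
// Simplified subset of parser states and boundary events. Only the
// GTID -> NONE transition on a payload event comes from the text; the
// error transitions are assumptions of this sketch.
enum Parser_state { EVENT_PARSER_NONE, EVENT_PARSER_GTID, EVENT_PARSER_ERROR };
enum Boundary_event { GTID_EVENT, TRANSACTION_PAYLOAD_EVENT };

inline Parser_state next_state(Parser_state current, Boundary_event event) {
  switch (event) {
    case GTID_EVENT:
      // A GTID event opens a transaction when none is in progress.
      return current == EVENT_PARSER_NONE ? EVENT_PARSER_GTID
                                          : EVENT_PARSER_ERROR;
    case TRANSACTION_PAYLOAD_EVENT:
      // The payload carries the whole transaction, so it closes it.
      return current == EVENT_PARSER_GTID ? EVENT_PARSER_NONE
                                          : EVENT_PARSER_ERROR;
  }
  return EVENT_PARSER_ERROR;
}
```

Because the payload event contains the complete transaction, the parser returns to the "no transaction in progress" state immediately, without passing through intermediate states.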

STATS. The receiver thread does not decompress, but it does parse the Transaction_payload_event header, to collect the stats to present in the performance schema table. This is similar to the way GTID timestamps are collected.

Applier Details

DECOMPRESSION. Decompression happens in the worker threads of the MTS. When the Transaction_payload_event is applied, the applier thread decompresses the transaction payload, decodes it event by event, and applies the events one by one.

APPLY PROCEDURE. For compressed transactions, from the multi-threaded applier scheduler point of view, it is the Transaction_payload_event that is applied. Therefore, if an error is found while applying that event, the full transaction will fail.

PHYSICAL POSITIONS. Compressed events share the same end_log_pos as their container (Transaction_payload_event).

  • Impact on MASTER_POS_WAIT. MASTER_POS_WAIT will wait for the group positions to be updated. This is done at transaction commit. Therefore, given that we do not allow compression while mixing non-trans and transactional events, MASTER_POS_WAIT shall wait for the transaction to have committed and the positions updated.

TRANSACTION IDENTIFIERS. No change whatsoever to the execution flow of GTID handling.

SQL_SLAVE_SKIP_COUNTER. The skip counter is operated by the coordinator thread. As such, the counter does not account for inner events inside the transaction payload event, meaning that if one event is skipped and that event happens to be the transaction payload event, then the entire transaction is skipped.

DATABASE PARALLEL SCHEDULER. The coordinator calculates the entire set of databases that the events inside a transaction payload event touch, and then schedules the transaction. This is a slight difference in behavior. Without compression, the mapping of databases to transactions is done event by event, which can lead to more interleaving. With compression, the mapping is done upfront, before the execution takes place.
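
The upfront mapping can be sketched as computing the set of databases over all inner events before the transaction is handed to a worker. This is an illustrative sketch only. The `InnerEvent` type and the function name are hypothetical, and the real coordinator derives database names from the decoded events.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for a decoded event inside a payload event. An
// empty database name means the event touches no database.
struct InnerEvent {
  std::string database;
};

// Returns every database touched by the transaction, computed once,
// before scheduling, instead of event by event during execution.
inline std::set<std::string> databases_touched(
    const std::vector<InnerEvent> &inner_events) {
  std::set<std::string> databases;
  for (const InnerEvent &ev : inner_events) {
    if (!ev.database.empty()) databases.insert(ev.database);
  }
  return databases;
}
```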

Security Context

Nothing to specify.

Upgrade/Downgrade and Cross-version Replication

This worklog has an impact on cross-version replication and downgrades. In the following text, we use NEW and OLD to refer to a server with WL#3549 and a server without it, respectively.

  1. NEW -> OLD

    • An OLD server shall not be able to replicate from a NEW server that generates compressed transactions.
    • The rest of the restrictions when replicating from NEW to OLD shall still apply.
  2. OLD -> NEW

    • No impact. However, note that if the NEW server has binlog_transaction_compression=ON, then replicated transactions shall be compressed in NEW (and then propagated compressed to downstream slaves as well).

User Interface

System Variables

The following are new variables to be implemented, which allow the user to enable the feature and also control when it should kick in:

  • NAME: binlog_transaction_compression
  • VALUES: ON, OFF
  • DEFAULT: OFF
  • SCOPE: GLOBAL and SESSION
  • REPLICATED: NO
  • DYNAMIC: YES
  • PERSIST: PERSIST
  • PRIVILEGES REQUIRED: SUPER, SYSTEM_VARIABLES_ADMIN or SESSION_VARIABLES_ADMIN
  • DESCRIPTION: "This variable turns on or off transaction compression for the binary log"

This variable can only be changed outside of a transaction. In other words, it cannot be changed while inside a transactional context.

  • NAME: binlog_transaction_compression_level_zstd
  • VALUES: [1,22]
  • DEFAULT: 3
  • SCOPE: GLOBAL and SESSION
  • REPLICATED: NO
  • DYNAMIC: YES
  • PERSIST: PERSIST
  • PRIVILEGES REQUIRED: SUPER, SYSTEM_VARIABLES_ADMIN or SESSION_VARIABLES_ADMIN
  • DESCRIPTION: "Specifies the transaction compression level for ZSTD transaction compression in the binary log."

This variable can only be changed outside of a transaction. In other words, it cannot be changed while inside a transactional context.

Observability

Performance Schema

A new performance schema table shall show stats on compressed transactions in the binary log (i.e., an aggregate for all transactions, that were captured and eventually written out to the binary log, since the server was started or since the last TRUNCATE executed against the table).

TABLE: performance_schema.binary_log_transaction_compression_stats

FIELDS:

 Plugin_table table_binary_log_transaction_compression_stats::m_table_def(
   /* Schema name */
   "performance_schema",
   /* Name */
   "binary_log_transaction_compression_stats",
   /* Definition */
   " LOG_TYPE ENUM('BINARY', 'RELAY') NOT NULL"
   "   COMMENT \"The log type to which the transactions were written.\",\n"
   " COMPRESSION_TYPE VARCHAR(64) NOT NULL\n"
   "   COMMENT \"The transaction compression algorithm used.\",\n"
   " TRANSACTION_COUNTER BIGINT UNSIGNED NOT NULL"
   "   COMMENT \"Number of transactions written to the log\",\n"
   " COMPRESSED_BYTES_COUNTER BIGINT UNSIGNED NOT NULL"
   "   COMMENT \"The total number of bytes compressed.\",\n"
   " UNCOMPRESSED_BYTES_COUNTER BIGINT UNSIGNED NOT NULL"
   "   COMMENT \"The total number of bytes uncompressed.\",\n"
   " COMPRESSION_PERCENTAGE SMALLINT SIGNED NOT NULL"
   "   COMMENT \"The compression ratio as a percentage.\",\n"
   " FIRST_TRANSACTION_ID TEXT"
   "   COMMENT \"The first transaction written.\",\n"
   " FIRST_TRANSACTION_COMPRESSED_BYTES BIGINT UNSIGNED NOT NULL"
   "   COMMENT \"First transaction written compressed bytes.\",\n"
   " FIRST_TRANSACTION_UNCOMPRESSED_BYTES BIGINT UNSIGNED NOT NULL"
   "   COMMENT \"First transaction written uncompressed bytes.\",\n"
   " FIRST_TRANSACTION_TIMESTAMP TIMESTAMP(6)"
   "   COMMENT \"When the first transaction was written.\",\n"
   " LAST_TRANSACTION_ID TEXT"
   "   COMMENT \"The last transaction written.\",\n"
   " LAST_TRANSACTION_COMPRESSED_BYTES BIGINT UNSIGNED NOT NULL"
   "   COMMENT \"Last transaction written compressed bytes.\",\n"
   " LAST_TRANSACTION_UNCOMPRESSED_BYTES BIGINT UNSIGNED NOT NULL"
   "   COMMENT \"Last transaction written uncompressed bytes.\",\n"
   " LAST_TRANSACTION_TIMESTAMP TIMESTAMP(6)"
   "   COMMENT \"When the last transaction was written.\"\n",
   /* Options */
   " ENGINE=PERFORMANCE_SCHEMA",
   /* Tablespace */
   nullptr);

This table shall contain one row per combination of compression algorithm used (ZSTD or NONE) and log type (BINARY or RELAY) into which the transactions were written. This allows the user to calculate the compression ratio of the logs persisted in the system.

This table is truncatable, so the user can reset the stats at any time.

Regarding privileges, the same set of restrictions and privileges for other performance schema tables apply.

In case the binary log is disabled, this table shall not have any rows in it.
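
As an illustration of how the byte counters relate to COMPRESSION_PERCENTAGE, the sketch below derives a percentage from COMPRESSED_BYTES_COUNTER and UNCOMPRESSED_BYTES_COUNTER. It assumes that the percentage expresses the space saved, that is, 100 * (1 - compressed / uncompressed), truncated toward zero. The exact formula and rounding used by the server are not stated in this document, and the function name is hypothetical.

```cpp
#include <cstdint>

// Assumption: the percentage is the space saved by compression, truncated
// toward zero. A negative result means the data grew. Returns 0 when no
// bytes have been recorded.
inline int compression_percentage(std::uint64_t compressed_bytes,
                                  std::uint64_t uncompressed_bytes) {
  if (uncompressed_bytes == 0) return 0;
  const double ratio = static_cast<double>(compressed_bytes) /
                       static_cast<double>(uncompressed_bytes);
  return static_cast<int>((1.0 - ratio) * 100.0);
}
```

Under this assumption, a transaction of 1000175 uncompressed bytes that compresses to 4512 bytes yields 99. A signed result type matches the SMALLINT SIGNED column, which leaves room for a negative value.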

Errors

COMPRESSION ERRORS. There shall not be any new error codes. If a transaction cannot be compressed, the existing capture errors shall be emitted.

DECOMPRESSION ERRORS. There shall not be any new error codes. If a transaction cannot be decompressed, the existing errors shall be emitted.

CAPTURE ERRORS. There shall not be any new error codes. If a transaction cannot be captured, the existing errors shall be emitted.

Stages

Decompression. A new stage shall be introduced which states that a transaction is being decompressed with the following message:

"Decompressing transaction."

Compression. A new stage shall be introduced which states that a transaction is being compressed. The following stage message is to be displayed:

"Compressing transaction."

Progress

PERFORMANCE_SCHEMA.EVENTS_STAGES_CURRENT shall show progress through WORK_COMPLETED and WORK_ESTIMATED fields for the compress and decompress stages.

mysqlbinlog

METADATA. The mysqlbinlog tool is modified to print out, as comments, the compression algorithm used, the compressed size, and the uncompressed size. For example:

(...) transaction_compression_type=ZSTD transaction_compression_size=4512 transaction_uncompressed_size=1000175

COMPRESSED EVENTS end_log_pos. The mysqlbinlog tool is modified to print out the contents of the Transaction_payload_event. It shall decompress and decode the events inside and print them out as if they were regular events. This makes it possible to replay compressed transactions. Note that the decompressed events shall all exhibit the same end_log_pos as the Transaction_payload_event they belong to.

Deployment and Installation

No new files to package. No new files to install. No new public header file to be included in the mysql dev package.

Protocol

Client Protocol

No changes to the server-to-server communication protocol. No changes to the client-server classic protocol. No changes to the client-server x-protocol.

Binary Log Format

There are changes to the binary log format. A new event is created. The new event is the Transaction_payload_log_event.

TRANSACTION_PAYLOAD_EVENT. This is the new event. It shall carry the payload of compressed transactions. This event is in fact, generic, as it carries a byte stream of events inside, that may not even be compressed. In the future, we may choose to use this event to send full uncompressed transactions between replication endpoints or change stream consumers.

Its internal header structure is described by the following specification.

There are three fields in this event. Each field is encoded together with a well-defined tag and a field size. The tag identifies the field and is encoded as a uint64_t. The size specifies the number of bytes used to encode that field.

Fields:

TRANSACTION_PAYLOAD
    NAME: m_payload
    VALUES: bytes
    DEFAULT: nullptr
    ENCODING STYLE: raw bytes
    MIN SIZE ON THE WIRE: m_payload_size
    MAX SIZE ON THE WIRE: m_payload_size
    DESCRIPTION: The actual transaction compressed payload.
    CONDITIONAL: NO
    TYPE: const char *

TRANSACTION_PAYLOAD_COMPRESSION_TYPE:
    NAME: m_payload_compression_type
    VALUES: enumeration { NONE, ZSTD }
    DEFAULT: NONE
    ENCODING STYLE: network encoding
    MIN SIZE ON THE WIRE: 1 bytes
    MAX SIZE ON THE WIRE: 9 bytes
    DESCRIPTION: The compression algorithm specification. If no compression,
                 then this value is NONE
    CONDITIONAL: NO
    TYPE: uint64_t

TRANSACTION_PAYLOAD_UNCOMPRESSED_SIZE:
    NAME: m_payload_uncompressed_size
    VALUES: [1-ULONGMAX]
    DEFAULT: 0
    ENCODING STYLE: network encoding
    MIN SIZE ON THE WIRE: 0 bytes
    MAX SIZE ON THE WIRE: 9 bytes
    DESCRIPTION: The size of the transaction payload once decompressed.
    CONDITIONAL: YES, exists if m_compression_type != NONE
    TYPE: uint64_t

Therefore, when compression is enabled, the Transaction_payload_event shall have the following fields:

+-----------------------------------------+
| field_type (1-9 bytes)                  |
+-----------------------------------------+
| field_size (1-9 bytes)                  |
+-----------------------------------------+
| m_payload (1 to N bytes)                |
+-----------------------------------------+
| field_type (1-9 bytes)                  |
+-----------------------------------------+
| field_size (1-9 bytes)                  |
+-----------------------------------------+
| m_compression_type (1 to 9 bytes)       |
+-----------------------------------------+
| field_type (1-9 bytes)                  |
+-----------------------------------------+
| field_size (1-9 bytes)                  |
+-----------------------------------------+
| m_uncompressed_size (0 to 9 bytes)      |
+-----------------------------------------+

This structure is similar to the Query_log_event post-header. It makes decoding of the event possible even if fields are missing, or if one replicates from a new version to an old version and the payload event has extra fields in it that the old version is unaware of (it will skip them).
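
The tag/size/value layout and the skipping of unknown fields can be sketched as follows. This is a minimal sketch under stated assumptions, not the libbinlogevents codec. It assumes that "network encoding" refers to the MySQL length-encoded integer (1, 3, 4 or 9 bytes on the wire). The numeric tag values and all function names are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

using Bytes = std::vector<unsigned char>;

// Hypothetical tag values; the real ones are not given in this document.
constexpr std::uint64_t TAG_PAYLOAD = 1;
constexpr std::uint64_t TAG_COMPRESSION_TYPE = 2;
constexpr std::uint64_t TAG_UNCOMPRESSED_SIZE = 3;

// Appends v as a MySQL length-encoded integer (assumed encoding).
inline void put_lenenc(Bytes &out, std::uint64_t v) {
  if (v < 251) { out.push_back(static_cast<unsigned char>(v)); return; }
  const int n = v < 65536 ? 2 : (v < 16777216 ? 3 : 8);
  out.push_back(n == 2 ? 0xfc : (n == 3 ? 0xfd : 0xfe));
  for (int i = 0; i < n; ++i)
    out.push_back(static_cast<unsigned char>((v >> (8 * i)) & 0xff));
}

// Reads a length-encoded integer at pos; returns false on malformed input.
inline bool get_lenenc(const Bytes &in, std::size_t &pos, std::uint64_t &v) {
  if (pos >= in.size()) return false;
  const unsigned char first = in[pos++];
  if (first < 251) { v = first; return true; }
  const std::size_t n =
      first == 0xfc ? 2 : (first == 0xfd ? 3 : (first == 0xfe ? 8 : 0));
  if (n == 0 || in.size() - pos < n) return false;
  v = 0;
  for (std::size_t i = 0; i < n; ++i)
    v |= static_cast<std::uint64_t>(in[pos++]) << (8 * i);
  return true;
}

// Appends one field: tag, then size of the value, then the value bytes.
inline void put_field(Bytes &out, std::uint64_t tag, const Bytes &value) {
  put_lenenc(out, tag);
  put_lenenc(out, value.size());
  out.insert(out.end(), value.begin(), value.end());
}

// Appends an integer-valued field whose value is itself length-encoded.
inline void put_int_field(Bytes &out, std::uint64_t tag, std::uint64_t v) {
  Bytes value;
  put_lenenc(value, v);
  put_field(out, tag, value);
}

struct Decoded {
  Bytes payload;
  std::uint64_t compression_type = 0;
  std::uint64_t uncompressed_size = 0;
};

// Decodes known fields and skips unknown tags by jumping over "size" bytes.
inline bool decode_fields(const Bytes &in, Decoded &d) {
  std::size_t pos = 0;
  while (pos < in.size()) {
    std::uint64_t tag = 0, size = 0;
    if (!get_lenenc(in, pos, tag) || !get_lenenc(in, pos, size)) return false;
    if (size > in.size() - pos) return false;
    const std::size_t end = pos + static_cast<std::size_t>(size);
    std::size_t p = pos;
    if (tag == TAG_PAYLOAD) {
      d.payload.assign(in.begin() + pos, in.begin() + end);
    } else if (tag == TAG_COMPRESSION_TYPE) {
      if (!get_lenenc(in, p, d.compression_type) || p > end) return false;
    } else if (tag == TAG_UNCOMPRESSED_SIZE) {
      if (!get_lenenc(in, p, d.uncompressed_size) || p > end) return false;
    }  // any other tag: unknown to this reader, so it is skipped
    pos = end;
  }
  return true;
}
```

Because every field carries its own size, a reader that does not recognize a tag can still find the start of the next field, which is what allows an older decoder to step over fields added later.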

Failure Model Specification

No implications for the current failure model.

CHANGES TO MYSQLBINLOG

mysqlbinlog needs to process the new Transaction_payload event. So in the main loop of the event processing, we add an entry so that this event is handled as well.

case binary_log::TRANSACTION_PAYLOAD_EVENT:
  ...

This also requires that Transaction_payload_event implements the member function Log_event::print(FILE *, PRINT_EVENT_INFO *info) const.

CHANGES TO libbinlogevents

NEW EVENT: Transaction_payload_event

This new event is part of object model of libbinlogevents and holds an in-memory representation of a transaction payload and its compression attributes.

class Transaction_payload_event : public Binary_log_event {
 protected:
  const char *m_payload;
  uint64_t m_payload_size;
  transaction::compression::type m_compression_type;
  uint64_t m_uncompressed_size;

 (...)

CODEC API: Encoder/Decoder for the Transaction_payload_event

A new codec is created to encode/decode into and from the wire a Transaction_payload_event. These live in their own namespace. There is a generic API that can be implemented by different encoding and decoding mechanisms.

The Generic API:

#ifndef CODECS_BASE_INCLUDED
#define CODECS_BASE_INCLUDED

#include <utility>
#include "binary_log.h"

namespace binary_log {
namespace codecs {

/* Stateless */
class Codec {
 public:
  /**
   This member function decodes the data in buffer "from" (with
   length "size") into the binary log event "to". If this function
   returns an error, the first return member (size used) shall be
   undefined.

   @param from buffer to decode.
   @param size the size of the buffer.
   @param to the log event to decode into. Note that
          to.header()->get_type_code() must match the type code
          of the encoded data. Otherwise, an error shall be returned.

   @return first member - the size of the buffer used to decode. Second
           member - false on success, true otherwise
   */
  virtual std::pair<std::size_t, bool> decode(const unsigned char *from,
                                              std::size_t size,
                                              Binary_log_event &to) const = 0;

  /**
    Encode the given event "from" into the buffer provided. If this
    returns an error, the first return member (size used) shall be
    undefined.

    @param from The event to encode.
    @param to   The buffer to encode to.
    @param size The buffer size.

    @return first member - the size of the buffer used to encode. Second
           member - false on success, true otherwise.
   */
  virtual std::pair<std::size_t, bool> encode(const Binary_log_event &from,
                                              unsigned char *to,
                                              std::size_t size) const = 0;

  virtual ~Codec() {}
};

} // namespace codecs
} // namespace binary_log

#endif

COMPRESSOR/DECOMPRESSOR API

A new namespace for transaction compression is established. This contains the APIs and implementations for compression and decompression of events.

#ifndef COMPRESSION_BASE_INCLUDED
#define COMPRESSION_BASE_INCLUDED

#include <cstddef>
#include <string>
#include <tuple>

namespace binary_log {
namespace transaction {
namespace compression {

enum type {
  /* No compression. */
  NONE = 0,

  /* ZSTD compression. */
  ZSTD = 1,
};

class CompDecomp {
protected:
  unsigned char *m_buffer;
  std::size_t m_buffer_capacity;
  unsigned char *m_buffer_cursor;

public:
  CompDecomp();
  virtual ~CompDecomp();

  /**
    The compression algorithm type. E.g., ZSTD or NONE. In the future
    other algorithms, such as LZ4 or ZLIB, may also be supported
    (implemented either by us or by a user).
    @return the compression type.
  */
  virtual type compression_type_code() = 0;

  /**
  This member function SHALL open the compressor or decompressor.
  Every compressor/decompressor must be opened before it is utilized.

  @return false on success, true otherwise
  */
  virtual bool open() = 0;

  /**
  This member function SHALL close the compressor/decompressor.
  Every compressor/decompressor MUST be closed once the stream
  is over.

  @return false on success, true otherwise
  */
  virtual bool close() = 0;

  /**
  This member function returns the size of the compressed/decompressed
  data.

  @return the size of the actual data.
  */
  virtual size_t size();

  /**
    This member function SHALL set the buffer into which the compressor or
    decompressor shall write the data after it processes the stream.

    @param buffer the buffer itself.
    @param size   the capacity of the buffer.

    @return true if something went wrong, false otherwise.
  */
  virtual bool set_buffer(unsigned char *buffer, std::size_t size);

  /**
    This member function SHALL return the buffer, the size of its data
    and its capacity.

    @return a tuple containing the buffer, size and capacity.
  */
  virtual std::tuple<unsigned char *, std::size_t, std::size_t> get_buffer();

  /**
  This member function resets the compressor/decompressor.
  */
  virtual void reset();
};

class Compressor : public CompDecomp {
public:

  /**
    This member function SHALL compress the data provided with the given
    length. Note that the buffer to store the compressed data must
    already have been set through @c set_buffer.

    If the output buffer is not large enough an error shall be returned.
    The contents of the output buffer may still have been modified in
    that case.

    @param data the data to compress
    @param length the length of the data.
    @return false on success, true otherwise
  */
  virtual bool compress(const unsigned char *data, size_t length) = 0;
};

class Decompressor : public CompDecomp {
public:

  /**
    This member function SHALL decompress the data provided with the given
    length. Note that the buffer to store the decompressed data must
    already have been set through @c set_buffer.

    If the output buffer is not large enough an error shall be returned.
    The contents of the output buffer may still have been modified in
    that case.

    @param data the data to decompress
    @param in_length the length of the data.
    @return false on success, true otherwise
  */
  virtual bool decompress(const unsigned char *data, size_t in_length) = 0;
};

}  // namespace compression
}  // namespace transaction
}  // namespace binary_log

#endif
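
The call order implied by the API above (open, set_buffer, compress, get_buffer, close) can be sketched as follows. This is not the ZSTD implementation: Sketch_compressor is a simplified, hypothetical stand-in for CompDecomp/Compressor, and None_compressor merely copies bytes, an assumption made to keep the example free of external libraries.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <tuple>

// Simplified, hypothetical stand-in for the CompDecomp/Compressor API.
class Sketch_compressor {
 protected:
  unsigned char *m_buffer = nullptr;
  std::size_t m_buffer_capacity = 0;
  unsigned char *m_buffer_cursor = nullptr;

 public:
  virtual ~Sketch_compressor() = default;
  virtual bool open() { return false; }   // false on success
  virtual bool close() { return false; }  // false on success
  // Number of bytes written to the output buffer so far.
  std::size_t size() const {
    return static_cast<std::size_t>(m_buffer_cursor - m_buffer);
  }
  bool set_buffer(unsigned char *buffer, std::size_t capacity) {
    if (buffer == nullptr) return true;
    m_buffer = m_buffer_cursor = buffer;
    m_buffer_capacity = capacity;
    return false;
  }
  std::tuple<unsigned char *, std::size_t, std::size_t> get_buffer() {
    return std::make_tuple(m_buffer, size(), m_buffer_capacity);
  }
  virtual bool compress(const unsigned char *data, std::size_t length) = 0;
};

// Pass-through "compressor" (type NONE): appends the input unchanged.
class None_compressor : public Sketch_compressor {
 public:
  bool compress(const unsigned char *data, std::size_t length) override {
    if (m_buffer == nullptr) return true;                  // no output buffer
    if (length > m_buffer_capacity - size()) return true;  // output too small
    std::memcpy(m_buffer_cursor, data, length);
    m_buffer_cursor += length;
    return false;
  }
};

// Runs the documented call order; returns the number of bytes produced,
// or 0 on any error.
std::size_t compress_once(const unsigned char *data, std::size_t length,
                          unsigned char *out, std::size_t capacity) {
  None_compressor c;
  if (c.open()) return 0;
  bool error = c.set_buffer(out, capacity) || c.compress(data, length);
  std::size_t produced = error ? 0 : std::get<1>(c.get_buffer());
  c.close();
  return produced;
}
```

The output buffer is owned by the caller in this sketch; whether the real classes own or borrow the buffer is not stated above and is left open here.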

ITERATORS API

Within the transaction compression namespace, there shall be an iterator that the programmer can use to wrap a compressed buffer and iterate over it without having to explicitly decompress the buffer and then fetch each event. This iterator shall automatically decompress the buffer and provide the next event's encoded data.

#ifndef COMPRESSION_ITERATOR_INCLUDED
#define COMPRESSION_ITERATOR_INCLUDED

#include <control_events.h>
#include <event_reader.h>

namespace binary_log {
namespace transaction {
namespace compression {

class Iterator {
  private:
  const char *m_compressed_buffer;
  size_t m_compressed_buffer_size;
  binary_log::transaction::compression::Decompressor *m_decoder;
  const char *m_decompressed_buffer;
  size_t m_decompressed_buffer_size;
  binary_log::Event_reader *m_reader;

  public:
  Iterator(const char *buf, size_t buf_len, size_t uncompressed_len,
            binary_log::transaction::compression::type comp_algo);

  virtual ~Iterator();

  const char *begin();
  const char *end();
  const char *next();

  protected:
  const char *do_next();
};

}  // namespace compression
}  // namespace transaction
}  // namespace binary_log

#endif
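
Once the payload is decompressed, stepping from one event to the next only requires the length field of each event's common header. The sketch below shows that step in isolation. It relies on the v4 common header layout (19 bytes, with the event length stored as a 4-byte little-endian integer at offset 9); the helper names split_events and read_le32 are hypothetical and do not appear in the Iterator class above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Common header of a v4 binlog event: 19 bytes, with the total event
// length stored as a 4-byte little-endian integer at offset 9.
constexpr std::size_t kHeaderLen = 19;
constexpr std::size_t kEventLenOffset = 9;

// Reads a 4-byte little-endian unsigned integer.
inline std::uint32_t read_le32(const unsigned char *p) {
  return static_cast<std::uint32_t>(p[0]) |
         (static_cast<std::uint32_t>(p[1]) << 8) |
         (static_cast<std::uint32_t>(p[2]) << 16) |
         (static_cast<std::uint32_t>(p[3]) << 24);
}

// Hypothetical helper: returns the start offset of every event found in an
// already decompressed payload. Sets error when the buffer is malformed.
std::vector<std::size_t> split_events(const unsigned char *buf,
                                      std::size_t size, bool &error) {
  std::vector<std::size_t> offsets;
  error = false;
  std::size_t pos = 0;
  while (pos < size) {
    // A full common header must be available.
    if (size - pos < kHeaderLen) {
      error = true;
      break;
    }
    std::uint32_t len = read_le32(buf + pos + kEventLenOffset);
    // The event must contain its header and must not overrun the buffer.
    if (len < kHeaderLen || len > size - pos) {
      error = true;
      break;
    }
    offsets.push_back(pos);
    pos += len;
  }
  return offsets;
}
```

An iterator built on this idea would return a pointer to buf + offset on each call to next() and report end() once the offsets are exhausted.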

CHANGES TO RECOVERY

Binary log recovery now has to take transaction payload log events into account. The recovery procedure unpacks (and decompresses, if needed) the payload of each Transaction_payload event and then processes each contained event as it normally would.

Therefore the bulk of the recovery remains the same, except that events in a transaction payload event are unpacked and processed.

For example, assuming that we have an iterator defined over a Binlog_file_reader object that automatically unpacks Transaction_payload events, recovery can be written as:

static bool binlog_recover(Binlog_file_reader *binlog_file_reader,
                          my_off_t *valid_pos) {
  bool res = false;
  binlog::tools::Iterator it(binlog_file_reader);
  it.set_copy_event_buffer();

  /*
    The flag is used for handling the case that a transaction
    is partially written to the binlog.
  */
  bool in_transaction = false;
  int memory_page_size = my_getpagesize();
  {
    MEM_ROOT mem_root(key_memory_binlog_recover_exec, memory_page_size);
    memroot_unordered_set<my_xid> xids(&mem_root);
    /*
      Now process events in the queue. The queue changes dynamically
      every time we process an event. This may be a bit suboptimal
      since it adds an indirection, but it helps to generalize the
      usage of the transaction payload event (which unfolds into
      several events in the queue when it is processed).
    */
    for (Log_event *ev = it.begin(); !res && (ev != it.end()); ev = it.next()) {
      switch (ev->get_type_code()) {
        // may be begin, middle or end of a transaction
        case binary_log::QUERY_EVENT: {
          // starts a transaction
          if (!strcmp(((Query_log_event *)ev)->query, "BEGIN"))
            in_transaction = true;
          (...)
        }
      }
      (...)
    }
    res = res || (total_ha_2pc > 1 && ha_recover(&xids));
  }
  (...)

CHANGES TO BINLOG STREAM

We add a new Compressed_ostream that extends Basic_ostream. Into this stream a compressor is injected that will compress data as it is written to the stream.

class Compressed_ostream : public Basic_ostream {
private:
  binary_log::transaction::compression::Compressor *m_compressor;

public:
  Compressed_ostream();
  virtual ~Compressed_ostream();
  Compressed_ostream(const Compressed_ostream &) = delete;
  Compressed_ostream &operator=(const Compressed_ostream &) = delete;
  binary_log::transaction::compression::Compressor *get_compressor();
  void set_compressor(binary_log::transaction::compression::Compressor *);
  bool write(const unsigned char *buffer, my_off_t length) override;
};

This stream shall be used to seamlessly copy bytes from one ostream to another, compressing events while doing so.
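
The injection described above can be sketched with reduced stand-ins. Compressor_if, Recording_compressor and Sketch_compressed_ostream are hypothetical names used only for this illustration; the recording compressor stores the bytes it receives instead of compressing them, so that the forwarding behaviour is easy to observe.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, reduced stand-in for the Compressor interface.
struct Compressor_if {
  virtual ~Compressor_if() = default;
  // Returns false on success, true otherwise.
  virtual bool compress(const unsigned char *data, std::size_t length) = 0;
};

// Test double that records what it was asked to compress.
struct Recording_compressor : Compressor_if {
  std::vector<unsigned char> seen;
  bool compress(const unsigned char *data, std::size_t length) override {
    seen.insert(seen.end(), data, data + length);
    return false;
  }
};

// Sketch of the stream: every write is handed to the injected compressor.
class Sketch_compressed_ostream {
  Compressor_if *m_compressor = nullptr;

 public:
  void set_compressor(Compressor_if *c) { m_compressor = c; }
  Compressor_if *get_compressor() { return m_compressor; }

  // Returns false on success, true when no compressor is set or when the
  // compressor reports an error.
  bool write(const unsigned char *buffer, std::size_t length) {
    if (m_compressor == nullptr) return true;
    return m_compressor->compress(buffer, length);
  }
};
```

Because the stream does not own the compressor in this sketch, the caller decides which algorithm is used and reads the compressed output from the compressor's buffer once all writes are done.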

CHANGES TO CAPTURE CACHES

The capture cache shall provide a new member function:

bool binlog_cache_data::compress(THD *thd)

This member function shall be called when the cache is finalized. It shall compress the cache and overwrite its contents with the compressed transaction. The result shall later be flushed to disk after the GTID event.

Note that the compress function shall validate that all the requirements for doing compression are actually met. These requirements are described in the HLD section.

  • IMPORTANT: if we decide to implement the optimization of compressing on the fly, this approach needs to change.
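
The kind of validation that compress is expected to perform can be sketched from the limits listed earlier in this worklog (ROW format only, transactional engines only, no incident events, and a bound on the compressed payload size). The complete list of requirements is the one in the HLD section; Cache_summary, may_compress and payload_fits are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical summary of a finalized cache; field names are illustrative.
struct Cache_summary {
  bool row_format_only;     // all changes were logged in ROW format
  bool transactional_only;  // no non-transactional engine was involved
  bool has_incident;        // the transaction ended up in an incident event
};

// Returns true when the preconditions allow compression to be attempted.
bool may_compress(const Cache_summary &c) {
  return c.row_format_only && c.transactional_only && !c.has_incident;
}

// Returns true when the compressed payload respects the size limit, that
// is, max_allowed_packet minus the bytes used by the event header.
bool payload_fits(std::size_t compressed_size, std::size_t max_allowed_packet,
                  std::size_t header_bytes) {
  if (header_bytes > max_allowed_packet) return false;
  return compressed_size <= max_allowed_packet - header_bytes;
}
```

The number of header bytes is passed in as a parameter because its exact value is not fixed in this section.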

CHANGES TO SHOW BINLOG EVENTS

SHOW BINLOG EVENTS shall unpack and print the contents of Transaction_payload_event. Therefore, whenever a payload event shows up in the binlog stream, SHOW BINLOG EVENTS shall print it and then print each event that it finds inside of it.

CHANGES TO THE RECEIVER

The receiver thread shall decode the Transaction_payload_event and extract the values of the fields containing the compression type, the payload size and the uncompressed size. These values are then saved so that they can be displayed in the performance schema binary log compression stats table.

CHANGES TO PERFORMANCE SCHEMA

A new table is added:

Plugin_table table_binary_log_transaction_compression_stats::m_table_def(
  /* Schema name */
  "performance_schema",
  /* Name */
  "binary_log_transaction_compression_stats",
  /* Definition */
  " LOG_TYPE ENUM('BINARY', 'RELAY') NOT NULL COMMENT \"The log type to which the transactions were written.\",\n"
  " COMPRESSION_TYPE ENUM('ZSTD', 'NONE') NOT NULL COMMENT \"The transaction compression algorithm used.\",\n"
  " TRANSACTION_COUNTER BIGINT UNSIGNED NOT NULL COMMENT \"Number of transactions written to the log\",\n"
  " COMPRESSED_BYTES_COUNTER BIGINT UNSIGNED NOT NULL COMMENT \"The total number of bytes compressed.\",\n"
  " UNCOMPRESSED_BYTES_COUNTER BIGINT UNSIGNED NOT NULL COMMENT \"The total number of bytes uncompressed.\",\n"
  " COMPRESSION_PERCENTAGE SMALLINT UNSIGNED NOT NULL COMMENT \"The compression ratio as a percentage.\",\n"
  " LAST_TRANSACTION_ID TEXT COMMENT \"The last transaction written.\",\n"
  " LAST_TRANSACTION_COMPRESSED_BYTES BIGINT UNSIGNED NOT NULL COMMENT \"Last transaction written compressed bytes.\",\n"
  " LAST_TRANSACTION_UNCOMPRESSED_BYTES BIGINT UNSIGNED NOT NULL COMMENT \"Last transaction written uncompressed bytes.\",\n"
  " LAST_TRANSACTION_TIMESTAMP TIMESTAMP(6) COMMENT \"When the last transaction was written.\",\n"
  " PRIMARY KEY(LOG_TYPE, COMPRESSION_TYPE)",
  /* Options */
  " ENGINE=PERFORMANCE_SCHEMA",
  /* Tablespace */
  nullptr);

And here is the table descriptor:

PFS_engine_table_share table_binary_log_transaction_compression_stats::m_share =
{
    &pfs_truncatable_acl,
    &table_binary_log_transaction_compression_stats::create,
    NULL, /* write_row */
    table_binary_log_transaction_compression_stats::delete_all_rows,
    table_binary_log_transaction_compression_stats::get_row_count,
    sizeof(pos_t), /* ref length */
    &m_table_lock,
    &m_table_def,
    true, /* perpetual */
    PFS_engine_table_proxy(),
    {0},
    false /* m_in_purgatory */
};
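
How the counter columns of this table evolve as transactions are written can be sketched with an in-memory collector keyed like the table's primary key (LOG_TYPE, COMPRESSION_TYPE). Stats_collector and Stats_row are hypothetical names, the transaction identifiers used with them are placeholders, and COMPRESSION_PERCENTAGE and the timestamp column are deliberately left out because their computation is not specified in this section.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

enum class Log_type { BINARY, RELAY };
enum class Compression_type { ZSTD, NONE };

// One row of the stats table; only counters and last-transaction sizes.
struct Stats_row {
  std::uint64_t transaction_counter = 0;
  std::uint64_t compressed_bytes_counter = 0;
  std::uint64_t uncompressed_bytes_counter = 0;
  std::string last_transaction_id;
  std::uint64_t last_transaction_compressed_bytes = 0;
  std::uint64_t last_transaction_uncompressed_bytes = 0;
};

// Hypothetical in-memory collector keyed like the table's primary key.
class Stats_collector {
  std::map<std::pair<Log_type, Compression_type>, Stats_row> m_rows;

 public:
  // Accounts for one transaction written to the given log.
  void record(Log_type log, Compression_type comp, const std::string &trx_id,
              std::uint64_t compressed, std::uint64_t uncompressed) {
    Stats_row &row = m_rows[std::make_pair(log, comp)];
    row.transaction_counter += 1;
    row.compressed_bytes_counter += compressed;
    row.uncompressed_bytes_counter += uncompressed;
    row.last_transaction_id = trx_id;
    row.last_transaction_compressed_bytes = compressed;
    row.last_transaction_uncompressed_bytes = uncompressed;
  }

  // Returns the row for the key, or nullptr when nothing was recorded.
  const Stats_row *find(Log_type log, Compression_type comp) const {
    auto it = m_rows.find(std::make_pair(log, comp));
    return it == m_rows.end() ? nullptr : &it->second;
  }

  // Drops all rows, in the spirit of delete_all_rows in the table share.
  void reset() { m_rows.clear(); }
};
```

The receiver would call record with Log_type::RELAY using the sizes extracted from the Transaction_payload_event, while the binary log writer would use Log_type::BINARY.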

TESTING

Unit tests

We add unit tests for the codec and compressor/decompressors. These shall be put under unittest/gunit/libbinlogevents.

These tests shall be built and linked with the libbinlogstandalone library, so that they do not require building the entire server to run them.

MTR

We introduce MTR tests that test:

  • performance_schema.binary_log_transaction_compression_stats
  • @@binlog_transaction_compression
  • Replication of compressed transactions with single threaded applier
  • Replication of compressed transactions with multi-threaded applier
  • Group replication of compressed transactions.

PB2

Two new collections are introduced that run daily and weekly for the following test suites:

  • rpl
  • rpl_nogtid
  • rpl_gtid
  • binlog
  • binlog_nogtid
  • binlog_gtid
  • group_replication

Some tests are incompatible with compression and need to be adjusted or automatically skipped. Those that are to be skipped shall include not_binlog_transaction_compression_on.inc:

--let $_check_log_bin= query_get_value(SHOW GLOBAL VARIABLES LIKE 'log_bin', Value, 1)
if ($_check_log_bin == ON)
{
  if (`SELECT @@binlog_transaction_compression = 1`)
  {
    --skip Test cannot run with binlog_transaction_compression turned ON
  }
}

CHANGES TO THE BOUNDARY PARSER

The boundary parser requires a new event boundary type, associated with Transaction_payload_event. When the parser has to transition state because of a Transaction_payload_event, it verifies that the current state is EVENT_PARSER_GTID. If it is not, the parser reports an error. Otherwise the event is accepted and the stream is considered valid.

  • IMPORTANT: if we implement the potential optimization of having more than one transaction payload event per transaction (i.e., use segments), then the parser shall need to differentiate between Transaction_payload_event events that trigger a transition to the next transaction and those that trigger a transition to the next Transaction_payload_event.
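
The state check described above can be sketched as a small transition function. The enum below is a hypothetical, reduced model of the parser (only EVENT_PARSER_GTID is named in the text). It also assumes that a single payload event carries the whole transaction, so a valid payload returns the parser to its idle state; this would not hold if segments were introduced.

```cpp
#include <cassert>

// Hypothetical, reduced set of parser states and boundary types.
enum class Parser_state { NONE, GTID, ERROR_STATE };
enum class Boundary_type { GTID, TRANSACTION_PAYLOAD };

// Returns the next parser state for the given event boundary.
// Assumption: one payload event holds the entire transaction.
Parser_state next_state(Parser_state current, Boundary_type boundary) {
  switch (boundary) {
    case Boundary_type::GTID:
      // A GTID starts a transaction, so the parser must be idle.
      return current == Parser_state::NONE ? Parser_state::GTID
                                           : Parser_state::ERROR_STATE;
    case Boundary_type::TRANSACTION_PAYLOAD:
      // A payload is only valid right after the GTID event.
      return current == Parser_state::GTID ? Parser_state::NONE
                                           : Parser_state::ERROR_STATE;
  }
  return Parser_state::ERROR_STATE;
}
```

With segments, the payload case would need an additional state that distinguishes an intermediate payload from the last payload of the transaction.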

CHANGES TO CMAKE

CMake needs some changes. We need to include the new source files created and we need to link mysqlbinlog and the binlog library with zstd.

CHANGES TO THE FILE STRUCTURE

Binary Log

We move towards the setup where we have the following structure:

  sql
  |
  (...)
  |
  +-- binlog
  |   +-- global.cc
  |   +-- global.h
  |   +-- monitoring
  |   |   +-- context.cc
  |   |   +-- context.h
  |   +-- tools
  |       +-- iterators.cc
  |       +-- iterators.h
  • sql/binlog/global.cc|h - this is (global) context for the binary log.
  • sql/binlog/monitoring/ - this is (global) context for monitoring.
  • sql/binlog/tools/ - this hosts tools for the binlog.

We gradually start moving the pieces from one place to another.

Binlog Events

  libbinlogevents
  |
  (...)
  +-- include
  |   +-- binary_log.h
  |   +-- binlog_event.h
  |   +-- byteorder.h
  |   +-- codecs
  |   |   +-- base.h
  |   |   +-- binary.h
  |   |   +-- factory.h
  |   +-- compression
  |   |   +-- base.h
  |   |   +-- factory.h
  |   |   +-- iterator.h
  |   |   +-- zstd.h
  |   +-- control_events.h
  |   +-- debug_vars.h
  |   +-- event_reader.h
  |   +-- event_reader_macros.h
  |   +-- load_data_events.h
  |   +-- rows_event.h
  |   +-- statement_events.h
  |   +-- table_id.h
  |   +-- uuid.h
  |   +-- wrapper_functions.h
  +-- src
  |   +-- binary_log_funcs.cpp
  |   +-- binlog_event.cpp
  |   +-- CMakeLists.txt
  |   +-- codecs
  |   |   +-- binary.cpp
  |   |   +-- factory.cpp
  |   +-- compression
  |   |   +-- base.cpp
  |   |   +-- factory.cpp
  |   |   +-- iterator.cpp
  |   |   +-- zstd.cpp
  (...)

We introduce three new directories:

  • compression - holds compression source code
  • codecs - holds encoder and decoder source code