WL#7672: Client interface for reading the replication stream

Affects: Server-8.0   —   Status: Complete

Extend libmysql's C API with a stable interface for getting
replication events from the server as a stream of packets.

We need this to avoid having to call undocumented APIs and to package internal
header files in order to implement binlog-based programs such as the MySQL
Applier for Hadoop.

FR1. Add a new C API function MYSQL_RPL *mysql_binlog_open(MYSQL *mysql,
MYSQL_RPL *stream)
FR1.1. mysql_binlog_open requires an authenticated client connection
FR1.2. mysql_binlog_open can be used where mysql_exec_direct can be used
FR1.3. mysql_binlog_open will issue the COM_BINLOG_DUMP RPC if the
MYSQL_RPL_GTID flag is not set
FR1.4. mysql_binlog_open will issue the COM_BINLOG_DUMP_GTID RPC if the
MYSQL_RPL_GTID flag is set
FR1.5. The MYSQL_RPL structure must be filled in with valid data to be passed to
the above RPCs.
FR1.6. If the data are invalid, the server will return an ERR packet whose
error message is available through the mysql_error() C API
NF1.7. Any C API command other than mysql_binlog_fetch() and
mysql_binlog_close() issued on the MYSQL connection on which
mysql_binlog_open() succeeded is an error: it leads to unpredictable results
and leaves the connection in an undefined state.
FR2. Add a new C API function int mysql_binlog_fetch(MYSQL_RPL *rpl)
FR2.1. mysql_binlog_fetch() works only after a successful mysql_binlog_open()
command has been issued
FR2.2. mysql_binlog_fetch() will read one binlog record (as sent by the server)
FR2.3. mysql_binlog_fetch() will return 0 on failure
FR2.4. mysql_binlog_fetch() will return non-zero on success
FR2.5. In case of failure, mysql_error() should return the error text.
FR2.6. mysql_binlog_fetch() will store the result into the buffer member of
MYSQL_RPL
FR2.7. mysql_binlog_fetch() will store the size of the data into the size member
of the MYSQL_RPL structure
NF2.8. Any C API command other than mysql_binlog_fetch() and
mysql_binlog_close() issued on the MYSQL connection on which
mysql_binlog_fetch() succeeded is an error: it leads to unpredictable results
and leaves the connection in an undefined state.
FR2.9. mysql_binlog_fetch() updates the MYSQL_RPL structure members
FR3. Add a new C API function int mysql_binlog_close(MYSQL_RPL *rpl)
FR3.1. mysql_binlog_close() can be called only on a connection for which
mysql_binlog_open() succeeded
FR3.2. Once mysql_binlog_close() succeeds, the connection can only be closed via
mysql_close()
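
Read literally, FR1.1, FR2.1, FR3.1, FR3.2, NF1.7 and NF2.8 describe a small
state machine for a connection. The C sketch below encodes that reading as a
table of permitted calls. The enum names and rpl_call_allowed() are
hypothetical and not part of libmysqlclient; note that NF1.7 taken literally
also excludes mysql_close() while a stream is open.

```c
#include <stdbool.h>

/* Hypothetical connection states, for illustration only. */
enum rpl_conn_state {
  RPL_CONN_AUTHENTICATED, /* connected and authenticated (FR1.1) */
  RPL_CONN_STREAMING,     /* mysql_binlog_open() succeeded (NF1.7) */
  RPL_CONN_STREAM_CLOSED  /* mysql_binlog_close() succeeded (FR3.2) */
};

/* Hypothetical identifiers for the C API calls a client may attempt. */
enum rpl_api_call {
  CALL_BINLOG_OPEN,
  CALL_BINLOG_FETCH,
  CALL_BINLOG_CLOSE,
  CALL_MYSQL_CLOSE,
  CALL_OTHER /* any other C API call, e.g. mysql_query() */
};

/* Returns true if the requirements permit `call` in state `state`. */
static bool rpl_call_allowed(enum rpl_conn_state state,
                             enum rpl_api_call call) {
  switch (state) {
  case RPL_CONN_AUTHENTICATED:
    /* FR2.1, FR3.1: fetch and close need a successful open first. */
    return call != CALL_BINLOG_FETCH && call != CALL_BINLOG_CLOSE;
  case RPL_CONN_STREAMING:
    /* NF1.7, NF2.8: only fetch and close are permitted. */
    return call == CALL_BINLOG_FETCH || call == CALL_BINLOG_CLOSE;
  case RPL_CONN_STREAM_CLOSED:
    /* FR3.2: the connection can only be closed. */
    return call == CALL_MYSQL_CLOSE;
  }
  return false;
}
```
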

Problem Description
===================

In order to fetch a replication stream using libmysqlclient, it is
necessary to execute a number of steps to create a protocol packet and
send it to the server. For example, the following code is used in
~mysqlbinlog~ to establish a connection to a replication stream:


  command= COM_BINLOG_DUMP;
  size_t allocation_size= ::BINLOG_POS_OLD_INFO_SIZE +
    BINLOG_NAME_INFO_SIZE + ::BINLOG_FLAGS_INFO_SIZE +
    ::BINLOG_SERVER_ID_INFO_SIZE + 1;
  if (!(command_buffer= (uchar *) my_malloc(PSI_NOT_INSTRUMENTED,
                                            allocation_size, MYF(MY_WME))))
  {
    error("Got fatal error allocating memory.");
    DBUG_RETURN(ERROR_STOP);
  }
  uchar* ptr_buffer= command_buffer;
  
  /*
    COM_BINLOG_DUMP accepts only 4 bytes for the position, so
    we are forced to cast to uint32.
  */
  int4store(ptr_buffer, (uint32) start_position);
  ptr_buffer+= ::BINLOG_POS_OLD_INFO_SIZE;
  int2store(ptr_buffer, binlog_flags);
  ptr_buffer+= ::BINLOG_FLAGS_INFO_SIZE;
  int4store(ptr_buffer, server_id);
  ptr_buffer+= ::BINLOG_SERVER_ID_INFO_SIZE;
  memcpy(ptr_buffer, logname, BINLOG_NAME_INFO_SIZE);
  ptr_buffer+= BINLOG_NAME_INFO_SIZE;
  
  command_size= ptr_buffer - command_buffer;
  
  ...
  
  if (simple_command(mysql, command, command_buffer, command_size, 1))
  {
    error("Got fatal error sending the log dump command.");
    my_free(command_buffer);
    DBUG_RETURN(ERROR_STOP);
  }
  my_free(command_buffer);


Calling simple_command() directly is not part of the officially
supported C interface, so this approach suffers from a few drawbacks:

1. The sheer amount of code makes this very hard to use, and the risk of
   making a mistake is high.
2. Constructing the protocol packet manually can result in subtle
   security issues such as buffer overflows, and risks accidentally
   creating packets that can cause problems for the server.
3. Changes to the protocol can potentially break any code that uses
   this interface.
   - This risk forces tools using this interface to be built for each
     version of the server just to ensure that there are no subtle
     issues that can cause breakage.

This worklog proposes to solve these issues by creating a well-defined
interface and making it available as part of the officially supported
libmysqlclient interface.

1. A well-defined interface does not require complicated code and
   reduces the risk of mistakes.
2. Placing a layer between the packet construction and the user
   provides a means for integrity checking of the input parameters.
3. Having a stable interface allows a single version of a tool to be
   built for any version of the server supporting this interface.
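
Point 2 can be illustrated with a small sketch of the checks a library layer
could perform before building a COM_BINLOG_DUMP payload. The layout (4-byte
position, 2-byte flags, 4-byte server ID, then the log file name, all integers
little-endian as with int2store()/int4store()) mirrors the mysqlbinlog excerpt
above. The function and constant names are hypothetical, and the actual
libmysqlclient code may differ.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Field sizes of the COM_BINLOG_DUMP payload, as in the mysqlbinlog excerpt. */
enum {
  DUMP_POS_SIZE = 4,
  DUMP_FLAGS_SIZE = 2,
  DUMP_SERVER_ID_SIZE = 4
};

/* Store `v` in `n` bytes, least significant byte first. */
static void store_le(unsigned char *p, uint64_t v, size_t n) {
  for (size_t i = 0; i < n; i++)
    p[i] = (unsigned char)(v >> (8 * i));
}

/* Hypothetical helper: serialize the COM_BINLOG_DUMP payload (without the
   command byte) into `out`. Returns the number of bytes written, or 0 if a
   parameter is invalid or `out` is too small. */
static size_t build_binlog_dump_payload(unsigned char *out, size_t out_size,
                                        const char *filename,
                                        uint64_t position, uint16_t flags,
                                        uint32_t server_id) {
  if (out == NULL || filename == NULL)
    return 0;
  /* COM_BINLOG_DUMP has only 4 bytes for the position: reject rather than
     silently truncate. */
  if (position > UINT32_MAX)
    return 0;
  size_t name_len = strlen(filename);
  size_t needed =
      DUMP_POS_SIZE + DUMP_FLAGS_SIZE + DUMP_SERVER_ID_SIZE + name_len;
  if (needed > out_size) /* never write past the caller's buffer */
    return 0;
  unsigned char *p = out;
  store_le(p, position, DUMP_POS_SIZE);
  p += DUMP_POS_SIZE;
  store_le(p, flags, DUMP_FLAGS_SIZE);
  p += DUMP_FLAGS_SIZE;
  store_le(p, server_id, DUMP_SERVER_ID_SIZE);
  p += DUMP_SERVER_ID_SIZE;
  memcpy(p, filename, name_len);
  return needed;
}
```

Rejecting an oversized position, instead of casting it to 32 bits as the
excerpt does, is exactly the kind of integrity check a caller cannot forget
once it lives behind the interface.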



Interface 
==========

When connecting to a server, a normal connection is first made. After
that, several commands might be issued to request information about
the server before the replication stream is started.

Constants 
----------

MYSQL_RPL_GTID: Flag to indicate that COM_BINLOG_DUMP_GTID should
                    be used rather than COM_BINLOG_DUMP.

Types 
------

Struct for information about a replication stream:

  typedef struct MYSQL_RPL {
    const char *filename;
    uint64_t position;
    uint32_t server_id;
    uint32_t flags;

    ssize_t size;
    const unsigned char *buffer;
  } MYSQL_RPL;


filename: Filename of the binary log to read.
position: Position in the binary log to start reading from.
server_id: Server ID to use when identifying with the master.
flags: The 16 most significant bits of the ~flags~ field are flags
           for this interface (e.g., ~MYSQL_RPL_GTID~), while the
           lowest 16 bits are passed in the COM_BINLOG_DUMP
           packet. There are currently no flags for COM_BINLOG_DUMP
           (nor COM_BINLOG_DUMP_GTID), but we retain the field for
           future enhancements.
size: Number of bytes in the packet returned by ~mysql_binlog_fetch~, or -1
      on failure.
buffer: Pointer to the packet data returned by ~mysql_binlog_fetch~.
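
The split of the ~flags~ field can be expressed with a mask. The sketch below
assumes that ~MYSQL_RPL_GTID~ is bit 16, the lowest of the interface bits; this
document does not fix its value. It also shows the command choice of FR1.3 and
FR1.4, using the protocol command codes 0x12 (COM_BINLOG_DUMP) and 0x1E
(COM_BINLOG_DUMP_GTID). The helper and macro names other than
~MYSQL_RPL_GTID~ are hypothetical.

```c
#include <stdint.h>

/* Assumed value: the lowest of the 16 interface bits. */
#define MYSQL_RPL_GTID (1U << 16)

/* The lowest 16 bits are forwarded to the server in the dump packet. */
#define RPL_PROTOCOL_FLAGS_MASK 0xFFFFU

/* Protocol command codes (hypothetical macro names). */
#define CMD_BINLOG_DUMP      0x12
#define CMD_BINLOG_DUMP_GTID 0x1E

/* Bits copied into the COM_BINLOG_DUMP / COM_BINLOG_DUMP_GTID packet. */
static uint16_t rpl_protocol_flags(uint32_t flags) {
  return (uint16_t)(flags & RPL_PROTOCOL_FLAGS_MASK);
}

/* Bits interpreted by the client library itself. */
static uint32_t rpl_interface_flags(uint32_t flags) {
  return flags & ~(uint32_t)RPL_PROTOCOL_FLAGS_MASK;
}

/* FR1.3 / FR1.4: pick the dump command from the interface flags. */
static unsigned char rpl_dump_command(uint32_t flags) {
  return (flags & MYSQL_RPL_GTID) ? CMD_BINLOG_DUMP_GTID : CMD_BINLOG_DUMP;
}
```

Keeping interface bits above bit 15 means they can never leak into the
2-byte flags field of the packet, whatever values are added later.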

Functions 
----------

~MYSQL_RPL *mysql_binlog_open(MYSQL *mysql, MYSQL_RPL *stream)~ 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Open a new replication stream using the information in the ~MYSQL_RPL~
structure.

In contrast with other MySQL C API functions, the parameters for the
stream are passed to ~mysql_binlog_open~ as a structure. This
approach is:
- Easier to use, because it is more familiar to Unix/Linux users.
- More flexible and future-proof:
  - Parameters can be added to the structure without breaking
    binary compatibility.
- Easy to make thread-safe, since ~mysql_binlog_open~ does not manage
  the memory.

~int mysql_binlog_close(MYSQL_RPL *rpl)~ 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Close the replication stream.

~int mysql_binlog_fetch(MYSQL_RPL *rpl)~ 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Fetch one event (packet) from the server. Returns non-zero on success
and 0 on failure (FR2.3, FR2.4). On failure, ~rpl->size == -1~ and
~rpl->buffer~ is undefined. On success, ~rpl->size~ gives the number of
bytes read and ~rpl->buffer~ points to the retrieved bytes.

No separate error function is defined since ~mysql_error~ will contain
the error on failure.
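
The fetch contract can be made concrete with a mock that serves canned packets
from memory, which may also help unit-test consumers without a server. This is
a hypothetical test double, not libmysqlclient code: struct mock_rpl and
mock_binlog_fetch() are invented names, and treating the end of the canned
list as a failure is an assumption, since the contract does not distinguish
end-of-stream from errors.

```c
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical stand-in carrying the MYSQL_RPL output fields plus mock state. */
struct mock_rpl {
  ssize_t size;                           /* as in MYSQL_RPL */
  const unsigned char *buffer;            /* as in MYSQL_RPL */
  const unsigned char *const *packets;    /* canned packets to serve */
  const size_t *lengths;                  /* length of each canned packet */
  size_t count;                           /* number of canned packets */
  size_t next;                            /* index of the next packet */
};

/* Follows the contract: non-zero on success, 0 on failure with size == -1. */
static int mock_binlog_fetch(struct mock_rpl *rpl) {
  if (rpl->next >= rpl->count) {
    rpl->size = -1;      /* failure: size is -1 */
    rpl->buffer = NULL;  /* undefined per the contract; NULL here for safety */
    return 0;
  }
  rpl->buffer = rpl->packets[rpl->next];
  rpl->size = (ssize_t)rpl->lengths[rpl->next];
  rpl->next++;
  return 1;
}
```
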

Use-cases 
==========

This section contains a rough draft of use-cases showing how the
interface is intended to be used.

Open a stream and start reading from the beginning:

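  #include <stdio.h>
  #include <stdlib.h>
  #include <sys/types.h>
  #include <mysql.h>

  /* Event handler supplied by the application */
  void do_something_with_event(const unsigned char *buffer, ssize_t size);

  int main(void)
  {
    MYSQL *mysql = mysql_init(NULL);
    char logfile[512];                  /* holds the binary log file name */
    MYSQL_RPL stream = { logfile, 4 };  /* start after the 4-byte magic header */

    /* Open a new connection to the server */
    if (!mysql_real_connect(mysql, "localhost", "root", "root_pswd",
                            NULL, 0, NULL, 0)) {
      fprintf(stderr, "Error: %s\n", mysql_error(mysql));
      exit(1);
    }

    /* Figure out which binary log file is the oldest one (listed first) */
    if (mysql_query(mysql, "SHOW BINARY LOGS")) {
      fprintf(stderr, "Error: %s\n", mysql_error(mysql));
      exit(1);
    }
    MYSQL_RES *result = mysql_store_result(mysql);
    MYSQL_ROW row = result ? mysql_fetch_row(result) : NULL;
    if (row == NULL) {
      fprintf(stderr, "Error: no binary logs found\n");
      exit(1);
    }
    snprintf(logfile, sizeof(logfile), "%s", row[0]);  /* bounded copy */
    mysql_free_result(result);

    /* Open a replication stream and start reading
       (assumes mysql_binlog_open() returns NULL on failure) */
    if (mysql_binlog_open(mysql, &stream) == NULL) {
      fprintf(stderr, "Error: %s\n", mysql_error(mysql));
      exit(1);
    }
    /* mysql_binlog_fetch() returns 0 on failure (FR2.3) */
    while (mysql_binlog_fetch(&stream) != 0)
      do_something_with_event(stream.buffer, stream.size);
    mysql_binlog_close(&stream);

    mysql_close(mysql);
    return 0;
  }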
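
The use case leaves do_something_with_event() to the application. One
possible sketch decodes the 19-byte common header of a version 4 binary log
event (timestamp, type code, server ID, event size, next position, flags; all
little-endian). Beyond what this document specifies, it assumes that
~buffer~ holds the packet exactly as sent by the server, that is, a leading
0x00 status byte followed by the event. struct event_header and
parse_event_header() are hypothetical names.

```c
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/types.h>

#define EVENT_HEADER_SIZE 19 /* common header of a v4 binlog event */

struct event_header {
  uint32_t timestamp;
  uint8_t type_code;
  uint32_t server_id;
  uint32_t event_size;
  uint32_t log_pos;   /* position of the next event */
  uint16_t flags;
};

/* Read a little-endian unsigned integer of `n` (at most 4) bytes. */
static uint32_t read_le(const unsigned char *p, size_t n) {
  uint32_t v = 0;
  for (size_t i = 0; i < n; i++)
    v |= (uint32_t)p[i] << (8 * i);
  return v;
}

/* Assumes a 0x00 status byte precedes the event. Returns 0 on success,
   -1 if the packet is too short or does not start with 0x00. */
static int parse_event_header(const unsigned char *buffer, ssize_t size,
                              struct event_header *hdr) {
  if (buffer == NULL || size < 1 + EVENT_HEADER_SIZE || buffer[0] != 0x00)
    return -1;
  const unsigned char *p = buffer + 1; /* skip the status byte */
  hdr->timestamp = read_le(p, 4);
  hdr->type_code = p[4];
  hdr->server_id = read_le(p + 5, 4);
  hdr->event_size = read_le(p + 9, 4);
  hdr->log_pos = read_le(p + 13, 4);
  hdr->flags = (uint16_t)read_le(p + 17, 2);
  return 0;
}

/* One possible application handler: print the decoded header fields. */
void do_something_with_event(const unsigned char *buffer, ssize_t size) {
  struct event_header hdr;
  if (parse_event_header(buffer, size, &hdr) != 0) {
    fprintf(stderr, "Unexpected packet of %ld bytes\n", (long)size);
    return;
  }
  printf("type=%u server_id=%" PRIu32 " size=%" PRIu32 " next_pos=%" PRIu32 "\n",
         (unsigned)hdr.type_code, hdr.server_id, hdr.event_size, hdr.log_pos);
}
```
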