WL#3283: C/C++ API to binary log

Affects: Server-Prototype Only   —   Status: In-Progress

DESCRIPTION
===========

The motivation for this is to support software that mines the binary log by
developing and deploying a programmatic C++ API to the binary log. The API
should be distributed as a free-standing component (e.g., a library and/or
header files) that can be used to read (and optionally write) a binary log.

RATIONALE/REQUIREMENTS
======================

The purpose of the API is multifaceted; it should be possible to:
* Read and decode binary log files and use the contents for other purposes,
e.g., to insert the contents into other database systems (potentially
non-SQL). For this reason, it is important to decouple the internal binary log
events from an external representation of the contents.
* Read the binary log stream from various sources, e.g., directly from the
master as it is being generated. For this reason, we need a flexible and
adaptable representation to denote the source of the binary log stream.
* Insert events into existing binary logs and create new binary logs. This
will allow us to replicate data from other database systems.

For this purpose, we need a programmatic model for reading binary log streams.
The value of a binlog API is measured in its popularity and adoption rate. The
top three reasons to use this API are:
* The existence of an intuitive reference implementation which builds and
executes out-of-the-box.
* Robustness of the parameter definition space: it should be difficult to use
the interface in the "wrong way".
* The implementation should demonstrate efficient memory management, efficient
use of available threads, efficient parsing of binary protocols.

The key features intended to fulfill the above requirements are:
* Full text search engine indexer reference implementation
* Two public C++ interfaces: class Binlog, class Content_handler
* Make use of multiple threads.

The reference application will depend on a lot of other classes, some of which
will be specified in future worklogs:
* class Row_event_set, class Result_set, class Converter, class Value, class
Row_event_set::iterator, class Result_set::iterator, class Row_of_fields
The preliminary design of these classes is specified in this worklog for
convenience, but their specification might still change later.

Obviously the API needs to define the basic operations that can be done on the
binary log, but this worklog does not aspire to completeness; it aims for the
minimal set of features that can be used to implement the reference application.
The code should be structured so that more features can be added on demand by a
larger and disconnected community of contributors.

EXTERNAL REQUIREMENTS
=====================

A number of requirements from potential users of the interface have surfaced and
these need to be met. Most of them are already trivially met, but we list them
here anyway.

R1. It shall be possible to get the creation timestamp for a binary log.

    The creation timestamp is stored in the header of the Format description
    log event that is first in a binary log.
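
As a minimal sketch of R1, the creation timestamp could be turned into readable
text as shown below. It assumes the value is a count of seconds since the Unix
epoch, as held in the created_ts field of Format_event. The helper name
format_created_ts is hypothetical and is not part of the API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <string>

// Hypothetical helper: formats a binlog creation timestamp (seconds since
// the Unix epoch, as stored in Format_event::created_ts) as UTC text.
// Returns an empty string if the time cannot be converted.
std::string format_created_ts(uint32_t created_ts)
{
  std::time_t t= static_cast<std::time_t>(created_ts);
  // std::gmtime returns a pointer to static storage; copy it right away.
  const std::tm *p= std::gmtime(&t);
  if (p == 0)
    return std::string();
  std::tm utc= *p;
  char buf[32];
  std::size_t n= std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &utc);
  assert(n > 0);  // the 32-byte buffer always fits this fixed-width format
  return std::string(buf, n);
}
```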


INTRODUCTION
============

In order to create a flexible interface to the binary log stream, we base the
API on concepts taken from the C++ standard, more precisely: sequence containers
and iterators. These are basic and well-known concepts that will provide a
simple, yet flexible interface to manipulate the binary log stream. In order to
provide a flexible representation of the source of the binary log stream, we
will use URIs to reference binary logs. This will provide maximal flexibility
regarding the location and method for retrieving contents of a binary log. A
URI reference contains both the protocol and the location for a resource.
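
The split into protocol and location can be sketched as follows. This is only
an illustration: the struct Binlog_uri and the function split_uri are
hypothetical names, and the "protocol://location" shape of the URI is an
assumption, since this worklog does not fix the URI syntax.

```cpp
#include <cassert>
#include <string>

// Hypothetical value type: the two parts of a binary log URI.
struct Binlog_uri
{
  std::string protocol;  // e.g. "mysql" or "file"
  std::string location;  // everything after "://"
};

// Splits "protocol://location". Returns false when the separator is
// missing or the protocol part is empty; *out is left untouched then.
bool split_uri(const std::string &uri, Binlog_uri *out)
{
  assert(out != 0);
  const std::string sep= "://";
  std::string::size_type pos= uri.find(sep);
  if (pos == std::string::npos || pos == 0)
    return false;
  out->protocol= uri.substr(0, pos);
  out->location= uri.substr(pos + sep.size());
  return true;
}
```

A transport factory could then select a driver based on the protocol part and
hand the location part to that driver.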


REFERENCES
==========

WL#2582

Pitch presentation:
http://docs.google.com/present/view?id=dzgbn3h_92dst2s6g9

Demo:
https://intranet.mysql.com/~kpettersson/demo.html

Link to this document:
https://docs.google.com/Doc?docid=0AQkKGaRXOG3JZHpnYm4zaF85OTk2dGRqY3hr&hl=sv

Get source tree from Launchpad:
bzr co lp:mysql-replication-listener
INTRODUCTION
============

The basic library consists of a low-level event-driven interface that can be
used to process the events in the binary log. For each event, basic information
about the event is made available directly, while more complex parts of events
require a separate processing step. This interface can then be used by higher
layers to extract the information they need and process it as they see fit.

The reasons for this design are:

1. More complex parts of the event---such as the rows in a rows event or
   handling of character sets in strings---require a dedicated effort to parse
   and by avoiding this step, we can keep extraneous processing to a minimum.

2. We want to avoid putting too much logic into the low-level library since
   that would require significant changes as the protocols evolve and formats
   change.

3. It makes the library significantly easier both to develop and use for
   advanced cases such as injecting new events, potentially of unknown format,
   into the stream.

The typical usage pattern for the API is:

1. Create a binary log object.
  MySQL::Binary_log binlog(MySQL::create_transport(url));
2. Register content handlers.
  binlog.insert_content_handler(&my_content_handler);
3. Connect to the data source.
  binlog.connect();
4. Start the event loop.
  while (!quit) {
    Binary_log_event *event;
    binlog.wait_for_next_event(event);
    ...
    delete event;
  }
5. Interpret the event header and retrieve the payload.
  class My_content_handler : public Content_handler
  {
    Binary_log_event *process_query_event(Query_event *event)
    { ... }
  };

6. Do something useful with the event.
  std::cout << "Query event: query="
            << event->query
            << " db= "
            << event->db_name
            << std::endl;


Processing Events
=================

The basic API supports aggregation of events within the same transaction and the
ability to chain multiple content handlers, but there is no support for more
complex processing of the contents of events. For that reason, it is necessary
to use separate utilities.

The reference application will be using various row set container interfaces for
accessing rows contained within row events or from result sets returned from
queries. Note that these classes aren't part of this worklog; they are
documented here because the reference application, which demonstrates the
usefulness of the Binary_log API, depends on them. The design of these classes
might change even after this worklog design is approved and completed.

The Row_event_set container needs a table map as well as a row event for its
construction:

  MySQL::Row_event_set rows(row_event, table_map_event);

From this row set container we can get a forward input iterator:

  MySQL::Row_event_set::iterator it= rows.begin();

A typical iteration looks like this:
  /* Checking against end() first keeps an empty row set from being read. */
  for (MySQL::Row_event_set::iterator it= rows.begin();
       it != rows.end(); ++it)
  {
    MySQL::Row_of_fields fields= *it;
    if (event->get_event_type() == MySQL::WRITE_ROWS_EVENT)
      table_insert(tm->table_name, fields);
    else if (event->get_event_type() == MySQL::UPDATE_ROWS_EVENT)
    {
      /* An update carries two images per row: before, then after. */
      ++it;
      MySQL::Row_of_fields fields2= *it;
      table_update(tm->table_name, fields, fields2);
    }
    else if (event->get_event_type() == MySQL::DELETE_ROWS_EVENT)
      table_delete(tm->table_name, fields);
  }

A Row_of_fields is a standard vector of MySQL::Value objects. A typical
iteration of a Row_of_fields container looks like this:

  MySQL::Converter converter;
  MySQL::Row_of_fields::iterator field_it= fields.begin();
  for (; field_it != fields.end(); ++field_it)
  {
    std::string str;
    converter.to(str, *field_it);
    std::cout << " field type: "
              << field_it->type()
              << " Field size is: "
              << field_it->length()
              << " Field data is: "
              << str << std::endl;
  }
The API is implemented as an independent library.

1. Source files
2. Binary log API
3. Content handlers
4. Binary log events
5. Row set and value interface

SOURCE FILES
============

The source files are split into two directories. One is for include files and
one for the library source. In addition there are separate directories for test
files and reference applications.

Listener user interface
  binlog_api.h
  binary_log.cpp
  binlog_driver.h

Transport factory
  access_method_factory.cpp
  access_method_factory.h

Content handlers
  basic_content_handler.h
  basic_content_handler.cpp
 
Binlog interfaces
  binlog_event.cpp
  binlog_event.h
  field_iterator.cpp
  field_iterator.h

Row set and value interfaces
  value.cpp
  value.h
  row_of_fields.cpp
  row_of_fields.h
  rowset.h

MySQL interfaces
  protocol.cpp
  protocol.h
  resultset_iterator.cpp
  resultset_iterator.h
  tcp_driver.cpp
  tcp_driver.h
 
Misc utilities
  bounded_buffer.h


In order to use the library a developer only needs to include "binlog_api.h"
which is responsible for including everything else. The row set and value
interfaces were developed in order to create a unified interface which combines
ordinary MySQL client result sets with replication result sets.

Binary log API
==============

This section describes the binary log API with class definitions.

    * All the class specifications below are placed in the 'mysql' namespace, so
the namespace identifier is removed from the class descriptions.
    * Identifiers from the C99 stdint.h header file are used when fixed-size
integers are specified. These are typically available in the boost library, but
we omit the requirement on boost from the specification of the interface.
    * In the description below, unparsed data is represented using Blob_type,
which is a sequence of bytes.



To read events from some source, a *transport service* (or just *transport* when
it is clear from the context) is used. The transport service is responsible for
connecting to some source---such as a file or a MySQL server---reading the
events from the source, and queuing them for further processing. In this
worklog, only the TCP/IP server protocol is implemented, but other transport
services are easy to add.

As events queue up in a background thread, the application has to poll for new
events continuously; otherwise the queue will eventually become full and no more
events will be received from the server.
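
The behavior of a full queue can be illustrated with a small bounded queue.
This is a sketch under assumptions, not the contents of the library's
bounded_buffer.h: the class name Bounded_queue and the try_push member are
hypothetical, and the standard C++11 threading primitives are used here only
to keep the example self-contained.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the event queue. The producer (the background
// receiver thread) blocks in push() for as long as the queue is full.
template <typename T>
class Bounded_queue
{
public:
  explicit Bounded_queue(std::size_t capacity) : m_capacity(capacity)
  {
    assert(capacity > 0);
  }

  // Called by the background thread; waits until there is room.
  void push(const T &item)
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_not_full.wait(lock, [this] { return m_items.size() < m_capacity; });
    m_items.push_back(item);
    m_not_empty.notify_one();
  }

  // Non-blocking variant: returns false instead of waiting when full.
  bool try_push(const T &item)
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_items.size() >= m_capacity)
      return false;
    m_items.push_back(item);
    m_not_empty.notify_one();
    return true;
  }

  // Called by the application thread; blocks until an item is available.
  T pop()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_not_empty.wait(lock, [this] { return !m_items.empty(); });
    T item= m_items.front();
    m_items.pop_front();
    m_not_full.notify_one();
    return item;
  }

private:
  const std::size_t m_capacity;
  std::deque<T> m_items;
  std::mutex m_mutex;
  std::condition_variable m_not_full;
  std::condition_variable m_not_empty;
};
```

With a capacity of two, a third try_push fails until the application calls
pop(); a blocking push() would simply stall the receiver thread in the same
situation.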

The Binary_log requires a Binary_log_driver instance, which provides an
implementation of a transport service. A Binary_log_driver can be created
using a transport factory.

With the Binary_log interface, a user can:

    * Iterate over new events as they arrive from the source.
    * Change the binary log position (file name and offset) to start receiving
events from a new position.
    * Retrieve the position of the last known event in the event queue that
has not yet been processed (also known as the 'current position').
    * Connect and disconnect the underlying transport service.


class Binary_log {
public:
  typedef *implementation details* Position;
  typedef *implementation details* Error;

  Binary_log(Binary_log_driver *drv);

  /**
   * Connect the transport and start receiving events
   */
  Error connect();

  /**
   * Blocking attempt to get the next binlog event from the stream
   * @param event [out] Pointer to a binary log event to be fetched.
   * @return
   *   @retval 0 Success
   *   @retval >0 Error code
   */
  Error wait_for_next_event(Binary_log_event *&event);

 
  /**
   * Inserts/removes content handlers in and out of the chain
   * The Content_handler_pipeline is a derived std::list
   */
  Content_handler_pipeline *content_handler_pipeline();


  /**
   * Set the binlog position (filename, position)
   *
   * @return 0 on success, >0 error code
   */
  Error position(const std::string &filename, const Position &position);

  /**
   * Set the binlog position using current filename
   * @param position Requested position
   *
   * @return 0 on success, >0 error code
   */
  Error position(const Position &position);

  /**
   * Fetch the binlog position for the current file
   */
  Position position(void);

  /**
   * Fetch the current active binlog file name.
   * @param[out] filename
   *
   * @return The file position
   */
  Position position(std::string &filename) const;

};

TCP/IP driver
=============

The only implementation of the Binary_log_driver in this worklog is the
Binlog_tcp_driver.

class Binary_log_driver
{
public:
  typedef *implementation details* Position;
  typedef *implementation details* Error;

  /**
   * Connect to the binary log using previously declared connection parameters
   * @return Success or error code
   * @retval 0 Success
   * @retval >0 Error code (to be specified)
   */
  virtual Error connect()= 0;


  /**
   * Blocking attempt to get the next binlog event from the stream
   * @param event [out] Pointer to a binary log event to be fetched.
   * @return
   *   @retval 0 Success
   *   @retval >0 Error code
   */
  virtual Error wait_for_next_event(Binary_log_event *&event)= 0;

  /**
   * Set the reader position
   * @param str The file name
   * @param position The file position
   * @return
   *   @retval 0 Success
   *   @retval >0 Error code
   */
  virtual Position set_position(const std::string &str,
                                const Position &position)= 0;

  virtual Position get_position(std::string &str,
                                Position *position) const = 0;
};
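
To illustrate how another transport service might plug in, the sketch below
implements a reduced form of the driver interface that replays events from
memory. Everything in it is an assumption made for the example: the typedefs
and the Binary_log_event struct are simplified stand-ins, the interface omits
the position functions, and the class Vector_driver and its error codes are
hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified stand-ins for the worklog's types (assumptions, not the API).
typedef int Error;
struct Binary_log_event { int type_code; };

// Reduced version of the driver interface specified above.
class Binary_log_driver
{
public:
  virtual ~Binary_log_driver() {}
  virtual Error connect()= 0;
  virtual Error wait_for_next_event(Binary_log_event *&event)= 0;
};

// Hypothetical transport that replays a fixed list of event type codes.
class Vector_driver : public Binary_log_driver
{
public:
  explicit Vector_driver(const std::vector<int> &type_codes)
    : m_type_codes(type_codes), m_next(0), m_connected(false) {}

  virtual Error connect() { m_connected= true; return 0; }

  // Hands ownership of a newly allocated event to the caller.
  // Returns 1 when not connected and 2 at end of stream (made-up codes).
  virtual Error wait_for_next_event(Binary_log_event *&event)
  {
    assert(m_next <= m_type_codes.size());
    event= 0;
    if (!m_connected)
      return 1;
    if (m_next == m_type_codes.size())
      return 2;
    event= new Binary_log_event();
    event->type_code= m_type_codes[m_next++];
    return 0;
  }

private:
  std::vector<int> m_type_codes;
  std::size_t m_next;
  bool m_connected;
};
```

A driver of this kind is also convenient in unit tests, since it needs no
running server.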

The inner workings of the TCP/IP driver are based on the boost::asio library
which provides access to the network through both synchronous and asynchronous
calls. A dedicated thread is created for receiving events in a message loop.

Signing on to the MySQL database server is done in a synchronous manner in the
user thread. The process looks like this:

   1. Accept handshake from server
   2. Send authentication packet to the server
   3. Accept OK packet from the server (or error in case of failure)
   4. Send COM_REGISTER_SLAVE command to server
   5. Accept OK packet from server
   6. Send COM_QUERY command to the server: "SHOW MASTER STATUS"
   7. Retrieve result set and set current binlog position.
   8. Send COM_BINLOG_DUMP command to the server
   9. Post a completion request to the event loop to start receiving an event
      packet.
  10. Start the event thread (if it's not already running)
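
Step 8 of the sequence can be sketched as follows. The MySQL client/server
protocol names the command COM_BINLOG_DUMP (command byte 0x12) and frames
every packet with a 3-byte little-endian payload length and a 1-byte sequence
id. The helper names put_le and make_binlog_dump_packet are hypothetical and
are not taken from tcp_driver.cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Command byte from the MySQL client/server protocol.
const uint8_t COM_BINLOG_DUMP= 0x12;

// Appends the low `bytes` bytes of `value` in little-endian order.
void put_le(std::vector<uint8_t> *out, uint64_t value, int bytes)
{
  assert(bytes >= 1 && bytes <= 8);
  for (int i= 0; i < bytes; ++i)
    out->push_back(static_cast<uint8_t>((value >> (8 * i)) & 0xFF));
}

// Hypothetical helper: builds a complete COM_BINLOG_DUMP packet
// (4-byte packet header followed by the command payload).
std::vector<uint8_t> make_binlog_dump_packet(uint32_t server_id,
                                             const std::string &binlog_file,
                                             uint32_t binlog_pos)
{
  std::vector<uint8_t> payload;
  payload.push_back(COM_BINLOG_DUMP);
  put_le(&payload, binlog_pos, 4);   // position to start reading from
  put_le(&payload, 0, 2);            // flags (none set)
  put_le(&payload, server_id, 4);    // server id of the listener
  payload.insert(payload.end(), binlog_file.begin(), binlog_file.end());

  std::vector<uint8_t> packet;
  put_le(&packet, payload.size(), 3);  // payload length
  packet.push_back(0);                 // sequence id of a new command
  packet.insert(packet.end(), payload.begin(), payload.end());
  return packet;
}
```

The file name and position passed in would be the values retrieved in step 7.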


The event loop will continue to process events until the connection shuts down
or a shutdown request is posted to the event queue. The high level flow is
described above.

Errors are propagated from the asynchronous to the synchronous realm by
incident events (see Incident_event below).


Content handlers
================

An alternative to pulling events in an event loop is to use an event-based API.
This is done by creating a subclass of the Content_handler base class and using
the Binary_log interface to install it. The library comes with a default content
handler which does table event indexing on a per-transaction basis.

/**
 * A content handler accepts an event and returns the same event,
 * a new one or 0 (the event was consumed by the content handler).
 * The default behavior is to return the event unaffected.
 * The generic event handler is used for events which aren't routed to
 * a dedicated member function, user defined events being the most
 * common case.
 */

class Basic_content_handler {
public:
  Basic_content_handler();
  Basic_content_handler(const MySQL::Basic_content_handler& orig);
  virtual ~Basic_content_handler();

  virtual Binary_log_event *process_event(Query_event *ev);
  virtual Binary_log_event *process_event(Row_event *ev);
  virtual Binary_log_event *process_event(Table_map_event *ev);
  virtual Binary_log_event *process_event(Xid *ev);
  virtual Binary_log_event *process_event(User_var_event *ev);
  virtual Binary_log_event *process_event(Incident_event *ev);
  virtual Binary_log_event *process_event(Rotate_event *ev);
  virtual Binary_log_event *process_event(Int_var_event *ev);
  virtual Binary_log_event *process_event(Binary_log_event *ev);

  Binary_log_event *process_event(Binary_log_event *ev);

protected:
  /**
   * The Injection queue is emptied before any new event is pulled from
   * the Binary_log_driver. Injected events will pass through all content
   * handlers. The injection queue type is guaranteed to satisfy the
   * requirements of a sequence container according to the C++ standard.
   */
  Injection_queue *get_injection_queue();
};
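
The contract described above (return the same event, a new one, or 0) can be
sketched with a small pipeline. All names here are simplified stand-ins or
hypothetical: the event struct, the Drop_empty and Counter handlers, and the
run_pipeline function are not part of the library. The rule that a consuming
handler deletes the event is also an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <string>

// Simplified stand-ins (assumptions, not the library's classes).
struct Binary_log_event { std::string text; };

class Content_handler
{
public:
  virtual ~Content_handler() {}
  // Default behavior: pass the event through unchanged.
  virtual Binary_log_event *process_event(Binary_log_event *ev) { return ev; }
};

// Consumes events whose text is empty; passes everything else on.
class Drop_empty : public Content_handler
{
public:
  virtual Binary_log_event *process_event(Binary_log_event *ev)
  {
    if (ev->text.empty())
    {
      delete ev;   // consumed: this sketch lets the handler take ownership
      return 0;
    }
    return ev;
  }
};

// Counts the events that reach it.
class Counter : public Content_handler
{
public:
  Counter() : seen(0) {}
  virtual Binary_log_event *process_event(Binary_log_event *ev)
  {
    ++seen;
    return ev;
  }
  std::size_t seen;
};

// Hypothetical pipeline runner: stops as soon as a handler returns 0.
Binary_log_event *run_pipeline(const std::list<Content_handler *> &pipeline,
                               Binary_log_event *ev)
{
  assert(ev != 0);
  std::list<Content_handler *>::const_iterator it= pipeline.begin();
  for (; it != pipeline.end() && ev != 0; ++it)
    ev= (*it)->process_event(ev);
  return ev;
}
```

Placing a filtering handler before a counting or indexing handler means the
later handler never sees the events the earlier one consumed.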


Binary log events
=================

When a replication event is received through the transport layer it is decoded
and stored in dedicated containers. The top structure for all these containers
is called Binary_log_event.

The Log_event_type is an integral type representing the type of the log event.
Symbolic names for the types are available in the library.

class Binary_log_event
{
public:
    Binary_log_event();

    /**
     * Helper method
     */
    Log_event_type get_event_type() const;

    /**
     * Return a pointer to the header of the log event
     */
    Log_event_header *header();
};


Each event type derives from Binary_log_event to add an event body. Every
Binary_log_event also refers to an event header, which is a POD:

class Log_event_header
{
public:
  uint8_t  marker; // always 0 or 0xFF
  uint32_t timestamp;
  uint8_t  type_code;
  uint32_t server_id;
  uint32_t event_length;
  uint32_t next_position;
  uint16_t flags;
};
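
Decoding the header fields from raw bytes can be sketched as follows. The
sketch assumes the 19-byte common header of binlog format version 4, with all
integers stored little-endian, and it assumes that the marker byte belongs to
the network packet rather than to the stored event, so it is not parsed here.
The names Parsed_header, read_le and parse_header are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Mirrors the fields of Log_event_header that are stored in the log itself.
struct Parsed_header
{
  uint32_t timestamp;
  uint8_t  type_code;
  uint32_t server_id;
  uint32_t event_length;
  uint32_t next_position;
  uint16_t flags;
};

const std::size_t EVENT_HEADER_LEN= 19;  // common header size in binlog v4

// Reads `bytes` bytes as an unsigned little-endian integer.
uint32_t read_le(const unsigned char *p, int bytes)
{
  assert(bytes >= 1 && bytes <= 4);
  uint32_t value= 0;
  for (int i= 0; i < bytes; ++i)
    value|= static_cast<uint32_t>(p[i]) << (8 * i);
  return value;
}

// Hypothetical helper: returns false when the buffer is too short.
bool parse_header(const unsigned char *buf, std::size_t len, Parsed_header *h)
{
  if (len < EVENT_HEADER_LEN)
    return false;
  h->timestamp=     read_le(buf, 4);
  h->type_code=     static_cast<uint8_t>(buf[4]);
  h->server_id=     read_le(buf + 5, 4);
  h->event_length=  read_le(buf + 9, 4);
  h->next_position= read_le(buf + 13, 4);
  h->flags=         static_cast<uint16_t>(read_le(buf + 17, 2));
  return true;
}
```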


Each Event_body has a specific implementation corresponding to its event type:

class Query_event: public Binary_log_event
{
public:
    Query_event(Binary_log_event *ev);
    uint32_t thread_id;
    uint32_t exec_time;
    uint16_t error_code;
    Blob_type variables;
    std::string db_name;
    std::string query;
};


class Rotate_event: public Binary_log_event
{
public:
    Rotate_event(Binary_log_event *ev);
    std::string binlog_file;
    uint64_t binlog_pos;
};

class Format_event: public Binary_log_event
{
public:
    Format_event(Binary_log_event *ev);
    uint16_t binlog_version;
    std::string master_version;
    uint32_t created_ts;
    uint8_t  log_header_len;
};

class User_var_event: public Binary_log_event
{
public:
    User_var_event(Binary_log_event *ev);
    std::string name;
    uint8_t is_null;
    uint8_t type;
    uint32_t charset;
    Blob_type value;
};

class Table_map_event: public Binary_log_event
{
public:
    Table_map_event(Binary_log_event *ev);
    uint64_t table_id;
    uint16_t flags;
    std::string db_name;
    std::string table_name;
    Blob_type columns;
    Blob_type metadata;
    Blob_type null_bits;
};

class Row_event: public Binary_log_event
{
public:
    Row_event(Binary_log_event *ev);
    uint64_t table_id;
    uint16_t flags;
    uint64_t columns_len;
    uint32_t null_bits_len;
    Blob_type used_columns;
    Blob_type row;
};

class Int_var_event: public Binary_log_event
{
public:
    Int_var_event(Binary_log_event *ev);
    uint8_t  type;
    uint64_t value;
};

class Incident_event: public Binary_log_event
{
public:
    Incident_event(Binary_log_event *ev);
    uint8_t  type;
    std::string message;
};

class Xid: public Binary_log_event
{
public:
    Xid(Binary_log_event *ev);
    uint64_t xid_id;
};


Row set and value interface
===========================

Events can have a high level of complexity, and in order to decode the events
into something useful a set of tools is needed. All data values are
encapsulated in a lightweight wrapper which describes the data as a triple:
(type, metadata, storage)

class Value
{
public:
    /**
     *  @param type The field type
     *  @param metadata Attributes of the stored data
     *  @param storage A pointer to the memory where the actual value resides
     */
    Value(enum system::enum_field_types type,
          uint32_t metadata,
          const char *storage);

    Value();
    
    /**
     * Copy constructor
     */
    Value(const Value& val);

    Value &operator=(const Value &val);
    bool operator==(const Value &val) const;
    bool operator!=(const Value &val) const;

    ~Value();

    void is_null(bool s) const;
    bool is_null(void) const;
    
    const char *storage();
    enum system::enum_field_types type() const;
    uint32_t metadata() const;
   
    /**
     * Get the length in bytes of the entire storage (any metadata part +
     * actual data)
     */
    size_t length() const;

    /**
     * Returns the integer representation of a storage of a pre-specified
     * type.
     */
    int32_t as_int32() const;

    /**
     * Returns the integer representation of a storage of pre-specified
     * type.
     */
    int64_t as_int64() const;

    /**
     * Returns the integer representation of a storage of pre-specified
     * type.
     */
    int8_t as_int8() const;

    /**
     * Returns the integer representation of a storage of pre-specified
     * type.
     */
    int16_t as_int16() const;

    /**
     * Returns a pointer to the character data of a string type stored
     * in the pre-defined storage.
     * @note The position is an offset of the storage pointer determined
     * by the metadata and type.
     *
     * @param[out] size The size in bytes of the character string.
     *
     */
    char *as_c_str(unsigned long &size);

    /**
     * Returns a pointer to the byte data of a blob type stored in the pre-
     * defined storage.
     * @note The position is an offset of the storage pointer determined
     * by the metadata and type.
     *
     * @param[out] size The size in bytes of the blob data.
     */
    unsigned char *as_blob(unsigned long &size);

    float as_float();
    double as_double();
};

The data access methods which the Value object provides do not interpret the
underlying storage; they just return adjusted pointers based on the type and
metadata. This means that a result may not make sense if an access method is
used which doesn't correspond to a compatible storage type.

In order to get a proper interpretation of the data, a converter must be used.
The converter supplied in the library looks like this:

class Converter
{
public:
    Converter();

    /**
     * Converts and copies the sql value to a std::string object.
     * @param[out] str The target string
     * @param[in] val The value object to be converted
     */
    void to(std::string &str, const Value &val);

    /**
     * Converts and copies the sql value to a long integer.
     * @param[out] out The target variable
     * @param[in] val The value object to be converted
     */
    void to(long &out, const Value &val) const;

    /**
     * Converts and copies the sql value to a floating point number.
     * Sample usage:
     * @code
     * Converter().to(my_float, my_value);
     * @endcode
     * @param[out] out The target variable
     * @param[in] val The value object to be converted
     */
    void to(float &out, const Value &val) const;
};
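
The difference between a raw accessor and a converter can be illustrated with
a reduced model. Everything below is a stand-in written for this example: the
two-member Field_type enum, the stripped-down Value struct, and the free
functions as_int32 and to_string are hypothetical, and the one-byte length
prefix for the string type is an assumed layout.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>

// Simplified stand-ins (assumptions): two field types and a Value that
// only records the type and a pointer to the raw storage.
enum Field_type { TYPE_LONG, TYPE_VARCHAR };

struct Value
{
  Field_type type;
  const unsigned char *storage;
};

// Raw accessor in the spirit of Value::as_int32(): reads four
// little-endian bytes without checking the type.
int32_t as_int32(const Value &v)
{
  assert(v.storage != 0);
  uint32_t u= 0;
  for (int i= 0; i < 4; ++i)
    u|= static_cast<uint32_t>(v.storage[i]) << (8 * i);
  return static_cast<int32_t>(u);
}

// Converter in the spirit of Converter::to(std::string &, const Value &):
// it inspects the type before touching the storage.
void to_string(std::string &str, const Value &v)
{
  switch (v.type)
  {
  case TYPE_LONG:
  {
    std::ostringstream os;
    os << as_int32(v);
    str= os.str();
    break;
  }
  case TYPE_VARCHAR:
  {
    // Assumed layout: one length byte followed by the characters.
    std::size_t len= v.storage[0];
    str.assign(reinterpret_cast<const char *>(v.storage + 1), len);
    break;
  }
  }
}
```

Calling as_int32 on the string value reads the length byte and the first
three characters as one integer, which is the kind of meaningless result the
text above warns about; to_string returns the characters instead.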

There are two types of classes used to extract Row_of_fields objects from row
events and from result sets returned by a query.

For row events, the Row_event_set is used:

class Row_event_set
{
public:
    typedef *implementation specific* iterator;
    typedef *implementation specific* const_iterator;

    Row_event_set(const Row_event *arg1, const Table_map_event *arg2);

    iterator begin();
    iterator end();
    const_iterator begin() const;
    const_iterator end() const;
};