WL#7672: Client interface for reading the replication stream
Affects: Server-8.0
Status: Complete
Extend libmysql's C API with a stable interface for getting replication events from the server as a stream of packets. We need this to avoid calling undocumented APIs and packaging internal header files in order to implement binlog-based programs such as the MySQL Applier for Hadoop.
FR1. Add a new C API function MYSQL_RPL *mysql_binlog_open(MYSQL *mysql, MYSQL_RPL *stream)
  FR1.1. mysql_binlog_open requires an authenticated client connection.
  FR1.2. mysql_binlog_open can be used where mysql_exec_direct can be used.
  FR1.3. mysql_binlog_open will issue the COM_BINLOG_DUMP RPC if the MYSQL_RPL_GTID flag is not set.
  FR1.4. mysql_binlog_open will issue the COM_BINLOG_DUMP_GTID RPC if the MYSQL_RPL_GTID flag is set.
  FR1.5. The MYSQL_RPL structure must be filled in with valid data to be passed to the above RPCs.
  FR1.6. If the data are invalid, the server will return an ERR packet, and the error should be available through the mysql_error() C API.
  NF1.7. Any C API command other than mysql_binlog_fetch() and mysql_binlog_close() issued on the MYSQL connection on which mysql_binlog_open() succeeded is wrong, will lead to unpredictable results, and will put the connection in an undefined state.

FR2. Add a new C API function int mysql_binlog_fetch(MYSQL_RPL *rpl)
  FR2.1. mysql_binlog_fetch() works only after a successful mysql_binlog_open() command has been issued.
  FR2.2. mysql_binlog_fetch() will read one binlog record (as sent by the server).
  FR2.3. mysql_binlog_fetch() will return 0 on failure.
  FR2.4. mysql_binlog_fetch() will return non-zero on success.
  FR2.5. In case of failure, mysql_error() should return the error text.
  FR2.6. mysql_binlog_fetch() will store the result in the buffer member of MYSQL_RPL.
  FR2.7. mysql_binlog_fetch() will store the size of the data in the size member of the MYSQL_RPL structure.
  NF2.8. Any C API command other than mysql_binlog_fetch() and mysql_binlog_close() issued on the MYSQL connection on which mysql_binlog_fetch() succeeded is wrong, will lead to unpredictable results, and will put the connection in an undefined state.
  FR2.9. mysql_binlog_fetch() updates the MYSQL_RPL structure members.

FR3. Add a new C API function int mysql_binlog_close(MYSQL_RPL *rpl)
  FR3.1. mysql_binlog_close() can be called only for a connection on which mysql_binlog_open() was called successfully.
  FR3.2. Once mysql_binlog_close() succeeds, the connection can only be closed via mysql_close().
Problem Description
===================

To fetch a replication stream using libmysqlclient, it is necessary to execute a number of steps to create a protocol packet and send it to the server. For example, the following code is used in ~mysqlbinlog~ to establish a connection to a replication stream:

    command= COM_BINLOG_DUMP;
    size_t allocation_size= ::BINLOG_POS_OLD_INFO_SIZE +
      BINLOG_NAME_INFO_SIZE + ::BINLOG_FLAGS_INFO_SIZE +
      ::BINLOG_SERVER_ID_INFO_SIZE + 1;
    if (!(command_buffer= (uchar *) my_malloc(PSI_NOT_INSTRUMENTED,
                                              allocation_size, MYF(MY_WME))))
    {
      error("Got fatal error allocating memory.");
      DBUG_RETURN(ERROR_STOP);
    }
    uchar* ptr_buffer= command_buffer;

    /*
      COM_BINLOG_DUMP accepts only 4 bytes for the position, so
      we are forced to cast to uint32.
    */
    int4store(ptr_buffer, (uint32) start_position);
    ptr_buffer+= ::BINLOG_POS_OLD_INFO_SIZE;
    int2store(ptr_buffer, binlog_flags);
    ptr_buffer+= ::BINLOG_FLAGS_INFO_SIZE;
    int4store(ptr_buffer, server_id);
    ptr_buffer+= ::BINLOG_SERVER_ID_INFO_SIZE;
    memcpy(ptr_buffer, logname, BINLOG_NAME_INFO_SIZE);
    ptr_buffer+= BINLOG_NAME_INFO_SIZE;

    command_size= ptr_buffer - command_buffer;
    ...
    if (simple_command(mysql, command, command_buffer, command_size, 1))
    {
      error("Got fatal error sending the log dump command.");
      my_free(command_buffer);
      DBUG_RETURN(ERROR_STOP);
    }
    my_free(command_buffer);

Issuing a simple_command directly is not part of the officially supported C interface, hence this approach suffers from a few drawbacks:

1. The sheer code size makes this very hard to use, and the risk of making a mistake is high.
2. Constructing the protocol packet manually can result in subtle security issues such as buffer overflows, and carries a risk of accidentally creating packets that can cause problems for the server.
3. Changes to the protocol can potentially break any code that uses this interface.
   - This risk forces tools using this interface to be built for each version of the server just to ensure that there are no subtle issues that can cause breakage.

This worklog proposes to solve these issues by creating a well-defined interface and making it available as part of the officially supported libmysqlclient interface.

1. A well-defined interface will not require complicated code and reduces the risk of mistakes.
2. Placing a layer between the packet construction and the user provides a means of integrity checking the input parameters.
3. Having a stable interface allows a single version of a tool to be built for any version of the server supporting this interface.

Interface
=========

When connecting to a server, a normal connection is first made. After that, several commands might be issued to request information about the server before the replication stream is started.

Constants
---------

MYSQL_RPL_GTID: Flag to indicate that COM_BINLOG_DUMP_GTID should be used rather than COM_BINLOG_DUMP.

Types
-----

Struct for information about a replication stream:

    struct MYSQL_RPL {
      const char *filename;
      uint64_t position;
      uint32_t server_id;
      uint32_t flags;
      ssize_t size;
      const unsigned char *buffer;
    };

filename: Filename of the binary log to read.
position: Position in the binary log to start reading from.
server_id: Server ID to use when identifying with the master.
flags: The 16 most significant bits of the ~flags~ field are flags for this interface (e.g., ~MYSQL_RPL_GTID~), while the lowest 16 bits are passed in the COM_BINLOG_DUMP packet. There are currently no flags for COM_BINLOG_DUMP (nor COM_BINLOG_DUMP_GTID), but we retain the parameter for future enhancements.
size: Size of the packet returned by ~mysql_binlog_fetch~.
buffer: Pointer to the returned data.
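As an illustration of the integrity checking and the ~flags~ split described above, the following is a minimal sketch, not the library's implementation. It uses a local stand-in struct rather than the real ~MYSQL_RPL~, and the names ~rpl_request~, ~RPL_GTID_SKETCH~, ~interface_flags~, ~protocol_flags~, and ~build_dump_payload~ are hypothetical. The flag value (the lowest of the upper 16 bits) is an assumption made for the example. The field order and little-endian encoding follow the ~mysqlbinlog~ excerpt in the problem description.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Assumed value, for illustration only: one of the upper 16 bits. */
#define RPL_GTID_SKETCH (1u << 16)

/* Local stand-in for the request members of MYSQL_RPL. */
struct rpl_request {
  const char *filename;
  uint64_t position;
  uint32_t server_id;
  uint32_t flags;
};

/* Upper 16 bits: flags consumed by the client interface itself. */
static uint32_t interface_flags(uint32_t flags) {
  return flags & 0xFFFF0000u;
}

/* Lower 16 bits: flags forwarded in the dump packet. */
static uint16_t protocol_flags(uint32_t flags) {
  return (uint16_t)(flags & 0xFFFFu);
}

/* Store the low nbytes bytes of value in little-endian order. */
static void store_le(unsigned char *dst, uint32_t value, size_t nbytes) {
  for (size_t i = 0; i < nbytes; i++)
    dst[i] = (unsigned char)(value >> (8 * i));
}

/*
  Validate the request and write a COM_BINLOG_DUMP style payload
  (4-byte position, 2-byte flags, 4-byte server id, file name) to out.
  Returns the payload length, or 0 if the request is invalid or the
  output buffer is too small.
*/
static size_t build_dump_payload(const struct rpl_request *rpl,
                                 unsigned char *out, size_t out_size) {
  assert(rpl != NULL && out != NULL);
  if (rpl->filename == NULL)
    return 0;
  /* A GTID request would need the COM_BINLOG_DUMP_GTID layout instead. */
  if (interface_flags(rpl->flags) & RPL_GTID_SKETCH)
    return 0;
  /* COM_BINLOG_DUMP carries only 4 bytes for the position. */
  if (rpl->position > UINT32_MAX)
    return 0;

  size_t name_len = strlen(rpl->filename);
  size_t total = 4 + 2 + 4 + name_len;
  if (total > out_size)
    return 0;

  store_le(out, (uint32_t)rpl->position, 4);
  store_le(out + 4, protocol_flags(rpl->flags), 2);
  store_le(out + 6, rpl->server_id, 4);
  memcpy(out + 10, rpl->filename, name_len);
  return total;
}
```

The point of the sketch is that the checks (position range, buffer size, flag separation) live in one place behind the interface, so callers never assemble the packet by hand.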
Functions
---------

~MYSQL_RPL *mysql_binlog_open(MYSQL *mysql, MYSQL_RPL *stream)~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Open a new replication stream using the information in the ~MYSQL_RPL~ structure.

In contrast with other mysql functions, we pass the parameters for the stream as a structure to the ~mysql_binlog_open~ function. This approach is:

- Easier to use, because it is more familiar to Unix/Linux users.
- More flexible and future-proof: parameters can be added to the structure without breaking binary compatibility.
- Possible to make thread-safe, since ~mysql_binlog_open~ does not manage the memory.

~int mysql_binlog_close(MYSQL_RPL *rpl)~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Close the replication stream.

~int mysql_binlog_fetch(MYSQL_RPL *rpl)~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Fetch one event/packet from the server and return an error on failure. On error, ~rpl->size == -1~ and ~rpl->buffer~ is undefined. On success, ~rpl->size~ gives the number of bytes read and ~rpl->buffer~ points to the retrieved bytes.

No separate error function is defined, since ~mysql_error~ will contain the error on failure.

Use-cases
=========

This section contains a rough draft of use-cases showing how the interface is intended to be used.
Open a stream and start reading from the beginning:

    MYSQL *mysql = mysql_init(NULL);
    char logfile[64];
    MYSQL_RPL stream = { logfile, 4 };

    /* Open a new connection to the server */
    mysql_real_connect(mysql, "localhost", "root", "root_pswd", NULL, 0, NULL, 0);

    /* Figure out what binary log file is the oldest one */
    mysql_query(mysql, "SHOW BINARY LOGS");
    MYSQL_RES *result = mysql_store_result(mysql);
    MYSQL_ROW row = mysql_fetch_row(result);
    strcpy(logfile, row[0]);
    mysql_free_result(result);

    /* Open a replication stream and start reading */
    if (mysql_binlog_open(mysql, &stream)) {
      fprintf(stderr, "Error: %s\n", mysql_error(mysql));
      exit(1);
    }
    else {
      /* stream is a struct, so pass its address and use '.' for members */
      while (mysql_binlog_fetch(&stream) != 0)
        do_something_with_event(stream.buffer, stream.size);
      mysql_binlog_close(&stream);
    }
    mysql_close(mysql);
Copyright (c) 2000, 2024, Oracle Corporation and/or its affiliates. All rights reserved.