WL#5721: Refactor replication dump thread

Affects: Server-Prototype Only   —   Status: Complete   —   Priority: Medium

The replication master works as follows. Each slave connects in a client thread 
called a dump thread. Each dump thread reads binlog events from the binlog and 
sends them to the associated slave. Each dump thread holds a mutex lock while 
reading events from the binlog. This lock prevents other client threads from 
writing the binlog and prevents other dump threads from reading the binlog. The 
lock may be held for a significant time span because the dump thread is doing 
(For reference, the current implementation is something like this, grossly 

  current binlog write design:

    acquire LOCK_log
    write log event to binlog
    release LOCK_log
    signal update

  current dump thread design:
    while client is not killed:
      acquire LOCK_log
      read event from binlog
      release LOCK_log
      if EOF was reached in the previous read:
        acquire LOCK_log
        wait for update signal
        read event from binlog
        release LOCK_log

Moreover, there are unnecessarily many corner cases in this design, making 
the code difficult to maintain(all code for processing events duplicated).

This worklog aims at simplifying the design of the dump thread. The dump thread 
should only hold a lock for a very short time, while reading the position up to 
which the binary log has been written. This is expected to have the following 

 1. Performance improvement.
    We will not do any I/O while holding locks. This means we effectively
    remove almost all interference between dump threads and client threads.
    Currently, this is not expected to give any significant improvement.
    However, in the future we plan to implement parallel writing of the binlog.
    After we have implemented parallel binlog writing, the present worklog may
    give a significant performance improvement.
 2. Better code with fewer corner cases.
    The current design requires that we handle some corner cases where a dump
    thread may read a partially written event from the current binary log when
    the disk becomes full. This makes the code unnecessarily complex.

(For reference,  It would be better if readers and writers synchronized on a 
 global variable that holds the position of the end of the last completely 
 written event in the binary log.
  binlog write design:
    write event to binlog
    acquire lock_binlog_end
    binlog_end = current write position
    release lock_binlog_end
    signal update

  dump thread design:
    end_position = 0
    while client is not killed:
      if current read position == end_position:
        acquire lock_binlog_end
        while end_position == binlog_end and client is not killed:
          wait for update signal
        release lock_binlog_end
      if client is killed:
      read event from binlog

Analysis On All Write Operations Of Binary Log
* Appending New Events(group flush queue header thread)
  1. Get LOCK_log
  2. Append events
     For each transaction Do
       Flush stmt cache and trx cache.
       Increase m_prep_xids(it is protected by a rwlock).
  3. Flush cache to file.
  4. Sends update signal
  5. Releases LOCK_log

* Purge Logs
  Purge can be done automatically or manually.

  It is the logic in MYSQL_BIN_LOG::purge_logs(). It purges all log files
  before a given log file, its name is passed in purge_logs() through the
  argument 'to_logs'.

  1. Get LOCK_index
  2. Get log file name list including all files should be purged

     For each entry in index file Do
       Read binlog file name
       Check if it is same to 'to_logs'. If ture, break the loop.
         It already finds all file names should be purged.
       Checking if it is active log. If true, break the loop.
         Active file can not be purged. It doesn't need lock, because the thread
         is holding LOCK_index, so active file could not be changed.
       Checking if it is used by dump thread. If ture, break the loop.
         All files following the files are using by a dump threads can not be
         purged. The checking process is:

         Get LOCK_thread_count
         For each thread Do
           Check if it is same to the file which the dump thread is reading.
         Releases LOCK_thread_count

         Dump threads could not move to next file. Because purge thread is 
         holding LOCK_index. Dump threads have to get LOCK_index when moving to 
         next file.

  3. Remove log names from index file
  4. Remove binlog files.
  5. Release Lock_index

  Note: Ignored the safe recovery stuff here.

* Log Rotation
  1. Get LOCK_log
  2. Get LOCK_commit
     LOCK_commit is used in a loop way.

     while (get_prep_xids() > 0)
       mysql_cond_wait(&m_prep_xids_cond, &LOCK_commit);

  3. Get LOCK_index
  4. Generate new binlog file name.
  5. Close active file
     Append Rotate_log_event to active file
     Send update signal(update_cond).
     Close the file.
  6. Create new log file
     Set its name as active name (copy to log_file_name variable)
     Open the new file.
     Append Format_description_log_event.
     Update index file.
  7. Release locks
     Release LOCK_index
     Release LOCK_commit
     Release LOCK_log

* Reset Master
  1. Get LOCK_log
  2. Get LOCK_index
  3. Delete all log files
  4. Delete index file.
  5. Create and open new index file.
  6. Create and open new active file.
  7. Release locks
     Release LOCK_index
     Release LOCK_log

  Note: It doesn't check if there is any dump thread. So dump threads may
        encounter some wired situations. E.g. Could not find next file, etc.

Remove LOCK_log From Dump Thread
Binary events can only be appened at the end of the active file. So it is safe
to read both inactive and active binlog files without lock. The only problem
is that dump thread may read incomplete events at the end of active log file.

To solve the problem, MYSQL_BIN_LOG needs a variable which records the end
position of the last event in active file. It is shared between all binlog read
threads and binlog write threads.

- Write Threads
  Update the variable after appending events and before 'Signal update'.
  It includes 'Appending New Events', 'Log Rotation' and 'Reset Master'

- Read Threads
  Only read the events that their position are before the variable. Dump
  threads will wait new events after reading all events before the variable.
  It may includes 'Dump' and Show binlog events' operations. But this worklog
  only work on Dump operations.

- To Sovle Deadlock Problem

  The deadlock problem is described below.
    SESSION 1                      SESSION 2            DUMP THREAD
    ---------                      ---------            -----------
  1 Flush Stage(mark rotate flag)  

  2 Sync Stage

  3 Waiting binlog ACK <------------- ACK ------------- Send binlog

  4 Commit Stage                                        Waiting New Events
  5                                Flush Stage
                                   Signal update

  6 Rotate(hold LOCK_log)             
    WAITING for Session2 to commit
  7                                Sync Stage           WAITING for Session1
                                                        release LOCK_log
  8                                WAITING for Group ACK

  9                                [Commit Stage]not happened yet

  Session1 has to hold LOCK_log to block other sessions to append new events.
  It also blocked dump threads to read events.

  In this design, dump threads do not acquire LOCK_log, it is never blocked
  by session1. So the deadlock is broken.

    SESSION 1                      SESSION 2            DUMP THREAD
    ---------                      ---------            -----------
  1 Flush Stage(mark rotate flag)  

  2 Sync Stage

  3 Waiting binlog ACK <------------- ACK ------------- Send binlog

  4 Commit Stage                                        Waiting New Events
  5                                Flush Stage
                                   Signal update

  6 Rotate(hold LOCK_log)                               Send binlog
    WAITING for Session2 to commit

  7                                Sync Stage           Send binlog
  8                                WAITING Group ACK  <-- ACK -- Send binlog

  9                                Commit Stage

  10 Rotate
Binlog end position
* binlog_end_pos
  It is the shared variable described in HLS.

  my_off_t binlog_end_pos;

* LOCK_binlog_end_pos
  It is used to protect binlog_end_pos. The lock is required when both writing
  and reading binlog_end_pos.

  binlog_end_pos and LOCK_binlog_end_pos could be used by both binary log and
  relay log. But in this wl, they are used only in binary log.

  mysql_mutx_t LOCK_binlog_end_pos;

* update_binlog_end_pos()
  It is used to update binlog_end_pos only by MYSQL_BIN_LOG.

  void update_binlog_end_pos();

* get_binlog_end_pos()
  It returns the value of binlog_end_pos;

  my_off_t get_binlog_end_pos();

* get_binlog_end_pos_lock()
  It returns the pointer of LOCK_binlog_end_pos;

  mysql_mutx_t* get_binlog_end_pos_lock();

* lock_binlog_end_pos()
  It locks LOCK_binlog_end_pos.

  void lock_binlog_end_pos();

* unlock_binlog_end_pos()
  It unlocks LOCK_binlog_end_pos;

  void unlock_binlog_end_pos();

* update_cond
  The orignal update_cond is still used to wait and signal event update.
  It is used by both binary log and relay log. When waiting for binary log
  update, the new lock LOCK_binlog_end_pos is used with it together. But
  when waiting for relay log update, LOCK_log is still used.

* MYSQL_BIN_LOG::signal_update()
  It is used to broadcast update_cond to all other threads, so they can continue
  if they are waiting for new events updates.

  binlog_end_pos is updated in this function.
  an extra signal_update() should be called after creating a new log file. It is
  in MYSQL_BIN_LOG::open_binlog();

  In general, it should always send a signal after binlog_end_pos is updated.
  So this function is called in update_binlog_end_pos().

* flush_io_cache()
  Events is originally written into cache, but not the binary log file. So the
  cache has to be flushed into the binary log file before updating
  binlog_end_pos. Otherwise, dump threads will encounter an error that
  binlog_end_pos is bigger than the valid file size.

  There are two places where signal_update() is called, but data is not flushed
  into the log file. The extra flush_io_cache() should be added.
  - MYSQL_BIN_LOG::new_file_impl()
    after appending Rotate_log_event.
  - MYSQL_BIN_LOG::close()
    after appending Stop_log_event.

Binlog_sender class
The class is designed to make the code of mysql_binlog_send() has better
modularity and readablity. All code of original mysql_binlog_send is
moved to the class. Now mysql_binlog_send just need to call Binlog_sender.

* run()
  It is the entry function which should be called by mysql_binlog_send.

  void run();

  while (!has_error() && !m_thd->killed)
    1. send faked Rotate_log_event
    2. open binlog file.
    3. call send_binlog() to dump events in the file.
    4. find next binlog file.

  The loop will be broken if any error is encountered in any step.

* init()
  It initializes the environment of Binlog_sender, and checks if dump request
  is valid.

  void init();

* cleanup()
  It cleanup the environment of Binlog_sender when all events are sent or an
  error encounter.

  void cleanup();

* send_binlog()
  It sends all events in a binlog file to slave.

  int send_events(IO_CACHE *log_cache, my_off_t start_pos);

  log_cache   The binlog will be sent.
  start_pos   Sends events start from start_pos

  0 if read all events successfully.
  1 if an error is encountered or it should stop without waiting for new events.

  1. send Format_description_log_event
  2. check if the binlog file has Previous_gtids_log_event. If it has no
     Previous_gtids_log_event and it is in AUTO_POSITION mode, then return 0
     to skip this file.
  3. while (!m_thd->killed)
       3.1 call get_binlog_end_pos() to get end_pos of the binlog.
           if end_pos is 0, then return 0.
       3.2 call send_events() to dump all events before the end_pos.

* get_binlog_end_pos()
  It get the end position of the binlog file.

  my_off_t get_binlog_end_pos(IO_CACHE *log_cache, bool skip_group,
                              bool *sent_heartbeat_event);
  log_cache   The binlog file is reading.

  my_off_t end_pos;

    1. mysql_bin_log.lock_binlog_end_pos();
       end_pos= mysql_bin_log.get_binlog_end_pos();

    2. check if the reading file is active file. If it is not active file:
       2.1 save file size to end_pos.
       2.2 if end_pos equals to my_b_tell(log_cache), it alreay reads all
           events in the file. then returns 0.
       2.3 Otherwise, returns end_pos.

       /* my_b_tell(log_cache) tells the end position where it has read. */

    4. if end_pos is larger than my_b_tell(log_cache), then returns end_pos.
    5. if it doesn't need to wait for new events then return 1. Otherwise,
       flush the net cache before waiting
    6. call wait_new_events() to wait for update_cond broadcast.
    /* Return 1 if an error is encountered in any step. */
  } while (!m_thd->killed);

* send_events()
  It reads events from a binlog file and sends them.

  int send_events(IO_CACHE *log_cache, my_off_t end_pos,
                  bool *skip_group, my_off_t *skip_group_end_pos);

  log_cache   The binlog file is sending.
  end_pos     Only the events before end_pos is sent.

  0 if read all events successfully.
  1 if an error is encountered or it should stop without waiting for new events.

  while (!thd->killed && my_b_tell(log_cache) < end_pos)
    1. read a event from the log file.
    2. check if the event should be skipped when AUTO_POSITION is on.
       Jump to the begin of the loop if the event should be skipped.
    3. send a heartbeat if any event is skipped.
    4. send the packet.

    /* Return 1 if an error is encountered in any step. */
  send a heartbeat if any event is skipped.
* wait_new_events()
  It waits for update_cond broadcast until receiving or timeout. It will send a
  heartbeat if heartbeat is required.

  int wait_new_events(my_off_t log_pos);
  0 if succeeds
  1 if an error happens