WL#7361: MSR: per-channel replication filters

Affects: Server-8.0   —   Status: Complete   —   Priority: Medium

Executive Summary
=================
Add replication filtering support to a multi-source slave,
to filter out the execution of selected replicated data from a
specific channel. In short, create channel specific replication
filters (also called per-channel replication filters).

Currently (through WL#1697), replication filters are global, i.e.
they apply to all channels. This WL aims to create per-channel
replication filters, while keeping global replication filters
for backward compatibility.


USES
====
Channel specific filters are needed when a multi-source slave is
in use. This feature is typically used in the following ways:

Example1: Consider a Master1 with databases DBLOCAL, DB1,
               and a Master2 with databases DBLOCAL2, DB2.
  If a slave that wants to replicate from masters Master1 and Master2
  needs to filter out local databases then it needs to create channel
  specific filters for masters Master1 and Master2

Example2: Consider the case of filtering out replicated data from system
          tables from all other sources except one.

        Example: When setting up replication with GTIDs+MSR:
        master1> create replication user, "slave_user";
        master2> create replication user, "slave_user";
                 The above statements are replicated to a slave.
        Unless GTID_NEXT is set on the slave for master2,
        there is an obvious conflict on the user name, and we want to avoid
        that using per-channel filters.

Functional and Non-Functional Requirements

FUNCTIONAL REQUIREMENTS
=======================
F-1: Create channel specific replication filters to filter out the execution
     of selected replicated data from a specific channel.
F-2: Create global replication filters for backward compatibility.
F-3: To support per-channel filters, extend the following options/commands:
       A) startup options: --replicate-*
       B) CHANGE REPLICATION FILTER
       C) RESET SLAVE
       D) SHOW SLAVE STATUS
F-4: Users can specify a per-channel filter through a command-line option
     for an existing channel.
F-5: Introduce a new P_S table performance_schema.replication_applier_filters.
     Users can query all per-channel replication filters on every channel
     from the table, and use it for troubleshooting and monitoring.
F-6: Introduce a new P_S table
     performance_schema.replication_applier_global_filters.
     Users can query all global replication filters from the table.

NON-FUNCTIONAL REQUIREMENTS
===========================
NF-1: Introducing channel specific replication filters must not have a
      negative impact on overall performance.
NF-2: Implicit requirements: Follow the SQL standard, work on all platforms,
      do not impact other internal MySQL components.

There are per-channel and global replication filters. Each channel uses
*only* its own per-channel replication filters to filter the event
stream. It never uses global replication filters to filter the event
stream. (When a new channel is being configured, it copies the global
replication filters of a filter type to its per-channel replication
filters if that filter type has global replication filters and no
per-channel replication filters.)

The per-channel replication filters and global replication filters can
be configured in two ways:
A) startup options: --replicate-*
B) SQL commands: CHANGE REPLICATION FILTER

Additionally, behavior for the following statements needs to be specified:
C) RESET SLAVE [ALL] [FOR CHANNEL]
D) SHOW SLAVE STATUS [FOR CHANNEL]

Query, troubleshoot, monitor replication filters and do statistics:
E) CREATE A NEW performance_schema.replication_applier_filters

Show the global replication filters:
F) CREATE A NEW performance_schema.replication_applier_global_filters


A) Startup options: --replicate-*
=================================

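The current startup options are extended to accept a channel_name in the
filter value, so that per-channel replication filters can be configured
as follows:
  --replicate-do-db=<channel_name>:<db_name>
  --replicate-ignore-db=<channel_name>:<db_name>
  --replicate-do-table=<channel_name>:<db_name.tbl_name>
  --replicate-ignore-table=<channel_name>:<db_name.tbl_name>
  --replicate-rewrite-db=<channel_name>:<db1>-><db2>
  --replicate-wild-do-table=<channel_name>:<wild_pattern>
  --replicate-wild-ignore-table=<channel_name>:<wild_pattern>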

---- Syntax ----
Each command-line parameter optionally takes a channel_name followed by a
colon, further followed by the filter specification. Note that the first
colon is interpreted as a separator; any later colons are literal colons.

---- Semantics ----
If the channel_name in front of the colon is left empty, the startup
options shall act on the default channel:
  --replicate-do-db=:<db_name>
  --replicate-ignore-db=:<db_name>
  --replicate-do-table=:<db_name.tbl_name>
  --replicate-ignore-table=:<db_name.tbl_name>
  --replicate-rewrite-db=:<db1>-><db2>
  --replicate-wild-do-table=:<wild_pattern>
  --replicate-wild-ignore-table=:<wild_pattern>

If neither a channel_name nor the colon that follows it is given in the
filter value, the startup options shall configure the global replication
filters:
  --replicate-do-db=<db_name>
  --replicate-ignore-db=<db_name>
  --replicate-do-table=<db_name.tbl_name>
  --replicate-ignore-table=<db_name.tbl_name>
  --replicate-rewrite-db=<db1>-><db2>
  --replicate-wild-do-table=<wild_pattern>
  --replicate-wild-ignore-table=<wild_pattern>
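The three forms of an option value (named channel, default channel, global) can be sketched in C++ as follows. This is only an illustration of the rules stated above, not the server's option parser; ParsedFilterOption and parse_filter_option are hypothetical names.

```cpp
#include <cassert>
#include <string>

// Hypothetical result type; not taken from the server source.
struct ParsedFilterOption {
  bool is_global;       // true when the value has no "channel_name:" prefix
  std::string channel;  // "" names the default channel; unused if is_global
  std::string rule;     // the filter specification itself
};

// Splits the value of a --replicate-* option at the FIRST colon only.
// Any later colon stays in the rule as a literal character.
ParsedFilterOption parse_filter_option(const std::string &value) {
  const std::string::size_type colon = value.find(':');
  if (colon == std::string::npos) {
    return ParsedFilterOption{true, "", value};  // no colon: global filter
  }
  return ParsedFilterOption{false, value.substr(0, colon),
                            value.substr(colon + 1)};
}

// Usage example covering the three forms of an option value.
void parse_filter_option_example() {
  const ParsedFilterOption global = parse_filter_option("db1");
  assert(global.is_global && global.rule == "db1");

  const ParsedFilterOption dflt = parse_filter_option(":db5");
  assert(!dflt.is_global && dflt.channel.empty() && dflt.rule == "db5");

  const ParsedFilterOption named = parse_filter_option("ch1:db2");
  assert(!named.is_global && named.channel == "ch1" && named.rule == "db2");
}
```

A rewrite rule such as db1->db2 contains no colon, so under this sketch it is classified as a global filter, while ch1:db1->db2 is assigned to channel ch1.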

If the user specifies a per-channel replication filter through a
command-line option (or in a configuration file) for a slave replication
channel which does not exist yet (i.e. is not present in the slave info
tables), then the per-channel replication filter is discarded with the
following warning:
  "There are per-channel replication filter(s) configured for channel '%.192s' which does not exist. The filter(s) have been discarded."

If the user specifies a per-channel replication filter through a
command-line option (or in a configuration file) for one of the group
replication channels 'group_replication_recovery' and
'group_replication_applier', which is disallowed, then the per-channel
replication filter is discarded with the following warning:
  "There are per-channel replication filter(s) configured for group replication channel '%.192s' which is disallowed. The filter(s) have been discarded."

How do global and per-channel replication filters work together?
- Any global replication filter option adds the filter to the global
  replication filters of that filter type. It does not add the filter to
  every channel.
- Any per-channel replication filter option adds the filter to the
  per-channel replication filters of the specified channel for that
  filter type.
- When a slave replication channel is being configured, it copies the
  global replication filters of a filter type to its per-channel
  replication filters if that filter type has global replication filters
  and no per-channel replication filters.
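The discard and inheritance rules above can be sketched as a small C++ model. It is a simplified illustration that assumes filters are kept as plain string lists per filter type; FilterSet, accept_channel_option and inherit_global_filters are hypothetical names, not server code.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Filter type (e.g. "do_db") -> configured rules. Hypothetical model.
using FilterSet = std::map<std::string, std::vector<std::string>>;

// The two group replication channel names given in the text.
bool is_group_replication_channel(const std::string &name) {
  return name == "group_replication_recovery" ||
         name == "group_replication_applier";
}

// A per-channel startup option is kept only for an existing channel that
// is not a group replication channel; otherwise it is discarded.
bool accept_channel_option(const std::set<std::string> &existing_channels,
                           const std::string &channel) {
  if (is_group_replication_channel(channel)) return false;
  return existing_channels.count(channel) != 0;
}

// Run while a channel is being configured: for each filter type that has
// global rules but no per-channel rules, copy the global rules.
void inherit_global_filters(const FilterSet &global, FilterSet &channel) {
  for (const auto &entry : global) {
    if (entry.second.empty()) continue;  // no global rules of this type
    std::vector<std::string> &own = channel[entry.first];
    if (own.empty()) own = entry.second;
  }
}

// Usage example: ch1 has its own do_db rule, so it inherits only ignore_db.
void inherit_global_filters_example() {
  FilterSet global;
  global["do_db"].push_back("db1");
  global["ignore_db"].push_back("db4");
  FilterSet ch1;
  ch1["do_db"].push_back("db2");

  inherit_global_filters(global, ch1);
  assert(ch1["do_db"].size() == 1 && ch1["do_db"][0] == "db2");
  assert(ch1["ignore_db"].size() == 1 && ch1["ignore_db"][0] == "db4");
}
```

The copy is decided per filter type, which is why a channel can end up with its own do_db rules and inherited ignore_db rules at the same time.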

Example: Suppose channels '' and 'ch1' exist before the server starts.
The command-line options
  --replicate-do-db=db1
  --replicate-do-db=ch1:db2
  --replicate-do-db=db3
  --replicate-ignore-db=db4
  --replicate-ignore-db=:db5
would result in:
  global replication filters: do_db=db1,db3, ignore_db=db4
  default channel: do_db=db1,db3 ignore_db=db5
  ch1: do_db=db2 ignore_db=db4

Note: GROUP REPLICATION channels should not be configurable using
--replicate* nor CHANGE REPLICATION FILTER, and should not inherit from
global filters.

BTW: if the user specifies multiple replicate-rewrite-db=FROM->TO options
having the same FROM database, all are added together (put into the
rewrite_do list) and the first one takes effect. The global replication
filters and per-channel filters have the same behavior in the worklog.
So there is no change on this, since a channel uses either global or
per-channel rewrite filters on a filter type.


B) SQL commands: CHANGE REPLICATION FILTER
==========================================

Dynamic replication filters are currently settable using the CHANGE
REPLICATION FILTER statement. We extend this command to introduce dynamic
replication filters per channel, by allowing a FOR CHANNEL clause as
follows.

---- Syntax ----
CHANGE REPLICATION FILTER filter [, filter...] [FOR CHANNEL <channel_name>]

filter:
    REPLICATE_DO_DB = (db_list)
  | REPLICATE_IGNORE_DB = (db_list)
  | REPLICATE_DO_TABLE = (tbl_list)
  | REPLICATE_IGNORE_TABLE = (tbl_list)
  | REPLICATE_WILD_DO_TABLE = (wild_tbl_list)
  | REPLICATE_WILD_IGNORE_TABLE = (wild_tbl_list)
  | REPLICATE_REWRITE_DB = (db_pair_list)

---- Semantics ----
1) If an explicit FOR CHANNEL clause is provided, the statement acts on
   that configured slave replication channel: any existing replication
   filter whose filter type is one of the specified filter types is
   removed and replaced with the specified one. Filter types that were
   not explicitly listed in the statement are not modified.
   The statement is disallowed with an error 'ER_SLAVE_CONFIGURATION' on a
   slave replication channel if it is not configured. The statement is
   disallowed with an error 'ER_SLAVE_CHANNEL_OPERATION_NOT_ALLOWED' on
   group replication channels.

   For example,
     --replicate-do-db=ch_1:my_db1
     --replicate-do-db=ch_1:my_db2
     --replicate-do-db=ch_1:my_db3
     --replicate-ignore-db=ch_1:my_db4
     --replicate-ignore-db=ch_1:my_db5
     --replicate-ignore-db=ch_1:my_db6

     CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(`db3`.initfilet3)
       FOR CHANNEL 'ch_1';
     CHANGE REPLICATION FILTER REPLICATE_WILD_DO_TABLE=('initfiled%.t%')
       FOR CHANNEL 'ch_1';
     CHANGE REPLICATION FILTER
       REPLICATE_DO_DB=(db1,db2,`db32`, `db,3`),
       REPLICATE_DO_DB = (my_db3, my_db4),
       Replicate_Ignore_DB = (my_initfiledb3)
       FOR CHANNEL 'ch_1';

   (Note: the above statement is equivalent to the following three
   statements:
     CHANGE REPLICATION FILTER REPLICATE_DO_DB=(db1,db2,`db32`, `db,3`)
       FOR CHANNEL 'ch_1';
     CHANGE REPLICATION FILTER REPLICATE_DO_DB = (my_db3, my_db4)
       FOR CHANNEL 'ch_1';
     CHANGE REPLICATION FILTER Replicate_Ignore_DB = (my_initfiledb3)
       FOR CHANNEL 'ch_1';)

   SHOW SLAVE STATUS FOR CHANNEL 'ch_1' displays the following filters:
     Replicate_Do_DB = 'my_db3,my_db4'
     Replicate_Ignore_DB = 'my_initfiledb3'
     Replicate_Do_Table = ''
     Replicate_Ignore_Table = 'db3.initfilet3'
     Replicate_Wild_Do_Table = 'initfiled%.t%'
     Replicate_Wild_Ignore_Table = ''
     Replicate_Rewrite_DB = ''
   (SELECT * FROM PERFORMANCE_SCHEMA.replication_applier_filters displays
   the same result in a different format.)

2) CHANGE REPLICATION FILTER filter [, filter...] with no FOR CHANNEL
   clause does the following, both for every configured slave replication
   channel's per-channel filters and for the global replication filters:
   For every filter type, if the filter type is listed in the statement,
   then any existing filter rules of that type are replaced by the filter
   rules specified in the statement; otherwise the old value of the type
   is retained.
   The statement does not act on group replication channels, because
   replication filters on group replication channels are disallowed.

   For example, suppose channels '', 'ch_1', 'ch_2' exist before the server
   starts and the following options and statements are used:

--replicate-do-db=db1
--replicate-do-db=:db1
--replicate-do-db=:db2
--replicate-do-db=ch_1:db4
--replicate-do-db=ch_1:db5
--replicate-do-db=ch_3:db6
--replicate-wild-do-table=db.t1%
--replicate-wild-ignore-table=ch_1:db.t2%

# Server startup writes the following to the error log:
Warning: There are per-channel replication filter(s) configured for channel 'ch_3' which does not exist. The filter(s) have been discarded.

SELECT * FROM PERFORMANCE_SCHEMA.replication_applier_filters;
CHANNEL_NAME FILTER_NAME                 FILTER_RULE CONFIGURED_BY               ACTIVE_SINCE   COUNTER
             REPLICATE_DO_DB             db1,db2     STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
             REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_DO_DB             db4,db5     STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
ch_1         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_WILD_IGNORE_TABLE db.t2%      STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
ch_2         REPLICATE_DO_DB             db1         STARTUP_OPTIONS             2016-12-02 ... 0
ch_2         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0

SELECT * FROM PERFORMANCE_SCHEMA.replication_applier_global_filters;
FILTER_NAME                 FILTER_RULE CONFIGURED_BY             ACTIVE_SINCE
REPLICATE_DO_DB             db1         STARTUP_OPTIONS           2016-12-02 11:53:45.11341
REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS           2016-12-02 11:53:45.11341

CHANGE MASTER TO ... FOR CHANNEL ch_3;

SELECT * FROM PERFORMANCE_SCHEMA.replication_applier_filters;
CHANNEL_NAME FILTER_NAME                 FILTER_RULE CONFIGURED_BY               ACTIVE_SINCE   COUNTER
             REPLICATE_DO_DB             db1,db2     STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
             REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_DO_DB             db4,db5     STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
ch_1         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_WILD_IGNORE_TABLE db.t2%      STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
ch_2         REPLICATE_DO_DB             db1         STARTUP_OPTIONS             2016-12-02 ... 0
ch_2         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_3         REPLICATE_DO_DB             db1         STARTUP_OPTIONS             2016-12-02 ... 0
ch_3         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0

CHANGE REPLICATION FILTER REPLICATE_DO_DB = (dbA) FOR CHANNEL '';
CHANGE REPLICATION FILTER REPLICATE_DO_DB = () FOR CHANNEL 'ch_1';

SELECT * FROM PERFORMANCE_SCHEMA.replication_applier_filters;
CHANNEL_NAME FILTER_NAME                 FILTER_RULE CONFIGURED_BY               ACTIVE_SINCE   COUNTER
             REPLICATE_DO_DB             dbA         CHANGE_REPLI..._FOR_CHANNEL 2016-12-02 ... 0
             REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_DO_DB                         CHANGE_REPLI..._FOR_CHANNEL 2016-12-02 ... 0
ch_1         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_WILD_IGNORE_TABLE db.t2%      STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
ch_2         REPLICATE_DO_DB             db1         STARTUP_OPTIONS             2016-12-02 ... 0
ch_2         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_3         REPLICATE_DO_DB             db1         STARTUP_OPTIONS             2016-12-02 ... 0
ch_3         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0

CHANGE REPLICATION FILTER REPLICATE_DO_DB = (dbB);

SELECT * FROM PERFORMANCE_SCHEMA.replication_applier_filters;
CHANNEL_NAME FILTER_NAME                 FILTER_RULE CONFIGURED_BY               ACTIVE_SINCE   COUNTER
             REPLICATE_DO_DB             dbB         CHANGE_REPLICATION_FILTER   2016-12-02 ... 0
             REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_DO_DB             dbB         CHANGE_REPLICATION_FILTER   2016-12-02 ... 0
ch_1         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_1         REPLICATE_WILD_IGNORE_TABLE db.t2%      STARTUP_OPTIONS_FOR_CHANNEL 2016-12-02 ... 0
ch_2         REPLICATE_DO_DB             dbB         CHANGE_REPLICATION_FILTER   2016-12-02 ... 0
ch_2         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0
ch_3         REPLICATE_DO_DB             dbB         CHANGE_REPLICATION_FILTER   2016-12-02 ... 0
ch_3         REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS             2016-12-02 ... 0

SELECT * FROM PERFORMANCE_SCHEMA.replication_applier_global_filters;
FILTER_NAME                 FILTER_RULE CONFIGURED_BY             ACTIVE_SINCE
REPLICATE_DO_DB             dbB         CHANGE_REPLICATION_FILTER 2016-12-02 11:59:21.12689
REPLICATE_WILD_DO_TABLE     db.t1%      STARTUP_OPTIONS           2016-12-02 11:59:21.12689


C) SQL COMMAND: RESET SLAVE [ALL] [FOR CHANNEL]
===============================================
1) "RESET SLAVE FOR CHANNEL '<channel_name>'" does not remove the
   replication channel specified by the 'FOR CHANNEL' clause, so it shall
   retain the replication filters of the channel. It throws an error
   'ER_SLAVE_CHANNEL_DOES_NOT_EXIST' if the channel does not exist.
   So this statement is not changed by the worklog.

2) "RESET SLAVE" does not remove any replication channel, so it shall
   retain all per-channel replication filters and all global replication
   filters. So this statement is not changed by the worklog.

3) "RESET SLAVE ALL FOR CHANNEL '<channel_name>'" removes the replication
   channel specified by the 'FOR CHANNEL' clause, so it shall remove all
   per-channel replication filters of the channel if the channel exists.
   Afterwards, SELECT * FROM performance_schema.replication_applier_filters
   and SHOW SLAVE STATUS show that the channel no longer exists and that
   its replication filters are gone too. As before, it still throws an
   error 'ER_SLAVE_CHANNEL_DOES_NOT_EXIST' if the channel does not exist.

4) "RESET SLAVE ALL" with no FOR CHANNEL clause removes all replication
   channels, so it shall remove all per-channel replication filters, but
   it does not touch any global replication filter. When the new empty
   channel is being configured, it therefore uses the global replication
   filters (copies all global replication filters to its own per-channel
   replication filters).
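The four RESET SLAVE cases above can be modelled roughly as follows. This is a simplified C++ sketch, assuming filters are stored as string lists per filter type; ReplicaFilters and the function names are hypothetical, and a false return value merely stands in for ER_SLAVE_CHANNEL_DOES_NOT_EXIST.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Filter type (e.g. "do_db") -> configured rules. Hypothetical model.
using FilterSet = std::map<std::string, std::vector<std::string>>;

struct ReplicaFilters {
  FilterSet global;                              // global filters
  std::map<std::string, FilterSet> per_channel;  // channel name -> filters
};

// RESET SLAVE [FOR CHANNEL]: the channel survives, so no filter changes.
// Returns false when the named channel does not exist.
bool reset_slave_for_channel(const ReplicaFilters &r, const std::string &ch) {
  return r.per_channel.count(ch) != 0;
}

// RESET SLAVE ALL FOR CHANNEL: the channel and its filters are removed.
// Returns false when the named channel does not exist.
bool reset_slave_all_for_channel(ReplicaFilters &r, const std::string &ch) {
  return r.per_channel.erase(ch) == 1;
}

// RESET SLAVE ALL: all per-channel filters go away, global filters stay.
void reset_slave_all(ReplicaFilters &r) { r.per_channel.clear(); }

// Configuring a channel afterwards copies the global filters. Simplified:
// a brand-new channel has no per-channel filters of any type yet.
void configure_new_channel(ReplicaFilters &r, const std::string &ch) {
  assert(r.per_channel.count(ch) == 0);
  r.per_channel[ch] = r.global;
}
```

In this model, calling reset_slave_all() followed by configure_new_channel() leaves the new channel with a copy of whatever global filters were in place before the reset.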

A user who wants to remove all global and per-channel filters can use
the statement:
  CHANGE REPLICATION FILTER
    Replicate_Do_DB = (), Replicate_Ignore_DB = (),
    Replicate_Do_Table = (), Replicate_Ignore_Table = (),
    Replicate_Wild_Do_Table = (), Replicate_Wild_Ignore_Table = (),
    Replicate_Rewrite_DB = ();


D) SQL COMMAND: SHOW SLAVE STATUS [FOR CHANNEL <channel_name>]
==============================================================
SHOW SLAVE STATUS FOR CHANNEL <channel_name> shall show the per-channel
replication filters for the specified channel, or throw an error
'ER_SLAVE_CHANNEL_DOES_NOT_EXIST' if the channel does not exist.

SHOW SLAVE STATUS with no FOR CHANNEL clause shall show the per-channel
replication filters on every channel.


E) CREATE A NEW performance_schema.replication_applier_filters
==============================================================
We shall introduce a new dedicated P_S table to display per-channel
replication filters for usability. So create and maintain the new P_S
table with the following columns:
  1) Channel_name: the name of the channel;
  2) Filter_name: REPLICATE_DO_DB, REPLICATE_IGNORE_DB,
                  REPLICATE_DO_TABLE, REPLICATE_IGNORE_TABLE,
                  REPLICATE_WILD_DO_TABLE, REPLICATE_WILD_IGNORE_TABLE,
                  REPLICATE_REWRITE_DB;
  3) Filter_rule: The values that the user has configured with the startup
                  options --replicate-* or through the CHANGE REPLICATION
                  FILTER command (this also includes the empty set when
                  the user unsets the rules).
  4) Configured_by: ENUM(STARTUP_OPTIONS, CHANGE_REPLICATION_FILTER,
                         STARTUP_OPTIONS_FOR_CHANNEL,
                         CHANGE_REPLICATION_FILTER_FOR_CHANNEL);
     (These enumeration constants are the most self-descriptive set of
     identifiers, and they support all the use cases:
       U1. Reflect the configured commands;
       U2. Determine if the filter has been persisted;
       U3. Debugging by a confused user, or learning the logic of default
           filters by playing with different ways to set them.)
  5) Active_since: Timestamp of when the configuration took place;
     (For a new channel that copies the global replication filters as its
     own per-channel filters, 'active_since' is set to the channel
     creation time.)
  6) Counter: the hit counter of the filter since last configuration;

Note: (4) and (5) are important for troubleshooting. (6) is more about
statistics (and monitoring).

So the table is created as below:
  CREATE TABLE performance_schema.replication_applier_filters(
    CHANNEL_NAME CHAR(64) collate utf8_general_ci not null,
    FILTER_NAME CHAR(64) collate utf8_general_ci not null,
    FILTER_RULE LONGTEXT not null,
    CONFIGURED_BY ENUM('STARTUP_OPTIONS',
                       'CHANGE_REPLICATION_FILTER',
                       'STARTUP_OPTIONS_FOR_CHANNEL',
                       'CHANGE_REPLICATION_FILTER_FOR_CHANNEL') not null,
    ACTIVE_SINCE TIMESTAMP(6) NOT NULL default 0,
    COUNTER bigint(20) unsigned NOT NULL default 0
  ) ENGINE=PERFORMANCE_SCHEMA;


F) CREATE A NEW performance_schema.replication_applier_global_filters
=====================================================================
We shall introduce a new dedicated P_S table to display all global
replication filters for usability. So create and maintain the new P_S
table with the following columns:
  1) Filter_name: REPLICATE_DO_DB, REPLICATE_IGNORE_DB,
                  REPLICATE_DO_TABLE, REPLICATE_IGNORE_TABLE,
                  REPLICATE_WILD_DO_TABLE, REPLICATE_WILD_IGNORE_TABLE,
                  REPLICATE_REWRITE_DB;
  2) Filter_rule: The values that the user has configured with the startup
                  options --replicate-* or through the CHANGE REPLICATION
                  FILTER command (this also includes the empty set when
                  the user unsets the rules).
  3) Configured_by: ENUM(STARTUP_OPTIONS, CHANGE_REPLICATION_FILTER);
  4) Active_since: Timestamp of when the configuration took place;

So the table is created as below:
  CREATE TABLE performance_schema.replication_applier_global_filters(
    FILTER_NAME CHAR(64) collate utf8_general_ci not null,
    FILTER_RULE LONGTEXT not null,
    CONFIGURED_BY ENUM('STARTUP_OPTIONS',
                       'CHANGE_REPLICATION_FILTER') not null,
    ACTIVE_SINCE TIMESTAMP(6) NOT NULL default 0
  ) ENGINE=PERFORMANCE_SCHEMA;
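To relate the CHANGE REPLICATION FILTER semantics of section B to the columns of replication_applier_filters, here is a minimal C++ sketch. Assumptions: rows are kept in nested std::map containers, timestamps are opaque integers, COUNTER restarts at 0 whenever a filter is reconfigured (one reading of "since last configuration"), and the global table, which the statement without FOR CHANNEL also updates, is left out. All type and function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

enum class ConfiguredBy {
  STARTUP_OPTIONS,
  CHANGE_REPLICATION_FILTER,
  STARTUP_OPTIONS_FOR_CHANNEL,
  CHANGE_REPLICATION_FILTER_FOR_CHANNEL
};

// One table row without CHANNEL_NAME and FILTER_NAME, which are the map
// keys below. Hypothetical in-memory model, not the P_S implementation.
struct FilterRow {
  std::string rule;            // FILTER_RULE, may be empty
  ConfiguredBy configured_by;  // CONFIGURED_BY
  std::int64_t active_since;   // ACTIVE_SINCE as an opaque timestamp
  std::uint64_t counter;       // COUNTER, hits since last configuration
};

using ChannelRows = std::map<std::string, FilterRow>;  // key: filter name
using AllRows = std::map<std::string, ChannelRows>;    // key: channel name

// CHANGE REPLICATION FILTER <filter_name> = (<rule>) FOR CHANNEL <channel>.
// Only the listed filter type is replaced. Returns false when the channel
// is not configured.
bool change_filter_for_channel(AllRows &rows, const std::string &channel,
                               const std::string &filter_name,
                               const std::string &rule, std::int64_t now) {
  assert(!filter_name.empty());
  auto it = rows.find(channel);
  if (it == rows.end()) return false;
  it->second[filter_name] = FilterRow{
      rule, ConfiguredBy::CHANGE_REPLICATION_FILTER_FOR_CHANNEL, now, 0};
  return true;
}

// The same statement without FOR CHANNEL: every configured channel gets
// the new rule for the listed filter type.
void change_filter_all_channels(AllRows &rows, const std::string &filter_name,
                                const std::string &rule, std::int64_t now) {
  assert(!filter_name.empty());
  for (auto &channel : rows) {
    channel.second[filter_name] =
        FilterRow{rule, ConfiguredBy::CHANGE_REPLICATION_FILTER, now, 0};
  }
}
```

Rows of filter types that are not named in the statement are never touched, which is why their CONFIGURED_BY and ACTIVE_SINCE values keep reflecting how they were originally set.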

The idea of the implementation is as follows:
1. Create a struct called filter_map which maps a channel name
   to its Rpl_filter object. (Note that Rpl_filter already
   exists in the present code.)

   typedef std::map<std::string, Rpl_filter*> Rpl_filter_map;


2. Create a global object for storing per-channel replication filters:

   Multisource_filter_info rpl_filter_map;

3. For a particular channel, allocate a pointer that refers
   to its per-channel filter. Since filtering is about execution
   by applier threads, we place it in rli. In short, add
   Rpl_filter* to Relay_log_info:

   class Relay_log_info
   {
     public:
        Rpl_filter *rpl_filter;
   };

4. When a channel is being configured, establish the relation between
   the per-channel filter and the channel's Relay_log_info:

   rpl_filter= rpl_filter_map.get_channel_filter(channel);
   rli->set_filter(rpl_filter);

   Then take a copy of global_rpl_filter if needed.
   The filter is removed from the map when the channel is deleted.
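Steps 1 to 4 can be put together in a small self-contained C++ sketch. Rpl_filter and Relay_log_info here are minimal stand-ins for the server classes of the same name, and Channel_filter_map, delete_channel_filter, has_filter and attach_filter are hypothetical names. Only the do_db list is modelled, and std::unique_ptr ownership is an assumption of this sketch.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Minimal stand-ins; the real server classes are far larger.
struct Rpl_filter {
  std::vector<std::string> do_db;
};

struct Relay_log_info {
  Rpl_filter *rpl_filter = nullptr;
  void set_filter(Rpl_filter *filter) { rpl_filter = filter; }
};

class Channel_filter_map {  // hypothetical name
 public:
  // Returns the filter of `channel`, creating an empty one on first use.
  Rpl_filter *get_channel_filter(const std::string &channel) {
    std::unique_ptr<Rpl_filter> &slot = filters_[channel];
    if (!slot) slot.reset(new Rpl_filter());
    return slot.get();
  }
  // Called when the channel is deleted.
  void delete_channel_filter(const std::string &channel) {
    filters_.erase(channel);
  }
  bool has_filter(const std::string &channel) const {
    return filters_.count(channel) != 0;
  }

 private:
  std::map<std::string, std::unique_ptr<Rpl_filter>> filters_;
};

// Step 4: link the channel's Relay_log_info to its filter, then copy the
// global do_db rules if the channel has none of its own.
void attach_filter(Channel_filter_map &filters, const Rpl_filter &global,
                   const std::string &channel, Relay_log_info *rli) {
  assert(rli != nullptr);
  Rpl_filter *filter = filters.get_channel_filter(channel);
  rli->set_filter(filter);
  if (filter->do_db.empty()) filter->do_db = global.do_db;
}
```

Because the map owns each filter, the pointer stored in Relay_log_info stays valid until delete_channel_filter() is called for that channel. In this sketch the caller must reset the pointer when that happens.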

5. Wherever there is filter logic (for example, in log_event.cc
   or sql_parse.cc), make the filtering per channel. This means that
   the following replacements are made in the present code:

   a) Replace rpl_filter with rli->rpl_filter
   b) Replace rpl_filter with thd->rli_slave->rpl_filter
   c) Replace rpl_filter with mi->rli->rpl_filter
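The effect of replacing the single global filter with rli->rpl_filter can be illustrated with a reduced C++ sketch. Rpl_filter and Relay_log_info are stand-ins that model only database-level rules, should_apply is a hypothetical helper, and the rule order (a non-empty do_db list is checked first, ignore_db only when do_db is empty) follows the database-level evaluation described in the MySQL Reference Manual. Table-level, wildcard and rewrite rules are left out.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Stand-in that models only the database-level rules.
struct Rpl_filter {
  std::vector<std::string> do_db;
  std::vector<std::string> ignore_db;

  // Database-level check: a non-empty do_db list decides on its own;
  // ignore_db is consulted only when do_db is empty.
  bool db_ok(const std::string &db) const {
    auto contains = [&db](const std::vector<std::string> &list) {
      return std::find(list.begin(), list.end(), db) != list.end();
    };
    if (!do_db.empty()) return contains(do_db);
    if (!ignore_db.empty()) return !contains(ignore_db);
    return true;
  }
};

struct Relay_log_info {
  Rpl_filter *rpl_filter = nullptr;  // per-channel filter of this applier
};

// Before the change a single global rpl_filter was consulted here; now
// the applier asks the filter attached to its own channel.
bool should_apply(const Relay_log_info &rli, const std::string &db) {
  assert(rli.rpl_filter != nullptr);
  return rli.rpl_filter->db_ok(db);
}
```

With two Relay_log_info objects pointing at different Rpl_filter instances, the same database name can be applied on one channel and skipped on the other, which is the behavior the worklog is after.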

6. Create a new performance_schema.replication_applier_filters table,
   following the approach used to create the
   performance_schema.replication_applier_configuration table, with the
   following columns:
     1) Channel_name: the name of the channel;
     2) Filter_name: REPLICATE_DO_DB, REPLICATE_IGNORE_DB,
                     REPLICATE_DO_TABLE, REPLICATE_IGNORE_TABLE,
                     REPLICATE_WILD_DO_TABLE, REPLICATE_WILD_IGNORE_TABLE,
                     REPLICATE_REWRITE_DB;
     3) Filter_rule: The values that user has configured with startup
                     options: --replicate-* or through CHANGE REPLICATION
                     FILTER command (This also includes empty set when user
                     unsets the rules).
     4) Configured_by: ENUM(STARTUP_OPTIONS, CHANGE_REPLICATION_FILTER,
                            STARTUP_OPTIONS_FOR_CHANNEL,
                            CHANGE_REPLICATION_FILTER_FOR_CHANNEL);
     5) Active_since: Timestamp of when the configuration took place;
                      (For a new channel that copies the global replication
                       filters as its own per-channel replication filters,
                       'active_since' is set to the channel creation time.)
     6) Counter: the hit counter of the filter since last configuration;

7. Create a new performance_schema.replication_applier_global_filters table,
   following the approach used to create the
   performance_schema.replication_applier_filters table.