WL#10500: MTA: Optimized and generic change stream applier
EXECUTIVE SUMMARY
In this worklog, we present a new applier for replicated transactions, called the Change Stream Applier. Our primary objectives are to:
- Improve performance
- Increase maintainability
- Make the design more modular, allowing for easier adoption of new features in the future
We will retain the existing command interface for the most part to ensure ease of migration for existing users, while modifying the application components to improve efficiency and scalability. We will also generalize and encapsulate the interaction with the Relay Log (RL) into an API, making it easier to replace or modify in the future if needed. Notably, the receiver side and storage engine interfaces will remain unchanged.
The current applier design has several limitations, including:
Before WL#10500, the number of workers could only be configured globally: the same value applied to every channel, giving all channels the same amount of resources. We enable configuring the number of workers per channel, which may provide a means to control priority between channels.
The receiver thread and the applier communicate only via the Replication Storage, i.e., the Relay Log; there is no direct connection between the receiver and the applier.
The current architecture suffers from tight coupling between key components, including Applier, Session, Transaction Coordinator, and Storage, resulting in low cohesion. This leads to increased maintenance costs and complexity when introducing new features. To address this, our goal is to create an extensible design by abstracting core concepts such as Storage, Threads, Transaction Dependencies, Thread Pools, Receivers, Appliers, Statistics, Resource Monitors, and others.
Currently, the concepts of threads, transactions, and sessions are intertwined, lacking a clear definition of a lightweight thread that can be allocated to execute transactions or parts of transactions. This limits parallelism. The proposed design aims to decouple these concepts, laying the groundwork for future support of intra-transaction parallelism by allowing multiple workers to apply transactions, thereby paving the way for potential concurrency improvements within individual transactions.
Key enhancements:
Enhances parallelism: a transaction waits only for its own dependencies, so subsequent independent transactions can proceed immediately. The apply and commit phases are separated: when a transaction is ready to commit but its predecessor has not finished applying, the worker is not blocked waiting for its turn to commit; instead, it picks another parallel transaction to apply.
Enables parallel relay log purging: relay log files are purged as soon as their contents are consumed, so earlier files become eligible for deletion without waiting.
Makes the applier responsive to STOP REPLICA commands.
Decouples core components (Applier, Session, Scheduler, Storage) for better cohesion and easier feature extension.
Supports per-channel worker configuration via APPLIER_WORKER_COUNT in CHANGE REPLICATION SOURCE TO.
Introduces APPLIER_VERSION to select the applier and APPLIER_EVENT_MEMORY_LIMIT for per-channel limit of binlog event caching.
Generalizes Relay Log interaction via a new API for future replaceability.
Supports row-based binary logging with GTIDs; lays groundwork for intra-transaction parallelism.
Retains unchanged receiver thread and storage engine interfaces for seamless migration.
Adds extensible components for applier statistics, resource management, and monitoring.
Decouples event application logic from transaction scheduling.
Allows workers to read transactions from storage in parallel, releasing event memory upon consumption.
Unifies transaction execution and retry logic.
Achieves an extendable and maintainable design, with added classes prepared for extendable transaction dependency enforcement, statistics monitoring and resource management, featuring named resources and named statistics through a consistent API.
USER/DEV STORIES
US1. As a user, I want the replica to have at least 70% (TBC) throughput of the source, assuming the same shape:
- so that I can apply my workload faster
US2. As a replication developer, I want the replica to have a robust design:
so that it is easy to introduce new features
so that it is easy to introduce new unit tests and spot bugs in the code
so that I can exchange existing algorithms for more efficient versions in order to improve the replica performance
US3. As a replication developer, I want the replica to be designed with a goal that it should be possible to extend towards supporting intra-transaction parallelism and streaming replication.
SCOPE
The scope of this worklog assumes designing and implementing a new, Generic Change Stream Applier, redefining its communication and synchronization patterns between replication entities such as the Receiver thread, the Coordinator thread, the Applier Worker threads, and the Relay Log. Additionally, the goal is to develop an adaptable architecture that enables effortless extension with new features, ensuring minimal impact on the current implementation. For the time being, we keep the current user interface, meaning the commands used to create and steer Replication Channels. To steer the new Change Stream Applier, we extend the CHANGE REPLICATION SOURCE TO command with new options:
- APPLIER_VERSION: enables users to specify the applier version to be used.
- APPLIER_WORKER_COUNT: allows users to configure the number of worker threads allocated to a channel for the new Change Stream Applier.
- APPLIER_EVENT_MEMORY_LIMIT: allows users to specify a per-channel memory limit for caching binlog events.
These enhancements provide greater flexibility and control over replication configuration.
LIMITATIONS
APPLIER_WORKER_COUNT option of the "CHANGE REPLICATION SOURCE TO" applies only to the Change Stream Applier.
APPLIER_EVENT_MEMORY_LIMIT option of the "CHANGE REPLICATION SOURCE TO" applies only to the Change Stream Applier.
Since the Change Stream Applier does not support all of the MTA features, replication is limited to sources supporting row-based binary logging and identifying transactions using GTIDs.
The features not supported in the Change Stream Applier are:
- Statement or Mixed mode binary logs (binlog-format=STATEMENT, binlog-format=MIXED)
- filename and offset replication channels
- skipping parts of transactions (sql_replica_skip_counter); this feature is not needed when using GTIDs
- until clauses other than SQL_BEFORE_GTIDS, SQL_AFTER_GTIDS
- delayed applier
- GTID_ONLY=0
- REQUIRE_ROW_FORMAT=0
- replica_pending_jobs_size_max
- replication from GTID disabled sources (ASSIGN_GTIDS_TO_ANONYMOUS_TRANSACTIONS)
- extended applier statistics
- GTID_MODE equal to OFF, OFF_PERMISSIVE, ON_PERMISSIVE
- legacy VCLE
- IGNORE_SERVER_IDS option of the CRST command (not needed when using GTIDs; already applied GTIDs are automatically skipped)
CSA will support all MTA features not listed above.
Software Overview
Architecture Overview
The Change Stream Applier (CSA) is a new, generic applier component for MySQL replication that replaces the legacy Multi-threaded Applier (MTA). It introduces a modular, extensible architecture designed to improve performance, maintainability, and scalability. The CSA operates across four layers:
- Concurrency Layer: Provides thread-safe synchronization primitives, lock-free data structures, and concurrent queues for efficient inter-thread communication.
- Scheduling Layer: Implements a scheduler with dependency management, enabling parallel task execution while respecting transaction ordering constraints.
- Service Layer: Orchestrates the core applier logic, including transaction processing, session management, and error handling.
- Server Layer: Integrates the Service Layer with existing MySQL replication infrastructure, handling command interfaces (CRST) and storage interactions.
The CSA maintains compatibility with existing replication interfaces while abstracting the underlying Change Stream implementation (initially the Relay Log, extensible to other sources).
Key Components
The CSA comprises the following key components:
- Change Stream Applier Service: A singleton instance managing channels
configured with CSA, orchestrating initialization, transaction processing,
and shutdown of the applier. It runs the main processing loop (via its
run()function) to continuously retrieve jobs from the Transaction Provider, calculate dependencies using a Dependency Adapter, create executable tasks with schedules, and submit them to the Scheduler. It also provides functions for shutdown (stop()) and statistics access (get_statistics()), while creating and managing instances of all core components per channel. - Dependency Adapter: Calculates transaction dependencies based on metadata like last_committed and sequence_number from GTID events, providing an abstract interface for pluggable implementations to translate dependencies into formats usable by the Scheduler Clock and Dependency Tracker.
- Transaction Provider: On request, reads transactions from Replication Storage and returns them as Jobs to the requester, supporting both synchronous and asynchronous variants. It extracts raw metadata such as last_committed and sequence_number from GTID events in the relay log. For relay-log-backed jobs it uses streaming-transaction applier dispatch. It may return the Job immediately after GTID and publish the remaining events incrementally into the fetchable transaction.
- Replication Storage: Abstracts access to the Relay Log, enabling multiple concurrent readers, automatic purging, and prefetching.
- Scheduler: Categorizes tasks by readiness based on timing and dependency constraints using Scheduler Clocks and Dependency Trackers, then dispatches ready tasks to the Thread Pool for execution. Dependencies are modeled abstractly between phase endpoints (end of one phase to start of another), enabling both end-to-start (e.g., when one task completes, another can start) and start-to-start (e.g., when one task enters a phase, another can enter the same phase) semantics without transaction-specific details.
- Scheduler Clock: Handles time-based scheduling dependencies between tasks, determining when tasks can start based on logical time, with implementations like the LWM Clock (for global transaction ordering) and the Commit Order Clock (for commit registration sequencing, i.e. RPCO). The LWM Clock (Clock_lwm_registry) computes the Low Water Mark as the maximum task ID of consecutively executed transactions, using the sequential task IDs of finished tasks and leveraging the last_committed and sequence_number information from the Gtid_log_event. Transactions with a required LWM <= the current LWM clock value may execute immediately, while those with a required LWM > the current value must wait for the clock to advance (a minimal sketch of such a clock follows this component list). This enables parallel execution within the same logical time window while preserving global order. One Scheduler Clock instance is instantiated per Scheduler (thus per replication channel), though multiple clocks can be registered for different task phases. Task-to-task dependencies are not queried from the Scheduler Clock; instead, the Scheduler uses the clock to determine when tasks can run based on time windows.
- Dependency Tracker: Maintains task-to-task execution dependencies (e.g., end-to-start relationships) between tasks, ensuring correct execution order. The policy to determine dependencies for a task is pluggable, so that we can evolve the representation of dependencies in the future.
- Thread Pool: A pool of configurable worker threads executing Tasks concurrently. Tasks are executable wrappers around Jobs (or other units of work) supplied via a queue. The queue is populated by the Scheduler, which enqueues Tasks for the "apply" phase or "commit" phase of a Job, depending on the Job's progress and schedule.
- Communication Queue: Synchronized queue for thread-safe communication between producers and consumers.
- Session Service: Manages database sessions (THD contexts and Relay Log Info), providing thread-safe acquisition and release.
- Jobs: Abstract units of work representing transactions, encapsulating execution logic for phase-based application (single-phase for transactions that run with RPCO disabled or multi-phase for commit-order preservation), with support for retries and error handling. Each transaction is mapped to one Job instance, which is divided into phases (e.g., apply, register for commit, commit). Jobs implement a generic interface executed polymorphically by the Thread Pool, with specialized implementations like Job_applier for binlog events. Dependencies are established between Jobs (transactions), while phases are an internal execution mechanism handled by the Scheduler (via implemented Schedule).
- Statistics Monitor: Storage for statistics/metrics. Monitored threads can submit samples, and monitoring threads can query metrics. All access is lock-free.
- Resource Monitor: Singleton component that provides functions for other threads to check and enforce per-channel memory limits for event caching, allowing threads to wait for available memory and preventing excessive usage by individual channels.
- Transaction Conflict Managers: Dedicated threads (one per replication channel) resolving RPCO deadlocks via coordinated rollbacks. They dequeue deadlock resolution requests enqueued by worker threads detecting conflicts and perform asynchronous rollbacks.
- Prefetcher: Optional asynchronous thread prefetching Relay Log data to reduce I/O latency.
- Commit Order Manager: MTA Commit Order Manager, reused in CSA
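To make the LWM mechanism concrete, below is a minimal C++ sketch of how a clock of this kind could advance. It is an illustration only: it assumes consecutive task IDs starting at 1, uses a mutex where the real Clock_lwm_registry is designed to be fast and largely lock-free, and all names are hypothetical.

  #include <cstdint>
  #include <mutex>
  #include <set>

  // Illustrative LWM clock sketch: the LWM is the highest task ID such that
  // all tasks with smaller IDs have finished. A task whose required LWM
  // (derived from last_committed of its Gtid_log_event) is <= the current
  // LWM may run; otherwise it waits for the clock to advance.
  class Lwm_clock_sketch {
   public:
    // Record completion of a task; advance the LWM over any consecutive
    // run of finished task IDs.
    void task_finished(uint64_t task_id) {
      std::lock_guard<std::mutex> guard(m_lock);
      m_finished.insert(task_id);
      while (!m_finished.empty() && *m_finished.begin() == m_lwm + 1) {
        m_lwm = *m_finished.begin();
        m_finished.erase(m_finished.begin());
      }
    }
    // A task may start once the clock has reached its required LWM.
    bool can_run(uint64_t required_lwm) {
      std::lock_guard<std::mutex> guard(m_lock);
      return required_lwm <= m_lwm;
    }

   private:
    std::mutex m_lock;
    uint64_t m_lwm{0};              // highest consecutively finished task ID
    std::set<uint64_t> m_finished;  // finished tasks ahead of the LWM
  };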
Layer-Component Mapping
The four architectural layers represent functional partitions of the CSA software, while the Key Components are the concrete building blocks that implement these layers. The following mapping clarifies the relationship:
- Concurrency Layer: Thread Pool, Communication Queue
- Scheduling Layer: Scheduler, Scheduler Clock, Dependency Tracker
- Service Layer: Change Stream Applier Service, Transaction Provider, Session Service, Jobs, Statistics Monitor, Resource Monitors, Transaction Conflict Managers, Prefetcher.
- Server Layer: Replication Storage, Commit Order Manager.
Static and Dynamic System Flow
The CSA follows an event-driven, producer-consumer architecture that processes replicated transactions through coordinated phases:
Transaction Ingestion and Dependency Resolution
- The Transaction Provider reads transaction metadata or full events from the Relay Log, creating Jobs (transaction units of work). For relay-log-backed jobs, it may dispatch a Job after GTID and stream the remaining transaction events incrementally to the worker.
- A Dependency Adapter analyzes GTID log events to calculate transaction dependencies using metadata such as last_committed and sequence_number, providing an abstract interface for dependency resolution that integrates with the Scheduler Clock and Dependency Tracker.
- Jobs are scheduled via a Scheduler that enforces end-to-start and start-to-start dependencies, queuing tasks for execution when timing constraints are met.
Parallel Execution with Ordering
- The Thread Pool executes scheduled jobs concurrently across configurable worker threads.
- For transactions requiring commit order preservation (RPCO), jobs normally execute in phases:
- Apply Phase: Events are applied to the replica database.
- Register for Commit Phase: Transactions register for commit in strict sequence.
- Commit Phase: After registration, the committing worker executes the commit while preserving source commit order via the Commit Order Manager. On retry after a commit-phase failure, the job is replayed in a single scheduler dispatch. The retry path does not return to the scheduler shelf between prepare and commit and does not perform a second Commit Order Manager registration within the same transaction lifecycle.
- A Session Service manages database sessions, providing thread-safe access to THD contexts and Relay Log Info (RLI).
Synchronization and Resource Management
- Communication between components uses Synchronized Queues (bounded FIFO buffers with lock-striping and atomic operations).
- A Statistics Monitor tracks performance metrics across threads and channels.
- Concurrent Relay Log purging ensures automatic cleanup without blocking replication.
- Resource Monitors enforce per-channel memory limits for event caching.
Error Handling and Recovery
- Transient errors trigger automatic job retries.
- Transaction Conflict Managers resolve RPCO deadlocks by coordinating rollbacks for transactions that have already finished apply and were handed to external rollback ownership. Transactions that already entered commit ownership stay on the commit path and are signaled there instead of being rescued externally.
- Fatal errors halt the CSA service, reporting detailed diagnostics and preserving data consistency.
Key Dynamic Behaviors
- Adaptive Reading: Switches between cached metadata and direct Relay Log access based on transaction size and worker load.
- Streaming-Transaction Applier Dispatch: For relay-log-backed jobs, the Job may be dispatched after GTID while the remaining events are still being published into the fetchable transaction. This applies both when events are later fetched from cached payload and when they are read on demand from the relay log. If publication is interrupted, the partial transaction is marked truncated so the worker can roll it back silently and the next Job can replay it from source.
- Time-Based Scheduling: Uses logical timestamps to schedule transactions in execution windows, enabling efficient parallel processing within windows while respecting dependencies.
- Phase-Based Execution: Supports both single-phase (RPCO disabled) and multi-phase (RPCO enabled) transaction application.
- Scalable Parallelism: Configurable worker counts per channel allow fine-grained resource allocation.
- Asynchronous Prefetching: Background threads (one per channel) prefetch Relay Log data to reduce I/O latency, controlled by a global configuration option and autonomously enabled on inactive relay log files.
Dynamic View Walkthrough
The Change Stream Applier (CSA) operates through coordinated threads that process transactions asynchronously where possible, while maintaining synchronous dependencies for ordering. Below is a walkthrough of the dynamic flow, highlighting thread actions, component interactions, and sync/async behavior.
Initialization Phase (Synchronous)
- START REPLICA starts an asynchronous thread (the Change Stream Applier main thread) that synchronously initializes the CSA service components (Transaction Provider, Log Prefetcher if enabled, Scheduler, Thread Pool, Session Service), registers the RPCO phase if commit order preservation is enabled, and starts additional asynchronous threads for replication processing while initializing the components.
Transaction Processing Loop
1. Transaction Provider (Optionally Asynchronous)
- Action: When asynchronous, it runs in a separate thread, continuously reading from the Relay Log; when synchronous, it parses the next transaction from the relay log on request. The Transaction Provider parses transaction boundaries, extracts metadata (last_committed, sequence_number), and creates Jobs. For relay-log-backed jobs it may return the Job after the GTID event and keep publishing the remaining transaction events incrementally.
- Interactions:
- Async Transaction Provider:
- Synchronous reads from the Relay Log.
- Asynchronous enqueue of Jobs into the queue connecting it to the coordinator (SQL) thread.
- Sync Transaction Provider:
- Synchronous reads from the Relay Log.
- Synchronous return of Jobs for further coordination by the CSA Service.
2. Scheduler Thread (Asynchronous)
- Action: Monitors task queues, checks dependencies, and dispatches ready tasks to Thread Pool.
- Key Phases:
- Check Phase Queues: Synchronously inspects phase queues (e.g., commit phase) for tasks ready by phase clock.
- Check Task Queues: Verifies global clock and Dependency Tracker for end-to-start dependencies.
- Dispatch: Asynchronously enqueues tasks to Thread Pool queue.
- Interactions:
- Synchronous updates to LWM Clock and Dependency Tracker.
- Asynchronous callbacks from Thread Pool on task completion (update clocks, mark dependencies met).
- Sync/Async: Async loop; sync operations for dependency checks, async for task dispatch and callbacks.
3. Thread Pool Worker Threads (Asynchronous)
- Action: Multiple workers dequeue and execute Jobs. For RPCO-enabled transactions:
- Phase 1: Apply events (attach session, wait for the next event when streamed from the Relay Log/cache, fetch, apply).
- Phase 2: Register for commit.
- Phase 3: Commit. In the normal path these phases are split across scheduler dispatches; in retry_commit they are replayed in one dispatch.
- Interactions:
- Synchronous session acquisition/release via Session Service.
- Asynchronous fetches from Relay Log or cache.
- Synchronous callbacks to Scheduler on completion (update clocks, notify next tasks).
- Sync/Async: Async execution per task; sync for session ops and callbacks.
4. Prefetcher Thread (Optional, Asynchronous)
- Action: Background thread prefetches Relay Log chunks to reduce I/O latency.
- Interactions:
- Asynchronous reads from Relay Log to prefetch all available data (prefetcher thread runs independently of the main CSA service, allowing concurrent processing of jobs and scheduling by the CSA service).
- Synchronous enqueue of prefetched data into shared buffers (blocking until enqueued).
- CSA service asynchronously retrieves individual jobs from the prefetched data when requested.
- Sync/Async: Fully async; non-blocking prefetching.
5. Transaction Conflict Manager Thread (Asynchronous, On-Demand)
- Action: Dedicated thread per channel handles RPCO deadlock resolution by rolling back conflicting transactions that were explicitly handed to external rollback ownership.
- Interactions:
- Asynchronous dequeue deadlock resolution requests from queue.
- Synchronous rollback via associated RLI (blocks until rollback completes).
- Transactions that already entered commit ownership are not rolled back by this thread; they are signaled on the commit path instead.
- Sync/Async: Async processing; sync rollback execution.
Shutdown Phase (Synchronous)
- The thread executing STOP REPLICA synchronously waits for all async threads (Scheduler, Transaction Provider if asynchronous, Prefetcher if enabled, Thread Pool workers, main Change Stream Applier thread) to join. Active incomplete streamed transactions are truncated to unblock waiting workers before cleanup completes. Finally, it cleans up resources.
Summary
This architecture decouples transaction processing from storage specifics, enabling future extensions for streaming replication and enhanced concurrency models.
FUNCTIONAL REQUIREMENTS:
----- COMPATIBILITY REQUIREMENTS -----
CPT.0010 The Change Stream Applier shall support row-based binary logging
CPT.0020 The Change Stream Applier shall determine transaction dependencies using the same scheme as the Multi-threaded Applier uses when replica_parallel_type=LOGICAL_CLOCK, regardless of the value of replica_parallel_type.
CPT.0030 Following a hardware crash, the Change Stream Applier shall recover its state and resume replication from a consistent point, ensuring data integrity.
CPT.0040 In the event of an error occurring in one replication channel, the Applier shall continue applying changes from other channels without interruption
CPT.0050 The Change Stream Applier shall provide users with the ability to adjust the number of worker threads for each channel, offering flexibility in resource allocation.
CPT.0060 The Change Stream Applier shall be compatible with replication data generated by a source of version 9.7, applying changes correctly.
CPT.0070 The Change Stream Applier shall adhere to the existing CHANGE REPLICATION SOURCE TO interface for configuring channels, ensuring seamless integration and ease of migration for existing users (see INTERFACE REQUIREMENTS).
CPT.0080 The Change Stream Applier shall comply with the established tracking of transaction dependencies, as specified in the GTID Log Event, including last committed and sequence number.
----- PERFORMANCE REQUIREMENTS -----
PE.0010 The Change Stream Applier shall improve replica throughput in some workloads.
----- INTERFACE REQUIREMENTS -----
INT.0010: The CHANGE REPLICATION SOURCE TO command shall include an APPLIER_VERSION option, allowing users to specify the applier version for a particular channel.
INT.0011: The APPLIER_VERSION option shall accept the integer values 1 (enabling the Multi-threaded Applier) and 2 (enabling the Change Stream Applier). Other values shall result in an ER_CRST_UNKNOWN_APPLIER_VERSION error.
INT.0012: When creating a new channel, if the APPLIER_VERSION option is not specified, the default applier version shall be 1.
INT.0013: When updating an existing channel, if the APPLIER_VERSION option is not specified, the previously configured applier version shall be retained.
INT.0014: Executing the RESET REPLICA statement shall reset the APPLIER_VERSION to 1 only if the ALL option is specified.
INT.0015: The applier version shall be stored in the slave_relay_log_info system table, in the Applier_version column, with a default value of 1.
INT.0016: The applier version shall also be shown in the replication_applier_configuration PFS table, in the APPLIER_VERSION column.
INT.0017: Changes to the "APPLIER_VERSION" of the existing channel shall be accepted only when the coordinator thread is not running (OFF).
INT.0020: The CHANGE REPLICATION SOURCE TO command shall include an APPLIER_WORKER_COUNT option, allowing users to specify the number of workers for a particular channel.
INT.0021: The APPLIER_WORKER_COUNT option shall only be valid when using the Change Stream Applier (CSA). Otherwise, an ER_CRST_APPLIER_WORKER_COUNT_ONLY_FOR_CSA error shall be returned.
INT.0022: The APPLIER_WORKER_COUNT option shall accept integer values between 1 and 1024. Values outside this range shall result in an ER_CRST_APPLIER_WORKER_COUNT_OUT_OF_RANGE error.
INT.0023: When creating a new channel, if the APPLIER_WORKER_COUNT option is not specified, the replica_parallel_workers server option shall be used as the default number of workers.
INT.0024: When updating an existing channel, if the APPLIER_WORKER_COUNT option is not specified, the previously configured number of workers shall be retained.
INT.0025: Executing the RESET REPLICA statement shall reset the APPLIER_WORKER_COUNT to the value of @@replica_parallel_workers only if the ALL option is specified.
INT.0026: The specified number of workers shall be stored in the slave_relay_log_info system table.
INT.0027: The specified number of workers shall also be shown in the replication_applier_configuration PFS table, in the APPLIER_WORKER_COUNT column.
INT.0028: Changes to the "APPLIER_WORKER_COUNT" of the existing channel shall be accepted only when the coordinator thread is not running (OFF).
INT.0030: The CHANGE REPLICATION SOURCE TO command shall include an APPLIER_EVENT_MEMORY_LIMIT option, allowing users to specify the per-channel memory limit for caching binlog events.
INT.0031: The APPLIER_EVENT_MEMORY_LIMIT option shall only be valid when using the Change Stream Applier (CSA):
- When APPLIER_EVENT_MEMORY_LIMIT is already set and the user changes APPLIER_VERSION to 'MTA': the change is permitted; no error is returned.
- When APPLIER_VERSION is already set to MTA and the user attempts to set APPLIER_EVENT_MEMORY_LIMIT: the change is not allowed; the applier returns the error ER_CRST_APPLIER_EVENT_MEMORY_LIMIT_ONLY_FOR_CSA.
- When the user tries to set both APPLIER_VERSION=MTA and APPLIER_EVENT_MEMORY_LIMIT at the same time: the change is not allowed; the applier returns the error ER_CRST_APPLIER_EVENT_MEMORY_LIMIT_ONLY_FOR_CSA.
- When the user specifies APPLIER_VERSION=CSA and sets APPLIER_EVENT_MEMORY_LIMIT: the change is permitted; no error is returned.
INT.0032: If the user sets the APPLIER_EVENT_MEMORY_LIMIT option to a value lower than the current value of replica_max_allowed_packet, the system shall accept the new configuration but issue the ER_WARN_CRST_APPLIER_EVENT_MEMORY_LIMIT_OUT_OF_RANGE warning.
INT.0033: The Change Stream Applier shall use a cached-events limit equal to max(replica_max_allowed_packet, APPLIER_EVENT_MEMORY_LIMIT).
INT.0034: When creating a new channel, if the APPLIER_EVENT_MEMORY_LIMIT option is not specified, the default value shall be 0.
INT.0035: When updating an existing channel, if the APPLIER_EVENT_MEMORY_LIMIT option is not specified, the previously configured memory limit shall be retained.
INT.0036: Executing the RESET REPLICA statement shall reset the APPLIER_EVENT_MEMORY_LIMIT to the default value only if the ALL option is specified.
INT.0037: The specified event memory limit shall be stored in the slave_relay_log_info system table, in the Applier_event_memory_limit column, with a default value of 0.
INT.0038: The specified event memory limit shall also be shown in the replication_applier_configuration PFS table, in the APPLIER_EVENT_MEMORY_LIMIT column.
INT.0039: Changes to the "APPLIER_EVENT_MEMORY_LIMIT" of the existing channel shall be accepted only when the coordinator thread is not running (OFF).
INT.0040: The Change Stream Applier Service shall start channel threads when the service is enabled and the user executes the START REPLICA [SQL_THREAD] command.
INT.0050: The Change Stream Applier Service shall stop channel threads when the service is enabled and the user executes the STOP REPLICA [SQL_THREAD] command.
INT.0060: The server shall validate CSA-specific prerequisites when executing the CHANGE REPLICATION SOURCE TO command only when:
- The APPLIER_VERSION option explicitly requests CSA (value 2), OR
- The channel is currently CSA-enabled AND APPLIER_VERSION is unspecified.
INT.0070: If CSA validation is triggered (INT.0060), the system shall reject the CRST command if:
- SQL delay is currently enabled OR
- SQL delay is explicitly set to a non-zero value.
Error: ER_CSA_CRST_DELAYED_APPLIER_NOT_SUPPORTED with channel name.
Post-change state: Options of the channel unchanged.
INT.0080: If CSA validation is triggered (INT.0060), the system shall reject the CRST command if:
- REQUIRE_ROW_FORMAT is currently disabled AND is not being explicitly enabled, OR
- REQUIRE_ROW_FORMAT is being explicitly disabled.
Error: ER_CSA_CRST_REQUIREMENT_ROW_FORMAT with channel name.
Post-change state: Options of the channel unchanged.
INT.0090: If CSA validation is triggered (INT.0060), the system shall reject the CRST command if:
- GTID_ONLY is currently disabled AND is not being explicitly enabled, OR
- GTID_ONLY is being explicitly disabled.
Error: ER_CSA_CRST_REQUIREMENT_GTID_ONLY with channel name.
Post-change state: Options of the channel unchanged.
----- DESIGN: CHANGE STREAM APPLIER SERVICE -----
DES.CSA.0010: The Change Stream Applier Service, a singleton instance managing channels configured with APPLIER_VERSION=CSA, shall have its run function invoked when a channel configured with the "CSA" applier version is started, to apply events on the replica.
DES.CSA.0020: The Change Stream Applier Service shall use the Transaction Provider to get Jobs.
DES.CSA.0030: The Change Stream Applier Service shall be a singleton instance that manages channels configured with APPLIER_VERSION=CSA.
DES.CSA.0040: To determine the correct order and timing of transactions, the Change Stream Applier shall utilize a Dependency Adapter.
DES.CSA.0041: The Dependency Adapter, instantiated as one instance per replication channel, shall analyze GTID log events to extract last_committed and sequence_number information for determining transaction dependencies, and shall be invoked by the CSA service to calculate dependencies for each job, taking job parameters (e.g., task ID, sequence number, commit parent) and returning dependency information (e.g., clock delay and resolved task ID).
DES.CSA.0042: The Dependency Adapter shall establish end-to-start dependencies between transactions based on the LWM Clock mechanism, i.e. calculate the time value (LWM value) at which the task will be allowed to run.
DES.CSA.0050: The Change Stream Applier Service shall pass transactions, along with their calculated dependencies and schedules, to the Scheduler component for execution.
DES.CSA.0060: A separate instance of the Scheduler shall be maintained for each defined replication channel by the Change Stream Applier Service.
DES.CSA.0070: The Change Stream Applier Service shall create and manage a unique instance of the Session Service for each replication channel.
DES.CSA.0080: A dedicated instance of the Transaction Provider shall be maintained by the Change Stream Applier Service for each replication channel.
DES.CSA.0090: The Change Stream Applier Service shall allocate a separate instance of the Thread Pool for each replication channel.
DES.CSA.0100: The global Statistic Monitor component shall be responsible for collecting and maintaining statistics related to the Change Stream Applier Service.
DES.CSA.0110: In case of a Change Stream Applier error, the Change Stream Applier Service shall report the error:
Replica SQL for channel '
---- DESIGN: STATISTICS MONITOR ----
DES.STM.0010 The Statistics Monitor, a singleton instance, shall maintain real-time statistics (list in OBSERVABILITY: STATISTICS) for all defined channels, and shall be invoked by monitored threads to submit samples and by monitoring threads to query metrics.
DES.STM.0020 The Statistics Monitor shall use a unique ID of the channel assigned during a successful channel creation (CRST command).
DES.STM.0030 The Statistics Monitor shall provide a lock-free way of updating a concrete statistic (exchange, add, subtract) for a specific channel.
DES.STM.0040 The Statistics Monitor shall provide a way of sharding a specific statistic amongst multiple threads using a consistent hashing algorithm, to reduce contention when accessing the same statistics.
DES.STM.0050 The Statistics Monitor shall provide a function for reading the aggregated sum of a sharded statistic, across all shards.
DES.STM.0060 The Statistics Monitor component shall adhere to the Singleton design pattern as defined by the Gang of Four (GoF) patterns, ensuring that only one instance of the monitor is created and globally accessible throughout the system.
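As an illustration of the sharded-statistic idea in DES.STM.0030-0050, the following C++ sketch shards a single counter across padded atomic slots; writers pick a shard by hashing their thread id (a simple stand-in for the consistent hashing mentioned above) and readers sum all shards. All names are hypothetical.

  #include <array>
  #include <atomic>
  #include <cstddef>
  #include <cstdint>
  #include <functional>
  #include <thread>

  // Illustrative sharded, lock-free statistic sketch.
  class Sharded_counter_sketch {
   public:
    static constexpr std::size_t k_shards = 16;

    // Lock-free update: each writer touches only its own shard.
    void add(int64_t delta) {
      shard_for_this_thread().fetch_add(delta, std::memory_order_relaxed);
    }
    // Aggregated read across all shards (DES.STM.0050).
    int64_t aggregated_sum() const {
      int64_t sum = 0;
      for (const auto &shard : m_shards)
        sum += shard.value.load(std::memory_order_relaxed);
      return sum;
    }

   private:
    struct alignas(64) Padded_atomic {  // padding avoids false sharing
      std::atomic<int64_t> value{0};
    };
    std::atomic<int64_t> &shard_for_this_thread() {
      std::size_t h =
          std::hash<std::thread::id>{}(std::this_thread::get_id());
      return m_shards[h % k_shards].value;
    }
    std::array<Padded_atomic, k_shards> m_shards{};
  };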
---- DESIGN: SCHEDULER ----
DES.SH.0010 The Scheduler, instantiated as a separate instance per replication channel, shall intake tasks from the Change Stream Applier Service and trigger their execution via the Thread Pool once all timing and dependency prerequisites for each task have been met.
DES.SH.0020 The Scheduler shall accept and process callable objects as tasks.
DES.SH.0030 The scheduler shall allow for specifying end-to-start dependencies between tasks.
DES.SH.0040 The scheduler shall allow for specifying a time-based end-to-start dependency for a task, enabling a task to start after a preceding task has completed.
DES.SH.0050 The scheduler shall enable registration of task phases with independent timing dependencies, as specified within a task.
DES.SH.0060 A task phase shall be scheduled based on its assigned clock, which is specifically registered for that phase.
DES.SH.0070 The scheduler shall support both end-to-start and start-to-start clock dependencies for registered task phases.
DES.SH.0080 When a start-to-start clock dependency is defined for a task phase, the scheduler shall invoke the task twice: first, when the task's time dependency is fulfilled, to release the next dependent phase/task; and immediately thereafter, to allow the rest of the task phase to execute.
DES.SH.0090 For a task phase with an end-to-start clock dependency, the scheduler shall execute the task a single time, when the task's time dependency is satisfied.
DES.SH.0100 The scheduler shall utilize a high-performance Communication Queue to interface with the Thread Pool.
DES.SH.0110 The scheduler shall expose a callback that is called when a task finishes its execution.
DES.SH.0120 The scheduler shall offer a callback function that is invoked upon task completion.
DES.SH.0130 Time dependencies for tasks shall be managed by the LWM Clock (Clock_lwm_registry) implementation.
DES.SH.0140 End-to-start ordered dependencies for tasks shall be managed by a specialized Dependency Tracker implementation.
DES.SH.0150 The scheduler thread shall execute in the following sequence (see the sketch below):
- check if any task in a phase queue is ready according to the phase clock; push ready tasks to the thread pool
- check if any task in the task waiting queue is ready according to the global clock; push it to the tasks waiting for dependencies, or to the thread pool if no dependencies are defined
- check if any task waiting for dependencies is ready by consulting the Dependency Tracker; push ready tasks to the thread pool
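The following C++ sketch illustrates the DES.SH.0150 sequence. It is not the actual Scheduler: the clocks, the Dependency Tracker and the Thread Pool are reduced to std::function predicates and a dispatch callback, and all names are hypothetical.

  #include <cstdint>
  #include <deque>
  #include <functional>

  // Minimal task model for the sketch: readiness checks are opaque
  // predicates standing in for phase clock, global clock and tracker.
  struct Task_sketch {
    uint64_t id;
    std::function<bool()> phase_ready;  // phase clock check
    std::function<bool()> time_ready;   // global clock check
    std::function<bool()> deps_met;     // Dependency Tracker check
  };

  class Scheduler_loop_sketch {
   public:
    // One iteration of the DES.SH.0150 sequence; 'dispatch' pushes a
    // ready task to the thread pool queue.
    void loop_once(const std::function<void(Task_sketch &)> &dispatch) {
      // 1. Tasks released by their phase clock go straight to the pool.
      drain_if(m_phase_queue,
               [](Task_sketch &t) { return t.phase_ready(); }, dispatch);
      // 2. Tasks released by the global clock either run or move to the
      //    dependency-waiting queue.
      for (auto it = m_waiting_for_time.begin();
           it != m_waiting_for_time.end();) {
        if (it->time_ready()) {
          if (it->deps_met())
            dispatch(*it);
          else
            m_waiting_for_deps.push_back(*it);
          it = m_waiting_for_time.erase(it);
        } else {
          ++it;
        }
      }
      // 3. Tasks whose end-to-start dependencies are now satisfied.
      drain_if(m_waiting_for_deps,
               [](Task_sketch &t) { return t.deps_met(); }, dispatch);
    }

   private:
    template <class Pred, class Dispatch>
    static void drain_if(std::deque<Task_sketch> &q, Pred pred,
                         Dispatch dispatch) {
      for (auto it = q.begin(); it != q.end();) {
        if (pred(*it)) {
          dispatch(*it);
          it = q.erase(it);
        } else {
          ++it;
        }
      }
    }
    std::deque<Task_sketch> m_phase_queue;
    std::deque<Task_sketch> m_waiting_for_time;
    std::deque<Task_sketch> m_waiting_for_deps;
  };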
---- DESIGN: DEPENDENCY TRACKER ----
DES.DT.0010: The Dependency Tracker, instantiated as one instance per Scheduler (thus per replication channel), shall enable the definition of end-to-start dependencies between pairs of tasks, establishing a clear relationship between them, and shall be used by the Scheduler to manage task dependencies.
DES.DT.0020: The Dependency Tracker design shall be modular and extensible, allowing for various implementations, including dependency graphs, to be integrated seamlessly.
DES.DT.0030: The Dependency Tracker shall provide a mechanism to verify whether dependencies for a specific task have been satisfied, ensuring that tasks are executed in the correct order, and determine which successors shall be notified upon predecessor completion.
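As an illustration of this contract, a hypothetical C++ interface could look as follows; the real Dependency Tracker API may differ.

  #include <cstdint>
  #include <vector>

  // Illustrative Dependency Tracker contract (DES.DT.0010-0030):
  // end-to-start edges between task IDs, a satisfaction check, and
  // successor notification on predecessor completion.
  class Dependency_tracker_sketch {
   public:
    virtual ~Dependency_tracker_sketch() = default;
    // Declare that 'successor' may start only after 'predecessor' ends.
    virtual void add_end_to_start(uint64_t predecessor,
                                  uint64_t successor) = 0;
    // True when every declared predecessor of 'task' has finished.
    virtual bool dependencies_satisfied(uint64_t task) const = 0;
    // Mark 'task' finished; return successors that became runnable.
    virtual std::vector<uint64_t> on_finished(uint64_t task) = 0;
  };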
---- DESIGN: COMMIT ORDER ----
DES.CO.0010: The Change Stream Applier Service's commit order implementation shall align with the existing Commit Order Manager used in the Multi-threaded Applier, ensuring consistency and compatibility.
DES.CO.0020: Transactions that follow the source's commit order shall be executed in a two-phase approach:
1. Apply Phase: Transactions shall be applied and prepared for commit with end-to-start dependencies, adhering to the global clock.
2. Commit Phase: Tasks shall be committed with start-to-start dependencies, following the commit order clock, involving:
- Sequential registration for commit (initial task invocation).
- Parallel commit (subsequent task invocation).
---- DESIGN: SCHEDULER CLOCK ----
DES.SC.0010: The Scheduler Clock, instantiated as one instance per Scheduler (thus per replication channel), shall enable the definition of time dependencies between tasks, allowing for flexible scheduling and coordination, and shall be used by the Scheduler.
DES.SC.0020: The Scheduler Clock design shall be modular and extensible, accommodating various implementations of the scheduler clock to cater to diverse needs.
DES.SC.0030: The CSA shall provide a high-performance LWM Clock implementation (Clock_lwm_registry), computing the Low Water Mark as the highest consecutively executed task ID, leveraging last_committed and sequence number information from the Gtid_log_event to enable parallel transaction execution.
---- DESIGN: TRANSACTION PROVIDER ----
DES.TS.0010: The Transaction Provider, instantiated as a dedicated instance per replication channel, shall be invoked by the Change Stream Applier Service to supply the next Job, representing a transaction retrieved from the relay log.
DES.TS.0020: The Job shall be capable of fetching its data from replication storage or cache multiple times, allowing it to retrieve its own events as needed.
DES.TS.0030: The Change Stream Applier Service shall remain agnostic to the specific implementation details of the Replication Storage.
DES.TS.0040: The Change Stream Applier Service shall offer two variants of the Transaction Provider: synchronous and asynchronous implementations.
DES.TS.0050: The Transaction Provider shall configure the reader to optimize job self-fetching from the Replication Storage, by:
- Caching transaction data if the size is below a predefined threshold.
- Providing transaction metadata if the size exceeds a predefined threshold.
- Allowing relay-log-backed jobs to dispatch a Job after the GTID event and publish the remaining transaction events incrementally.
DES.TS.0060: In case of a Reader-specific error, the Transaction Provider shall report the 'ER_REPLICA_RELAY_LOG_READ_FAILURE' error:
Replica SQL for channel: <> Relay log read failure: <message>
DES.TS.0070: In case of a Transaction Boundary Parser-specific error, the Transaction Provider shall report the 'ER_REPLICA_RELAY_LOG_READ_FAILURE' error:
Replica SQL for channel: <> Relay log read failure: <message>
DES.TS.0080: In case of encountering an event not supported in the Change Stream Applier, the Transaction Provider shall report the 'ER_REPLICA_RELAY_LOG_READ_FAILURE' error:
Replica SQL for channel: <> Relay log read failure: Unexpected event in the Change Stream Applier reader:
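A hypothetical C++ interface capturing the provider contract (DES.TS.0010/0040) might look as follows; the Job type and all names are placeholders.

  #include <memory>
  #include <optional>

  // Illustrative sketch of the two Transaction Provider variants
  // (DES.TS.0040): the synchronous one parses the next transaction from
  // the relay log inside the call, the asynchronous one pops from an
  // internal queue populated by its own reader thread.
  struct Job_sketch;  // transaction unit of work (see DESIGN: JOBS)

  class Transaction_provider_sketch {
   public:
    virtual ~Transaction_provider_sketch() = default;
    // Returns the next Job, or std::nullopt when the stream is exhausted
    // or the applier is stopping. May return the Job right after the GTID
    // event, publishing remaining events incrementally (DES.TS.0050).
    virtual std::optional<std::shared_ptr<Job_sketch>> next_job() = 0;
  };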
---- DESIGN: REPLICATION STORAGE ----
DES.RS.0010: The implementation of the Replication Storage provided in this worklog shall read transaction data from the Relay Log.
DES.RS.0020: The Replication Storage Reader shall generate and transmit transaction metadata to fetchable jobs, enabling them to initiate fetching from the Relay Log or cache.
DES.RS.0021: For small transactions (below a defined threshold), the Replication Storage Reader shall provide self-fetching transactions that can read themselves from the cache, supporting multiple retries.
DES.RS.0022: For large transactions (above a defined threshold), the Replication Storage Reader shall provide self-fetching transactions that can read themselves directly from the Relay Log on demand, supporting multiple retries.
DES.RS.0030: The Replication Storage implementation shall manage the Relay Log, including reading and purging its contents.
DES.RS.0040: Multiple Jobs shall be able to access and read from the same relay log file without requiring explicit synchronization mechanisms (e.g., locks) at a specific position in the file.
DES.RS.0050: Each Job containing a transaction fetched from the Relay Log shall signal its completion in a way that allows other Relay Log file subscribers to determine when all subscribers have finished reading.
DES.RS.0060: If a Job fails to apply a transaction it shall prevent the corresponding Relay Log files from being deleted.
DES.RS.0070: A Relay Log file shall be purged when all subscribers have signalled their completion of accessing the file data.
DES.RS.0080: The responsibility for deleting the Relay Log file shall be assigned to one of the threads that accessed the file.
DES.RS.0090: The system shall handle errors during file reading and deletion in a way that prevents files from being deleted prematurely or left in an inconsistent state.
DES.RS.0100: The Replication Storage implementation shall allow for asynchronous prefetching of the Relay Log Files.
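The subscriber-based purge rule (DES.RS.0050-0080) can be illustrated with a reference-counting sketch in C++: each Job reading a file holds a reference, the reader holds the initial reference until it knows no further Jobs will subscribe, and whichever thread drops the last reference deletes the file. This is an illustration with hypothetical names; the actual implementation goes through the relay log purge machinery rather than a bare remove().

  #include <atomic>
  #include <cstdio>
  #include <string>

  class Relay_log_file_sketch {
   public:
    // The reader that opened the file holds the initial reference; it
    // drops it via seal() once no further Jobs will subscribe.
    explicit Relay_log_file_sketch(std::string path)
        : m_path(std::move(path)), m_refs(1) {}

    // A Job registers before reading from this file (DES.RS.0050).
    void subscribe() { m_refs.fetch_add(1, std::memory_order_relaxed); }

    // Reader drops its own reference when it moved past this file.
    void seal() { unsubscribe(); }

    // A Job signals completion; a Job that failed to apply keeps its
    // reference so the file survives for retries (DES.RS.0060).
    void unsubscribe() {
      if (m_refs.fetch_sub(1, std::memory_order_acq_rel) == 1)
        std::remove(m_path.c_str());  // exactly one thread reaches zero
    }

   private:
    std::string m_path;
    std::atomic<long> m_refs;
  };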
---- DESIGN: THREAD POOL ----
DES.TH.0010: The Thread Pool, instantiated as a separate instance per replication channel with a configurable number of worker threads, shall allow execution of tasks invoked by the Scheduler, enabling flexible resource allocation.
DES.TH.0020: The user shall be able to configure the number of available Thread Pool Workers, tailoring the pool size to specific needs.
DES.TH.0030: The Thread Pool Workers shall receive tasks that are ready for execution, ensuring efficient utilization of worker threads.
DES.TH.0040: Upon successful execution of a task, the Thread Pool Worker shall notify the Scheduler via a callback, confirming that the task has been completed successfully.
DES.TH.0050: In the event of a task execution failure, the Thread Pool Worker shall inform the Scheduler via a callback, indicating that the task has failed, allowing for appropriate error handling and retries.
---- DESIGN: COMMUNICATION QUEUE ----
DES.SQ.0010: The Communication Queue, instantiated as one instance per Scheduler (thus per replication channel), shall provide a high-performance, mutually exclusive access mechanism, allowing multiple producer threads to safely interact with multiple consumer threads, and shall be used by the Scheduler to send tasks to the Thread Pool.
DES.SQ.0020: The Communication Queue shall enable the consumer thread to block and wait until a new item is added to the queue, ensuring efficient synchronization.
DES.SQ.0030: The Communication Queue shall also allow producer threads to block when the queue is full, preventing overflow and ensuring that producers wait for available space before adding new items.
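A minimal C++ sketch of this contract is shown below: bounded, multi-producer/multi-consumer, with consumers blocking on empty (DES.SQ.0020) and producers blocking on full (DES.SQ.0030). For brevity it uses a single mutex, whereas the high-performance queue described above uses lock-striping and atomics; names are hypothetical.

  #include <condition_variable>
  #include <cstddef>
  #include <deque>
  #include <mutex>

  template <class T>
  class Bounded_queue_sketch {
   public:
    explicit Bounded_queue_sketch(std::size_t capacity)
        : m_capacity(capacity) {}

    void push(T item) {  // blocks while the queue is full
      std::unique_lock<std::mutex> lock(m_lock);
      m_not_full.wait(lock, [this] { return m_items.size() < m_capacity; });
      m_items.push_back(std::move(item));
      m_not_empty.notify_one();
    }
    T pop() {  // blocks while the queue is empty
      std::unique_lock<std::mutex> lock(m_lock);
      m_not_empty.wait(lock, [this] { return !m_items.empty(); });
      T item = std::move(m_items.front());
      m_items.pop_front();
      m_not_full.notify_one();
      return item;
    }

   private:
    std::size_t m_capacity;
    std::mutex m_lock;
    std::condition_variable m_not_empty, m_not_full;
    std::deque<T> m_items;
  };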
---- DESIGN: SESSION SERVICE ----
DES.SE.0010: The Session Service, instantiated as a unique instance per replication channel, shall provide a free session on demand, allowing CSA jobs to request and obtain a session as needed.
DES.SE.0020: The Session Service shall define an abstract interface (Abstract Base Class) that declares pure virtual methods for:
- Initialization
- Deinitialization
- Acquiring sessions
- Releasing sessions
DES.SE.0030: The Session Service implementation shall provide a cached approach to manage sessions, with each session separately synchronized to minimize contention.
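A hypothetical C++ rendering of the DES.SE.0020 abstract interface could look as follows; the Session type stands in for a THD context plus Relay Log Info, and all names are placeholders.

  // Illustrative Session Service interface sketch (DES.SE.0020).
  class Session_sketch;  // wraps a THD context and Relay Log Info

  class Session_service_sketch {
   public:
    virtual ~Session_service_sketch() = default;
    virtual bool initialize(unsigned int session_count) = 0;
    virtual void deinitialize() = 0;
    // Hands out a free, individually synchronized session to the calling
    // Job (DES.SE.0010/0030).
    virtual Session_sketch *acquire() = 0;
    virtual void release(Session_sketch *session) = 0;
  };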
---- DESIGN: JOBS ----
DES.JOB.0010: Jobs, instantiated as one instance per transaction, shall provide a basic interface for jobs, including methods for attaching, detaching, restarting, and running the job, and shall be invoked by the Thread Pool workers to execute transaction application.
DES.JOB.0020: The Job shall have a unique identifier (ID) assigned to each job instance.
DES.JOB.0030: The Job shall maintain a reference to the channel it belongs to.
DES.JOB.0040: The Job shall provide methods for setting and getting the job's thread ID.
DES.JOB.0050: The Job shall provide methods for setting and getting the job's instance ID.
DES.JOB.0060: Jobs shall fetch themselves from the Relay Log using an internal reader when configured to read from the Relay Log.
DES.JOB.0070: Jobs shall retrieve cached events from the cache when configured to read from the cache, minimizing the need for Relay Log access.
DES.JOB.0080: The Job shall provide methods for resetting the job's state, allowing it to be re-fetched from the underlying cache / storage.
DES.JOB.0090: The Job shall provide methods for checking if the job is a transaction and getting the job's string representation.
DES.JOB.0100: The Job shall adhere to the existing implementation of the Decompression Stream to decompress transaction events.
DES.JOB.0110: The Job shall execute phase by phase in the following sequence:
- acquire a session from the Session Service
- 'transaction prepare' phase:
  - attach to the session
  - wait for the next event when the transaction is streamed
  - fetch the next event from the cache or read it from the relay log
  - decompress the event if it is compressed
  - apply the event
  - repeat until the commit event is detected
  - detach from the session
- 'register for commit' phase:
  - attach to the session
  - register for commit
  - detach from the session
- 'commit' phase:
  - attach to the session
  - commit
  - detach from the session
- release the session to the Session Service
DES.JOB.0120: The Job phases shall be executed in separate calls when one of the following applies:
- "replica_preserve_commit_order" is enabled and the transaction is not retried
- "replica_preserve_commit_order" is enabled and the transaction "prepare" phase is retried
DES.JOB.0130: The Job phases shall be executed in one call when one of the following applies:
- "replica_preserve_commit_order" is disabled
- the transaction is retried and the last phase was "register for commit" or "commit"
DES.JOB.0140: In case of a transient error, the Job shall retry execution until the maximum number of retries is reached or the transaction succeeds.
DES.JOB.0150: In case of Job failure, the CSA shall stop with an error.
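The phase sequencing rules of DES.JOB.0110-0130 can be illustrated with the following C++ sketch; the phase bodies are elided, the retry-of-prepare case is not modeled, and all names are hypothetical.

  // Illustrative phase sequencing: with commit-order preservation each
  // phase is a separate scheduler dispatch; without it, or on a retry of
  // the commit path, the remaining phases run in a single call.
  enum class Phase_sketch { apply, register_for_commit, commit, done };

  class Job_phases_sketch {
   public:
    explicit Job_phases_sketch(bool preserve_commit_order)
        : m_rpco(preserve_commit_order) {}

    // One scheduler dispatch. Returns the phase to schedule next, or
    // Phase_sketch::done when the transaction is finished.
    Phase_sketch run(bool is_commit_retry) {
      bool single_call = !m_rpco || is_commit_retry;  // DES.JOB.0130
      do {
        execute(m_next);
        m_next = static_cast<Phase_sketch>(static_cast<int>(m_next) + 1);
      } while (single_call && m_next != Phase_sketch::done);
      return m_next;
    }

   private:
    void execute(Phase_sketch phase) {
      // attach session; apply events / register for commit / commit;
      // detach session (see DES.JOB.0110)
      (void)phase;
    }
    bool m_rpco;
    Phase_sketch m_next{Phase_sketch::apply};
  };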
---- DESIGN: RESOURCE MONITOR ----
DES.RM.0010: The Resource Monitor, a singleton instance, shall provide functions that other threads use to enforce per-channel memory limits for event caching, maintaining data for all channels.
DES.RM.0011: The Resource Monitor shall manage per-channel resources, allowing registration of named resources with limits, acquiring locked resources, and releasing resources, designed for concurrent access.
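A minimal C++ sketch of the acquire/release contract with named, limited resources follows; it serializes on one mutex for brevity, whereas DES.RM.0011 calls for a design suited to concurrent access, and all names are hypothetical.

  #include <condition_variable>
  #include <cstddef>
  #include <map>
  #include <mutex>
  #include <string>

  // Illustrative Resource Monitor sketch: acquire blocks until enough of
  // the named resource is free; release wakes waiters (DES.RM.0010/0011).
  class Resource_monitor_sketch {
   public:
    void register_resource(const std::string &name, std::size_t limit) {
      std::lock_guard<std::mutex> guard(m_lock);
      m_resources[name] = {limit, 0};
    }
    void acquire(const std::string &name, std::size_t amount) {
      std::unique_lock<std::mutex> lock(m_lock);
      auto &r = m_resources.at(name);
      m_released.wait(lock, [&] { return r.used + amount <= r.limit; });
      r.used += amount;
    }
    void release(const std::string &name, std::size_t amount) {
      std::lock_guard<std::mutex> guard(m_lock);
      m_resources.at(name).used -= amount;
      m_released.notify_all();
    }

   private:
    struct Resource { std::size_t limit, used; };
    std::mutex m_lock;
    std::condition_variable m_released;
    std::map<std::string, Resource> m_resources;
  };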
---- DESIGN: TRANSACTION CONFLICT MANAGER ----
DES.TCM.0010: The Transaction Conflict Monitor, a singleton instance, shall manage instances of Transaction Conflict Managers, one per replication channel, each running an asynchronous thread to autonomously perform actions to resolve RPCO deadlocks by coordinating rollbacks. Rollbacks are asynchronously requested by the CSA Jobs only for transactions that were handed to external rollback ownership after apply. Transactions that already entered commit ownership remain on the commit path.
---- DESIGN: PREFETCHER ----
DES.PF.0010: The Prefetcher, instantiated as an optional asynchronous thread per replication channel, shall autonomously prefetch Relay Log data to reduce I/O latency and provide data when requested by the Replication Storage Reader.
---- DESIGN: APPLIER CHANNEL MONITOR ----
DES.ACM.0010: The Applier_channel_monitor, instantiated as one instance per replication channel, shall monitor the applier for stalls by tracking progress metrics over a periodic interval and attempt to unblock the scheduler when no progress is detected, to mitigate performance issues.
---- OBSERVABILITY: STATISTICS ----
Supports legacy MTA statistics.
GLOSSARY
Change Stream (CS)
A stream of changes made to the source database, replicated in the form of binary log events
Change Stream Applier (CSA)
A generic applier that can handle various Change Stream implementations, such as the Relay Log
Multi-threaded applier (MTA)
The previous applier implementation, which utilized multiple threads to apply changes, prior to the introduction of WL#10500
Dependency
A dependency refers to a relationship between tasks that dictates their order of execution.
Time
All references to time in this context relate to the implemented clocks that take part in defining dependencies between tasks. While the overall feature allows for different implementations, it is often easiest to think of it as logical time: for example, a logical time window that encodes when a task can be executed (after T1, before T2), or a logical time point from which a task can start execution (start after T4).
Time dependency
A type of dependency that is based on a specific clock time or timing constraint, such as a scheduled start time or a time window. The clock implementation can be influenced by task completion, as tasks can close time windows upon their completion, effectively steering the timing dependencies between tasks.
End-to-start dependency
A specific type of dependency where the start of one task is dependent on the completion (end) of another (previous) task.
Start-to-start dependency
A specific type of dependency where the start of one task is dependent on the start of another (previous) task.
SUMMARY OF THE APPROACH
Purpose
The purpose of this worklog is to remove the pitfalls of the current replica design, allowing the applier to be extended, thus reducing replication lag and enhancing applier throughput. This worklog is aimed at developing a new Generic Change Stream Applier (CSA) that follows an extensible design and allows the produced Change Stream to be captured by additional consumers without changing the implementation of the Change Stream Applier itself.
Scope
The scope of this worklog assumes designing and implementing a new, Generic Change Stream Applier, redefining communication and synchronization patterns between replication entities. In the scope of this WL, we design and develop a new Change Stream Applier that interacts with the Replication Receiver and other kinds of change stream providers in a way that is abstracted from the particular implementation of the Replication Storage. We equip the new design with extension points for deploying different transaction dependency algorithms: further changes to particular algorithms will not require changes to the implementation of the Change Stream Applier, but will extend the implementation with new concrete classes. In the scope of this WL, we keep the current user interface, meaning the commands used to create and steer Replication Channels. To steer the new Change Stream Applier, we extend the CHANGE REPLICATION SOURCE TO command with new options:
- APPLIER_VERSION: enables users to specify the applier version to be used.
- APPLIER_WORKER_COUNT: allows users to configure the number of worker threads allocated to a channel for the new Change Stream Applier.
- APPLIER_EVENT_MEMORY_LIMIT: allows users to specify a per-channel memory limit for caching binlog events.
The features not supported in the Change Stream Applier, and the statement that CSA supports all MTA features outside that list, are as given in the LIMITATIONS section above.
Software overview
All functionalities that constitute the new CSA are implemented on four levels:
- Concurrency: Synchronization primitives (wrappers for mysys synchronization primitives), lock-free synchronization primitives and base concurrent data structures enclosed in the mysql_concurrency library
- Scheduling: Classes responsible for implementation of a scheduler, including definition of the task, task identifiers, task schedules, task dependencies, task thread pool and finally, the task scheduler. All of those classes are enclosed in the mysql_scheduler library.
- Service layer: Contains the implementation of the CSA service; introduces the concepts of a session service, abstract jobs, and fetchable transactions. Contains all of the CSA initialization code, calculation of transaction dependencies, task creation, and supplying tasks to the Scheduler. The CSA service is implemented as a component within the MySQL server (not to be confused with MySQL Components).
- Server layer: Contains the integration of the CSA code with reused replication code implemented in the server, including replication commands, the reused binlog/relay log and corresponding readers, including decompression and commit manager classes. In this layer, we implement handling of the SQL thread commands when the new CSA is in use, changes in the interface steering the applier, extensions to the relay log reader classes, and new glue code that allows for reusing the replica commit order manager.
Detailed description of implemented layers
The sections below provide a detailed overview of the architecture, following a bottom-up approach. It begins by describing the foundational Concurrency and Scheduling Layers, which operate independently of database-specific concepts such as transactions, tables, and replication. Building upon this foundation, the Service Layer is presented, outlining how transactions read from the Relay Log are processed within the Change Stream Applier. Finally, the modifications made to the Server Layer are discussed, highlighting the changes that enable the reuse of existing apply code within the Change Stream Applier. Note that a comprehensive description of all layers can be found in the Low-Level Design (LLD) section of the WL#10500 design document.
Concurrency layer
The concurrency layer is enclosed in the MySQL concurrency libraries.
The MySQL concurrency libraries provide implementations of:
- synchronization primitives, including wrappers for the MySQL mysys condition variable and mutex, as well as a ticketing system which will serve as a building block for more complicated concurrent structures
- concurrent data structures.
One of the primary purposes of the wrappers in MySQL concurrency library is to provide an interface for synchronization primitives implemented in MySQL that aligns with the interface of STL synchronization primitives. This promotes familiarity, interoperability, consistency, and ease of use. By providing an interface that closely resembles that of STL synchronization primitives, the wrappers make it easier for developers to transition between different synchronization libraries and frameworks, ultimately simplifying the development process and reducing errors:
Familiarity: Developers familiar with STL synchronization primitives can easily adapt to using MySQL's synchronization primitives, reducing the learning curve.
Interoperability: Code written using STL synchronization primitives can be more easily integrated with MySQL's synchronization primitives, promoting interoperability between different components.
Consistency: A consistent interface across different synchronization primitives makes it easier to switch between them, allowing developers to choose the best tool for their specific use case.
The wrappers achieve alignment with STL synchronization primitives by providing similar interfaces for common operations such as:
Locking and Unlocking: The Mutex_wrapper class provides lock() and unlock() methods, mirroring the lock() and unlock() methods found in STL's std::mutex.
Waiting and Notifying: The Condition_variable_wrapper class provides wait(), notify_one(), and notify_all() methods, similar to those found in STL's std::condition_variable.
Construction and Destruction: The wrappers provide constructors and destructors that match the behavior of their STL counterparts, ensuring proper initialization and cleanup of resources.
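As a concrete illustration of this alignment, the following minimal sketch shows the shape of such a wrapper (pthread is used here only as a stand-in for the MYS primitive; the actual wrapper code differs). Because it exposes lock(), unlock(), and try_lock(), it satisfies the STL Lockable requirements and works with std::lock_guard and std::unique_lock unchanged:

  #include <pthread.h>

  // Sketch of an STL-compatible mutex wrapper over a native primitive.
  class Mutex_wrapper {
   public:
    Mutex_wrapper() { pthread_mutex_init(&m_mutex, nullptr); }
    ~Mutex_wrapper() { pthread_mutex_destroy(&m_mutex); }
    Mutex_wrapper(const Mutex_wrapper &) = delete;
    Mutex_wrapper &operator=(const Mutex_wrapper &) = delete;

    void lock() { pthread_mutex_lock(&m_mutex); }
    void unlock() { pthread_mutex_unlock(&m_mutex); }
    bool try_lock() { return pthread_mutex_trylock(&m_mutex) == 0; }

   private:
    pthread_mutex_t m_mutex;
  };

  // Usage: standard RAII guards work exactly as with std::mutex.
  // Mutex_wrapper mu;
  // { std::lock_guard<Mutex_wrapper> guard(mu); /* critical section */ }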
Another important aspect of the libraries is the provision of synchronized data structures, such as synchronized queues, and of additional lock-free synchronization primitives, such as atomic tickets. Synchronized data structures are designed to be thread-safe, allowing multiple threads to access and manipulate them concurrently without fear of data corruption or other concurrency-related issues. The synchronization primitives allow for developing lock-free concurrent data structures.
Scheduler layer
The scheduler layer is implemented in the form of a new mysql_scheduler library.
The library consists of several components:
- Scheduler: The main component responsible for managing and executing tasks and their dependencies.
- Task: Abstract concept that represents a single task that needs to be executed.
- Task phase: A task phase represents a distinct stage in the execution lifecycle of a task. Multiple phases can exist within a task, and each phase operates independently with its own timing dependencies, unaffected by other phases.
- Task Schedule: Defines when the task and its phases can be executed
- Scheduler clock: Provides a way to measure current time (by default logical time, but can also be physical if implemented that way) which serves as a trigger to run a task or a task phase
- Dependency Tracker: Keeps track of end-to-start dependencies (previous task must finish before the next one starts) between tasks.
- Thread Pool: Manages a pool of threads that execute tasks.
- Synchronized queue: An abstract concept of a concurrent data structure used to exchange data between different threads, i.e.:
- task provider and scheduler thread (1 producer - 1 consumer)
- scheduler thread and workers in the thread pool (1 producer - N consumers or N producers - N consumers, depending on configuration defined in the Service Layer).
- Task registry: A synchronized data structure that provides a concurrency control mechanism, reducing contention among threads accessing the registry.
- Statistics Monitor: A dedicated component designed to efficiently collect system statistics from multiple threads.
A scheduler is a component that manages the execution of Tasks, ensuring that they are executed in the correct order and at the right time, i.e. according to the defined Task Schedule. It acts as a coordinator between tasks, allocating resources and prioritizing their execution.
Characteristics of the Task Scheduler are:
- Task Management: A scheduler manages a collection of tasks, each with its own timing and dependencies.
- Prioritization: A scheduler prioritizes tasks based on their urgency, importance, and dependencies.
- Synchronization: A scheduler synchronizes the execution of tasks, ensuring that they are executed in the correct order.
We implement a time-based scheduler which runs according to a defined, global task clock. When a Task is executed in phases, each phase is run according to a different Scheduler clock instance, registered for a particular task phase.
The Low Water Mark (LWM) clock is a logical clock that ensures transaction ordering based on the last_committed and sequence_number values from GTID events. The LWM represents the maximum task ID of consecutively executed transactions, computed from the sequence_number of finished tasks. Transactions with a required LWM <= the current LWM clock value may execute immediately, while those with a required LWM > the current LWM clock value must wait for the clock to advance. This enables parallel execution within the same logical time window while preserving global order, leveraging the last_committed and sequence_number information from the Gtid_log_event. The Scheduler prioritizes based on the time at which a task should execute and on the task ID (a task with a lower ID is likely to run before a task with a higher ID).
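As a hedged illustration of how such a clock can advance (the class and method names below are hypothetical, not the actual CSA code):

  #include <cstdint>
  #include <mutex>
  #include <set>

  // Hypothetical sketch of an LWM clock: the LWM is the highest sequence
  // number S such that all tasks with sequence numbers <= S have finished.
  class Lwm_clock {
   public:
    // Record a finished task and advance the LWM over any now-consecutive
    // run of finished sequence numbers.
    void task_finished(std::uint64_t seq) {
      std::lock_guard<std::mutex> guard(m_mutex);
      m_finished.insert(seq);
      while (!m_finished.empty() && *m_finished.begin() == m_lwm + 1) {
        m_lwm = *m_finished.begin();
        m_finished.erase(m_finished.begin());
      }
    }
    // A transaction whose required LWM (last_committed) is <= get() may run.
    std::uint64_t get() const {
      std::lock_guard<std::mutex> guard(m_mutex);
      return m_lwm;
    }

   private:
    mutable std::mutex m_mutex;
    std::uint64_t m_lwm = 0;             // sequence numbers start at 1
    std::set<std::uint64_t> m_finished;  // finished, not yet consecutive
  };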
When the timing dependencies of a task are met, the Scheduler consults the defined Dependency Tracker to check whether other end-to-start task dependencies are satisfied. If not, the task waits in a queue together with other tasks that are ready according to their timing dependencies but not yet ready according to their end-to-start dependencies.
When all timing and end-to-start dependencies are met, the task is dispatched to the Thread Pool for asynchronous execution. After a task finishes, independent Thread Pool workers call task and/or phase callbacks, which update the phase/global clock and the global end-to-start dependencies defined in the Dependency Tracker.
During execution, all classes have access to Statistics Monitor, which is used to gather statistics for all of the system components.
Scheduler Unblock Mechanism
To handle situations where the applier stalls due to dependencies or clock issues, the scheduler includes an unblock mechanism. The Applier Channel Monitor detects stalls by checking for lack of progress in applied events, clock advances, or active transactions over a refresh interval. When stalled, it requests unblock from the scheduler a limited number of times.
Upon receiving an unblock request, the scheduler marks the dispatch as an unblock dispatch and allows queued tasks to proceed even if their phase is not ready, enabling a later worker to consume commit-registration work for a stalled job. This helps resolve deadlocks or stalls without violating commit order guarantees.
During unblock, the job applier may be observed by two threads. The applying thread continues its prepare path and remains the owner of phase transition out of apply. The later unblocking thread is allowed to register the transaction for commit early and then waits until the prepare thread releases the job before it commits. This preserves RPCO (Replica Preserve Commit Order) while allowing the scheduler to make progress.
Service layer
The Change Stream Applier service consists of several key components:
- Transaction Provider: Responsible for fetching binary log events or their metadata from the Replication Storage, and for creating Job objects populated with corresponding Fetchable Event Set objects. For relay-log-backed jobs it may dispatch a Job after GTID and publish the remaining transaction events incrementally.
- Log prefetcher: An optional asynchronous thread that, when enabled, proactively fetches large blocks of data from the Relay Log or Binary Log, improving performance by reducing latency and optimizing data retrieval.
- Scheduler: Manages the execution of Executable Jobs (tasks), ensuring that they are applied in the correct order and that dependencies between them are respected.
- Thread Pool: A group of worker threads that execute Jobs.
- Session Service: Manages replica sessions for Jobs.
- Session: Object that encapsulates a server session (THD) and the associated Relay Log Info (RLI)
- Job: Handles the execution of a single transaction, including:
- attaching / detaching from the replica session
- fetching from the underlying storage, as defined by the corresponding Fetchable event sets (including decompression),
- managing transaction retry after transient errors
- error reporting
- application of a single transaction to the replica database in phases, as defined by the commit order replica rules
- Fetchable event set: A collection of transaction events originating from the same storage type (cache or a specific relay log file), specifying:
- The method for retrieving events from the underlying storage
- The criteria for purging applied relay log files
- The approach for decompressing events
- For relay-log-backed jobs, batches may be published incrementally and a single logical transaction may span multiple event sets when it crosses relay log files
The Change Stream Applier service operates as follows:
- Initialization: Initializes the necessary components, including the transaction provider, scheduler, worker pool, and session service.
- Transaction Processing Loop: Enters a continuous loop where it retrieves Jobs generated by the transaction provider, resolves dependencies between them using a Dependency Calculation Adapter, and submits them to the scheduler. The scheduler manages job execution, queuing them until their dependencies are met, and then dispatches them to the worker pool for processing.
- Shutdown: Shuts down the service either in response to a termination signal or due to an unrecoverable error.
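A hedged pseudocode sketch of this loop follows; the parameter roles mirror the concepts above, and the method names (next_job, make_schedule, make_task, submit) are illustrative, not the exact interfaces:

  #include <memory>
  #include <utility>

  // Illustrative sketch of the CSA service loop; Provider, DepAdapter, and
  // Sched stand for whatever concrete types fulfil the roles above.
  template <typename Provider, typename DepAdapter, typename Sched>
  void csa_service_loop(Provider &provider, DepAdapter &dep_adapter,
                        Sched &scheduler) {
    for (;;) {
      // 1. Job Retrieval: block until the provider yields the next Job;
      //    a null Job signals provider shutdown (e.g., STOP REPLICA).
      auto job = provider.next_job();
      if (!job) break;

      // 2. Dependency Calculation: translate last_committed /
      //    sequence_number into a Task Schedule for the Scheduler clocks.
      auto schedule = dep_adapter.make_schedule(*job);

      // 3.+4. Wrap the Job into an Executable Task (owning retry and error
      //    handling) and submit it; the Scheduler queues it until its time
      //    window opens and its end-to-start dependencies are satisfied.
      if (!scheduler.submit(dep_adapter.make_task(std::move(job)), schedule))
        break;  // unrecoverable error: report, halt components, terminate
    }
  }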
Server layer
The provided codebase includes several changes to the MySQL server, primarily focused on enhancing the replication capabilities and introducing a new Change Stream Applier.
New Features
- Change Stream Applier Service: A dedicated service responsible for executing a new Change Stream Applier.
- Automatic Relay Log Removal: The new applier features distributed and automatic removal of relay logs, which helps manage disk space without requiring synchronization across all threads at a single point in the file.
- Enhanced Binary Log Stream Reading: Modifications have been made to the server-side binary log streams, enabling:
- Retrieval of transaction event metadata without requiring full event decoding
- Direct access to transaction event payloads without needing to decode the entire event
- Event_reader_controller: The Event_reader_controller is a newly introduced class that supersedes the Rpl_applier_reader. Its primary purpose is to facilitate advanced reading capabilities, including the ability to extract transaction event metadata and automatically remove relay logs.
Modified Components
- Slave_worker: The Slave_worker class has been modified to comply with the interface introduced to support commit order preservation in MTA workers and Change Stream Applier workers.
- Binary Log Stream Reading Enhancements: Updates have been made to key classes, including Basic_binlog_file_reader, Binlog_event_data_istream, and IBasic_binlog_file_reader, along with the introduction of new supporting structures. These changes enable:
- Extraction of transaction event metadata without requiring full event decoding
- Direct access to transaction event payloads, eliminating the need for complete event decoding
- Preserve Commit Order on Replica: The server-side commit order manager has been integrated with the new applier, ensuring that transactions are committed in the correct order on the replica.
Transaction processing
Overview
- The Transaction Provider retrieves a transaction and returns a Job.
- The CSA service loop processes the Job, calculating its dependencies, creating a Task Schedule, and wrapping it into an Executable Task.
- The Executable Task is enqueued in the Scheduler queue, where it waits for its designated time window to open.
- Once the time window is open, the Scheduler moves the Executable Task to the Thread Pool queue, from where it is picked up for execution by a free worker thread.
Execution
The worker thread executes the Executable Task. If commit ordering is enabled, the task is executed in phases:
- Apply: The worker applies the task. For relay-log-backed jobs, the worker may start applying after GTID while the transaction tail is still being published incrementally.
- Wait for Commit Order: The task is placed in a special scheduler queue, waiting for its turn to register for commit.
- Register for Commit: When the commit order clock matches the task's scheduled time, the Scheduler moves the task back to the Thread Pool queue and a worker registers the transaction for commit.
- Commit: After registration, the committing worker executes the commit while the Commit Order Manager preserves source commit order.
Commit order is preserved even in failure scenarios: although commits run in parallel from the Scheduler's perspective, RPCO (Replica Preserve Commit Order) is still enforced by the Commit_order_manager, which handles possible failures and ensures that dependent transactions are treated appropriately, without deadlocks or out-of-order commits.
Retry Handling
If a transaction fails due to a transient error during the apply phase, only the apply phase is retried.
If a transaction fails due to a transient error during the commit phase, the task is retried in a single-dispatch execution mode. Internally, it replays the prepare and proceeds through the remaining commit path without re-entering the scheduler shelf. In this retry scenario, transaction dependencies and commit order dependencies are already satisfied, so the task does not perform a second Commit Order Manager registration within the same transaction lifecycle and does not need to update the commit order clock again.
USER INTERFACE
The CSA implemented within WL#10500 can be turned on or off using the CHANGE REPLICATION SOURCE TO command. Moreover, the CSA allows configuring the number of workers for each user-defined channel.
The CHANGE REPLICATION SOURCE TO command is extended with new options:
* APPLIER_VERSION, accepting either "1" or "2",
* APPLIER_WORKER_COUNT, accepting a number within the range <1,1024>,
* APPLIER_EVENT_MEMORY_LIMIT, accepting a number equal to or greater than replica_max_allowed_packet.
The configured options are saved in the applier metadata (internal 'slave_relay_log_info' system table) and written to the 'replication_applier_configuration' PFS table.
PERSISTENT STATE
This WL extends the applier metadata with the following fields:
- Applier_version: Integer, either "1" (legacy Multi-Threaded Applier) or "2"
(new Change Stream Applier)
- APPLIER_WORKER_COUNT: unsigned integer in range <1,1024>
- Applier_event_memory_limit: unsigned integer >= replica_max_allowed_packet
These values are persisted until the channel is deleted.
OBSERVABILITY
The observability of the CSA is organized around PFS tables exposing CSA statistics. TBD
USER PROCEDURE
To start replication with the new applier, the user specifies APPLIER_VERSION equal to "2" in the CHANGE REPLICATION SOURCE TO command, for example: CHANGE REPLICATION SOURCE TO APPLIER_VERSION = 2 FOR CHANNEL '<channel>';
SECURITY CONTEXT
No changes w.r.t. existing
SYSTEM RESOURCES
Apart from the reused memory resources, the CSA uses the following instrumented system resources:
Memory resource:
- key_mem_csa_prefetcher : Key associated to memory allocated by the Prefetcher, if relay log prefetching is enabled
- key_mem_csa_thp : Key associated to memory allocated by the Thread Pool
- key_mem_csa_provider : Key associated to memory allocated by the Transaction Provider
- key_mem_csa_scheduler : Key associated to memory allocated by the Scheduler
- key_decompressing_stream : Key associated to memory allocated during transaction decompression
Locks:
- key_mutex_module - CSA module lock
- key_mt_csa_sched_main - Scheduler main lock
- key_mt_csa_sched_end - Scheduler lock used to end execution
- key_mt_csa_sched_phases - Scheduler phase lock
- key_mt_csa_session_service_entry - Session service entries locks
- key_mt_csa_prefetcher_wait - Prefetcher lock
- key_mt_csa_prefetcher_file_move - Prefetcher lock protecting file rotation
RW locks:
- module rw lock
Condition variables:
- key_cv_csa_sched_main - Scheduler main condition variable
- key_cv_csa_sched_end - Scheduler condition variable used to end execution
- key_cv_csa_prefetcher_wait - Prefetcher notification condition variable
- key_cv_csa_prefetcher_file_move - Prefetcher file rotation condition variable
CSA threads:
- key_thread_worker - Thread pool worker
- key_thread_scheduler - Scheduler thread
- key_thread_prefetcher - Prefetcher thread, if enabled
- key_thread_provider - Transaction provider thread, if asynchronous provider is enabled
DEPLOYMENT and INSTALLATION
No changes w.r.t. existing
PROTOCOL
No changes w.r.t. existing
FAILURE MODEL SPECIFICATION
Upon incorrect use of the CRST command, the server may return the following error codes to the client:
ER_CRST_UNKNOWN_APPLIER_VERSION eng "Invalid applier version specified in CHANGE REPLICATION SOURCE TO command. Refer to documentation for supported versions."
ER_CRST_APPLIER_WORKER_COUNT_OUT_OF_RANGE eng "Coordinator thread count out of bounds. Please specify a value between 1 and 1024."
ER_CRST_APPLIER_WORKER_COUNT_ONLY_FOR_CSA eng "Applier worker count can only be configured when using CSA applier version."
ER_CRST_APPLIER_EVENT_MEMORY_LIMIT_ONLY_FOR_CSA eng "Applier event memory limit can only be configured when using CSA applier version."
ER_WARN_CRST_APPLIER_EVENT_MEMORY_LIMIT_OUT_OF_RANGE eng "Applier event memory limit out of bounds. Actual limit has been set to the max_allowed_packet."
ER_CSA_UNSUPPORTED_UNTIL_COND eng "Replica SQL for channel: <> Change Stream Applier accepts the following UNTIL conditions: SQL_BEFORE_GTIDS and SQL_AFTER_GTIDS"
ER_CSA_CRST_DELAYED_APPLIER_NOT_SUPPORTED
eng "The Change Stream Applier does not currently support the SOURCE_DELAY option of the CHANGE REPLICATION SOURCE TO command. Please unset SOURCE_DELAY when using CSA."
ER_CSA_CRST_REQUIREMENT_ROW_FORMAT
eng "The Change Stream Applier currently supports row-based replication. Please set REQUIRE_ROW_FORMAT to 1 when using CSA."
ER_CSA_CRST_REQUIREMENT_GTID_ONLY
eng "The Change Stream Applier currently supports row-based replication with GTIDs. Please set GTID_ONLY to 1 when using CSA."
The remaining error codes are reused. When the Change Stream Applier encounters an error, it reports a message in the following format:
Replica SQL for channel '<channel name>': <message specific to the issue>
Error model for unsupported features:
- Statement or Mixed mode binary logs (binlog-format=STATEMENT, binlog-format=MIXED)
Enforced by REQUIRE_ROW_FORMAT = 1
- Filename and offset replication channels
Enforced by GTID_ONLY=1
- Skipping parts of transactions (sql-replica-skip-counter)
Option has no effect when GTID_MODE=ON.
- Until clauses other than SQL_BEFORE_GTIDS, SQL_AFTER_GTIDS
The replica returns ER_CSA_UNSUPPORTED_UNTIL_COND to the client executing the CRST command.
- Delayed applier
The replica returns ER_CSA_CRST_DELAYED_APPLIER_NOT_SUPPORTED to the client executing the CRST command.
- GTID_ONLY=0
The replica returns ER_CSA_CRST_REQUIREMENT_GTID_ONLY to the client executing the CRST command.
- REQUIRE_ROW_FORMAT=0
The replica returns ER_CSA_CRST_REQUIREMENT_ROW_FORMAT to the client executing the CRST command.
- Option replica_pending_jobs_size_max
Option is ignored. The Change Stream Applier uses the limit set by using the APPLIER_EVENT_MEMORY_LIMIT option of the CRST command.
- Replication from GTID disabled sources (ASSIGN_GTIDS_TO_ANONYMOUS_TRANSACTIONS)
Enforced by the GTID_ONLY=1 (GTID_MODE must be set to ON).
- Extended applier statistics
No error code, extended statistics empty / set to 0.
- GTID_MODE = OFF, OFF_PERMISSIVE, ON_PERMISSIVE
Enforced by GTID_ONLY=1. GTID_MODE cannot be set to a value other than ON when GTID_ONLY is equal to 1.
- Legacy VCLE
No error code. VCLE was removed in 8.3.
- IGNORE_SERVER_IDS option of the CRST command
This feature is not needed when using GTIDs, applied GTIDs are automatically skipped. Setting IGNORE_SERVER_IDS is disallowed for channels with GTID_ONLY equal to 1 (GTID_MODE must be < ON to use this feature).
UPGRADE/DOWNGRADE and CROSS-VERSION REPLICATION
The CSA supports replication from 9.7 sources, provided that replication uses row-based binary logging and GTID_ONLY is set to 1.
On upgrade:
PFS 'replication_applier_configuration' table is extended with the following columns:
APPLIER_VERSION INTEGER UNSIGNED NOT NULL COMMENT 'Version of the applier used (either 1 - legacy Multi-Threaded Applier or 2 - new Change Stream Applier)',
APPLIER_WORKER_COUNT INTEGER UNSIGNED NOT NULL COMMENT 'Number of worker threads utilized by the applier',
APPLIER_EVENT_MEMORY_LIMIT INTEGER UNSIGNED NOT NULL COMMENT 'The maximum amount of memory the applier channel may use to cache binlog events'
The applier metadata table is extended with the following columns:
Applier_version INTEGER UNSIGNED NOT NULL DEFAULT 1 COMMENT 'Version of the applier used (either 1 - legacy Multi-Threaded Applier or 2 - new Change Stream Applier)',
APPLIER_WORKER_COUNT INTEGER UNSIGNED NOT NULL DEFAULT 4 COMMENT 'Number of worker threads utilized by the applier',
Applier_event_memory_limit INTEGER UNSIGNED NOT NULL DEFAULT 1073741824 COMMENT 'The maximum amount of memory an applier channel may use to cache binlog events'
BEHAVIOR CHANGE
No changes w.r.t. existing
SUMMARY OF THE APPROACH
All functionalities that constitute the new CSA are implemented on four levels:
- Concurrency: Synchronization primitives (wrappers for MYS synchronization primitives), lock-free synchronization primitives and base concurrent data structures enclosed in the mysql_concurrency library
- Scheduling: Classes responsible for implementation of a scheduler, including definition of the task, task identifiers, task schedules, task dependencies, task thread pool and finally, the task scheduler. All of those classes are enclosed in the mysql_scheduler library.
- Service layer: Contains the implementation of the CSA service and introduces the concepts of a session service, abstract jobs, and fetchable transactions. Contains all of the CSA initialization code, the calculation of transaction dependencies, and task creation and submission to the Scheduler. The CSA service is implemented as a component within the MySQL server (not to be confused with MySQL Components).
- Server layer: Contains the integration of the CSA code with reused replication code implemented in the server, including replication commands, the reused binlog/relay log and corresponding readers (including decompression), and commit manager classes. In this layer, we implement handling of the SQL thread commands when the new CSA is in use, changes to the interface steering the applier, extensions to the relay log reader classes, and new glue code that allows reusing the replica commit order manager.
This section first gives a detailed overview of transaction processing and then describes each CSA layer, its concepts, and the interactions between the implemented classes.
Note that the description of the layers follows the same bottom-up approach as presented in the HLS. It begins by describing details of the foundational Concurrency and Scheduling Layers, which operate independently of database-specific concepts such as transactions, tables, and replication. Building upon this foundation, the Service Layer is presented, outlining how transactions read from the Relay Log are processed within the Change Stream Applier. Finally, the modifications made to the Server Layer are discussed, highlighting the changes that enable the reuse of existing apply code within the Change Stream Applier.
Detailed description of the Transaction Processing
The CSA service facilitates transaction processing through a series of interconnected components. When the new applier is enabled via the 'CHANGE REPLICATION SOURCE TO' command and the Coordinator thread is initiated, the CSA service undertakes the following steps:
- Initialization of Necessary Components: The CSA service creates
instances of crucial components, including:
- Transaction Provider: This component is responsible for generating jobs based on transaction data or metadata retrieved from the Relay Log. By default, the synchronous transaction provider is utilized.
- Session Service: This service supplies sessions to jobs
- Dependency Adapter: A lightweight utility component within the CSA service that translates task dependency information received from the source (including last committed and sequence number) into a format compatible with the Scheduler Clock and Dependency Tracker, facilitating seamless dependency management.
- Scheduler Clock: An abstraction representing the scheduler's clock, responsible for tracking temporal dependencies between tasks across global and phase-based timelines
- Dependency Tracker: A mechanism that monitors and maintains end-to-start global dependencies within the Scheduler, ensuring that tasks are executed in the correct order (e.g., the task with ID 3 runs after the task with ID 2). In the CSA runtime, the default wiring uses the Dependency_tracker_stub.
- Scheduler: The Scheduler plays a pivotal role in managing task execution, ensuring that tasks are applied in the correct order while respecting dependencies between them.
- Thread Pool: A group of worker threads, known as the Thread Pool, is responsible for executing tasks assigned by the Scheduler.
- Registration of Task Phases: If the replica_preserve_commit_order option is enabled, the CSA service registers a task phase specifically for commits. This phase operates with a unique sequential commit order clock, guaranteeing that registering for commit is executed in a strictly sequential manner.
- Statistic Monitor Initialization: To monitor performance and statistics, the CSA service initializes a Statistic Monitor for the channel.
Following the initialization phase, the applier service enters a continuous loop where it performs the following operations:
- Job Retrieval: The CSA service retrieves jobs generated by the Transaction Provider. The job itself contains the core logic for executing transaction phases, including:
- 'Transaction Prepare' phase:
- Continuously fetch and process events from the Fetchable event set until an error occurs or a commit event is encountered:
- Apply events
- 'Register for Commit' phase:
- Establish a parallel worker context
- Perform registration for commit
- 'Commit' phase:
- Execute the commit operation
Additionally, the job outlines the process for attaching to and detaching from a session, including acquiring a session from the Session Service.
- Dependency Calculation: Utilizing a Dependency Calculation Adapter, the CSA service calculates dependencies between jobs. These calculations are based on factors such as the last committed timestamp and sequence number. The default adapter (Dependency_adapter_lwm with Clock_lwm_registry) determines the optimal time slot for executing each job using LWM-based logic.
- Session Assignment: Each job is assigned a corresponding Session Service, facilitating efficient session management.
- Executable Task (Task) Creation: The CSA service encapsulates each job within an executable task, defining a structured workflow for job execution:
- Initialize job attachment
- Iterate until successful completion or fatal error:
- Execute job logic
- Detach job
The Executable Task (Task) focuses on encapsulating retry logic and error handling. Its interface is deliberately designed to meet the requirements of the functors consumed by the Scheduler.
- Schedule Creation: Based on the calculated dependencies, the CSA service generates a schedule for each task. This schedule dictates the order and timing of task execution.
- Task Submission: The task, accompanied by its schedule, is submitted to the Scheduler for further processing and eventual execution within the Thread Pool.
Upon encountering an error, the applier service reports the issue, halts all components, and terminates its execution.
Transaction Processing in the CSA Transaction Provider
The CSA transaction provider employs a concrete implementation that creates a reader responsible for retrieving transaction data or metadata from the Relay Log. This reader operates according to specific rules:
- Full Event Reading: Outside of transactions, the reader fetches full events.
- Query Log Event Handling: To ensure accurate parsing of transaction boundaries, the reader retrieves full query log events.
- Metadata Reading: Within transactions, when events are not query log events and the transaction size exceeds predefined thresholds, the reader fetches only event metadata.
During transaction processing, the transaction reader filters out events that fall outside of transaction boundaries. When a control event is encountered within a transaction, it is removed from the transaction events. This pruning occurs during the construction of the Fetchable Event Sets. If an ignorable event is detected mid-transaction, and the reader uses metadata reading mode, the current event set is terminated, and a new event set is initiated with the subsequent event that belongs to the same transaction.
For relay-log-backed transactions, the provider uses streaming-transaction applier dispatch. It returns a Job immediately after the GTID event is known and publishes the remaining transaction events incrementally into the Fetchable Transaction. This applies both when events are later fetched from cached payload and when they are read on demand from the relay log: workers wait for event availability, fetch and apply events one by one, and move to the commit event when the terminal event is published. When the stream is interrupted before the transaction is fully published (for example on STOP REPLICA, reader stop, or relay-log truncation caused by a re-fetched transaction), the provider marks the active transaction as truncated to unblock the worker. Truncation is a non-error terminal state: the worker rolls back silently, does not retry the same partial job, and the next Job replays the transaction from the source.
Transaction Processing in the Scheduler
The Scheduler operates based on a combination of elements, including a global
clock, a dependency tracker, and clocks registered for specific phases.
When the replica_preserve_commit_order option is enabled, the CSA registers a phase named "commit" and associates it with a unique sequential clock. This clock releases tasks one at a time, enforcing a strict order of the commit registration phase.
Each task adheres to its predefined schedule, dictating when it executes and completes. If a task doesn't finish on its initial run, it's resubmitted to the Phase queue, awaiting the next scheduled phase. This phase is governed by a separate clock, ensuring timely progression. Specifically, the Commit Order phase, registered by the CSA service, operates on a sequential commit order clock. To implement start-to-start dependencies within the Scheduler, tasks undergo two consecutive runs. The first run updates the phase clock, enabling the next task to proceed with its "commit" phase, while the second run executes concurrently with respect to other Scheduler tasks.
Notably, the Scheduler supports the registration of multiple phases, each with its own dedicated queue and associated clock. This flexible design enables efficient management of complex task dependencies and workflows.
The Scheduler follows a three-step process:
- Waiting Tasks Inspection: The Scheduler reviews tasks waiting for dependencies and checks notifications from the Dependency Tracker. Tasks that became ready are forwarded to the Thread Pool.
- Phase Queue Inspection: The Scheduler examines registered phase queues, checking if tasks match the current phase clock and phase readiness flags. Matching tasks are forwarded to the Thread Pool.
- Task Queue Inspection: The Scheduler reviews the global task queue, comparing task timestamps against the current global clock time. Ready tasks are verified against end-to-start dependencies and either sent to the Thread Pool or moved to the waiting queue.
In the event of an error, the Scheduler empties queues and ceases operation.
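The following hypothetical sketch summarizes one iteration of this three-step process (the member names and helper methods are illustrative, not the actual Scheduler interface):

  #include <utility>

  // Hypothetical sketch of one Scheduler dispatch iteration; the state
  // members stand for whatever concrete types fulfil the roles above.
  template <typename SchedulerState>
  void dispatch_once(SchedulerState &s) {
    // 1. Waiting Tasks Inspection: forward dependency-blocked tasks that
    //    the Dependency Tracker has since marked as ready.
    for (auto &task : s.waiting_queue.drain_ready(s.dependency_tracker))
      s.thread_pool.enqueue(std::move(task));

    // 2. Phase Queue Inspection: forward tasks whose phase clock and
    //    phase readiness flags match the current phase time.
    for (auto &phase : s.registered_phases)
      while (auto task = phase.queue.pop_if_ready(phase.clock))
        s.thread_pool.enqueue(std::move(*task));

    // 3. Task Queue Inspection: compare task timestamps with the global
    //    clock; dispatch ready tasks or park them in the waiting queue.
    while (auto task = s.task_queue.pop_if_due(s.global_clock)) {
      if (s.dependency_tracker.satisfied(*task))
        s.thread_pool.enqueue(std::move(*task));
      else
        s.waiting_queue.push(std::move(*task));
    }
  }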
Transaction Processing in the Thread Pool
Worker threads within the Thread Pool are tasked with executing transactions encapsulated within repeatable tasks prepared by the Scheduler. These tasks ultimately trigger callbacks that update commit phase clocks and mark tasks as completed. The Thread Pool notifies the Scheduler of time changes, prompting it to schedule subsequent tasks for execution.
When the replica_preserve_commit_order option is enabled, the transaction
is prepared, and the worker thread places it into a phase queue registered for
commit order. This preparation involves encoding the transaction within an
executable task, as dictated by the Scheduler.
For prepared transactions, the Thread Pool worker receives tasks to execute jobs in two stages:
- Register for Commit: The worker attaches to a session, registers for commit, and detaches from the session.
- Commit Execution: After registering for commit, the worker attaches to the session again, commits the transaction, and detaches from the session.
Upon completing a commit, the task is marked as done within the task callback, and the global clock of the Scheduler is updated. If this task was the last one in the current time window and other tasks await their time window, the task closes the current window and opens a new one, releasing subsequent tasks. The Scheduler is notified of the time change, prompting it to awaken and schedule the next set of tasks for execution.
Detailed description of the concurrent relay log purge
In CSA, the relay log purge mechanism operates concurrently, triggered when the last subscriber to a relay log file completes processing its data. There are two types of subscribers:
- fetchable event sets, which process consecutive events belonging to the same transaction within a single file,
- transaction providers that utilize the relay log reader.
Each subscriber maintains a reference to the relay log object and increments a subscriber count. As subscribers finish processing, they release their reference to the relay log object and increment a "successful subscribers" count. The relay log file is considered for purging when its reference count reaches zero and the "successful subscribers" count equals the initial subscriber count.
The last subscriber to release its reference initiates the purge procedure. However, this doesn't guarantee immediate purging, as the relay log file is simply marked for purging. To ensure data consistency, the purging process only removes consecutive relay log files starting from the first one in the list. If there's a gap in the sequence or the list is not consecutive, the purging process will stop at the first gap. A relay log file is only eligible for purging if it has been fully read, all transactions within it have been successfully applied, and it's not an active file.
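A hedged sketch of the per-file accounting described above follows (the structure and names are illustrative; it also assumes no new subscribers register after the file has been fully read):

  #include <atomic>
  #include <cstdint>

  // Illustrative per-relay-log-file subscriber accounting. A file becomes
  // a purge candidate when every subscriber has released its reference and
  // all subscribers finished successfully; actual purging then removes
  // only a consecutive prefix of fully-read, fully-applied, non-active
  // files.
  struct Relay_log_file_counters {
    std::atomic<std::uint64_t> subscribers{0};  // total registered
    std::atomic<std::uint64_t> active_refs{0};  // still holding a reference
    std::atomic<std::uint64_t> successful{0};   // finished successfully
    std::atomic<bool> marked_for_purge{false};

    void subscribe() {
      subscribers.fetch_add(1);
      active_refs.fetch_add(1);
    }
    // Returns true when the caller was the last subscriber and the file
    // may be marked for purging.
    bool release(bool success) {
      if (success) successful.fetch_add(1);
      if (active_refs.fetch_sub(1) == 1 &&
          successful.load() == subscribers.load()) {
        marked_for_purge.store(true);
        return true;  // this caller initiates the purge procedure
      }
      return false;
    }
  };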
Detailed description of solving RPCO conflict
The MTA has a built-in mechanism for resolving RPCO conflicts. When a transaction is applied in the engine, it checks for conflicts with RPCO transactions. If a conflict is detected, the conflicting transaction waiting for commit order is notified to roll back (managed by Commit_order_manager). This is feasible because each worker executes a transaction from start to commit.
In contrast, CSA applies transactions in two stages:
- the apply phase (executed by a worker)
- the register for commit + commit phase (executed by the same or a different worker).
Between these stages, the transaction waits in the scheduler queue without an active worker. This necessitates a separate mechanism for resolving RPCO conflicts.
In scenarios where a worker detects a conflict during event application and needs a lock held by a transaction awaiting RPCO, a straightforward rollback of the conflicting transaction is not feasible due to the existing InnoDB context. To resolve this deadlock, the system employs the Transaction Conflict Manager, a dedicated thread that processes requests from CSA worker threads. To avoid issues with concurrent channel stops and long waits due to multiple rollbacks requested by concurrent channels, a single instance of the Transaction Conflict Manager class is used per replication channel.
Other threads can subscribe a transaction to the Transaction Conflict Manager for conflict resolution and specify an action to be taken. Currently, the only supported action is "rollback." When a worker detects a conflict, it prepares a Rescue Task, which is executed by the Transaction Conflict Manager. The Rescue Task includes a promise object that serves as a guarantee to the waiting thread that the task has completed, allowing execution to proceed.
When a worker thread T1 detects that it needs a lock held by a transaction T2 with a higher 'last committed' value, T2 must roll back. There are two possible cases:
- T2 is still applying events (Scenario 1).
- T2 has finished applying events and is waiting for the commit phase (Scenario 2).
In Scenario 2, since T2 has no active worker assigned, T1 must request the rollback. In Scenario 1, the rollback cannot start while T2 is still in the applying phase. To determine which thread will request the rollback, a CAS operation on an atomic boolean variable is used, along with another atomic boolean that indicates whether the transaction has been applied.
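A hedged sketch of this hand-off (the flag names and the helper function are hypothetical):

  #include <atomic>

  // Hypothetical flags attached to T2's transaction context: exactly one
  // thread wins the right to drive the rollback via a CAS on
  // 'rollback_claimed'; 'applied' records whether T2 finished applying.
  struct Rpco_conflict_flags {
    std::atomic<bool> rollback_claimed{false};
    std::atomic<bool> applied{false};
  };

  // Called by T1 when it detects it waits on a lock held by T2. Returns
  // true if T1 must submit the rollback request itself (T2 has no active
  // worker); false if T2's side will notice the claim and handle it.
  inline bool t1_claims_rollback(Rpco_conflict_flags &flags) {
    bool expected = false;
    if (!flags.rollback_claimed.compare_exchange_strong(expected, true))
      return false;  // rollback already claimed by the other side
    // The claim succeeded: T1 drives the rollback only once T2 has left
    // the applying phase; otherwise T2 observes the claim when it
    // finishes applying.
    return flags.applied.load();
  }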
To facilitate rollback, the Transaction Conflict Manager requires an associated RLI object; hence, the RLI is not detached before the commit. The Transaction Conflict Manager processes submitted requests using a FIFO synchronized queue. The transaction requesting the rollback passes a 'promise' object to T2 for synchronization.
Two additional scenarios affect the rollback process:
- Scenario 3: an applier stop is requested.
- Scenario 4: T2 is scheduled for commit and assigned a new worker.
In Scenario 3, during destruction, T2's associated RLI object is operated on by the Transaction Conflict Manager, possibly for an unfinished rollback. The scheduler thread waits for the rollback to finish by obtaining a future object on a shared promise. In Scenario 4, T2 checks if it was rolled back, waits for the rollback to finish if necessary, reapplies the events, and then commits.
This mechanism ensures that transaction conflicts are resolved efficiently across different scenarios.
Detailed description of monitoring channel event memory
The Resource Monitor is responsible for controlling the channel event memory. The memory limit for storing events is set using the CRST command with the APPLIER_EVENT_MEMORY_LIMIT option. However, there are certain cases where this limit is not enforced, such as:
- Event decompression, which reports memory usage using a separate PSI key and is not counted towards the defined memory limit.
- Commit events, which are small events that must be allowed to prevent deadlocks due to RPCO.
In CSA, worker threads are solely responsible for maintaining memory within the allowed limits, unlike in MTA. CSA operates in two reading modes:
- Caching full events: The memory is limited to 512B (the cached transaction size limit) multiplied by the maximum number of ongoing tasks, which is set to four times the number of worker threads.
- Caching transaction metadata: For transactions larger than 512B, only event information is cached. Worker threads are responsible for reading the transaction event payload from the relay log and keeping memory usage within defined bounds. Unlike MTA, where events are cached in the worker queue, CSA reads and releases events individually (except for compressed events). As a result, the Event_set_fetchable_relay_log class, using Resource_monitor methods, is responsible for checking and waiting for available memory within the defined limit.
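A minimal sketch of this check-and-wait behavior follows (the interface shown is illustrative; the actual Resource_monitor methods may differ):

  #include <condition_variable>
  #include <cstddef>
  #include <mutex>

  // Illustrative sketch: workers reserve event memory against the channel
  // limit (APPLIER_EVENT_MEMORY_LIMIT) and block until enough is released.
  class Resource_monitor_sketch {
   public:
    explicit Resource_monitor_sketch(std::size_t limit) : m_limit(limit) {}

    // Block until 'bytes' fits under the limit, then reserve it. Commit
    // events would bypass this wait to avoid RPCO deadlocks (see above).
    void reserve(std::size_t bytes) {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_released.wait(lock, [&] { return m_used + bytes <= m_limit; });
      m_used += bytes;
    }
    // Release memory after an event has been applied and freed.
    void release(std::size_t bytes) {
      {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_used -= bytes;
      }
      m_released.notify_all();
    }

   private:
    std::mutex m_mutex;
    std::condition_variable m_released;
    std::size_t m_used = 0;
    const std::size_t m_limit;
  };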
Concurrency layer
The concurrency layer is enclosed in the MySQL concurrency libraries.
For synchronization primitives, we define two libraries: mysql_concurrency_server and mysql_concurrency.
In summary, this code provides a set of wrappers for synchronization primitives that can be used consistently across different environments, ensuring the portability and maintainability of concurrent codebases. It also provides implementations of lock-free synchronization primitives, such as the Ticketing System described below. Last but not least, it provides implementations of high-performance concurrent data structures that are used in the Scheduler and may be reused in the implementation of other features.
Library Overview
The mysql_concurrency library provides a set of synchronization primitives and utilities for building concurrent systems. It includes headers and source files for implementing threads, mutexes, condition variables, spin locks, and synchronized queues.
The provided code consists of several header files that define wrappers for synchronization primitives, specifically threads, mutexes, and condition variables. These wrappers aim to provide a standardized interface for working with these primitives across different platforms and environments.
Server-Specific Library
The mysql_concurrency_server library is a variant of the mysql_concurrency library that is specifically designed for use within the MySQL server. It includes additional headers and source files that are only relevant to the server environment.
Standalone Library
The mysql_concurrency library is a standalone version of the library that can be used outside of the MySQL server context. It includes a subset of the headers and source files found in the server-specific library.
Wrappers implemented in the mysql_concurrency library
Thread Wrappers
The Thread class provides a wrapper around the underlying threading API, either using the Standard Template Library (STL) or a custom implementation based on MySQL's MYS library. The Thread class includes methods for creating, joining, and managing threads.
Mutex Wrappers
The Mutex_wrapper class provides a wrapper around the underlying mutex API, again using either the STL or MySQL's MYS library. The Mutex_wrapper class includes methods for locking, unlocking, and checking the status of the mutex.
Condition Variable Wrappers
The Condition_variable_wrapper class provides a wrapper around the underlying condition variable API, using MySQL's MYS library. The Condition_variable_wrapper class includes methods for waiting, notifying, and timing out on condition variables.
Ticketing System
Ticketing System implements a highly efficient and scalable lock-free ticketing system that is well-suited for high-performance applications. The system avoids the use of traditional locks, which can lead to performance bottlenecks and deadlocks. The system can scale horizontally, allowing it to handle increasing loads without sacrificing performance.
The ticketing system is a collection of classes that provide a way to manage tickets in a thread-safe manner.
Purpose
The purpose of this ticketing system is to provide a lock-free mechanism for synchronizing access to shared resources. By using atomic operations, the system ensures that multiple threads can safely access and modify the ticket values without the need for traditional locks. The ticketing system has been designed within the scope of BUG#33405699: "After secondary node is killed, it can not rejoined", and moved to MySQL concurrency libraries to reuse it in the CSA service.
Classes
The ticketing system consists of the following classes:
- Ticket: Represents a ticket with a unique ID and a value.
- Atomic_ticket: Represents an atomic ticket with a unique ID and a value.
- Atomic_ticket_guard: Represents a guard for an atomic ticket, providing a way to acquire and release the ticket.
Interactions between Classes
Acquiring a Lock: When an Atomic_ticket_guard object is created, it calls the set_in_use method on the associated Atomic_ticket object to acquire the lock. This method performs an atomic compare-and-swap operation to ensure that only one thread can acquire the lock at a time.
Releasing a Lock: When an Atomic_ticket_guard object is destroyed, it calls the set_used method on the associated Atomic_ticket object to release the lock. This method simply stores the next ticket value in the Atomic_ticket object.
Getting and Setting Ticket Values: The Atomic_ticket class provides methods for loading and storing the ticket value atomically. These methods use atomic operations to ensure thread safety.
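The following minimal sketch illustrates these interactions (the names suffixed with _sketch are hypothetical and simplified; the real classes carry additional state):

  #include <atomic>
  #include <cstdint>

  // Simplified atomic ticket: set_in_use acquires via compare-and-swap,
  // set_used releases by publishing the next ticket value, so waiters are
  // admitted in ticket order without a mutex.
  class Atomic_ticket_sketch {
   public:
    // Acquire: succeed only if the current value equals 'my_ticket'.
    bool set_in_use(std::uint64_t my_ticket) {
      std::uint64_t expected = my_ticket;
      return m_value.compare_exchange_strong(expected, k_in_use);
    }
    // Release: store the next ticket value, admitting the next waiter.
    void set_used(std::uint64_t next_ticket) { m_value.store(next_ticket); }
    std::uint64_t load() const { return m_value.load(); }

   private:
    static constexpr std::uint64_t k_in_use = ~std::uint64_t{0};
    std::atomic<std::uint64_t> m_value{0};
  };

  // RAII guard analogous to Atomic_ticket_guard: acquires in the
  // constructor (spinning until its ticket comes up) and releases in the
  // destructor by publishing the next ticket.
  class Atomic_ticket_guard_sketch {
   public:
    Atomic_ticket_guard_sketch(Atomic_ticket_sketch &t, std::uint64_t ticket)
        : m_ticket(t), m_my_ticket(ticket) {
      while (!m_ticket.set_in_use(m_my_ticket)) { /* spin or yield */ }
    }
    ~Atomic_ticket_guard_sketch() { m_ticket.set_used(m_my_ticket + 1); }

   private:
    Atomic_ticket_sketch &m_ticket;
    std::uint64_t m_my_ticket;
  };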
Synchronized Bounded Queue
Sync_bounded_queue class provides a high-performance,
thread-safe solution for concurrent queue management, suitable for algorithms
requiring efficient communication between multiple producers and consumers.
The Sync_bounded_queue is a synchronized data structure that provides
a thread-safe implementation of a bounded queue. It allows multiple threads
to concurrently add and remove elements from the queue, while ensuring that
the queue remains in a consistent state.
Some key features of the Sync_bounded_queue include:
Bounded Capacity: The queue has a fixed maximum capacity, preventing it from growing indefinitely and reducing the risk of memory exhaustion.
Thread-Safe: The queue is designed to be accessed and modified by multiple threads simultaneously, without fear of data corruption or other concurrency-related issues.
Efficient: The queue uses a circular buffer and atomic operations to minimize overhead and maximize performance.
Flexible: The queue provides a variety of methods for adding and removing elements, including blocking and non-blocking variants.
The queue enables multiple threads to access and modify the data concurrently, improving overall system throughput and responsiveness. By providing a thread-safe implementation, the queue simplifies the development process and reduces the risk of concurrency-related bugs. The queue's bounded capacity and synchronization mechanisms help prevent data corruption and other concurrency-related issues, ensuring that the system remains stable and reliable.
Synchronized Simple Queue
The Locking_queue class offers a straightforward and centralized means
of accessing a concurrent queue shared among multiple threads. However, due
to its reliance on a single mutex and condition variable, it may introduce
contention between threads. To minimize potential performance bottlenecks,
it is advisable to limit usage to a single producer and single consumer
whenever possible.
Spin lock mutex
The Spin_lock_mutex class implements a spin-lock mutex, a synchronization primitive that allows threads to acquire exclusive access to a shared resource. The lock is designed to minimize contention between threads by using a spinning approach, where waiting threads continuously poll the lock status instead of blocking.
The lock() method attempts to acquire the lock, and if unsuccessful, enters a busy-wait loop where it periodically yields control to other threads to reduce CPU usage. The try_lock() method attempts to acquire the lock without blocking, returning immediately if the lock is already held by another thread. The unlock() method releases the lock, allowing other threads to acquire it.
Note that this implementation uses atomic operations (test_and_set, test, and clear) to ensure thread safety, and employs architecture-specific instructions (__builtin_ia32_pause, _mm_pause, etc.) to optimize the spinning behavior.
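A hedged sketch along these lines is shown below (assuming C++20's std::atomic_flag::test; the real Spin_lock_mutex may differ in detail):

  #include <atomic>
  #if defined(__x86_64__) || defined(__i386__)
  #include <immintrin.h>  // _mm_pause
  #endif

  // Illustrative test-and-test-and-set spin lock with a pause hint.
  class Spin_lock_mutex_sketch {
   public:
    void lock() {
      while (m_flag.test_and_set(std::memory_order_acquire)) {
        // Busy-wait on the cheaper load-only test, pausing to reduce
        // memory traffic and be polite to the sibling hyper-thread.
        while (m_flag.test(std::memory_order_relaxed)) {
  #if defined(__x86_64__) || defined(__i386__)
          _mm_pause();
  #endif
        }
      }
    }
    bool try_lock() {
      return !m_flag.test_and_set(std::memory_order_acquire);
    }
    void unlock() { m_flag.clear(std::memory_order_release); }

   private:
    std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
  };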
Sharded counter
The Sharded_counter class provides a thread-safe counter that allows
multiple threads to update and retrieve their individual counts concurrently.
It maintains a collection of counters, each associated with a unique thread ID,
enabling efficient and synchronized access to these counters across multiple
threads.
The class offers several methods that:
* initialize the counter collection with the specified number of threads, setting each count to zero
* return the current count for the specified thread ID
* return the aggregate sum of all thread-specific counts
* store a value for the specified thread ID
* atomically increment the count for the specified thread ID by the provided value
* reset all thread-specific counts to zero
By utilizing atomic operations and a thread-local storage approach, the Sharded_counter class ensures accurate and efficient counting across multiple threads, making it suitable for various concurrent programming scenarios.
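A minimal sketch of the idea follows (hypothetical names; the cache-line padding is an assumption about the implementation strategy, not confirmed by this worklog):

  #include <atomic>
  #include <cstddef>
  #include <cstdint>
  #include <vector>

  // Illustrative sharded counter: one padded atomic slot per thread avoids
  // contention on updates; sum() aggregates all thread-specific counts.
  class Sharded_counter_sketch {
    struct alignas(64) Slot {  // pad to a cache line to limit false sharing
      std::atomic<std::int64_t> value{0};
    };

   public:
    explicit Sharded_counter_sketch(std::size_t num_threads)
        : m_slots(num_threads) {}

    void add(std::size_t thread_id, std::int64_t delta) {
      m_slots[thread_id].value.fetch_add(delta, std::memory_order_relaxed);
    }
    void store(std::size_t thread_id, std::int64_t v) {
      m_slots[thread_id].value.store(v, std::memory_order_relaxed);
    }
    std::int64_t get(std::size_t thread_id) const {
      return m_slots[thread_id].value.load(std::memory_order_relaxed);
    }
    std::int64_t sum() const {
      std::int64_t total = 0;
      for (const auto &s : m_slots)
        total += s.value.load(std::memory_order_relaxed);
      return total;
    }
    void reset() {
      for (auto &s : m_slots) s.value.store(0, std::memory_order_relaxed);
    }

   private:
    std::vector<Slot> m_slots;
  };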
Scheduler layer
The scheduler layer is enclosed in the MySQL Scheduler library.
The MySQL Scheduler library is a C++ library designed to manage tasks and their dependencies in a multi-threaded environment. It provides a flexible and efficient way to schedule tasks, ensuring that they are executed in the correct order and at the right time, i.e. when all dependencies of a task are met.
Overview
The scheduler layer is implemented in the form of a new mysql_scheduler library.
The library consists of several components:
- Scheduler: The main component responsible for managing and executing tasks and their dependencies.
- Task: Abstract concept that represents a single task that needs to be executed.
- Task phase: A task phase is a stage in the execution of a task. A task can have multiple phases, each with its own timing dependencies, independent of the other phases.
- Task Schedule: Defines the timing of a task and of its separate task phases, if any
- Scheduler clock: Provides a way to measure current time which serves as a trigger to run a task or a task phase
- Dependency Tracker: Keeps track of end-to-start dependencies between tasks.
- Thread Pool: Manages a pool of threads that execute tasks.
- Synchronized queue: An abstract concept of a concurrent data structure used to exchange data between different threads, i.e.:
- task provider and scheduler thread (1 producer - 1 consumer)
- scheduler thread and workers in the thread pool (1 producer - N consumers or N producers - N consumers, depending on configuration defined in the Service Layer).
- Task registry: A synchronized data structure that provides a concurrency control mechanism, reducing contention among threads accessing the registry.
- Statistics Monitor: A dedicated component designed to efficiently collect system statistics from multiple threads.
A simplified view of the data flow is shown below:
Scheduler
A scheduler is a component that manages the execution of Tasks, ensuring that they are executed in the correct order and at the right time, i.e. according to the defined Task Schedule. It acts as a coordinator between tasks, allocating resources and prioritizing their execution.
Characteristics of the Task Scheduler are:
- Task Management: A scheduler manages a collection of tasks, each with its own timing and dependencies.
- Prioritization: A scheduler prioritizes tasks based on their urgency, importance, and dependencies.
- Synchronization: A scheduler synchronizes the execution of tasks, ensuring that they are executed in the correct order.
We implement a time-based scheduler which runs according to a defined, global task clock. When a Task is executed in phases, each phase is run according to a different Scheduler clock instance, registered for the particular task phase. The Scheduler prioritizes based on the time at which a task should execute and on the task ID (a task with a lower ID is likely to run before a task with a higher ID).
When the timing dependencies of a task are met, the Scheduler consults the defined Dependency Tracker to check whether other end-to-start task dependencies are satisfied. If not, the task waits in a queue together with other tasks that are ready according to their timing dependencies but not yet ready according to their end-to-start dependencies.
When all timing and end-to-start dependencies are met, the task is dispatched to the Thread Pool for asynchronous execution. After a task finishes, independent Thread Pool workers call task and/or phase callbacks, which update the phase/global clock and the global end-to-start dependencies defined in the Dependency Tracker.
During execution, all classes have access to Statistics Monitor, which is used to gather statistics for all of the system components.
The sequence flow of the Scheduler is presented in the figure below:
Task
A task is an abstract concept that represents a single unit of work that needs to be executed. A task may be run in several Task Phases. There is no implementation of a Task in the Scheduler library; a task is considered to be a runnable object that may be run several times according to a defined Task Schedule. A task is identified in the system by an artificial task ID, assigned by a task provider using a task ID generator.
Task Phase
A task phase is a stage in the execution of a task. A task can have multiple phases, each with its own timing and dependencies. Task phases are used to divide a complex task into smaller, manageable pieces that can be executed independently.
Characteristics of a Task Phase
- Timing: Each task phase has its own timing, which defines when it should be executed.
- Execution: Each task phase is executed independently, and its execution is triggered by the scheduler.
The task phase concept is introduced to implement start-to-start commit order dependencies. A transaction first runs its apply phase according to global task dependencies. Later, it waits for its turn to commit. When the scheduler runs the commit phase, the transaction registers for commit and releases the next transaction in line to register for commit. The commit itself runs in parallel with respect to the Scheduler.
Task Schedule
A task schedule is a representation of a task's timing dependencies. It defines when a task should be executed, i.e. what timing dependencies it has on other tasks, and it defines the task phases, if any. A particular phase and phase clock must be registered in the Scheduler before running a task schedule that executes in separate phases.
Characteristics of a Task Schedule
- Timing: A task schedule specifies the timing constraints of a task, such as its deadline, period, and offset.
- Execution: A task schedule defines the execution characteristics of a task, such as its priority and resource requirements.
Scheduler clock
A scheduler clock is a component that provides a timing reference for a scheduler. The Scheduler refers to a global clock to determine which task should execute in a specific time window. The Scheduler also supports registrable task phases. If tasks execute different phases and those phases have timing dependencies between each other, the Scheduler refers to each task phase clock to manage the timing dependencies between registered phases. To determine the particular phase, phase clock, and execution time, the Scheduler consults the Task Schedule that corresponds to a particular Task.
Dependency Tracker
A dependency tracker is a component that monitors the dependencies between tasks in a centralized way. It ensures that tasks are executed in the correct order, based on their dependencies. It implements end-to-start dependencies defined between various tasks.
Characteristics of a Dependency Tracker:
- Dependency Monitoring: A dependency tracker monitors the dependencies between tasks, detecting changes and updates.
- Order Enforcement: A dependency tracker enforces the correct order of task execution, based on their dependencies.
Thread Pool
A thread pool is a component that manages a group of threads, allocating them to tasks as needed. It optimizes resource utilization and minimizes overhead.
Characteristics of a Thread Pool:
- Thread Management: A thread pool manages a group of threads, creating and destroying them as needed.
- Task Allocation: A thread pool allocates threads to tasks, ensuring that each task has sufficient resources.
- Resource Optimization: A thread pool optimizes resource utilization, minimizing idle time and maximizing throughput.
Task allocation and resource optimization are achieved at the level of workers. Free workers use the Thread Pool synchronized queue (Sync_bounded_queue) to fetch the next task to be executed. All tasks waiting in the synchronized queue are inserted there by a Scheduler thread, after it ensures that the tasks' dependencies are met. The communication queue between the Thread Pool and the Scheduler is an N producer - N consumer queue; therefore, it is not implied whether each Scheduler in the system should use its own or a common, global Thread Pool.
Concurrent communication queue
A concurrent queue is a data structure that enables safe and efficient communication between multiple threads or processes. It allows producers to add messages to the queue and consumers to retrieve messages from the queue, all while ensuring thread-safety and avoiding data corruption.
As a high performance communication queue between N producers and N consumers we supply the blocking, bounded, and thread-safe implementation of a FIFO queue, called Sync_bounded_queue.
The Sync_bounded_queue is bounded, which means that there is a limited number
of elements in the queue.
The Sync_bounded_queue behaves as a lock-free queue when:
- there is a sufficient number of tasks for consumers to process in the queue
- there is a sufficient number of free slots in the queue to insert new data
Producers and consumers are synchronized using atomic counters and atomic flags, whose implementations are supplied by the STL. Each producer / consumer calculates a unique id of an empty / full queue slot using atomic integers pointing to the queue head or tail. If there are not enough full slots, a consumer blocks on its (still empty) slot until a producer puts an element into it. If there are not enough empty slots, a producer blocks on the first-to-consume slot and waits for a consumer to empty it.
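To make the slot protocol concrete, the following minimal C++ sketch shows a blocking, bounded multi-producer / multi-consumer FIFO in this spirit. It is illustrative rather than the actual Sync_bounded_queue code: it replaces the validity flag with a per-slot sequence counter so that slot hand-off remains correct across buffer wrap-around, and it blocks by yielding instead of using the library's waiting primitives.

  #include <atomic>
  #include <cstddef>
  #include <thread>
  #include <vector>

  // Illustrative sketch; T must be default-constructible here.
  template <typename T>
  class Bounded_mpmc_queue {
    struct Slot {
      std::atomic<std::size_t> seq{0};  // acts as the slot's validity marker
      T value{};
    };

   public:
    explicit Bounded_mpmc_queue(std::size_t capacity) : m_slots(capacity) {
      for (std::size_t i = 0; i < capacity; ++i) m_slots[i].seq.store(i);
    }

    void enqueue(T v) {
      // Claim a unique slot id through the atomic head counter.
      const std::size_t ticket = m_head.fetch_add(1);
      Slot &slot = m_slots[ticket % m_slots.size()];
      // Block until the consumer of the previous lap has emptied the slot.
      while (slot.seq.load(std::memory_order_acquire) != ticket)
        std::this_thread::yield();
      slot.value = std::move(v);
      slot.seq.store(ticket + 1, std::memory_order_release);  // mark full
    }

    T dequeue() {
      // Claim a unique slot id through the atomic tail counter.
      const std::size_t ticket = m_tail.fetch_add(1);
      Slot &slot = m_slots[ticket % m_slots.size()];
      // Block until a producer has filled this slot.
      while (slot.seq.load(std::memory_order_acquire) != ticket + 1)
        std::this_thread::yield();
      T v = std::move(slot.value);
      // Release the slot for the producer of the next lap.
      slot.seq.store(ticket + m_slots.size(), std::memory_order_release);
      return v;
    }

   private:
    std::vector<Slot> m_slots;
    std::atomic<std::size_t> m_head{0};
    std::atomic<std::size_t> m_tail{0};
  };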
A conceptual representation of the Sync_bounded_queue is provided in the
following figure:
Task Registry (Task Registry Multi)
The Task_registry_multi class is a template-based concurrent registry for tasks that handles hash conflicts by allowing multiple task entries to coexist in the same hash bucket. It implements thread-safe access to task objects using a bucketed hash table with locks per bucket.
Threads do not contend when they operate on entries in separate buckets. When a conflict occurs (two task IDs hashing to the same bucket), threads use spin locks to lock the requested resource (the entry's bucket).
Spin locks excel when locks are held for very brief periods, such as in this registry, where operations like checking task activation or updating entries are quick atomic operations. The spin lock avoids expensive context switches that std::mutex would incur.
Each bucket (determined by task ID hash) contains an unordered_map mapping task IDs to their associated objects and state. This design resolves collisions that could occur with simple hashing, ensuring that tasks with IDs mapping to the same bucket index can be stored and accessed efficiently.
Statistics monitoring
The Statistics_monitor class provides a centralized mechanism for managing
and tracking various statistics across multiple instances (channels) and
threads. It enables registration, retrieval, and updating of statistics,
ensuring thread-safety and efficient access to statistical data.
Key features of the class include:
- Instance management: Multiple instances of Statistics_monitor can be created, each with its own set of statistics. Instances are stored in a global map, accessible via the get method.
- Statistic registration: Statistics can be registered with a unique name and optional thread count, allowing for per-thread tracking.
- Statistic retrieval: Registered statistics can be retrieved using their name, with options for getting accumulated values or thread-specific values.
- Statistic updates: Statistics can be updated using addition or exchange operations, supporting incremental updates and replacement of existing values.
- Reset functionality: All registered statistics can be reset to their initial state using the reset method.
Statistics are identified using a unique string name. Mapping of the unique
identifier used within the scheduler library to string name is realized in
the mysql::scheduler::Statistics_map.
The Statistics_monitor class utilizes a combination of atomic operations,
mutexes, and concurrent counters to ensure thread-safety and efficient access
to statistical data. It provides a flexible and scalable solution for
monitoring and analyzing system performance, errors, and other metrics
in multi-threaded environments.
Some notable methods and their purposes include:
- init: Initializes the statistics monitor, preparing it for use.
- register_stat: Registers a new statistic with a unique name and optional thread count.
- find_stat and get_stat: Retrieve registered per-instance statistics by name, with options for accumulated or thread-specific values.
- Statistic values are updated through the returned stat objects (for example, add or store) rather than through update_* helper methods.
- reset: Resets all registered statistics to their initial state.
Overall, the Statistics_monitor class provides a robust and efficient framework for collecting, tracking, and analyzing statistical data in complex systems such as the CSA service, making it easier to diagnose issues, optimize performance, and improve overall system reliability.
Statistics map
The Statistics_map class serves as a registry for mapping unique
identifiers to string names for various statistics. It defines a set of
predefined statistic names as constants. The class also provides a mechanism
for initializing statistics for a given instance ID and thread count, ensuring
proper setup and tracking of statistical data. Initialization and readiness synchronization are handled in Statistics_monitor; this class defines keys and the init_statistics routine.
Service layer
The primary responsibility of the CSA service is to apply changes from the source database to the replica database in real-time, ensuring data consistency and integrity. The Change Stream Applier is designed to work with the MySQL replication protocol, specifically with the binary log format.
Overview
The Change Stream Applier Service (CSAS) is a singleton instance managing channels
configured with APPLIER_VERSION=2. It runs the main processing loop (via its
run() function) to continuously retrieve jobs from the Transaction Provider,
calculate dependencies using a Dependency Adapter, create executable tasks
with schedules, and submit them to the Scheduler. It also provides functions
for shutdown (stop()) and statistics access (get_statistics()), while
creating and managing instances of all core components per channel.
The Change Stream Applier service consists of several key components:
- Transaction Provider: Responsible for fetching binary log events or their metadata from the Replication Storage, and for creating Job objects populated with corresponding Fetchable Event Set objects.
- Log prefetcher: An optional asynchronous thread that, when enabled, proactively fetches large blocks of data from the Relay Log or Binary Log, improving performance by reducing latency and optimizing data retrieval.
- Scheduler: Manages the execution of Executable Jobs (tasks), ensuring that they are applied in the correct order and that dependencies between them are respected.
- Worker Pool: A group of worker threads that execute Jobs.
- Session Service: Manages replica sessions for Jobs
- Session: Object that encapsulates a server session (THD) and the associated Relay Log Info (RLI)
- Job: Handles the execution of a single transaction, including:
- attaching / detaching from the replica session
- fetching from the underlying storage, as defined by the corresponding Fetchable event sets,
- managing transaction retry after transient errors
- error reporting
- application of a single transaction to the replica database in phases, as defined by the replica commit order rules
- Fetchable event set: A collection of transaction events originating
from the same storage type (cache or a specific relay log file), specifying:
- The method for retrieving events from the underlying storage
- The criteria for purging applied relay log files
- The approach for decompressing events
- Transaction conflict manager: An entity processing concurrent conflict resolution requests from CSA worker threads
- Applier channel monitor: Monitors the applier for stalls by tracking progress metrics and attempting to unblock the scheduler if no progress is detected.
- Resource Monitor: Responsible for controlling the named applier resources, such as channel event memory.
The Change Stream Applier Service that runs for a specific channel operates as follows:
- Initialization: Initializes the necessary components, including the transaction provider, scheduler, worker pool, and session service.
- Transaction Processing Loop: Enters a continuous loop where it retrieves Jobs generated by the transaction provider, resolves dependencies between them using a Dependency Calculation Adapter, and submits them to the scheduler. The scheduler manages job execution, queuing them until their dependencies are met, and then dispatches them to the worker pool for processing.
- Shutdown: Shuts down the service either in response to a termination signal or due to an unrecoverable error.
The simplified sequence diagram for the CSA service is illustrated below:
Jobs
Jobs implemented in the CSA service form a hierarchy of classes and adapters for executing tasks in a multi-threaded environment. They provide a flexible and extensible framework for executing tasks in the CSA service: they support various types of jobs, including binlog jobs and applier jobs, allow for test jobs that do not perform actual work, and adapt Jobs so that the CSA Scheduler can ingest them.
Upon entering the Thread Pool, the Job commences execution. A simplified sequence diagram illustrating this process, excluding certain error-handling aspects, is depicted below:
Class Hierarchy
- Job: Abstract base class defining the interface for jobs
- Job_binlog: Concrete implementation of the Job interface for binlog jobs
- Job_applier: Concrete implementation of the Job_binlog interface for applier jobs
- Job_stub: Concrete implementation of the Job interface for dummy jobs, used for testing
- Task_exec_job: Job adapter that wraps a Job instance and provides the additional functionality needed to execute a Job as a Scheduler Task
Replication Storage and interaction with Jobs
The purpose of the replication storage implemented in the CSA service is to provide a set of methods for fetching transactions from the Relay log and to abstract the exact implementation of the Replication Storage, which is the Relay log, from the implementation of task scheduling and execution.
Replication Storage consists of several classes that form a hierarchy of readers and jobs for interacting with a replication storage system.
The implemented class hierarchy is as follows:
- Event_set_fetchable: Abstract base class defining the interface for fetchable event sets
- Event_set_fetchable_relay_log: Concrete implementation of Event_set_fetchable for relay logs that fetches transaction events from the relay log by parallel thread pool workers before applying each event. With this object, a transaction may be fetched from the relay log many times, on demand, e.g. when a transaction retry is needed. It also does not require keeping the full transaction data in memory: each transaction event is fetched before it is applied and released afterwards. In this streaming dispatch, the fetchable transaction is returned after the GTID and the remaining events are published incrementally, allowing the worker to start applying the transaction before the provider has parsed its full tail.
- Event_set_fetchable_cache: Concrete implementation of Event_set_fetchable that serves as an empty stub for the relay log reader and supplies Jobs with full transaction data that is kept in memory until the transaction is successfully applied
- Relay_log_deleter: Class responsible for handling references to particular relay log files and for removing a file when the last reference to it is released
- Relay_log_adaptive_reader: A class responsible for extracting transaction metadata or data from the Relay Log. It generates Event_set_fetchable_relay_log metadata objects, which enable on-demand fetching of transaction events from the Relay Log, or Event_set_fetchable_cache objects, which retrieve events from memory. The reader dynamically selects the object type based on the current worker load: if the load is below 50%, it creates Event_set_fetchable_relay_log objects, allowing workers to read transactions directly from the Relay Log; otherwise, it creates Event_set_fetchable_cache objects to optimize performance. Additionally, the reader delegates transaction reading to worker threads whenever the transaction size exceeds a predefined threshold, to avoid high memory consumption by cached transaction data.
- Transaction_provider: Interface for particular implementations of a transaction provider
- Sync_transaction_provider: Implementation of the Transaction_provider interface that reads Jobs in the same, synchronous thread
- Async_transaction_provider: Implementation of the Transaction_provider interface that reads Jobs in a separate, asynchronous thread
- Event_reader_controller: A newly introduced class that supersedes the Rpl_applier_reader. Its primary purpose is to facilitate advanced reading capabilities, including the ability to extract transaction event metadata and to automatically remove relay logs
- Reader_controller_read_type: Enumeration class specifying how Relay_log_adaptive_reader reads from the relay log (event metadata, event metadata + encoded payload, full event)
In addition to the aforementioned classes, there is a set of classes implementing the optional functionality of prefetching fixed-size chunks of data from the relay log:
- Prefetched_ifile: Implementation of the Basic_binlog_ifile interface, allowing the Basic_binlog_file_reader to create objects of the Istream_prefetched class
- Istream_prefetched: Implementation of the Basic_seekable_istream interface, creating a prefetched stream of data
- Prefetched_relaylog_reader: Alias to the Basic_binlog_file_reader template specialized with the implemented Prefetched_ifile
- Log_prefetcher: This class initiates an asynchronous thread that prefetches consecutive relay or binary logs recorded in the index file
Key Concepts:
- Fetchable Event Sets: Representations of events that can be fetched from a storage system on demand, many times, depending on underlying implementation
- Relay Log: The Replica Relay Log
- Job: Unit of work that can be executed by the system, currently, full transactions or objects containing full transaction metadata
- Transaction provider: Entity responsible for feeding the CSA service with fetchable Jobs
- Log Prefetcher: Entity responsible for asynchronous fetching of fixed-size chunks of raw bytes from the Relay Log
Session service
There are three implementations of session service, one of which is used for testing and performance comparison. Session service provides a free session on demand.
Session_service Interface
- Abstract base class defining the interface for session services
- Declares pure virtual methods for initialization, deinitialization, acquiring, and releasing sessions
Session_service_registry Implementation
- Concrete implementation of the Session_service interface
- Uses a cached approach to manage sessions
- Initializes a fixed-size pool of sessions during construction
- Acquires and releases sessions from the pool
- Session pool is synchronized using separate locks and condition variables for each entry
Implementation of the Session_service_registry is based on the implementation of the Task_registry provided by the mysql_scheduler library.
Session_service_dynamic Implementation
- Another concrete implementation of the Session_service interface
- Uses a cached approach to manage sessions
- Uses a list to store free sessions and an unordered map to track reserved sessions
- Synchronizes access to shared resources using one mutex and one condition variable
Session_service_bounded_queue Implementation
- Another concrete implementation of the Session_service interface
- Uses a cached approach to manage sessions
- Uses a Communication Queue implementation to access the managed session pool
Dependency calculation adapters
The provided classes form a hierarchy of dependency adapters that resolve dependencies between tasks based on transaction sequence numbers and last committed values. They are designed to work together to ensure correct ordering of tasks in a multi-threaded environment. The main purpose of provided adapters is to support various scheduling algorithms and strategies implemented in the Scheduler through the use of interchangeable adapters.
Adapter Hierarchy
- Dependency_adapter: Abstract base class defining the interface for dependency adapters
- Dependency_adapter_lwm: Concrete implementation resolving dependencies based on transaction sequence number and last committed
- Sequential_clock_adapter: Concrete implementation resolving dependencies based on transaction sequence number and last committed, returning clock delay windows
Relationships Between Classes
- All concrete adapter implementations inherit from the Dependency_adapter interface
- Each adapter implementation builds upon the previous one, adding additional functionality and complexity
- Adapters interact with each other through their shared interface, allowing them to be used interchangeably
CSA statistics
CSA statistics are tracked through the Statistics Monitor, a feature introduced
in the concurrency layer. Additionally, the CSA service utilizes its own
dedicated statistics map, which is implemented within the
mysql::csa::Statistics_map class.
To support statistics logging, two new classes have been introduced:
Statistic_logger, which manages statistical data within a specified time
frame, and Sliding_window_counter, a helper class that tracks both the
current and previous values of a specific statistic.
Transaction conflict management
The Transaction_conflict_manager class is designed to manage transaction
conflicts by running an asynchronous thread that processes tasks from a queue.
The main features of this class are:
- Asynchronous Processing: The class runs a separate thread (m_thread) that executes tasks from a synchronized queue (m_cache).
- Task Enqueueing: Tasks can be enqueued using the enqueue method, which adds a Rescue_task to the queue.
- Thread Management: The start method starts the asynchronous thread, and the stop method stops the thread and waits for it to finish.
- Status Checking: The is_stopped method checks whether the thread has been
stopped.
The class uses a Locking_queue to store tasks, ensuring thread safety. The run_thread method is the main loop that consumes tasks from the queue and executes them.
The Transaction_conflict_monitor class is a singleton that manages
a collection of Transaction_conflict_manager instances. The main features
of this class are:
- Singleton Pattern: The class follows the singleton pattern, ensuring that only one instance is created.
- Instance Management: The get method returns the Transaction_conflict_manager instance associated with a specific instance_id.
- Lazy Initialization: The init method is called lazily when the first get request is made, ensuring that the instances are initialized only when needed.
- Thread Safety: The class uses atomic variables (m_init and m_ready) to ensure thread safety during initialization.
The Transaction_conflict_monitor class uses an array (m_instances) to store
Transaction_conflict_manager instances, with a fixed maximum size
(max_instances). The get method returns a reference to the instance
associated with the given instance_id, creating it if necessary.
Overall, the Transaction_conflict_manager class is responsible for managing
transaction conflicts, while the Transaction_conflict_monitor class provides
a singleton interface to manage multiple instances of
Transaction_conflict_manager.
Resource management
The Resource Monitor provides a framework for controlling named applier resources per channel. It is organized to support managing a wide variety of named resources, but currently only enforces the channel event memory limit. The memory limit for storing events is set using the CHANGE REPLICATION SOURCE TO (CRST) command with the APPLIER_EVENT_MEMORY_LIMIT option. However, there are certain cases where this limit is not enforced:
- Event decompression, which reports memory usage using a separate PSI key and is not counted towards the defined memory limit.
- Commit events, which are small events that must always be allowed in order to prevent deadlocks due to RPCO (replica preserve commit order).
In CSA, worker threads are solely responsible for maintaining memory within the allowed limits, unlike in MTA. CSA operates in two reading modes:
- Caching full events: The memory is limited to 512B (the cached transaction size limit) multiplied by the maximum number of ongoing tasks, which is set to four times the number of worker threads.
- Caching transaction metadata: For transactions larger than 512B, only event information is cached. Worker threads are responsible for reading the transaction event payload from the relay log and keeping memory usage within defined bounds. Unlike MTA, where events are cached in the worker queue, CSA reads and releases events individually (except for compressed events). As a result, the Event_set_fetchable_relay_log class, using Resource_monitor methods, is responsible for checking and waiting for available memory within the defined limit.
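To illustrate the check-and-wait protocol, the sketch below models a per-channel event memory limit. The class and method names are hypothetical; the actual Resource_monitor interface is not reproduced here.

  #include <condition_variable>
  #include <cstddef>
  #include <mutex>

  // Hypothetical stand-in for the event-memory part of the Resource Monitor.
  class Event_memory_limiter {
   public:
    explicit Event_memory_limiter(std::size_t limit) : m_limit(limit) {}

    // A worker reserves memory for an event, blocking until it fits under
    // the limit. Note: as described above, a real implementation must admit
    // commit events unconditionally to avoid RPCO deadlocks.
    void acquire(std::size_t bytes) {
      std::unique_lock<std::mutex> guard(m_mutex);
      m_cv.wait(guard, [&] { return m_used + bytes <= m_limit; });
      m_used += bytes;
    }

    // Memory is released after the event has been applied.
    void release(std::size_t bytes) {
      {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_used -= bytes;
      }
      m_cv.notify_all();
    }

   private:
    const std::size_t m_limit;  // e.g. the configured APPLIER_EVENT_MEMORY_LIMIT
    std::size_t m_used{0};
    std::mutex m_mutex;
    std::condition_variable m_cv;
  };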
Applier progress monitoring
The Applier_channel_monitor monitors the applier for stalls by periodically checking progress metrics and attempting to unblock the scheduler if no progress is detected. It tracks applied events, scheduler/commit clocks, and active transactions over a configurable refresh interval (default 10 seconds). If a stall is detected, it tries to unblock scheduler with limited attempts per stall based on session count.
Server layer
The provided codebase includes several changes to the MySQL server, primarily focused on enhancing the replication capabilities and introducing a new Change Stream Applier.
New Features
- Change Stream Applier Service: A dedicated service responsible for executing a new Change Stream Applier and handling asynchronous requests such as stop of the applier or obtaining the applier statistics.
- Automatic Relay Log Removal: The new applier features distributed and automatic removal of relay logs, which helps manage disk space without requiring synchronization across all threads at a single point in the file.
- Enhanced Binary Log Stream Reading: Modifications have been made to the
server-side binary log streams, enabling:
- Retrieval of transaction event metadata without requiring full event decoding
- Direct access to transaction event payloads without needing to decode the entire event
- Event_reader_controller: The Event_reader_controller is a newly introduced class that supersedes the Rpl_applier_reader. Its primary purpose is to facilitate advanced reading capabilities, including the ability to extract transaction event metadata and to automatically remove relay logs.
Modified Components
- Slave_worker: The Slave_worker class has been modified to comply with the interface introduced to support commit order preservation in MTA workers and Change Stream Applier workers.
- Binary Log Stream Reading Enhancements: Updates have been made to key classes, including Basic_binlog_file_reader, Binlog_event_data_istream and IBasic_binlog_file_reader, along with the introduction of new supporting structures. These changes enable:
  - Extraction of transaction event metadata without requiring full event decoding
  - Direct access to transaction event payloads, eliminating the need for complete event decoding
- Preserve Commit Order on Replica: The server-side commit order manager has been integrated with the new applier, ensuring that transactions are committed in the correct order on the replica.
New Classes
- Csa_worker_context: A new Csa_worker_context class has been introduced to represent the parallel worker context for the new applier.
- Parallel_worker_context: An abstract Parallel_worker_context class has been added to define the interface for parallel worker contexts for both MTA and CSA workers.
Compatibility
- Backward Compatibility: The new applier and related components are designed to be backward compatible with existing replication setups.
- Forward Compatibility: The new applier and related components are also designed to be forward compatible with future enhancements and updates.
Interface
The handle_slave_sql method now conditionally calls the CSA service or initializes replica workers and invokes the old MTA functionality, depending on whether CSA is enabled. The CHANGE REPLICATION SOURCE TO command has been extended with two new options: one to select the applier version in use and one to specify the number of workers. Both options apply to the channel chosen by the user.
Observability
The applier metadata table now stores the applier version, specified number of workers and specified memory limit for caching binlog events. The replication_applier_configuration Performance Schema (PFS) table exposes the aforementioned configuration parameters for a given channel.
DETAILED IMPLEMENTATION
Concurrency layer classes
Ticket Class
The Ticket class provides thread-safe access to a ticket value. It is designed for efficiency, using a single atomic variable to store the ticket state, and takes a lock-free approach that avoids locks and minimizes the use of atomic operations.
The Ticket class uses a single 64-bit integer to represent both the ticket value and the synchronization bit. This design choice minimizes memory usage and improves cache locality. The get_value() method clears the synchronization bit before returning the ticket value, ensuring that the returned value is always valid for comparison and arithmetic operations. The set_in_use() and set_used() methods modify the synchronization bit atomically to ensure thread safety.
Key components of the class are:
- m_state: The internal state of the ticket, represented as a 64-bit integer. The most significant bit represents the synchronization bit, while the remaining 63 bits represent the ticket value.
- kTicketUnset: A constant representing the default value of an unset ticket.
- min_value and max_value: Constants representing the minimum and maximum allowed ticket values.
The Ticket class implements the following methods:
- get_value: Returns the value of the ticket.
- get: Returns the internal value of the ticket, including the synchronization bit.
- set_in_use: Sets the synchronization bit to 1, indicating that the ticket is in use.
- set_used: Sets the synchronization bit to 0, indicating that the ticket is not in use.
- is_in_use: Returns whether the ticket is in use.
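A minimal sketch of this single-word layout follows; it is illustrative and may differ in detail from the actual Ticket class.

  #include <atomic>
  #include <cstdint>

  class Ticket_sketch {
   public:
    // Bit 63 is the synchronization bit; bits 0..62 hold the ticket value.
    static constexpr std::uint64_t kSyncBit = 1ULL << 63;

    std::uint64_t get_value() const {
      // Clear the sync bit so the result is valid for comparison/arithmetic.
      return m_state.load(std::memory_order_acquire) & ~kSyncBit;
    }
    std::uint64_t get() const {  // raw state, including the sync bit
      return m_state.load(std::memory_order_acquire);
    }
    void set_in_use() { m_state.fetch_or(kSyncBit, std::memory_order_acq_rel); }
    void set_used() { m_state.fetch_and(~kSyncBit, std::memory_order_acq_rel); }
    bool is_in_use() const { return (get() & kSyncBit) != 0; }

   private:
    std::atomic<std::uint64_t> m_state{0};  // 0 plays the role of kTicketUnset
  };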
Atomic_ticket Class
The Atomic_ticket class uses an aligned atomic pointer to ensure that the ticket value is properly aligned and padded for atomic operations. The scoped_lock() method creates an Atomic_ticket_guard object to manage the lock acquisition and release, ensuring that the lock is always released when the guard goes out of scope. The load() and store() methods use atomic operations to ensure thread safety when accessing and modifying the ticket value.
Key components of the class are:
- m_ticket: An aligned atomic pointer to the ticket value, ensuring proper alignment and padding for atomic operations.
The Atomic_ticket class implements the following methods:
- load: Loads the value of the atomic ticket.
- store: Stores a new value in the atomic ticket.
- set_in_use: Sets the synchronization bit to 1, indicating that the ticket is in use.
- set_used: Sets the synchronization bit to 0, indicating that the ticket is not in use.
Atomic_ticket_guard Class
The Atomic_ticket_guard class uses a reference to the associated Atomic_ticket object to ensure that the lock is released when the guard goes out of scope. The get_next() and get_prev() methods provide access to the next and previous ticket values, respectively, allowing users to inspect the ticket values without modifying them. The set_next() method allows users to modify the next ticket value, which will be set when the lock is released.
Key components of the class are:
- m_ref: A reference to the atomic ticket being guarded.
- m_next_value: The next ticket value to be set when the lock is released.
- m_prev_value: The previous ticket value, obtained when the lock was acquired
The Atomic_ticket_guard class implements the following methods:
- get_next: Returns the next ticket value.
- get_prev: Returns the previous ticket value.
- set_next: Sets the next ticket value.
Sync_bounded_queue Class Template
Overall, the Sync_bounded_queue class template provides a high-performance, thread-safe solution for concurrent queue management, suitable for algorithms requiring efficient communication between multiple producers and consumers.
The Sync_bounded_queue class uses a circular buffer to manage the queue. The buffer is implemented as a vector of T objects, and the head and tail indices are used to keep track of the current position in the buffer.
The Sync_bounded_queue class uses atomic indexes to manage the head and tail positions in the buffer. This allows multiple threads to access the queue concurrently without fear of data corruption.
Validity Flags: Each element in the queue has an associated atomic validity flag (m_validity) indicating whether the element is valid (i.e., has been produced but not yet consumed) or invalid (i.e., has been consumed or never produced).
Atomic Indexes: Two atomic indexes (m_head and m_tail) keep track of the next position to produce and consume elements, respectively. These indexes wrap around the buffer when reaching the end.
Producer and Consumer Counters: Atomic counters (m_producer_counter and m_consumer_counter) track the number of active producers and consumers.
Template parameters of the class are:
- T: The type of elements stored in the queue. This allows the queue to be used with various data types, such as integers, strings, or custom objects.
- initial_capacity: The initial capacity of the queue. This determines the initial size of the underlying buffer and affects the performance characteristics of the queue.
Member variables implemented in the class are the following:
- m_values: A vector of T objects, representing the queue.
- m_validity: A vector of atomic_flags, representing the validity of each element in the queue.
- m_head: An atomic index representing the head of the queue.
- m_tail: An atomic index representing the tail of the queue.
- m_capacity: The capacity of the queue.
Methods implemented in the class are the following:
- enqueue: Adds an element to the queue, potentially blocking if the queue is full. Producers increment the m_head index and set the corresponding validity flag.
- dequeue: Removes an element from the queue, potentially blocking if the queue is empty. Consumers increment the m_tail index and clear the corresponding validity flag.
- peek: Returns the next element in the queue without removing it.
- get_next_index and get_index: Helper functions to calculate the next or current index for production or consumption, wrapping around the buffer if necessary.
- notify_all: Notifies all waiting consumers to wake up and check the end-of-execution flag (m_end).
- empty: Returns whether the queue is empty.
- full: Returns whether the queue is full.
Locking_queue Class Template
- A synchronized FIFO queue implementation, utilizing a fast spin lock for synchronization
- Designed for 1 producer - 1 consumer communication scenarios
- Holds objects of any type, ensuring thread safety through the use of a mutex and condition variable
- Note: This class does not provide mechanisms to prevent consumers and producers from accessing the queue during destruction; ensure that the queue object outlives all producing and consuming threads
Attributes of the Locking_queue class template are:
- m_queue: Underlying queue data structure
- m_mutex_access_queue: Mutex protecting access to the queue
- m_cv_empty_queue: Condition variable signaling when the queue is not empty
- m_memory_resource: Memory resource used for allocations
The Locking_queue class template implements the following methods:
- enqueue: Adds an element to the queue (producer side). Returns true upon successful enqueueing (currently always returns true)
- empty: Checks if the queue is empty
- dequeue: Consumes an element from the queue (consumer side), blocking until an element is available. Takes a callable predicate to determine when to stop waiting. Returns a pair containing the consumed element and a boolean indicating success
- notify_all: Notifies all waiting threads to unblock, facilitating graceful execution termination
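The following is a minimal sketch of this interface built directly on a standard mutex and condition variable; it is illustrative, not the actual Locking_queue code (which is templated on the mutex type and uses a memory resource for allocations).

  #include <condition_variable>
  #include <deque>
  #include <mutex>
  #include <utility>

  // Illustrative sketch; T must be default-constructible here.
  template <typename T>
  class Locking_queue_sketch {
   public:
    bool enqueue(T value) {
      {
        std::lock_guard<std::mutex> guard(m_mutex_access_queue);
        m_queue.push_back(std::move(value));
      }
      m_cv_empty_queue.notify_one();
      return true;  // mirrors the documented behavior: always succeeds
    }

    bool empty() const {
      std::lock_guard<std::mutex> guard(m_mutex_access_queue);
      return m_queue.empty();
    }

    // Blocks until an element is available or stop_waiting() returns true;
    // the boolean in the result indicates whether an element was consumed.
    template <typename Predicate>
    std::pair<T, bool> dequeue(Predicate stop_waiting) {
      std::unique_lock<std::mutex> guard(m_mutex_access_queue);
      m_cv_empty_queue.wait(
          guard, [&] { return !m_queue.empty() || stop_waiting(); });
      if (m_queue.empty()) return {T{}, false};  // woken up to terminate
      T value = std::move(m_queue.front());
      m_queue.pop_front();
      return {std::move(value), true};
    }

    void notify_all() { m_cv_empty_queue.notify_all(); }

   private:
    mutable std::mutex m_mutex_access_queue;
    std::condition_variable m_cv_empty_queue;
    std::deque<T> m_queue;
  };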
Sharded_counter Class
- Implementation of a sharded concurrent counter, reducing contention on atomic counters when updated by multiple threads
- Utilizes a map to store thread-specific counter values, supporting efficient updates and minimizing contention
- Supports initialization for a specified number of threads, resetting statistics, and various update operations
Attributes of the Sharded_counter class are:
* m_value: Mapping between thread IDs and their corresponding counter
values, stored in an unordered map for fast lookups and updates
The Sharded_counter class implements the following methods:
* get: Retrieves the counter value for a specific thread ID, returning the
current value associated with the provided thread ID
* get: Performs a coalescing get operation, summing up the counter values
across all threads and returning the total
* store: Stores the counter value for a specific thread ID
* add: Updates the counter value (add operation) for a specific thread ID,
adding the provided argument to the current value
* init: Initializes the counter for a specified number of threads, setting
up the internal mapping with initial values of 0 for each thread ID
* reset: Resets the initialized statistics, clearing the counter values
for all threads
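A self-contained sketch of the sharding idea is shown below; the real Sharded_counter's exact signatures may differ.

  #include <atomic>
  #include <cstddef>
  #include <cstdint>
  #include <unordered_map>

  class Sharded_counter_sketch {
   public:
    // Called once, before worker threads start: one shard per thread ID.
    void init(std::size_t thread_count) {
      for (std::size_t tid = 0; tid < thread_count; ++tid)
        m_value.try_emplace(tid, 0);
    }
    // Per-thread update: writers on different shards do not contend.
    void add(std::size_t tid, std::int64_t delta) {
      m_value.at(tid).fetch_add(delta, std::memory_order_relaxed);
    }
    void store(std::size_t tid, std::int64_t v) {
      m_value.at(tid).store(v, std::memory_order_relaxed);
    }
    // Per-thread read.
    std::int64_t get(std::size_t tid) const { return m_value.at(tid).load(); }
    // Coalescing read: sums the shards of all threads.
    std::int64_t get() const {
      std::int64_t total = 0;
      for (const auto &shard : m_value) total += shard.second.load();
      return total;
    }
    void reset() {
      for (auto &shard : m_value) shard.second.store(0);
    }

   private:
    std::unordered_map<std::size_t, std::atomic<std::int64_t>> m_value;
  };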
Spin_lock_mutex Class
- Implementation of a spin-lock mutex, satisfying the requirements of Mutex
- Provides lockable, default constructible, destructible, non-copyable, and non-movable properties
- Utilizes an atomic flag for internal synchronization, ensuring lock-free behavior
Attributes of the Spin_lock_mutex class are:
m_lock: Atomic flag indicating whether the lock is currently owned
The Spin_lock_mutex class implements the following methods:
- lock: Acquires the lock, blocking access to a critical section until unlock is called. Continuously attempts to acquire the lock using a test-and-set operation, waiting until the lock is released using a yield instruction or pause hint
- try_lock: Performs a non-blocking try-lock operation. Checks the lock status using a relaxed memory order and attempts to set the lock using an acquire memory order; returns true if the lock is acquired successfully, false otherwise
- unlock: Unblocks access to a critical section for other threads. Clears the atomic flag using a release memory order
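A minimal C++20 sketch of such a spin lock, mirroring the memory orders described above, follows (illustrative, not the actual Spin_lock_mutex code):

  #include <atomic>
  #include <thread>

  class Spin_lock_mutex_sketch {
   public:
    void lock() {
      for (;;) {
        // Test-and-set with acquire ordering takes the lock.
        if (!m_lock.test_and_set(std::memory_order_acquire)) return;
        // Spin on a cheap relaxed probe until the flag looks free again.
        while (m_lock.test(std::memory_order_relaxed))
          std::this_thread::yield();
      }
    }
    bool try_lock() {
      // Relaxed probe first, then attempt the acquire test-and-set.
      return !m_lock.test(std::memory_order_relaxed) &&
             !m_lock.test_and_set(std::memory_order_acquire);
    }
    void unlock() { m_lock.clear(std::memory_order_release); }

   private:
    std::atomic_flag m_lock;  // default-constructed clear (C++20)
  };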
Condition_variable_wrapper Class
- MySQL wrapper for a condition variable, utilizing mysql_cond_t as the underlying implementation
- Provides a condition variable interface compatible with the STL, including wait, wait_for, wait_until, notify_one, and notify_all methods
- Ensures thread safety through the use of locks and atomic operations
Attributes of the Condition_variable_wrapper class are:
- m_cv: The underlying mysql_cond_t object representing the condition variable
The Condition_variable_wrapper class implements the following methods:
- wait: Blocks the current thread until notified or a predicate becomes true. Takes a unique lock and a predicate as arguments; continuously checks the predicate and waits on the condition variable until it becomes true
- wait_for: Blocks the current thread for a specified duration or until notified. Takes a unique lock, a duration, and a predicate as arguments; waits on the condition variable for the specified duration or until the predicate becomes true
- wait_until: Blocks the current thread until a specified time point or until notified. Takes a unique lock, a time point, and a predicate as arguments; waits on the condition variable until the specified time point or until the predicate becomes true
- notify_one: Notifies one waiting thread, signaling the condition variable to wake up one waiting thread
- notify_all: Notifies all waiting threads, broadcasting the condition variable to wake up all waiting threads
- Condition_variable_wrapper: Constructor initializing the condition variable. Takes a PSI_cond_key as an argument and initializes the underlying mysql_cond_t object
- ~Condition_variable_wrapper: Destructor destroying the condition variable, i.e. the underlying mysql_cond_t object
Mutex_wrapper Class
- MySQL wrapper for a mutex, providing a template that can be specialized with a specific mutex implementation
- Satisfies the requirements of Lockable, Destructible, non-copyable, and non-movable
- Implements the standard C++ Named Requirement Lockable
- Offers methods for locking, trying to lock, unlocking, and accessing the native handle
Attributes of the Mutex_wrapper class are:
- m_mutex: The underlying mysql_mutex_t object representing the mutex
The Mutex_wrapper class implements the following methods:
- Mutex_wrapper: Constructor initializing the mutex. Takes a PSI_mutex_key as an argument and initializes the underlying mysql_mutex_t object using mysql_mutex_init
- ~Mutex_wrapper: Destructor destroying the mutex, i.e. the underlying mysql_mutex_t object, using mysql_mutex_destroy
- lock: Acquires a lock, blocking access to a critical section until unlock is called. Calls mysql_mutex_lock to acquire the lock
- try_lock: Performs a non-blocking try-lock operation. Calls mysql_mutex_trylock to attempt to acquire the lock; returns true if the lock is acquired, false otherwise
- unlock: Unblocks access to a critical section for other threads. Calls mysql_mutex_unlock to release the lock
- native_handle: Returns the native handle of the mutex, i.e. a pointer to the underlying mysql_mutex_t object
Stage Header File
- Conditional inclusion of header files based on compilation environment
- Defines macros for setting stages in MySQL server and standalone library contexts
Macros defined in this header file:
- MYSQL_PL_SET_STAGE(key): Sets the stage in MySQL server context. Expands to mysql_set_stage(key) when compiled within the MySQL server and to (void)(arg) when compiled outside the MySQL server
- MYSQL_PL_SET_THD_STAGE(thd, key): Sets the stage for a specific thread in MySQL server context. Expands to THD_STAGE_INFO(thd, key) when compiled within the MySQL server and to (void)(thd); (void)(key) when compiled outside the MySQL server
Conditional includes:
- Includes mysql/psi/mysql_stage.h and sql/changestreams/apply/psi/psi.h when compiled within the MySQL server with PSI interface support
- Includes either mysql/concurrency/stage_stl.h or mysql/concurrency/stage_srv.h depending on whether the library is compiled standalone or within the MySQL server
Thread Header File
- Conditional inclusion of header files based on compilation environment
- Selects between MySQL server-specific and STL-based thread implementations
Conditional includes:
- Includes mysql/concurrency/thread_stl.h when compiled as a standalone library (STANDALONE_LIBS_MYSQL defined)
- Includes mysql/concurrency/thread_srv.h when compiled within the MySQL server (STANDALONE_LIBS_MYSQL not defined)
Macro definitions:
- MYSQL_CONCURRENCY_THREAD_SRV_H: Defined when compiling within the MySQL server
- MYSQL_CONCURRENCY_THREAD_STL_H: Defined when compiling as a standalone library
Thread Class
- Wrapper around the MySQL thread API, matching the interface of std::thread
- Provides a way to create and manage threads in a MySQL-specific context
Attributes of the Thread class are:
- m_thread_handle: Handle to the underlying MySQL thread
- m_thread_attr: Attributes of the thread
- m_thread_key: Key identifying the thread
Methods of the Thread class:
- Thread: Default constructor
- Thread: Move constructor
- Thread: Constructor creating a new thread with a given function and arguments. Overloaded to take either a Thread_key or not; takes a callable object and a variable number of arguments
- join: Joins the thread, waiting for its completion. Calls my_thread_join to wait for the thread to finish
Macro definitions:
- MDEF_CREATE_THREAD: Convenience macro for creating a new thread. Expands to a mysql::concurrency::Thread constructor call with the given arguments
Scheduler layer classes
Scheduler Class
The Scheduler class is designed to manage tasks and their dependencies in a
concurrent environment. It provides methods for enqueuing tasks, adding
dependencies between tasks, and synchronizing the scheduler.
Overall, the Scheduler class provides a robust and customizable way to manage
tasks and their dependencies in a concurrent environment. Its design allows
for efficient and safe execution of tasks, and its templates provide
flexibility and customization options.
Interaction Between Classes:
- Dependency Tracker (Dependency_tracker): Manages task dependencies.
- Thread Pool (Thread_pool): Executes tasks concurrently.
- Scheduled Task (Scheduled_task): Represents tasks with defined time dependencies.
- Scheduler Clock (Scheduler_clock): Manages time-related operations.
The Scheduler class provides various methods for managing tasks and their
dependencies. Here are the descriptions of each method:
Attributes of the Scheduler class:
- m_tasks: Queue holding tasks ready for scheduling
- m_task_phases: Map of phase clocks and their associated task queues
- m_dependencies: Dependency tracker for tasks
- m_thread_pool: Thread pool executing tasks
- m_scheduler_clock: Scheduler clock managing time dependencies
- m_allowed_task_count: Maximum allowed number of tasks in the scheduler
- m_notification: Atomic wakeup flag for scheduler notifications
- m_cv_scheduler: Condition variable used by the scheduler main thread
- m_mutex_tasks / m_mutex_phases: Locks protecting the global and phase queues
- m_psi: Performance Schema instrumentation parameters
Methods of the Scheduler class:
- Scheduler: Constructor initializing the scheduler with instance ID, allowed task count, scheduling parameters, and PSI parameters
- deinit: Deinitializes the scheduler, stopping the scheduler thread and releasing resources
- enqueue: Enqueues a task with the given schedule and functor
- enqueue_after: Enqueues a task after a predecessor task finishes
- add_dependency: Adds a dependency between two tasks
- synchronize: Synchronizes the scheduler, waiting for tasks to finish
- synchronize_partial: Synchronizes the scheduler partially, waiting for a portion of the tasks to finish
- get_timeouts: Returns the number of timeouts that occurred in the scheduler
- is_error: Checks if an error has occurred in the scheduler
- register_phase: Registers a phase clock with the scheduler
- get_scheduled_tasks_count: Returns the number of scheduled tasks
- callback: Callback function executed after a task finishes
- callback_phase: Callback function executed after a task phase finishes
- enqueue_phase: Enqueues a task phase
- enqueue_internal: Internal function enqueuing a task
- run_main_thread: Main loop of the scheduler thread
- end_execution: Ends the execution of the scheduler thread
- wait_for_scheduler_thread_to_stop: Waits for the scheduler thread to stop
- handle_error: Handles errors occurring in the scheduler
- scheduler_clean_up: Cleans up resources after an error occurs
- notify_scheduler: Notifies the scheduler to wake up
- get_phase_queue: Returns the task queue for a given phase clock
- get_phase_queue_lock: Returns the lock for a given phase clock
Task dependencies
There are two ways to ensure task dependencies in the Scheduler class:
Method 1: Using the Scheduler_clock
The Scheduler_clock class represents a clock that tasks can subscribe to for
execution at specific time windows. Tasks can call the add_time method to
subscribe to a particular time window, and the tick method is called when a
task finishes its execution. The now method returns the current time point,
which determines when a task is ready for execution.
This approach ensures that tasks are executed in a specific order based on their subscription time. For example, if Task A subscribes to time window 10 and Task B subscribes to time window 20, Task A will be executed before Task B.
This design allows for easy switching between different clock implementations, as the Scheduler class only depends on the Scheduler_clock interface, not the specific clock implementation. Additionally, new clock implementations can be added without modifying the Scheduler class, as long as they conform to the Scheduler_clock interface.
The Scheduler_clock interface has multiple concrete implementations in the
scheduler library, including Logical_clock, Clock_lwm_registry,
Clock_lwm_set, and Commit_order_clock.
In CSA, the global scheduler clock is Clock_lwm_registry, and a
Commit_order_clock phase clock is used when
replica_preserve_commit_order=ON (default).
Method 2: Using the Base_dependency_tracker
The Base_dependency_tracker class provides an interface for tracking
dependencies between tasks. It allows tasks to register themselves, add
dependencies between tasks, and check if a task's dependencies are met.
Tasks can use the add_dependency method to specify that they depend on
another task. The check_ready method can then be used to determine if a task's
dependencies are met, and the mark_dependency_met method is used to indicate
that a task has finished its execution and its dependencies are met.
For example, if Task A depends on Task B, Task A will not be executed until Task B has finished its execution and marked its dependencies as met.
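The single-threaded toy below illustrates this protocol; it is not the real, lock-striped tracker, and its container layout is simplified for clarity.

  #include <cassert>
  #include <cstdint>
  #include <unordered_map>
  #include <vector>

  class Toy_single_predecessor_tracker {
   public:
    void activate_task(std::uint64_t id) { m_active[id]; }
    void add_dependency(std::uint64_t predecessor, std::uint64_t successor) {
      if (m_active.count(predecessor) != 0) {
        m_predecessor[successor] = predecessor;
        m_active[predecessor].push_back(successor);
      }
    }
    bool check_ready(std::uint64_t id) const {
      auto it = m_predecessor.find(id);
      // Ready when there is no predecessor, or it is no longer active.
      return it == m_predecessor.end() || m_active.count(it->second) == 0;
    }
    // Finish a task: deactivate it and return its successors, now ready.
    std::vector<std::uint64_t> mark_dependency_met(std::uint64_t id) {
      std::vector<std::uint64_t> successors = std::move(m_active[id]);
      m_active.erase(id);
      for (auto s : successors) m_predecessor.erase(s);
      return successors;
    }

   private:
    // active task -> its successors
    std::unordered_map<std::uint64_t, std::vector<std::uint64_t>> m_active;
    // successor -> its single predecessor
    std::unordered_map<std::uint64_t, std::uint64_t> m_predecessor;
  };

  int main() {
    Toy_single_predecessor_tracker t;
    t.activate_task(1);
    t.activate_task(2);
    t.add_dependency(/*predecessor=*/1, /*successor=*/2);
    assert(!t.check_ready(2));  // Task 2 waits for Task 1
    auto ready = t.mark_dependency_met(1);
    assert(ready.size() == 1 && ready[0] == 2);
    assert(t.check_ready(2));   // Task 2 may now execute
  }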
Each dependency tracking implementation interacts with the Scheduler class through an interface class, allowing the Scheduler to manage task dependencies and schedule tasks based on them. Each provided implementation takes a different approach to managing task dependencies, ranging from a simple stub to a more complex dependency graph. The choice of implementation depends on the specific requirements of the application and the desired trade-off between complexity and performance.
The Base_dependency_tracker class has two concrete implementations:
Dependency_tracker_stub and Dependency_tracker_single_predecessor.
In the CSA runtime, the default wiring uses Dependency_tracker_stub.
Scheduler_stub Class
- Stub implementation of the Scheduler class for testing purposes
- Provides basic functionality for enqueuing tasks and synchronizing the scheduler
Attributes of the Scheduler_stub class:
- m_scheduled_tasks_cnt: Counter for the number of scheduled tasks
- m_thread_pool: Shared pointer to a thread pool
- m_is_error: Flag indicating whether an error has occurred
- m_scheduler_clock: Shared pointer to a scheduler clock
- internal_time: Internal time variable for synchronization
Methods of the Scheduler_stub class:
- Scheduler_stub: Constructor initializing the scheduler stub
- deinit: Deinitializer for the scheduler stub
- enqueue: Enqueues a task with the given schedule and functor
- enqueue_after: Enqueues a task after a predecessor task finishes
- synchronize: Synchronizes the scheduler, waiting for tasks to finish
- is_error: Checks if an error has occurred in the scheduler
- enqueue_internal: Internal function for enqueuing tasks
Template functions:
- enqueue_internal: Internal function for enqueuing tasks with variable arguments
- enqueue: Enqueues a task with the given schedule and functor
- enqueue_after: Enqueues a task after a predecessor task finishes
Task_registry Class
The Task_registry class is a template class that provides a way to manage tasks and their associated objects in a thread-safe manner. It was designed as a concurrent data structure implemented over a circular buffer, utilizing the lock striping technique to reduce contention between different threads accessing resources of different tasks.
Template parameters are:
- Task_id_type: Type of the task ID, used to uniquely identify each task.
- Task_object_type: Type of the object associated with each task.
Member variables implemented in the class are the following:
- m_capacity: Maximum number of tasks that can be stored in the registry.
- m_tasks: Vector of Entry objects, where each Entry represents a task and its associated object.
- m_initialized: Flag indicating whether the registry has been initialized.
The Task_registry class provides several methods for managing tasks
and their associated objects:
- init: Initializes the registry with a specified capacity.
- deinit: Deinitializes the registry.
- apply: Applies a function to a task object.
- check_active: Checks if a task is active.
- register_entry: Registers a task and makes it active.
- activate_entry: Activates a previously registered task.
- activate_or_wait: Activates a previously registered task, waiting if necessary.
- get_copy: Returns a copy of a task object.
- get_ref: Returns a reference to a task object.
- get: Returns a reference to a task object, without acquiring the lock.
- lock: Acquires the lock for a task object.
- unlock: Releases the lock for a task object.
- deactivate_entry: Deactivates a task.
Task_registry_multi Class
The Task_registry_multi class is a template-based concurrent registry for tasks that handles hash conflicts by allowing multiple task entries to coexist in the same hash bucket. It implements thread-safe access to task objects using a bucketed hash table with locks per bucket.
Threads do not contend when they operate on separate buckets. When a hash conflict occurs, threads use spin locks to lock requested resources (bucket).
Spin locks excel when locks are held for very brief periods, such as in this registry, where operations like checking task activation or updating entries are quick atomic operations. The spin lock avoids expensive context switches that std::mutex would incur.
Each bucket (determined by task ID hash) contains an unordered_map mapping task IDs to their associated objects and state. This design resolves collisions that could occur with simple hashing, ensuring that tasks with IDs mapping to the same bucket index can be stored and accessed efficiently.
Key features:
- Hash conflict resolution: Multiple tasks per bucket using std::unordered_map.
- Thread-safe: Each bucket has its own lock (spin lock).
- Template-based: Supports various task ID and object types.
Attributes of the Task_registry_multi class:
- m_buckets: Vector of bucket pointers, each bucket containing:
  - std::unordered_map<Task_id_type, Sub_entry>: Maps task IDs to sub-entries.
  - std::atomic<size_t> m_num_entries: Tracks the number of entries in the bucket.
- Sub_entry: Wraps the task object, with access protected by the bucket lock.
The Task_registry_multi class implements the following methods:
- Task_registry_multi: Constructor initializing buckets based on capacity (default 16384).
- apply: Applies a function to the task object if active. Returns true if successful.
- activate: Registers and activates a task. Returns true if newly registered.
- deactivate: Removes a task. Returns true if deactivated.
- bucket_active: Checks if the bucket for the task ID has any active tasks.
This class is used to keep track of the current LWM for executing transactions.
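A condensed sketch of the bucketed design follows. It is illustrative: it uses std::mutex where the real class uses a per-bucket spin lock, and it folds registration and activation into a single step.

  #include <cstddef>
  #include <functional>
  #include <mutex>
  #include <unordered_map>
  #include <vector>

  template <typename Task_id_type, typename Task_object_type>
  class Bucketed_registry_sketch {
    struct Bucket {
      std::mutex m_lock;  // per-bucket lock (a spin lock in the real class)
      std::unordered_map<Task_id_type, Task_object_type> m_entries;
    };

   public:
    explicit Bucketed_registry_sketch(std::size_t buckets = 16384)
        : m_buckets(buckets) {}

    bool activate(const Task_id_type &id, Task_object_type obj) {
      Bucket &b = bucket_for(id);
      std::lock_guard<std::mutex> guard(b.m_lock);
      return b.m_entries.emplace(id, std::move(obj)).second;
    }
    // Applies fn to the task object if it is registered; returns success.
    bool apply(const Task_id_type &id,
               const std::function<void(Task_object_type &)> &fn) {
      Bucket &b = bucket_for(id);
      std::lock_guard<std::mutex> guard(b.m_lock);
      auto it = b.m_entries.find(id);
      if (it == b.m_entries.end()) return false;
      fn(it->second);
      return true;
    }
    bool deactivate(const Task_id_type &id) {
      Bucket &b = bucket_for(id);
      std::lock_guard<std::mutex> guard(b.m_lock);
      return b.m_entries.erase(id) > 0;
    }

   private:
    Bucket &bucket_for(const Task_id_type &id) {
      // Hash conflicts land in the same bucket's unordered_map.
      return m_buckets[std::hash<Task_id_type>{}(id) % m_buckets.size()];
    }
    std::vector<Bucket> m_buckets;
  };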
Thread Pool Class
The Thread_pool class is a template class that provides a way to manage a pool of worker threads that execute tasks asynchronously.
Template parameters are:
- T: Type of the tasks that can be executed by the thread pool.
Member variables implemented in the class are the following:
- m_workers: Vector of worker Thread objects.
- m_tasks: Bounded synchronized queue holding tasks to execute.
- m_end_execution: Per-worker stop flags used to gracefully terminate workers.
The Thread_pool class provides several methods for managing the thread pool and executing tasks:
- enqueue: Adds a task to the queue to be executed by a worker thread.
- run_worker: Single worker that runs in a loop, in each iteration removes a task from the queue and executes it.
- end_execution: Stops the thread pool from executing tasks (soft stop)
- debug methods: get the queue size (queue_size) and print the queue state (print_queue_state).
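A minimal sketch of this worker-loop design, using standard library primitives instead of the library's Sync_bounded_queue and Thread wrappers, is shown below (illustrative only):

  #include <condition_variable>
  #include <deque>
  #include <functional>
  #include <mutex>
  #include <thread>
  #include <vector>

  class Thread_pool_sketch {
   public:
    explicit Thread_pool_sketch(std::size_t workers) {
      for (std::size_t i = 0; i < workers; ++i)
        m_workers.emplace_back([this] { run_worker(); });
    }
    ~Thread_pool_sketch() {
      end_execution();
      for (auto &w : m_workers) w.join();
    }
    void enqueue(std::function<void()> task) {
      {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_tasks.push_back(std::move(task));
      }
      m_cv.notify_one();
    }
    // Soft stop: workers drain the queued tasks, then exit their loops.
    void end_execution() {
      {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_end_execution = true;
      }
      m_cv.notify_all();
    }

   private:
    void run_worker() {
      for (;;) {
        std::function<void()> task;
        {
          std::unique_lock<std::mutex> guard(m_mutex);
          m_cv.wait(guard,
                    [this] { return m_end_execution || !m_tasks.empty(); });
          if (m_tasks.empty()) return;  // stop requested, queue drained
          task = std::move(m_tasks.front());
          m_tasks.pop_front();
        }
        task();  // execute outside the lock
      }
    }
    std::vector<std::thread> m_workers;
    std::deque<std::function<void()>> m_tasks;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_end_execution{false};
  };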
Dependency_tracker_stub Class
This class provides a minimal implementation of the Base_dependency_tracker interface, which can be used as a placeholder or for testing purposes:
- add_dependency: Does nothing.
- activate_task: Does nothing.
- check_ready: Always returns true.
- mark_dependency_met: Does nothing.
Dependency_tracker_single_predecessor Class
This class implements a thread-safe dependency tracker that supports at most one predecessor per task. The Dependency_tracker_single_predecessor class uses a Task_registry_multi to keep track of task dependencies. Each task is associated with at most one predecessor and can have multiple successors. When a task is completed, its successors are returned as newly ready for execution.
The Dependency_tracker_single_predecessor class uses lock striping to avoid high contention. This means that multiple threads can access the dependency information for different tasks without blocking each other.
Attributes of the Dependency_tracker_single_predecessor class are:
m_dependencies: A registry of tasks and their dependencies.
The Dependency_tracker_single_predecessor class provides the following
methods:
- Dependency_tracker_single_predecessor: Constructor that initializes the m_dependencies attribute.
- add_dependency: Adds a dependency between two tasks, setting the predecessor for the successor and adding the successor to the predecessor's list.
- activate_task: Activates a task in the registry.
- check_ready: Checks if a task is ready to be executed by verifying whether its predecessor (if any) has finished.
- mark_dependency_met: Marks a task as finished, deactivates it, and returns its successors as newly ready tasks.
The Dependency_tracker_single_predecessor class behaves as follows:
- When a dependency is added between two tasks, the add_dependency method is called to update the registry, provided the predecessor is active.
- When a task is activated, the activate_task method is called to mark the task as active in the registry.
- When a task is checked for readiness, the check_ready method is called to verify whether its single predecessor (if present) has finished execution.
- When a task finishes, the mark_dependency_met method is called to deactivate the task and return its successors, which are now ready to execute.
Scheduler Clock Implementations
Relevant Scheduler_clock implementations for CSA and scheduler tests are:
- Logical_clock: lock-free, task-driven logical window progression
- Clock_lwm_registry / Clock_lwm_set: low-water-mark based logical clocks
- Commit_order_clock: phase clock used by the commit phase for RPCO
In CSA runtime, Clock_lwm_registry is used as the global clock, while
Commit_order_clock is used for the commit phase when
replica_preserve_commit_order=ON (default).
Logical_clock Class
The Logical_clock class is a concrete implementation of the
Scheduler_clock interface. It is designed to manage the execution of tasks
in a logical clock framework.
The Logical_clock class is a concurrent wait-free implementation that
maintains the current time window open for processing and a subscription-based
mechanism for consecutive time windows.
Attributes of the Logical_clock class are:
- m_subscription_time_window: An atomic ticket that represents the current subscription window.
- m_current_time_window: An atomic ticket that represents the current time window.
- m_subscription_window_count: The number of tasks subscribed to the current subscription window.
- m_current_window_processed_count: The number of tasks processed in the current time window.
- m_subscribers_per_time: A queue of counters representing the number of tasks subscribed to each time window.
The Logical_clock class implements the following methods:
- Logical_clock: Constructor that initializes the attributes.
- now: Returns the current time window.
- start_time: Returns the start time of the logical clock.
- add_time: Subscribes a task to a specific time window.
- tick: Notifies the clock that a task has finished its execution.
- try_unblock: Attempts to unblock the clock if possible.
- advances_independently: Returns false, indicating that the clock does not advance independently.
The Logical_clock class behaves as follows:
- When a task is subscribed to a specific time window, the add_time method is called to update the subscription window and the number of tasks subscribed to the current subscription window.
- When a task finishes its execution, the tick method is called to update the current time window and the number of tasks processed in the current time window.
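The toy model below illustrates this subscription/tick protocol in a single-threaded setting; the real Logical_clock is concurrent and lock-free.

  #include <cassert>
  #include <cstdint>
  #include <map>

  class Toy_logical_clock {
   public:
    std::uint64_t now() const { return m_current; }
    // A task subscribes to the time window in which it wants to run.
    void add_time(std::uint64_t window) { ++m_subscribers[window]; }
    // A task of the currently open window finished; once the window
    // drains, the next window opens.
    void tick() {
      if (--m_subscribers[m_current] == 0) {
        m_subscribers.erase(m_current);
        ++m_current;
      }
    }

   private:
    std::uint64_t m_current{0};
    std::map<std::uint64_t, std::uint64_t> m_subscribers;
  };

  int main() {
    Toy_logical_clock clock;
    clock.add_time(0);  // Task A subscribes to window 0
    clock.add_time(0);  // Task B subscribes to window 0
    clock.add_time(1);  // Task C subscribes to window 1: runs after A and B
    assert(clock.now() == 0);
    clock.tick();              // A finishes
    assert(clock.now() == 0);  // B is still running in window 0
    clock.tick();              // B finishes
    assert(clock.now() == 1);  // window 1 is now open for C
  }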
Since the Logical_clock class implementation is based on a lock-free integer queue implementation and an efficient lock-free ticketing system, it can be used in large-scale systems where many tasks need to be managed concurrently. In the scheduler, this implementation is used to efficiently implement task callbacks, when many threads need to acknowledge to the scheduler, at the same time, that they have finished executing a task.
A simplified sequence diagram illustrating the Logical_clock is shown
below:
This diagram highlights the key interactions between the global scheduler clock and other system components, omitting certain error handling details for clarity.
Clock_lwm_registry Class
The Clock_lwm_registry class is the default concrete implementation
of the Scheduler_clock interface for LWM-based dependency handling.
It manages a registry of LWM (Low Water Mark) states, using
task IDs of finished tasks to compute the current LWM clock value.
LWM is monotonically increasing and represents the next task ID that can run
without violating the contiguous executed-prefix constraint.
Transactions with dependency delay <= current LWM clock value may
immediately execute. Transactions with dependency delay > current LWM clock
value must wait for the clock to advance.
The class uses lock striping and atomic operations to reduce
contention between threads.
Attributes of the Clock_lwm_registry class are:
- m_executed_registry: Task_registry_multi storing per-task execution state (started, finished).
- m_current_lwm: Atomic variable storing the current low-water-mark clock value.
The Clock_lwm_registry class implements the following methods:
- Clock_lwm_registry: Constructor initializing the LWM registry.
- now: Returns the current LWM-based time point.
- start_time: Returns the initial LWM timestamp.
- add_time: Activates the task state in the executed-task registry.
- tick: Advances the LWM upon task completion.
- advances_independently: Returns false, as advancement depends on task completions.
The Clock_lwm_registry class behaves as follows:
- Tasks subscribe via add_time.
- On tick, it confirms completion and potentially advances the LWM clock value.
- This implementation uses atomic operations and a lock-striping data structure for scalability, integrating with Dependency_adapter_lwm for dependency resolution.
Clock_lwm_set Class
The Clock_lwm_set class is a concurrent data structure that uses the
same LWM computation algorithm as Clock_lwm_registry, but keeps its
state in a std::set guarded by a single std::mutex. It computes the LWM
(Low Water Mark) based on executed tasks.
LWM is monotonically increasing and represents the next task ID that can run
while preserving the contiguous executed prefix.
Performance Characteristics:
- High single-threaded and low-concurrency performance
- Limited scaling
- Uses a global mutex for synchronization
- std::set operations are logarithmic in the number of tracked entries
- Suitable for workloads with moderate concurrency requirements
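For concreteness, a minimal C++ sketch of this std::set-plus-mutex LWM computation follows; it mirrors the described semantics (the LWM equals the length of the contiguous executed prefix) but is not the server code:

  #include <cstdint>
  #include <mutex>
  #include <set>

  // Sketch of LWM tracking in the style of Clock_lwm_set: a std::set
  // of finished task IDs guarded by one mutex. Illustrative only.
  class Lwm_set_sketch {
   public:
    // Record completion of a task and advance the LWM over the
    // contiguous prefix of executed task IDs.
    void tick(std::uint64_t task_id) {
      std::lock_guard<std::mutex> guard(m_mutex);
      m_finished.insert(task_id);
      // LWM is the next task ID that keeps the executed prefix
      // contiguous: while the task with ID == m_lwm has finished,
      // move the mark forward.
      auto it = m_finished.begin();
      while (it != m_finished.end() && *it == m_lwm) {
        it = m_finished.erase(it);  // drop absorbed IDs to bound memory
        ++m_lwm;
      }
    }

    // Current LWM: 0 means no tasks executed; 4 means tasks 0..3 did.
    std::uint64_t now() const {
      std::lock_guard<std::mutex> guard(m_mutex);
      return m_lwm;
    }

   private:
    mutable std::mutex m_mutex;
    std::set<std::uint64_t> m_finished;  // finished IDs above the LWM
    std::uint64_t m_lwm = 0;
  };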
Dependency_adapter_lwm Class
Dependency adapter for LWM (Low Water Mark) based scheduling. It translates transaction sequence numbers into scheduler task IDs for LWM-based execution. This adapter handles key indexing differences between source and replica:
- seq_num: Originates from the source (e.g., primary server) and starts at 1 for the first transaction in each group. It resets to 1 at group boundaries (e.g., server restarts, configuration changes, or failures). seq_num=0 is reserved for invalid cases (e.g., SEQ_UNINIT).
- task_id: Assigned sequentially by the scheduler on the replica, starting from 0 and monotonically increasing as long as the applier is running. Unlike seq_num, task_id does not reset and continues growing across seq_num resets. Wrap-around handling: if task_id wraps (uint64_t overflow), the adapter detects it (new < max && max near UINT64_MAX), clears mappings, resets the barrier to 0, and continues from the new baseline to maintain order (see the sketch below).
- LWM (Low Water Mark): Tracks the longest prefix of tasks executed in commit order (with replica_preserve_commit_order=1). It is computed on the replica based on task_ids:
- LWM = 0: No tasks have executed.
- LWM = 1: Exactly one task has executed (task_id = 0).
- If tasks with task_id 0 through 3 have executed in order, LWM = 4.
LWM advances as tasks commit, representing the next task_id that can proceed without violating order.
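The wrap-around detection rule above can be expressed as a small predicate; the margin constant below is an assumed illustration, not the actual threshold used by the adapter:

  #include <cstdint>

  // Illustrative sketch of the task_id wrap-around check described
  // above. The "near UINT64_MAX" margin is an assumed tuning constant.
  constexpr std::uint64_t k_wrap_margin = 1ULL << 20;  // assumption

  bool detect_task_id_wrap(std::uint64_t new_task_id,
                           std::uint64_t max_task_id_seen) {
    // A wrap is suspected when a newly assigned ID is smaller than the
    // maximum seen so far while that maximum sits close to UINT64_MAX.
    return new_task_id < max_task_id_seen &&
           max_task_id_seen > UINT64_MAX - k_wrap_margin;
  }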
Commit_order_clock Class
The Commit_order_clock class is a concrete implementation of the
Scheduler_clock interface. It is designed to manage the commit order of
transactions. The Commit_order_clock class uses atomic operations
to update the timestamp, making it suitable for high-performance applications.
The Commit_order_clock class provides a way to track the commit order of
transactions. It uses a monotonic counter that advances on tick(), and the
clock is used as a start-to-start phase dependency for commit registration.
Attributes of the Commit_order_clock are:
- m_clock: An atomic variable that stores the current timestamp.
- m_twicked_count: Counter used by the clock unblocking logic.
Methods implemented in the Commit_order_clock class are the following:
- Commit_order_clock: Constructor that initializes the clock state with a start point.
- now: Returns the current timestamp.
- start_time: Returns the scheduler start offset for this clock (currently 0).
- add_time: Always returns true, as this clock does not enforce any constraints on the timestamps.
- tick: Advances the commit-order counter for start-to-start sequencing and returns true.
- get_type: Returns the type of the clock, which is start_to_start.
The Commit_order_clock class behaves as follows:
- When a new transaction is ready to commit, the now method is called to obtain the current commit timestamp and compare it with the schedule of the next transaction to commit.
- When a transaction is registered for commit, the tick method is called to increment the timestamp.
- The add_time method is not used in this implementation, as it does not enforce any constraints on the timestamps.
- Transactions register for commit in order; note that the commit itself runs in parallel with respect to the Scheduler.
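Reduced to its essentials, such a start-to-start phase clock is a monotonic atomic counter; the sketch below illustrates that core (the real class also carries unblocking logic via m_twicked_count, omitted here):

  #include <atomic>
  #include <cstdint>

  // Minimal sketch of a commit-order style phase clock: a monotonic
  // counter advanced on tick(), used as a start-to-start dependency.
  class Commit_order_clock_sketch {
   public:
    std::uint64_t now() const {
      return m_clock.load(std::memory_order_acquire);
    }
    // Release the next transaction in commit order.
    bool tick() {
      m_clock.fetch_add(1, std::memory_order_acq_rel);
      return true;
    }
   private:
    std::atomic<std::uint64_t> m_clock{0};
  };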
Task_schedule Class
Previous sections described how timing dependencies can be put on a task.
A task schedule is a representation of a task's timing dependencies. It defines when a task should be executed, i.e., what timing dependencies it has on other tasks. Moreover, a task may be executed in several phases; the simplest case is a single phase, in which the task runs exactly once.
The Task_schedule class is an abstract base class that represents
a task schedule. It provides a common interface for all task schedules.
Attributes of the Task_schedule class are:
- m_phase_ready: Atomic flag indicating that the current phase is ready.
- m_enqueued_by_worker: Flag indicating whether the enqueue came from a worker.
The Task_schedule implements the following methods:
- Task_schedule: Constructor that initializes the attributes.
- has_higher_priority: Compares the priority of two tasks.
- next: Moves to the next phase of the task.
- get_task_delay: Returns the delay of the task.
- get_clock: Returns the clock used for the task.
- get_id: Returns the ID of the task.
- is_finished: Returns whether the task is finished.
Transaction_order_schedule Class
The Transaction_order_schedule class is a concrete implementation of the
Task_schedule interface. It represents a schedule for a transaction that
executes in two phases: apply and commit.
Apply is scheduled according to the 'global' transaction clock, whereas
the commit phase is scheduled according to the commit order clock.
Commit order is a start-to-start dependency; therefore execution
proceeds as follows:
- the transaction finishes the apply phase and waits for its commit order
- the transaction registers for commit
- the commit order clock is ticked to release the next transaction in order to register for commit
- the commit is executed in parallel with respect to the Scheduler
- after the commit finishes, the transaction clock is ticked to release the transaction's 'global' dependencies.
Attributes of the Transaction_order_schedule class are:
- m_task_id: The ID of the task.
- m_trx_clock: The clock used for the apply phase.
- m_trx_time: The time at which the apply phase starts.
- m_commit_clock: The clock used for the commit phase.
- m_commit_time: The time at which the commit phase starts.
- m_phase: The current phase of the transaction (apply or commit).
The Transaction_order_schedule implements the following methods:
- Transaction_order_schedule: Constructor that initializes the attributes.
- next: Moves to the next phase of the transaction.
- get_task_delay: Returns the delay of the current phase.
- get_clock: Returns the clock used for the current phase.
- get_id: Returns the ID of the task.
- is_finished: Returns whether the transaction is finished.
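To illustrate the two-phase flow, here is a reduced C++ sketch of an apply/commit schedule; the clock objects themselves are elided and all names are illustrative:

  #include <cstdint>

  // Reduced sketch of a two-phase (apply, then commit) schedule.
  enum class Phase { apply, commit, done };

  class Two_phase_schedule_sketch {
   public:
    Two_phase_schedule_sketch(std::uint64_t trx_time,
                              std::uint64_t commit_time)
        : m_trx_time(trx_time), m_commit_time(commit_time) {}

    // Advance to the next phase; false once the task has finished.
    bool next() {
      if (m_phase == Phase::apply) {
        m_phase = Phase::commit;  // apply done: wait for commit order
        return true;
      }
      m_phase = Phase::done;
      return false;
    }

    // Delay on the clock governing the current phase: the 'global'
    // transaction clock for apply, the commit order clock for commit.
    std::uint64_t get_task_delay() const {
      return m_phase == Phase::apply ? m_trx_time : m_commit_time;
    }

    bool is_finished() const { return m_phase == Phase::done; }

   private:
    std::uint64_t m_trx_time;     // apply start time (trx clock)
    std::uint64_t m_commit_time;  // commit start time (commit clock)
    Phase m_phase = Phase::apply;
  };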
Delayed_schedule Class
The Delayed_schedule class is a concrete implementation of the
Task_schedule interface. It represents a schedule for a delayed task, which
executes once and in one phase.
Attributes of the Delayed_schedule class are:
- m_task_id: The ID of the task.
- m_clock: The clock used for the task.
- m_task_delay: The delay of the task.
The Delayed_schedule implements the following methods:
- Delayed_schedule: Constructor that initializes the attributes.
- next: Returns false, as a delayed task does not have multiple phases.
- get_task_delay: Returns the delay of the task.
- get_clock: Returns the clock used for the task.
- get_id: Returns the ID of the task.
- is_finished: Returns whether the task is finished.
Periodic_schedule Class
The Periodic_schedule class is a concrete implementation of the
Task_schedule interface. It represents a schedule for a periodic task.
This schedule is used only for testing purposes.
Attributes of the Periodic_schedule class are:
- m_task_id: The ID of the task.
- m_clock: The clock used for the task.
- m_current_delay: The current delay of the task.
- m_rate: The rate at which the task is executed.
- m_last_delay: The last delay of the task.
The Periodic_schedule implements the following methods:
- Periodic_schedule: Constructor that initializes the attributes.
- next: Moves to the next phase of the task.
- get_task_delay: Returns the current delay of the task.
- get_clock: Returns the clock used for the task.
- get_id: Returns the ID of the task.
- is_finished: Returns whether the task is finished.
Statistics_monitor Class
- Lightweight singleton-style access point for per-instance statistics monitors.
- In the current implementation, registration and value updates are performed by Statistics_instance_monitor; Statistics_monitor primarily provides safe one-time global initialization and indexed access by instance id.
Attributes of the Statistics_monitor class:
- m_instances: Static array of Statistics_instance_monitor indexed by instance id.
- m_init: Atomic one-time initialization guard.
- m_ready: Atomic readiness flag used to synchronize concurrent callers of get.
Methods of the Statistics_monitor class:
- get: Returns a reference to the Statistics_instance_monitor for a given instance id, with thread-safe lazy initialization.
- init: Internal initialization routine invoked once from get.
Note: Per-statistic APIs such as register_stat, find_stat, get_stat, and
reset are implemented on Statistics_instance_monitor.
Statistics_map Class
- Enumerates supported scheduler statistics and provides one initializer API.
Enumerated statistics:
- thp_queue_size: Current size of the thread pool queue
- thp_task_exec_time: Time spent by workers executing tasks
- thp_worker_exec_time: Time spent by workers waiting plus executing tasks
- sched_task_exec_time: Task execution time without callback overhead
- thp_thread_internal_id: Worker internal id (set once when the worker starts)
Methods:
- init_statistics: Registers supported statistics for a given instance id and worker count using Statistics_monitor::get(instance_id).
Note: The current Statistics_map implementation does
not keep explicit m_instances_initialized/m_ready maps; synchronization is
handled by Statistics_monitor.
Task_sequencer Class
- Simple class responsible for assigning consecutive IDs to tasks before they enter the Scheduler
- Provides a way to generate unique IDs for tasks
Attributes of the Task_sequencer class:
m_current_id: Current ID value, incremented for each new ID generated
Methods of the Task_sequencer class:
- next_id: Generates and returns the next available ID. Increments the m_current_id attribute and returns the new ID as a Task_id object.
Service layer classes
Job Applier Class
The Job_applier class is a concrete implementation of the Job_binlog
interface. It represents a job that applies events from a binlog
to a MySQL server.
Attributes of the Job_applier class are:
- m_relay_context: The relay context used to manage the application of events.
- m_session_service: The session service used to acquire and release sessions.
- m_commit_event: The commit event that is used to commit the transaction.
- m_phase: The current phase of the job (prepare, commit_register, commit_binlog, retry_commit, or done).
The Job_applier implements the following methods:
- Job_applier: Constructor that initializes the attributes.
- ~Job_applier: Destructor that releases the session.
- attach: Attaches the job to the relay context.
- detach: Detaches the job from the relay context.
- commit: Commits the transaction.
- commit_register: Registers the transaction for commit.
- prepare: Prepares the transaction.
- run_phase: Runs the current phase of the job.
- restart: Restarts the job.
- to_string: Returns a string representation of the job.
The Job_applier class behaves as follows:
- When the job is created, the Job_applier constructor is called to initialize the attributes.
- When the job is attached to the relay context, the attach method is called.
- When the job is detached from the relay context, the detach method is called.
- When the transaction is committed, the commit method is called.
- When the transaction is registered for commit, the commit_register method is called.
- When the transaction is prepared, the prepare method is called.
- During prepare, the job may consume a streaming Fetchable Transaction: it waits until the next event becomes available, fetches it, applies it, and keeps the commit event for the commit phase. The provider therefore does not need to finish reading the full transaction before the worker starts applying it.
- When the job is restarted, the restart method is called.
- In retry_commit, the job is replayed in a single scheduler dispatch. The retry path is no longer re-shelved between prepare and commit and does not perform a second Commit Order Manager registration within the same transaction lifecycle.
- When the job is converted to a string, the to_string method is called.
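The phase progression can be pictured as a small state machine; the sketch below only mirrors the m_phase values listed above and simplifies dispatch and error paths:

  // Sketch of the run_phase progression; the actual work in each phase
  // is elided. retry_commit replays prepare and commit within a single
  // scheduler dispatch, as described above.
  enum class Job_phase {
    prepare, commit_register, commit_binlog, retry_commit, done
  };

  // Returns true while more phases remain to be dispatched.
  bool run_phase_sketch(Job_phase &phase) {
    switch (phase) {
      case Job_phase::prepare:          // apply fetched events
        phase = Job_phase::commit_register;
        return true;
      case Job_phase::commit_register:  // register in commit order
        phase = Job_phase::commit_binlog;
        return true;
      case Job_phase::commit_binlog:    // perform the commit
        phase = Job_phase::done;
        return true;
      case Job_phase::retry_commit:     // replay apply+commit in one go
        phase = Job_phase::done;
        return true;
      case Job_phase::done:
        return false;
    }
    return false;
  }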
Job Binlog Class
The Job_binlog class is an abstract base class that represents a job that
applies events from a binlog to a MySQL server.
Attributes of the Job_binlog class are:
- m_trx_id: The transaction ID of the job.
- m_channel: The channel that the job is associated with.
- m_fetch_metadata: The metadata associated with the job.
- m_first_event: The first event of the job.
- m_next_event: The next event to be processed.
- m_phase: The current phase of the job (prepare, commit_register, commit_binlog, retry_commit, or done).
The Job_binlog implements the following methods:
- Job_binlog: Constructor that initializes the attributes.
- ~Job_binlog: Destructor that releases the channel.
- get_channel: Returns the channel associated with the job.
- get_trx_id: Returns the transaction ID of the job.
- get_channel_id: Returns the channel ID of the job.
- get_last_committed: Returns the last committed sequence number of the job.
- get_sequence_number: Returns the sequence number of the job.
- is_complete: Returns whether the job is complete.
- restart: Restarts the job.
- run: Runs the job.
- is_trx: Returns whether the job is a transaction.
- set_success: Sets the success flag of the job.
The Job_binlog class behaves as follows:
- When the job is created, the Job_binlog constructor is called to initialize the attributes.
- When the job is restarted, the restart method is called.
- When the job is run, the run method is called.
- When the job is checked for completeness, the is_complete method is called.
- When the job is checked for transactionality, the is_trx method is called.
- When the job is set to success, the set_success method is called.
Job Stub Class
The Job_stub class is a concrete implementation of the Job interface.
It represents a dummy job that does nothing; it is implemented for
testing purposes.
Log Purge Controller Class
This abstract class defines an interface for a relay log controller capable of purging relay logs. It provides a single pure virtual method for registering logs ready to be purged and potentially purging them if they are in order according to the index file content.
The Log_purge_controller contains the following methods:
- concurrent_purge: Registers a log filename as ready to be purged. If previously registered logs are in order, this method purges them up to and including the specified log filename.
- ~Log_purge_controller: A virtual destructor to ensure proper cleanup of derived classes.
Implementations of this interface should provide their own logic for handling
log purging based on the concurrent_purge method. Users of this interface
can then register logs for purging and rely on the implementation to manage
the purging process.
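The in-order purge contract can be sketched as follows; the index is modeled as a plain vector and the actual file removal is replaced by a print, so this illustrates only the ordering rule, not any server code:

  #include <cstddef>
  #include <cstdio>
  #include <mutex>
  #include <set>
  #include <string>
  #include <utility>
  #include <vector>

  // Sketch of an in-order purge policy behind concurrent_purge: logs
  // may be registered in any order, but are purged only as a
  // contiguous prefix of the index order.
  class In_order_purger {
   public:
    explicit In_order_purger(std::vector<std::string> index_order)
        : m_index(std::move(index_order)) {}

    void concurrent_purge(const std::string &log_name) {
      std::lock_guard<std::mutex> guard(m_mutex);
      m_ready.insert(log_name);
      // Purge while the next log in index order has been registered.
      while (m_next < m_index.size() &&
             m_ready.count(m_index[m_next]) != 0) {
        m_ready.erase(m_index[m_next]);
        std::printf("purging %s\n", m_index[m_next].c_str());  // stand-in
        ++m_next;
      }
    }

   private:
    std::mutex m_mutex;
    std::vector<std::string> m_index;  // relay logs in index-file order
    std::set<std::string> m_ready;     // registered, not yet purged
    std::size_t m_next = 0;            // next index position to purge
  };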
Event Reader Controller Class
This class serves as an event reader/controller, utilizing a low-level
reader to fetch consecutive events from the relay log. It extends
the Log_purge_controller interface and provides methods for
initialization, deinitialization, reading events, and purging consumed
relay log files.
Attributes of the Event_reader_controller class are:
- m_is_error: Flag indicating whether an error has occurred.
- m_error_msg: Stores the error message, if any.
- m_rli: Non-owning pointer to the Relay_log_info object.
- m_prefetcher: Shared pointer to the Log_prefetcher instance.
- m_active_reader and m_inactive_reader: Readers for active and inactive files, respectively.
- m_current_reader: Non-owning pointer to the currently used reader.
- m_file_name: Stores the current file name obtained from the stream.
- m_logs_to_purge: Set of logs to be purged, protected by m_rli->data_lock.
- m_using_prefetcher and m_active_file_reading: Flags indicating whether the prefetcher is enabled and whether reading from an active file, respectively.
- m_enable_prefetcher: Flag deciding whether to run the prefetcher, defaulting to tune::prefetcher_enable.
The Event_reader_controller contains the following methods:
- Event_reader_controller: Initializes the controller with a Relay_log_info pointer and a shared Log_prefetcher pointer.
- open: Opens the first relay log and returns success or failure.
- close: Closes readers, stops the prefetcher, and clears internal state, including the error state.
- read_next: Fetches the next metadata/event payload according to Reader_controller_read_type, with an optional timeout.
- concurrent_purge: Registers a log for purging and purges consecutive logs if they are in order, implementing the Log_purge_controller interface.
- move_to_log: Opens the next or current log file, optionally moving to the next log.
- move_to_next_log: Moves to the next log file.
- purge_applied_logs: Purges relay log files up to a specified log.
- wait_for_new_event: Passively waits for a new event when reading from an active file.
- check_cache_truncated: Reopens the reader if the cache is truncated, to avoid reading corrupted data.
- set_error: Sets the internal error state with a message.
- is_data_available and wait_data_ready: Check if data is available and wait for data readiness, respectively.
- choose_reader: Chooses between active and inactive file reading based on the current log and the previous file.
Fetchable Transaction Class
The Fetchable_transaction class represents a transaction that can be fetched
from a Storage. It abstracts the implementation of fetching a transaction
from the particular Storage. Transaction can be fetched multiple times
depending on the number of allowed retries.
Attributes of the Fetchable_transaction class are:
- m_event_set_batches: The batches of events that make up the transaction.
- m_current_batch_it: The current batch being processed.
- m_is_done: Whether the transaction is done.
- m_status: The status of the transaction.
- m_error_message: The error message associated with the transaction.
The Fetchable_transaction implements the following methods:
- Fetchable_transaction: Constructor that initializes the attributes.
- ~Fetchable_transaction: Destructor that resets the transaction.
- fetch_next: Fetches the next event from the transaction.
- is_fetching_error: Returns whether the transaction is in an error state.
- is_fetching_done: Returns whether the transaction is done.
- reset_fetching: Resets the transaction.
- get_fetch_error_msg: Returns the error message associated with the transaction.
- is_trx: Returns whether the transaction is a real transaction.
- set_success: Sets the success flag of the transaction.
- set_done: Sets the done flag of the transaction.
The Fetchable_transaction class behaves as follows:
- When the transaction is created, the Fetchable_transaction constructor is called to initialize the attributes.
- The transaction may be populated incrementally by the relay-log reader. A single logical transaction may therefore consist of multiple event-set batches, for example when it spans several relay log files.
- Workers first wait for event availability and then fetch the next event. A wait returning false means the stream reached a terminal state (fully published, truncated, or failed), rather than "no event yet".
- When the transaction is checked for errors, the is_fetching_error method is called.
- When the transaction is checked for completeness, the is_fetching_done method is called.
- When the transaction is reset, the reset_fetching method is called.
- When the error message is retrieved, the get_fetch_error_msg method is called.
- When the transaction is checked for transactionality, the is_trx method is called.
- When the success flag is set, the set_success method is called.
- Truncation is a terminal, non-error state used to unblock a worker when an incomplete transaction must be abandoned and replayed by the next Job.
- Completion is inferred from the stream state (is_done) and the error state (is_error).
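The wait-then-fetch contract can be sketched with stub types as below; the stub bodies are toy stand-ins for the real stream, and the method names follow the list above:

  #include <cstddef>

  // Sketch of the worker-side consume loop over a streaming
  // Fetchable_transaction. wait_data_ready() models the availability
  // wait described above: false means a terminal state, not "no event
  // yet".
  struct Event {};

  struct Fetchable_stub {
    Event m_events[3];
    std::size_t m_next = 0;
    bool m_error = false;

    bool wait_data_ready() { return m_next < 3; }  // terminal when drained
    Event *fetch_next() {
      return m_next < 3 ? &m_events[m_next++] : nullptr;
    }
    bool is_fetching_error() const { return m_error; }
  };

  // Apply every published event; report false only on a real error
  // (truncation is terminal but non-error and simply ends the loop).
  bool apply_all(Fetchable_stub &trx) {
    for (;;) {
      if (!trx.wait_data_ready()) break;  // published/truncated/failed
      Event *ev = trx.fetch_next();
      if (ev == nullptr) break;           // sealed between wait and fetch
      /* apply_event(ev) would run here */
    }
    return !trx.is_fetching_error();
  }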
Task Exec Job Class
The Task_exec_job class represents a job that executes a task and meets
the requirements of a Task; thus it can be supplied to the Scheduler for execution.
Attributes of the Task_exec_job class are:
- m_job: The task to be executed.
- m_job_id: The ID of the task.
- m_session_service: The session service used to acquire and release sessions.
- m_is_error: Whether the task executed with an error.
The Task_exec_job implements the following methods:
- Task_exec_job: Constructor that initializes the attributes.
- is_error: Returns whether the task executed with an error.
- set_error: Sets the internal error flag.
- operator(): Executes the task and updates runtime statistics.
- run: Internal execution routine with attach/run/retry/detach flow.
The Task_exec_job class behaves as follows:
- When the task is created, the Task_exec_job constructor is called to initialize the attributes.
- When the error flag is checked, the is_error method is called.
- When the error flag is set, the set_error method is called.
- When the task is executed, the operator() method is called, which delegates to the internal run method.
- Task completion handling (success/failure/detach) is performed internally in run via a local completion routine.
Event Set Fetchable Class
The Event_set_fetchable class represents a set of events that can be fetched
from a storage. It provides a way to fetch events from a storage and handle
errors that may occur during the fetching process.
Attributes of the Event_set_fetchable class are:
- m_event_set_batches: The batches of events that make up the transaction.
- m_current_batch_it: The current batch being processed.
- m_is_done: Whether the transaction is done.
- m_status: The status of the transaction.
- m_error_message: The error message associated with the transaction.
The Event_set_fetchable implements the following methods:
- Event_set_fetchable: Constructor that initializes the attributes.
- ~Event_set_fetchable: Destructor that resets the transaction.
- fetch_next: Fetches the next event from the transaction.
- is_done: Returns whether the transaction is done.
- is_error: Returns whether the transaction is in an error state.
- get_error_str: Returns the error message associated with the transaction.
- reset: Resets the transaction.
- is_trx: Returns whether the transaction is a real transaction.
- set_success: Sets the success flag of the transaction.
- set_done: Sets the done flag of the transaction.
The Event_set_fetchable class behaves as follows:
- When the transaction is created, the Event_set_fetchable constructor is called to initialize the attributes.
- When the transaction is fetched, the fetch_next method is called.
- When the transaction is checked for errors, the is_error method is called.
- When the transaction is checked for completeness, the is_done method is called.
- When the transaction is reset, the reset method is called.
- When the error message is retrieved, the get_error_str method is called.
- When the transaction is checked for transactionality, the is_trx method is called.
- When the success flag is set, the set_success method is called.
- Completion is inferred from the stream state (is_done) and the error state (is_error).
Event Set Fetchable Relay Log Class
The Event_set_fetchable_relay_log class is a concrete implementation of the
Event_set_fetchable interface. It represents a set of events that can be
fetched from a relay log.
Attributes of the Event_set_fetchable_relay_log class are:
- m_file_name: The name of the file the internal reader will open.
- m_start_file_pos: The file position the internal reader will start from.
- Terminal file position metadata, when present, describing where the sealed batch ends. In streaming mode this information may be unknown until the batch is explicitly closed by the publisher.
- m_reader: The relay log file reader object, used by the decompressing stream.
- m_input_stream: The decompressing stream object; it will decompress if the file contains compressed events.
- m_is_initialized: Flag indicating whether the reader is open.
- m_is_done: Flag indicating end of work; true if finished or an error occurred.
- m_failure_msg: Detailed error message, if any.
- m_status: Status of the object.
- m_delete_file_handle: Handle to the relay log deleter; the relay log will be removed when the last living reference to this file is released.
- m_is_trx: "is transaction" flag.
The Event_set_fetchable_relay_log implements the following methods:
- Event_set_fetchable_relay_log: Constructor that initializes the attributes.
- fetch_next: Fetches the next event from the transaction.
- get_error_str: Returns the error message associated with the transaction.
- is_done: Returns whether the transaction is done.
- is_error: Returns whether the transaction is in an error state.
- to_string: Returns basic information about this batch.
- is_trx: Returns whether this event set batch contains a transaction.
- reset: Resets the state so the event set can be fetched again; also resets the error state, if any.
- safe_open_reader: Opens the reader safely.
- safe_close_reader: Closes the reader safely.
- start_reading: Starts reading from the file; opens it and creates the input stream.
- set_success: Sets the success flag of the transaction.
The Event_set_fetchable_relay_log class behaves as follows:
- When the transaction is created, the Event_set_fetchable_relay_log constructor is called to initialize the attributes.
- In streaming mode, the batch is appended to the transaction as soon as the reader starts publishing it. Additional events are appended to the same batch until a terminal event or a file transition seals that batch.
- When the transaction is fetched, the fetch_next method is called.
- When the transaction is checked for errors, the is_error method is called.
- When the transaction is checked for completeness, the is_done method is called.
- When the error message is retrieved, the get_error_str method is called.
- When the transaction is checked for transactionality, the is_trx method is called.
- When the success flag is set, the set_success method is called.
- Completion is inferred from the stream state (is_done) and the error state (is_error).
Event Set Fetchable Cache Class
The Event_set_fetchable_cache class is a concrete implementation of the
Event_set_fetchable interface. It represents a set of events that can be
fetched from a storage.
Attributes of the Event_set_fetchable_cache class are:
- m_events: The events that make up the transaction.
- m_event_id: The index of the next-to-fetch event.
- m_is_done: Flag indicating end of work; true if finished or an error occurred.
- m_failure_msg: Detailed error message, if any.
- m_status: Status of the object.
- m_is_trx: "is transaction" flag.
The Event_set_fetchable_cache implements the following methods:
- Event_set_fetchable_cache: Constructor that initializes the attributes.
- fetch_next: Fetches the next event from the transaction.
- get_error_str: Returns the error message associated with the transaction.
- is_done: Returns whether the transaction is done.
- is_error: Returns whether the transaction is in an error state.
- reset: Resets the state so the event set can be fetched again; also resets the error state, if any.
- is_trx: Returns whether this event set batch contains a transaction.
The Event_set_fetchable_cache class behaves as follows:
- When the transaction is created, the Event_set_fetchable_cache constructor is called to initialize the attributes.
- When the transaction is fetched, the fetch_next method is called.
- When the transaction is checked for errors, the is_error method is called.
- When the transaction is checked for completeness, the is_done method is called.
- When the error message is retrieved, the get_error_str method is called.
- When the transaction is checked for transactionality, the is_trx method is called.
- When the success flag is set, the set_success method is called.
- Completion is inferred from the stream state (is_done) and the error state (is_error).
Relay Log Deleter Class
The Relay_log_deleter class is responsible for removing a relay log file
when the last handle to the deleter, i.e., to the particular relay log
file, is released.
Attributes of the Relay_log_deleter class are:
- m_rl_file_name: Path to the relay log file
- m_reader: The relay log reader object, capable of removing the file
- m_subscribers: The number of subscribers to this particular relay log file
- m_successfull_subscribers: The number of subscribers that confirmed successful execution of transactions from the handled relay log file
The Relay_log_deleter implements the following methods:
- Relay_log_deleter: Constructor that initializes the attributes.
- ~Relay_log_deleter: Destructor that removes the relay log file.
- add_subscriber: Adds a subscriber to the relay log file.
- set_subscriber_success: Sets the success flag for a subscriber.
- get_file_name: Returns the name of the relay log file.
The Relay_log_deleter class behaves as follows:
- When the deleter is created, the Relay_log_deleter constructor is called to initialize the attributes.
- When a subscriber is added, the add_subscriber method is called. Subscribers are added by the transaction provider, i.e., the relay log reader.
- When a subscriber reports success, the set_subscriber_success method is called.
- When the deleter is destroyed, the ~Relay_log_deleter destructor is called to remove the relay log file. The relay log file may be deleted only if all of its subscribers confirmed that they successfully applied transactions from this relay log.
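A reduced sketch of the subscriber-counting idea follows; the direct std::remove call stands in for the reader-based removal, and in practice such an object is held through shared handles so that the destructor runs when the last handle is released:

  #include <atomic>
  #include <cstdio>
  #include <string>
  #include <utility>

  // Sketch of subscriber-counted deletion: the file is removed in the
  // destructor only if every subscriber reported success.
  class Relay_log_deleter_sketch {
   public:
    explicit Relay_log_deleter_sketch(std::string file)
        : m_file(std::move(file)) {}
    void add_subscriber() { m_subscribers.fetch_add(1); }
    void set_subscriber_success() { m_successful.fetch_add(1); }
    ~Relay_log_deleter_sketch() {
      // Delete only when all subscribers confirmed successful apply.
      if (m_subscribers.load() == m_successful.load())
        std::remove(m_file.c_str());  // stand-in for reader-based removal
    }
   private:
    std::string m_file;
    std::atomic<int> m_subscribers{0};
    std::atomic<int> m_successful{0};
  };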
Relay Log Adaptive Reader Class
The Relay_log_adaptive_reader class reads metadata from a relay log and
creates jobs capable of being fetched from the relay log by asynchronous
workers.
Attributes of the Relay_log_adaptive_reader class are:
- m_rli: The relay log context.
- m_reader: The relay log reader object.
- m_delete_handler: The relay log deleter handle.
- m_transaction_boundary_parser: The transaction boundary parser.
The Relay_log_adaptive_reader implements the following methods:
- Relay_log_adaptive_reader: Constructor that initializes the attributes.
- ~Relay_log_adaptive_reader: Destructor that stops the reader.
- read: Reads the next event from the relay log, depending on the reading type.
- stop: Stops the reader.
The Relay_log_adaptive_reader class behaves as follows:
- When the reader is created, the Relay_log_adaptive_reader constructor is called to initialize the attributes.
- When the next GTID event is read, the reader creates or updates the active Fetchable Transaction and may dispatch the Job immediately, before the full transaction tail is parsed.
- While the transaction remains open, the reader keeps publishing subsequent events into the current relay-log batch and starts a new batch when the same transaction crosses a relay log file boundary.
- If a GTID is encountered while a previous transaction is still open, the reader closes that previous transaction as truncated and dispatches it before starting the next one.
- When the reader is stopped or returns due to an incomplete read, it truncates the active transaction to unblock any waiting worker.
- When the reader is stopped, the stop method is called.
- When the next event metadata is read, the read method is called.
Reader_controller_read_type Enum
The Reader_controller_read_type is an enumeration type that controls the
type of read in the Relay_log_adaptive_reader. Possible read types are:
- event - Reads and decodes event
- metadata - Reads event metadata, skips event payload
- cache_metadata - Reads event metadata and caches event payload
Sync_transaction_provider Class
- Implementation of the Transaction_provider interface
- Utilizes the relay log reader to read consecutive events from the relay log
- Employs a prefetched stream for data reading
- Primary methods include:
- start: No-op hook (the synchronous provider does not spawn its own thread)
- stop: Requests stop and forwards the stop to the underlying reader
- next: Blocks until the next transaction is fetched, or until a stop request or timeout occurs when outside a transaction boundary
- is_stopped: Verifies whether a stop request has been issued (either externally or by the parent thread)
Attributes of the Sync_transaction_provider class are:
- m_is_stopped: Atomic flag indicating whether the thread should be stopped
- m_rli: Pointer to the channel's Relay Log Info object
- m_reader: Shared reader object
- m_stat_monitor: Reference to the statistics monitoring object
The Sync_transaction_provider implements the following methods:
- start: No-op for the synchronous provider.
- stop: Sets the stop flag and stops the underlying reader.
- next: Retrieves the next job synchronously from the reader.
- is_stopped: Checks if a stop request has been issued.
- is_error: Determines if an error has occurred.
Async_transaction_provider Class
- Implementation of the Transaction_provider interface
- Utilizes the relay log reader to read consecutive events from the relay log
- Employs a prefetched stream for data reading
- Primary methods include:
- start: Initiates an asynchronous thread for fetching transactions from the relay log
- stop: Terminates execution and blocks until the thread is joined
- next: Blocks until the next transaction is fetched, or until a stop request or timeout occurs when outside a transaction boundary
- is_stopped: Verifies whether a stop request has been issued (either externally or by the parent thread)
Attributes of the Async_transaction_provider class are:
- m_cache: Queue used by the decoder thread to store data jobs
- m_thread: Decoder thread object
- m_end: Notification atomic flag indicating the end of execution
- m_is_stopped: Atomic flag indicating whether the thread should be stopped
- m_is_error: Atomic flag indicating whether an error has occurred
- m_rli: Pointer to the channel's Relay Log Info object
- m_reader: Shared reader object
- m_stat_monitor: Reference to the statistics monitoring object
The Async_transaction_provider implements the following methods:
- start: Initializes the asynchronous thread for decoding jobs from the stream
- stop: Halts the asynchronous thread, notifies it, and waits for confirmation before joining the thread
- next: Retrieves the next job, blocking until it is fetched
- is_stopped: Checks if a stop request has been issued
- is_error: Determines if an error has occurred
- run_thread: Function executed by the decoder thread
Prefetched_ifile Class
- Implementation of the Basic_binlog_ifile interface, operating on a prefetched stream (Istream_prefetched)
- Provides an additional method to set the prefetcher, accommodating legacy design limitations
- Key features include:
- Setting the prefetcher before opening the first file
- Reusing the same prefetcher object to initialize the Istream_prefetched object, which leverages an asynchronous prefetcher for data retrieval
Attributes of the Prefetched_ifile class are:
- m_current_file: The currently opened file
- m_prefetcher: The prefetcher object passed to the created stream
- m_error: Any binlog read errors encountered
The Prefetched_ifile class implements the following methods:
- set_prefetcher: Sets the prefetcher object, which must be done before opening the first file
- open_file: Creates a stream object from the file, obtaining data via the prefetcher
Istream_prefetched Class
- Implementation of the Basic_seekable_istream interface, reading data from the prefetched relay log
- Reads are restricted to a single relay log file boundary, but the prefetcher can move across files
- Creating a new Istream_prefetched object with the same shared prefetcher object allows reading from the next file
- Supports:
- Reading a requested number of bytes into a preallocated buffer (read)
- Skipping a requested number of bytes (skip)
- Seeking to a given file position, forward only (seek)
- Returning the size of the file (length)
- Does not introduce new errors, but propagates prefetcher errors; the read, skip, and seek functions return an error value in case of a prefetcher error
Attributes of the Istream_prefetched class are:
- m_prefetcher: Prefetcher object used to obtain data
- m_allowed_file_name: File name that data can be fetched from
- m_current_batch: Currently used batch
- m_current_offset: Current batch offset
- m_is_stopped: Flag to stop waiting for data (used in read)
- m_prefetcher_error: Flag indicating a prefetcher error
- m_file_offset: Offset from the start of the file
The Istream_prefetched class implements the following methods:
- Istream_prefetched: Constructor taking a prefetcher object and file name
- read: Reads a requested number of bytes from the input stream, blocking if necessary
- seek: Seeks to a given file offset, returning an error if impossible or if the prefetcher has errored out
- length: Returns the size of the currently opened file
- copy_to: Helper function copying data from the current batch to a buffer
- skip: Skips a requested number of bytes, applying the same rules as read
- read_next_batch: Reads the next batch, returning an error if stopped or if the source filename changes
Log_prefetcher Class
- Responsible for starting an asynchronous thread that prefetches consecutive logs recorded in the index file
- Provides primary functions:
- dequeue: Retrieves the next prefetched batch of data
- open: Opens a given file, checks if it is hot, and notifies the prefetcher to work on it
- stop: Gracefully stops the prefetcher and blocks until the thread is joined
- is_waiting_for_next_file: Checks whether the prefetcher has finished processing a given file and is waiting for the next file or is blocked
- is_error: Verifies whether an error has occurred
- Ensures thread safety using mutexes and condition variables
Attributes of the Log_prefetcher class are:
- m_log: The MYSQL_BIN_LOG object used to determine where to read data from
- m_cache: Queue storing prefetched data batches read from a raw binary file
- m_prefetcher: Thread running the prefetching process
- m_log_active: Flag indicating whether the current log file is active (protected by m_mt_prefetcher)
- m_mt_prefetcher: Mutex protecting access to m_tasks
- m_cv_prefetcher: Condition variable used by the scheduler main thread to wait for available tasks
- m_end: Notification atomic for end of execution
- m_mt_move_file: Mutex protecting access to m_move_file
- m_cv_move_file: Condition variable notifying the watcher that the prefetcher has finished moving from m_move_file to the next file
- m_move_file: Tracks the actual file being moved from (used by the watcher)
- m_bytes_fetched: Atomic counter tracking the total number of bytes cached in m_cache
- m_batch_size: Default batch size (~16MB)
- max_bytes_fetched: Maximum number of bytes that can be prefetched (1GB)
- m_istream: Input stream for reading from the log file
- m_error_message: Error message if an error occurs (protected by m_mt_prefetcher)
- m_is_error: Flag indicating whether an error has been encountered
- m_current_log_name: Currently processed log file name (protected by m_mt_prefetcher)
- m_current_file_length: Length of the currently opened file
- m_current_offset: Current file offset
- m_key_th_prefetcher: Key for the prefetcher thread
- m_allocator: Memory resource handling all allocations
The Log_prefetcher class implements the following methods:
- Log_prefetcher: Constructor taking a MYSQL_BIN_LOG object and instrumentation keys
- start_prefetcher: Starts the asynchronous prefetching thread
- open: Opens a new file for reading, checking if it is hot and notifying the prefetcher
- dequeue: Retrieves the next prefetched batch of data
- is_stopped: Checks whether the prefetcher has been stopped
- stop: Gracefully stops the prefetcher and blocks until the thread is joined
- is_error: Verifies whether an error has occurred
- is_waiting_for_next_file: Checks whether the prefetcher has finished processing a given file and is waiting for the next file or is blocked
- open_file: Opens a new or next file for reading
- ensure_file_done: Waits until the prefetcher finishes rotating from a given file
- set_error: Sets an internal error and error message, stopping the prefetcher
- get_current_file: Retrieves the currently processed log file under the lock
- update_current_file: Updates the current file and its activity under the lock
- run_prefetch_thread: Runs the prefetching thread
Prefetched_relaylog_reader Type Alias
- A type alias for a specialized Basic_binlog_file_reader instantiation
- Combines the following components:
- Prefetched_ifile: An implementation of the Basic_binlog_ifile interface, working with prefetched streams
- Binlog_event_data_istream: A stream for reading binary log events
- Binlog_event_object_istream: A stream for reading binary log events as objects
- Default_binlog_event_allocator: The default allocator for binary log events
- Enables efficient and convenient reading of binary log files using prefetched streams
Statistics_map Class
- Manages supported statistics for tracking various aspects of system performance
- Recognized statistics include:
- active_job_cnt: The number of jobs currently executed by thread pool workers
- active_trx_cnt: The number of currently applied transactions
- committed_cnt: The count of committed transactions
- trx_exec_time: The execution time of transactions for a particular instance ID/channel ID
- trx_provided_cnt: The count of provided transactions (e.g., read)
- trx_scheduled_cnt: The count of scheduled transactions
- ap_queue_size: The size of the asynchronous provider queue
- Provides a method to initialize statistics for a given instance ID and number of threads
Methods of the Statistics_map class:
init_statistics: Registers CSA statistic keys in the scheduler statistics monitor for a given instance id and thread count.
Statistic_logger Struct
- Implements the Enablable interface, allowing statistic logging to be enabled or disabled
- Responsible for periodic printing of statistics based on the current clock time and transaction length
- Maintains various counters for tracking different aspects of system
performance, including:
- Transaction numbers
- Time intervals
- GTID execution (if enabled)
- Clock cycles
- Committed transactions
- Task execution times (THP and scheduling)
- Transaction execution times
- Provided transaction counts
- Configurable print interval (default: 10 seconds)
Attributes of the Statistic_logger struct are:
- m_trx_num: Transaction number counter
- time_check: Timestamp of the last statistic print
- m_print_interval: Interval between statistic prints (default: 10 seconds)
- m_state: Enablement state of the logger
- print_gtid_executed: Flag indicating whether to print GTID execution information (always true)
- m_clock: Sliding window counter for clock cycles
- m_trx_length: Total transaction length
- m_committed_cnt: Sliding window counter for committed transactions
- m_thp_task_exec_time: Sliding window counter for THP task execution times
- m_thp_worker_exec_time: Sliding window counter for THP worker execution times
- m_sched_task_exec_time: Sliding window counter for scheduling task execution times
- m_trx_exec_time: Sliding window counter for transaction execution times
- m_trx_provided_cnt: Sliding window counter for provided transaction counts
The Statistic_logger struct implements the following methods:
- Statistic_logger: Constructor taking an initial state
- check_and_print: Periodically prints statistics based on the current clock time and transaction length
Sliding_window_counter Struct
- Designed to calculate statistic values within a time window
- Maintains two counters:
- m_previous: Stores the previous global counter value
- m_value: Tracks the counter value within the current window
- Provides methods for updating the window counter and retrieving the current window value
Attributes of the Sliding_window_counter struct are:
- m_previous: Previous global counter value
- m_value: Counter value within the current window
The Sliding_window_counter struct implements the following methods:
- update: Updates the window counter with a new global value
- get: Returns the counter value within the current time window
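Under one plausible reading of these two counters, the struct reduces to the following sketch: update turns a monotonically growing global counter into a per-window delta.

  #include <cstdint>

  // Sketch of the sliding-window counter described above.
  struct Sliding_window_counter_sketch {
    std::uint64_t m_previous = 0;  // global value at last window boundary
    std::uint64_t m_value = 0;     // delta within the current window

    void update(std::uint64_t global_value) {
      m_value = global_value - m_previous;
      m_previous = global_value;
    }
    std::uint64_t get() const { return m_value; }
  };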
Transaction_conflict_manager Class
The Transaction_conflict_manager class uses a separate thread to execute
tasks, allowing for asynchronous processing and minimizing the impact on the
main thread. The class provides methods to start and stop the thread, as well
as to enqueue tasks for processing.
Key components of the class are:
- m_cache: A synchronized queue that stores tasks to be processed by the thread.
- m_thread: The thread object that executes tasks from the queue.
- m_is_stopped: An atomic boolean that indicates whether the thread has been stopped.
- m_end: An atomic boolean that is used to notify the thread when it should exit.
The Transaction_conflict_manager class implements the following methods:
- start: Starts the asynchronous thread.
- stop: Stops the thread and waits for it to finish.
- is_stopped: Returns whether the thread has been stopped.
- enqueue: Adds a task to the queue for processing by the thread.
- run_thread: The main loop that consumes tasks from the queue and executes them.
Transaction_conflict_monitor Class
The Transaction_conflict_monitor class is a singleton that manages
a collection of Transaction_conflict_manager instances. It is designed to
provide a thread-safe and efficient way to access Transaction_conflict_manager
instances.
The Transaction_conflict_monitor class uses a lazy initialization approach
to create Transaction_conflict_manager instances on demand. The class
provides a get method that returns a Transaction_conflict_manager instance
associated with a specific instance ID.
Key components of the class are:
- m_instances: An array that stores Transaction_conflict_manager instances.
- m_init: An atomic boolean that indicates whether the instances have been initialized.
- m_ready: An atomic boolean that is used to notify threads when the instances are ready.
The Transaction_conflict_monitor class implements the following methods:
- get: Returns a Transaction_conflict_manager instance associated with a specific instance ID.
- init: Initializes the Transaction_conflict_manager instances. This method is called lazily when the first get request is made.
Rescue_task Class
The Rescue_task class is designed to represent a task that performs
a rescue operation, such as rolling back a conflicting transaction. It is
designed to be used in a concurrent environment, where tasks are executed
asynchronously.
The Rescue_task class encapsulates the necessary information and logic to
perform a rescue operation. It includes a reference to a Relay_log_info
object, which is associated with the transaction to be rescued, and
a Rescue_operation_type enum value that specifies the type of rescue
operation to be performed.
Key components of the class are:
- m_rli: A pointer to a Relay_log_info object that is associated with the transaction to be rescued.
- m_rescue_operation_type: An enum value that specifies the type of rescue operation to be performed.
- m_action_promise: A reference to a std::promise<bool> object that is used to synchronize the thread waiting for the rescue operation to finish.
The Rescue_task class implements the following methods:
- operator(): Performs the rescue operation based on the m_rescue_operation_type. Currently, only rollback is supported.
- rollback_trx: Rolls back the transaction associated with the m_rli object. This method is called when the m_rescue_operation_type is Rescue_operation_type::rollback.
It provides a way to perform rescue operations in a thread-safe manner,
using a std::promise<bool> object to synchronize the thread waiting for
the rescue operation to finish.
Rescue_operation_type Enum
The Rescue_operation_type enum is used to specify the type of rescue operation
to be performed by a Rescue_task object. Currently, only one value is defined:
- rollback: Specifies that the rescue operation should roll back the conflicting transaction.
Resource_monitor Class
The Resource_monitor class is a singleton that manages per-channel resource instances for the Change Stream Applier (CSA). It provides access to Resource_instance_monitor objects, one per channel, ensuring thread-safe resource management across different channels.
The Resource_monitor class implements the following methods:
get: Static method that returns a reference to the Resource_instance_monitor for the specified instance/channel ID.
Attributes of the Resource_monitor class are:
m_instances: Static array storing Resource_instance_monitor instances for each channel.
Resource_instance_monitor Class
The Resource_instance_monitor class provides per-channel resource management functionality for the Change Stream Applier (CSA). It allows registering named resources with configurable limits, acquiring and releasing resource locks, and supports both blocking and non-blocking operations. It is designed for concurrent access, using atomic operations for resource counts and spinning with sleep for waiting.
The Resource_instance_monitor class implements the following methods:
- register_resource: Registers a named resource with the given limit and optional PSI stage.
- acquire_resource: Acquires a resource lock and returns a Locked_resource guard object.
- release_resource: Releases the specified amount of a named resource.
- lock_resource: Attempts to lock a resource, waiting if necessary.
- try_lock_resource_internal: Internal helper for optimistic locking.
Attributes of the Resource_instance_monitor class are:
m_resources: Map storing named resources and their Resource_entry objects.
Resource_entry
The Resource_entry struct represents an entry for a named resource in the Resource_instance_monitor. It manages the available count and synchronization for a specific resource, using atomic operations to ensure thread safety. This struct is used internally by the Resource_instance_monitor to track resource availability and limits.
Attributes of the Resource_entry struct are:
- m_available: Atomic variable tracking the currently available amount of the resource.
- m_limit: Atomic variable representing the maximum available count for this resource.
- m_stage: Pointer to PSI stage information, set when waiting for a resource lock if the requested amount exceeds the currently available amount.
Locked_resource Class
The Locked_resource class is a RAII guard that acquires a resource lock upon construction and automatically releases it upon destruction, ensuring proper resource management and preventing resource leaks. It provides a safe way to hold resource locks for the duration of a scope.
The Locked_resource class implements the following methods:
- Locked_resource: Constructor that acquires the resource lock.
- is_locked: Returns whether the resource lock was successfully acquired.
- ~Locked_resource: Destructor that releases the resource lock if acquired.
Attributes of the Locked_resource class are:
- m_resource: Pointer to the Resource_entry being locked.
- m_requested_amount: The amount of resource requested to lock.
- m_is_locked: Boolean indicating whether the lock was successfully acquired.
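The RAII pattern can be reduced to the following sketch; the optimistic subtract-then-undo acquire mirrors the try_lock_resource_internal idea, but blocking, limits, and PSI stages are omitted:

  #include <atomic>
  #include <cstdint>

  // Sketch of an RAII resource guard: reserve an amount from an atomic
  // availability counter on construction, return it on destruction.
  struct Resource_entry_sketch {
    std::atomic<std::int64_t> m_available{0};
  };

  class Locked_resource_sketch {
   public:
    Locked_resource_sketch(Resource_entry_sketch &res, std::int64_t amount)
        : m_resource(res), m_requested_amount(amount) {
      // Optimistic acquire: subtract first, roll back on overdraw.
      std::int64_t left = m_resource.m_available.fetch_sub(amount) - amount;
      if (left < 0)
        m_resource.m_available.fetch_add(amount);  // undo; not locked
      else
        m_is_locked = true;
    }
    bool is_locked() const { return m_is_locked; }
    ~Locked_resource_sketch() {
      if (m_is_locked) m_resource.m_available.fetch_add(m_requested_amount);
    }
   private:
    Resource_entry_sketch &m_resource;
    std::int64_t m_requested_amount;
    bool m_is_locked = false;
  };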
Applier_channel_monitor Class
The Applier_channel_monitor class monitors the applier for stalls by periodically checking progress metrics and attempting to unblock the scheduler if no progress is detected. It tracks applied events, scheduler/commit clocks, and active transactions over a configurable refresh interval (default 10 seconds). If a stall is detected, it tries to unblock the scheduler, with a limited number of attempts per stall based on the session count.
The Applier_channel_monitor class implements the following methods:
- Applier_channel_monitor: Constructor taking a Csa_channel reference.
- init_monitoring: Initializes monitoring with a custom refresh interval.
- check_applier_progress: Detects stalls and attempts unblocking if needed.
- get_allowed_unblocks: Returns the number of allowed unblocks for the current stall.
- get_current_unblock_counter: Returns the current unblock attempt count.
- get_total_unblock_counter: Returns the total unblock attempt count.
Attributes of the Applier_channel_monitor class are:
- m_channel: Reference to the monitored CSA channel.
- m_previous_applied_events: Previously recorded applied events count.
- m_current_applied_events: Currently recorded applied events count.
- m_previous_clock_value: Previous scheduler clock value.
- m_current_clock_value: Current scheduler clock value.
- m_active_trx: Current active transactions count.
- m_previous_commit_clock_value: Previous commit clock value.
- m_current_commit_clock_value: Current commit clock value.
- m_time_refresh: Last refresh timestamp.
- m_refresh_interval: Refresh interval duration.
- m_current_unblock_counter: Current unblock attempts for this stall.
- m_total_unblock_counter: Total unblock attempts ever.
- m_allowed_unblocks: Maximum unblocks allowed for the current stall.
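A sketch of the stall predicate implied by these attributes follows; the field names mirror the attributes above, but the exact server heuristic may differ:

  #include <cstdint>

  // Assumed snapshot of the tracked progress metrics.
  struct Progress_snapshot {
    std::uint64_t applied_events;
    std::uint64_t clock_value;
    std::uint64_t commit_clock_value;
  };

  // No event or clock progress across a refresh interval while
  // transactions remain active suggests a stall.
  bool looks_stalled(const Progress_snapshot &prev,
                     const Progress_snapshot &curr,
                     std::uint64_t active_trx) {
    return active_trx > 0 &&
           curr.applied_events == prev.applied_events &&
           curr.clock_value == prev.clock_value &&
           curr.commit_clock_value == prev.commit_clock_value;
  }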
Session_service interface design
- Abstract base class defining the interface for session services
- Declares pure virtual methods for initialization, deinitialization, acquiring, and releasing sessions
The Session_service implements the following pure virtual methods:
- init: Initializes the session service
- deinit: Deinitializes the session service
- acquire_session: Acquires a session
- release_session: Releases a session
Session_service_registry Class
- Concrete implementation of the Session_service interface
- Uses a cached approach to manage sessions
- Initializes a fixed-size pool of sessions during construction
- Acquires and releases sessions from the pool
Attributes of the Session_service_registry class are:
- m_sessions: Concurrent registry of sessions
- m_session_number: Number of sessions in the pool
The Session_service_registry implements the following methods:
- init: Initializes the session service. Creates a registry of sessions with the specified size, then registers and deactivates all sessions in the registry.
- deinit: Deinitializes the session service. Checks that all sessions are deactivated before deinitializing.
- acquire_session: Acquires a session. Activates the session with the given ID and returns it.
- release_session: Releases a session. Deactivates the session with the given ID.
Session_service_dynamic Class
- Another concrete implementation of the Session_service interface
- Uses a cached approach to manage a given number of sessions
- Dynamically allocates new sessions in case a pool of cached sessions is exhausted
- Uses a list to store free sessions and an unordered map to track reserved sessions
- Synchronizes access to shared resources using one mutex and one condition variable
Attributes of the Session_service_dynamic class are:
- m_free_sessions: List of free sessions
- m_reserved_sessions: Map of reserved sessions
- m_mutex: Mutex for synchronizing access to shared resources
- m_session_number: Maximum number of sessions
The Session_service_dynamic implements the following methods:
- init: Initializes the session service. Sets the maximum number of sessions and allocates and initializes the free sessions list.
- deinit: Deinitializes the session service. Clears the free sessions list and the reserved sessions map.
- acquire_session: Acquires a session. Waits until a free session is available, removes the first free session from the list, reserves it, and returns it.
- release_session: Releases a session. Adds the released session back to the free sessions list.
Session_service_bounded_queue Class
- Another concrete implementation of the Session_service interface
- Uses a cached approach to manage sessions
- Uses a synchronized Communication Queue to manage free sessions
Attributes of the Session_service_bounded_queue class are:
m_sessions: Queue managing free sessions
The Session_service_bounded_queue implements the following methods:
init: Initializes the session service
- Sets the maximum number of sessions
- Allocates the sessions and fills the queue with them
deinit: Deinitializes the session service
- Drains and clears the session queue
acquire_session: Acquires a session
- Waits until a free session is available in the queue
- Dequeues the first free session and reserves it
- Returns the reserved session
release_session: Releases a session
- Adds the released session back to the queue
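A sketch of the queue-based variant; the generic blocking queue below is a hypothetical stand-in for the synchronized Communication Queue named above, and its capacity is implicitly bounded by the pre-allocated pool:

#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>

struct Session {};  // illustrative stub

// Stand-in for the synchronized Communication Queue.
template <class T>
class Blocking_queue {
 public:
  void push(T value) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(std::move(value));
    m_cond.notify_one();
  }
  T pop() {  // blocks until an element is available
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this] { return !m_queue.empty(); });
    T value = std::move(m_queue.front());
    m_queue.pop();
    return value;
  }
 private:
  std::queue<T> m_queue;
  std::mutex m_mutex;
  std::condition_variable m_cond;
};

class Session_service_bounded_queue {
 public:
  bool init(std::size_t session_number) {
    // Fill the queue with the pre-allocated pool of free sessions.
    for (std::size_t i = 0; i < session_number; ++i)
      m_sessions.push(std::make_unique<Session>());
    return true;
  }
  std::unique_ptr<Session> acquire_session() {
    return m_sessions.pop();  // waits until a free session is available
  }
  void release_session(std::unique_ptr<Session> session) {
    m_sessions.push(std::move(session));  // back to the queue
  }
 private:
  Blocking_queue<std::unique_ptr<Session>> m_sessions;
};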
Dependency_adapter interface design
- Abstract base class defining the interface for dependency adapters
- Declares pure virtual method for solving dependencies
- Provides common types and constants used across adapter implementations
Types and constants defined in the class are the following:
Task_id: Type representing a task identifier
Task_id_resolved: Optional type representing a resolved task identifier
Clock_delay: Optional type representing a clock delay
The Dependency_adapter implements the following pure virtual methods:
set_worker_num: Method to set the worker number
solve: Pure virtual method to solve dependencies
- Takes task ID, sequence number, and commit parent as input
- Returns a pair containing clock delay and resolved task ID
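Expressed as a C++ sketch; the integer widths and exact signatures are assumptions of this illustration:

#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>

class Dependency_adapter {
 public:
  using Task_id = std::uint64_t;
  using Task_id_resolved = std::optional<Task_id>;
  using Clock_delay = std::optional<std::uint64_t>;

  virtual ~Dependency_adapter() = default;
  virtual void set_worker_num(std::size_t worker_num) = 0;
  // Resolves dependencies for one task: returns the clock-delay window in
  // which the task may run and, optionally, a task ID it must wait for.
  virtual std::pair<Clock_delay, Task_id_resolved> solve(
      Task_id task_id, std::int64_t sequence_number,
      std::int64_t last_committed) = 0;
};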
Sequential_clock_adapter Class
- A concrete implementation of the Dependency_adapter interface
- Resolves dependencies based on transaction sequence number and last committed
- Returns the clock delay window at which a task should execute
Attributes of the Sequential_clock_adapter class are:
m_current_min_seq: Minimal transaction sequence number for the current time window
m_current_clock_delay: Current time window value
The Sequential_clock_adapter implements the following methods:
solve: Implements the solve method from the Dependency_adapter interface
- Updates the current minimum sequence number and clock delay based on input sequence number and last committed
- Returns a pair containing clock delay and resolved task ID
The Sequential_clock_adapter calculates a clock-delay value that defines the
time window during which a task is permitted to run.
This adapter is an alternative dependency resolution strategy associated with
Logical_clock; CSA runtime uses Dependency_adapter_lwm with
Clock_lwm_registry.
The adapter keeps track of two key pieces of state:
- the current clock value, and
- the smallest sequence number among all tasks assigned to the current execution window.
When the adapter receives a new task’s sequence number along with a last_committed value, it behaves as follows:
- If last_committed is 0: The clock counter is incremented. The minimal sequence number is updated to the sequence number of the current task. The adapter outputs the updated clock counter value.
- If last_committed is equal to or greater than the minimal sequence number for the current window: The clock counter is incremented. The minimal sequence number is updated to the current task’s sequence number. The adapter outputs the updated clock counter value.
- If last_committed is less than the current window’s minimal sequence number: The minimal sequence number is updated to the current task’s sequence number (the clock counter is not incremented). The adapter outputs the current clock counter value.
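The three cases collapse into the sketch below (the base class and set_worker_num are omitted for brevity; returning no resolved task ID is an assumption of this illustration, since this strategy schedules purely by clock window):

#include <cstdint>
#include <optional>
#include <utility>

using Task_id = std::uint64_t;
using Task_id_resolved = std::optional<Task_id>;
using Clock_delay = std::optional<std::uint64_t>;

class Sequential_clock_adapter {
 public:
  std::pair<Clock_delay, Task_id_resolved> solve(Task_id /*task_id*/,
                                                 std::int64_t sequence_number,
                                                 std::int64_t last_committed) {
    if (last_committed == 0 || last_committed >= m_current_min_seq) {
      // Dependency falls inside or after the current window: open a new
      // window and restart its minimal sequence number.
      ++m_current_clock_delay;
      m_current_min_seq = sequence_number;
    } else {
      // Dependency committed before the current window started: the task
      // joins the current window; only the minimal sequence number moves.
      m_current_min_seq = sequence_number;
    }
    return {Clock_delay{m_current_clock_delay}, std::nullopt};
  }

 private:
  std::int64_t m_current_min_seq{0};       // minimal seq in current window
  std::uint64_t m_current_clock_delay{0};  // current time-window value
};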
Commit Order Dependency Adaptation
Commit order and LWM-based dependency adaptation is implemented through
Dependency_adapter_lwm together with scheduler clocks such as
Clock_lwm_registry and Commit_order_clock.
Server layer classes
The Csa_worker_context and Parallel_worker_context classes are designed to
represent the parallel worker context for the new Change Stream Applier.
Parallel_worker_context Class
The Parallel_worker_context class is an abstract base class that defines the
interface for parallel worker contexts. It provides a standardized way to
access information about the worker, such as its ID, transaction ID,
and channel ID.
By designing the Parallel_worker_context interface and the
Csa_worker_context class in this way, the system achieves polymorphism and
encapsulation, making it easier to extend and modify the behavior of parallel
workers.
Type aliases in the Parallel_worker_context class are:
Trx_id: Transaction identifier type.
Worker_id: Worker identifier type.
Channel_id: Channel identifier type.
The Parallel_worker_context implements the following methods:
report_commit_order_deadlock(bool): Reports a detected commit-order deadlock.
found_commit_order_deadlock(): Checks whether a commit-order deadlock was detected.
reset_commit_order_deadlock(): Resets the commit-order deadlock state.
get_transaction_ctx(): Returns the transaction context associated with the worker.
get_worker_metrics(): Returns the worker metrics handle.
get_mdl_context(): Returns the MDL context associated with the worker.
get_worker_id(): Returns the unique ID of the worker.
get_trx_id(): Returns the transaction ID associated with the worker.
can_be_retried(THD *thd): Checks whether the transaction can be retried.
is_same_channel(const Parallel_worker_context *other): Checks if the current worker and another worker are executing transactions from the same channel.
get_for_channel_id(bool): Returns the formatted "for channel" string.
is_csa(): Indicates whether this is a CSA worker context.
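In outline, the interface might be declared as below. THD and MDL_context are existing server types, forward-declared here; the remaining placeholder types, integer widths, and exact signatures are assumptions of this sketch:

#include <string>

class THD;              // existing server type
class MDL_context;      // existing server type
class Transaction_ctx;  // illustrative placeholder
class Worker_metrics;   // illustrative placeholder

class Parallel_worker_context {
 public:
  using Trx_id = unsigned long long;  // illustrative widths
  using Worker_id = unsigned long;
  using Channel_id = unsigned int;

  virtual ~Parallel_worker_context() = default;
  virtual void report_commit_order_deadlock(bool value) = 0;
  virtual bool found_commit_order_deadlock() = 0;
  virtual void reset_commit_order_deadlock() = 0;
  virtual Transaction_ctx *get_transaction_ctx() = 0;
  virtual Worker_metrics *get_worker_metrics() = 0;
  virtual MDL_context *get_mdl_context() = 0;
  virtual Worker_id get_worker_id() const = 0;
  virtual Trx_id get_trx_id() const = 0;
  virtual bool can_be_retried(THD *thd) = 0;
  virtual bool is_same_channel(const Parallel_worker_context *other) = 0;
  virtual std::string get_for_channel_id(bool upper_case) = 0;
  virtual bool is_csa() const = 0;
};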
Csa_worker_context Class
The Csa_worker_context class is a concrete implementation of the
Parallel_worker_context interface. It provides additional functionality
specific to the new Change Stream Applier.
Attributes of the Csa_worker_context class are:
m_trx_id: The transaction ID associated with the worker.
m_worker_id: The unique ID of the worker.
m_channel_id: The ID of the channel associated with the worker.
m_trx_ctx: The transaction context associated with the worker.
m_current_retry: The current retry attempt for the worker.
m_retries_num: The total number of retries allowed for the worker.
m_for_channel_id: Helper state variable used to hold the channel string used on channel failure.
m_is_commit_order_deadlock: Atomic boolean indicating a found commit order deadlock.
m_rollback: Atomic boolean used to determine whether an external rollback request was already enqueued.
m_rpco_state: Atomic RPCO state with three values: preparing, prepared, and commit. It distinguishes whether the transaction is still being applied, may be rescued externally, or has already entered commit ownership.
m_rollback_promise: Promise object used for synchronization after an external rollback.
The Csa_worker_context implements the following methods:
report_commit_order_deadlock(): Reports a commit order deadlock detected by the worker.
found_commit_order_deadlock(): Checks if a commit order deadlock has been detected by the worker.
reset_commit_order_deadlock(): Resets the commit order deadlock flag for the worker.
is_same_channel(const Parallel_worker_context *arg): Checks if the current worker and another worker are executing transactions from the same channel.
get_channel_id(): Returns the ID of the channel associated with the worker.
get_transaction_ctx(): Returns the transaction context associated with the worker.
get_worker_id(): Returns the unique ID of the worker.
get_mdl_context(): Returns the MDL context associated with the worker.
get_trx_id(): Returns the transaction ID associated with the worker.
can_be_retried(THD *thd): Checks if the worker can be retried based on the current transaction context.
get_worker_metrics(): Returns the worker metrics object (dummy implementation in the CSA worker context).
get_for_channel_id(bool upper_case): Returns the "for channel" ID string, optionally in upper case.
has_temporary_error(): Checks if the transaction error is temporary and the transaction is eligible for retry.
set_applied(): Moves the transaction into the prepared RPCO state, meaning the apply phase finished and the transaction may be rescued externally.
set_committing(): Moves the transaction from prepared into commit, meaning commit ownership stays with the committing thread.
handle_commit_order_deadlock(): Handles a commit order deadlock by requesting transaction rollback due to an RPCO deadlock.
wait_for_rollback(): Waits for an external rollback to finish.
update(...): Updates context values for new/retried transactions.
is_csa(): Checks if the worker is a CSA worker (true for this class).
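The interplay of m_rpco_state, set_applied(), and set_committing() amounts to a small atomic state machine. The sketch below illustrates the intent, not the exact server code; in particular, guarding the prepared-to-commit transition with a compare-and-swap, and the boolean return of set_committing(), are assumptions of this illustration:

#include <atomic>

enum class Rpco_state { preparing, prepared, commit };

class Rpco_transaction_state {  // hypothetical holder for m_rpco_state
 public:
  // set_applied(): apply phase finished; the transaction may now be
  // rescued (rolled back) externally.
  void set_applied() {
    m_rpco_state.store(Rpco_state::prepared, std::memory_order_release);
  }

  // set_committing(): claim commit ownership. Fails if an external
  // rollback rescued the transaction first.
  bool set_committing() {
    Rpco_state expected = Rpco_state::prepared;
    return m_rpco_state.compare_exchange_strong(expected, Rpco_state::commit,
                                                std::memory_order_acq_rel);
  }

 private:
  std::atomic<Rpco_state> m_rpco_state{Rpco_state::preparing};
};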
handle_slave_sql method
The handle_slave_sql method has been modified so that, if CSA was enabled
by the CHANGE REPLICATION SOURCE TO command, it calls the CSA service.
Otherwise, it initializes the replica workers and calls the old MTA
functionality.
CHANGE REPLICATION SOURCE TO command
The CHANGE REPLICATION SOURCE TO is extended with new options:
- APPLIER_VERSION: This option enables users to specify the applier version to be used; acceptable values: 1, 2
- APPLIER_WORKER_COUNT: This option allows users to configure the number of worker threads allocated to a channel for the new Change Stream Applier.
- APPLIER_EVENT_MEMORY_LIMIT: This option allows users to specify a per-channel memory limit for caching binlog events.
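For illustration, a channel could be switched to the new applier as follows; the channel name, the mapping of version 2 to CSA, and the memory-limit value (assumed to be in bytes) are examples only:

CHANGE REPLICATION SOURCE TO
  APPLIER_VERSION = 2,
  APPLIER_WORKER_COUNT = 8,
  APPLIER_EVENT_MEMORY_LIMIT = 1073741824
  FOR CHANNEL 'ch1';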
Following the current persistence and observability requirements, the applier version, worker count, and event memory limit are stored in the applier metadata table:
- Applier_version: unsigned integer in range <1,2>
- Applier_worker_count: unsigned integer in range <1,1024>
- Applier_event_memory_limit: unsigned integer in range
and exposed in the 'replication_applier_configuration' PFS table:
- APPLIER_VERSION unsigned integer, not null
- APPLIER_WORKER_COUNT unsigned integer, not null
- APPLIER_EVENT_MEMORY_LIMIT unsigned integer, not null