MySQL 8.4.2
Source Code Documentation
commit_order_queue.h
Go to the documentation of this file.
1/* Copyright (c) 2014, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
25#define CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
26#include <cstdint>
27#include <vector>
28
29#include "sql/containers/integrals_lockfree_queue.h" // container::Integrals_lockfree_queue
30#include "sql/locks/shared_spin_lock.h" // lock::Shared_spin_lock
31#include "sql/mdl.h" // MDL_context
32#include "sql/memory/aligned_atomic.h" // memory::Aligned_atomic
33
34namespace cs {
35namespace apply {
36/**
37 Queue to maintain the ordered sequence of workers waiting for commit.
38
39 The queue has a static list of elements, each one holding the commit
40 information of one worker.
41
42 The management of the order by which each worker will commit is implemented
43 using:
44
45 - A member variable pointing to the first worker to commit, the `head`.
46 - A member variable pointing to the last worker to commit, the `tail`.
47 - Each queue element holds a member variable that points to the next worker to
48 commit, the `next`.
49 - Pushing a new element will move the `tail`.
50 - Popping an element will move the `head`.
51
52 Atomics are used to make the queue thread-safe without the need for an
53 explicit lock.
54 */
56 public:
57 using value_type = unsigned long;
59 using sequence_type = unsigned long long;
60 static constexpr value_type NO_WORKER{
61 queue_type::null_value}; // No worker on the queue
62
63 /**
64 Enumeration to represent each worker state
65 */
66 enum class enum_worker_stage {
67 REGISTERED, // Transaction was handed over to the worker to be applied
68 FINISHED_APPLYING, // Worker finished executing the transaction
69 REQUESTED_GRANT, // Worker placed its request for the turn to commit
70 WAITED, // Worker waited for its turn to commit
71 FINISHED // Worker committed and finished processing the transaction
72 };
73
74 /**
75 Queue element, holding the needed information to manage the commit ordering.
76 */
77 class Node {
78 public:
79 friend class Commit_order_queue;
80
81 /** The identifier of the worker that maps to a queue index. */
83 /** The MDL context to be used to wait on the MDL graph. */
85 /** Which stage is the worker on. */
88
89 /**
90 Marks the commit request sequence number this node's worker is
91 processing as frozen iff the sequence number current value is equal
92 to the `expected` parameter.
93
94 Commit request sequence numbers are monotonically ever increasing
95 numbers that are used by worker threads to ensure ownership of the
96 worker commit turn unblocking operation:
97 1) A worker holding a sequence number `N` can only unblock worker
98 with sequence number `N + 1`.
99 2) A worker with sequence number `N + 1` can't be assigned a new
100 sequence number if worker with sequence number `N` is executing
101 the unblocking operation.
102
103 @param expected the commit request sequence number this node must
104 currently hold in order for it to be frozen
105
106 @return true if this node's commit request sequence number has been
107 frozen, false otherwise.
108 */
110 /**
111 Removes the frozen mark from the commit request sequence number this
112 node's worker is processing if it was previously frozen.
113
114 Commit request sequence numbers are monotonically ever increasing
115 numbers that are used by worker threads to ensure ownership of the
116 worker commit turn unblocking operation:
117 1) A worker holding a sequence number `N` can only unblock worker
118 with sequence number `N + 1`.
119 2) A worker with sequence number `N + 1` can't be assigned a new
120 sequence number if worker with sequence number `N` is executing
121 the unblocking operation.
122
123 @param previously_frozen the sequence number value that was provided
124 while previously freezing.
125
126 @return true if this node's commit request sequence number was frozen
127 and is now unfrozen, false otherwise.
128 */
130 Commit_order_queue::sequence_type previously_frozen);
131
132 private:
133 // No commit request sequence number assigned
135 // Commit request sequence number is marked as frozen
137
138 /** The sequence number for the commit request this node's worker is
139 processing. */
142
143 /**
144 Sets the commit request sequence number for this node as unassigned. If
145 the sequence number is currently frozen, invoking this method will make
146 the invoking thread spin until the sequence number is unfrozen.
147
148 Commit request sequence numbers are monotonically ever increasing
149 numbers that are used by worker threads to ensure ownership of the
150 worker commit turn unblocking operation:
151 1) A worker holding a sequence number `N` can only unblock worker
152 with sequence number `N + 1`.
153 2) A worker with sequence number `N + 1` can't be assigned a new
154 sequence number if worker with sequence number `N` is executing
155 the unblocking operation.
156
157 @return the commit request sequence number that this node's worker
158 has been cleared of
159 */
161 };
162
163 /**
164 Iterator helper class to iterate over the Commit_order_queue following the
165 underlying commit order.
166
167
168 Check C++ documentation on `Iterator named requirements` for more
169 information on the implementation.
170 */
171 class Iterator {
172 public:
173 using difference_type = std::ptrdiff_t;
177 using iterator_category = std::forward_iterator_tag;
179
180 explicit Iterator(Commit_order_queue &parent, index_type position);
181 Iterator(const Iterator &rhs);
182 Iterator(Iterator &&rhs);
183 virtual ~Iterator() = default;
184
185 // BASIC ITERATOR METHODS //
190 // END / BASIC ITERATOR METHODS //
191
192 // INPUT ITERATOR METHODS //
193 Iterator operator++(int);
195 bool operator==(Iterator const &rhs) const;
196 bool operator!=(Iterator const &rhs) const;
197 // END / INPUT ITERATOR METHODS //
198
199 // OUTPUT ITERATOR METHODS //
200 // reference operator*(); <- already defined
201 // iterator operator++(int); <- already defined
202 // END / OUTPUT ITERATOR METHODS //
203
204 // FORWARD ITERATOR METHODS //
205 // Enable support for both input and output iterator <- already enabled
206 // END / FORWARD ITERATOR METHODS //
207
208 private:
209 /** The target queue that holds the list to be iterated. */
211 /** The iterator pointing to the underlying queue position. */
212 Commit_order_queue::queue_type::Iterator m_current;
213 };
214
215 /**
216 Constructor for the class, takes the number of workers and initializes the
217 underlying static list with such size.
218
219 @param n_workers The number of workers to include in the commit order
220 management.
221 */
222 Commit_order_queue(size_t n_workers);
223 /**
224 Default destructor for the class.
225 */
226 virtual ~Commit_order_queue() = default;
227 /**
228 Retrieve the commit order information Node for worker identified by `id`.
229
230 @param id The identifier of the worker
231
232 @return A reference to the commit order information Node for the given
233 worker.
234 */
236 /**
237 Retrieves the error state for the current thread last executed queue
238 operation. Values may be:
239
240 - SUCCESS if the operation succeeded.
241 - NO_MORE_ELEMENTS if the last pop tried to access an empty queue.
242 - NO_SPACE_AVAILABLE if the last push tried to push while the queue was
243 full.
244
245
246 @return The error state for the thread's last operation on the queue.
247 */
249 /**
250 Whether or not there are more workers to commit.
251
252 @return True if there are no more workers, false otherwise.
253 */
254 bool is_empty();
255 /**
256 Removes from the queue and returns the identifier of the worker that is
257 first in-line to commit.
258
259 If another thread is accessing the commit order sequence number and has
260 frozen its state, this operation will spin until the state is
261 unfrozen.
262
263 @return A tuple holding the identifier of the worker that is first
264 in-line to commit and the associated commit order sequence
265 number.
266 */
267 std::tuple<value_type, sequence_type> pop();
268 /**
269 Adds to the end of the commit queue the worker identifier passed as
270 parameter.
271
272 @param id The identifier of the worker to add to the commit queue.
273 */
274 void push(value_type id);
275 /**
276 Retrieves the identifier of the worker that is first in-line to commit.
277
278 @return The identifier of the worker that is first in-line to commit.
279 */
281 /**
282 Removes all remaining workers from the queue.
283 */
284 void clear();
285 /**
286 Acquires exclusivity over changes (push, pop) on the queue.
287 */
288 void freeze();
289 /**
290 Releases exclusivity over changes (push, pop) on the queue.
291 */
292 void unfreeze();
293 /**
294 Retrieves an iterator instance that points to the head of the commit queue
295 and that will iterate over the worker Nodes that are in-line to commit,
296 following the requested commit order.
297
298 @return An instance of `Iterator` pointing to the queue's head, that
299 iterates over the workers that are in-line to commit and following
300 the requested commit order.
301 */
302 Iterator begin();
303 /**
304 Retrieves an iterator instance that points to the tail of the commit queue.
305
306 @return An instance of `Iterator` that points to the tail of the
307 queue.
308 */
309 Iterator end();
310 /**
311 Retrieves the textual representation of this object's underlying commit
312 queue.
313
314 @return The textual representation of this object's underlying commit queue.
315 */
316 std::string to_string();
317 /**
318 Friend operator that writes the textual representation returned by
319 `to_string()` to an `std::ostream` object, flushes it and returns it.
320 @see `std::ostream::operator<<`
321 */
322 friend inline std::ostream &operator<<(std::ostream &out,
323 Commit_order_queue &to_output) {
324 out << to_output.to_string() << std::flush;
325 return out;
326 }
327
328 /**
329 Returns the expected next number in the ticket sequence.
330
331 @param current_seq_nr The current sequence number, for which the next
332 should be computed.
333
334 @return The expected next number in the ticket sequence.
335 */
336 static sequence_type get_next_sequence_nr(sequence_type current_seq_nr);
337
338 private:
339 /** The commit sequence number counter */
342 /** The list of worker Nodes, indexed by worker ID. */
343 std::vector<Commit_order_queue::Node> m_workers;
344 /** The queue to hold the sequence of worker IDs waiting to commit. */
346 /** The lock to acquire exclusivity over changes on the queue. */
348};
349} // namespace apply
350} // namespace cs
351#endif // CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
Context of the owner of metadata locks.
Definition: mdl.h:1412
unsigned long long index_type
Definition: integrals_lockfree_queue.h:144
enum_queue_state
Definition: integrals_lockfree_queue.h:153
static constexpr value_type null_value
Definition: integrals_lockfree_queue.h:148
Iterator helper class to iterate over the Commit_order_queue following the underlying commit order.
Definition: commit_order_queue.h:171
Commit_order_queue * m_target
The target queue that holds the list to be iterated.
Definition: commit_order_queue.h:210
Iterator & operator=(const Iterator &rhs)
reference operator*()
Definition: commit_order_queue.cc:89
Commit_order_queue::queue_type::index_type index_type
Definition: commit_order_queue.h:178
Iterator & operator++()
Definition: commit_order_queue.cc:83
bool operator!=(Iterator const &rhs) const
Definition: commit_order_queue.cc:114
pointer operator->()
Definition: commit_order_queue.cc:103
Commit_order_queue::queue_type::Iterator m_current
The iterator pointing to the underlying queue position.
Definition: commit_order_queue.h:212
std::ptrdiff_t difference_type
Definition: commit_order_queue.h:173
std::forward_iterator_tag iterator_category
Definition: commit_order_queue.h:177
Iterator(Commit_order_queue &parent, index_type position)
Definition: commit_order_queue.cc:58
bool operator==(Iterator const &rhs) const
Definition: commit_order_queue.cc:109
Queue element, holding the needed information to manage the commit ordering.
Definition: commit_order_queue.h:77
memory::Aligned_atomic< Commit_order_queue::sequence_type > m_commit_sequence_nr
The sequence number for the commit request this node's worker is processing.
Definition: commit_order_queue.h:141
value_type m_worker_id
The identifier of the worker that maps to a queue index.
Definition: commit_order_queue.h:82
Commit_order_queue::sequence_type reset_commit_sequence_nr()
Sets the commit request sequence number for this node as unassigned.
Definition: commit_order_queue.cc:44
memory::Aligned_atomic< Commit_order_queue::enum_worker_stage > m_stage
Which stage is the worker on.
Definition: commit_order_queue.h:86
bool freeze_commit_sequence_nr(Commit_order_queue::sequence_type expected)
Marks the commit request sequence number this node's worker is processing as frozen iff the sequence ...
Definition: commit_order_queue.cc:30
MDL_context * m_mdl_context
The MDL context to be used to wait on the MDL graph.
Definition: commit_order_queue.h:84
static constexpr Commit_order_queue::sequence_type NO_SEQUENCE_NR
Definition: commit_order_queue.h:134
static constexpr Commit_order_queue::sequence_type SEQUENCE_NR_FROZEN
Definition: commit_order_queue.h:136
bool unfreeze_commit_sequence_nr(Commit_order_queue::sequence_type previously_frozen)
Removes the frozen mark from the commit request sequence number this node's worker is processing if i...
Definition: commit_order_queue.cc:36
Queue to maintain the ordered sequence of workers waiting for commit.
Definition: commit_order_queue.h:55
std::vector< Commit_order_queue::Node > m_workers
The list of worker Nodes, indexed by worker ID.
Definition: commit_order_queue.h:343
virtual ~Commit_order_queue()=default
Default destructor for the class.
void unfreeze()
Releases exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:190
Iterator begin()
Retrieves an iterator instance that points to the head of the commit queue and that will iterate over...
Definition: commit_order_queue.cc:194
enum_worker_stage
Enumeration to represent each worker state.
Definition: commit_order_queue.h:66
unsigned long long sequence_type
Definition: commit_order_queue.h:59
std::string to_string()
Retrieves the textual representation of this object's underlying commit queue.
Definition: commit_order_queue.cc:204
void freeze()
Acquires exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:186
Iterator end()
Retrieves an iterator instance that points to the tail of the commit queue.
Definition: commit_order_queue.cc:199
static constexpr value_type NO_WORKER
Definition: commit_order_queue.h:60
Node & operator[](value_type id)
Retrieve the commit order information Node for worker identified by id.
Definition: commit_order_queue.cc:130
void push(value_type id)
Adds to the end of the commit queue the worker identifier passed as parameter.
Definition: commit_order_queue.cc:163
value_type front()
Retrieves the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:180
memory::Aligned_atomic< sequence_type > m_commit_sequence_generator
The commit sequence number counter.
Definition: commit_order_queue.h:340
lock::Shared_spin_lock m_push_pop_lock
The lock to acquire exclusivity over changes on the queue.
Definition: commit_order_queue.h:347
friend std::ostream & operator<<(std::ostream &out, Commit_order_queue &to_output)
Friend operator for writing to an std::ostream object.
Definition: commit_order_queue.h:322
queue_type m_commit_queue
The queue to hold the sequence of worker IDs waiting to commit.
Definition: commit_order_queue.h:345
void clear()
Removes all remaining workers from the queue.
Definition: commit_order_queue.cc:184
bool is_empty()
Whether or not there are more workers to commit.
Definition: commit_order_queue.cc:142
static sequence_type get_next_sequence_nr(sequence_type current_seq_nr)
Returns the expected next number in the ticket sequence.
Definition: commit_order_queue.cc:209
unsigned long value_type
Definition: commit_order_queue.h:57
Commit_order_queue::queue_type::enum_queue_state get_state()
Retrieves the error state for the current thread last executed queue operation.
Definition: commit_order_queue.cc:138
Commit_order_queue(size_t n_workers)
Constructor for the class, takes the number of workers and initializes the underlying static list wit...
Definition: commit_order_queue.cc:119
std::tuple< value_type, sequence_type > pop()
Removes from the queue and returns the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:148
Definition: shared_spin_lock.h:80
Templated class that encapsulates an std::atomic within a byte buffer that is padded to the processor...
Definition: aligned_atomic.h:158
Definition: commit_order_queue.h:34
static mysql_service_status_t flush(reference_caching_cache cache) noexcept
Definition: component.cc:114