MySQL 9.2.0
Source Code Documentation
commit_order_queue.h
1/* Copyright (c) 2014, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
25#define CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
26#include <cstdint>
27#include <vector>
28
29#include "sql/containers/integrals_lockfree_queue.h" // container::Integrals_lockfree_queue
30#include "sql/locks/shared_spin_lock.h" // lock::Shared_spin_lock
31#include "sql/mdl.h" // MDL_context
32#include "sql/memory/aligned_atomic.h" // memory::Aligned_atomic
33
34namespace cs {
35namespace apply {
36/**
37 Queue to maintain the ordered sequence of workers waiting for commit.
38
39 The queue has a static list of elements, each one holding the commit
40 information of a single worker.
41
42 The management of the order by which each worker will commit is implemented
43 using:
44
45 - A member variable pointing to the first worker to commit, the `head`.
46 - A member variable pointing to the last worker to commit, the `tail`.
47 - Each queue element holds a member variable that points to the next worker to
48 commit, the `next`.
49 - Pushing a new element will move the `tail`.
50 - Popping an element will move the `head`.
51
52 Atomics are used to make the queue thread-safe without the need for an
53 explicit lock.
54 */
55class Commit_order_queue {
56 public:
57 using value_type = unsigned long;
59 using sequence_type = unsigned long long;
60 static constexpr value_type NO_WORKER{
61 queue_type::null_value}; // No worker on the queue
62
63 /**
64 Enumeration to represent each worker state
65 */
66 enum class enum_worker_stage {
67 REGISTERED, // Transaction was handed-over to worker, for applying
68 FINISHED_APPLYING, // Transaction execution finished
69 REQUESTED_GRANT, // Request for turn to commit has been placed
70 WAITED, // Waited for the turn to commit
71 WAITED_OVERTAKE, // Waited to commit, allowed to commit out of order
72 FINISHED // Committed and finished processing the transaction
73 };
74
75 /**
76 Queue element, holding the needed information to manage the commit ordering.
77 */
78 class Node {
79 public:
80 friend class Commit_order_queue;
81
82 /** The identifier of the worker that maps to a queue index. */
84 /** The MDL context to be used to wait on the MDL graph. */
86 /** Which stage is the worker on. */
89
90 /**
91 Marks the commit request sequence number this node's worker is
92 processing as frozen iff the sequence number current value is equal
93 to the `expected` parameter.
94
95 Commit request sequence numbers are monotonically increasing
96 numbers that are used by worker threads to ensure ownership of the
97 worker commit turn unblocking operation:
98 1) A worker holding a sequence number `N` can only unblock worker
99 with sequence number `N + 1`.
100 2) A worker with sequence number `N + 1` can't be assigned a new
101 sequence number if worker with sequence number `N` is executing
102 the unblocking operation.
103
104 @param expected the commit request sequence number this node must
105 currently hold in order for it to be frozen
106
107 @return true if this node's commit request sequence number has been
108 frozen, false otherwise.
109 */
110 bool freeze_commit_sequence_nr(Commit_order_queue::sequence_type expected);
111 /**
112 Removes the frozen mark from the commit request sequence number this
113 node's worker is processing if it was previously frozen.
114
115 Commit request sequence numbers are monotonically increasing
116 numbers that are used by worker threads to ensure ownership of the
117 worker commit turn unblocking operation:
118 1) A worker holding a sequence number `N` can only unblock worker
119 with sequence number `N + 1`.
120 2) A worker with sequence number `N + 1` can't be assigned a new
121 sequence number if worker with sequence number `N` is executing
122 the unblocking operation.
123
124 @param previously_frozen the sequence number value that was provided
125 while previously freezing.
126
127 @return true if this node's commit request sequence number was frozen
128 and is now unfrozen, false otherwise.
129 */
130 bool unfreeze_commit_sequence_nr(
131 Commit_order_queue::sequence_type previously_frozen);
132
133 private:
134 // No commit request sequence number assigned
136 // Commit request sequence number is marked as frozen
138
139 /** The sequence number for the commit request this node's worker is
140 processing. */
143
144 /** The sequence number for the commit request of the worker scheduled
145 after this one. */
148
149 /**
150 Sets the commit request sequence number for this node as unassigned. If
151 the sequence number is currently frozen, the invoking thread will spin
152 until the sequence number is unfrozen.
153
154 Commit request sequence numbers are monotonically increasing
155 numbers that are used by worker threads to ensure ownership of the
156 worker commit turn unblocking operation:
157 1) A worker holding a sequence number `N` can only unblock worker
158 with sequence number `N + 1`.
159 2) A worker with sequence number `N + 1` can't be assigned a new
160 sequence number if worker with sequence number `N` is executing
161 the unblocking operation.
162
163 @return the commit request sequence number of the worker scheduled
164 after this one, or NO_SEQUENCE_NR if there is no next
165 worker.
166 */
167 Commit_order_queue::sequence_type reset_commit_sequence_nr();
168 };
169
170 /**
171 Iterator helper class to iterate over the Commit_order_queue following the
172 underlying commit order.
173
174
175 Check C++ documentation on `Iterator named requirements` for more
176 information on the implementation.
177 */
178 class Iterator {
179 public:
180 using difference_type = std::ptrdiff_t;
184 using iterator_category = std::forward_iterator_tag;
186
187 explicit Iterator(Commit_order_queue &parent, index_type position);
188 Iterator(const Iterator &rhs);
189 Iterator(Iterator &&rhs);
190 virtual ~Iterator() = default;
191
192 // BASIC ITERATOR METHODS //
197 // END / BASIC ITERATOR METHODS //
198
199 // INPUT ITERATOR METHODS //
200 Iterator operator++(int);
202 bool operator==(Iterator const &rhs) const;
203 bool operator!=(Iterator const &rhs) const;
204 // END / INPUT ITERATOR METHODS //
205
206 // OUTPUT ITERATOR METHODS //
207 // reference operator*(); <- already defined
208 // iterator operator++(int); <- already defined
209 // END / OUTPUT ITERATOR METHODS //
210
211 // FORWARD ITERATOR METHODS //
212 // Enable support for both input and output iterator <- already enabled
213 // END / FORWARD ITERATOR METHODS //
214
215 private:
216 /** The target queue that holds the list to be iterated. */
218 /** The iterator pointing to the underlying queue position. */
219 Commit_order_queue::queue_type::Iterator m_current;
220 };
221
222 /**
223 Constructor for the class, takes the number of workers and initializes the
224 underlying static list with such size.
225
226 @param n_workers The number of workers to include in the commit order
227 management.
228 */
229 Commit_order_queue(size_t n_workers);
230 /**
231 Default destructor for the class.
232 */
233 virtual ~Commit_order_queue() = default;
234 /**
235 Retrieve the commit order information Node for worker identified by `id`.
236
237 @param id The identifier of the worker
238
239 @return A reference to the commit order information Node for the given
240 worker.
241 */
242 Node &operator[](value_type id);
243 /**
244 Retrieves the error state for the current thread's last executed queue
245 operation. Values may be:
246
247 - SUCCESS if the operation succeeded.
248 - NO_MORE_ELEMENTS if the last pop tried to access an empty queue.
249 - NO_SPACE_AVAILABLE if the last push tried to push while the queue was
250 full.
251
252
253 @return The error state for the thread's last operation on the queue.
254 */
255 Commit_order_queue::queue_type::enum_queue_state get_state();
256 /**
257 Whether or not there are more workers to commit.
258
259 @return True if there are no more workers, false otherwise.
260 */
261 bool is_empty();
262 /**
263 Removes from the queue and returns the identifier of the worker that is
264 first in-line to commit.
265
266 If another thread is accessing the commit order sequence number and has
267 frozen its state, this operation will spin until the state is
268 unfrozen.
269
270 @return A tuple holding the identifier of the worker that is first
271 in-line to commit and the sequence number for the commit
272 request of the worker scheduled after this one (or NO_SEQUENCE_NR
273 if there is no next worker).
274 */
275 std::tuple<value_type, sequence_type> pop();
276 /**
277 Adds to the end of the commit queue the worker identifier passed as
278 parameter.
279
280 @param id The identifier of the worker to add to the commit queue.
281 */
282 void push(value_type id);
283 /**
284 Retrieves the identifier of the worker that is first in-line to commit.
285
286 @return The identifier of the worker that is first in-line to commit.
287 */
288 value_type front();
289 /**
290 Removes all remaining workers from the queue.
291 */
292 void clear();
293 /**
294 Acquires exclusivity over changes (push, pop) on the queue.
295 */
296 void freeze();
297 /**
298 Releases exclusivity over changes (push, pop) on the queue.
299 */
300 void unfreeze();
301 /**
302 Retrieves an iterator instance that points to the head of the commit queue
303 and that will iterate over the worker Nodes that are in-line to commit,
304 following the requested commit order.
305
306 @return An instance of `Iterator` pointing to the queue's head, that
307 iterates over the workers that are in-line to commit and following
308 the requested commit order.
309 */
310 Iterator begin();
311 /**
312 Retrieves an iterator instance that points to the tail of the commit queue.
313
314 @return An instance of `Iterator` that points to the tail of the
315 queue.
316 */
317 Iterator end();
318 /**
319 Retrieves the textual representation of this object's underlying commit
320 queue.
321
322 @return The textual representation of this object's underlying commit queue.
323 */
324 std::string to_string();
325 /**
326 Friend operator for writing to an `std::ostream` object.
327
328 @see `std::ostream::operator<<`
329 */
330 friend inline std::ostream &operator<<(std::ostream &out,
331 Commit_order_queue &to_output) {
332 out << to_output.to_string() << std::flush;
333 return out;
334 }
335
336 /**
337 Returns the expected next number in the ticket sequence.
338
339 @param current_seq_nr The current sequence number, for which the next
340 should be computed.
341
342 @return The expected next number in the ticket sequence.
343 */
344 static sequence_type get_next_sequence_nr(sequence_type current_seq_nr);
345
346 /**
347 Removes from the queue the first worker identifier that is equal to
348 `index`, and returns it.
349
350 @return A tuple holding the identifier of the worker equal to `index`
351 and the sequence number for the commit request of the worker
352 scheduled after this one (or NO_SEQUENCE_NR if there is no
353 next worker).
354 */
355 std::tuple<value_type, sequence_type> remove(value_type index);
356
357 private:
358 /** The commit sequence number counter */
361 /** The list of worker Nodes, indexed by worker ID. */
362 std::vector<Commit_order_queue::Node> m_workers;
363 /** The queue to hold the sequence of worker IDs waiting to commit. */
365 /** The lock to acquire exclusivity over changes on the queue. */
367 /**
368 Removes from the commit queue the first identifier equal to `to_remove`.
369
370 @param to_remove The value to remove from the queue.
371
372 @return A tuple, where:
373 - The first component is the removed value, or NO_WORKER if the
374 value was not found.
375 - The second component is the value preceding the removed one,
376 or NO_WORKER if the value was not found, or was found in the
377 first slot.
378 */
379 std::tuple<value_type, value_type> remove_from_commit_queue(
380 value_type to_remove);
381};
382} // namespace apply
383} // namespace cs
384#endif // CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
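The class comment in the listing describes a static list in which each element links to the next worker to commit, with `head` and `tail` marking the two ends. The following is a minimal, single-threaded sketch of that bookkeeping only. `Sketch_order_queue`, `kNoWorker` and `sketch_order_queue_demo` are hypothetical names, and the sketch deliberately leaves out the atomics, the sequence numbers and the underlying `Integrals_lockfree_queue` container that the real class relies on.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sentinel meaning "no worker"; the real class exposes NO_WORKER.
constexpr unsigned long kNoWorker = static_cast<unsigned long>(-1);

// Single-threaded sketch of the head/tail/next scheme. Not thread-safe.
class Sketch_order_queue {
 public:
  using value_type = unsigned long;

  explicit Sketch_order_queue(std::size_t n_workers)
      : m_next(n_workers, kNoWorker) {}

  bool is_empty() const { return m_head == kNoWorker; }

  // Pushing appends the worker and moves the tail.
  // Assumes id < n_workers and that the worker is not already queued.
  void push(value_type id) {
    m_next[id] = kNoWorker;
    if (m_tail == kNoWorker) {
      m_head = id;  // first element: head and tail coincide
    } else {
      m_next[m_tail] = id;  // link the previous tail to the new worker
    }
    m_tail = id;
  }

  // Popping returns the first worker in line and moves the head.
  value_type pop() {
    if (is_empty()) return kNoWorker;
    const value_type id = m_head;
    m_head = m_next[id];
    if (m_head == kNoWorker) m_tail = kNoWorker;  // queue became empty
    m_next[id] = kNoWorker;
    return id;
  }

 private:
  std::vector<value_type> m_next;  // per-worker link to the next committer
  value_type m_head{kNoWorker};    // first worker to commit
  value_type m_tail{kNoWorker};    // last worker to commit
};

// Usage: workers commit in the order in which they were pushed.
inline void sketch_order_queue_demo() {
  Sketch_order_queue queue(3);
  queue.push(2);
  queue.push(0);
  assert(queue.pop() == 2);
  assert(queue.pop() == 0);
  assert(queue.is_empty());
}
```

Because the list is sized once from the number of workers, pushing never allocates, which is presumably part of why a static list suits a structure that is touched on every commit.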
MDL_context
Context of the owner of metadata locks.
Definition: mdl.h:1415
unsigned long long index_type
Definition: integrals_lockfree_queue.h:144
enum_queue_state
Definition: integrals_lockfree_queue.h:153
static constexpr value_type null_value
Definition: integrals_lockfree_queue.h:148
cs::apply::Commit_order_queue::Iterator
Iterator helper class to iterate over the Commit_order_queue following the underlying commit order.
Definition: commit_order_queue.h:178
Commit_order_queue * m_target
The target queue that holds the list to be iterated.
Definition: commit_order_queue.h:217
Iterator & operator=(const Iterator &rhs)
reference operator*()
Definition: commit_order_queue.cc:93
Commit_order_queue::queue_type::index_type index_type
Definition: commit_order_queue.h:185
Iterator & operator++()
Definition: commit_order_queue.cc:87
bool operator!=(Iterator const &rhs) const
Definition: commit_order_queue.cc:118
pointer operator->()
Definition: commit_order_queue.cc:107
Commit_order_queue::queue_type::Iterator m_current
The iterator pointing to the underlying queue position.
Definition: commit_order_queue.h:219
std::ptrdiff_t difference_type
Definition: commit_order_queue.h:180
std::forward_iterator_tag iterator_category
Definition: commit_order_queue.h:184
Iterator(Commit_order_queue &parent, index_type position)
Definition: commit_order_queue.cc:62
bool operator==(Iterator const &rhs) const
Definition: commit_order_queue.cc:113
cs::apply::Commit_order_queue::Node
Queue element, holding the needed information to manage the commit ordering.
Definition: commit_order_queue.h:78
memory::Aligned_atomic< Commit_order_queue::sequence_type > m_commit_sequence_nr
The sequence number for the commit request this node's worker is processing.
Definition: commit_order_queue.h:142
value_type m_worker_id
The identifier of the worker that maps to a queue index.
Definition: commit_order_queue.h:83
memory::Aligned_atomic< Commit_order_queue::sequence_type > m_next_commit_sequence_nr
The sequence number for the commit request of the worker scheduled after this one.
Definition: commit_order_queue.h:147
Commit_order_queue::sequence_type reset_commit_sequence_nr()
Sets the commit request sequence number for this node as unassigned.
Definition: commit_order_queue.cc:44
memory::Aligned_atomic< Commit_order_queue::enum_worker_stage > m_stage
Which stage is the worker on.
Definition: commit_order_queue.h:87
bool freeze_commit_sequence_nr(Commit_order_queue::sequence_type expected)
Marks the commit request sequence number this node's worker is processing as frozen iff the sequence ...
Definition: commit_order_queue.cc:30
MDL_context * m_mdl_context
The MDL context to be used to wait on the MDL graph.
Definition: commit_order_queue.h:85
static constexpr Commit_order_queue::sequence_type NO_SEQUENCE_NR
Definition: commit_order_queue.h:135
static constexpr Commit_order_queue::sequence_type SEQUENCE_NR_FROZEN
Definition: commit_order_queue.h:137
bool unfreeze_commit_sequence_nr(Commit_order_queue::sequence_type previously_frozen)
Removes the frozen mark from the commit request sequence number this node's worker is processing if i...
Definition: commit_order_queue.cc:36
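The freeze and unfreeze operations described above behave like a pair of compare-and-swap steps on the node's sequence number: freezing succeeds only if the current value equals `expected`, and unfreezing restores the value that was frozen. The sketch below models that with `std::atomic`. `Sketch_node`, `kSequenceNrFrozen` (and its value), `current` and `sketch_node_demo` are assumptions made for illustration, and the real class stores the number in a `memory::Aligned_atomic` rather than a plain `std::atomic`.

```cpp
#include <atomic>
#include <cassert>

using sequence_type = unsigned long long;

// Assumed sentinel for "frozen"; the real value of SEQUENCE_NR_FROZEN is not
// shown in the listing.
constexpr sequence_type kSequenceNrFrozen = 1;

// Hypothetical stand-in for the sequence number handling of a queue node.
class Sketch_node {
 public:
  explicit Sketch_node(sequence_type nr) : m_commit_sequence_nr(nr) {}

  // Freezes the number iff it currently equals `expected`.
  bool freeze_commit_sequence_nr(sequence_type expected) {
    return m_commit_sequence_nr.compare_exchange_strong(expected,
                                                        kSequenceNrFrozen);
  }

  // Restores `previously_frozen` iff the number is currently frozen.
  bool unfreeze_commit_sequence_nr(sequence_type previously_frozen) {
    sequence_type frozen = kSequenceNrFrozen;
    return m_commit_sequence_nr.compare_exchange_strong(frozen,
                                                        previously_frozen);
  }

  sequence_type current() const { return m_commit_sequence_nr.load(); }

 private:
  std::atomic<sequence_type> m_commit_sequence_nr;
};

// Usage: only the caller that names the right sequence number can freeze it.
inline void sketch_node_demo() {
  Sketch_node node(7);
  assert(!node.freeze_commit_sequence_nr(8));  // wrong expectation: no change
  assert(node.freeze_commit_sequence_nr(7));   // matches: now frozen
  assert(node.unfreeze_commit_sequence_nr(7)); // restored to 7
  assert(node.current() == 7);
}
```

A worker holding sequence number `N` would freeze the node it expects to carry `N + 1` before unblocking it, so the node cannot be handed a new sequence number halfway through the unblocking operation.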
cs::apply::Commit_order_queue
Queue to maintain the ordered sequence of workers waiting for commit.
Definition: commit_order_queue.h:55
std::vector< Commit_order_queue::Node > m_workers
The list of worker Nodes, indexed by worker ID.
Definition: commit_order_queue.h:362
virtual ~Commit_order_queue()=default
Default destructor for the class.
void unfreeze()
Releases exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:198
Iterator begin()
Retrieves an iterator instance that points to the head of the commit queue and that will iterate over...
Definition: commit_order_queue.cc:202
enum_worker_stage
Enumeration to represent each worker state.
Definition: commit_order_queue.h:66
unsigned long long sequence_type
Definition: commit_order_queue.h:59
std::string to_string()
Retrieves the textual representation of this object's underlying commit queue.
Definition: commit_order_queue.cc:212
void freeze()
Acquires exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:194
Iterator end()
Retrieves an iterator instance that points to the tail of the commit queue.
Definition: commit_order_queue.cc:207
static constexpr value_type NO_WORKER
Definition: commit_order_queue.h:60
Node & operator[](value_type id)
Retrieve the commit order information Node for worker identified by id.
Definition: commit_order_queue.cc:134
std::tuple< value_type, value_type > remove_from_commit_queue(value_type to_remove)
Removes from the commit queue the first identifier equal to to_remove.
Definition: commit_order_queue.cc:273
void push(value_type id)
Adds to the end of the commit queue the worker identifier passed as parameter.
Definition: commit_order_queue.cc:166
value_type front()
Retrieves the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:185
memory::Aligned_atomic< sequence_type > m_commit_sequence_generator
The commit sequence number counter.
Definition: commit_order_queue.h:359
lock::Shared_spin_lock m_push_pop_lock
The lock to acquire exclusivity over changes on the queue.
Definition: commit_order_queue.h:366
friend std::ostream & operator<<(std::ostream &out, Commit_order_queue &to_output)
Friend operator for writing to an std::ostream object.
Definition: commit_order_queue.h:330
queue_type m_commit_queue
The queue to hold the sequence of worker IDs waiting to commit.
Definition: commit_order_queue.h:364
void clear()
Removes all remaining workers from the queue.
Definition: commit_order_queue.cc:192
bool is_empty()
Whether or not there are more workers to commit.
Definition: commit_order_queue.cc:146
static sequence_type get_next_sequence_nr(sequence_type current_seq_nr)
Returns the expected next number in the ticket sequence.
Definition: commit_order_queue.cc:217
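Since some sequence values are reserved as markers (`NO_SEQUENCE_NR`, `SEQUENCE_NR_FROZEN`), computing the next ticket plausibly has to skip them, including when the counter wraps around. The sketch below shows one way to do that. It is an assumption about the behavior, not the MySQL implementation: `sketch_next_sequence_nr` is a hypothetical name, and the reserved values are assumed to be 0 and 1.

```cpp
#include <cassert>
#include <limits>

using sequence_type = unsigned long long;

// Assumption: reserved marker values occupy the range [0, kLastReserved].
constexpr sequence_type kLastReserved = 1;

// Returns the successor of `current_seq_nr`, skipping reserved values.
// Unsigned arithmetic wraps to 0 on overflow, which is then skipped too.
inline sequence_type sketch_next_sequence_nr(sequence_type current_seq_nr) {
  sequence_type next = current_seq_nr + 1;
  while (next <= kLastReserved) ++next;
  return next;
}

// Usage: ordinary increment, plus the wrap-around case.
inline void sketch_next_sequence_nr_demo() {
  assert(sketch_next_sequence_nr(5) == 6);
  assert(sketch_next_sequence_nr(
             std::numeric_limits<sequence_type>::max()) == 2);
}
```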
unsigned long value_type
Definition: commit_order_queue.h:57
std::tuple< value_type, sequence_type > remove(value_type index)
Removes from the queue the first worker identifier that is equal to index, and returns it.
Definition: commit_order_queue.cc:228
Commit_order_queue::queue_type::enum_queue_state get_state()
Retrieves the error state for the current thread's last executed queue operation.
Definition: commit_order_queue.cc:142
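`get_state()` reports the outcome of the calling thread's last queue operation, so the state is kept per thread rather than per queue. One way to sketch that idea is with a `thread_local` variable, as below. All names here (`Sketch_queue_state`, `Sketch_bounded_queue`, `kNoWorker`, `sketch_state_demo`) are hypothetical, the container itself is not thread-safe, and the state is shared by every instance on a thread, which is a simplification made for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

enum class Sketch_queue_state { SUCCESS, NO_MORE_ELEMENTS, NO_SPACE_AVAILABLE };

// Hypothetical sentinel returned when popping from an empty queue.
constexpr unsigned long kNoWorker = static_cast<unsigned long>(-1);

// Bounded queue that records the outcome of the calling thread's last call.
class Sketch_bounded_queue {
 public:
  explicit Sketch_bounded_queue(std::size_t capacity) : m_capacity(capacity) {}

  void push(unsigned long id) {
    if (m_items.size() >= m_capacity) {
      last_state() = Sketch_queue_state::NO_SPACE_AVAILABLE;
      return;
    }
    m_items.push_back(id);
    last_state() = Sketch_queue_state::SUCCESS;
  }

  unsigned long pop() {
    if (m_items.empty()) {
      last_state() = Sketch_queue_state::NO_MORE_ELEMENTS;
      return kNoWorker;
    }
    const unsigned long id = m_items.front();
    m_items.pop_front();
    last_state() = Sketch_queue_state::SUCCESS;
    return id;
  }

  Sketch_queue_state get_state() const { return last_state(); }

 private:
  // One state slot per thread, created on first use.
  static Sketch_queue_state &last_state() {
    thread_local Sketch_queue_state state = Sketch_queue_state::SUCCESS;
    return state;
  }

  std::size_t m_capacity;
  std::deque<unsigned long> m_items;
};

// Usage: callers check the state instead of relying on exceptions.
inline void sketch_state_demo() {
  Sketch_bounded_queue queue(1);
  queue.push(4);
  queue.push(5);  // queue is full
  assert(queue.get_state() == Sketch_queue_state::NO_SPACE_AVAILABLE);
}
```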
Commit_order_queue(size_t n_workers)
Constructor for the class, takes the number of workers and initializes the underlying static list wit...
Definition: commit_order_queue.cc:123
std::tuple< value_type, sequence_type > pop()
Removes from the queue and returns the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:152
lock::Shared_spin_lock
Definition: shared_spin_lock.h:80
memory::Aligned_atomic
Templated class that encapsulates an std::atomic within a byte buffer that is padded to the processor...
Definition: aligned_atomic.h:158
cs
Definition: commit_order_queue.h:34