MySQL 8.3.0
Source Code Documentation
commit_order_queue.h
Go to the documentation of this file.
1/* Copyright (c) 2014, 2023, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23#ifndef CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
24#define CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
25#include <cstdint>
26#include <vector>
27
28#include "sql/containers/integrals_lockfree_queue.h" // container::Integrals_lockfree_queue
29#include "sql/locks/shared_spin_lock.h" // lock::Shared_spin_lock
30#include "sql/mdl.h" // MDL_context
31#include "sql/memory/aligned_atomic.h" // memory::Aligned_atomic
32
33namespace cs {
34namespace apply {
35/**
36 Queue to maintain the ordered sequence of workers waiting for commit.
37
38 The queue has a static list of elements, each one holding the commit
39 information of a single worker.
40
41 The management of the order by which each worker will commit is implemented
42 using:
43
44 - A member variable pointing to the first worker to commit, the `head`.
45 - A member variable pointing to the last worker to commit, the `tail`.
46 - Each queue element holds a member variable that points to the next worker to
47 commit, the `next`.
48 - Pushing a new element will move the `tail`.
49 - Popping an element will move the `head`.
50
51 Atomics are used to make the queue thread-safe without the need for an
52 explicit lock.
53 */
55 public:
56 using value_type = unsigned long;
58 using sequence_type = unsigned long long;
59 static constexpr value_type NO_WORKER{
60 queue_type::null_value}; // No worker on the queue
61
62 /**
63 Enumeration to represent each worker state
64 */
65 enum class enum_worker_stage {
66 REGISTERED, // Transaction was handed-over to worker, for applying
67 FINISHED_APPLYING, // Transaction execution finished
68 REQUESTED_GRANT, // Request for turn to commit has been placed
69 WAITED, // Waited for the turn to commit
70 FINISHED // Committed and finished processing the transaction
71 };
72
73 /**
74 Queue element, holding the needed information to manage the commit ordering.
75 */
76 class Node {
77 public:
78 friend class Commit_order_queue;
79
80 /** The identifier of the worker that maps to a queue index. */
82 /** The MDL context to be used to wait on the MDL graph. */
84 /** Which stage is the worker on. */
87
88 /**
89 Marks the commit request sequence number this node's worker is
90 processing as frozen iff the sequence number current value is equal
91 to the `expected` parameter.
92
93 Commit request sequence numbers are monotonically ever increasing
94 numbers that are used by worker threads to ensure ownership of the
95 worker commit turn unblocking operation:
96 1) A worker holding a sequence number `N` can only unblock worker
97 with sequence number `N + 1`.
98 2) A worker with sequence number `N + 1` can't be assigned a new
99 sequence number if worker with sequence number `N` is executing
100 the unblocking operation.
101
102 @param expected the commit request sequence number this node must
103 currently hold in order for it to be frozen
104
105 @return true if this node's commit request sequence number has been
106 frozen, false otherwise.
107 */
109 /**
110 Removes the frozen mark from the commit request sequence number this
111 node's worker is processing if it was previously frozen.
112
113 Commit request sequence numbers are monotonically ever increasing
114 numbers that are used by worker threads to ensure ownership of the
115 worker commit turn unblocking operation:
116 1) A worker holding a sequence number `N` can only unblock worker
117 with sequence number `N + 1`.
118 2) A worker with sequence number `N + 1` can't be assigned a new
119 sequence number if worker with sequence number `N` is executing
120 the unblocking operation.
121
122 @param previously_frozen the sequence number value that was provided
123 while previously freezing.
124
125 @return true if this node's commit request sequence number was frozen
126 and is now unfrozen, false otherwise.
127 */
129 Commit_order_queue::sequence_type previously_frozen);
130
131 private:
132 // No commit request sequence number assigned
134 // Commit request sequence number is marked as frozen
136
137 /** The sequence number for the commit request this node's worker is
138 processing. */
141
142 /**
143 Sets the commit request sequence number for this node as unassigned. If
144 the sequence number is currently frozen, invoking this method will make
145 the invoking thread spin until the sequence number is unfrozen.
146
147 Commit request sequence numbers are monotonically ever increasing
148 numbers that are used by worker threads to ensure ownership of the
149 worker commit turn unblocking operation:
150 1) A worker holding a sequence number `N` can only unblock worker
151 with sequence number `N + 1`.
152 2) A worker with sequence number `N + 1` can't be assigned a new
153 sequence number if worker with sequence number `N` is executing
154 the unblocking operation.
155
156 @return the commit request sequence number that this node's worker
157 has been cleared of
158 */
160 };
161
162 /**
163 Iterator helper class to iterate over the Commit_order_queue following the
164 underlying commit order.
165
166
167 Check C++ documentation on `Iterator named requirements` for more
168 information on the implementation.
169 */
170 class Iterator {
171 public:
172 using difference_type = std::ptrdiff_t;
176 using iterator_category = std::forward_iterator_tag;
178
179 explicit Iterator(Commit_order_queue &parent, index_type position);
180 Iterator(const Iterator &rhs);
181 Iterator(Iterator &&rhs);
182 virtual ~Iterator() = default;
183
184 // BASIC ITERATOR METHODS //
189 // END / BASIC ITERATOR METHODS //
190
191 // INPUT ITERATOR METHODS //
192 Iterator operator++(int);
194 bool operator==(Iterator const &rhs) const;
195 bool operator!=(Iterator const &rhs) const;
196 // END / INPUT ITERATOR METHODS //
197
198 // OUTPUT ITERATOR METHODS //
199 // reference operator*(); <- already defined
200 // iterator operator++(int); <- already defined
201 // END / OUTPUT ITERATOR METHODS //
202
203 // FORWARD ITERATOR METHODS //
204 // Enable support for both input and output iterator <- already enabled
205 // END / FORWARD ITERATOR METHODS //
206
207 private:
208 /** The target queue that holds the list to be iterated. */
210 /** The iterator pointing to the underlying queue position. */
211 Commit_order_queue::queue_type::Iterator m_current;
212 };
213
214 /**
215 Constructor for the class, takes the number of workers and initializes the
216 underlying static list with such size.
217
218 @param n_workers The number of workers to include in the commit order
219 management.
220 */
221 Commit_order_queue(size_t n_workers);
222 /**
223 Default destructor for the class.
224 */
225 virtual ~Commit_order_queue() = default;
226 /**
227 Retrieve the commit order information Node for worker identified by `id`.
228
229 @param id The identifier of the worker
230
231 @return A reference to the commit order information Node for the given
232 worker.
233 */
235 /**
236 Retrieves the error state for the current thread last executed queue
237 operation. Values may be:
238
239 - SUCCESS if the operation succeeded.
240 - NO_MORE_ELEMENTS if the last pop tried to access an empty queue.
241 - NO_SPACE_AVAILABLE if the last push tried to push while the queue was
242 full.
243
244
245 @return The error state for the thread's last operation on the queue.
246 */
248 /**
249 Whether or not there are more workers to commit.
250
251 @return True if there are no more workers, false otherwise.
252 */
253 bool is_empty();
254 /**
255 Removes from the queue and returns the identifier of the worker that is
256 first in-line to commit.
257
258 If another thread is accessing the commit order sequence number and has
259 frozen its state, this operation will spin until the state is
260 unfrozen.
261
262 @return A tuple holding the identifier of the worker that is first
263 in-line to commit and the associated commit order sequence
264 number.
265 */
266 std::tuple<value_type, sequence_type> pop();
267 /**
268 Adds to the end of the commit queue the worker identifier passed as
269 parameter.
270
271 @param id The identifier of the worker to add to the commit queue.
272 */
273 void push(value_type id);
274 /**
275 Retrieves the identifier of the worker that is first in-line to commit.
276
277 @return The identifier of the worker that is first in-line to commit.
278 */
280 /**
281 Removes all remaining workers from the queue.
282 */
283 void clear();
284 /**
285 Acquires exclusivity over changes (push, pop) on the queue.
286 */
287 void freeze();
288 /**
289 Releases exclusivity over changes (push, pop) on the queue.
290 */
291 void unfreeze();
292 /**
293 Retrieves an iterator instance that points to the head of the commit queue
294 and that will iterate over the worker Nodes that are in-line to commit,
295 following the requested commit order.
296
297 @return An instance of `Iterator` pointing to the queue's head, that
298 iterates over the workers that are in-line to commit and following
299 the requested commit order.
300 */
301 Iterator begin();
302 /**
303 Retrieves an iterator instance that points to the tail of the commit queue.
304
305 @return An instance of `Iterator` that points to the tail of the
306 queue.
307 */
308 Iterator end();
309 /**
310 Retrieves the textual representation of this object's underlying commit
311 queue.
312
313 @return The textual representation of this object's underlying commit queue.
314 */
315 std::string to_string();
316 /**
317 Friend operator for writing to an `std::ostream` object.
318
319 @see `std::ostream::operator<<`
320 */
321 friend inline std::ostream &operator<<(std::ostream &out,
322 Commit_order_queue &to_output) {
323 out << to_output.to_string() << std::flush;
324 return out;
325 }
326
327 /**
328 Returns the expected next number in the ticket sequence.
329
330 @param current_seq_nr The current sequence number, for which the next
331 should be computed.
332
333 @return The expected next number in the ticket sequence.
334 */
335 static sequence_type get_next_sequence_nr(sequence_type current_seq_nr);
336
337 private:
338 /** The commit sequence number counter */
341 /** The list of worker Nodes, indexed by worker ID. */
342 std::vector<Commit_order_queue::Node> m_workers;
343 /** The queue to hold the sequence of worker IDs waiting to commit. */
345 /** The lock to acquire exclusivity over changes on the queue. */
347};
348} // namespace apply
349} // namespace cs
350#endif // CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
Context of the owner of metadata locks.
Definition: mdl.h:1411
unsigned long long index_type
Definition: integrals_lockfree_queue.h:143
enum_queue_state
Definition: integrals_lockfree_queue.h:152
static constexpr value_type null_value
Definition: integrals_lockfree_queue.h:147
Iterator helper class to iterate over the Commit_order_queue following the underlying commit order.
Definition: commit_order_queue.h:170
Commit_order_queue * m_target
The target queue that holds the list to be iterated.
Definition: commit_order_queue.h:209
Iterator & operator=(const Iterator &rhs)
reference operator*()
Definition: commit_order_queue.cc:88
Commit_order_queue::queue_type::index_type index_type
Definition: commit_order_queue.h:177
Iterator & operator++()
Definition: commit_order_queue.cc:82
bool operator!=(Iterator const &rhs) const
Definition: commit_order_queue.cc:113
pointer operator->()
Definition: commit_order_queue.cc:102
Commit_order_queue::queue_type::Iterator m_current
The iterator pointing to the underlying queue position.
Definition: commit_order_queue.h:211
std::ptrdiff_t difference_type
Definition: commit_order_queue.h:172
std::forward_iterator_tag iterator_category
Definition: commit_order_queue.h:176
Iterator(Commit_order_queue &parent, index_type position)
Definition: commit_order_queue.cc:57
bool operator==(Iterator const &rhs) const
Definition: commit_order_queue.cc:108
Queue element, holding the needed information to manage the commit ordering.
Definition: commit_order_queue.h:76
memory::Aligned_atomic< Commit_order_queue::sequence_type > m_commit_sequence_nr
The sequence number for the commit request this node's worker is processing.
Definition: commit_order_queue.h:140
value_type m_worker_id
The identifier of the worker that maps to a queue index.
Definition: commit_order_queue.h:81
Commit_order_queue::sequence_type reset_commit_sequence_nr()
Sets the commit request sequence number for this node as unassigned.
Definition: commit_order_queue.cc:43
memory::Aligned_atomic< Commit_order_queue::enum_worker_stage > m_stage
Which stage is the worker on.
Definition: commit_order_queue.h:85
bool freeze_commit_sequence_nr(Commit_order_queue::sequence_type expected)
Marks the commit request sequence number this node's worker is processing as frozen iff the sequence ...
Definition: commit_order_queue.cc:29
MDL_context * m_mdl_context
The MDL context to be used to wait on the MDL graph.
Definition: commit_order_queue.h:83
static constexpr Commit_order_queue::sequence_type NO_SEQUENCE_NR
Definition: commit_order_queue.h:133
static constexpr Commit_order_queue::sequence_type SEQUENCE_NR_FROZEN
Definition: commit_order_queue.h:135
bool unfreeze_commit_sequence_nr(Commit_order_queue::sequence_type previously_frozen)
Removes the frozen mark from the commit request sequence number this node's worker is processing if i...
Definition: commit_order_queue.cc:35
Queue to maintain the ordered sequence of workers waiting for commit.
Definition: commit_order_queue.h:54
std::vector< Commit_order_queue::Node > m_workers
The list of worker Nodes, indexed by worker ID.
Definition: commit_order_queue.h:342
virtual ~Commit_order_queue()=default
Default destructor for the class.
void unfreeze()
Releases exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:189
Iterator begin()
Retrieves an iterator instance that points to the head of the commit queue and that will iterate over...
Definition: commit_order_queue.cc:193
enum_worker_stage
Enumeration to represent each worker state.
Definition: commit_order_queue.h:65
unsigned long long sequence_type
Definition: commit_order_queue.h:58
std::string to_string()
Retrieves the textual representation of this object's underlying commit queue.
Definition: commit_order_queue.cc:203
void freeze()
Acquires exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:185
Iterator end()
Retrieves an iterator instance that points to the tail of the commit queue.
Definition: commit_order_queue.cc:198
static constexpr value_type NO_WORKER
Definition: commit_order_queue.h:59
Node & operator[](value_type id)
Retrieve the commit order information Node for worker identified by id.
Definition: commit_order_queue.cc:129
void push(value_type id)
Adds to the end of the commit queue the worker identifier passed as parameter.
Definition: commit_order_queue.cc:162
value_type front()
Retrieves the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:179
memory::Aligned_atomic< sequence_type > m_commit_sequence_generator
The commit sequence number counter.
Definition: commit_order_queue.h:339
lock::Shared_spin_lock m_push_pop_lock
The lock to acquire exclusivity over changes on the queue.
Definition: commit_order_queue.h:346
friend std::ostream & operator<<(std::ostream &out, Commit_order_queue &to_output)
Friend operator for writing to an std::ostream object.
Definition: commit_order_queue.h:321
queue_type m_commit_queue
The queue to hold the sequence of worker IDs waiting to commit.
Definition: commit_order_queue.h:344
void clear()
Removes all remaining workers from the queue.
Definition: commit_order_queue.cc:183
bool is_empty()
Whether or not there are more workers to commit.
Definition: commit_order_queue.cc:141
static sequence_type get_next_sequence_nr(sequence_type current_seq_nr)
Returns the expected next number in the ticket sequence.
Definition: commit_order_queue.cc:208
unsigned long value_type
Definition: commit_order_queue.h:56
Commit_order_queue::queue_type::enum_queue_state get_state()
Retrieves the error state for the current thread last executed queue operation.
Definition: commit_order_queue.cc:137
Commit_order_queue(size_t n_workers)
Constructor for the class, takes the number of workers and initializes the underlying static list wit...
Definition: commit_order_queue.cc:118
std::tuple< value_type, sequence_type > pop()
Removes from the queue and returns the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:147
Definition: shared_spin_lock.h:79
Templated class that encapsulates an std::atomic within a byte buffer that is padded to the processor...
Definition: aligned_atomic.h:157
Definition: commit_order_queue.h:33
static mysql_service_status_t flush(reference_caching_cache cache) noexcept
Definition: component.cc:113