MySQL  8.0.27
Source Code Documentation
commit_order_queue.h
Go to the documentation of this file.
1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
24 #define CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
25 #include <cstdint>
26 #include <vector>
27 
28 #include "sql/containers/integrals_lockfree_queue.h" // container::Integrals_lockfree_queue
29 #include "sql/locks/shared_spin_lock.h" // lock::Shared_spin_lock
30 #include "sql/mdl.h" // MDL_context
31 #include "sql/memory/aligned_atomic.h" // memory::Aligned_atomic
32 
33 namespace cs {
34 namespace apply {
35 /**
36  Queue to maintan the ordered sequence of workers waiting for commit.
37 
38  The queue has a static list of elements, each one representing each worker
39  commit information.
40 
41  The management of the order by which each worker will commit is implemented
42  using:
43 
44  - A member variable pointing to the first worker to commit, the `head`.
45  - A member variable pointing to the last worker to commit, the `tail`.
46  - Each queue element holds a member variable that points to the next worker to
47  commit, the `next`.
48  - Pushing a new element will move the `tail`.
49  - Poping an element will move the `head`.
50 
51  Atomics are used to make the queue thread-safe without the need for an
52  explicit lock.
53  */
55  public:
56  using value_type = unsigned long;
58  using sequence_type = unsigned long long;
59  static constexpr value_type NO_WORKER{
60  queue_type::null_value}; // No worker on the queue
61 
62  /**
63  Enumeration to represent each worker state
64  */
65  enum class enum_worker_stage {
66  REGISTERED, // Transaction was handed-over to worker, for applying
67  FINISHED_APPLYING, // Transaction execution finished
68  REQUESTED_GRANT, // Request for turn to commit has been placed
69  WAITED, // Waited for the turn to commit
70  FINISHED // Committed and finished processing the transaction
71  };
72 
73  /**
74  Queue element, holding the needed information to manage the commit ordering.
75  */
76  class Node {
77  public:
78  friend class Commit_order_queue;
79 
80  /** The identifier of the worker that maps to a queue index. */
82  /** The MDL context to be used to wait on the MDL graph. */
84  /** Which stage is the worker on. */
87 
88  /**
89  Marks the commit request sequence number this node's worker is
90  processing as frozen iff the sequence number current value is equal
91  to the `expected` parameter.
92 
93  Commit request sequence numbers are monotonically ever increasing
94  numbers that are used by worker threads to ensure ownership of the
95  worker commit turn unblocking operation:
96  1) A worker holding a sequence number `N` can only unblock worker
97  with sequence number `N + 1`.
98  2) A worker with sequence number `N + 1` can't be assigned a new
99  sequence number if worker with sequence number `N` is executing
100  the unblocking operation.
101 
102  @param expected the commit request sequence number this node must
103  currently hold in order for it to be frozen
104 
105  @return true if this nodes commit request sequence number has been
106  frozen, false otherwise.
107  */
109  /**
110  Removes the frozen mark from the commit request sequence number this
111  node's worker is processing iff it was previosly frozen.
112 
113  Commit request sequence numbers are monotonically ever increasing
114  numbers that are used by worker threads to ensure ownership of the
115  worker commit turn unblocking operation:
116  1) A worker holding a sequence number `N` can only unblock worker
117  with sequence number `N + 1`.
118  2) A worker with sequence number `N + 1` can't be assigned a new
119  sequence number if worker with sequence number `N` is executing
120  the unblocking operation.
121 
122  @param previously_frozen the sequence number value that was provided
123  while previously freezing.
124 
125  @return true if this nodes commit request sequence number was frozen
126  and is now unfrozen, false otherwise.
127  */
129  Commit_order_queue::sequence_type previously_frozen);
130 
131  private:
132  // No commit request sequence number assiged
134  // Commit request sequence number is marked as frozen
136 
137  /** The sequence number for the commit request this node's worker is
138  processing. */
141 
142  /**
143  Sets the commit request sequence number for this node as unassigned. If
144  the sequence number is currently frozen, invoking this method will make
145  the invoking thread to spin until the sequence number is unfrozen.
146 
147  Commit request sequence numbers are monotonically ever increasing
148  numbers that are used by worker threads to ensure ownership of the
149  worker commit turn unblocking operation:
150  1) A worker holding a sequence number `N` can only unblock worker
151  with sequence number `N + 1`.
152  2) A worker with sequence number `N + 1` can't be assigned a new
153  sequence number if worker with sequence number `N` is executing
154  the unblocking operation.
155 
156  @return the sequence number for the commit request sequence number
157  this node's worker is been cleared of
158  */
160  };
161 
162  /**
163  Iterator helper class to iterate over the Commit_order_queue following the
164  underlying commit order.
165 
166 
167  Check C++ documentation on `Iterator named requirements` for more
168  information on the implementation.
169  */
170  class Iterator {
171  public:
172  using difference_type = std::ptrdiff_t;
176  using iterator_category = std::forward_iterator_tag;
178 
179  explicit Iterator(Commit_order_queue &parent, index_type position);
180  Iterator(const Iterator &rhs);
181  Iterator(Iterator &&rhs);
182  virtual ~Iterator() = default;
183 
184  // BASIC ITERATOR METHODS //
186  Iterator &operator=(Iterator &&rhs);
187  Iterator &operator++();
189  // END / BASIC ITERATOR METHODS //
190 
191  // INPUT ITERATOR METHODS //
192  Iterator operator++(int);
194  bool operator==(Iterator const &rhs) const;
195  bool operator!=(Iterator const &rhs) const;
196  // END / INPUT ITERATOR METHODS //
197 
198  // OUTPUT ITERATOR METHODS //
199  // reference operator*(); <- already defined
200  // iterator operator++(int); <- already defined
201  // END / OUTPUT ITERATOR METHODS //
202 
203  // FORWARD ITERATOR METHODS //
204  // Enable support for both input and output iterator <- already enabled
205  // END / FORWARD ITERATOR METHODS //
206 
207  private:
208  /** The target queue that holds the list to be iterated. */
210  /** The iterator pointing to the underlying queue position. */
211  Commit_order_queue::queue_type::Iterator m_current;
212  };
213 
214  /**
215  Constructor for the class, takes the number of workers and initializes the
216  underlying static list with such size.
217 
218  @param n_workers The number of workers to include in the commit order
219  managerment.
220  */
221  Commit_order_queue(size_t n_workers);
222  /**
223  Default destructor for the class.
224  */
225  virtual ~Commit_order_queue() = default;
226  /**
227  Retrieve the commit order information Node for worker identified by `id`.
228 
229  @param id The identifier of the worker
230 
231  @return A reference to the commit order information Node for the given
232  worker.
233  */
235  /**
236  Retrieves the error state for the current thread last executed queue
237  operation. Values may be:
238 
239  - SUCCESS is the operation succeeded.
240  - NO_MORE_ELEMENTS if the last pop tried to access an empty queue.
241  - NO_SPACE_AVAILABLE if the last push tried to push while the queue was
242  full.
243 
244 
245  @return The error state for the thread's last operation on the queue.
246  */
248  /**
249  Whether or not there are more workers to commmit.
250 
251  @return True if there are no more workers, false otherwise.
252  */
253  bool is_empty();
254  /**
255  Removes from the queue and returns the identifier of the worker that is
256  first in-line to commit.
257 
258  If another thread is accessing the commit order sequence number and has
259  frozen it's state, this operation will spin until the state is
260  unfrozen.
261 
262  @return A tuple holding the identifier of the worker that is first
263  in-line to commit and the associated commit order sequence
264  number.
265  */
266  std::tuple<value_type, sequence_type> pop();
267  /**
268  Adds to the end of the commit queue the worker identifier passed as
269  parameter.
270 
271  @param id The identifier of the worker to add to the commit queue.
272  */
273  void push(value_type id);
274  /**
275  Retrieves the identifier of the worker that is first in-line to commit.
276 
277  @return The identifier of the worker that is first in-line to commit.
278  */
279  value_type front();
280  /**
281  Removes all remaining workers from the queue.
282  */
283  void clear();
284  /**
285  Acquires exclusivity over changes (push, pop) on the queue.
286  */
287  void freeze();
288  /**
289  Releases exclusivity over changes (push, pop) on the queue.
290  */
291  void unfreeze();
292  /**
293  Retrieves an iterator instance that points to the head of the commit queue
294  and that will iterate over the worker Nodes that are in-line to commit,
295  following the requested commit order.
296 
297  @return An instance of `Iterator` pointing to the queue's head, that
298  iterates over the workers that are in-line to commit and following
299  the requested commit order.
300  */
301  Iterator begin();
302  /**
303  Retrieves an iterator instance that points to the tail of the commit queue.
304 
305  @return An instance of `Iterator` that points to the tail of the
306  queue.
307  */
308  Iterator end();
309  /**
310  Retrieves the textual representation of this object's underlying commit
311  queue.
312 
313  @return The textual representation of this object's underlying commit queue.
314  */
315  std::string to_string();
316  /**
317  Friend operator for writing to an `std::ostream` object.
318 
319  @see `std::ostream::operator<<`
320  */
321  friend inline std::ostream &operator<<(std::ostream &out,
322  Commit_order_queue &to_output) {
323  out << to_output.to_string() << std::flush;
324  return out;
325  }
326 
327  private:
328  /** The commit sequence number counter */
331  /** The list of worker Nodes, indexed by worker ID. */
332  std::vector<Commit_order_queue::Node> m_workers;
333  /** The queue to hold the sequence of worker IDs waiting to commit. */
335  /** The lock to acquire exlusivity over changes on the queue. */
337 };
338 } // namespace apply
339 } // namespace cs
340 #endif // CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
Context of the owner of metadata locks.
Definition: mdl.h:1407
unsigned long long index_type
Definition: integrals_lockfree_queue.h:143
enum_queue_state
Definition: integrals_lockfree_queue.h:152
static constexpr value_type null_value
Definition: integrals_lockfree_queue.h:147
Iterator helper class to iterate over the Commit_order_queue following the underlying commit order.
Definition: commit_order_queue.h:170
Commit_order_queue * m_target
The target queue that holds the list to be iterated.
Definition: commit_order_queue.h:209
Iterator & operator=(const Iterator &rhs)
reference operator*()
Definition: commit_order_queue.cc:88
Commit_order_queue::queue_type::index_type index_type
Definition: commit_order_queue.h:177
Iterator & operator++()
Definition: commit_order_queue.cc:82
bool operator!=(Iterator const &rhs) const
Definition: commit_order_queue.cc:113
pointer operator->()
Definition: commit_order_queue.cc:102
Commit_order_queue::queue_type::Iterator m_current
The iterator pointing to the underlying queue position.
Definition: commit_order_queue.h:211
std::ptrdiff_t difference_type
Definition: commit_order_queue.h:172
std::forward_iterator_tag iterator_category
Definition: commit_order_queue.h:176
Iterator(Commit_order_queue &parent, index_type position)
Definition: commit_order_queue.cc:57
bool operator==(Iterator const &rhs) const
Definition: commit_order_queue.cc:108
Queue element, holding the needed information to manage the commit ordering.
Definition: commit_order_queue.h:76
memory::Aligned_atomic< Commit_order_queue::sequence_type > m_commit_sequence_nr
The sequence number for the commit request this node's worker is processing.
Definition: commit_order_queue.h:140
value_type m_worker_id
The identifier of the worker that maps to a queue index.
Definition: commit_order_queue.h:81
Commit_order_queue::sequence_type reset_commit_sequence_nr()
Sets the commit request sequence number for this node as unassigned.
Definition: commit_order_queue.cc:43
memory::Aligned_atomic< Commit_order_queue::enum_worker_stage > m_stage
Which stage is the worker on.
Definition: commit_order_queue.h:85
bool freeze_commit_sequence_nr(Commit_order_queue::sequence_type expected)
Marks the commit request sequence number this node's worker is processing as frozen iff the sequence ...
Definition: commit_order_queue.cc:29
MDL_context * m_mdl_context
The MDL context to be used to wait on the MDL graph.
Definition: commit_order_queue.h:83
static constexpr Commit_order_queue::sequence_type NO_SEQUENCE_NR
Definition: commit_order_queue.h:133
static constexpr Commit_order_queue::sequence_type SEQUENCE_NR_FROZEN
Definition: commit_order_queue.h:135
bool unfreeze_commit_sequence_nr(Commit_order_queue::sequence_type previously_frozen)
Removes the frozen mark from the commit request sequence number this node's worker is processing iff ...
Definition: commit_order_queue.cc:35
Queue to maintan the ordered sequence of workers waiting for commit.
Definition: commit_order_queue.h:54
friend std::ostream & operator<<(std::ostream &out, Commit_order_queue &to_output)
Friend operator for writing to an std::ostream object.
Definition: commit_order_queue.h:321
std::vector< Commit_order_queue::Node > m_workers
The list of worker Nodes, indexed by worker ID.
Definition: commit_order_queue.h:332
virtual ~Commit_order_queue()=default
Default destructor for the class.
void unfreeze()
Releases exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:182
Iterator begin()
Retrieves an iterator instance that points to the head of the commit queue and that will iterate over...
Definition: commit_order_queue.cc:186
enum_worker_stage
Enumeration to represent each worker state.
Definition: commit_order_queue.h:65
unsigned long long sequence_type
Definition: commit_order_queue.h:58
std::string to_string()
Retrieves the textual representation of this object's underlying commit queue.
Definition: commit_order_queue.cc:196
void freeze()
Acquires exclusivity over changes (push, pop) on the queue.
Definition: commit_order_queue.cc:178
Iterator end()
Retrieves an iterator instance that points to the tail of the commit queue.
Definition: commit_order_queue.cc:191
static constexpr value_type NO_WORKER
Definition: commit_order_queue.h:59
Node & operator[](value_type id)
Retrieve the commit order information Node for worker identified by id.
Definition: commit_order_queue.cc:125
void push(value_type id)
Adds to the end of the commit queue the worker identifier passed as parameter.
Definition: commit_order_queue.cc:158
value_type front()
Retrieves the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:172
memory::Aligned_atomic< sequence_type > m_commit_sequence_generator
The commit sequence number counter.
Definition: commit_order_queue.h:329
lock::Shared_spin_lock m_push_pop_lock
The lock to acquire exlusivity over changes on the queue.
Definition: commit_order_queue.h:336
queue_type m_commit_queue
The queue to hold the sequence of worker IDs waiting to commit.
Definition: commit_order_queue.h:334
void clear()
Removes all remaining workers from the queue.
Definition: commit_order_queue.cc:176
bool is_empty()
Whether or not there are more workers to commmit.
Definition: commit_order_queue.cc:137
unsigned long value_type
Definition: commit_order_queue.h:56
Commit_order_queue::queue_type::enum_queue_state get_state()
Retrieves the error state for the current thread last executed queue operation.
Definition: commit_order_queue.cc:133
Commit_order_queue(size_t n_workers)
Constructor for the class, takes the number of workers and initializes the underlying static list wit...
Definition: commit_order_queue.cc:118
std::tuple< value_type, sequence_type > pop()
Removes from the queue and returns the identifier of the worker that is first in-line to commit.
Definition: commit_order_queue.cc:143
Definition: shared_spin_lock.h:79
Definition: commit_order_queue.h:33
static mysql_service_status_t flush(reference_caching_cache cache) noexcept
Definition: component.cc:121