MySQL 9.1.0
Source Code Documentation
integrals_lockfree_queue.h
Go to the documentation of this file.
1/* Copyright (c) 2020, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
25#define CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
26
27#include <algorithm>
28#include <atomic>
29#include <cmath>
30#include <map>
31#include <memory>
32#include <sstream>
33#include <thread>
34#include <tuple>
35
39
40namespace container {
41
42/**
43 Lock-free, fixed-size bounded, multiple-producer (MP), multiple-consumer
44 (MC), circular FIFO queue for integral elements.
45
46 Monotonically ever increasing virtual indexes are used to keep track of
47 the head and tail pointer for the size-bounded circular queue. Virtual
48 indexes are translated into memory indexes by calculating the remainder
49 of the integer division of the virtual index by the queue capacity. The
50 maximum value of a virtual index is 2^63 - 1.
51
52 Head is the pointer to the virtual index of the first position that holds
53 an element to be popped, if any.
54
55 Tail is the pointer to the virtual index of the first available position
56 to push an element to, if any.
57
58 Template parameters are as follows:
59 - `T`: The integral type for the queue elements.
60 - `Null`: Value of type `T`, that will be used to mark a queue position
61 as empty.
62 - `Erased`: Value of type `T`, that will be used to mark an queue
63 position as erased.
64 - `I`: Type of indexing to be used by the underlying array in the form of
65 a class. Available classes are `container::Padded_indexing` and
66 `container::Interleaved_indexing`, check the classes documentation
67 for further details. The parameter defaults to
68 `container::Padded_indexing`.
69 - `A`: Type of memory allocator to be used, in the form of a class
70 (defaults to no allocator).
71
72 All the available operations are thread-safe, in the strict sense of no
73 memory problems rise from multiple threads trying to perform operations
74 concurrently.
75
76 However, being a lock-free structure, the queue may be changing at the
77 same time as operations access both pointers and values or a client of
78 the API evaluates the result of the invoked operation. The operation
79 results and returning states are always based on the thread local view of
80 the queue state, which may be safe or unsafe to proceed with the given
81 operation. Therefore, extra validations, client-side serialization and/or
82 retry mechanisms may be needed while using the queue operations.
83
84 Available methods are:
85 - `pop`: if the head of the queue points to a virtual index different
86 from the one pointed by the tail of the queue, removes the
87 element pointed to by the head of the queue, points the head to
88 the next virtual index and returns the retrieved value. If head
89 and tail of the queue point to the same virtual index, the queue
90 is empty, `Null` is returned and the thread operation state is
91 set to `NO_MORE_ELEMENTS`.
92 - `push`: if the tail of the queue points to a virtual index different
93 from the one pointed by head-plus-queue-capacity, sets the
94 position pointed by tail with the provided element and points
95 the tail to the next virtual index. If tail and head plus queue
96 capacity point to the same virtual index, the queue is full, no
97 operation is performed and the thread operation state is set to
98 `NO_SPACE_AVAILABLE`.
99 - `capacity`: maximum number of elements allowed to coexist in the queue.
100 - `head`: pointer to the virtual index of the first available element, if
101 any.
102 - `tail`: pointer to the virtual index of the first available position to
103 add an element to. Additionally, the value returned by `tail()`
104 can also give an lower-than approximation of the total amount
105 of elements already pushed into the queue -in between reading
106 the virtual index tail is pointing to and evaluating it, some
107 more elements may have been pushed.
108 - `front`: the first available element. If none, `Null` is returned.
109 - `back`: the last available element. If none, `Null` is returned.
110 - `clear`: sets all positions of the underlying array to `Null` and
111 resets the virtual index pointed to by both the head and the
112 tail of the queue to `0`.
113 - `is_empty`: whether or not the head and tail of the queue point to the
114 same virtual index.
115 - `is_full`: whether or not the tail and head-plus-queue-capacity point
116 to the same virtual index.
117 - `erase_if`: traverses the queue, starting at the queue relative memory
118 index 0, stopping at the relative memory index
119 `capacity() - 1` and invoking the passed-on predicate over each
120 position value, while disregarding positions that have
121 `Null` or `Erased` values.
122
123 Note that, if `Null` and `Erased` hold the same value, the resulting class
124 will not include the `erase_if` method.
125
126 */
127template <typename T, T Null = std::numeric_limits<T>::max(), T Erased = Null,
129 typename A = std::nullptr_t>
131 static_assert(
132 std::is_integral<T>::value,
133 "class `Integrals_lockfree_queue` requires an integral type as a first "
134 "template parameter");
135
136 public:
137 using pointer_type = T *;
138 using const_pointer_type = T const *;
139 using reference_type = T &;
140 using const_reference_type = T const &;
141 using value_type = T;
142 using const_value_type = T const;
143 using element_type = std::atomic<T>;
144 using index_type = unsigned long long;
147
148 static constexpr T null_value = Null;
149 static constexpr T erased_value = Erased;
150 static constexpr index_type set_bit = 1ULL << 63;
151 static constexpr index_type clear_bit = set_bit - 1;
152
153 enum class enum_queue_state : short {
154 SUCCESS = 0, // Last operation was successful
155 NO_MORE_ELEMENTS = -1, // Last operation was unsuccessful because there
156 // are no elements in the queue
157 NO_SPACE_AVAILABLE = -2 // Last operation was unsuccessful because there
158 // is no space available
159 };
160
161 /**
162 Iterator helper class to iterate over the queue staring at the virtual
163 index pointed to by the head, up until the virtual index pointed to by
164 the tail.
165
166 Being an iterator over a lock-free structure, it will not be
167 invalidated upon queue changes since operations are thread-safe and no
168 invalid memory access should stem from iterating over and changing the
169 queue, simultaneously.
170
171 However, the following possible iteration scenarios, not common in
172 non-thread-safe structures, should be taken into consideration and
173 analysed while using standard library features that use the `Iterator`
174 requirement, like `for_each` loops, `std::find`, `std::find_if`, etc:
175
176 a) The iteration never stops because there is always an element being
177 pushed before the queue `end()` method is invoked.
178
179 b) The iterator points to `Null` values at the beginning of the
180 iteration because elements were popped just after the queue `begin()`
181 method is invoked.
182
183 c) The iterator points to `Null` or `Erased` in between pointing to
184 values different from `Null` or `Erased`.
185
186 d) The iterator may point to values that do not correspond to the
187 virtual index being held by the iterator, in the case of the amount
188 of both pop and push operations between two iteration loops was
189 higher then the queue `capacity()`.
190
191 If one of the above scenarios is harmful to your use-case, an
192 additional serialization mechanism may be needed to iterate over the
193 queue. Or another type of structure may be more adequate.
194
195 Check C++ documentation for the definition of `Iterator` named
196 requirement for more information.
197 */
198 class Iterator {
199 public:
200 using difference_type = std::ptrdiff_t;
201 using value_type = T;
202 using pointer = T *;
203 using reference = T;
204 using iterator_category = std::forward_iterator_tag;
205
206 explicit Iterator(
208 index_type position);
209 Iterator(const Iterator &rhs);
210 Iterator(Iterator &&rhs);
211 virtual ~Iterator() = default;
212
213 // BASIC ITERATOR METHODS //
214 Iterator &operator=(const Iterator &rhs);
215 Iterator &operator=(Iterator &&rhs);
216 Iterator &operator++();
217 reference operator*() const;
218 // END / BASIC ITERATOR METHODS //
219
220 // INPUT ITERATOR METHODS //
221 Iterator operator++(int);
222 pointer operator->() const;
223 bool operator==(Iterator const &rhs) const;
224 bool operator!=(Iterator const &rhs) const;
225 // END / INPUT ITERATOR METHODS //
226
227 // OUTPUT ITERATOR METHODS //
228 // reference operator*(); <- already defined
229 // iterator operator++(int); <- already defined
230 // END / OUTPUT ITERATOR METHODS //
231
232 // FORWARD ITERATOR METHODS //
233 // Enable support for both input and output iterator <- already enabled
234 // END / FORWARD ITERATOR METHODS //
235
236 /**
237 Sets the value of the element the iterator is pointing to the given
238 parameter.
239
240 @param new_value The new value to set the element to.
241 */
242 void set(value_type new_value);
243
244 private:
245 /** The position of the element this iterator is pointing to. */
246 index_type m_current{std::numeric_limits<index_type>::max()};
247 /** The reference to the queue holding the elements. */
249 };
250
251 /**
252 Constructor allowing a specific memory allocator and a specific queue
253 capacity.
254
255 The queue allocated memory may differ from `capacity() * sizeof(T)`
256 since additional space may be required to prevent false sharing between
257 threads.
258
259 @param alloc The memory allocator instance
260 @param size The queue maximum capacity
261 */
262 template <
263 typename D = T, T M = Null, T F = Erased, typename J = I, typename B = A,
264 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> * = nullptr>
266 /**
267 Constructor allowing specific queue capacity.
268
269 The queue allocated memory may differ from `capacity() * sizeof(T)`
270 since additional space may be required to prevent false sharing between
271 threads.
272
273 @param size The queue maximum capacity
274 */
275 template <
276 typename D = T, T M = Null, T F = Erased, typename J = I, typename B = A,
277 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> * = nullptr>
279 // Deleted copy and move constructors.
284 //
285 /**
286 Destructor for the class.
287 */
288 virtual ~Integrals_lockfree_queue() = default;
289 // Deleted copy and move operators.
294 //
295
296 /**
297 Returns the underlying instance of `memory::Atomics_array` which holds
298 the allocated memory for the array of `std::atomic<T>` elements.
299
300 @return the underlying instance of `memory::Atomics_array`
301 */
303 /**
304 Sets all queue positions to `Null` and points the head and tail of the
305 queue to the `0` virtual index.
306 */
307 void clear();
308 /**
309 Retrieves whether or not the head and tail of the queue are pointing to
310 the same virtual index.
311
312 No evaluation of the value held in the given position is made. If, for
313 instance, head and tail point to consecutive virtual indexes and the
314 value stored in the position pointed to by head is `Erased`, `is_empty`
315 will return false and `pop` will return `Null` and trigger a
316 `NO_MORE_ELEMENTS` state change.
317
318 @return true if there aren't more elements to be popped, false otherwise
319 */
320 bool is_empty() const;
321 /**
322 Retrieves whether or not the tail of the queue is pointing to the same
323 virtual index as the computed by adding the queue `capacity()` to the
324 virtual index pointed to by the head of the queue.
325
326 No evaluation of the value held in the given position is made. If, for
327 instance, all the values stored in the positions between head and tail
328 are `Erased`, `is_full` will return true and `pop` will return `Null`
329 and trigger a `NO_MORE_ELEMENTS` state change.
330
331 @return true if there isn't space for more elements to be pushed, false
332 otherwise
333 */
334 bool is_full() const;
335 /**
336 Retrieves the virtual index that the head of the queue is pointing to.
337
338 @return the virtual index the head of the queue is pointing to
339 */
341 /**
342 Retrieves the virtual index that the tail of the queue is pointing to.
343
344 @return the virtual index the tail of the queue is pointing to
345 */
347 /**
348 Retrieves the element at the front of the queue, i.e. the value stored
349 in the virtual index pointed to by the head of the queue.
350
351 The returned value may be `Null`, `Erased` or whatever value that is
352 held by the given virtual index position at the moment it's accessed.
353
354 As this method is an alias for `array()[head()]`, the queue may be
355 changed concurrently and, because it is a circular queue, it is
356 possible for this method to return a value that has not been popped yet
357 and it will not be popped in the next call for `pop()` (the circular
358 queue logic made the tail to wrap and overlap the thread local value of
359 head).
360
361 @return the element at the front of the queue
362 */
364 /**
365 Retrieves the value of the position at the back of the queue, i.e. the
366 value stored in the virtual index just prior to the virtual index
367 pointed to by the tail of the queue.
368
369 The returned value may be `Null` or `Erased`, whatever value that is
370 held by the given virtual index position.
371
372 As this method is an alias for `array()[tail()]`, the queue may be
373 changed concurrently and it is possible for this method to return a
374 value assigned to a position outside the bounds of the head and tail of
375 the queue (between thread-local fetch of the tail pointer and the
376 access to the position indexed by the local value, operations moved the
377 head to a virtual index higher than the locally stored). This means
378 that `Null` may be returned or that a value that is currently being
379 popped may be returned.
380
381 @return the element at the back of the queue
382 */
384 /**
385 Retrieves the value at the virtual index pointed by the head of the
386 queue, clears that position, updates the virtual index stored in the
387 head and clears the value returned by `get_state()`, setting it to
388 `SUCCESS`.
389
390 If the head of the queue points to a virtual index that has no element
391 assigned (queue is empty), the operation fails, `Null` is stored in the
392 `out` parameter and the value returned by `get_state()` is
393 `NO_MORE_ELEMENTS`.
394
395 @param out The variable reference to store the value in.
396
397 @return The reference to `this` object, for chaining purposes.
398 */
400 reference_type out);
401 /**
402 Takes the value passed on as a parameter, stores it in the virtual
403 index pointed to by the tail of the queue, updates the virtual index
404 stored in the tail and clears the value returned by `get_state()`,
405 setting it to `SUCCESS`.
406
407 If the tail of the queue points to a virtual index that has already an
408 element assigned (queue is full), the operation fails and the value
409 returned by `get_state()` is `NO_SPACE AVAILABLE`.
410
411 @param to_push The value to push into the queue.
412
413 @return The reference to `this` object, for chaining purposes.
414 */
416 const_reference_type to_push);
417 /**
418 Retrieves the value at the virtual index pointed by the head of the
419 queue, clears that position, updates the virtual index stored in the
420 head and clears the value returned by `get_state()`, setting it to
421 `SUCCESS`.
422
423 If the head of the queue points to a virtual index that has no element
424 assigned yet (queue is empty), the operation returns `Null` and the
425 value returned by `get_state()` is `NO_MORE_ELEMENTS`.
426
427 @return The value retrieved from the queue or `Null` if no element is
428 available for popping
429 */
431 /**
432 Takes the value passed on as a parameter, stores it in the virtual
433 index pointed to by the tail of the queue, updates the virtual index
434 stored in the tail and clears the value returned by `get_state()`,
435 setting it to `SUCCESS`.
436
437 If the tail of the queue points to a virtual index that has already an
438 element assigned (queue is full), the operation fails and the value
439 returned by `get_state()` is `NO_SPACE AVAILABLE`.
440
441 @param to_push The value to push into the queue.
442
443 @return The reference to `this` object, for chaining purposes.
444 */
446 /**
447 Retrieves an iterator instance that points to the same position pointed
448 by the head of the queue.
449
450 Please, be aware that, while using standard library features that use
451 the `Iterator` requirement, like `for_each` loops, `std::find`,
452 `std::find_if`, etc:
453
454 - The iterator may point to `Null` values at the beginning of the
455 iteration because elements were popped just after the queue `begin()`
456 method is invoked, the moment when the iterator pointing to the same
457 virtual index as the head of the queue is computed.
458
459 @return An instance of an iterator instance that points to virtual
460 index pointed by the head of the queue
461 */
462 Iterator begin() const;
463 /**
464 Retrieves an iterator instance that points to the same position pointed
465 by the tail of the queue.
466
467 Please, be aware that, while using standard library features that use
468 the `Iterator` requirement, like `for_each` loops, `std::find`,
469 `std::find_if`, etc:
470
471 - The iteration may never stop because there is always an element being
472 pushed before the queue `end()` method is invoked, the moment when
473 the iterator pointing to the same virtual index has the tail of the
474 queue is computed.
475
476 @return An instance of an iterator instance that points to virtual
477 index pointed by the tail of the queue
478 */
479 Iterator end() const;
480 /**
481 Erases values from the queue. The traversing is linear and not in
482 between the virtual indexes pointed to by the head and the tail of the
483 queue but rather between 0 and `Integrals_lockfree_queue::capacity() -
484 1`.
485
486 An element may be conditionally erased according to the evaluation of
487 the predicate `predicate` which should be any predicate which is
488 translatable to `[](value_type value) -> bool`. If the predicate
489 evaluates to `true`, the value is replace by `Erased`.
490
491 If both `Null` and `Erased` evaluate to the same value, this method
492 will not be available after the template substitutions since erased
493 values must be identifiable by the pop and push operations.
494
495 Check C++ documentation for the definition of `Predicate` named
496 requirement for more information.
497
498 @param predicate The predicate invoked upon a given queue position and
499 if evaluated to `true` will force the removal of such
500 element.
501
502 @return The number of values erased.
503 */
504 template <typename D = T, T M = Null, T F = Erased, typename J = I,
505 typename B = A, typename Pred, std::enable_if_t<M != F> * = nullptr>
506 size_t erase_if(Pred predicate);
507 /**
508 Returns the maximum number of elements allowed to coexist in the queue.
509
510 @return The maximum number of elements allowed to coexist in the queue
511 */
512 size_t capacity() const;
513 /**
514 Returns the amount of bytes needed to store the maximum number of
515 elements allowed to coexist in the queue.
516
517 @return the amount of bytes needed to store the maximum number of
518 elements allowed to coexist in th queue
519 */
520 size_t allocated_size() const;
521 /**
522 Clears the error state of the last performed operations, if any. The
523 operation state is a thread storage duration variable, making it a
524 per-thread state.
525
526 @return The reference to `this` object, for chaining purposes.
527 */
529 /**
530 Retrieves the error/success state of the last performed operation. The
531 operation state is a thread storage duration variable, making it a
532 per-thread state.
533
534 Possible values are:
535 - `SUCCESS` if the operation was successful
536 - `NO_MORE_ELEMENTS` if there are no more elements to pop
537 - `NO_SPACE_AVAILABLE` if there is no more room for pushing elements
538
539 State may be changed by any of the `pop`, `push` operations.
540
541 @return the error/success state of the invoking thread last operation.
542 */
544 /**
545 Return `this` queue textual representation.
546
547 @return the textual representation for `this` queue.
548 */
549 std::string to_string() const;
550
551 friend std::ostream &operator<<(
552 std::ostream &out,
554 out << in.to_string() << std::flush;
555 return out;
556 }
557
558 private:
559 /** The maximum allowed number of element allowed to coexist in the queue */
560 size_t m_capacity{0};
561 /** The array of atomics in which the elements will be stored */
563 /** The virtual index being pointed to by the head of the queue */
564 atomic_type m_head{0};
565 /** The virtual index being pointed to by the tail of the queue */
566 atomic_type m_tail{0};
567
568 /**
569 Translates a virtual monotonically increasing index into an index bounded to
570 the queue capacity.
571 */
572 size_t translate(index_type from) const;
573 /**
574 Retrieves the thread storage duration operation state variable.
575 */
577};
578} // namespace container
579
580#ifndef IN_DOXYGEN // Doxygen doesn't understand this construction.
581template <typename T, T Null, T Erased, typename I, typename A>
583 Integrals_lockfree_queue<T, Null, Erased, I, A> const &parent,
584 Integrals_lockfree_queue<T, Null, Erased, I, A>::index_type position)
585 : m_current{position}, m_parent{&parent} {}
586#endif
587
588template <typename T, T Null, T Erased, typename I, typename A>
590 const Iterator &rhs)
591 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {}
592
593template <typename T, T Null, T Erased, typename I, typename A>
595 Iterator &&rhs)
596 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {
597 rhs.m_current = std::numeric_limits<index_type>::max();
598 rhs.m_parent = nullptr;
599}
600
601template <typename T, T Null, T Erased, typename I, typename A>
604 const Iterator &rhs) {
605 this->m_current = rhs.m_current;
606 this->m_parent = rhs.m_parent;
607 return (*this);
608}
609
610template <typename T, T Null, T Erased, typename I, typename A>
613 Iterator &&rhs) {
614 this->m_current = rhs.m_current;
615 this->m_parent = rhs.m_parent;
616 rhs.m_current = std::numeric_limits<index_type>::max();
617 rhs.m_parent = nullptr;
618 return (*this);
619}
620
621template <typename T, T Null, T Erased, typename I, typename A>
624 A>::Iterator::operator++() {
625 ++this->m_current;
626 return (*this);
627}
628
629template <typename T, T Null, T Erased, typename I, typename A>
630typename container::Integrals_lockfree_queue<T, Null, Erased, I,
633 A>::Iterator::operator*() const {
634 return this->m_parent->m_array[this->m_parent->translate(this->m_current)];
635}
636
637template <typename T, T Null, T Erased, typename I, typename A>
640 A>::Iterator::operator++(int) {
641 auto to_return = (*this);
642 ++(*this);
643 return to_return;
644}
645
646template <typename T, T Null, T Erased, typename I, typename A>
647typename container::Integrals_lockfree_queue<T, Null, Erased, I,
650 A>::Iterator::operator->() const {
651 return &(this->m_parent->m_array[this->m_parent->translate(this->m_current)]);
652}
653
654template <typename T, T Null, T Erased, typename I, typename A>
656 T, Null, Erased, I, A>::Iterator::operator==(Iterator const &rhs) const {
657 return this->m_current == rhs.m_current && this->m_parent == rhs.m_parent;
658}
659
660template <typename T, T Null, T Erased, typename I, typename A>
662 T, Null, Erased, I, A>::Iterator::operator!=(Iterator const &rhs) const {
663 return !((*this) == rhs);
664}
665
666template <typename T, T Null, T Erased, typename I, typename A>
668 value_type new_value) {
669 this->m_parent->m_array[this->m_parent->translate(this->m_current)].store(
670 new_value);
671}
672
673template <typename T, T Null, T Erased, typename I, typename A>
674template <typename D, T M, T F, typename J, typename B,
675 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> *>
678 size_t size)
679 : m_capacity{size},
680 m_array{alloc, size, null_value},
681 m_head{0},
682 m_tail{0} {}
683
684template <typename T, T Null, T Erased, typename I, typename A>
685template <typename D, T M, T F, typename J, typename B,
686 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> *>
689 : m_capacity{size}, m_array{size, null_value}, m_head{0}, m_tail{0} {}
690
691template <typename T, T Null, T Erased, typename I, typename A>
692typename container::Integrals_lockfree_queue<T, Null, Erased, I,
693 A>::array_type &
695 return this->m_array;
696}
697
698template <typename T, T Null, T Erased, typename I, typename A>
700 this->clear_state();
701 for (size_t idx = 0; idx != this->m_array.size(); ++idx)
702 this->m_array[idx].store(Null);
703 this->m_head->store(0);
704 this->m_tail->store(0);
705}
706
707template <typename T, T Null, T Erased, typename I, typename A>
709 const {
710 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
711 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
712 return head == tail;
713}
714
715template <typename T, T Null, T Erased, typename I, typename A>
717 const {
718 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
719 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
720 return tail == head + this->capacity();
721}
722
723template <typename T, T Null, T Erased, typename I, typename A>
726 return this->m_head->load(std::memory_order_seq_cst) & clear_bit;
727}
728
729template <typename T, T Null, T Erased, typename I, typename A>
732 return this->m_tail->load(std::memory_order_seq_cst) & clear_bit;
733}
734
735template <typename T, T Null, T Erased, typename I, typename A>
738 auto head = this->head();
739 auto to_return =
740 this->m_array[this->translate(head)].load(std::memory_order_seq_cst);
741 return to_return == Erased ? Null : to_return;
742}
743
744template <typename T, T Null, T Erased, typename I, typename A>
747 auto tail = this->tail();
748 if (tail == 0) return Null;
749 auto to_return =
750 this->m_array[this->translate(tail - 1)].load(std::memory_order_seq_cst);
751 return to_return == Erased ? Null : to_return;
752}
753
754template <typename T, T Null, T Erased, typename I, typename A>
757 reference_type out) {
758 out = this->pop();
759 return (*this);
760}
761
762template <typename T, T Null, T Erased, typename I, typename A>
765 const_reference_type to_push) {
766 return this->push(to_push);
767}
768
769template <typename T, T Null, T Erased, typename I, typename A>
771 this->clear_state();
772 for (; true;) {
773 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
774 auto tail = this->m_tail->load(std::memory_order_relaxed) & clear_bit;
775
776 if (head == tail) {
777 this->state() = enum_queue_state::NO_MORE_ELEMENTS;
778 break;
779 }
780
781 auto new_head = head + 1;
782 new_head |= set_bit; // Set the occupied bit
783 if (this->m_head->compare_exchange_strong(
784 head, new_head, // Concurrent pop may have reached the CAS first
785 // or the occupied bit hasn't been unset yet
786 std::memory_order_release)) {
787 auto &current = this->m_array[this->translate(head)];
788
789 for (; true;) {
790 value_type value = current.load();
791 if (value != Null && // It may be `Null` if some concurrent push
792 // operation hasn't finished setting the
793 // element value
794 current.compare_exchange_strong(
795 value, Null, // It may have been set to `Erased` concurrently
796 std::memory_order_release)) {
797 new_head &= clear_bit; // Unset the occupied bit, signaling that
798 // finished popping
799 this->m_head->store(new_head, std::memory_order_seq_cst);
800 if (value == Erased) { // If the element was `Erased`, try to
801 // pop again
802 break;
803 }
804 return value;
805 }
806 std::this_thread::yield();
807 }
808 }
809 std::this_thread::yield();
810 }
811 return Null;
812}
813
814template <typename T, T Null, T Erased, typename I, typename A>
817 value_type to_push) {
818 assert(to_push != Null && to_push != Erased);
819 this->clear_state();
820 for (; true;) {
821 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
822 auto head = this->m_head->load(std::memory_order_relaxed) & clear_bit;
823
824 if (tail == head + this->m_capacity) {
825 this->state() = enum_queue_state::NO_SPACE_AVAILABLE;
826 return (*this);
827 }
828
829 auto new_tail = tail + 1;
830 new_tail |= set_bit; // Set the occupied bit
831 if (this->m_tail->compare_exchange_strong(
832 tail, new_tail, // Concurrent push may have reached the CAS first
833 // or the occupied bit hasn't been unset yet
834 std::memory_order_release)) {
835 auto &current = this->m_array[this->translate(tail)];
836
837 for (; true;) {
838 T null_ref{Null};
839 if (current.compare_exchange_strong(
840 null_ref, to_push, // It may not be `Null` if some concurrent
841 // pop operation hasn't finished setting the
842 // element value to `Null
843 std::memory_order_acquire)) {
844 new_tail &= clear_bit; // Unset the occupied bit, signaling that
845 // finished pushing
846 this->m_tail->store(new_tail, std::memory_order_seq_cst);
847 break;
848 }
849 std::this_thread::yield();
850 }
851 break;
852 }
853 std::this_thread::yield();
854 }
855 return (*this);
856}
857
858template <typename T, T Null, T Erased, typename I, typename A>
861 return Iterator{*this, this->head()};
862}
863
864template <typename T, T Null, T Erased, typename I, typename A>
867 return Iterator{*this, this->tail()};
868}
869
870template <typename T, T Null, T Erased, typename I, typename A>
871template <typename D, T M, T F, typename J, typename B, typename Pred,
872 std::enable_if_t<M != F> *>
874 Pred predicate) {
875 this->clear_state();
876 size_t erased{0};
877 for (size_t idx = 0; idx != this->m_capacity; ++idx) {
878 auto &current = this->m_array[idx];
879 T value = current.load(std::memory_order_acquire);
880 if (value != Null && value != Erased && predicate(value)) {
881 if (current.compare_exchange_strong(value, Erased,
882 std::memory_order_release)) {
883 ++erased;
884 }
885 }
886 }
887 return erased;
888}
889
890template <typename T, T Null, T Erased, typename I, typename A>
892 const {
893 return this->m_capacity;
894}
895
896template <typename T, T Null, T Erased, typename I, typename A>
897size_t container::Integrals_lockfree_queue<T, Null, Erased, I,
898 A>::allocated_size() const {
899 return this->m_array.allocated_size();
900}
901
902template <typename T, T Null, T Erased, typename I, typename A>
905 this->state() = enum_queue_state::SUCCESS;
906 return (*this);
907}
908
909template <typename T, T Null, T Erased, typename I, typename A>
910typename container::Integrals_lockfree_queue<T, Null, Erased, I,
911 A>::enum_queue_state
913 return this->state();
914}
915
916template <typename T, T Null, T Erased, typename I, typename A>
917std::string
920 for (auto value : (*this)) {
921 out << (value == Null
922 ? "Null"
923 : (value == Erased ? "Erased" : std::to_string(value)))
924 << ", ";
925 }
926 out << "EOF" << std::flush;
927 return out.str();
928}
929
930template <typename T, T Null, T Erased, typename I, typename A>
932 index_type from) const {
933 return static_cast<size_t>(from % static_cast<index_type>(this->m_capacity));
934}
935
936template <typename T, T Null, T Erased, typename I, typename A>
937typename container::Integrals_lockfree_queue<T, Null, Erased, I,
938 A>::enum_queue_state &
940 // TODO: garbage collect this if queues start to be used more dynamically
941 static thread_local std::map<
944 state;
945 return state[this];
946}
947
948#endif // CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
Array of std::atomic elements of type T.
Definition: atomics_array.h:84
Iterator helper class to iterate over the queue staring at the virtual index pointed to by the head,...
Definition: integrals_lockfree_queue.h:198
Integrals_lockfree_queue< T, Null, Erased, I, A > const * m_parent
The reference to the queue holding the elements.
Definition: integrals_lockfree_queue.h:248
T * pointer
Definition: integrals_lockfree_queue.h:202
Iterator & operator=(const Iterator &rhs)
Definition: integrals_lockfree_queue.h:603
index_type m_current
The position of the element this iterator is pointing to.
Definition: integrals_lockfree_queue.h:246
void set(value_type new_value)
Sets the value of the element the iterator is pointing to the given parameter.
Definition: integrals_lockfree_queue.h:667
T reference
Definition: integrals_lockfree_queue.h:203
T value_type
Definition: integrals_lockfree_queue.h:201
std::ptrdiff_t difference_type
Definition: integrals_lockfree_queue.h:200
std::forward_iterator_tag iterator_category
Definition: integrals_lockfree_queue.h:204
Iterator(Integrals_lockfree_queue< T, Null, Erased, I, A > const &parent, index_type position)
Lock-free, fixed-size bounded, multiple-producer (MP), multiple-consumer (MC), circular FIFO queue fo...
Definition: integrals_lockfree_queue.h:130
Integrals_lockfree_queue< T, Null, Erased, I, A > & push(value_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:816
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
friend std::ostream & operator<<(std::ostream &out, Integrals_lockfree_queue< T, Null, Erased, I, A > const &in)
Definition: integrals_lockfree_queue.h:551
size_t erase_if(Pred predicate)
Erases values from the queue.
Definition: integrals_lockfree_queue.h:873
value_type pop()
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:770
atomic_type m_tail
The virtual index being pointed to by the tail of the queue.
Definition: integrals_lockfree_queue.h:566
T const * const_pointer_type
Definition: integrals_lockfree_queue.h:138
unsigned long long index_type
Definition: integrals_lockfree_queue.h:144
Iterator begin() const
Retrieves an iterator instance that points to the same position pointed by the head of the queue.
Definition: integrals_lockfree_queue.h:860
size_t translate(index_type from) const
Translates a virtual monotonically increasing index into an index bounded to the queue capacity.
Definition: integrals_lockfree_queue.h:931
Integrals_lockfree_queue< T, Null, Erased, I, A > & clear_state()
Clears the error state of the last performed operations, if any.
Definition: integrals_lockfree_queue.h:904
size_t capacity() const
Returns the maximum number of elements allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:891
enum_queue_state get_state() const
Retrieves the error/success state of the last performed operation.
Definition: integrals_lockfree_queue.h:912
enum_queue_state
Definition: integrals_lockfree_queue.h:153
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
index_type head() const
Retrieves the virtual index that the head of the queue is pointing to.
Definition: integrals_lockfree_queue.h:725
std::string to_string() const
Return this queue textual representation.
Definition: integrals_lockfree_queue.h:918
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
static constexpr T null_value
Definition: integrals_lockfree_queue.h:148
bool is_full() const
Retrieves whether or not the tail of the queue is pointing to the same virtual index as the computed ...
Definition: integrals_lockfree_queue.h:716
value_type front() const
Retrieves the element at the front of the queue, i.e.
Definition: integrals_lockfree_queue.h:737
T * pointer_type
Definition: integrals_lockfree_queue.h:137
Integrals_lockfree_queue(size_t size)
Constructor allowing specific queue capacity.
Definition: integrals_lockfree_queue.h:688
bool is_empty() const
Retrieves whether or not the head and tail of the queue are pointing to the same virtual index.
Definition: integrals_lockfree_queue.h:708
T const & const_reference_type
Definition: integrals_lockfree_queue.h:140
std::atomic< T > element_type
Definition: integrals_lockfree_queue.h:143
value_type back() const
Retrieves the value of the position at the back of the queue, i.e.
Definition: integrals_lockfree_queue.h:746
size_t allocated_size() const
Returns the amount of bytes needed to store the maximum number of elements allowed to coexist in the ...
Definition: integrals_lockfree_queue.h:898
enum_queue_state & state() const
Retrieves the thread storage duration operation state variable.
Definition: integrals_lockfree_queue.h:939
T value_type
Definition: integrals_lockfree_queue.h:141
array_type m_array
The array of atomics in which the elements will be stored.
Definition: integrals_lockfree_queue.h:562
virtual ~Integrals_lockfree_queue()=default
Destructor for the class.
atomic_type m_head
The virtual index being pointed to by the head of the queue.
Definition: integrals_lockfree_queue.h:564
T & reference_type
Definition: integrals_lockfree_queue.h:139
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
size_t m_capacity
The maximum allowed number of element allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:560
Iterator end() const
Retrieves an iterator instance that points to the same position pointed by the tail of the queue.
Definition: integrals_lockfree_queue.h:866
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator>>(reference_type out)
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:756
index_type tail() const
Retrieves the virtual index that the tail of the queue is pointing to.
Definition: integrals_lockfree_queue.h:731
T const const_value_type
Definition: integrals_lockfree_queue.h:142
void clear()
Sets all queue positions to Null and points the head and tail of the queue to the 0 virtual index.
Definition: integrals_lockfree_queue.h:699
Integrals_lockfree_queue(A &alloc, size_t size)
Constructor allowing a specific memory allocator and a specific queue capacity.
Definition: integrals_lockfree_queue.h:677
array_type & array()
Returns the underlying instance of memory::Atomics_array which holds the allocated memory for the arr...
Definition: integrals_lockfree_queue.h:694
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator<<(const_reference_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:764
Indexing provider that pads each of the array elements to the size of the CPU cache line,...
Definition: atomics_array_index_padding.h:57
ostream & operator<<(ostream &os, const Datetime &)
Definition: logger.cc:35
#define SUCCESS
Definition: completion_hash.h:27
#define M
Definition: ctype-tis620.cc:73
bool store(THD *thd, const Table *tp)
Stores the SDI for a table.
Definition: sdi.cc:607
static std::string to_string(const LEX_STRING &str)
Definition: lex_string.h:50
bool operator!=(const my_thread_handle &a, const my_thread_handle &b)
Definition: my_thread.h:158
bool operator==(const my_thread_handle &a, const my_thread_handle &b)
Definition: my_thread.h:151
borrowable::binary::Null Null
Definition: classic_protocol_binary.h:337
Definition: atomics_array.h:39
size_t size(const char *const c)
Definition: base64.h:46
static mysql_service_status_t flush(reference_caching_cache cache) noexcept
Definition: component.cc:114
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2872
std::set< Key, Compare, ut::allocator< Key > > set
Specialization of set which uses ut_allocator.
Definition: ut0new.h:2884
std::map< Key, Value, Compare, ut::allocator< std::pair< const Key, Value > > > map
Specialization of map which uses ut_allocator.
Definition: ut0new.h:2894
static bool push(mem_root_deque< Item * > *items, qep_row::mem_root_str &s, Item_null *nil)
Definition: opt_explain_traditional.cc:94
Ssl_acceptor_context_property_type & operator++(Ssl_acceptor_context_property_type &property_type)
Increment operator for Ssl_acceptor_context_type Used by iterator.
Definition: ssl_acceptor_context_data.cc:273