MySQL 8.3.0
Source Code Documentation
integrals_lockfree_queue.h
1/* Copyright (c) 2020, 2023, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23#ifndef CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
24#define CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
25
26#include <algorithm>
27#include <atomic>
28#include <cmath>
29#include <map>
30#include <memory>
31#include <sstream>
32#include <thread>
33#include <tuple>
34
38
39namespace container {
40
41/**
42 Lock-free, fixed-size bounded, multiple-producer (MP), multiple-consumer
43 (MC), circular FIFO queue for integral elements.
44
45 Monotonically ever increasing virtual indexes are used to keep track of
46 the head and tail pointer for the size-bounded circular queue. Virtual
47 indexes are translated into memory indexes by calculating the remainder
48 of the integer division of the virtual index by the queue capacity. The
49 maximum value of a virtual index is 2^63 - 1.
50
51 Head is the pointer to the virtual index of the first position that holds
52 an element to be popped, if any.
53
54 Tail is the pointer to the virtual index of the first available position
55 to push an element to, if any.
56
57 Template parameters are as follows:
58 - `T`: The integral type for the queue elements.
59 - `Null`: Value of type `T`, that will be used to mark a queue position
60 as empty.
61 - `Erased`: Value of type `T`, that will be used to mark a queue
62 position as erased.
63 - `I`: Type of indexing to be used by the underlying array in the form of
64 a class. Available classes are `container::Padded_indexing` and
65 `container::Interleaved_indexing`, check the classes' documentation
66 for further details. The parameter defaults to
67 `container::Padded_indexing`.
68 - `A`: Type of memory allocator to be used, in the form of a class
69 (defaults to no allocator).
70
71 All the available operations are thread-safe, in the strict sense that
72 no memory problems arise from multiple threads trying to perform
73 operations concurrently.
74
75 However, being a lock-free structure, the queue may be changing at the
76 same time as operations access both pointers and values or a client of
77 the API evaluates the result of the invoked operation. The operation
78 results and returning states are always based on the thread local view of
79 the queue state, which may be safe or unsafe to proceed with the given
80 operation. Therefore, extra validations, client-side serialization and/or
81 retry mechanisms may be needed while using the queue operations.
82
83 Available methods are:
84 - `pop`: if the head of the queue points to a virtual index different
85 from the one pointed by the tail of the queue, removes the
86 element pointed to by the head of the queue, points the head to
87 the next virtual index and returns the retrieved value. If head
88 and tail of the queue point to the same virtual index, the queue
89 is empty, `Null` is returned and the thread operation state is
90 set to `NO_MORE_ELEMENTS`.
91 - `push`: if the tail of the queue points to a virtual index different
92 from the one pointed by head-plus-queue-capacity, sets the
93 position pointed by tail with the provided element and points
94 the tail to the next virtual index. If tail and head plus queue
95 capacity point to the same virtual index, the queue is full, no
96 operation is performed and the thread operation state is set to
97 `NO_SPACE_AVAILABLE`.
98 - `capacity`: maximum number of elements allowed to coexist in the queue.
99 - `head`: pointer to the virtual index of the first available element, if
100 any.
101 - `tail`: pointer to the virtual index of the first available position to
102 add an element to. Additionally, the value returned by `tail()`
103 gives a lower-bound approximation of the total number of
104 elements already pushed into the queue: between reading the
105 virtual index the tail is pointing to and evaluating it, some
106 more elements may have been pushed.
107 - `front`: the first available element. If none, `Null` is returned.
108 - `back`: the last available element. If none, `Null` is returned.
109 - `clear`: sets all positions of the underlying array to `Null` and
110 resets the virtual index pointed to by both the head and the
111 tail of the queue to `0`.
112 - `is_empty`: whether or not the head and tail of the queue point to the
113 same virtual index.
114 - `is_full`: whether or not the tail and head-plus-queue-capacity point
115 to the same virtual index.
116 - `erase_if`: traverses the queue, starting at the queue relative memory
117 index 0, stopping at the relative memory index
118 `capacity() - 1` and invoking the passed-on predicate over each
119 position value, while disregarding positions that have
120 `Null` or `Erased` values.
121
122 Note that, if `Null` and `Erased` hold the same value, the resulting class
123 will not include the `erase_if` method.
124
125 */
126template <typename T, T Null = std::numeric_limits<T>::max(), T Erased = Null,
128 typename A = std::nullptr_t>
130 static_assert(
131 std::is_integral<T>::value,
132 "class `Integrals_lockfree_queue` requires an integral type as a first "
133 "template parameter");
134
135 public:
136 using pointer_type = T *;
137 using const_pointer_type = T const *;
138 using reference_type = T &;
139 using const_reference_type = T const &;
140 using value_type = T;
141 using const_value_type = T const;
142 using element_type = std::atomic<T>;
143 using index_type = unsigned long long;
146
147 static constexpr T null_value = Null;
148 static constexpr T erased_value = Erased;
149 static constexpr index_type set_bit = 1ULL << 63;
150 static constexpr index_type clear_bit = set_bit - 1;
151
152 enum class enum_queue_state : short {
153 SUCCESS = 0, // Last operation was successful
154 NO_MORE_ELEMENTS = -1, // Last operation was unsuccessful because there
155 // are no elements in the queue
156 NO_SPACE_AVAILABLE = -2 // Last operation was unsuccessful because there
157 // is no space available
158 };
159
160 /**
161 Iterator helper class to iterate over the queue starting at the virtual
162 index pointed to by the head, up until the virtual index pointed to by
163 the tail.
164
165 Being an iterator over a lock-free structure, it will not be
166 invalidated upon queue changes since operations are thread-safe and no
167 invalid memory access should stem from iterating over and changing the
168 queue, simultaneously.
169
170 However, the following possible iteration scenarios, not common in
171 non-thread-safe structures, should be taken into consideration and
172 analysed while using standard library features that use the `Iterator`
173 requirement, like `for_each` loops, `std::find`, `std::find_if`, etc:
174
175 a) The iteration never stops because there is always an element being
176 pushed before the queue `end()` method is invoked.
177
178 b) The iterator points to `Null` values at the beginning of the
179 iteration because elements were popped just after the queue `begin()`
180 method is invoked.
181
182 c) The iterator points to `Null` or `Erased` in between pointing to
183 values different from `Null` or `Erased`.
184
185 d) The iterator may point to values that do not correspond to the
186 virtual index being held by the iterator, if the number of pop and
187 push operations performed between two iteration steps was higher
188 than the queue `capacity()`.
189
190 If one of the above scenarios is harmful to your use-case, an
191 additional serialization mechanism may be needed to iterate over the
192 queue. Or another type of structure may be more adequate.
193
194 Check C++ documentation for the definition of `Iterator` named
195 requirement for more information.
196 */
197 class Iterator {
198 public:
199 using difference_type = std::ptrdiff_t;
200 using value_type = T;
201 using pointer = T *;
202 using reference = T;
203 using iterator_category = std::forward_iterator_tag;
204
205 explicit Iterator(
207 index_type position);
208 Iterator(const Iterator &rhs);
209 Iterator(Iterator &&rhs);
210 virtual ~Iterator() = default;
211
212 // BASIC ITERATOR METHODS //
213 Iterator &operator=(const Iterator &rhs);
214 Iterator &operator=(Iterator &&rhs);
215 Iterator &operator++();
216 reference operator*() const;
217 // END / BASIC ITERATOR METHODS //
218
219 // INPUT ITERATOR METHODS //
220 Iterator operator++(int);
221 pointer operator->() const;
222 bool operator==(Iterator const &rhs) const;
223 bool operator!=(Iterator const &rhs) const;
224 // END / INPUT ITERATOR METHODS //
225
226 // OUTPUT ITERATOR METHODS //
227 // reference operator*(); <- already defined
228 // iterator operator++(int); <- already defined
229 // END / OUTPUT ITERATOR METHODS //
230
231 // FORWARD ITERATOR METHODS //
232 // Enable support for both input and output iterator <- already enabled
233 // END / FORWARD ITERATOR METHODS //
234
235 /**
236 Stores the given value in the element the iterator is currently
237 pointing to.
238
239 @param new_value The new value to set the element to.
240 */
241 void set(value_type new_value);
242
243 private:
244 /** The position of the element this iterator is pointing to. */
245 index_type m_current{std::numeric_limits<index_type>::max()};
246 /** The reference to the queue holding the elements. */
248 };
249
250 /**
251 Constructor allowing a specific memory allocator and a specific queue
252 capacity.
253
254 The queue allocated memory may differ from `capacity() * sizeof(T)`
255 since additional space may be required to prevent false sharing between
256 threads.
257
258 @param alloc The memory allocator instance
259 @param size The queue maximum capacity
260 */
261 template <
262 typename D = T, T M = Null, T F = Erased, typename J = I, typename B = A,
263 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> * = nullptr>
264 Integrals_lockfree_queue(A &alloc, size_t size);
265 /**
266 Constructor allowing specific queue capacity.
267
268 The queue allocated memory may differ from `capacity() * sizeof(T)`
269 since additional space may be required to prevent false sharing between
270 threads.
271
272 @param size The queue maximum capacity
273 */
274 template <
275 typename D = T, T M = Null, T F = Erased, typename J = I, typename B = A,
276 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> * = nullptr>
278 // Deleted copy and move constructors.
283 //
284 /**
285 Destructor for the class.
286 */
287 virtual ~Integrals_lockfree_queue() = default;
288 // Deleted copy and move operators.
293 //
294
295 /**
296 Returns the underlying instance of `memory::Atomics_array` which holds
297 the allocated memory for the array of `std::atomic<T>` elements.
298
299 @return the underlying instance of `memory::Atomics_array`
300 */
302 /**
303 Sets all queue positions to `Null` and points the head and tail of the
304 queue to the `0` virtual index.
305 */
306 void clear();
307 /**
308 Retrieves whether or not the head and tail of the queue are pointing to
309 the same virtual index.
310
311 No evaluation of the value held in the given position is made. If, for
312 instance, head and tail point to consecutive virtual indexes and the
313 value stored in the position pointed to by head is `Erased`, `is_empty`
314 will return false and `pop` will return `Null` and trigger a
315 `NO_MORE_ELEMENTS` state change.
316
317 @return true if there aren't more elements to be popped, false otherwise
318 */
319 bool is_empty() const;
320 /**
321 Retrieves whether or not the tail of the queue is pointing to the same
322 virtual index as the one computed by adding the queue `capacity()` to the
323 virtual index pointed to by the head of the queue.
324
325 No evaluation of the value held in the given position is made. If, for
326 instance, all the values stored in the positions between head and tail
327 are `Erased`, `is_full` will return true and `pop` will return `Null`
328 and trigger a `NO_MORE_ELEMENTS` state change.
329
330 @return true if there isn't space for more elements to be pushed, false
331 otherwise
332 */
333 bool is_full() const;
334 /**
335 Retrieves the virtual index that the head of the queue is pointing to.
336
337 @return the virtual index the head of the queue is pointing to
338 */
340 /**
341 Retrieves the virtual index that the tail of the queue is pointing to.
342
343 @return the virtual index the tail of the queue is pointing to
344 */
346 /**
347 Retrieves the element at the front of the queue, i.e. the value stored
348 in the virtual index pointed to by the head of the queue.
349
350 The returned value may be `Null` (also returned when the position holds
351 `Erased`) or whatever value is held by the position when it is accessed.
352
353 As this method reads the position indexed by a thread-local copy of
354 `head()`, the queue may be changed concurrently and, because it is a
355 circular queue, it is possible for this method to return a value that
356 has not been popped yet and will not be popped by the next call to
357 `pop()` (the circular queue logic made the tail wrap around and overlap
358 the thread-local value of head).
359
360 @return the element at the front of the queue
361 */
363 /**
364 Retrieves the value of the position at the back of the queue, i.e. the
365 value stored in the virtual index just prior to the virtual index
366 pointed to by the tail of the queue.
367
368 The returned value may be `Null` (also returned when the position holds
369 `Erased`) or whatever value is held by the given virtual index position.
370
371 As this method reads the position just before a thread-local copy of
372 `tail()`, the queue may be changed concurrently and it is possible for
373 this method to return a value assigned to a position outside the bounds
374 of the head and tail of the queue (between the thread-local fetch of the
375 tail pointer and the access to the position indexed by the local value,
376 operations moved the head to a virtual index higher than the one stored
377 locally). This means that `Null` may be returned or that a value that is
378 currently being popped may be returned.
379
380 @return the element at the back of the queue
381 */
383 /**
384 Retrieves the value at the virtual index pointed by the head of the
385 queue, clears that position, updates the virtual index stored in the
386 head and clears the value returned by `get_state()`, setting it to
387 `SUCCESS`.
388
389 If the head of the queue points to a virtual index that has no element
390 assigned (queue is empty), the operation fails, `Null` is stored in the
391 `out` parameter and the value returned by `get_state()` is
392 `NO_MORE_ELEMENTS`.
393
394 @param out The variable reference to store the value in.
395
396 @return The reference to `this` object, for chaining purposes.
397 */
399 reference_type out);
400 /**
401 Takes the value passed on as a parameter, stores it in the virtual
402 index pointed to by the tail of the queue, updates the virtual index
403 stored in the tail and clears the value returned by `get_state()`,
404 setting it to `SUCCESS`.
405
406 If the tail of the queue points to a virtual index that already has an
407 element assigned (queue is full), the operation fails and the value
408 returned by `get_state()` is `NO_SPACE_AVAILABLE`.
409
410 @param to_push The value to push into the queue.
411
412 @return The reference to `this` object, for chaining purposes.
413 */
415 const_reference_type to_push);
416 /**
417 Retrieves the value at the virtual index pointed by the head of the
418 queue, clears that position, updates the virtual index stored in the
419 head and clears the value returned by `get_state()`, setting it to
420 `SUCCESS`.
421
422 If the head of the queue points to a virtual index that has no element
423 assigned yet (queue is empty), the operation returns `Null` and the
424 value returned by `get_state()` is `NO_MORE_ELEMENTS`.
425
426 @return The value retrieved from the queue or `Null` if no element is
427 available for popping
428 */
430 /**
431 Takes the value passed on as a parameter, stores it in the virtual
432 index pointed to by the tail of the queue, updates the virtual index
433 stored in the tail and clears the value returned by `get_state()`,
434 setting it to `SUCCESS`.
435
436 If the tail of the queue points to a virtual index that already has an
437 element assigned (queue is full), the operation fails and the value
438 returned by `get_state()` is `NO_SPACE_AVAILABLE`.
439
440 @param to_push The value to push into the queue.
441
442 @return The reference to `this` object, for chaining purposes.
443 */
445 /**
446 Retrieves an iterator instance that points to the same position pointed
447 by the head of the queue.
448
449 Please, be aware that, while using standard library features that use
450 the `Iterator` requirement, like `for_each` loops, `std::find`,
451 `std::find_if`, etc:
452
453 - The iterator may point to `Null` values at the beginning of the
454 iteration because elements were popped just after the queue `begin()`
455 method is invoked, the moment when the iterator pointing to the same
456 virtual index as the head of the queue is computed.
457
458 @return An iterator instance that points to the virtual index pointed
459 to by the head of the queue
460 */
461 Iterator begin() const;
462 /**
463 Retrieves an iterator instance that points to the same position pointed
464 by the tail of the queue.
465
466 Please, be aware that, while using standard library features that use
467 the `Iterator` requirement, like `for_each` loops, `std::find`,
468 `std::find_if`, etc:
469
470 - The iteration may never stop because there is always an element being
471 pushed before the queue `end()` method is invoked, the moment when
472 the iterator pointing to the same virtual index as the tail of the
473 queue is computed.
474
475 @return An iterator instance that points to the virtual index pointed
476 to by the tail of the queue
477 */
478 Iterator end() const;
479 /**
480 Erases values from the queue. The traversing is linear and not in
481 between the virtual indexes pointed to by the head and the tail of the
482 queue but rather between 0 and `Integrals_lockfree_queue::capacity() -
483 1`.
484
485 An element may be conditionally erased according to the evaluation of
486 the predicate `predicate` which should be any predicate which is
487 translatable to `[](value_type value) -> bool`. If the predicate
488 evaluates to `true`, the value is replaced by `Erased`.
489
490 If both `Null` and `Erased` evaluate to the same value, this method
491 will not be available after the template substitutions since erased
492 values must be identifiable by the pop and push operations.
493
494 Check C++ documentation for the definition of `Predicate` named
495 requirement for more information.
496
497 @param predicate The predicate invoked upon a given queue position and
498 if evaluated to `true` will force the removal of such
499 element.
500
501 @return The number of values erased.
502 */
503 template <typename D = T, T M = Null, T F = Erased, typename J = I,
504 typename B = A, typename Pred, std::enable_if_t<M != F> * = nullptr>
505 size_t erase_if(Pred predicate);
506 /**
507 Returns the maximum number of elements allowed to coexist in the queue.
508
509 @return The maximum number of elements allowed to coexist in the queue
510 */
511 size_t capacity() const;
512 /**
513 Returns the amount of bytes needed to store the maximum number of
514 elements allowed to coexist in the queue.
515
516 @return the amount of bytes needed to store the maximum number of
517 elements allowed to coexist in the queue
518 */
519 size_t allocated_size() const;
520 /**
521 Clears the error state of the last performed operations, if any. The
522 operation state is a thread storage duration variable, making it a
523 per-thread state.
524
525 @return The reference to `this` object, for chaining purposes.
526 */
528 /**
529 Retrieves the error/success state of the last performed operation. The
530 operation state is a thread storage duration variable, making it a
531 per-thread state.
532
533 Possible values are:
534 - `SUCCESS` if the operation was successful
535 - `NO_MORE_ELEMENTS` if there are no more elements to pop
536 - `NO_SPACE_AVAILABLE` if there is no more room for pushing elements
537
538 State may be changed by any of the `pop`, `push` operations.
539
540 @return the error/success state of the invoking thread last operation.
541 */
543 /**
544 Return `this` queue textual representation.
545
546 @return the textual representation for `this` queue.
547 */
548 std::string to_string() const;
549
550 friend std::ostream &operator<<(
551 std::ostream &out,
553 out << in.to_string() << std::flush;
554 return out;
555 }
556
557 private:
558 /** The maximum number of elements allowed to coexist in the queue */
559 size_t m_capacity{0};
560 /** The array of atomics in which the elements will be stored */
562 /** The virtual index being pointed to by the head of the queue */
563 atomic_type m_head{0};
564 /** The virtual index being pointed to by the tail of the queue */
565 atomic_type m_tail{0};
566
567 /**
568 Translates a virtual monotonically increasing index into an index bounded to
569 the queue capacity.
570 */
571 size_t translate(index_type from) const;
572 /**
573 Retrieves the thread storage duration operation state variable.
574 */
576};
577} // namespace container
578
579#ifndef IN_DOXYGEN // Doxygen doesn't understand this construction.
580template <typename T, T Null, T Erased, typename I, typename A>
582 Integrals_lockfree_queue<T, Null, Erased, I, A> const &parent,
583 Integrals_lockfree_queue<T, Null, Erased, I, A>::index_type position)
584 : m_current{position}, m_parent{&parent} {}
585#endif
586
587template <typename T, T Null, T Erased, typename I, typename A>
589 const Iterator &rhs)
590 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {}
591
592template <typename T, T Null, T Erased, typename I, typename A>
594 Iterator &&rhs)
595 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {
596 rhs.m_current = std::numeric_limits<index_type>::max();
597 rhs.m_parent = nullptr;
598}
599
600template <typename T, T Null, T Erased, typename I, typename A>
603 const Iterator &rhs) {
604 this->m_current = rhs.m_current;
605 this->m_parent = rhs.m_parent;
606 return (*this);
607}
608
609template <typename T, T Null, T Erased, typename I, typename A>
612 Iterator &&rhs) {
613 this->m_current = rhs.m_current;
614 this->m_parent = rhs.m_parent;
615 rhs.m_current = std::numeric_limits<index_type>::max();
616 rhs.m_parent = nullptr;
617 return (*this);
618}
619
620template <typename T, T Null, T Erased, typename I, typename A>
623 A>::Iterator::operator++() {
624 ++this->m_current;
625 return (*this);
626}
627
628template <typename T, T Null, T Erased, typename I, typename A>
629typename container::Integrals_lockfree_queue<T, Null, Erased, I,
632 A>::Iterator::operator*() const {
633 return this->m_parent->m_array[this->m_parent->translate(this->m_current)];
634}
635
636template <typename T, T Null, T Erased, typename I, typename A>
639 A>::Iterator::operator++(int) {
640 auto to_return = (*this);
641 ++(*this);
642 return to_return;
643}
644
645template <typename T, T Null, T Erased, typename I, typename A>
646typename container::Integrals_lockfree_queue<T, Null, Erased, I,
649 A>::Iterator::operator->() const {
650 return &(this->m_parent->m_array[this->m_parent->translate(this->m_current)]);
651}
652
653template <typename T, T Null, T Erased, typename I, typename A>
655 T, Null, Erased, I, A>::Iterator::operator==(Iterator const &rhs) const {
656 return this->m_current == rhs.m_current && this->m_parent == rhs.m_parent;
657}
658
659template <typename T, T Null, T Erased, typename I, typename A>
661 T, Null, Erased, I, A>::Iterator::operator!=(Iterator const &rhs) const {
662 return !((*this) == rhs);
663}
664
665template <typename T, T Null, T Erased, typename I, typename A>
667 value_type new_value) {
668 this->m_parent->m_array[this->m_parent->translate(this->m_current)].store(
669 new_value);
670}
671
672template <typename T, T Null, T Erased, typename I, typename A>
673template <typename D, T M, T F, typename J, typename B,
674 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> *>
677 size_t size)
678 : m_capacity{size},
679 m_array{alloc, size, null_value},
680 m_head{0},
681 m_tail{0} {}
682
683template <typename T, T Null, T Erased, typename I, typename A>
684template <typename D, T M, T F, typename J, typename B,
685 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> *>
688 : m_capacity{size}, m_array{size, null_value}, m_head{0}, m_tail{0} {}
689
690template <typename T, T Null, T Erased, typename I, typename A>
691typename container::Integrals_lockfree_queue<T, Null, Erased, I,
692 A>::array_type &
694 return this->m_array;
695}
696
697template <typename T, T Null, T Erased, typename I, typename A>
699 this->clear_state();
700 for (size_t idx = 0; idx != this->m_array.size(); ++idx)
701 this->m_array[idx].store(Null);
702 this->m_head->store(0);
703 this->m_tail->store(0);
704}
705
706template <typename T, T Null, T Erased, typename I, typename A>
708 const {
709 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
710 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
711 return head == tail;
712}
713
714template <typename T, T Null, T Erased, typename I, typename A>
716 const {
717 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
718 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
719 return tail == head + this->capacity();
720}
721
722template <typename T, T Null, T Erased, typename I, typename A>
725 return this->m_head->load(std::memory_order_seq_cst) & clear_bit;
726}
727
728template <typename T, T Null, T Erased, typename I, typename A>
731 return this->m_tail->load(std::memory_order_seq_cst) & clear_bit;
732}
733
734template <typename T, T Null, T Erased, typename I, typename A>
737 auto head = this->head();
738 auto to_return =
739 this->m_array[this->translate(head)].load(std::memory_order_seq_cst);
740 return to_return == Erased ? Null : to_return;
741}
742
743template <typename T, T Null, T Erased, typename I, typename A>
746 auto tail = this->tail();
747 if (tail == 0) return Null;
748 auto to_return =
749 this->m_array[this->translate(tail - 1)].load(std::memory_order_seq_cst);
750 return to_return == Erased ? Null : to_return;
751}
752
753template <typename T, T Null, T Erased, typename I, typename A>
756 reference_type out) {
757 out = this->pop();
758 return (*this);
759}
760
761template <typename T, T Null, T Erased, typename I, typename A>
764 const_reference_type to_push) {
765 return this->push(to_push);
766}
767
768template <typename T, T Null, T Erased, typename I, typename A>
770 this->clear_state();
771 for (; true;) {
772 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
773 auto tail = this->m_tail->load(std::memory_order_relaxed) & clear_bit;
774
775 if (head == tail) {
776 this->state() = enum_queue_state::NO_MORE_ELEMENTS;
777 break;
778 }
779
780 auto new_head = head + 1;
781 new_head |= set_bit; // Set the occupied bit
782 if (this->m_head->compare_exchange_strong(
783 head, new_head, // Concurrent pop may have reached the CAS first
784 // or the occupied bit hasn't been unset yet
785 std::memory_order_release)) {
786 auto &current = this->m_array[this->translate(head)];
787
788 for (; true;) {
789 value_type value = current.load();
790 if (value != Null && // It may be `Null` if some concurrent push
791 // operation hasn't finished setting the
792 // element value
793 current.compare_exchange_strong(
794 value, Null, // It may have been set to `Erased` concurrently
795 std::memory_order_release)) {
796 new_head &= clear_bit; // Unset the occupied bit, signaling that
797 // popping has finished
798 this->m_head->store(new_head, std::memory_order_seq_cst);
799 if (value == Erased) { // If the element was `Erased`, try to
800 // pop again
801 break;
802 }
803 return value;
804 }
805 std::this_thread::yield();
806 }
807 }
808 std::this_thread::yield();
809 }
810 return Null;
811}
812
813template <typename T, T Null, T Erased, typename I, typename A>
816 value_type to_push) {
817 assert(to_push != Null && to_push != Erased);
818 this->clear_state();
819 for (; true;) {
820 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
821 auto head = this->m_head->load(std::memory_order_relaxed) & clear_bit;
822
823 if (tail == head + this->m_capacity) {
824 this->state() = enum_queue_state::NO_SPACE_AVAILABLE;
825 return (*this);
826 }
827
828 auto new_tail = tail + 1;
829 new_tail |= set_bit; // Set the occupied bit
830 if (this->m_tail->compare_exchange_strong(
831 tail, new_tail, // Concurrent push may have reached the CAS first
832 // or the occupied bit hasn't been unset yet
833 std::memory_order_release)) {
834 auto &current = this->m_array[this->translate(tail)];
835
836 for (; true;) {
837 T null_ref{Null};
838 if (current.compare_exchange_strong(
839 null_ref, to_push, // It may not be `Null` if some concurrent
840 // pop operation hasn't finished setting the
841 // element value to `Null`
842 std::memory_order_acquire)) {
843 new_tail &= clear_bit; // Unset the occupied bit, signaling that
844 // pushing has finished
845 this->m_tail->store(new_tail, std::memory_order_seq_cst);
846 break;
847 }
848 std::this_thread::yield();
849 }
850 break;
851 }
852 std::this_thread::yield();
853 }
854 return (*this);
855}
856
857template <typename T, T Null, T Erased, typename I, typename A>
860 return Iterator{*this, this->head()};
861}
862
863template <typename T, T Null, T Erased, typename I, typename A>
866 return Iterator{*this, this->tail()};
867}
868
869template <typename T, T Null, T Erased, typename I, typename A>
870template <typename D, T M, T F, typename J, typename B, typename Pred,
871 std::enable_if_t<M != F> *>
873 Pred predicate) {
874 this->clear_state();
875 size_t erased{0};
876 for (size_t idx = 0; idx != this->m_capacity; ++idx) {
877 auto &current = this->m_array[idx];
878 T value = current.load(std::memory_order_acquire);
879 if (value != Null && value != Erased && predicate(value)) {
880 if (current.compare_exchange_strong(value, Erased,
881 std::memory_order_release)) {
882 ++erased;
883 }
884 }
885 }
886 return erased;
887}
888
889template <typename T, T Null, T Erased, typename I, typename A>
891 const {
892 return this->m_capacity;
893}
894
895template <typename T, T Null, T Erased, typename I, typename A>
896size_t container::Integrals_lockfree_queue<T, Null, Erased, I,
897 A>::allocated_size() const {
898 return this->m_array.allocated_size();
899}
900
901template <typename T, T Null, T Erased, typename I, typename A>
904 this->state() = enum_queue_state::SUCCESS;
905 return (*this);
906}
907
908template <typename T, T Null, T Erased, typename I, typename A>
909typename container::Integrals_lockfree_queue<T, Null, Erased, I,
910 A>::enum_queue_state
911container::Integrals_lockfree_queue<T, Null, Erased, I, A>::get_state() const {
912 return this->state();
913}
914
915template <typename T, T Null, T Erased, typename I, typename A>
916std::string
917container::Integrals_lockfree_queue<T, Null, Erased, I, A>::to_string() const {
918 std::ostringstream out;
919 for (auto value : (*this)) {
920 out << (value == Null
921 ? "Null"
922 : (value == Erased ? "Erased" : std::to_string(value)))
923 << ", ";
924 }
925 out << "EOF" << std::flush;
926 return out.str();
927}
928
929template <typename T, T Null, T Erased, typename I, typename A>
930size_t container::Integrals_lockfree_queue<T, Null, Erased, I, A>::translate(
931 index_type from) const {
932 return static_cast<size_t>(from % static_cast<index_type>(this->m_capacity));
933}
934
935template <typename T, T Null, T Erased, typename I, typename A>
936typename container::Integrals_lockfree_queue<T, Null, Erased, I,
937 A>::enum_queue_state &
938container::Integrals_lockfree_queue<T, Null, Erased, I, A>::state() const {
939 // TODO: garbage collect this if queues start to be used more dynamically
940 static thread_local std::map<
941 container::Integrals_lockfree_queue<T, Null, Erased, I, A> const *,
942 Integrals_lockfree_queue<T, Null, Erased, I, A>::enum_queue_state>
943 state;
944 return state[this];
945}
946
947#endif // CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
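As a rough illustration of the index arithmetic behind `translate` and the `clear_bit` mask used in `push`, the sketch below assumes that the most significant bit of a 64-bit index flags an in-progress operation, leaving 63 bits for the virtual index (consistent with the documented maximum of 2^63 - 1). The constant values, the free-function form of `translate`, and the helpers `mark_occupied` and `unmark_occupied` are assumptions made for illustration, not the header's own definitions.

```cpp
#include <cassert>
#include <cstddef>

using index_type = unsigned long long;

// Assumption: the top bit marks an index as "operation in progress".
constexpr index_type set_bit = 1ULL << 63;
constexpr index_type clear_bit = set_bit - 1;

// Hypothetical free-function version of translate(): maps an ever-increasing
// virtual index onto a slot of a circular buffer with `capacity` slots.
constexpr std::size_t translate(index_type from, std::size_t capacity) {
  return static_cast<std::size_t>(from % static_cast<index_type>(capacity));
}

// Hypothetical helpers that set and unset the occupied bit.
constexpr index_type mark_occupied(index_type idx) { return idx | set_bit; }
constexpr index_type unmark_occupied(index_type idx) { return idx & clear_bit; }

// Virtual index 10 in a queue of capacity 4 lands on memory index 2.
static_assert(translate(10, 4) == 2, "virtual index wraps around");
// Setting and then clearing the occupied bit restores the virtual index.
static_assert(unmark_occupied(mark_occupied(10)) == 10, "bit round trip");
```

Reserving one bit of the index for the flag lets a single atomic word carry both the position and the "busy" state, which is presumably why the virtual index is limited to 63 bits.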
Array of std::atomic elements of type T.
Definition: atomics_array.h:83
Iterator helper class to iterate over the queue starting at the virtual index pointed to by the head,...
Definition: integrals_lockfree_queue.h:197
Integrals_lockfree_queue< T, Null, Erased, I, A > const * m_parent
The reference to the queue holding the elements.
Definition: integrals_lockfree_queue.h:247
T * pointer
Definition: integrals_lockfree_queue.h:201
Iterator & operator=(const Iterator &rhs)
Definition: integrals_lockfree_queue.h:602
index_type m_current
The position of the element this iterator is pointing to.
Definition: integrals_lockfree_queue.h:245
void set(value_type new_value)
Assigns the given parameter to the element the iterator is pointing to.
Definition: integrals_lockfree_queue.h:666
T reference
Definition: integrals_lockfree_queue.h:202
T value_type
Definition: integrals_lockfree_queue.h:200
std::ptrdiff_t difference_type
Definition: integrals_lockfree_queue.h:199
std::forward_iterator_tag iterator_category
Definition: integrals_lockfree_queue.h:203
Iterator(Integrals_lockfree_queue< T, Null, Erased, I, A > const &parent, index_type position)
Lock-free, fixed-size bounded, multiple-producer (MP), multiple-consumer (MC), circular FIFO queue fo...
Definition: integrals_lockfree_queue.h:129
Integrals_lockfree_queue< T, Null, Erased, I, A > & push(value_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:815
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
friend std::ostream & operator<<(std::ostream &out, Integrals_lockfree_queue< T, Null, Erased, I, A > const &in)
Definition: integrals_lockfree_queue.h:550
size_t erase_if(Pred predicate)
Erases values from the queue.
Definition: integrals_lockfree_queue.h:872
value_type pop()
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:769
atomic_type m_tail
The virtual index being pointed to by the tail of the queue.
Definition: integrals_lockfree_queue.h:565
T const * const_pointer_type
Definition: integrals_lockfree_queue.h:137
unsigned long long index_type
Definition: integrals_lockfree_queue.h:143
Iterator begin() const
Retrieves an iterator instance that points to the same position pointed to by the head of the queue.
Definition: integrals_lockfree_queue.h:859
size_t translate(index_type from) const
Translates a virtual monotonically increasing index into an index bounded to the queue capacity.
Definition: integrals_lockfree_queue.h:930
Integrals_lockfree_queue< T, Null, Erased, I, A > & clear_state()
Clears the error state of the last performed operations, if any.
Definition: integrals_lockfree_queue.h:903
size_t capacity() const
Returns the maximum number of elements allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:890
enum_queue_state get_state() const
Retrieves the error/success state of the last performed operation.
Definition: integrals_lockfree_queue.h:911
enum_queue_state
Definition: integrals_lockfree_queue.h:152
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
index_type head() const
Retrieves the virtual index that the head of the queue is pointing to.
Definition: integrals_lockfree_queue.h:724
std::string to_string() const
Returns the textual representation of this queue.
Definition: integrals_lockfree_queue.h:917
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
static constexpr T null_value
Definition: integrals_lockfree_queue.h:147
bool is_full() const
Retrieves whether or not the tail of the queue is pointing to the same virtual index as the computed ...
Definition: integrals_lockfree_queue.h:715
value_type front() const
Retrieves the element at the front of the queue, i.e.
Definition: integrals_lockfree_queue.h:736
T * pointer_type
Definition: integrals_lockfree_queue.h:136
Integrals_lockfree_queue(size_t size)
Constructor allowing specific queue capacity.
Definition: integrals_lockfree_queue.h:687
bool is_empty() const
Retrieves whether or not the head and tail of the queue are pointing to the same virtual index.
Definition: integrals_lockfree_queue.h:707
T const & const_reference_type
Definition: integrals_lockfree_queue.h:139
std::atomic< T > element_type
Definition: integrals_lockfree_queue.h:142
value_type back() const
Retrieves the value of the position at the back of the queue, i.e.
Definition: integrals_lockfree_queue.h:745
size_t allocated_size() const
Returns the amount of bytes needed to store the maximum number of elements allowed to coexist in the ...
Definition: integrals_lockfree_queue.h:897
enum_queue_state & state() const
Retrieves the thread storage duration operation state variable.
Definition: integrals_lockfree_queue.h:938
T value_type
Definition: integrals_lockfree_queue.h:140
array_type m_array
The array of atomics in which the elements will be stored.
Definition: integrals_lockfree_queue.h:561
virtual ~Integrals_lockfree_queue()=default
Destructor for the class.
atomic_type m_head
The virtual index being pointed to by the head of the queue.
Definition: integrals_lockfree_queue.h:563
T & reference_type
Definition: integrals_lockfree_queue.h:138
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
size_t m_capacity
The maximum number of elements allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:559
Iterator end() const
Retrieves an iterator instance that points to the same position pointed to by the tail of the queue.
Definition: integrals_lockfree_queue.h:865
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator>>(reference_type out)
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:755
index_type tail() const
Retrieves the virtual index that the tail of the queue is pointing to.
Definition: integrals_lockfree_queue.h:730
T const const_value_type
Definition: integrals_lockfree_queue.h:141
void clear()
Sets all queue positions to Null and points the head and tail of the queue to the 0 virtual index.
Definition: integrals_lockfree_queue.h:698
Integrals_lockfree_queue(A &alloc, size_t size)
Constructor allowing a specific memory allocator and a specific queue capacity.
Definition: integrals_lockfree_queue.h:676
array_type & array()
Returns the underlying instance of memory::Atomics_array which holds the allocated memory for the arr...
Definition: integrals_lockfree_queue.h:693
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator<<(const_reference_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:763
Indexing provider that pads each of the array elements to the size of the CPU cache line,...
Definition: atomics_array_index_padding.h:56
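The `erase_if` member of the queue scans every slot and swaps matching values for the `Erased` sentinel with a compare-and-swap, so a slot that changed concurrently is simply skipped. The following is a minimal standalone sketch of that pattern over a plain `std::array` of atomics; the function name `erase_if_sketch`, the `std::array` storage, and the sentinel values used in the usage note are assumptions for illustration and are not part of the header.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical standalone version of the erase pattern: replaces every value
// that satisfies `predicate` with `Erased`, skipping `Null` and `Erased` slots.
template <typename T, T Null, T Erased, std::size_t N, typename Pred>
std::size_t erase_if_sketch(std::array<std::atomic<T>, N> &slots,
                            Pred predicate) {
  std::size_t erased{0};
  for (auto &current : slots) {
    T value = current.load(std::memory_order_acquire);
    if (value != Null && value != Erased && predicate(value)) {
      // The CAS only succeeds if the slot still holds the value just read,
      // so a concurrent change to the slot is never overwritten.
      if (current.compare_exchange_strong(value, Erased,
                                          std::memory_order_release)) {
        ++erased;
      }
    }
  }
  return erased;
}
```

With `0` as `Null` and `-1` as `Erased`, calling `erase_if_sketch<long, 0, -1>(slots, pred)` returns the number of slots that were switched to `-1`; the slots are not compacted, so readers are expected to skip `Erased` entries.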