23#ifndef CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
24#define CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
126template <typename T, T Null = std::numeric_limits<T>::max(), T Erased =
Null,
128 typename A = std::nullptr_t>
131 std::is_integral<T>::value,
132 "class `Integrals_lockfree_queue` requires an integral type as a first "
133 "template parameter");
147 static constexpr T null_value =
Null;
148 static constexpr T erased_value = Erased;
154 NO_MORE_ELEMENTS = -1,
156 NO_SPACE_AVAILABLE = -2
208 Iterator(
const Iterator &rhs);
209 Iterator(Iterator &&rhs);
213 Iterator &operator=(
const Iterator &rhs);
214 Iterator &operator=(Iterator &&rhs);
245 index_type m_current{std::numeric_limits<index_type>::max()};
262 typename D = T, T
M =
Null, T F = Erased,
typename J = I,
typename B = A,
263 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> * =
nullptr>
275 typename D = T, T
M =
Null, T F = Erased,
typename J = I,
typename B = A,
276 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> * =
nullptr>
503 template <
typename D = T, T
M =
Null, T F = Erased,
typename J = I,
504 typename B = A,
typename Pred, std::enable_if_t<M != F> * =
nullptr>
559 size_t m_capacity{0};
580template <
typename T, T Null, T Erased,
typename I,
typename A>
582 Integrals_lockfree_queue<T, Null, Erased, I, A>
const &parent,
583 Integrals_lockfree_queue<T, Null, Erased, I, A>::index_type position)
584 : m_current{position}, m_parent{&parent} {}
587template <
typename T, T Null, T Erased,
typename I,
typename A>
590 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {}
592template <
typename T, T Null, T Erased,
typename I,
typename A>
595 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {
596 rhs.m_current = std::numeric_limits<index_type>::max();
597 rhs.m_parent =
nullptr;
600template <
typename T, T Null, T Erased,
typename I,
typename A>
609template <
typename T, T Null, T Erased,
typename I,
typename A>
614 this->m_parent = rhs.m_parent;
615 rhs.m_current = std::numeric_limits<index_type>::max();
616 rhs.m_parent =
nullptr;
620template <
typename T, T Null, T Erased,
typename I,
typename A>
623 A>::Iterator::operator++() {
628template <
typename T, T Null, T Erased,
typename I,
typename A>
632 A>::Iterator::operator*()
const {
633 return this->m_parent->
m_array[this->m_parent->translate(this->m_current)];
636template <
typename T, T Null, T Erased,
typename I,
typename A>
639 A>::Iterator::operator++(int) {
640 auto to_return = (*this);
645template <
typename T, T Null, T Erased,
typename I,
typename A>
649 A>::Iterator::operator->()
const {
650 return &(this->m_parent->m_array[this->m_parent->translate(this->m_current)]);
653template <
typename T, T Null, T Erased,
typename I,
typename A>
655 T,
Null, Erased, I, A>::Iterator::operator==(
Iterator const &rhs)
const {
656 return this->m_current == rhs.m_current && this->m_parent == rhs.m_parent;
659template <
typename T, T Null, T Erased,
typename I,
typename A>
661 T,
Null, Erased, I, A>::Iterator::operator!=(
Iterator const &rhs)
const {
662 return !((*this) == rhs);
665template <
typename T, T Null, T Erased,
typename I,
typename A>
668 this->m_parent->m_array[this->m_parent->translate(this->m_current)].store(
672template <
typename T, T Null, T Erased,
typename I,
typename A>
673template <
typename D, T
M, T F,
typename J,
typename B,
674 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> *>
683template <
typename T, T Null, T Erased,
typename I,
typename A>
684template <
typename D, T
M, T F,
typename J,
typename B,
685 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> *>
688 : m_capacity{size}, m_array{size, null_value}, m_head{0}, m_tail{0} {}
690template <
typename T, T Null, T Erased,
typename I,
typename A>
694 return this->m_array;
697template <
typename T, T Null, T Erased,
typename I,
typename A>
700 for (
size_t idx = 0; idx != this->m_array.size(); ++idx)
702 this->m_head->store(0);
703 this->m_tail->store(0);
706template <
typename T, T Null, T Erased,
typename I,
typename A>
709 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
710 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
714template <
typename T, T Null, T Erased,
typename I,
typename A>
717 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
718 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
719 return tail == head + this->capacity();
722template <
typename T, T Null, T Erased,
typename I,
typename A>
725 return this->m_head->load(std::memory_order_seq_cst) & clear_bit;
728template <
typename T, T Null, T Erased,
typename I,
typename A>
731 return this->m_tail->load(std::memory_order_seq_cst) & clear_bit;
734template <
typename T, T Null, T Erased,
typename I,
typename A>
737 auto head = this->head();
739 this->m_array[this->translate(head)].load(std::memory_order_seq_cst);
740 return to_return == Erased ?
Null : to_return;
743template <
typename T, T Null, T Erased,
typename I,
typename A>
746 auto tail = this->tail();
747 if (tail == 0)
return Null;
749 this->m_array[this->translate(tail - 1)].load(std::memory_order_seq_cst);
750 return to_return == Erased ?
Null : to_return;
753template <
typename T, T Null, T Erased,
typename I,
typename A>
761template <
typename T, T Null, T Erased,
typename I,
typename A>
764 const_reference_type to_push) {
765 return this->
push(to_push);
768template <
typename T, T Null, T Erased,
typename I,
typename A>
772 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
773 auto tail = this->m_tail->load(std::memory_order_relaxed) & clear_bit;
776 this->state() = enum_queue_state::NO_MORE_ELEMENTS;
780 auto new_head = head + 1;
782 if (this->m_head->compare_exchange_strong(
785 std::memory_order_release)) {
786 auto ¤t = this->m_array[this->translate(head)];
793 current.compare_exchange_strong(
795 std::memory_order_release)) {
796 new_head &= clear_bit;
798 this->m_head->store(new_head, std::memory_order_seq_cst);
799 if (value == Erased) {
805 std::this_thread::yield();
808 std::this_thread::yield();
813template <
typename T, T Null, T Erased,
typename I,
typename A>
817 assert(to_push !=
Null && to_push != Erased);
820 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
821 auto head = this->m_head->load(std::memory_order_relaxed) & clear_bit;
823 if (tail == head + this->m_capacity) {
824 this->state() = enum_queue_state::NO_SPACE_AVAILABLE;
828 auto new_tail = tail + 1;
830 if (this->m_tail->compare_exchange_strong(
833 std::memory_order_release)) {
834 auto ¤t = this->m_array[this->translate(tail)];
838 if (current.compare_exchange_strong(
842 std::memory_order_acquire)) {
843 new_tail &= clear_bit;
845 this->m_tail->store(new_tail, std::memory_order_seq_cst);
848 std::this_thread::yield();
852 std::this_thread::yield();
857template <
typename T, T Null, T Erased,
typename I,
typename A>
860 return Iterator{*
this, this->head()};
863template <
typename T, T Null, T Erased,
typename I,
typename A>
866 return Iterator{*
this, this->tail()};
869template <
typename T, T Null, T Erased,
typename I,
typename A>
870template <
typename D, T
M, T F,
typename J,
typename B,
typename Pred,
871 std::enable_if_t<M != F> *>
876 for (
size_t idx = 0; idx != this->m_capacity; ++idx) {
877 auto ¤t = this->m_array[idx];
878 T value = current.load(std::memory_order_acquire);
879 if (value !=
Null && value != Erased && predicate(value)) {
880 if (current.compare_exchange_strong(value, Erased,
881 std::memory_order_release)) {
889template <
typename T, T Null, T Erased,
typename I,
typename A>
892 return this->m_capacity;
895template <
typename T, T Null, T Erased,
typename I,
typename A>
897 A>::allocated_size()
const {
901template <
typename T, T Null, T Erased,
typename I,
typename A>
904 this->state() = enum_queue_state::SUCCESS;
908template <
typename T, T Null, T Erased,
typename I,
typename A>
912 return this->state();
915template <
typename T, T Null, T Erased,
typename I,
typename A>
919 for (
auto value : (*
this)) {
920 out << (value ==
Null
929template <
typename T, T Null, T Erased,
typename I,
typename A>
932 return static_cast<size_t>(from %
static_cast<index_type>(this->m_capacity));
935template <
typename T, T Null, T Erased,
typename I,
typename A>
937 A>::enum_queue_state &
Array of std::atomic elements of type T.
Definition: atomics_array.h:83
Iterator helper class to iterate over the queue staring at the virtual index pointed to by the head,...
Definition: integrals_lockfree_queue.h:197
Integrals_lockfree_queue< T, Null, Erased, I, A > const * m_parent
The reference to the queue holding the elements.
Definition: integrals_lockfree_queue.h:247
T * pointer
Definition: integrals_lockfree_queue.h:201
Iterator & operator=(const Iterator &rhs)
Definition: integrals_lockfree_queue.h:602
index_type m_current
The position of the element this iterator is pointing to.
Definition: integrals_lockfree_queue.h:245
void set(value_type new_value)
Sets the value of the element the iterator is pointing to the given parameter.
Definition: integrals_lockfree_queue.h:666
T reference
Definition: integrals_lockfree_queue.h:202
virtual ~Iterator()=default
T value_type
Definition: integrals_lockfree_queue.h:200
std::ptrdiff_t difference_type
Definition: integrals_lockfree_queue.h:199
std::forward_iterator_tag iterator_category
Definition: integrals_lockfree_queue.h:203
Iterator(Integrals_lockfree_queue< T, Null, Erased, I, A > const &parent, index_type position)
Lock-free, fixed-size bounded, multiple-producer (MP), multiple-consumer (MC), circular FIFO queue fo...
Definition: integrals_lockfree_queue.h:129
Integrals_lockfree_queue< T, Null, Erased, I, A > & push(value_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:815
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
friend std::ostream & operator<<(std::ostream &out, Integrals_lockfree_queue< T, Null, Erased, I, A > const &in)
Definition: integrals_lockfree_queue.h:550
size_t erase_if(Pred predicate)
Erases values from the queue.
Definition: integrals_lockfree_queue.h:872
value_type pop()
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:769
atomic_type m_tail
The virtual index being pointed to by the tail of the queue.
Definition: integrals_lockfree_queue.h:565
T const * const_pointer_type
Definition: integrals_lockfree_queue.h:137
unsigned long long index_type
Definition: integrals_lockfree_queue.h:143
Iterator begin() const
Retrieves an iterator instance that points to the same position pointed by the head of the queue.
Definition: integrals_lockfree_queue.h:859
size_t translate(index_type from) const
Translates a virtual monotonically increasing index into an index bounded to the queue capacity.
Definition: integrals_lockfree_queue.h:930
Integrals_lockfree_queue< T, Null, Erased, I, A > & clear_state()
Clears the error state of the last performed operations, if any.
Definition: integrals_lockfree_queue.h:903
size_t capacity() const
Returns the maximum number of elements allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:890
enum_queue_state get_state() const
Retrieves the error/success state of the last performed operation.
Definition: integrals_lockfree_queue.h:911
enum_queue_state
Definition: integrals_lockfree_queue.h:152
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
index_type head() const
Retrieves the virtual index that the head of the queue is pointing to.
Definition: integrals_lockfree_queue.h:724
std::string to_string() const
Return this queue textual representation.
Definition: integrals_lockfree_queue.h:917
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
static constexpr T null_value
Definition: integrals_lockfree_queue.h:147
bool is_full() const
Retrieves whether or not the tail of the queue is pointing to the same virtual index as the computed ...
Definition: integrals_lockfree_queue.h:715
value_type front() const
Retrieves the element at the front of the queue, i.e.
Definition: integrals_lockfree_queue.h:736
T * pointer_type
Definition: integrals_lockfree_queue.h:136
Integrals_lockfree_queue(size_t size)
Constructor allowing specific queue capacity.
Definition: integrals_lockfree_queue.h:687
bool is_empty() const
Retrieves whether or not the head and tail of the queue are pointing to the same virtual index.
Definition: integrals_lockfree_queue.h:707
T const & const_reference_type
Definition: integrals_lockfree_queue.h:139
std::atomic< T > element_type
Definition: integrals_lockfree_queue.h:142
value_type back() const
Retrieves the value of the position at the back of the queue, i.e.
Definition: integrals_lockfree_queue.h:745
size_t allocated_size() const
Returns the amount of bytes needed to store the maximum number of elements allowed to coexist in the ...
Definition: integrals_lockfree_queue.h:897
enum_queue_state & state() const
Retrieves the thread storage duration operation state variable.
Definition: integrals_lockfree_queue.h:938
T value_type
Definition: integrals_lockfree_queue.h:140
array_type m_array
The array of atomics in which the elements will be stored.
Definition: integrals_lockfree_queue.h:561
virtual ~Integrals_lockfree_queue()=default
Destructor for the class.
atomic_type m_head
The virtual index being pointed to by the head of the queue.
Definition: integrals_lockfree_queue.h:563
T & reference_type
Definition: integrals_lockfree_queue.h:138
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
size_t m_capacity
The maximum allowed number of element allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:559
Iterator end() const
Retrieves an iterator instance that points to the same position pointed by the tail of the queue.
Definition: integrals_lockfree_queue.h:865
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator>>(reference_type out)
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:755
index_type tail() const
Retrieves the virtual index that the tail of the queue is pointing to.
Definition: integrals_lockfree_queue.h:730
T const const_value_type
Definition: integrals_lockfree_queue.h:141
void clear()
Sets all queue positions to Null and points the head and tail of the queue to the 0 virtual index.
Definition: integrals_lockfree_queue.h:698
Integrals_lockfree_queue(A &alloc, size_t size)
Constructor allowing a specific memory allocator and a specific queue capacity.
Definition: integrals_lockfree_queue.h:676
array_type & array()
Returns the underlying instance of memory::Atomics_array which holds the allocated memory for the arr...
Definition: integrals_lockfree_queue.h:693
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator<<(const_reference_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:763
Indexing provider that pads each of the array elements to the size of the CPU cache line,...
Definition: atomics_array_index_padding.h:56
ostream & operator<<(ostream &os, const Datetime &)
Definition: logger.cc:34
#define SUCCESS
Definition: completion_hash.h:26
#define M
Definition: ctype-tis620.cc:73
bool store(THD *thd, const Table *tp)
Stores the SDI for a table.
Definition: sdi.cc:606
static std::string to_string(const LEX_STRING &str)
Definition: lex_string.h:49
bool operator!=(const my_thread_handle &a, const my_thread_handle &b)
Definition: my_thread.h:157
bool operator==(const my_thread_handle &a, const my_thread_handle &b)
Definition: my_thread.h:150
borrowable::wire::Null Null
Definition: classic_protocol_wire.h:144
Definition: atomics_array.h:38
static mysql_service_status_t flush(reference_caching_cache cache) noexcept
Definition: component.cc:121
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2868
std::set< Key, Compare, ut::allocator< Key > > set
Specialization of set which uses ut_allocator.
Definition: ut0new.h:2880
std::map< Key, Value, Compare, ut::allocator< std::pair< const Key, Value > > > map
Specialization of map which uses ut_allocator.
Definition: ut0new.h:2890
static bool push(mem_root_deque< Item * > *items, qep_row::mem_root_str &s, Item_null *nil)
Definition: opt_explain_traditional.cc:94
Ssl_acceptor_context_property_type & operator++(Ssl_acceptor_context_property_type &property_type)
Increment operator for Ssl_acceptor_context_type Used by iterator.
Definition: ssl_acceptor_context_data.cc:113