24#ifndef CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
25#define CONTAINER_INTEGRALS_LOCKFREE_QUEUE_INCLUDED
127template <typename T, T Null = std::numeric_limits<T>::max(), T Erased =
Null,
129 typename A = std::nullptr_t>
132 std::is_integral<T>::value,
133 "class `Integrals_lockfree_queue` requires an integral type as a first "
134 "template parameter");
148 static constexpr T null_value =
Null;
149 static constexpr T erased_value = Erased;
155 NO_MORE_ELEMENTS = -1,
157 NO_SPACE_AVAILABLE = -2
209 Iterator(
const Iterator &rhs);
210 Iterator(Iterator &&rhs);
214 Iterator &operator=(
const Iterator &rhs);
215 Iterator &operator=(Iterator &&rhs);
246 index_type m_current{std::numeric_limits<index_type>::max()};
263 typename D = T, T
M =
Null, T F = Erased,
typename J = I,
typename B = A,
264 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> * =
nullptr>
276 typename D = T, T
M =
Null, T F = Erased,
typename J = I,
typename B = A,
277 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> * =
nullptr>
504 template <
typename D = T, T
M =
Null, T F = Erased,
typename J = I,
505 typename B = A,
typename Pred, std::enable_if_t<M != F> * =
nullptr>
560 size_t m_capacity{0};
581template <
typename T, T Null, T Erased,
typename I,
typename A>
583 Integrals_lockfree_queue<T, Null, Erased, I, A>
const &parent,
584 Integrals_lockfree_queue<T, Null, Erased, I, A>::index_type position)
585 : m_current{position}, m_parent{&parent} {}
588template <
typename T, T Null, T Erased,
typename I,
typename A>
591 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {}
593template <
typename T, T Null, T Erased,
typename I,
typename A>
596 : m_current{rhs.m_current}, m_parent{rhs.m_parent} {
597 rhs.m_current = std::numeric_limits<index_type>::max();
598 rhs.m_parent =
nullptr;
601template <
typename T, T Null, T Erased,
typename I,
typename A>
610template <
typename T, T Null, T Erased,
typename I,
typename A>
615 this->m_parent = rhs.m_parent;
616 rhs.m_current = std::numeric_limits<index_type>::max();
617 rhs.m_parent =
nullptr;
621template <
typename T, T Null, T Erased,
typename I,
typename A>
624 A>::Iterator::operator++() {
629template <
typename T, T Null, T Erased,
typename I,
typename A>
633 A>::Iterator::operator*()
const {
634 return this->m_parent->
m_array[this->m_parent->translate(this->m_current)];
637template <
typename T, T Null, T Erased,
typename I,
typename A>
640 A>::Iterator::operator++(int) {
641 auto to_return = (*this);
646template <
typename T, T Null, T Erased,
typename I,
typename A>
650 A>::Iterator::operator->()
const {
651 return &(this->m_parent->m_array[this->m_parent->translate(this->m_current)]);
654template <
typename T, T Null, T Erased,
typename I,
typename A>
656 T,
Null, Erased, I, A>::Iterator::operator==(
Iterator const &rhs)
const {
657 return this->m_current == rhs.m_current && this->m_parent == rhs.m_parent;
660template <
typename T, T Null, T Erased,
typename I,
typename A>
662 T,
Null, Erased, I, A>::Iterator::operator!=(
Iterator const &rhs)
const {
663 return !((*this) == rhs);
666template <
typename T, T Null, T Erased,
typename I,
typename A>
669 this->m_parent->m_array[this->m_parent->translate(this->m_current)].store(
673template <
typename T, T Null, T Erased,
typename I,
typename A>
674template <
typename D, T
M, T F,
typename J,
typename B,
675 std::enable_if_t<!std::is_same<B, std::nullptr_t>::value> *>
684template <
typename T, T Null, T Erased,
typename I,
typename A>
685template <
typename D, T
M, T F,
typename J,
typename B,
686 std::enable_if_t<std::is_same<B, std::nullptr_t>::value> *>
689 : m_capacity{
size}, m_array{
size, null_value}, m_head{0}, m_tail{0} {}
691template <
typename T, T Null, T Erased,
typename I,
typename A>
695 return this->m_array;
698template <
typename T, T Null, T Erased,
typename I,
typename A>
701 for (
size_t idx = 0; idx != this->m_array.size(); ++idx)
703 this->m_head->store(0);
704 this->m_tail->store(0);
707template <
typename T, T Null, T Erased,
typename I,
typename A>
710 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
711 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
715template <
typename T, T Null, T Erased,
typename I,
typename A>
718 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
719 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
720 return tail == head + this->capacity();
723template <
typename T, T Null, T Erased,
typename I,
typename A>
726 return this->m_head->load(std::memory_order_seq_cst) & clear_bit;
729template <
typename T, T Null, T Erased,
typename I,
typename A>
732 return this->m_tail->load(std::memory_order_seq_cst) & clear_bit;
735template <
typename T, T Null, T Erased,
typename I,
typename A>
738 auto head = this->head();
740 this->m_array[this->translate(head)].load(std::memory_order_seq_cst);
741 return to_return == Erased ?
Null : to_return;
744template <
typename T, T Null, T Erased,
typename I,
typename A>
747 auto tail = this->tail();
748 if (tail == 0)
return Null;
750 this->m_array[this->translate(tail - 1)].load(std::memory_order_seq_cst);
751 return to_return == Erased ?
Null : to_return;
754template <
typename T, T Null, T Erased,
typename I,
typename A>
762template <
typename T, T Null, T Erased,
typename I,
typename A>
765 const_reference_type to_push) {
766 return this->
push(to_push);
769template <
typename T, T Null, T Erased,
typename I,
typename A>
773 auto head = this->m_head->load(std::memory_order_acquire) & clear_bit;
774 auto tail = this->m_tail->load(std::memory_order_relaxed) & clear_bit;
777 this->state() = enum_queue_state::NO_MORE_ELEMENTS;
781 auto new_head = head + 1;
783 if (this->m_head->compare_exchange_strong(
786 std::memory_order_release)) {
787 auto ¤t = this->m_array[this->translate(head)];
794 current.compare_exchange_strong(
796 std::memory_order_release)) {
797 new_head &= clear_bit;
799 this->m_head->store(new_head, std::memory_order_seq_cst);
800 if (value == Erased) {
806 std::this_thread::yield();
809 std::this_thread::yield();
814template <
typename T, T Null, T Erased,
typename I,
typename A>
818 assert(to_push !=
Null && to_push != Erased);
821 auto tail = this->m_tail->load(std::memory_order_acquire) & clear_bit;
822 auto head = this->m_head->load(std::memory_order_relaxed) & clear_bit;
824 if (tail == head + this->m_capacity) {
825 this->state() = enum_queue_state::NO_SPACE_AVAILABLE;
829 auto new_tail = tail + 1;
831 if (this->m_tail->compare_exchange_strong(
834 std::memory_order_release)) {
835 auto ¤t = this->m_array[this->translate(tail)];
839 if (current.compare_exchange_strong(
843 std::memory_order_acquire)) {
844 new_tail &= clear_bit;
846 this->m_tail->store(new_tail, std::memory_order_seq_cst);
849 std::this_thread::yield();
853 std::this_thread::yield();
858template <
typename T, T Null, T Erased,
typename I,
typename A>
861 return Iterator{*
this, this->head()};
864template <
typename T, T Null, T Erased,
typename I,
typename A>
867 return Iterator{*
this, this->tail()};
870template <
typename T, T Null, T Erased,
typename I,
typename A>
871template <
typename D, T
M, T F,
typename J,
typename B,
typename Pred,
872 std::enable_if_t<M != F> *>
877 for (
size_t idx = 0; idx != this->m_capacity; ++idx) {
878 auto ¤t = this->m_array[idx];
879 T value = current.load(std::memory_order_acquire);
880 if (value !=
Null && value != Erased && predicate(value)) {
881 if (current.compare_exchange_strong(value, Erased,
882 std::memory_order_release)) {
890template <
typename T, T Null, T Erased,
typename I,
typename A>
893 return this->m_capacity;
896template <
typename T, T Null, T Erased,
typename I,
typename A>
898 A>::allocated_size()
const {
902template <
typename T, T Null, T Erased,
typename I,
typename A>
905 this->state() = enum_queue_state::SUCCESS;
909template <
typename T, T Null, T Erased,
typename I,
typename A>
913 return this->state();
916template <
typename T, T Null, T Erased,
typename I,
typename A>
920 for (
auto value : (*
this)) {
921 out << (value ==
Null
930template <
typename T, T Null, T Erased,
typename I,
typename A>
933 return static_cast<size_t>(from %
static_cast<index_type>(this->m_capacity));
936template <
typename T, T Null, T Erased,
typename I,
typename A>
938 A>::enum_queue_state &
Array of std::atomic elements of type T.
Definition: atomics_array.h:84
Iterator helper class to iterate over the queue staring at the virtual index pointed to by the head,...
Definition: integrals_lockfree_queue.h:198
Integrals_lockfree_queue< T, Null, Erased, I, A > const * m_parent
The reference to the queue holding the elements.
Definition: integrals_lockfree_queue.h:248
T * pointer
Definition: integrals_lockfree_queue.h:202
Iterator & operator=(const Iterator &rhs)
Definition: integrals_lockfree_queue.h:603
index_type m_current
The position of the element this iterator is pointing to.
Definition: integrals_lockfree_queue.h:246
void set(value_type new_value)
Sets the value of the element the iterator is pointing to the given parameter.
Definition: integrals_lockfree_queue.h:667
T reference
Definition: integrals_lockfree_queue.h:203
virtual ~Iterator()=default
T value_type
Definition: integrals_lockfree_queue.h:201
std::ptrdiff_t difference_type
Definition: integrals_lockfree_queue.h:200
std::forward_iterator_tag iterator_category
Definition: integrals_lockfree_queue.h:204
Iterator(Integrals_lockfree_queue< T, Null, Erased, I, A > const &parent, index_type position)
Lock-free, fixed-size bounded, multiple-producer (MP), multiple-consumer (MC), circular FIFO queue fo...
Definition: integrals_lockfree_queue.h:130
Integrals_lockfree_queue< T, Null, Erased, I, A > & push(value_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:816
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
friend std::ostream & operator<<(std::ostream &out, Integrals_lockfree_queue< T, Null, Erased, I, A > const &in)
Definition: integrals_lockfree_queue.h:551
size_t erase_if(Pred predicate)
Erases values from the queue.
Definition: integrals_lockfree_queue.h:873
value_type pop()
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:770
atomic_type m_tail
The virtual index being pointed to by the tail of the queue.
Definition: integrals_lockfree_queue.h:566
T const * const_pointer_type
Definition: integrals_lockfree_queue.h:138
unsigned long long index_type
Definition: integrals_lockfree_queue.h:144
Iterator begin() const
Retrieves an iterator instance that points to the same position pointed by the head of the queue.
Definition: integrals_lockfree_queue.h:860
size_t translate(index_type from) const
Translates a virtual monotonically increasing index into an index bounded to the queue capacity.
Definition: integrals_lockfree_queue.h:931
Integrals_lockfree_queue< T, Null, Erased, I, A > & clear_state()
Clears the error state of the last performed operations, if any.
Definition: integrals_lockfree_queue.h:904
size_t capacity() const
Returns the maximum number of elements allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:891
enum_queue_state get_state() const
Retrieves the error/success state of the last performed operation.
Definition: integrals_lockfree_queue.h:912
enum_queue_state
Definition: integrals_lockfree_queue.h:153
Integrals_lockfree_queue(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
index_type head() const
Retrieves the virtual index that the head of the queue is pointing to.
Definition: integrals_lockfree_queue.h:725
std::string to_string() const
Return this queue textual representation.
Definition: integrals_lockfree_queue.h:918
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > &&rhs)=delete
static constexpr T null_value
Definition: integrals_lockfree_queue.h:148
bool is_full() const
Retrieves whether or not the tail of the queue is pointing to the same virtual index as the computed ...
Definition: integrals_lockfree_queue.h:716
value_type front() const
Retrieves the element at the front of the queue, i.e.
Definition: integrals_lockfree_queue.h:737
T * pointer_type
Definition: integrals_lockfree_queue.h:137
Integrals_lockfree_queue(size_t size)
Constructor allowing specific queue capacity.
Definition: integrals_lockfree_queue.h:688
bool is_empty() const
Retrieves whether or not the head and tail of the queue are pointing to the same virtual index.
Definition: integrals_lockfree_queue.h:708
T const & const_reference_type
Definition: integrals_lockfree_queue.h:140
std::atomic< T > element_type
Definition: integrals_lockfree_queue.h:143
value_type back() const
Retrieves the value of the position at the back of the queue, i.e.
Definition: integrals_lockfree_queue.h:746
size_t allocated_size() const
Returns the amount of bytes needed to store the maximum number of elements allowed to coexist in the ...
Definition: integrals_lockfree_queue.h:898
enum_queue_state & state() const
Retrieves the thread storage duration operation state variable.
Definition: integrals_lockfree_queue.h:939
T value_type
Definition: integrals_lockfree_queue.h:141
array_type m_array
The array of atomics in which the elements will be stored.
Definition: integrals_lockfree_queue.h:562
virtual ~Integrals_lockfree_queue()=default
Destructor for the class.
atomic_type m_head
The virtual index being pointed to by the head of the queue.
Definition: integrals_lockfree_queue.h:564
T & reference_type
Definition: integrals_lockfree_queue.h:139
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator=(Integrals_lockfree_queue< T, Null, Erased, I, A > const &rhs)=delete
size_t m_capacity
The maximum allowed number of element allowed to coexist in the queue.
Definition: integrals_lockfree_queue.h:560
Iterator end() const
Retrieves an iterator instance that points to the same position pointed by the tail of the queue.
Definition: integrals_lockfree_queue.h:866
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator>>(reference_type out)
Retrieves the value at the virtual index pointed by the head of the queue, clears that position,...
Definition: integrals_lockfree_queue.h:756
index_type tail() const
Retrieves the virtual index that the tail of the queue is pointing to.
Definition: integrals_lockfree_queue.h:731
T const const_value_type
Definition: integrals_lockfree_queue.h:142
void clear()
Sets all queue positions to Null and points the head and tail of the queue to the 0 virtual index.
Definition: integrals_lockfree_queue.h:699
Integrals_lockfree_queue(A &alloc, size_t size)
Constructor allowing a specific memory allocator and a specific queue capacity.
Definition: integrals_lockfree_queue.h:677
array_type & array()
Returns the underlying instance of memory::Atomics_array which holds the allocated memory for the arr...
Definition: integrals_lockfree_queue.h:694
Integrals_lockfree_queue< T, Null, Erased, I, A > & operator<<(const_reference_type to_push)
Takes the value passed on as a parameter, stores it in the virtual index pointed to by the tail of th...
Definition: integrals_lockfree_queue.h:764
Indexing provider that pads each of the array elements to the size of the CPU cache line,...
Definition: atomics_array_index_padding.h:57
ostream & operator<<(ostream &os, const Datetime &)
Definition: logger.cc:35
#define SUCCESS
Definition: completion_hash.h:27
#define M
Definition: ctype-tis620.cc:73
bool store(THD *thd, const Table *tp)
Stores the SDI for a table.
Definition: sdi.cc:607
static std::string to_string(const LEX_STRING &str)
Definition: lex_string.h:50
bool operator!=(const my_thread_handle &a, const my_thread_handle &b)
Definition: my_thread.h:158
bool operator==(const my_thread_handle &a, const my_thread_handle &b)
Definition: my_thread.h:151
borrowable::binary::Null Null
Definition: classic_protocol_binary.h:337
Definition: atomics_array.h:39
size_t size(const char *const c)
Definition: base64.h:46
static mysql_service_status_t flush(reference_caching_cache cache) noexcept
Definition: component.cc:114
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2871
std::set< Key, Compare, ut::allocator< Key > > set
Specialization of set which uses ut_allocator.
Definition: ut0new.h:2883
std::map< Key, Value, Compare, ut::allocator< std::pair< const Key, Value > > > map
Specialization of map which uses ut_allocator.
Definition: ut0new.h:2893
static bool push(mem_root_deque< Item * > *items, qep_row::mem_root_str &s, Item_null *nil)
Definition: opt_explain_traditional.cc:94
Ssl_acceptor_context_property_type & operator++(Ssl_acceptor_context_property_type &property_type)
Increment operator for Ssl_acceptor_context_type Used by iterator.
Definition: ssl_acceptor_context_data.cc:273