MySQL 8.3.0
Source Code Documentation
ut0mpmcbq.h
Go to the documentation of this file.
1/*****************************************************************************
2Copyright (c) 2017, 2023, Oracle and/or its affiliates.
3
4This program is free software; you can redistribute it and/or modify
5it under the terms of the GNU General Public License, version 2.0,
6as published by the Free Software Foundation.
7
8This program is also distributed with certain software (including
9but not limited to OpenSSL) that is licensed under separate terms,
10as designated in a particular file or component or in included license
11documentation. The authors of MySQL hereby grant you an additional
12permission to link the program and your derivative works with the
13separately licensed software that they have included with MySQL.
14
15This program is distributed in the hope that it will be useful,
16but WITHOUT ANY WARRANTY; without even the implied warranty of
17MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18GNU General Public License, version 2.0, for more details.
19
20You should have received a copy of the GNU General Public License
21along with this program; if not, write to the Free Software
22Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
24*****************************************************************************/
25
26#ifndef ut0mpmcbq_h
27#define ut0mpmcbq_h
28
29#include "ut0cpu_cache.h"
30
31#include <atomic>
32
33/** Multiple producer consumer, bounded queue
34 Implementation of Dmitry Vyukov's MPMC algorithm
35 http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue */
36template <typename T>
37class mpmc_bq {
38 public:
39 /** Constructor
40 @param[in] n_elems Max number of elements allowed */
41 explicit mpmc_bq(size_t n_elems)
42 : m_ring(reinterpret_cast<Cell *>(ut::new_arr_withkey<Aligned>(
43 UT_NEW_THIS_FILE_PSI_KEY, ut::Count{n_elems}))),
44 m_capacity(n_elems - 1) {
45 /* Should be a power of 2 */
46 ut_a((n_elems >= 2) && ((n_elems & (n_elems - 1)) == 0));
47
48 for (size_t i = 0; i < n_elems; ++i) {
49 m_ring[i].m_pos.store(i, std::memory_order_relaxed);
50 }
51
52 m_enqueue_pos.store(0, std::memory_order_relaxed);
53 m_dequeue_pos.store(0, std::memory_order_relaxed);
54 }
55
56 /** Destructor */
58
59 /** Enqueue an element
60 @param[in] data Element to insert, it will be copied
61 @return true on success */
62 [[nodiscard]] bool enqueue(T const &data) {
63 /* m_enqueue_pos only wraps at MAX(m_enqueue_pos), instead
64 we use the capacity to convert the sequence to an array
65 index. This is why the ring buffer must be a size which
66 is a power of 2. This also allows the sequence to double
67 as a ticket/lock. */
68
69 size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
70
71 Cell *cell;
72
73 for (;;) {
74 cell = &m_ring[pos & m_capacity];
75
76 size_t seq;
77
78 seq = cell->m_pos.load(std::memory_order_acquire);
79
80 intptr_t diff = (intptr_t)seq - (intptr_t)pos;
81
82 /* If they are the same then it means this cell is empty */
83
84 if (diff == 0) {
85 /* Claim our spot by moving head. If head isn't the same as we last
86 checked then that means someone beat us to the punch. Weak compare is
87 faster, but can return spurious results which in this instance is OK,
88 because it's in the loop */
89
90 if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1,
91 std::memory_order_relaxed)) {
92 break;
93 }
94
95 } else if (diff < 0) {
96 /* The queue is full */
97
98 return (false);
99
100 } else {
101 pos = m_enqueue_pos.load(std::memory_order_relaxed);
102 }
103 }
104
105 cell->m_data = data;
106
107 /* Increment the sequence so that the tail knows it's accessible */
108
109 cell->m_pos.store(pos + 1, std::memory_order_release);
110
111 return (true);
112 }
113
114 /** Dequeue an element
115 @param[out] data Element read from the queue
116 @return true on success */
117 [[nodiscard]] bool dequeue(T &data) {
118 Cell *cell;
119 size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
120
121 for (;;) {
122 cell = &m_ring[pos & m_capacity];
123
124 size_t seq = cell->m_pos.load(std::memory_order_acquire);
125
126 auto diff = (intptr_t)seq - (intptr_t)(pos + 1);
127
128 if (diff == 0) {
129 /* Claim our spot by moving the head. If head isn't the same as we last
130 checked then that means someone beat us to the punch. Weak compare is
131 faster, but can return spurious results. Which in this instance is
132 OK, because it's in the loop. */
133
134 if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1,
135 std::memory_order_relaxed)) {
136 break;
137 }
138
139 } else if (diff < 0) {
140 /* The queue is empty */
141 return (false);
142
143 } else {
144 /* Under normal circumstances this branch should never be taken. */
145 pos = m_dequeue_pos.load(std::memory_order_relaxed);
146 }
147 }
148
149 data = cell->m_data;
150
151 /* Set the sequence to what the head sequence should be next
152 time around */
153
154 cell->m_pos.store(pos + m_capacity + 1, std::memory_order_release);
155
156 return (true);
157 }
158
159 /** @return the capacity of the queue */
160 [[nodiscard]] size_t capacity() const { return (m_capacity + 1); }
161
162 /** @return true if the queue is empty. */
163 [[nodiscard]] bool empty() const {
164 size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
165
166 for (;;) {
167 auto cell = &m_ring[pos & m_capacity];
168
169 size_t seq = cell->m_pos.load(std::memory_order_acquire);
170
171 auto diff = (intptr_t)seq - (intptr_t)(pos + 1);
172
173 if (diff == 0) {
174 return (false);
175 } else if (diff < 0) {
176 return (true);
177 } else {
178 pos = m_dequeue_pos.load(std::memory_order_relaxed);
179 }
180 }
181 }
182
183 private:
185
186 struct Cell {
187 std::atomic<size_t> m_pos;
189 };
190
191 using Aligned =
192 typename std::aligned_storage<sizeof(Cell),
193 std::alignment_of<Cell>::value>::type;
194
196 Cell *const m_ring;
197 size_t const m_capacity;
199 std::atomic<size_t> m_enqueue_pos;
201 std::atomic<size_t> m_dequeue_pos;
203
204 mpmc_bq(mpmc_bq &&) = delete;
205 mpmc_bq(const mpmc_bq &) = delete;
206 mpmc_bq &operator=(mpmc_bq &&) = delete;
207 mpmc_bq &operator=(const mpmc_bq &) = delete;
208};
209
210#endif /* ut0mpmcbq_h */
Multiple producer consumer, bounded queue Implementation of Dmitry Vyukov's MPMC algorithm http://www...
Definition: ut0mpmcbq.h:37
Pad m_pad3
Definition: ut0mpmcbq.h:202
size_t const m_capacity
Definition: ut0mpmcbq.h:197
mpmc_bq(const mpmc_bq &)=delete
std::atomic< size_t > m_dequeue_pos
Definition: ut0mpmcbq.h:201
bool empty() const
Definition: ut0mpmcbq.h:163
Pad m_pad2
Definition: ut0mpmcbq.h:200
typename std::aligned_storage< sizeof(Cell), std::alignment_of< Cell >::value >::type Aligned
Definition: ut0mpmcbq.h:193
size_t capacity() const
Definition: ut0mpmcbq.h:160
bool dequeue(T &data)
Dequeue an element.
Definition: ut0mpmcbq.h:117
bool enqueue(T const &data)
Enqueue an element.
Definition: ut0mpmcbq.h:62
mpmc_bq & operator=(const mpmc_bq &)=delete
Pad m_pad0
Definition: ut0mpmcbq.h:195
Pad m_pad1
Definition: ut0mpmcbq.h:198
mpmc_bq(size_t n_elems)
Constructor.
Definition: ut0mpmcbq.h:41
byte[ut::INNODB_CACHE_LINE_SIZE] Pad
Definition: ut0mpmcbq.h:184
Cell *const m_ring
Definition: ut0mpmcbq.h:196
std::atomic< size_t > m_enqueue_pos
Definition: ut0mpmcbq.h:199
~mpmc_bq()
Destructor.
Definition: ut0mpmcbq.h:57
mpmc_bq & operator=(mpmc_bq &&)=delete
mpmc_bq(mpmc_bq &&)=delete
static Bigint * diff(Bigint *a, Bigint *b, Stack_alloc *alloc)
Definition: dtoa.cc:1076
This file contains a set of libraries providing overloads for regular dynamic allocation routines whi...
Definition: aligned_alloc.h:47
constexpr size_t INNODB_CACHE_LINE_SIZE
CPU cache line size.
Definition: ut0cpu_cache.h:40
T * new_arr_withkey(PSI_memory_key_t key, Args &&... args)
Dynamically allocates storage for an array of T's.
Definition: ut0new.h:873
void delete_arr(T *ptr) noexcept
Releases storage which has been dynamically allocated through any of the ut::new_arr*() variants.
Definition: ut0new.h:1108
required string type
Definition: replication_group_member_actions.proto:33
Definition: ut0mpmcbq.h:186
T m_data
Definition: ut0mpmcbq.h:188
std::atomic< size_t > m_pos
Definition: ut0mpmcbq.h:187
Utilities related to CPU cache.
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:92
#define UT_NEW_THIS_FILE_PSI_KEY
Definition: ut0new.h:563