MySQL 9.1.0
Source Code Documentation
ut0mpmcbq.h
Go to the documentation of this file.
1/*****************************************************************************
2Copyright (c) 2017, 2024, Oracle and/or its affiliates.
3
4This program is free software; you can redistribute it and/or modify
5it under the terms of the GNU General Public License, version 2.0,
6as published by the Free Software Foundation.
7
8This program is designed to work with certain software (including
9but not limited to OpenSSL) that is licensed under separate terms,
10as designated in a particular file or component or in included license
11documentation. The authors of MySQL hereby grant you an additional
12permission to link the program and your derivative works with the
13separately licensed software that they have either included with
14the program or referenced in the documentation.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License, version 2.0, for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program; if not, write to the Free Software
23Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25*****************************************************************************/
26
27#ifndef ut0mpmcbq_h
28#define ut0mpmcbq_h
29
30#include "ut0cpu_cache.h"
31
32#include <atomic>
33
34/** Multiple producer consumer, bounded queue
35 Implementation of Dmitry Vyukov's MPMC algorithm
36 http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue */
37template <typename T>
38class mpmc_bq {
39 public:
40 /** Constructor
41 @param[in] n_elems Max number of elements allowed */
42 explicit mpmc_bq(size_t n_elems)
43 : m_ring(reinterpret_cast<Cell *>(ut::new_arr_withkey<Aligned>(
44 UT_NEW_THIS_FILE_PSI_KEY, ut::Count{n_elems}))),
45 m_capacity(n_elems - 1) {
46 /* Should be a power of 2 */
47 ut_a((n_elems >= 2) && ((n_elems & (n_elems - 1)) == 0));
48
49 for (size_t i = 0; i < n_elems; ++i) {
50 m_ring[i].m_pos.store(i, std::memory_order_relaxed);
51 }
52
53 m_enqueue_pos.store(0, std::memory_order_relaxed);
54 m_dequeue_pos.store(0, std::memory_order_relaxed);
55 }
56
57 /** Destructor */
59
60 /** Enqueue an element
61 @param[in] data Element to insert, it will be copied
62 @return true on success */
63 [[nodiscard]] bool enqueue(T const &data) {
64 /* m_enqueue_pos only wraps at MAX(m_enqueue_pos), instead
65 we use the capacity to convert the sequence to an array
66 index. This is why the ring buffer must be a size which
67 is a power of 2. This also allows the sequence to double
68 as a ticket/lock. */
69
70 size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
71
72 Cell *cell;
73
74 for (;;) {
75 cell = &m_ring[pos & m_capacity];
76
77 size_t seq;
78
79 seq = cell->m_pos.load(std::memory_order_acquire);
80
81 intptr_t diff = (intptr_t)seq - (intptr_t)pos;
82
83 /* If they are the same then it means this cell is empty */
84
85 if (diff == 0) {
86 /* Claim our spot by moving head. If head isn't the same as we last
87 checked then that means someone beat us to the punch. Weak compare is
88 faster, but can return spurious results which in this instance is OK,
89 because it's in the loop */
90
91 if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1,
92 std::memory_order_relaxed)) {
93 break;
94 }
95
96 } else if (diff < 0) {
97 /* The queue is full */
98
99 return (false);
100
101 } else {
102 pos = m_enqueue_pos.load(std::memory_order_relaxed);
103 }
104 }
105
106 cell->m_data = data;
107
108 /* Increment the sequence so that the tail knows it's accessible */
109
110 cell->m_pos.store(pos + 1, std::memory_order_release);
111
112 return (true);
113 }
114
115 /** Dequeue an element
116 @param[out] data Element read from the queue
117 @return true on success */
118 [[nodiscard]] bool dequeue(T &data) {
119 Cell *cell;
120 size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
121
122 for (;;) {
123 cell = &m_ring[pos & m_capacity];
124
125 size_t seq = cell->m_pos.load(std::memory_order_acquire);
126
127 auto diff = (intptr_t)seq - (intptr_t)(pos + 1);
128
129 if (diff == 0) {
130 /* Claim our spot by moving the head. If head isn't the same as we last
131 checked then that means someone beat us to the punch. Weak compare is
132 faster, but can return spurious results. Which in this instance is
133 OK, because it's in the loop. */
134
135 if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1,
136 std::memory_order_relaxed)) {
137 break;
138 }
139
140 } else if (diff < 0) {
141 /* The queue is empty */
142 return (false);
143
144 } else {
145 /* Under normal circumstances this branch should never be taken. */
146 pos = m_dequeue_pos.load(std::memory_order_relaxed);
147 }
148 }
149
150 data = cell->m_data;
151
152 /* Set the sequence to what the head sequence should be next
153 time around */
154
155 cell->m_pos.store(pos + m_capacity + 1, std::memory_order_release);
156
157 return (true);
158 }
159
160 /** @return the capacity of the queue */
161 [[nodiscard]] size_t capacity() const { return (m_capacity + 1); }
162
163 /** @return true if the queue is empty. */
164 [[nodiscard]] bool empty() const {
165 size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
166
167 for (;;) {
168 auto cell = &m_ring[pos & m_capacity];
169
170 size_t seq = cell->m_pos.load(std::memory_order_acquire);
171
172 auto diff = (intptr_t)seq - (intptr_t)(pos + 1);
173
174 if (diff == 0) {
175 return (false);
176 } else if (diff < 0) {
177 return (true);
178 } else {
179 pos = m_dequeue_pos.load(std::memory_order_relaxed);
180 }
181 }
182 }
183
184 private:
186
187 struct Cell {
188 std::atomic<size_t> m_pos;
190 };
191
192 using Aligned =
193 typename std::aligned_storage<sizeof(Cell),
194 std::alignment_of<Cell>::value>::type;
195
197 Cell *const m_ring;
198 size_t const m_capacity;
200 std::atomic<size_t> m_enqueue_pos;
202 std::atomic<size_t> m_dequeue_pos;
204
205 mpmc_bq(mpmc_bq &&) = delete;
206 mpmc_bq(const mpmc_bq &) = delete;
207 mpmc_bq &operator=(mpmc_bq &&) = delete;
208 mpmc_bq &operator=(const mpmc_bq &) = delete;
209};
210
211#endif /* ut0mpmcbq_h */
Multiple producer consumer, bounded queue Implementation of Dmitry Vyukov's MPMC algorithm http://www...
Definition: ut0mpmcbq.h:38
Pad m_pad3
Definition: ut0mpmcbq.h:203
size_t const m_capacity
Definition: ut0mpmcbq.h:198
mpmc_bq(const mpmc_bq &)=delete
std::atomic< size_t > m_dequeue_pos
Definition: ut0mpmcbq.h:202
bool empty() const
Definition: ut0mpmcbq.h:164
Pad m_pad2
Definition: ut0mpmcbq.h:201
typename std::aligned_storage< sizeof(Cell), std::alignment_of< Cell >::value >::type Aligned
Definition: ut0mpmcbq.h:194
size_t capacity() const
Definition: ut0mpmcbq.h:161
bool dequeue(T &data)
Dequeue an element.
Definition: ut0mpmcbq.h:118
bool enqueue(T const &data)
Enqueue an element.
Definition: ut0mpmcbq.h:63
mpmc_bq & operator=(const mpmc_bq &)=delete
Pad m_pad0
Definition: ut0mpmcbq.h:196
Pad m_pad1
Definition: ut0mpmcbq.h:199
mpmc_bq(size_t n_elems)
Constructor.
Definition: ut0mpmcbq.h:42
byte[ut::INNODB_CACHE_LINE_SIZE] Pad
Definition: ut0mpmcbq.h:185
Cell *const m_ring
Definition: ut0mpmcbq.h:197
std::atomic< size_t > m_enqueue_pos
Definition: ut0mpmcbq.h:200
~mpmc_bq()
Destructor.
Definition: ut0mpmcbq.h:58
mpmc_bq & operator=(mpmc_bq &&)=delete
mpmc_bq(mpmc_bq &&)=delete
static Bigint * diff(Bigint *a, Bigint *b, Stack_alloc *alloc)
Definition: dtoa.cc:1081
This file contains a set of libraries providing overloads for regular dynamic allocation routines whi...
Definition: aligned_alloc.h:48
constexpr size_t INNODB_CACHE_LINE_SIZE
CPU cache line size.
Definition: ut0cpu_cache.h:41
void delete_arr(T *ptr) noexcept
Releases storage which has been dynamically allocated through any of the ut::new_arr*() variants.
Definition: ut0new.h:1111
T * new_arr_withkey(PSI_memory_key_t key, Args &&...args)
Dynamically allocates storage for an array of T's.
Definition: ut0new.h:876
required string type
Definition: replication_group_member_actions.proto:34
Definition: ut0mpmcbq.h:187
T m_data
Definition: ut0mpmcbq.h:189
std::atomic< size_t > m_pos
Definition: ut0mpmcbq.h:188
Utilities related to CPU cache.
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:93
#define UT_NEW_THIS_FILE_PSI_KEY
Definition: ut0new.h:566