MySQL 9.3.0
Source Code Documentation
ut0counting_semaphore.h
/*****************************************************************************

Copyright (c) 2024, 2025, Oracle and/or its affiliates.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The authors
of MySQL hereby grant you an additional permission to link the program and
your derivative works with the separately licensed software that they have
included with MySQL.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

*****************************************************************************/

#pragma once
#include "ut0class_life_cycle.h"

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <type_traits>

namespace ut {

namespace detail {
/* TODO: Simplify this once all platforms support std::atomic<T>::notify_one().
In particular gcc 10.2 which we use on ARM platforms with EL 7, still has
no support for it, thus we provide a workaround which uses std::mutex and
std::condition_variable. */

/** A SFINAE helper type which has value=true if and only if std::atomic<T>
supports wait(T). This is the generic fallback which sets it to false. */
template <typename, typename = void>
struct has_atomic_wait : std::false_type {};

/** A SFINAE specialization which kicks in when std::atomic::wait(T) is
implemented. */
template <typename T>
struct has_atomic_wait<T, std::void_t<decltype(std::atomic<T>().wait(T{}))>>
    : std::true_type {};

/** A SFINAE helper from which ut::counting_semaphore inherits whole behaviour.
The has_atomic_wait_support argument is not meant to be specified explicitly.
Instead it will be deduced to be true or false based on the platform.
Then exactly one of the two variants will be instantiated.
Also, in practice T will always be uint32_t, but it has to be a type variable,
so that `has_atomic_wait_support` can *depend* on it. */
template <typename T, bool has_atomic_wait_support = has_atomic_wait<T>::value>
class counting_semaphore_base;

/** Just like std::counting_semaphore except that:

 - acquire(foo) will call foo() whenever it sees the counter==0, which can be
   used to trigger some kind of wakeup mechanism for whoever is hogging the
   resources.
*/
template <typename T>
class counting_semaphore_base<T, true> : private Non_copyable {
 public:
  /** Initializes the counter to initial_value */
  counting_semaphore_base(T initial_value)
      : m_counter{initial_value}, m_waiters{0} {}
  /** Waits until the counter is positive, and decrements it by one. */
  template <typename F>
  void acquire(F &&execute_when_zero) {
    for (auto seen = m_counter.load();;) {
      if (seen == 0) {
        std::forward<F>(execute_when_zero)();
        m_waiters.fetch_add(1);
        /* If this thread goes to sleep here, and is in the kernel at moment T,
        then there will be a moment L after T, at which m_waiters.load() will
        be executed from release(). This is sufficient to prove that as long
        as someone is sleeping in the kernel, someone else will try to wake
        them, as m_waiters.load() is non zero as long as someone sleeps, and
        thus m_counter.notify_one() will be called.

        The proof that such moment L must exist is by contradiction.

        If there is no such moment, then there must be a well defined "last
        moment at which m_counter.fetch_add(1) was called", let's call it M.
        Clearly M is before T. The same thread which executed this last
        fetch_add then faced one of 3 cases:

        A) It saw m_waiters.load() == 0, and did nothing.

        B) It saw m_waiters.load() > 0, called m_counter.notify_one(), but
           nobody was asleep at that moment, so it woke up nobody.

        C) It saw m_waiters.load() > 0, called m_counter.notify_one() and woke
           up somebody.

        In Case (A), note that since the moment when the sleeping thread
        incremented m_waiters, it remained above zero at least until the
        moment T at which it still was asleep. This means this
        m_waiters.fetch_add(1) had to happen after M. The sleeping thread went
        to sleep because m_counter.wait(0) saw m_counter==0, and that
        happened-after M. That means that at least one thread successfully
        performed CAS, and thus acquire()d after moment M. But the contract
        requires that after each acquire() someone calls release(). So there
        must be another call to m_counter.fetch_add(1) after M. The
        contradiction ends the proof.

        In Case (B), note that at moment T the thread is asleep, so the call to
        m_counter.notify_one() must've executed the check for sleepers inside
        notify_one() at a moment which is before the sleeping thread has been
        executing the check for m_counter==0 and going to sleep. So, m_counter
        was equal to 0 at some point after M. But that means someone
        successfully executed CAS after M, and just like in case (A) someone
        will call release() after M, contradicting the definition of M.

        In Case (C), the woken-up thread is one of the runnable threads which
        have a chance to look at the value of m_counter which was incremented
        at moment M. Either this thread, or some other thread, will therefore
        succeed in the CAS, and by the contract will later call release(),
        again contradicting the definition of M. */
        m_counter.wait(0);
        m_waiters.fetch_sub(1);
        seen = m_counter.load();
      } else if (m_counter.compare_exchange_weak(seen, seen - 1)) {
        return;
      }
    }
  }
  /** Increments the counter by one */
  void release() {
    m_counter.fetch_add(1);
    if (0 < m_waiters.load()) {
      m_counter.notify_one();
    }
  }

 private:
  std::atomic<T> m_counter;
  std::atomic<size_t> m_waiters;
};

/** A specialization to be used on old platforms such as gcc 10.2, which lack
support for atomic wait. The implementation uses a mutex and a
condition_variable to achieve the same observable behaviour in a less optimal
way. Still, it tries to use notify_one() instead of notify_all() and only when
m_waiters is non-zero. */
template <typename T>
class counting_semaphore_base<T, false> : private Non_copyable {
 public:
  counting_semaphore_base(T initial_value)
      : m_counter{initial_value}, m_waiters{0} {}
  template <typename F>
  void acquire(F &&execute_when_zero) {
    std::unique_lock guard{m_mutex};
    ++m_waiters;
    /* If this thread goes to sleep here, and is in the kernel at moment T,
    then there will be a moment L after T, when release() will be executed.
    This is sufficient to prove that as long as someone is sleeping in the
    kernel, someone else will try to wake them, as m_waiters is non zero,
    as long as someone sleeps, and thus m_is_non_zero.notify_one() will be
    called from release().

    The proof that such moment L must exist is by contradiction.

    If there is no such moment, then there must be a well defined "last
    moment at which release() was called", let's call it M.
    Clearly M is before T. The same thread which executed it faced one
    of 3 cases:

    A) It saw m_waiters == 0, and did nothing.

    B) It saw m_waiters > 0, called m_is_non_zero.notify_one(), but
       nobody was asleep at that moment, so it woke up nobody.

    C) It saw m_waiters > 0, called m_is_non_zero.notify_one() and woke
       up somebody.

    In Case (A), note that since the moment when the sleeping thread
    did ++m_waiters, it remained above zero at least until the
    moment T at which it still was asleep. But this ++m_waiters happens
    under m_mutex, so had to happen after or before release() at M. It must
    have happened after, given that M saw m_waiters==0. The thread which
    went to sleep saw m_counter == 0, in the same critical section, so it
    means some time after release() in M which did ++m_counter, someone
    decremented it again in acquire(). That means someone successfully
    performed acquire(), after moment M. The contract requires that after
    each acquire() one calls release(). So, there must be another call to
    release() after M. The contradiction ends the proof.

    In Case (B), note that at moment T the thread is asleep, so the call to
    m_is_non_zero.notify_one() must've executed the check for sleepers inside
    notify_one() at a moment which is before the sleeping thread has been
    executing the check for m_counter==0 and going to sleep. So, m_counter
    was equal to 0 at some point after M. But that means someone successfully
    executed acquire() after M, and just like in case (A), will call
    release() after M, contradicting the definition of M.

    In Case (C), the woken-up thread is one of the runnable threads which have
    a chance to look at the value of m_counter which was incremented at moment
    M. Either this thread, or some other thread, will therefore succeed in
    acquire(), and then call release(), again contradicting the definition of
    M. */
    m_is_non_zero.wait(guard, [&]() {
      if (0 < m_counter) {
        return true;
      }
      std::forward<F>(execute_when_zero)();
      return false;
    });
    --m_waiters;
    --m_counter;
  }
  void release() {
    std::lock_guard guard{m_mutex};
    ++m_counter;
    if (0 < m_waiters) {
      m_is_non_zero.notify_one();
    }
  }

 private:
  std::mutex m_mutex;
  std::condition_variable m_is_non_zero;
  T m_counter;
  size_t m_waiters;
};

}  // namespace detail

/** Just like std::counting_semaphore except that:

 - acquire(foo) will call foo() whenever it sees the counter==0, which can be
   used to trigger some kind of wakeup mechanism for whoever is hogging the
   resources.

 - compiles on platforms which do not have std::counting_semaphore like
   gcc 10.2

 - implementation is tailored to specific platform, using
   std::atomic<uint32_t>::wait()/notify_one() where possible, and falling back
   to std::mutex + std::condition_variable on older platforms.

 - The counter is forced to be 32-bit to ensure efficient underlying
   implementation. The std::counting_semaphore<T> relies on
   std::atomic<T>::wait()/notify() mechanism, which in turn is implemented by
   using operating system primitives. On Windows, WaitOnAddress can handle
   64-bit memory locations just fine. But on Linux gcc uses a Futex mechanism
   which supports 32-bit memory locations only. The libstdc++ tries to be more
   abstract and handle even 64-bit integers by hashing their addresses to a
   shared pool of 32-bit integers, and uses Futex on them. But because of
   https://gcc.gnu.org/bugzilla/show_bug.cgi?id=115955 it causes contention.
   So, our ut::counting_semaphore does not let you specify the type of the
   counter forcing it to be std::atomic<uint32_t> which guarantees fast
   execution on Linux, too.
*/
using counting_semaphore = detail::counting_semaphore_base<uint32_t>;

}  // namespace ut