MySQL 8.0.33
Source Code Documentation
os0thread-create.h
Go to the documentation of this file.
/*****************************************************************************

Copyright (c) 1995, 2023, Oracle and/or its affiliates.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The authors
of MySQL hereby grant you an additional permission to link the program and
your derivative works with the separately licensed software that they have
included with MySQL.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

*****************************************************************************/
26
/** @file include/os0thread-create.h
 The interface to the threading wrapper

 Created 2016-May-17 Sunny Bains
 *******************************************************/
32
33#ifndef os0thread_create_h
34#define os0thread_create_h
35
36#include <my_thread.h>
37
38#include "univ.i"
39
40#include "os0thread.h"
42
43#include <atomic>
44#include <functional>
45
/** Maximum number of threads inside InnoDB. Used as the upper bound that
Detached_thread::preamble() asserts os_thread_count against. */
extern uint32_t srv_max_n_threads;

/** Number of threads active. Incremented when a detached thread registers
itself and decremented when it deregisters. */
extern std::atomic_int os_thread_count;
51
/** Initializes OS thread management data structures.
Intentionally a no-op: the body performs no work. */
inline void os_thread_open() { /* No op */
}
55
56/** Check if there are threads active.
57@return true if the thread count > 0. */
58inline bool os_thread_any_active() {
59 return os_thread_count.load(std::memory_order_relaxed) > 0;
60}
61
62/** Frees OS thread management data structures. */
63inline void os_thread_close() {
65 ib::warn(ER_IB_MSG_1274, os_thread_count.load(std::memory_order_relaxed));
66 }
67}
68
69/** Register with MySQL infrastructure. */
71 public:
72#ifdef UNIV_PFS_THREAD
73 /** Constructor for the Runnable object.
74 @param[in] pfs_key Performance schema key
75 @param[in] pfs_seqnum Performance schema sequence number */
76 explicit MySQL_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
77 : m_pfs_key(pfs_key), m_pfs_seqnum(pfs_seqnum) {}
78#else
79 /** Constructor for the Runnable object.
80 @param[in] pfs_key Performance schema key (ignored)
81 @param[in] pfs_seqnum Performance schema sequence number */
83#endif /* UNIV_PFS_THREAD */
84
85 protected:
86 /** Register the thread with the server */
87 void preamble() {
88 const bool ret = my_thread_init();
89 ut_a(!ret);
90
91#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
93 auto &value = m_pfs_key.m_value;
94 auto psi = PSI_THREAD_CALL(new_thread)(value, m_pfs_seqnum, this, 0);
95
96 PSI_THREAD_CALL(set_thread_os_id)(psi);
97 PSI_THREAD_CALL(set_thread)(psi);
98 }
99#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
100 }
101
102 /** Deregister the thread */
103 void epilogue() {
105
106#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
108 PSI_THREAD_CALL(delete_current_thread)();
109 }
110#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
111 }
112
113 /** @return a THD instance. */
114 THD *create_mysql_thd() noexcept {
115#ifdef UNIV_PFS_THREAD
116 return create_thd(false, true, true, m_pfs_key.m_value, m_pfs_seqnum);
117#else
118 return create_thd(false, true, true, 0, 0);
119#endif /* UNIV_PFS_THREAD */
120 }
121
122 /** Destroy a THD instance.
123 @param[in,out] thd Instance to destroy. */
124 void destroy_mysql_thd(THD *thd) noexcept { destroy_thd(thd); }
125
126 protected:
127#ifdef UNIV_PFS_THREAD
128 /** Performance schema key */
130
131 /** Performance schema sequence number */
133#endif /* UNIV_PFS_THREAD */
134};
135
136/** Execute in the context of a non detached MySQL thread. */
137class Runnable : public MySQL_thread {
138 public:
139 /** Constructor for the Runnable object.
140 @param[in] pfs_key Performance schema key
141 @param[in] pfs_seqnum Performance schema sequence number */
142 explicit Runnable(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
143 : MySQL_thread(pfs_key, pfs_seqnum) {}
144
145 /** Method to execute the callable
146 @param[in] f Callable object
147 @param[in] args Variable number of args to F
148 @retval f return value. */
149 template <typename F, typename... Args>
150 dberr_t operator()(F &&f, Args &&... args) {
152
153 auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
154
155 auto r = task();
156
158
159 return r;
160 }
161};
162
163/** Wrapper for a callable, it will count the number of registered
164Runnable instances and will register the thread executing the callable
165with the PFS and the Server threading infrastructure. */
167 public:
168 /** Constructor for the detached thread.
169 @param[in] pfs_key Performance schema key
170 @param[in] pfs_seqnum Performance schema sequence number */
172 PSI_thread_seqnum pfs_seqnum)
173 : MySQL_thread(pfs_key, pfs_seqnum) {
174 init();
175 }
176
177 /** Method to execute the callable
178 @param[in] f Callable object
179 @param[in] args Variable number of args to F */
180 template <typename F, typename... Args>
181 void operator()(F &&f, Args &&... args) {
183 UT_RELAX_CPU();
184 }
185
187
188 preamble();
189
191
192 auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
193
194 task();
195
196 epilogue();
197
199 }
200
201 /** @return thread handle. */
202 IB_thread thread() const { return (m_thread); }
203
204 private:
205 /** Initializes the m_shared_future, uses the m_promise's get_future,
206 which cannot be used since then, according to its documentation. */
208
209 /** Register the thread with the server */
210 void preamble() {
212
213 std::atomic_thread_fence(std::memory_order_release);
214
215 auto old = os_thread_count.fetch_add(1, std::memory_order_relaxed);
216
217 ut_a(old <= static_cast<int>(srv_max_n_threads) - 1);
218 }
219
220 /** Deregister the thread */
221 void epilogue() {
222 m_promise.set_value();
223
224 std::atomic_thread_fence(std::memory_order_release);
225
226 auto old = os_thread_count.fetch_sub(1, std::memory_order_relaxed);
227
228 ut_a(old > 0);
229
231 }
232
233 private:
234 /** Future object which keeps the ref counter >= 1 at least
235 as long as the Detached_thread is not-destroyed. */
237
238 /** Promise which is set when task is done. */
239 std::promise<void> m_promise;
240};
241
242/** Check if thread is stopped
243@param[in] thread Thread handle.
244@return true if the thread has started, finished tasks and stopped. */
245inline bool thread_is_stopped(const IB_thread &thread) {
246 return thread.state() == IB_thread::State::STOPPED;
247}
248
249/** Check if thread is active
250@param[in] thread Thread handle.
251@return true if the thread is active. */
252inline bool thread_is_active(const IB_thread &thread) {
253 switch (thread.state()) {
255 /* Not yet started. */
256 return false;
257
259 /* Thread "thread" is already active, but start() has not been called.
260 Note that when start() is called, the thread's routine may decide to
261 check if it is active or trigger other thread to do similar check
262 regarding "thread". That could happen faster than thread's state
263 is advanced from ALLOWED_TO_START to STARTED. Therefore we must
264 already consider such thread as "active". */
265 return true;
266
268 /* Note, that potentially the thread might be doing its cleanup after
269 it has already ended its task. We still consider it active, until the
270 cleanup is finished. */
271 return true;
272
274 /* Ended its task and became marked as STOPPED (cleanup finished) */
275 return false;
276
278 default:
279 /* The thread object has not been assigned yet. */
280 return false;
281 }
282
283 /* Note that similar goal was achieved by the usage of shared_future:
284 return (shared_future.valid() && shared_future.wait_for(std::chrono::seconds(
285 0)) != std::future_status::ready);
286 However this resulted in longer execution of mtr tests (63minutes ->
287 75minutes). You could try `mtr --mem collations.esperanto` (cmake
288 WITH_DEBUG=1) */
289}
290
291/** Create a detached non-started thread. After thread is created, you should
292assign the received object to any of variables/fields which you later could
293access to check thread's state. You are allowed to either move or copy that
294object (any number of copies is allowed). After assigning you are allowed to
295start the thread by calling start() on any of those objects.
296@param[in] pfs_key Performance schema thread key
297@param[in] pfs_seqnum Performance schema thread sequence number
298@param[in] f Callable instance
299@param[in] args Zero or more args
300@return Object which allows to start the created thread, monitor its state and
301 wait until the thread is finished. */
302template <typename F, typename... Args>
304 PSI_thread_seqnum pfs_seqnum, F &&f,
305 Args &&... args) {
306 Detached_thread detached_thread{pfs_key, pfs_seqnum};
307 auto thread = detached_thread.thread();
308
309 std::thread t(std::move(detached_thread), f, args...);
310 t.detach();
311
312 /* Thread t is doing busy waiting until the state is changed
313 from NOT_STARTED to ALLOWED_TO_START. That will happen when
314 thread.start() will be called. */
315 ut_a(thread.state() == IB_thread::State::NOT_STARTED);
316
317 return thread;
318}
319
/* os_thread_create(key, seqnum, f, args...) forwards to
create_detached_thread(). Without UNIV_PFS_THREAD the first two arguments are
discarded and replaced by zeros. */
#ifdef UNIV_PFS_THREAD
#define os_thread_create(...) create_detached_thread(__VA_ARGS__)
#else
#define os_thread_create(k, s, ...) create_detached_thread(0, 0, __VA_ARGS__)
#endif /* UNIV_PFS_THREAD */
325
326/** Parallel for loop over a container.
327@param[in] pfs_key Performance schema thread key
328@param[in] c Container to iterate over in parallel
329@param[in] n Number of threads to create
330@param[in] f Callable instance
331@param[in] args Zero or more args */
332template <typename Container, typename F, typename... Args>
333void par_for(mysql_pfs_key_t pfs_key, const Container &c, size_t n, F &&f,
334 Args &&... args) {
335 if (c.empty()) {
336 return;
337 }
338
339 size_t slice = (n > 0) ? c.size() / n : 0;
340
341 using Workers = std::vector<IB_thread>;
342
343 Workers workers;
344
345 workers.reserve(n);
346
347 for (size_t i = 0; i < n; ++i) {
348 auto b = c.begin() + (i * slice);
349 auto e = b + slice;
350
351 auto worker = os_thread_create(pfs_key, i, f, b, e, i, args...);
352 worker.start();
353
354 workers.push_back(std::move(worker));
355 }
356
357 f(c.begin() + (n * slice), c.end(), n, args...);
358
359 for (auto &worker : workers) {
360 worker.join();
361 }
362}
363
/* In instrumented non-hotbackup builds par_for() is used as is; otherwise the
performance schema key argument is discarded and replaced by 0. */
#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
#define par_for(...) par_for(__VA_ARGS__)
#else
#define par_for(k, ...) par_for(0, __VA_ARGS__)
#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
369
370#endif /* !os0thread_create_h */
Wrapper for a callable, it will count the number of registered Runnable instances and will register t...
Definition: os0thread-create.h:166
void operator()(F &&f, Args &&... args)
Method to execute the callable.
Definition: os0thread-create.h:181
void init()
Initializes the m_shared_future, uses the m_promise's get_future, which cannot be used since then,...
Definition: os0thread-create.h:207
std::promise< void > m_promise
Promise which is set when task is done.
Definition: os0thread-create.h:239
Detached_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the detached thread.
Definition: os0thread-create.h:171
void preamble()
Register the thread with the server.
Definition: os0thread-create.h:210
IB_thread m_thread
Future object which keeps the ref counter >= 1 at least as long as the Detached_thread is not-destroy...
Definition: os0thread-create.h:236
IB_thread thread() const
Definition: os0thread-create.h:202
void epilogue()
Deregister the thread.
Definition: os0thread-create.h:221
Definition: os0thread.h:45
void set_state(State state)
Definition: os0thread.cc:102
void init(std::promise< void > &promise)
Definition: os0thread.cc:97
State state() const
Definition: os0thread.h:49
Register with MySQL infrastructure.
Definition: os0thread-create.h:70
void epilogue()
Deregister the thread.
Definition: os0thread-create.h:103
MySQL_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the Runnable object.
Definition: os0thread-create.h:76
PSI_thread_seqnum m_pfs_seqnum
Performance schema sequence number.
Definition: os0thread-create.h:132
THD * create_mysql_thd() noexcept
Definition: os0thread-create.h:114
const mysql_pfs_key_t m_pfs_key
Performance schema key.
Definition: os0thread-create.h:129
void destroy_mysql_thd(THD *thd) noexcept
Destroy a THD instance.
Definition: os0thread-create.h:124
void preamble()
Register the thread with the server.
Definition: os0thread-create.h:87
Execute in the context of a non detached MySQL thread.
Definition: os0thread-create.h:137
Runnable(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the Runnable object.
Definition: os0thread-create.h:142
dberr_t operator()(F &&f, Args &&... args)
Method to execute the callable.
Definition: os0thread-create.h:150
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:33
The class warn is used to emit warnings.
Definition: ut0log.h:208
#define PSI_THREAD_CALL(M)
Definition: psi_thread.h:35
dberr_t
Definition: db0err.h:38
unsigned int PSI_thread_seqnum
Instrumented thread sequence number.
Definition: psi_thread_bits.h:58
Defines to make different thread packages compatible.
bool my_thread_init()
Allocate thread specific memory for the thread, used by mysys and dbug.
Definition: my_thr_init.cc:268
void my_thread_end()
Deallocate memory used by the thread for book-keeping.
Definition: my_thr_init.cc:303
stdx::expected< void, error_type > bind(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap bind() in a portable way.
Definition: socket.h:338
IB_thread create_detached_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum, F &&f, Args &&... args)
Create a detached non-started thread.
Definition: os0thread-create.h:303
void os_thread_open()
Initializes OS thread management data structures.
Definition: os0thread-create.h:53
std::atomic_int os_thread_count
Number of threads active.
Definition: os0thread.cc:55
#define os_thread_create(...)
Definition: os0thread-create.h:321
bool thread_is_active(const IB_thread &thread)
Check if thread is active.
Definition: os0thread-create.h:252
#define par_for(...)
Definition: os0thread-create.h:365
bool os_thread_any_active()
Check if there are threads active.
Definition: os0thread-create.h:58
void os_thread_close()
Frees OS thread management data structures.
Definition: os0thread-create.h:63
uint32_t srv_max_n_threads
Maximum number of threads inside InnoDB.
Definition: os0thread.cc:52
bool thread_is_stopped(const IB_thread &thread)
Check if thread is stopped.
Definition: os0thread-create.h:245
The interface to the operating system process and thread control primitives.
const mysql_service_registry_t * r
Definition: pfs_example_plugin_employee.cc:85
void destroy_thd(THD *thd, bool clear_pfs_events)
Cleanup the THD object, remove it from the global list of THDs and delete it.
Definition: sql_thd_internal_api.cc:168
Define for performance schema registration key.
Definition: sync0sync.h:50
unsigned int m_value
Definition: sync0sync.h:63
mysql_pfs_key_t PFS_NOT_INSTRUMENTED
THD * create_thd(Channel_info *channel_info)
Definition: connection_handler_manager.cc:266
Version control for database, common definitions, and include files.
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:56
#define UT_RELAX_CPU()
Definition: ut0ut.h:86
int n
Definition: xcom_base.cc:508