MySQL 8.4.0
Source Code Documentation
os0thread-create.h
Go to the documentation of this file.
1/*****************************************************************************
2
3Copyright (c) 1995, 2024, Oracle and/or its affiliates.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License, version 2.0, as published by the
7Free Software Foundation.
8
9This program is designed to work with certain software (including
10but not limited to OpenSSL) that is licensed under separate terms,
11as designated in a particular file or component or in included license
12documentation. The authors of MySQL hereby grant you an additional
13permission to link the program and your derivative works with the
14separately licensed software that they have either included with
15the program or referenced in the documentation.
16
17This program is distributed in the hope that it will be useful, but WITHOUT
18ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
19FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
20for more details.
21
22You should have received a copy of the GNU General Public License along with
23this program; if not, write to the Free Software Foundation, Inc.,
2451 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
26*****************************************************************************/
27
28/** @file include/os0thread-create.h
29 The interface to the threading wrapper
30
31 Created 2016-May-17 Sunny Bains
32 *******************************************************/
33
34#ifndef os0thread_create_h
35#define os0thread_create_h
36
37#include <my_thread.h>
38
39#include "univ.i"
40
41#include "os0thread.h"
43
44#include <atomic>
45#include <functional>
46
47/** Maximum number of threads inside InnoDB */
48extern uint32_t srv_max_n_threads;
49
50/** Number of threads active. */
51extern std::atomic_int os_thread_count;
52
53/** Initializes OS thread management data structures. */
54inline void os_thread_open() { /* No op */
55}
56
57/** Check if there are threads active.
58@return true if the thread count > 0. */
59inline bool os_thread_any_active() {
60 return os_thread_count.load(std::memory_order_relaxed) > 0;
61}
62
63/** Frees OS thread management data structures. */
64inline void os_thread_close() {
66 ib::warn(ER_IB_MSG_1274, os_thread_count.load(std::memory_order_relaxed));
67 }
68}
69
70/** Register with MySQL infrastructure. */
72 public:
73#ifdef UNIV_PFS_THREAD
74 /** Constructor for the Runnable object.
75 @param[in] pfs_key Performance schema key
76 @param[in] pfs_seqnum Performance schema sequence number */
77 explicit MySQL_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
78 : m_pfs_key(pfs_key), m_pfs_seqnum(pfs_seqnum) {}
79#else
80 /** Constructor for the Runnable object.
81 @param[in] pfs_key Performance schema key (ignored)
82 @param[in] pfs_seqnum Performance schema sequence number */
84#endif /* UNIV_PFS_THREAD */
85
86 protected:
87 /** Register the thread with the server */
88 void preamble() {
89 const bool ret = my_thread_init();
90 ut_a(!ret);
91
92#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
94 auto &value = m_pfs_key.m_value;
95 auto psi = PSI_THREAD_CALL(new_thread)(value, m_pfs_seqnum, this, 0);
96
97 PSI_THREAD_CALL(set_thread_os_id)(psi);
98 PSI_THREAD_CALL(set_thread)(psi);
99 }
100#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
101 }
102
103 /** Deregister the thread */
104 void epilogue() {
106
107#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
109 PSI_THREAD_CALL(delete_current_thread)();
110 }
111#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
112 }
113
114 /** @return a THD instance. */
115 THD *create_mysql_thd() noexcept {
116#ifdef UNIV_PFS_THREAD
117 return create_thd(false, true, true, m_pfs_key.m_value, m_pfs_seqnum);
118#else
119 return create_thd(false, true, true, 0, 0);
120#endif /* UNIV_PFS_THREAD */
121 }
122
123 /** Destroy a THD instance.
124 @param[in,out] thd Instance to destroy. */
125 void destroy_mysql_thd(THD *thd) noexcept { destroy_thd(thd); }
126
127 protected:
128#ifdef UNIV_PFS_THREAD
129 /** Performance schema key */
131
132 /** Performance schema sequence number */
134#endif /* UNIV_PFS_THREAD */
135};
136
137/** Execute in the context of a non detached MySQL thread. */
138class Runnable : public MySQL_thread {
139 public:
140 /** Constructor for the Runnable object.
141 @param[in] pfs_key Performance schema key
142 @param[in] pfs_seqnum Performance schema sequence number */
143 explicit Runnable(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
144 : MySQL_thread(pfs_key, pfs_seqnum) {}
145
146 /** Method to execute the callable
147 @param[in] f Callable object
148 @param[in] args Variable number of args to F
149 @retval f return value. */
150 template <typename F, typename... Args>
151 dberr_t operator()(F &&f, Args &&... args) {
153
154 auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
155
156 auto r = task();
157
159
160 return r;
161 }
162};
163
164/** Wrapper for a callable, it will count the number of registered
165Runnable instances and will register the thread executing the callable
166with the PFS and the Server threading infrastructure. */
168 public:
169 /** Constructor for the detached thread.
170 @param[in] pfs_key Performance schema key
171 @param[in] pfs_seqnum Performance schema sequence number */
173 PSI_thread_seqnum pfs_seqnum)
174 : MySQL_thread(pfs_key, pfs_seqnum) {
175 init();
176 }
177
178 /** Method to execute the callable
179 @param[in] f Callable object
180 @param[in] args Variable number of args to F */
181 template <typename F, typename... Args>
182 void operator()(F &&f, Args &&... args) {
184 UT_RELAX_CPU();
185 }
186
188
189 preamble();
190
192
193 auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
194
195 task();
196
197 epilogue();
198
200 }
201
202 /** @return thread handle. */
203 IB_thread thread() const { return (m_thread); }
204
205 private:
206 /** Initializes the m_shared_future, uses the m_promise's get_future,
207 which cannot be used since then, according to its documentation. */
209
210 /** Register the thread with the server */
211 void preamble() {
213
214 std::atomic_thread_fence(std::memory_order_release);
215
216 auto old = os_thread_count.fetch_add(1, std::memory_order_relaxed);
217
218 ut_a(old <= static_cast<int>(srv_max_n_threads) - 1);
219 }
220
221 /** Deregister the thread */
222 void epilogue() {
223 m_promise.set_value();
224
225 std::atomic_thread_fence(std::memory_order_release);
226
227 auto old = os_thread_count.fetch_sub(1, std::memory_order_relaxed);
228
229 ut_a(old > 0);
230
232 }
233
234 private:
235 /** Future object which keeps the ref counter >= 1 at least
236 as long as the Detached_thread is not-destroyed. */
238
239 /** Promise which is set when task is done. */
240 std::promise<void> m_promise;
241};
242
243/** Check if thread is stopped
244@param[in] thread Thread handle.
245@return true if the thread has started, finished tasks and stopped. */
246inline bool thread_is_stopped(const IB_thread &thread) {
247 return thread.state() == IB_thread::State::STOPPED;
248}
249
250/** Check if thread is active
251@param[in] thread Thread handle.
252@return true if the thread is active. */
253inline bool thread_is_active(const IB_thread &thread) {
254 switch (thread.state()) {
256 /* Not yet started. */
257 return false;
258
260 /* Thread "thread" is already active, but start() has not been called.
261 Note that when start() is called, the thread's routine may decide to
262 check if it is active or trigger other thread to do similar check
263 regarding "thread". That could happen faster than thread's state
264 is advanced from ALLOWED_TO_START to STARTED. Therefore we must
265 already consider such thread as "active". */
266 return true;
267
269 /* Note, that potentially the thread might be doing its cleanup after
270 it has already ended its task. We still consider it active, until the
271 cleanup is finished. */
272 return true;
273
275 /* Ended its task and became marked as STOPPED (cleanup finished) */
276 return false;
277
279 default:
280 /* The thread object has not been assigned yet. */
281 return false;
282 }
283
284 /* Note that similar goal was achieved by the usage of shared_future:
285 return (shared_future.valid() && shared_future.wait_for(std::chrono::seconds(
286 0)) != std::future_status::ready);
287 However this resulted in longer execution of mtr tests (63minutes ->
288 75minutes). You could try `mtr --mem collations.esperanto` (cmake
289 WITH_DEBUG=1) */
290}
291
292/** Create a detached non-started thread. After thread is created, you should
293assign the received object to any of variables/fields which you later could
294access to check thread's state. You are allowed to either move or copy that
295object (any number of copies is allowed). After assigning you are allowed to
296start the thread by calling start() on any of those objects.
297@param[in] pfs_key Performance schema thread key
298@param[in] pfs_seqnum Performance schema thread sequence number
299@param[in] f Callable instance
300@param[in] args Zero or more args
301@return Object which allows to start the created thread, monitor its state and
302 wait until the thread is finished. */
303template <typename F, typename... Args>
305 PSI_thread_seqnum pfs_seqnum, F &&f,
306 Args &&... args) {
307 Detached_thread detached_thread{pfs_key, pfs_seqnum};
308 auto thread = detached_thread.thread();
309
310 std::thread t(std::move(detached_thread), f, args...);
311 t.detach();
312
313 /* Thread t is doing busy waiting until the state is changed
314 from NOT_STARTED to ALLOWED_TO_START. That will happen when
315 thread.start() will be called. */
316 ut_a(thread.state() == IB_thread::State::NOT_STARTED);
317
318 return thread;
319}
320
321#ifdef UNIV_PFS_THREAD
322#define os_thread_create(...) create_detached_thread(__VA_ARGS__)
323#else
324#define os_thread_create(k, s, ...) create_detached_thread(0, 0, __VA_ARGS__)
325#endif /* UNIV_PFS_THREAD */
326
327/** Parallel for loop over a container.
328@param[in] pfs_key Performance schema thread key
329@param[in] c Container to iterate over in parallel
330@param[in] n Number of threads to create
331@param[in] f Callable instance
332@param[in] args Zero or more args */
333template <typename Container, typename F, typename... Args>
334void par_for(mysql_pfs_key_t pfs_key, const Container &c, size_t n, F &&f,
335 Args &&... args) {
336 if (c.empty()) {
337 return;
338 }
339
340 size_t slice = (n > 0) ? c.size() / n : 0;
341
342 using Workers = std::vector<IB_thread>;
343
344 Workers workers;
345
346 workers.reserve(n);
347
348 for (size_t i = 0; i < n; ++i) {
349 auto b = c.begin() + (i * slice);
350 auto e = b + slice;
351
352 auto worker = os_thread_create(pfs_key, i, f, b, e, i, args...);
353 worker.start();
354
355 workers.push_back(std::move(worker));
356 }
357
358 f(c.begin() + (n * slice), c.end(), n, args...);
359
360 for (auto &worker : workers) {
361 worker.join();
362 }
363}
364
365#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
366#define par_for(...) par_for(__VA_ARGS__)
367#else
368#define par_for(k, ...) par_for(0, __VA_ARGS__)
369#endif /* UNIV_PFS_THREAD */
370
371#endif /* !os0thread_create_h */
Wrapper for a callable, it will count the number of registered Runnable instances and will register t...
Definition: os0thread-create.h:167
void operator()(F &&f, Args &&... args)
Method to execute the callable.
Definition: os0thread-create.h:182
void init()
Initializes the m_shared_future, uses the m_promise's get_future, which cannot be used since then,...
Definition: os0thread-create.h:208
std::promise< void > m_promise
Promise which is set when task is done.
Definition: os0thread-create.h:240
Detached_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the detached thread.
Definition: os0thread-create.h:172
void preamble()
Register the thread with the server.
Definition: os0thread-create.h:211
IB_thread m_thread
Future object which keeps the ref counter >= 1 at least as long as the Detached_thread is not-destroy...
Definition: os0thread-create.h:237
IB_thread thread() const
Definition: os0thread-create.h:203
void epilogue()
Deregister the thread.
Definition: os0thread-create.h:222
Definition: os0thread.h:46
void set_state(State state)
Definition: os0thread.cc:103
void init(std::promise< void > &promise)
Definition: os0thread.cc:98
State state() const
Definition: os0thread.h:50
Register with MySQL infrastructure.
Definition: os0thread-create.h:71
void epilogue()
Deregister the thread.
Definition: os0thread-create.h:104
MySQL_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the Runnable object.
Definition: os0thread-create.h:77
PSI_thread_seqnum m_pfs_seqnum
Performance schema sequence number.
Definition: os0thread-create.h:133
THD * create_mysql_thd() noexcept
Definition: os0thread-create.h:115
const mysql_pfs_key_t m_pfs_key
Performance schema key.
Definition: os0thread-create.h:130
void destroy_mysql_thd(THD *thd) noexcept
Destroy a THD instance.
Definition: os0thread-create.h:125
void preamble()
Register the thread with the server.
Definition: os0thread-create.h:88
Execute in the context of a non detached MySQL thread.
Definition: os0thread-create.h:138
Runnable(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the Runnable object.
Definition: os0thread-create.h:143
dberr_t operator()(F &&f, Args &&... args)
Method to execute the callable.
Definition: os0thread-create.h:151
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:36
The class warn is used to emit warnings.
Definition: ut0log.h:210
#define PSI_THREAD_CALL(M)
Definition: psi_thread.h:36
dberr_t
Definition: db0err.h:39
unsigned int PSI_thread_seqnum
Instrumented thread sequence number.
Definition: psi_thread_bits.h:59
Defines to make different thread packages compatible.
bool my_thread_init()
Allocate thread specific memory for the thread, used by mysys and dbug.
Definition: my_thr_init.cc:263
void my_thread_end()
Deallocate memory used by the thread for book-keeping.
Definition: my_thr_init.cc:298
stdx::expected< void, error_type > bind(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap bind() in a portable way.
Definition: socket.h:339
IB_thread create_detached_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum, F &&f, Args &&... args)
Create a detached non-started thread.
Definition: os0thread-create.h:304
void os_thread_open()
Initializes OS thread management data structures.
Definition: os0thread-create.h:54
std::atomic_int os_thread_count
Number of threads active.
Definition: os0thread.cc:56
#define os_thread_create(...)
Definition: os0thread-create.h:322
bool thread_is_active(const IB_thread &thread)
Check if thread is active.
Definition: os0thread-create.h:253
#define par_for(...)
Definition: os0thread-create.h:366
bool os_thread_any_active()
Check if there are threads active.
Definition: os0thread-create.h:59
void os_thread_close()
Frees OS thread management data structures.
Definition: os0thread-create.h:64
uint32_t srv_max_n_threads
Maximum number of threads inside InnoDB.
Definition: os0thread.cc:53
bool thread_is_stopped(const IB_thread &thread)
Check if thread is stopped.
Definition: os0thread-create.h:246
The interface to the operating system process and thread control primitives.
const mysql_service_registry_t * r
Definition: pfs_example_plugin_employee.cc:86
void destroy_thd(THD *thd, bool clear_pfs_events)
Cleanup the THD object, remove it from the global list of THDs and delete it.
Definition: sql_thd_internal_api.cc:169
Define for performance schema registration key.
Definition: sync0sync.h:51
unsigned int m_value
Definition: sync0sync.h:64
mysql_pfs_key_t PFS_NOT_INSTRUMENTED
THD * create_thd(Channel_info *channel_info)
Definition: connection_handler_manager.cc:269
Version control for database, common definitions, and include files.
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:93
#define UT_RELAX_CPU()
Definition: ut0ut.h:93
int n
Definition: xcom_base.cc:509