MySQL 9.1.0
Source Code Documentation
os0thread-create.h
Go to the documentation of this file.
1/*****************************************************************************
2
3Copyright (c) 1995, 2024, Oracle and/or its affiliates.
4
5This program is free software; you can redistribute it and/or modify it under
6the terms of the GNU General Public License, version 2.0, as published by the
7Free Software Foundation.
8
9This program is designed to work with certain software (including
10but not limited to OpenSSL) that is licensed under separate terms,
11as designated in a particular file or component or in included license
12documentation. The authors of MySQL hereby grant you an additional
13permission to link the program and your derivative works with the
14separately licensed software that they have either included with
15the program or referenced in the documentation.
16
17This program is distributed in the hope that it will be useful, but WITHOUT
18ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
19FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
20for more details.
21
22You should have received a copy of the GNU General Public License along with
23this program; if not, write to the Free Software Foundation, Inc.,
2451 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
26*****************************************************************************/
27
28/** @file include/os0thread-create.h
29 The interface to the threading wrapper
30
31 Created 2016-May-17 Sunny Bains
32 *******************************************************/
33
34#ifndef os0thread_create_h
35#define os0thread_create_h
36
37#include <my_thread.h>
38
39#include "univ.i"
40
41#include "os0thread.h"
43
44#include <atomic>
45#include <functional>
46
47/** Maximum number of threads inside InnoDB */
48extern uint32_t srv_max_n_threads;
49
50/** Number of threads active. */
51extern std::atomic_int os_thread_count;
52
/** Set up the OS thread management data structures.
There is nothing to initialize, so this is intentionally a no-op. */
inline void os_thread_open() {}
56
57/** Check if there are threads active.
58@return true if the thread count > 0. */
59inline bool os_thread_any_active() {
60 return os_thread_count.load(std::memory_order_relaxed) > 0;
61}
62
63/** Frees OS thread management data structures. */
64inline void os_thread_close() {
66 ib::warn(ER_IB_MSG_1274, os_thread_count.load(std::memory_order_relaxed));
67 }
68}
69
70/** Register with MySQL infrastructure. */
72 public:
73#ifdef UNIV_PFS_THREAD
74 /** Constructor for the Runnable object.
75 @param[in] pfs_key Performance schema key
76 @param[in] pfs_seqnum Performance schema sequence number */
77 explicit MySQL_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
78 : m_pfs_key(pfs_key), m_pfs_seqnum(pfs_seqnum) {}
79#else
80 /** Constructor for the Runnable object.
81 @param[in] pfs_key Performance schema key (ignored)
82 @param[in] pfs_seqnum Performance schema sequence number */
84#endif /* UNIV_PFS_THREAD */
85
86 protected:
87 /** Register the thread with the server */
88 void preamble() {
89 const bool ret = my_thread_init();
90 ut_a(!ret);
91
92#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
94 auto &value = m_pfs_key.m_value;
95 auto psi = PSI_THREAD_CALL(new_thread)(value, m_pfs_seqnum, this, 0);
96
97 PSI_THREAD_CALL(set_thread_os_id)(psi);
98 PSI_THREAD_CALL(set_thread)(psi);
99 }
100#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
101 }
102
103 /** Deregister the thread */
104 void epilogue() {
106
107#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
109 PSI_THREAD_CALL(delete_current_thread)();
110 }
111#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
112 }
113
114 /** @return a THD instance. */
115 THD *create_mysql_thd() noexcept {
116#ifdef UNIV_PFS_THREAD
117 return create_thd(false, true, true, m_pfs_key.m_value, m_pfs_seqnum);
118#else
119 return create_thd(false, true, true, 0, 0);
120#endif /* UNIV_PFS_THREAD */
121 }
122
123 /** Destroy a THD instance.
124 @param[in,out] thd Instance to destroy. */
125 void destroy_mysql_thd(THD *thd) noexcept { destroy_thd(thd); }
126
127 protected:
128#ifdef UNIV_PFS_THREAD
129 /** Performance schema key */
131
132 /** Performance schema sequence number */
134#endif /* UNIV_PFS_THREAD */
135};
136
137/** Execute in the context of a non detached MySQL thread. */
138class Runnable : public MySQL_thread {
139 public:
140 /** Constructor for the Runnable object.
141 @param[in] pfs_key Performance schema key
142 @param[in] pfs_seqnum Performance schema sequence number */
143 explicit Runnable(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
144 : MySQL_thread(pfs_key, pfs_seqnum) {}
145
146 /** Method to execute the callable
147 @param[in] f Callable object
148 @param[in] args Variable number of args to F
149 @retval f return value. */
150 template <typename F, typename... Args>
151 dberr_t operator()(F &&f, Args &&...args) {
153
154 auto r = std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
155
157
158 return r;
159 }
160};
161
162/** Wrapper for a callable, it will count the number of registered
163Runnable instances and will register the thread executing the callable
164with the PFS and the Server threading infrastructure. */
166 public:
167 /** Constructor for the detached thread.
168 @param[in] pfs_key Performance schema key
169 @param[in] pfs_seqnum Performance schema sequence number */
171 PSI_thread_seqnum pfs_seqnum)
172 : MySQL_thread(pfs_key, pfs_seqnum) {
173 init();
174 }
175
176 /** Method to execute the callable
177 @param[in] f Callable object
178 @param[in] args Variable number of args to F */
179 template <typename F, typename... Args>
180 void operator()(F &&f, Args &&...args) {
182 UT_RELAX_CPU();
183 }
184
186
187 preamble();
188
190
191 std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
192
193 epilogue();
194
196 }
197
198 /** @return thread handle. */
199 IB_thread thread() const { return (m_thread); }
200
201 private:
202 /** Initializes the m_shared_future, uses the m_promise's get_future,
203 which cannot be used since then, according to its documentation. */
205
206 /** Register the thread with the server */
207 void preamble() {
209
210 std::atomic_thread_fence(std::memory_order_release);
211
212 auto old = os_thread_count.fetch_add(1, std::memory_order_relaxed);
213
214 ut_a(old <= static_cast<int>(srv_max_n_threads) - 1);
215 }
216
217 /** Deregister the thread */
218 void epilogue() {
219 m_promise.set_value();
220
221 std::atomic_thread_fence(std::memory_order_release);
222
223 auto old = os_thread_count.fetch_sub(1, std::memory_order_relaxed);
224
225 ut_a(old > 0);
226
228 }
229
230 private:
231 /** Future object which keeps the ref counter >= 1 at least
232 as long as the Detached_thread is not-destroyed. */
234
235 /** Promise which is set when task is done. */
236 std::promise<void> m_promise;
237};
238
239/** Check if thread is stopped
240@param[in] thread Thread handle.
241@return true if the thread has started, finished tasks and stopped. */
242inline bool thread_is_stopped(const IB_thread &thread) {
243 return thread.state() == IB_thread::State::STOPPED;
244}
245
246/** Check if thread is active
247@param[in] thread Thread handle.
248@return true if the thread is active. */
249inline bool thread_is_active(const IB_thread &thread) {
250 switch (thread.state()) {
252 /* Not yet started. */
253 return false;
254
256 /* Thread "thread" is already active, but start() has not been called.
257 Note that when start() is called, the thread's routine may decide to
258 check if it is active or trigger other thread to do similar check
259 regarding "thread". That could happen faster than thread's state
260 is advanced from ALLOWED_TO_START to STARTED. Therefore we must
261 already consider such thread as "active". */
262 return true;
263
265 /* Note, that potentially the thread might be doing its cleanup after
266 it has already ended its task. We still consider it active, until the
267 cleanup is finished. */
268 return true;
269
271 /* Ended its task and became marked as STOPPED (cleanup finished) */
272 return false;
273
275 default:
276 /* The thread object has not been assigned yet. */
277 return false;
278 }
279
280 /* Note that similar goal was achieved by the usage of shared_future:
281 return (shared_future.valid() && shared_future.wait_for(std::chrono::seconds(
282 0)) != std::future_status::ready);
283 However this resulted in longer execution of mtr tests (63minutes ->
284 75minutes). You could try `mtr --mem collations.esperanto` (cmake
285 WITH_DEBUG=1) */
286}
287
288/** Create a detached non-started thread. After thread is created, you should
289assign the received object to any of variables/fields which you later could
290access to check thread's state. You are allowed to either move or copy that
291object (any number of copies is allowed). After assigning you are allowed to
292start the thread by calling start() on any of those objects.
293@param[in] pfs_key Performance schema thread key
294@param[in] pfs_seqnum Performance schema thread sequence number
295@param[in] f Callable instance
296@param[in] args Zero or more args
297@return Object which allows to start the created thread, monitor its state and
298 wait until the thread is finished. */
299template <typename F, typename... Args>
301 PSI_thread_seqnum pfs_seqnum, F &&f,
302 Args &&...args) {
303 Detached_thread detached_thread{pfs_key, pfs_seqnum};
304 auto thread = detached_thread.thread();
305
306 std::thread t(std::move(detached_thread), f, args...);
307 t.detach();
308
309 /* Thread t is doing busy waiting until the state is changed
310 from NOT_STARTED to ALLOWED_TO_START. That will happen when
311 thread.start() will be called. */
312 ut_a(thread.state() == IB_thread::State::NOT_STARTED);
313
314 return thread;
315}
316
/* Thread creation entry point. With UNIV_PFS_THREAD defined every argument is
forwarded unchanged to create_detached_thread(); otherwise the first two
arguments (k, s) are discarded and 0, 0 are passed in their place. */
317#ifdef UNIV_PFS_THREAD
318#define os_thread_create(...) create_detached_thread(__VA_ARGS__)
319#else
320#define os_thread_create(k, s, ...) create_detached_thread(0, 0, __VA_ARGS__)
321#endif /* UNIV_PFS_THREAD */
322
323/** Parallel for loop over a container.
324@param[in] pfs_key Performance schema thread key
325@param[in] c Container to iterate over in parallel
326@param[in] n Number of threads to create
327@param[in] f Callable instance
328@param[in] args Zero or more args */
329template <typename Container, typename F, typename... Args>
330void par_for(mysql_pfs_key_t pfs_key, const Container &c, size_t n, F &&f,
331 Args &&...args) {
332 if (c.empty()) {
333 return;
334 }
335
336 size_t slice = (n > 0) ? c.size() / n : 0;
337
338 using Workers = std::vector<IB_thread>;
339
340 Workers workers;
341
342 workers.reserve(n);
343
344 for (size_t i = 0; i < n; ++i) {
345 auto b = c.begin() + (i * slice);
346 auto e = b + slice;
347
348 auto worker = os_thread_create(pfs_key, i, f, b, e, i, args...);
349 worker.start();
350
351 workers.push_back(std::move(worker));
352 }
353
354 f(c.begin() + (n * slice), c.end(), n, args...);
355
356 for (auto &worker : workers) {
357 worker.join();
358 }
359}
360
/* Route par_for() calls through a same-named macro. When UNIV_PFS_THREAD is
defined and UNIV_HOTBACKUP is not, all arguments pass through unchanged;
otherwise the first argument (k) is discarded and 0 is passed in its place. */
361#if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
362#define par_for(...) par_for(__VA_ARGS__)
363#else
364#define par_for(k, ...) par_for(0, __VA_ARGS__)
365#endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
366
367#endif /* !os0thread_create_h */
Wrapper for a callable, it will count the number of registered Runnable instances and will register the thread executing the callable with the PFS and the Server threading infrastructure.
Definition: os0thread-create.h:165
void operator()(F &&f, Args &&...args)
Method to execute the callable.
Definition: os0thread-create.h:180
void init()
Initializes the m_shared_future, uses the m_promise's get_future, which cannot be used since then, according to its documentation.
Definition: os0thread-create.h:204
std::promise< void > m_promise
Promise which is set when task is done.
Definition: os0thread-create.h:236
Detached_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the detached thread.
Definition: os0thread-create.h:170
void preamble()
Register the thread with the server.
Definition: os0thread-create.h:207
IB_thread m_thread
Future object which keeps the ref counter >= 1 at least as long as the Detached_thread is not-destroyed.
Definition: os0thread-create.h:233
IB_thread thread() const
Definition: os0thread-create.h:199
void epilogue()
Deregister the thread.
Definition: os0thread-create.h:218
Definition: os0thread.h:47
void set_state(State state)
Definition: os0thread.cc:103
void init(std::promise< void > &promise)
Definition: os0thread.cc:98
State state() const
Definition: os0thread.h:51
Register with MySQL infrastructure.
Definition: os0thread-create.h:71
void epilogue()
Deregister the thread.
Definition: os0thread-create.h:104
MySQL_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the Runnable object.
Definition: os0thread-create.h:77
PSI_thread_seqnum m_pfs_seqnum
Performance schema sequence number.
Definition: os0thread-create.h:133
THD * create_mysql_thd() noexcept
Definition: os0thread-create.h:115
const mysql_pfs_key_t m_pfs_key
Performance schema key.
Definition: os0thread-create.h:130
void destroy_mysql_thd(THD *thd) noexcept
Destroy a THD instance.
Definition: os0thread-create.h:125
void preamble()
Register the thread with the server.
Definition: os0thread-create.h:88
Execute in the context of a non detached MySQL thread.
Definition: os0thread-create.h:138
dberr_t operator()(F &&f, Args &&...args)
Method to execute the callable.
Definition: os0thread-create.h:151
Runnable(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum)
Constructor for the Runnable object.
Definition: os0thread-create.h:143
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:36
The class warn is used to emit warnings.
Definition: ut0log.h:210
#define PSI_THREAD_CALL(M)
Definition: psi_thread.h:36
dberr_t
Definition: db0err.h:39
unsigned int PSI_thread_seqnum
Instrumented thread sequence number.
Definition: psi_thread_bits.h:59
Defines to make different thread packages compatible.
bool my_thread_init()
Allocate thread specific memory for the thread, used by mysys and dbug.
Definition: my_thr_init.cc:263
void my_thread_end()
Deallocate memory used by the thread for book-keeping.
Definition: my_thr_init.cc:312
void os_thread_open()
Initializes OS thread management data structures.
Definition: os0thread-create.h:54
std::atomic_int os_thread_count
Number of threads active.
Definition: os0thread.cc:56
IB_thread create_detached_thread(mysql_pfs_key_t pfs_key, PSI_thread_seqnum pfs_seqnum, F &&f, Args &&...args)
Create a detached non-started thread.
Definition: os0thread-create.h:300
#define os_thread_create(...)
Definition: os0thread-create.h:318
bool thread_is_active(const IB_thread &thread)
Check if thread is active.
Definition: os0thread-create.h:249
#define par_for(...)
Definition: os0thread-create.h:362
bool os_thread_any_active()
Check if there are threads active.
Definition: os0thread-create.h:59
void os_thread_close()
Frees OS thread management data structures.
Definition: os0thread-create.h:64
uint32_t srv_max_n_threads
Maximum number of threads inside InnoDB.
Definition: os0thread.cc:53
bool thread_is_stopped(const IB_thread &thread)
Check if thread is stopped.
Definition: os0thread-create.h:242
The interface to the operating system process and thread control primitives.
const mysql_service_registry_t * r
Definition: pfs_example_plugin_employee.cc:86
void destroy_thd(THD *thd, bool clear_pfs_events)
Cleanup the THD object, remove it from the global list of THDs and delete it.
Definition: sql_thd_internal_api.cc:169
Define for performance schema registration key.
Definition: sync0sync.h:51
unsigned int m_value
Definition: sync0sync.h:64
mysql_pfs_key_t PFS_NOT_INSTRUMENTED
THD * create_thd(Channel_info *channel_info)
Definition: connection_handler_manager.cc:271
Version control for database, common definitions, and include files.
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:93
#define UT_RELAX_CPU()
Definition: ut0ut.h:93
int n
Definition: xcom_base.cc:509