MySQL  8.0.19
Source Code Documentation
os0thread-create.h
Go to the documentation of this file.
1 /*****************************************************************************
2 
3 Copyright (c) 2018, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 
25 *****************************************************************************/
26 
27 /** @file include/os0thread-create.h
28  The interface to the threading wrapper
29 
30  Created 2016-May-17 Sunny Bains
31  *******************************************************/
32 
33 #ifndef os0thread_create_h
34 #define os0thread_create_h
35 
36 #include <my_thread.h>
37 
38 #include "univ.i"
39 
40 #include "os0thread.h"
41 
42 #include <atomic>
43 #include <functional>
44 
45 /** Maximum number of threads inside InnoDB */
46 extern ulint srv_max_n_threads;
47 
48 /** Number of threads active. */
49 extern std::atomic_int os_thread_count;
50 
51 /** Initializes OS thread management data structures.
Currently a no-op: the body is intentionally empty, so calling this function
has no effect. */
52 inline void os_thread_open() { /* No op */
53 }
54 
55 /** Check if there are threads active.
Reads the global os_thread_count with a relaxed atomic load, so the result is
only a snapshot of the counter and establishes no ordering with other memory
operations.
56 @return true if the thread count > 0. */
57 inline bool os_thread_any_active() {
58  return (os_thread_count.load(std::memory_order_relaxed) > 0);
59 }
60 
61 /** Frees OS thread management data structures.
Nothing is actually released here: the function only emits the
ER_IB_MSG_1274 warning, passing the current thread count, when
os_thread_any_active() reports that some threads are still counted. */
62 inline void os_thread_close() {
63  if (os_thread_any_active()) {
/* The counter is loaded a second time for the message, so the reported value
may differ from the one that was tested above. */
64  ib::warn(ER_IB_MSG_1274, os_thread_count.load(std::memory_order_relaxed));
65  }
66 }
67 
68 /** Wrapper for a callable, it will count the number of registered
69 Runnable instances and will register the thread executing the callable
70 with the PFS and the Server threading infrastructure.

NOTE(review): this listing skips source lines 89, 93, 97, 105, 116, 147, 158,
166 and 170, so some statements and member declarations are not visible here
and a few closing braces below have no visible opener. Confirm the details
against the original header before relying on this text. */
71 class Runnable {
72  public:
73 #ifdef UNIV_PFS_THREAD
74  /** Constructor for the Runnable object. Stores the key and calls init().
75  @param[in] pfs_key Performance schema key */
76  explicit Runnable(mysql_pfs_key_t pfs_key) : m_pfs_key(pfs_key) { init(); }
77 #else
78  /** Constructor for the Runnable object. Only calls init().
79  @param[in] pfs_key Performance schema key (ignored) */
80  explicit Runnable(mysql_pfs_key_t) { init(); }
81 #endif /* UNIV_PFS_THREAD */
82 
83  public:
84  /** Method to execute the callable.
The visible part runs preamble(), binds f to args with std::bind, invokes the
bound task, and then runs epilogue().
85  @param[in] f Callable object
86  @param[in] args Variable number of args to F */
87  template <typename F, typename... Args>
88  void operator()(F &&f, Args &&... args) {
/* NOTE(review): line 89 is missing. UT_RELAX_CPU() looks like the body of a
wait loop - presumably the thread spins here until it is allowed to start
(see the busy-waiting remark in create_detached_thread) - confirm. */
90  UT_RELAX_CPU();
91  }
92 
94 
95  preamble();
96 
98 
99  auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
100 
101  task();
102 
103  epilogue();
104 
106  }
107 
/** @return a copy of the m_thread handle held by this Runnable. */
108  IB_thread thread() const { return (m_thread); }
109 
110  private:
111  /** Register the thread with the server.
Calls my_thread_init(), sets up the PSI thread for m_pfs_key when PFS thread
instrumentation is compiled in (and this is not a hot backup build), then
increments os_thread_count and asserts that the previous count was below
srv_max_n_threads. */
112  void preamble() {
113  my_thread_init();
114 
115 #if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
117  PSI_thread *psi;
118 
119  psi = PSI_THREAD_CALL(new_thread)(m_pfs_key.m_value, nullptr, 0);
120 
121  PSI_THREAD_CALL(set_thread_os_id)(psi);
122  PSI_THREAD_CALL(set_thread)(psi);
123  }
124 #endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
125 
126  std::atomic_thread_fence(std::memory_order_release);
127 
128  int old;
129 
/* fetch_add returns the value held before the increment. */
130  old = os_thread_count.fetch_add(1, std::memory_order_relaxed);
131 
/* The previous count must leave room for this thread. */
132  ut_a(old <= static_cast<int>(srv_max_n_threads) - 1);
133  }
134 
135  /** Deregister the thread.
Decrements os_thread_count (asserting that it was positive), calls
my_thread_end(), deletes the current PSI thread when instrumented, and
finally fulfils m_promise. */
136  void epilogue() {
137  std::atomic_thread_fence(std::memory_order_release);
138 
139  int old;
140 
141  old = os_thread_count.fetch_sub(1, std::memory_order_relaxed);
142  ut_a(old > 0);
143 
144  my_thread_end();
145 
146 #if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
148  PSI_THREAD_CALL(delete_current_thread)();
149  }
150 #endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
151 
/* The promise is fulfilled last, after all of the cleanup above. */
152  m_promise.set_value();
153  }
154 
155  private:
156 #ifdef UNIV_PFS_THREAD
157  /** Performance schema key */
/* NOTE(review): the declaration on line 158 is missing from this listing; the
cross-reference index at the end of the page gives it as
`const mysql_pfs_key_t m_pfs_key`. */
159 #endif /* UNIV_PFS_THREAD */
160 
161  /** Promise which is set when task is done. */
162  std::promise<void> m_promise;
163 
164  /** Future object which keeps the ref counter >= 1 at least
165  as long as the Runnable is not destroyed. */
/* NOTE(review): the declaration on line 166 is missing from this listing; the
cross-reference index gives it as `IB_thread m_thread`. */
167 
168  /** Initializes the m_shared_future, uses the m_promise's get_future,
169  which must not be called again afterwards, according to its documentation. */
/* NOTE(review): the declaration on line 170 is missing from this listing; the
cross-reference index gives it as `void init()`. No member named
m_shared_future is visible here - presumably the text refers to the future
held inside m_thread - confirm. */
171 };
172 
173 /** Check if a thread is active.
174 @param[in] thread Thread handle.
175 @return true if the thread is active. */
176 inline bool thread_is_active(const IB_thread &thread) {
177  switch (thread.state()) {
/* NOTE(review): the case labels (source lines 178, 182, 191, 197 and 201) are
missing from this listing. The cross-reference index at the end of the page
names the states NOT_STARTED, ALLOWED_TO_START, STARTED, STOPPED and INVALID;
presumably they label the branches below in that order - confirm against the
original header. */
179  /* Not yet started. */
180  return (false);
181 
183  /* Thread "thread" is already active, but start() has not been called.
184  Note that when start() is called, the thread's routine may decide to
185  check if it is active or trigger other thread to do similar check
186  regarding "thread". That could happen faster than thread's state
187  is advanced from ALLOWED_TO_START to STARTED. Therefore we must
188  already consider such thread as "active". */
189  return (true);
190 
192  /* Note, that potentially the thread might be doing its cleanup after
193  it has already ended its task. We still consider it active, until the
194  cleanup is finished. */
195  return (true);
196 
198  /* Ended its task and became marked as STOPPED (cleanup finished) */
199  return (false);
200 
202  default:
203  /* The thread object has not been assigned yet. */
204  return (false);
205  }
206 
207  /* Note that similar goal was achieved by the usage of shared_future:
208  return (shared_future.valid() && shared_future.wait_for(std::chrono::seconds(
209  0)) != std::future_status::ready);
210  However this resulted in longer execution of mtr tests (63minutes ->
211  75minutes). You could try `mtr --mem collations.esperanto` (cmake
212  WITH_DEBUG=1) */
213 }
214 
215 /** Create a detached non-started thread. After thread is created, you should
216 assign the received object to any of variables/fields which you later could
217 access to check thread's state. You are allowed to either move or copy that
218 object (any number of copies is allowed). After assigning you are allowed to
219 start the thread by calling start() on any of those objects.
220 @param[in] pfs_key Performance schema thread key
221 @param[in] f Callable instance
222 @param[in] args Zero or more args
223 @return Object which allows to start the created thread, monitor its state and
224  wait until the thread is finished. */
225 template <typename F, typename... Args>
/* NOTE(review): source line 226 (the start of the signature) is missing from
this listing; the cross-reference index at the end of the page gives it as
`IB_thread create_detached_thread(mysql_pfs_key_t pfs_key, F &&f, Args &&... args)`. */
227  Args &&... args) {
228  Runnable runnable{pfs_key};
229 
/* Copy the handle out before the Runnable is moved into the std::thread. */
230  auto thread = runnable.thread();
231 
/* f and args are passed as lvalues here (no std::forward), so the new
std::thread stores copies of them. */
232  std::thread t(std::move(runnable), f, args...);
233  t.detach();
234 
235  /* Thread t is doing busy waiting until the state is changed
236  from NOT_STARTED to ALLOWED_TO_START. That will happen when
237  thread.start() will be called. */
238  ut_a(thread.state() == IB_thread::State::NOT_STARTED);
239 
240  return (thread);
241 }
242 
/* os_thread_create() is a thin macro over create_detached_thread(). With
UNIV_PFS_THREAD defined every argument is passed through unchanged; otherwise
the first argument (the key) is discarded and a literal 0 is passed in its
place. */
243 #ifdef UNIV_PFS_THREAD
244 #define os_thread_create(...) create_detached_thread(__VA_ARGS__)
245 #else
246 #define os_thread_create(k, ...) create_detached_thread(0, __VA_ARGS__)
247 #endif /* UNIV_PFS_THREAD */
248 
249 /** Parallel for loop over a container.
The container is cut into n slices of c.size() / n elements each. One worker
thread is created and started per slice; it is handed f together with the
slice bounds, the slice index i and args, i.e. f(begin, end, i, args...).
The calling thread then runs f over the remaining elements
[n * slice, c.end()) with index n, and finally joins every worker.
Nothing happens if c is empty. With n == 0 no workers are created and f runs
once, on the calling thread, over the whole container. If c.size() < n the
slice size is 0, so every worker receives an empty range and the calling
thread processes all elements.
The iterators of Container must support `iterator + offset`.
250 @param[in] pfs_key Performance schema thread key
251 @param[in] c Container to iterate over in parallel
252 @param[in] n Number of threads to create
253 @param[in] f Callable instance
254 @param[in] args Zero or more args */
255 template <typename Container, typename F, typename... Args>
256 void par_for(mysql_pfs_key_t pfs_key, const Container &c, size_t n, F &&f,
257  Args &&... args) {
258  if (c.empty()) {
259  return;
260  }
261 
/* Integer division: the remainder is handled by the calling thread below. */
262  size_t slice = (n > 0) ? c.size() / n : 0;
263 
264  using Workers = std::vector<IB_thread>;
265 
266  Workers workers;
267 
268  workers.reserve(n);
269 
270  for (size_t i = 0; i < n; ++i) {
/* Worker i gets the half-open range [i * slice, (i + 1) * slice). */
271  auto b = c.begin() + (i * slice);
272  auto e = b + slice;
273 
274  auto worker = os_thread_create(pfs_key, f, b, e, i, args...);
275  worker.start();
276 
277  workers.push_back(std::move(worker));
278  }
279 
/* The calling thread processes whatever is left after the n equal slices. */
280  f(c.begin() + (n * slice), c.end(), n, args...);
281 
282  for (auto &worker : workers) {
283  worker.join();
284  }
285 }
286 
/* Function-like macro with the same name as the par_for() template above.
With PFS thread instrumentation compiled in (and not a hot backup build) the
arguments are passed through unchanged; otherwise the first argument (the
key) is discarded and a literal 0 is passed in its place. */
287 #if defined(UNIV_PFS_THREAD) && !defined(UNIV_HOTBACKUP)
288 #define par_for(...) par_for(__VA_ARGS__)
289 #else
290 #define par_for(k, ...) par_for(0, __VA_ARGS__)
291 #endif /* UNIV_PFS_THREAD && !UNIV_HOTBACKUP */
292 
293 #endif /* !os0thread_create_h */
IB_thread::init
void init(std::promise< void > &promise)
Definition: os0thread.cc:80
os_thread_create
#define os_thread_create(...)
Definition: os0thread-create.h:244
my_thread_init
bool my_thread_init()
Allocate thread specific memory for the thread, used by mysys and dbug.
Definition: my_thr_init.cc:268
IB_thread::State::STOPPED
@ STOPPED
os_thread_count
std::atomic_int os_thread_count
Number of threads active.
Definition: os0thread.cc:53
os0thread.h
ut_a
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:53
os_thread_close
void os_thread_close()
Frees OS thread management data structures.
Definition: os0thread-create.h:62
Runnable::init
void init()
Initializes the m_shared_future, uses the m_promise's get_future, which cannot be used since then,...
Definition: os0thread-create.h:170
IB_thread::state
State state() const
Definition: os0thread.h:45
Runnable
Wrapper for a callable, it will count the number of registered Runnable instances and will register t...
Definition: os0thread-create.h:71
Runnable::m_pfs_key
const mysql_pfs_key_t m_pfs_key
Performance schema key.
Definition: os0thread-create.h:158
Runnable::preamble
void preamble()
Register the thread with the server.
Definition: os0thread-create.h:112
my_thread.h
IB_thread::State::INVALID
@ INVALID
my_thread_end
void my_thread_end()
Deallocate memory used by the thread for book-keeping.
Definition: my_thr_init.cc:303
srv_max_n_threads
ulint srv_max_n_threads
Maximum number of threads inside InnoDB.
Definition: os0thread.cc:50
IB_thread::State::STARTED
@ STARTED
mysql_pfs_key_t
Define for performance schema registration key.
Definition: sync0sync.h:50
UT_RELAX_CPU
#define UT_RELAX_CPU()
Definition: ut0ut.h:119
Runnable::m_promise
std::promise< void > m_promise
Promise which is set when task is done.
Definition: os0thread-create.h:162
Runnable::operator()
void operator()(F &&f, Args &&... args)
Method to execute the callable.
Definition: os0thread-create.h:88
IB_thread::State::ALLOWED_TO_START
@ ALLOWED_TO_START
os_thread_open
void os_thread_open()
Initializes OS thread management data structures.
Definition: os0thread-create.h:52
IB_thread::State::NOT_STARTED
@ NOT_STARTED
PFS_NOT_INSTRUMENTED
mysql_pfs_key_t PFS_NOT_INSTRUMENTED
Runnable::thread
IB_thread thread() const
Definition: os0thread-create.h:108
IB_thread
Definition: os0thread.h:41
Runnable::epilogue
void epilogue()
Deregister the thread.
Definition: os0thread-create.h:136
Runnable::Runnable
Runnable(mysql_pfs_key_t pfs_key)
Constructor for the Runnable object.
Definition: os0thread-create.h:76
par_for
#define par_for(...)
Definition: os0thread-create.h:288
create_detached_thread
IB_thread create_detached_thread(mysql_pfs_key_t pfs_key, F &&f, Args &&... args)
Create a detached non-started thread.
Definition: os0thread-create.h:226
n
int n
Definition: xcom_base.c:425
PSI_thread
struct PSI_thread PSI_thread
Definition: psi_thread_bits.h:71
thread_is_active
bool thread_is_active(const IB_thread &thread)
Check if a thread is active.
Definition: os0thread-create.h:176
os_thread_any_active
bool os_thread_any_active()
Check if there are threads active.
Definition: os0thread-create.h:57
IB_thread::set_state
void set_state(State state)
Definition: os0thread.cc:85
ib::warn
The class warn is used to emit warnings.
Definition: ut0ut.h:656
PSI_THREAD_CALL
#define PSI_THREAD_CALL(M)
Definition: psi_thread.h:31
mysql_pfs_key_t::m_value
unsigned int m_value
Definition: sync0sync.h:63
Runnable::m_thread
IB_thread m_thread
Future object which keeps the ref counter >= 1 at least as long as the Runnable is non-destroyed.
Definition: os0thread-create.h:166