MySQL  8.0.27
Source Code Documentation
rpl_commit_stage_manager.h
Go to the documentation of this file.
1 /* Copyright (c) 2019, 2021, Oracle and/or its affiliates.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef RPL_COMMIT_STAGE_MANAGER
24 #define RPL_COMMIT_STAGE_MANAGER
25 
26 #include <atomic>
27 #include <utility>
28 
29 #include "my_dbug.h"
30 #include "mysql/psi/mysql_cond.h"
31 #include "mysql/psi/mysql_mutex.h"
32 #include "sql/sql_class.h"
33 #include "thr_mutex.h"
34 
35 class THD;
36 
37 /**
38  Class for maintaining the commit stages for binary log group commit.
39  */
41  public:
42  class Mutex_queue {
43  friend class Commit_stage_manager;
44 
45  public:
47 
49 
50  bool is_empty() const { return m_first == nullptr; }
51 
52  /**
53  Append a linked list of threads to the queue.
54 
55  @param[in] first Linked list of threads to be appended to queue
56 
57  @retval true The queue was empty before this operation.
58  @retval false The queue was non-empty before this operation.
59  */
60  bool append(THD *first);
61 
62  /**
63  Fetch the entire queue for a stage. It is a wrapper over
64  fetch_and_empty() and acquires queue lock before fetching
65  and emptying the queue threads.
66 
67  @return Pointer to the first session of the queue.
68  */
70 
71  /**
72  Fetch the entire queue for a stage. It is a wrapper over
73  fetch_and_empty(). The caller must acquire queue lock before
74  calling this function.
75 
76  @return Pointer to the first session of the queue.
77  */
79 
80  /**
81  Remove first member from the queue
82 
83  @retval Returns std::pair<bool, THD *> object.
84  The first boolean value of pair if true determines queue
85  is not empty, and false determines queue is empty.
86  The second value returns the first removed member.
87  */
88  std::pair<bool, THD *> pop_front();
89 
90  /**
91  Get number of elements in the queue.
92 
93  @retval Returns number of elements in the queue.
94  */
95  inline int32 get_size() { return m_size.load(); }
96 
97  /**
98  Fetch the first thread of the queue.
99 
100  @return first thread of the queue.
101  */
102  THD *get_leader() { return m_first; }
103 
104  void lock() {
107  }
108 
110 
112 
113  private:
114  /**
115  Fetch the entire queue for a stage.
116 
117  @retval This will fetch the entire queue in one go.
118  */
119  THD *fetch_and_empty();
120 
121  /**
122  Pointer to the first thread in the queue, or nullptr if the queue is
123  empty.
124  */
126 
127  /**
128  Pointer to the location holding the end of the queue.
129 
130  This is either @c &m_first, or a pointer to the @c next_to_commit of
131  the last thread that is enqueued.
132  */
134 
135  /** size of the queue */
136  std::atomic<int32> m_size;
137 
138  /** Lock for protecting the queue. */
140 
141  /*
142  This attribute did not have the desired effect, at least not according
143  to -fsanitize=undefined with gcc 5.2.1
144  */
145  }; // MY_ATTRIBUTE((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));
146 
147  private:
149 
151 
153 
154  public:
155  /**
156  Fetch Commit_stage_manager class instance.
157 
158  @return Reference to the Commit_stage_manager class instance.
159  */
161 
162  /**
163  Constants for queues for different stages.
164  */
165  enum StageID {
171  };
172 
173  /**
174  Initializes m_stage_cond_binlog, m_stage_cond_commit_order,
175  m_stage_cond_leader condition variables and m_lock_done mutex.
176 
177  The binlog follower threads block on m_stage_cond_binlog condition
178  variable till signalled to wake up from leader thread. And similarly
179  commit order follower threads block on m_stage_cond_commit_order
180  condition variable till signalled to wake up from leader thread.
181 
182  If the first binlog thread, which is supposed to be leader, finds that the
183  commit order queue is not empty, then it blocks on m_stage_cond_leader till
184  the commit order leader signals it to awake and become the new leader.
185 
186  m_lock_done mutex is shared by all three stages.
187 
188  @param key_LOCK_flush_queue mutex instrumentation key
189  @param key_LOCK_sync_queue mutex instrumentation key
190  @param key_LOCK_commit_queue mutex instrumentation key
191  @param key_LOCK_done mutex instrumentation key
192  @param key_COND_done cond instrumentation key
193  @param key_COND_flush_queue cond instrumentation key
194  */
195  void init(PSI_mutex_key key_LOCK_flush_queue,
196  PSI_mutex_key key_LOCK_sync_queue,
197  PSI_mutex_key key_LOCK_commit_queue, PSI_mutex_key key_LOCK_done,
198  PSI_cond_key key_COND_done, PSI_cond_key key_COND_flush_queue);
199 
200  /**
201  Deinitializes m_stage_cond_binlog, m_stage_cond_commit_order,
202  m_stage_cond_leader condition variables and m_lock_done mutex.
203  */
204  void deinit();
205 
206  /**
207  Enroll a set of sessions for a stage.
208 
209  This will queue the session thread for writing and flushing.
210 
211  If the thread being queued is assigned as stage leader, it will
212  return immediately.
213 
214  If wait_if_follower is true and the thread is not the stage leader,
215  the thread will wait for the queue to be processed by the
216  leader before it returns.
217  In DBUG-ON version the follower marks its preempt status as ready.
218 
219  The session threads entering this function acquire mutexes, and a few of
220  them are not released while exiting based on thread and stage type.
221  - A binlog leader (returning true when stage!=COMMIT_ORDER_FLUSH_STAGE) will
222  acquire the stage mutex in this function and not release it.
223  - A commit order leader of the flush stage (returning true when
224  stage==COMMIT_ORDER_FLUSH_STAGE) will acquire both the stage mutex and the
225  flush queue mutex in this function, and not release them.
226  - A follower (returning false) will release any mutexes it takes, before
227  returning from the function.
228 
229  @param[in] stage Stage identifier for the queue to append to.
230  @param[in] first Queue to append.
231  @param[in] stage_mutex
232  Pointer to the currently held stage mutex, or nullptr if we're
233  not in a stage, that will be released when changing stage.
234  @param[in] enter_mutex
235  Pointer to the mutex that will be taken when changing stage.
236 
237  @retval true Thread is stage leader.
238  @retval false Thread was not stage leader and processing has been done.
239  */
240  bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex,
241  mysql_mutex_t *enter_mutex);
242 
243  /**
244  Remove first member from the queue for given stage
245 
246  @retval Returns std::pair<bool, THD *> object.
247  The first boolean value of pair if true determines queue
248  is not empty, and false determines queue is empty.
249  The second value returns the first removed member.
250  */
251  std::pair<bool, THD *> pop_front(StageID stage) {
252  return m_queue[stage].pop_front();
253  }
254 
255 #ifndef NDEBUG
256  /**
257  The method ensures the follower's execution path can be preempted
258  by the leader's thread.
259  Preempt status of @c head follower is checked to engage the leader
260  into waiting when set.
261 
262  @param head THD* of a follower thread
263  */
264  void clear_preempt_status(THD *head);
265 #endif
266 
267  /**
268  Fetch the entire queue and empty it. It acquires queue lock before fetching
269  and emptying the queue threads.
270 
271  @param[in] stage Stage identifier for the queue to fetch.
272 
273  @return Pointer to the first session of the queue.
274  */
276 
277  /**
278  Fetch the entire queue and empty it. The caller must acquire queue lock
279  before calling this function.
280 
281  @param[in] stage Stage identifier for the queue to fetch.
282 
283  @return Pointer to the first session of the queue.
284  */
286 
287  /**
288  Introduces a wait operation on the executing thread. The
289  waiting is done until the timeout elapses or count is
290  reached (whichever comes first).
291 
292  If count == 0, then the session will wait until the timeout
293  elapses. If timeout == 0, then there is no waiting.
294 
295  @param usec the number of microseconds to wait.
296  @param count wait for as many as count to join the queue the
297  session is waiting on
298  @param stage which stage queue size to compare count against.
299  */
300  void wait_count_or_timeout(ulong count, long usec, StageID stage);
301 
302  /**
303  The function is called after follower threads are processed by leader,
304  to unblock follower threads.
305 
306  @param queue the thread list which needs to be unblocked
307  @param stage Stage identifier current thread belong to.
308  */
310 
311  /**
312  This function gets called after transactions are flushed to the engine
313  i.e. after calling ha_flush_logs, to unblock commit order thread list
314  which are not needed to wait for other stages.
315 
316  @param first the thread list which needs to be unblocked
317  */
319 
320  /**
321  Wrapper on Mutex_queue lock(), acquires lock on stage queue.
322 
323  @param[in] stage Stage identifier for the queue to lock.
324  */
325  void lock_queue(StageID stage) { m_queue[stage].lock(); }
326 
327  /**
328  Wrapper on Mutex_queue unlock(), releases lock on stage queue.
329 
330  @param[in] stage Stage identifier for the queue to unlock.
331  */
332  void unlock_queue(StageID stage) { m_queue[stage].unlock(); }
333 
334  private:
335  /** Check if Commit_stage_manager variables are already initialized. */
337 
338  /**
339  Queues for sessions.
340 
341  We need four queues:
342  - Binlog flush queue: transactions that are going to be flushed to the
343  engine and written to the binary log.
344  - Commit order flush queue: transactions that are not going to write the
345  binlog at all, but participate in the beginning
346  of the group commit, up to and including the
347  engine flush.
348  - Sync queue: transactions that are going to be synced to disk
349  - Commit queue: transactions that are going to be committed
350  (when binlog_order_commit=1).
351  */
353 
354  /**
355  The binlog leader waits on this condition variable till it is indicated
356  to wake up. If binlog flush queue gets first thread in the queue but
357  by then commit order flush queue has already elected leader, then the
358  first thread of binlog queue waits on this condition variable and gets
359  signalled to wake up from commit order flush queue leader later.
360  */
362 
363  /**
364  Condition variable to indicate that the binlog threads can wake up
365  and continue.
366  */
368 
369  /**
370  Condition variable to indicate that the flush to storage engine
371  is done and commit order threads can again wake up and continue.
372  */
374 
375  /** Mutex used for the condition variable above */
377 
378  /** Mutex used for the stage level locks */
380 
381 #ifndef NDEBUG
382  /** Save pointer to leader thread which is used later to awake leader */
384 
385  /** Flag is set by Leader when it starts waiting for follower's all-clear */
387 
388  /** Condition variable to indicate a follower started waiting for commit */
390 #endif
391 };
392 
393 #endif /*RPL_COMMIT_STAGE_MANAGER*/
Definition: rpl_commit_stage_manager.h:42
THD * fetch_and_empty_acquire_lock()
Fetch the entire queue for a stage.
Definition: rpl_commit_stage_manager.cc:331
THD * fetch_and_empty_skip_acquire_lock()
Fetch the entire queue for a stage.
Definition: rpl_commit_stage_manager.cc:338
std::atomic< int32 > m_size
size of the queue
Definition: rpl_commit_stage_manager.h:136
mysql_mutex_t * m_lock
Lock for protecting the queue.
Definition: rpl_commit_stage_manager.h:139
bool is_empty() const
Definition: rpl_commit_stage_manager.h:50
bool append(THD *first)
Append a linked list of threads to the queue.
Definition: rpl_commit_stage_manager.cc:36
void assert_owner()
Definition: rpl_commit_stage_manager.h:111
void init(mysql_mutex_t *lock)
Definition: rpl_commit_stage_manager.h:48
int32 get_size()
Get number of elements in the queue.
Definition: rpl_commit_stage_manager.h:95
void unlock()
Definition: rpl_commit_stage_manager.h:109
void lock()
Definition: rpl_commit_stage_manager.h:104
THD ** m_last
Pointer to the location holding the end of the queue.
Definition: rpl_commit_stage_manager.h:133
THD * get_leader()
Fetch the first thread of the queue.
Definition: rpl_commit_stage_manager.h:102
THD * fetch_and_empty()
Fetch the entire queue for a stage.
Definition: rpl_commit_stage_manager.cc:343
Mutex_queue()
Definition: rpl_commit_stage_manager.h:46
std::pair< bool, THD * > pop_front()
Remove first member from the queue.
Definition: rpl_commit_stage_manager.cc:70
THD * m_first
Pointer to the first thread in the queue, or nullptr if the queue is empty.
Definition: rpl_commit_stage_manager.h:125
Class for maintaining the commit stages for binary log group commit.
Definition: rpl_commit_stage_manager.h:40
const Commit_stage_manager & operator=(const Commit_stage_manager &)=delete
mysql_mutex_t m_lock_done
Mutex used for the condition variable above.
Definition: rpl_commit_stage_manager.h:376
mysql_cond_t m_stage_cond_leader
The binlog leader waits on this condition variable till it is indicated to wake up.
Definition: rpl_commit_stage_manager.h:361
void deinit()
Deinitializes m_stage_cond_binlog, m_stage_cond_commit_order, m_stage_cond_leader condition variables...
Definition: rpl_commit_stage_manager.cc:134
bool leader_await_preempt_status
Flag is set by Leader when it starts waiting for follower's all-clear.
Definition: rpl_commit_stage_manager.h:386
mysql_cond_t m_stage_cond_binlog
Condition variable to indicate that the binlog threads can wake up and continue.
Definition: rpl_commit_stage_manager.h:367
void clear_preempt_status(THD *head)
The method ensures the follower's execution path can be preempted by the leader's thread.
Definition: rpl_commit_stage_manager.cc:419
Mutex_queue m_queue[STAGE_COUNTER]
Queues for sessions.
Definition: rpl_commit_stage_manager.h:352
Commit_stage_manager(const Commit_stage_manager &)=delete
mysql_cond_t m_stage_cond_commit_order
Condition variable to indicate that the flush to storage engine is done and commit order threads can ...
Definition: rpl_commit_stage_manager.h:373
mysql_mutex_t m_queue_lock[STAGE_COUNTER - 1]
Mutex used for the stage level locks.
Definition: rpl_commit_stage_manager.h:379
StageID
Constants for queues for different stages.
Definition: rpl_commit_stage_manager.h:165
@ COMMIT_ORDER_FLUSH_STAGE
Definition: rpl_commit_stage_manager.h:169
@ COMMIT_STAGE
Definition: rpl_commit_stage_manager.h:168
@ BINLOG_FLUSH_STAGE
Definition: rpl_commit_stage_manager.h:166
@ SYNC_STAGE
Definition: rpl_commit_stage_manager.h:167
@ STAGE_COUNTER
Definition: rpl_commit_stage_manager.h:170
static Commit_stage_manager & get_instance()
Fetch Commit_stage_manager class instance.
Definition: rpl_commit_stage_manager.cc:432
THD * fetch_queue_acquire_lock(StageID stage)
Fetch the entire queue and empty it.
Definition: rpl_commit_stage_manager.cc:384
bool m_is_initialized
Check if Commit_stage_manager variables are already initialized.
Definition: rpl_commit_stage_manager.h:336
Commit_stage_manager()
Definition: rpl_commit_stage_manager.h:148
void wait_count_or_timeout(ulong count, long usec, StageID stage)
Introduces a wait operation on the executing thread.
Definition: rpl_commit_stage_manager.cc:361
void lock_queue(StageID stage)
Wrapper on Mutex_queue lock(), acquires lock on stage queue.
Definition: rpl_commit_stage_manager.h:325
THD * leader_thd
Save pointer to leader thread which is used later to awake leader.
Definition: rpl_commit_stage_manager.h:383
void process_final_stage_for_ordered_commit_group(THD *first)
This function gets called after transactions are flushed to the engine i.e.
Definition: rpl_commit_stage_manager.cc:394
std::pair< bool, THD * > pop_front(StageID stage)
Remove first member from the queue for given stage.
Definition: rpl_commit_stage_manager.h:251
THD * fetch_queue_skip_acquire_lock(StageID stage)
Fetch the entire queue and empty it.
Definition: rpl_commit_stage_manager.cc:389
mysql_cond_t m_cond_preempt
Condition variable to indicate a follower started waiting for commit.
Definition: rpl_commit_stage_manager.h:389
void signal_done(THD *queue, StageID stage=BINLOG_FLUSH_STAGE)
The function is called after follower threads are processed by leader, to unblock follower threads.
Definition: rpl_commit_stage_manager.cc:402
void init(PSI_mutex_key key_LOCK_flush_queue, PSI_mutex_key key_LOCK_sync_queue, PSI_mutex_key key_LOCK_commit_queue, PSI_mutex_key key_LOCK_done, PSI_cond_key key_COND_done, PSI_cond_key key_COND_flush_queue)
Initializes m_stage_cond_binlog, m_stage_cond_commit_order, m_stage_cond_leader condition variables a...
Definition: rpl_commit_stage_manager.cc:95
void unlock_queue(StageID stage)
Wrapper on Mutex_queue unlock(), releases lock on stage queue.
Definition: rpl_commit_stage_manager.h:332
bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex, mysql_mutex_t *enter_mutex)
Enroll a set of sessions for a stage.
Definition: rpl_commit_stage_manager.cc:146
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_class.h:821
#define mysql_mutex_lock(M)
Definition: mysql_mutex.h:49
#define mysql_mutex_unlock(M)
Definition: mysql_mutex.h:56
Dialog Client Authentication nullptr
Definition: dialog.cc:352
unsigned int PSI_cond_key
Instrumented cond key.
Definition: psi_cond_bits.h:43
unsigned int PSI_mutex_key
Instrumented mutex key.
Definition: psi_mutex_bits.h:51
#define mysql_mutex_assert_not_owner(M)
Wrapper, to use safe_mutex_assert_not_owner with instrumented mutexes.
Definition: mysql_mutex.h:125
#define mysql_mutex_assert_owner(M)
Wrapper, to use safe_mutex_assert_owner with instrumented mutexes.
Definition: mysql_mutex.h:111
int32_t int32
Definition: my_inttypes.h:65
static int count
Definition: myisam_ftdump.cc:42
static QUEUE queue
Definition: myisampack.cc:206
Provides atomic access in shared-exclusive modes.
Definition: shared_spin_lock.h:78
Instrumentation helpers for conditions.
Instrumentation helpers for mutexes.
An instrumented cond structure.
Definition: mysql_cond_bits.h:49
An instrumented mutex structure.
Definition: mysql_mutex_bits.h:49
MySQL mutex implementation.