MySQL 8.0.32
Source Code Documentation
ut0link_buf.h
Go to the documentation of this file.
1/*****************************************************************************
2
3Copyright (c) 2017, 2022, Oracle and/or its affiliates.
4
5This program is free software; you can redistribute it and/or modify
6it under the terms of the GNU General Public License, version 2.0,
7as published by the Free Software Foundation.
8
9This program is also distributed with certain software (including
10but not limited to OpenSSL) that is licensed under separate terms,
11as designated in a particular file or component or in included license
12documentation. The authors of MySQL hereby grant you an additional
13permission to link the program and your derivative works with the
14separately licensed software that they have included with MySQL.
15
16This program is distributed in the hope that it will be useful,
17but WITHOUT ANY WARRANTY; without even the implied warranty of
18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19GNU General Public License, version 2.0, for more details.
20
21You should have received a copy of the GNU General Public License
22along with this program; if not, write to the Free Software
23Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25Portions of this file contain modifications contributed and copyrighted by
26Google, Inc. Those modifications are gratefully acknowledged and are described
27briefly in the InnoDB documentation. The contributions by Google are
28incorporated with their permission, and subject to the conditions contained in
29the file COPYING.Google.
30
31*****************************************************************************/
32
33/**************************************************/ /**
34 @file include/ut0link_buf.h
35
36 Link buffer - concurrent data structure which allows:
37 - concurrent addition of links
38 - single-threaded tracking of connected path created by links
39 - limited size of window with holes (missing links)
40
41 Created 2017-08-30 Paweł Olchawa
42 *******************************************************/
43
44#ifndef ut0link_buf_h
45#define ut0link_buf_h
46
47#include <atomic>
48#include <cstdint>
49
50#include "ut0counter.h"
51#include "ut0dbg.h"
52#include "ut0new.h"
53#include "ut0ut.h"
54
55/** Concurrent data structure which allows tracking of concurrently
56performed operations that, locally, might be finished out of order.
57
58This data structure is informed about finished concurrent operations
59and tracks up to which point in a total order all operations have
60been finished (there are no holes).
61
62It also allows limiting the last period in which there might be holes.
63These holes refer to unfinished concurrent operations, which precede
64in the total order some operations that are already finished.
65
66Threads might concurrently report finished operations (lock-free).
67
68Threads might ask for maximum currently known position in total order,
69up to which all operations are already finished (lock-free).
70
71Single thread might track the reported finished operations and update
72maximum position in total order, up to which all operations are done.
73
74You might look at current usages of this data structure in log0buf.cc.
75*/
76template <typename Position = uint64_t>
77class Link_buf {
78 public:
79 /** Type used to express distance between two positions.
80 It could become a parameter of template if it was useful.
81 However there is no such need currently. */
82 typedef Position Distance;
83
84 /** Constructs the link buffer. Allocated memory for the links.
85 Initializes the tail pointer with 0.
86
87 @param[in] capacity number of slots in the ring buffer */
88 explicit Link_buf(size_t capacity);
89
90 Link_buf();
91
92 Link_buf(Link_buf &&rhs);
93
94 Link_buf(const Link_buf &rhs) = delete;
95
97
98 Link_buf &operator=(const Link_buf &rhs) = delete;
99
100 /** Destructs the link buffer. Deallocates memory for the links. */
101 ~Link_buf();
102
103 /** Add a directed link between two given positions. It is user's
104 responsibility to ensure that there is space for the link. This is
105 because it can be useful to ensure much earlier that there is space.
106
107 @param[in] from position where the link starts
108 @param[in] to position where the link ends (from -> to) */
109 void add_link(Position from, Position to);
110
111 /** Add a directed link between two given positions. It is user's
112 responsibility to ensure that there is space for the link. This is
113 because it can be useful to ensure much earlier that there is space.
114 In addition, advances the tail pointer in the buffer if possible.
115
116 @param[in] from position where the link starts
117 @param[in] to position where the link ends (from -> to) */
118 void add_link_advance_tail(Position from, Position to);
119
120 /** Advances the tail pointer in the buffer by following connected
121 path created by links. Starts at current position of the pointer.
122 Stops when the provided function returns true.
123
124 @param[in] stop_condition function used as a stop condition;
125 (lsn_t prev, lsn_t next) -> bool;
126 returns false if we should follow
127 the link prev->next, true to stop
128 @param[in] max_retry max fails to retry
129
130 @return true if and only if the pointer has been advanced */
131 template <typename Stop_condition>
132 bool advance_tail_until(Stop_condition stop_condition,
133 uint32_t max_retry = 1);
134
135 /** Advances the tail pointer in the buffer without additional
136 condition for stop. Stops at missing outgoing link.
137
138 @see advance_tail_until()
139
140 @return true if and only if the pointer has been advanced */
141 bool advance_tail();
142
143 /** @return capacity of the ring buffer */
144 size_t capacity() const;
145
146 /** @return the tail pointer */
147 Position tail() const;
148
149 /** Checks if there is space to add link at given position.
150 User has to use this function before adding the link, and
151 should wait until the free space exists.
152
153 @param[in] position position to check
154
155 @return true if and only if the space is free */
156 bool has_space(Position position);
157
158 /** Validates (using assertions) that there are no links set
159 in the range [begin, end). */
160 void validate_no_links(Position begin, Position end);
161
162 /** Validates (using assertions) that there no links at all. */
163 void validate_no_links();
164
165 private:
166 /** Translates position expressed in original unit to position
167 in the m_links (which is a ring buffer).
168
169 @param[in] position position in original unit
170
171 @return position in the m_links */
172 size_t slot_index(Position position) const;
173
174 /** Computes next position by looking into slots array and
175 following single link which starts in provided position.
176
177 @param[in] position position to start
178 @param[out] next computed next position
179
180 @return false if there was no link, true otherwise */
181 bool next_position(Position position, Position &next);
182
183 /** Deallocated memory, if it was allocated. */
184 void free();
185
186 /** Capacity of the buffer. */
188
189 /** Pointer to the ring buffer (unaligned). */
190 std::atomic<Distance> *m_links;
191
192 /** Tail pointer in the buffer (expressed in original unit). */
193 alignas(ut::INNODB_CACHE_LINE_SIZE) std::atomic<Position> m_tail;
194};
195
196template <typename Position>
198 : m_capacity(capacity), m_tail(0) {
199 if (capacity == 0) {
200 m_links = nullptr;
201 return;
202 }
203
204 ut_a((capacity & (capacity - 1)) == 0);
205
206 m_links = ut::new_arr_withkey<std::atomic<Distance>>(UT_NEW_THIS_FILE_PSI_KEY,
208
209 for (size_t i = 0; i < capacity; ++i) {
210 m_links[i].store(0);
211 }
212}
213
214template <typename Position>
216
217template <typename Position>
219 : m_capacity(rhs.m_capacity), m_tail(rhs.m_tail.load()) {
220 m_links = rhs.m_links;
221 rhs.m_links = nullptr;
222}
223
224template <typename Position>
226 free();
227
228 m_capacity = rhs.m_capacity;
229
230 m_tail.store(rhs.m_tail.load());
231
232 m_links = rhs.m_links;
233 rhs.m_links = nullptr;
234
235 return *this;
236}
237
238template <typename Position>
240 free();
241}
242
243template <typename Position>
245 if (m_links != nullptr) {
246 ut::delete_arr(m_links);
247 m_links = nullptr;
248 }
249}
250
251template <typename Position>
252inline void Link_buf<Position>::add_link(Position from, Position to) {
253 ut_ad(to > from);
254 ut_ad(to - from <= std::numeric_limits<Distance>::max());
255
256 const auto index = slot_index(from);
257
258 auto &slot = m_links[index];
259
260 slot.store(to);
261}
262
263template <typename Position>
264inline bool Link_buf<Position>::next_position(Position position,
265 Position &next) {
266 const auto index = slot_index(position);
267
268 auto &slot = m_links[index];
269
270 next = slot.load(std::memory_order_relaxed);
271
272 return next <= position;
273}
274
275template <typename Position>
277 Position to) {
278 ut_ad(to > from);
279 ut_ad(to - from <= std::numeric_limits<Distance>::max());
280
281 auto position = m_tail.load(std::memory_order_acquire);
282
283 ut_ad(position <= from);
284
285 if (position == from) {
286 /* can advance m_tail directly and exclusively, and it is unlock */
287 m_tail.store(to, std::memory_order_release);
288 } else {
289 auto index = slot_index(from);
290 auto &slot = m_links[index];
291
292 /* add link */
293 slot.store(to, std::memory_order_release);
294
295 auto stop_condition = [&](Position prev_pos, Position) {
296 return (prev_pos > from);
297 };
298
299 advance_tail_until(stop_condition);
300 }
301}
302
/** Advances the tail pointer by following the path of links which starts
at the current tail.

The function works in two phases. In the first phase it tries to claim
the link starting at the tail: the slot of the tail is switched with a
compare-exchange from the end of the link to the tail position itself,
and the thread for which the exchange succeeded re-reads m_tail to make
sure that the tail has not moved meanwhile. In the second phase the
claiming thread follows subsequent links with next_position() until a
slot holds no forward link or stop_condition returns true, and then
publishes the reached position as the new tail.

@param[in]  stop_condition  callable (Position prev, Position next) -> bool;
                            returning true stops before following the
                            link prev -> next
@param[in]  max_retry       the function gives up as soon as the number
                            of failed claim attempts exceeds this value

@return true if the tail has been advanced by this call; false if there
was no forward link at the tail, the stop condition held for the first
link, the retries were exhausted, or the tail did not move between two
attempts */
303template <typename Position>
304template <typename Stop_condition>
305bool Link_buf<Position>::advance_tail_until(Stop_condition stop_condition,
306 uint32_t max_retry) {
307 /* multi threaded aware */
308 auto position = m_tail.load(std::memory_order_acquire);
/* Remembers the tail value on which the current attempt is based. */
309 auto from = position;
310
311 uint32_t retry = 0;
/* Phase 1: claim the link which starts at the tail. */
312 while (true) {
313 auto index = slot_index(position);
314 auto &slot = m_links[index];
315
316 auto next_load = slot.load(std::memory_order_acquire);
317
318 if (next_load >= position + m_capacity) {
319 /* either we wrapped and tail was advanced meanwhile,
320 or there is link start_lsn -> end_lsn of length >= m_capacity */
321 position = m_tail.load(std::memory_order_acquire);
322 if (position != from) {
/* The tail has moved: start over from the fresh tail value. */
323 from = position;
324 continue;
325 }
326 }
327
328 if (next_load <= position || stop_condition(position, next_load)) {
329 /* nothing to advance for now */
330 return false;
331 }
332
333 /* try to lock as storing the end */
/* On success the slot no longer holds a forward link (it stores the
position itself), so other threads see nothing to advance here. */
334 if (slot.compare_exchange_strong(next_load, position,
335 std::memory_order_acq_rel)) {
336 /* it could happen, that after thread read position = m_tail.load(),
337 it got scheduled out for longer; when it comes back it might still
338 see the link going forward in that slot but m_tail could have been
339 already advanced forward (as we do not reset slots when traversing
340 them); thread needs to re-check if m_tail is still behind the slot. */
341 position = m_tail.load(std::memory_order_acquire);
342 if (position == from) {
343 /* confirmed. can advance m_tail exclusively */
344 position = next_load;
345 break;
346 }
347 }
348
/* The attempt failed: either the exchange lost or the tail has moved. */
349 retry++;
350 if (retry > max_retry) {
351 /* give up */
352 return false;
353 }
354
355 UT_RELAX_CPU();
356 position = m_tail.load(std::memory_order_acquire);
357 if (position == from) {
358 /* no progress? */
359 return false;
360 }
361 from = position;
362 }
363
/* Phase 2: follow subsequent links; position is the end of the claimed
link at this point. */
364 while (true) {
365 Position next;
366
/* next_position() returns true when the slot holds no forward link. */
367 bool stop = next_position(position, next);
368
369 if (stop || stop_condition(position, next)) {
370 break;
371 }
372
373 position = next;
374 }
375
/* The tail must not have been changed since the link was claimed. */
376 ut_a(from == m_tail.load(std::memory_order_acquire));
377
378 /* unlock */
379 m_tail.store(position, std::memory_order_release);
380
381 if (position == from) {
382 return false;
383 }
384
385 return true;
386}
387
388template <typename Position>
390 auto stop_condition = [](Position, Position) { return false; };
391
392 return advance_tail_until(stop_condition);
393}
394
395template <typename Position>
396inline size_t Link_buf<Position>::capacity() const {
397 return m_capacity;
398}
399
400template <typename Position>
401inline Position Link_buf<Position>::tail() const {
402 return m_tail.load(std::memory_order_acquire);
403}
404
405template <typename Position>
406inline bool Link_buf<Position>::has_space(Position position) {
407 auto tail = m_tail.load(std::memory_order_acquire);
408 if (tail + m_capacity > position) {
409 return true;
410 }
411
412 auto stop_condition = [](Position, Position) { return false; };
413 advance_tail_until(stop_condition, 0);
414
415 tail = m_tail.load(std::memory_order_acquire);
416 return tail + m_capacity > position;
417}
418
419template <typename Position>
420inline size_t Link_buf<Position>::slot_index(Position position) const {
421 return position & (m_capacity - 1);
422}
423
424template <typename Position>
425void Link_buf<Position>::validate_no_links(Position begin, Position end) {
426 const auto tail = m_tail.load();
427
428 /* After m_capacity iterations we would have all slots tested. */
429
430 end = std::min(end, begin + m_capacity);
431
432 for (; begin < end; ++begin) {
433 const size_t index = slot_index(begin);
434
435 const auto &slot = m_links[index];
436
437 ut_a(slot.load() <= tail);
438 }
439}
440
441template <typename Position>
443 validate_no_links(0, m_capacity);
444}
445
446#endif /* ut0link_buf_h */
#define free(A)
Definition: lexyy.cc:915
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:307
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:191
constexpr size_t INNODB_CACHE_LINE_SIZE
CPU cache line size.
Definition: ut0cpu_cache.h:40
void delete_arr(T *ptr) noexcept
Releases storage which has been dynamically allocated through any of the ut::new_arr*() variants.
Definition: ut0new.h:1107
Light-weight and type-safe wrapper which serves a purpose of being able to select proper ut::new_arr*...
Definition: ut0new.h:979
Counter utility class.
Debug utilities for Innobase.
#define ut_ad(EXPR)
Debug assertion.
Definition: ut0dbg.h:68
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:56
Dynamic memory allocation routines and custom allocators specifically crafted to support memory instr...
#define UT_NEW_THIS_FILE_PSI_KEY
Definition: ut0new.h:562
Various utilities.
#define UT_RELAX_CPU()
Definition: ut0ut.h:86
static task_env * retry
Definition: xcom_base.cc:435