MySQL 9.1.0
Source Code Documentation
ut0link_buf.h
Go to the documentation of this file.
1/*****************************************************************************
2
3Copyright (c) 2017, 2024, Oracle and/or its affiliates.
4
5This program is free software; you can redistribute it and/or modify
6it under the terms of the GNU General Public License, version 2.0,
7as published by the Free Software Foundation.
8
9This program is designed to work with certain software (including
10but not limited to OpenSSL) that is licensed under separate terms,
11as designated in a particular file or component or in included license
12documentation. The authors of MySQL hereby grant you an additional
13permission to link the program and your derivative works with the
14separately licensed software that they have either included with
15the program or referenced in the documentation.
16
17This program is distributed in the hope that it will be useful,
18but WITHOUT ANY WARRANTY; without even the implied warranty of
19MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20GNU General Public License, version 2.0, for more details.
21
22You should have received a copy of the GNU General Public License
23along with this program; if not, write to the Free Software
24Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25
26Portions of this file contain modifications contributed and copyrighted by
27Google, Inc. Those modifications are gratefully acknowledged and are described
28briefly in the InnoDB documentation. The contributions by Google are
29incorporated with their permission, and subject to the conditions contained in
30the file COPYING.Google.
31
32*****************************************************************************/
33
34/**************************************************/ /**
35 @file include/ut0link_buf.h
36
37 Link buffer - concurrent data structure which allows:
38 - concurrent addition of links
39 - single-threaded tracking of connected path created by links
40 - limited size of window with holes (missing links)
41
42 Created 2017-08-30 Paweł Olchawa
43 *******************************************************/
44
45#ifndef ut0link_buf_h
46#define ut0link_buf_h
47
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <limits>

#include "ut0counter.h"
#include "ut0dbg.h"
#include "ut0new.h"
#include "ut0ut.h"
55
56/** Concurrent data structure, which allows to track concurrently
57performed operations which locally might be dis-ordered.
58
59This data structure is informed about finished concurrent operations
60and tracks up to which point in a total order all operations have
61been finished (there are no holes).
62
63It also allows to limit the last period in which there might be holes.
64These holes refer to unfinished concurrent operations, which precede
65in the total order some operations that are already finished.
66
67Threads might concurrently report finished operations (lock-free).
68
69Threads might ask for maximum currently known position in total order,
70up to which all operations are already finished (lock-free).
71
72Single thread might track the reported finished operations and update
73maximum position in total order, up to which all operations are done.
74
75You might look at current usages of this data structure in log0buf.cc.
76*/
77template <typename Position = uint64_t>
78class Link_buf {
79 public:
80 /** Type used to express distance between two positions.
81 It could become a parameter of template if it was useful.
82 However there is no such need currently. */
83 typedef Position Distance;
84
85 /** Constructs the link buffer. Allocated memory for the links.
86 Initializes the tail pointer with 0.
87
88 @param[in] capacity number of slots in the ring buffer */
89 explicit Link_buf(size_t capacity);
90
91 Link_buf();
92
93 Link_buf(Link_buf &&rhs);
94
95 Link_buf(const Link_buf &rhs) = delete;
96
98
99 Link_buf &operator=(const Link_buf &rhs) = delete;
100
101 /** Destructs the link buffer. Deallocates memory for the links. */
102 ~Link_buf();
103
104 /** Add a directed link between two given positions. It is user's
105 responsibility to ensure that there is space for the link. This is
106 because it can be useful to ensure much earlier that there is space.
107
108 @param[in] from position where the link starts
109 @param[in] to position where the link ends (from -> to) */
110 void add_link(Position from, Position to);
111
112 /** Add a directed link between two given positions. It is user's
113 responsibility to ensure that there is space for the link. This is
114 because it can be useful to ensure much earlier that there is space.
115 In addition, advances the tail pointer in the buffer if possible.
116
117 @param[in] from position where the link starts
118 @param[in] to position where the link ends (from -> to) */
119 void add_link_advance_tail(Position from, Position to);
120
121 /** Advances the tail pointer in the buffer by following connected
122 path created by links. Starts at current position of the pointer.
123 Stops when the provided function returns true.
124
125 @param[in] stop_condition function used as a stop condition;
126 (lsn_t prev, lsn_t next) -> bool;
127 returns false if we should follow
128 the link prev->next, true to stop
129 @param[in] max_retry max fails to retry
130
131 @return true if and only if the pointer has been advanced */
132 template <typename Stop_condition>
133 bool advance_tail_until(Stop_condition stop_condition,
134 uint32_t max_retry = 1);
135
136 /** Advances the tail pointer in the buffer without additional
137 condition for stop. Stops at missing outgoing link.
138
139 @see advance_tail_until()
140
141 @return true if and only if the pointer has been advanced */
142 bool advance_tail();
143
144 /** @return capacity of the ring buffer */
145 size_t capacity() const;
146
147 /** @return the tail pointer */
148 Position tail() const;
149
150 /** Checks if there is space to add link at given position.
151 User has to use this function before adding the link, and
152 should wait until the free space exists.
153
154 @param[in] position position to check
155
156 @return true if and only if the space is free */
157 bool has_space(Position position);
158
159 /** Validates (using assertions) that there are no links set
160 in the range [begin, end). */
161 void validate_no_links(Position begin, Position end);
162
163 /** Validates (using assertions) that there no links at all. */
164 void validate_no_links();
165
166 private:
167 /** Translates position expressed in original unit to position
168 in the m_links (which is a ring buffer).
169
170 @param[in] position position in original unit
171
172 @return position in the m_links */
173 size_t slot_index(Position position) const;
174
175 /** Computes next position by looking into slots array and
176 following single link which starts in provided position.
177
178 @param[in] position position to start
179 @param[out] next computed next position
180
181 @return false if there was no link, true otherwise */
182 bool next_position(Position position, Position &next);
183
184 /** Deallocated memory, if it was allocated. */
185 void free();
186
187 /** Capacity of the buffer. */
189
190 /** Pointer to the ring buffer (unaligned). */
191 std::atomic<Distance> *m_links;
192
193 /** Tail pointer in the buffer (expressed in original unit). */
194 alignas(ut::INNODB_CACHE_LINE_SIZE) std::atomic<Position> m_tail;
195};
196
197template <typename Position>
199 : m_capacity(capacity), m_tail(0) {
200 if (capacity == 0) {
201 m_links = nullptr;
202 return;
203 }
204
205 ut_a((capacity & (capacity - 1)) == 0);
206
207 m_links = ut::new_arr_withkey<std::atomic<Distance>>(UT_NEW_THIS_FILE_PSI_KEY,
209
210 for (size_t i = 0; i < capacity; ++i) {
211 m_links[i].store(0);
212 }
213}
214
215template <typename Position>
217
218template <typename Position>
220 : m_capacity(rhs.m_capacity), m_tail(rhs.m_tail.load()) {
221 m_links = rhs.m_links;
222 rhs.m_links = nullptr;
223}
224
225template <typename Position>
227 free();
228
229 m_capacity = rhs.m_capacity;
230
231 m_tail.store(rhs.m_tail.load());
232
233 m_links = rhs.m_links;
234 rhs.m_links = nullptr;
235
236 return *this;
237}
238
239template <typename Position>
241 free();
242}
243
244template <typename Position>
246 if (m_links != nullptr) {
247 ut::delete_arr(m_links);
248 m_links = nullptr;
249 }
250}
251
252template <typename Position>
253inline void Link_buf<Position>::add_link(Position from, Position to) {
254 ut_ad(to > from);
255 ut_ad(to - from <= std::numeric_limits<Distance>::max());
256
257 const auto index = slot_index(from);
258
259 auto &slot = m_links[index];
260
261 slot.store(to);
262}
263
264template <typename Position>
265inline bool Link_buf<Position>::next_position(Position position,
266 Position &next) {
267 const auto index = slot_index(position);
268
269 auto &slot = m_links[index];
270
271 next = slot.load(std::memory_order_relaxed);
272
273 return next <= position;
274}
275
276template <typename Position>
278 Position to) {
279 ut_ad(to > from);
280 ut_ad(to - from <= std::numeric_limits<Distance>::max());
281
282 auto position = m_tail.load(std::memory_order_acquire);
283
284 ut_ad(position <= from);
285
286 if (position == from) {
287 /* can advance m_tail directly and exclusively, and it is unlock */
288 m_tail.store(to, std::memory_order_release);
289 } else {
290 auto index = slot_index(from);
291 auto &slot = m_links[index];
292
293 /* add link */
294 slot.store(to, std::memory_order_release);
295
296 auto stop_condition = [&](Position prev_pos, Position) {
297 return (prev_pos > from);
298 };
299
300 advance_tail_until(stop_condition);
301 }
302}
303
304template <typename Position>
305template <typename Stop_condition>
306bool Link_buf<Position>::advance_tail_until(Stop_condition stop_condition,
307 uint32_t max_retry) {
308 /* multi threaded aware */
309 auto position = m_tail.load(std::memory_order_acquire);
310 auto from = position;
311
312 uint32_t retry = 0;
313 while (true) {
314 auto index = slot_index(position);
315 auto &slot = m_links[index];
316
317 auto next_load = slot.load(std::memory_order_acquire);
318
319 if (next_load >= position + m_capacity) {
320 /* either we wrapped and tail was advanced meanwhile,
321 or there is link start_lsn -> end_lsn of length >= m_capacity */
322 position = m_tail.load(std::memory_order_acquire);
323 if (position != from) {
324 from = position;
325 continue;
326 }
327 }
328
329 if (next_load <= position || stop_condition(position, next_load)) {
330 /* nothing to advance for now */
331 return false;
332 }
333
334 /* try to lock as storing the end */
335 if (slot.compare_exchange_strong(next_load, position,
336 std::memory_order_acq_rel)) {
337 /* it could happen, that after thread read position = m_tail.load(),
338 it got scheduled out for longer; when it comes back it might still
339 see the link going forward in that slot but m_tail could have been
340 already advanced forward (as we do not reset slots when traversing
341 them); thread needs to re-check if m_tail is still behind the slot. */
342 position = m_tail.load(std::memory_order_acquire);
343 if (position == from) {
344 /* confirmed. can advance m_tail exclusively */
345 position = next_load;
346 break;
347 }
348 }
349
350 retry++;
351 if (retry > max_retry) {
352 /* give up */
353 return false;
354 }
355
356 UT_RELAX_CPU();
357 position = m_tail.load(std::memory_order_acquire);
358 if (position == from) {
359 /* no progress? */
360 return false;
361 }
362 from = position;
363 }
364
365 while (true) {
366 Position next;
367
368 bool stop = next_position(position, next);
369
370 if (stop || stop_condition(position, next)) {
371 break;
372 }
373
374 position = next;
375 }
376
377 ut_a(from == m_tail.load(std::memory_order_acquire));
378
379 /* unlock */
380 m_tail.store(position, std::memory_order_release);
381
382 if (position == from) {
383 return false;
384 }
385
386 return true;
387}
388
389template <typename Position>
391 auto stop_condition = [](Position, Position) { return false; };
392
393 return advance_tail_until(stop_condition);
394}
395
396template <typename Position>
397inline size_t Link_buf<Position>::capacity() const {
398 return m_capacity;
399}
400
401template <typename Position>
402inline Position Link_buf<Position>::tail() const {
403 return m_tail.load(std::memory_order_acquire);
404}
405
406template <typename Position>
407inline bool Link_buf<Position>::has_space(Position position) {
408 auto tail = m_tail.load(std::memory_order_acquire);
409 if (tail + m_capacity > position) {
410 return true;
411 }
412
413 auto stop_condition = [](Position, Position) { return false; };
414 advance_tail_until(stop_condition, 0);
415
416 tail = m_tail.load(std::memory_order_acquire);
417 return tail + m_capacity > position;
418}
419
420template <typename Position>
421inline size_t Link_buf<Position>::slot_index(Position position) const {
422 return position & (m_capacity - 1);
423}
424
425template <typename Position>
427 const auto tail = m_tail.load();
428
429 /* After m_capacity iterations we would have all slots tested. */
430
431 end = std::min(end, begin + m_capacity);
432
433 for (; begin < end; ++begin) {
434 const size_t index = slot_index(begin);
435
436 const auto &slot = m_links[index];
437
438 ut_a(slot.load() <= tail);
439 }
440}
441
442template <typename Position>
444 validate_no_links(0, m_capacity);
445}
446
447#endif /* ut0link_buf_h */
#define free(A)
Definition: lexyy.cc:915
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:308
const char * begin(const char *const c)
Definition: base64.h:44
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
constexpr size_t INNODB_CACHE_LINE_SIZE
CPU cache line size.
Definition: ut0cpu_cache.h:41
void delete_arr(T *ptr) noexcept
Releases storage which has been dynamically allocated through any of the ut::new_arr*() variants.
Definition: ut0new.h:1111
Light-weight and type-safe wrapper which serves a purpose of being able to select proper ut::new_arr*...
Definition: ut0new.h:983
Counter utility class.
Debug utilities for Innobase.
#define ut_ad(EXPR)
Debug assertion.
Definition: ut0dbg.h:105
#define ut_a(EXPR)
Abort execution if EXPR does not evaluate to nonzero.
Definition: ut0dbg.h:93
Dynamic memory allocation routines and custom allocators specifically crafted to support memory instr...
#define UT_NEW_THIS_FILE_PSI_KEY
Definition: ut0new.h:566
Various utilities.
#define UT_RELAX_CPU()
Definition: ut0ut.h:93
static task_env * retry
Definition: xcom_base.cc:436