MySQL 8.0.39
Source Code Documentation
mpsc_queue.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2018, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_MPSC_UNBOUND_QUEUE_INCLUDED
27#define MYSQL_HARNESS_MPSC_UNBOUND_QUEUE_INCLUDED
28
29#include <atomic>
30
32
33namespace mysql_harness {
34
35/**
36 * a unbounded multi-producer single-consumer queue.
37 *
38 * multiple threads can write at the same time into the queue, only one can read
39 *
40 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
41 */
42template <typename T>
44 public:
45 using value_type = T;
46
48 : head_{new Node}, tail_{head_.load(std::memory_order_relaxed)} {}
49
51 T item;
52
53 // free all items in the queue
54 while (dequeue(item)) {
55 }
56
57 // free the head node
58 Node *front = head_.load(std::memory_order_relaxed);
59 delete front;
60 }
61
62 MPSCQueueDV(const MPSCQueueDV &) = delete;
63 MPSCQueueDV &operator=(const MPSCQueueDV &) = delete;
64 MPSCQueueDV(MPSCQueueDV &&) = default;
66
67 /**
68 * enqueue an element.
69 *
70 * @param item item to enqueue
71 *
72 * @returns if item was enqueued
73 * @retval true item got assigned to queue
74 */
75 bool enqueue(const T &item) {
76 Node *node = new Node;
77 node->data = item;
78 node->next.store(nullptr, std::memory_order_relaxed);
79
80 Node *prev_head = head_.exchange(node, std::memory_order_acq_rel);
81 prev_head->next.store(node, std::memory_order_release);
82
83 return true;
84 }
85
86 /**
87 * enqueue an element.
88 *
89 * @param item item to enqueue
90 *
91 * @returns if item was enqueued
92 * @retval true item got assigned to queue
93 */
94 bool enqueue(T &&item) {
95 Node *node = new Node;
96 node->data = std::move(item);
97 node->next.store(nullptr, std::memory_order_relaxed);
98
99 Node *prev_head = head_.exchange(node, std::memory_order_acq_rel);
100 prev_head->next.store(node, std::memory_order_release);
101
102 return true;
103 }
104
105 /**
106 * try to dequeue element.
107 *
108 * @param item location of dequeued item if dequeue() was successful
109 *
110 * @returns if item was written
111 * @retval true first item removed from the queue and assigned to item
112 * @retval false queue is empty
113 */
114 bool dequeue(T &item) {
115 Node *tail = tail_.load(std::memory_order_relaxed);
116 Node *next = tail->next.load(std::memory_order_acquire);
117
118 if (next == nullptr) {
119 return false;
120 }
121
122 item = std::move(next->data);
123
124 tail_.store(next, std::memory_order_relaxed);
125
126 delete tail;
127
128 return true;
129 }
130
131 private:
132 struct Node {
134 std::atomic<Node *> next{nullptr};
135 };
136
137 std::atomic<Node *> head_;
138 std::atomic<Node *> tail_;
139};
140
141// allow to switch implementation later
142template <typename T>
144
145template <typename T>
147
148} // namespace mysql_harness
149
150#endif
a unbounded multi-producer single-consumer queue.
Definition: mpsc_queue.h:43
~MPSCQueueDV()
Definition: mpsc_queue.h:50
MPSCQueueDV()
Definition: mpsc_queue.h:47
MPSCQueueDV(MPSCQueueDV &&)=default
MPSCQueueDV(const MPSCQueueDV &)=delete
bool enqueue(const T &item)
enqueue an element.
Definition: mpsc_queue.h:75
MPSCQueueDV & operator=(MPSCQueueDV &&)=default
T value_type
Definition: mpsc_queue.h:45
bool dequeue(T &item)
try to dequeue element.
Definition: mpsc_queue.h:114
MPSCQueueDV & operator=(const MPSCQueueDV &)=delete
bool enqueue(T &&item)
enqueue an element.
Definition: mpsc_queue.h:94
std::atomic< Node * > tail_
Definition: mpsc_queue.h:138
std::atomic< Node * > head_
Definition: mpsc_queue.h:137
provide waiting pop and push operator to thread-safe queues.
Definition: waiting_queue_adaptor.h:39
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:308
Definition: common.h:42
Definition: gcs_xcom_synode.h:64
Definition: mpsc_queue.h:132
T data
Definition: mpsc_queue.h:133
std::atomic< Node * > next
Definition: mpsc_queue.h:134
int front
Definition: xcom_base.cc:4057