MySQL 8.1.0
Source Code Documentation
mpsc_queue.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2018, 2023, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23*/
24
25#ifndef MYSQL_HARNESS_MPSC_UNBOUND_QUEUE_INCLUDED
26#define MYSQL_HARNESS_MPSC_UNBOUND_QUEUE_INCLUDED
27
28#include <atomic>
29
31
32namespace mysql_harness {
33
34/**
35 * a unbounded multi-producer single-consumer queue.
36 *
37 * multiple threads can write at the same time into the queue, only one can read
38 *
39 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
40 */
41template <typename T>
43 public:
44 using value_type = T;
45
47 : head_{new Node}, tail_{head_.load(std::memory_order_relaxed)} {}
48
50 T item;
51
52 // free all items in the queue
53 while (dequeue(item)) {
54 }
55
56 // free the head node
57 Node *front = head_.load(std::memory_order_relaxed);
58 delete front;
59 }
60
61 MPSCQueueDV(const MPSCQueueDV &) = delete;
62 MPSCQueueDV &operator=(const MPSCQueueDV &) = delete;
63 MPSCQueueDV(MPSCQueueDV &&) = default;
65
66 /**
67 * enqueue an element.
68 *
69 * @param item item to enqueue
70 *
71 * @returns if item was enqueued
72 * @retval true item got assigned to queue
73 */
74 bool enqueue(const T &item) {
75 Node *node = new Node;
76 node->data = item;
77 node->next.store(nullptr, std::memory_order_relaxed);
78
79 Node *prev_head = head_.exchange(node, std::memory_order_acq_rel);
80 prev_head->next.store(node, std::memory_order_release);
81
82 return true;
83 }
84
85 /**
86 * enqueue an element.
87 *
88 * @param item item to enqueue
89 *
90 * @returns if item was enqueued
91 * @retval true item got assigned to queue
92 */
93 bool enqueue(T &&item) {
94 Node *node = new Node;
95 node->data = std::move(item);
96 node->next.store(nullptr, std::memory_order_relaxed);
97
98 Node *prev_head = head_.exchange(node, std::memory_order_acq_rel);
99 prev_head->next.store(node, std::memory_order_release);
100
101 return true;
102 }
103
104 /**
105 * try to dequeue element.
106 *
107 * @param item location of dequeued item if dequeue() was successful
108 *
109 * @returns if item was written
110 * @retval true first item removed from the queue and assigned to item
111 * @retval false queue is empty
112 */
113 bool dequeue(T &item) {
114 Node *tail = tail_.load(std::memory_order_relaxed);
115 Node *next = tail->next.load(std::memory_order_acquire);
116
117 if (next == nullptr) {
118 return false;
119 }
120
121 item = std::move(next->data);
122
123 tail_.store(next, std::memory_order_relaxed);
124
125 delete tail;
126
127 return true;
128 }
129
130 private:
131 struct Node {
133 std::atomic<Node *> next{nullptr};
134 };
135
136 std::atomic<Node *> head_;
137 std::atomic<Node *> tail_;
138};
139
140// allow to switch implementation later
141template <typename T>
143
144template <typename T>
146
147} // namespace mysql_harness
148
149#endif
a unbounded multi-producer single-consumer queue.
Definition: mpsc_queue.h:42
~MPSCQueueDV()
Definition: mpsc_queue.h:49
MPSCQueueDV()
Definition: mpsc_queue.h:46
MPSCQueueDV(MPSCQueueDV &&)=default
MPSCQueueDV(const MPSCQueueDV &)=delete
bool enqueue(const T &item)
enqueue an element.
Definition: mpsc_queue.h:74
MPSCQueueDV & operator=(MPSCQueueDV &&)=default
T value_type
Definition: mpsc_queue.h:44
bool dequeue(T &item)
try to dequeue element.
Definition: mpsc_queue.h:113
MPSCQueueDV & operator=(const MPSCQueueDV &)=delete
bool enqueue(T &&item)
enqueue an element.
Definition: mpsc_queue.h:93
std::atomic< Node * > tail_
Definition: mpsc_queue.h:137
std::atomic< Node * > head_
Definition: mpsc_queue.h:136
provide waiting pop and push operator to thread-safe queues.
Definition: waiting_queue_adaptor.h:38
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:307
Definition: common.h:41
Definition: varlen_sort.h:183
Definition: mpsc_queue.h:131
T data
Definition: mpsc_queue.h:132
std::atomic< Node * > next
Definition: mpsc_queue.h:133
int front
Definition: xcom_base.cc:4083