MySQL 8.3.0
Source Code Documentation
gcs_mpsc_queue.h
Go to the documentation of this file.
1/* Copyright (c) 2018, 2023, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23#ifndef GCS_MPSC_QUEUE_INCLUDED
24#define GCS_MPSC_QUEUE_INCLUDED
25
26#include <atomic>
27#include <cassert>
28#include <memory>
29
30/**
31 * MPSC queue with FIFO semantics.
32 *
33 * Implemented as a linked list of nodes.
34 * Inspired by Dmitry Vyukov's "non-intrusive MPSC node-based queue" algorithm,
35 * available on 2017-07-10 at
36 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
37 */
38template <typename T, typename Deleter = std::default_delete<T>>
40 private:
41 /**
42 * Node that holds an element (payload) of the MPSC queue.
43 */
45 public:
46 /**
47 * Creates an unlinked and empty node.
48 */
50 /**
51 * Creates an unlinked node with the given payload.
52 * The node takes ownership of @c initial_payload.
53 *
54 * @param initial_payload the payload
55 */
56 Gcs_mpsc_queue_node(T *initial_payload) : m_payload(initial_payload) {
57 m_next.store(nullptr, std::memory_order_relaxed);
58 }
59 /* Not copyable nor movable. */
64 /**
65 * Gets the next node in the linked list.
66 *
67 * @param memory_order the desired memory ordering semantics for the load
68 * @retval Gcs_mpsc_queue_node* if there is a linked node
69 * @retval nullptr otherwise
70 */
71 Gcs_mpsc_queue_node *get_next(std::memory_order memory_order) {
72 return m_next.load(memory_order);
73 }
74 /**
75 * Links a node to this node.
76 *
77 * @param node the node to link
78 * @param memory_order the desired memory ordering semantics for the store
79 */
80 void set_next(Gcs_mpsc_queue_node *node, std::memory_order memory_order) {
81 m_next.store(node, memory_order);
82 }
83 /**
84 * Extracts the payload from this node.
85 * Transfers ownership of the payload to the caller, i.e. after calling this
86 * method this node has no payload.
87 *
88 * @retval T* if there is a payload
89 * @retval nullptr otherwise
90 */
92 T *result = m_payload;
93 m_payload = nullptr;
94 return result;
95 }
96
97 private:
98 /**
99 * The next node in the linked list.
100 */
101 std::atomic<Gcs_mpsc_queue_node *> m_next;
102 /**
103 * The payload.
104 */
106 };
107
108 public:
109 /**
110 * Create an empty queue.
111 */
113 Gcs_mpsc_queue(Deleter custom_deleter)
114 : m_payload_deleter(custom_deleter), m_tail(new Gcs_mpsc_queue_node()) {
115 m_head.store(m_tail, std::memory_order_relaxed);
116 }
117 /**
118 * Destroy the queued nodes.
119 */
121 // delete the list
122 for (T *payload = pop(); payload != nullptr; payload = pop()) {
123 m_payload_deleter(payload);
124 }
125 // delete stub node
126 assert(m_tail == m_head.load(std::memory_order_relaxed));
127 delete m_tail;
128 }
129 /* Not copyable nor movable. */
134 /**
135 * Insert @c payload at the end of the queue.
136 *
137 * @param payload the element to insert
138 * @returns true if the insertion was successful, false otherwise
139 */
140 bool push(T *payload) {
141 bool successful = false;
142 auto *new_node = new (std::nothrow) Gcs_mpsc_queue_node(payload);
143 if (new_node != nullptr) {
144 Gcs_mpsc_queue_node *previous =
145 m_head.exchange(new_node, std::memory_order_acq_rel);
146 previous->set_next(new_node, std::memory_order_release);
147 successful = true;
148 }
149 return successful;
150 }
151 /**
152 * Attempt to retrieve the first element from the queue.
153 *
154 * Note that this is a non-blocking method.
155 *
156 * @retval T* if the queue is not empty
157 * @retval nullptr if the queue is empty
158 */
159 T *pop() {
160 T *result = nullptr;
161 Gcs_mpsc_queue_node *old_tail = m_tail;
163 m_tail->get_next(std::memory_order_acquire);
164 if (next_node != nullptr) {
166 delete old_tail;
168 }
169 return result;
170 }
171
172 private:
175 std::atomic<Gcs_mpsc_queue_node *> m_head; // last in
176};
177
178#endif /* GCS_MPSC_QUEUE_INCLUDED */
Kerberos Client Authentication nullptr
Definition: auth_kerberos_client_plugin.cc:250
Definition: tls_cipher.cc:36
Node that holds an element (payload) of the MPSC queue.
Definition: gcs_mpsc_queue.h:44
T * m_payload
The payload.
Definition: gcs_mpsc_queue.h:105
Gcs_mpsc_queue_node(Gcs_mpsc_queue_node &&)=delete
Gcs_mpsc_queue_node & operator=(Gcs_mpsc_queue_node const &)=delete
void set_next(Gcs_mpsc_queue_node *node, std::memory_order memory_order)
Links a node to this node.
Definition: gcs_mpsc_queue.h:80
Gcs_mpsc_queue_node()
Creates an unlinked and empty node.
Definition: gcs_mpsc_queue.h:49
std::atomic< Gcs_mpsc_queue_node * > m_next
The next node in the linked list.
Definition: gcs_mpsc_queue.h:101
Gcs_mpsc_queue_node(T *initial_payload)
Creates an unlinked node with the given payload.
Definition: gcs_mpsc_queue.h:56
Gcs_mpsc_queue_node * get_next(std::memory_order memory_order)
Gets the next node in the linked list.
Definition: gcs_mpsc_queue.h:71
Gcs_mpsc_queue_node & operator=(Gcs_mpsc_queue_node &&)=delete
Gcs_mpsc_queue_node(Gcs_mpsc_queue_node const &)=delete
T * extract_payload()
Extracts the payload from this node.
Definition: gcs_mpsc_queue.h:91
MPSC queue with FIFO semantics.
Definition: gcs_mpsc_queue.h:39
Gcs_mpsc_queue_node * m_tail
Definition: gcs_mpsc_queue.h:174
Gcs_mpsc_queue & operator=(Gcs_mpsc_queue &&)=delete
std::atomic< Gcs_mpsc_queue_node * > m_head
Definition: gcs_mpsc_queue.h:175
Gcs_mpsc_queue(Deleter custom_deleter)
Definition: gcs_mpsc_queue.h:113
Gcs_mpsc_queue(Gcs_mpsc_queue &&)=delete
Deleter m_payload_deleter
Definition: gcs_mpsc_queue.h:173
T * pop()
Attempt to retrieve the first element from the queue.
Definition: gcs_mpsc_queue.h:159
Gcs_mpsc_queue & operator=(Gcs_mpsc_queue const &)=delete
~Gcs_mpsc_queue()
Destroy the queued nodes.
Definition: gcs_mpsc_queue.h:120
Gcs_mpsc_queue()
Create an empty queue.
Definition: gcs_mpsc_queue.h:112
bool push(T *payload)
Insert payload at the end of the queue.
Definition: gcs_mpsc_queue.h:140
Gcs_mpsc_queue(Gcs_mpsc_queue const &)=delete
static std::atomic< uchar * > & next_node(LF_PINBOX *P, uchar *X)
Definition: lf_alloc-pin.cc:349
struct result result
Definition: result.h:33
Definition: result.h:29