MySQL 8.0.39
Source Code Documentation
mpmc_queue.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2018, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_MPMC_UNBOUND_QUEUE_INCLUDED
27#define MYSQL_HARNESS_MPMC_UNBOUND_QUEUE_INCLUDED
28
#include <atomic>
#include <memory>
#include <mutex>
#include <utility>
31
33
34namespace mysql_harness {
35
36/**
37 * an unbounded multi-producer multi-consumer queue.
38 *
39 * multiple threads can read and write at the same time into the queue
40 *
41 * - enqueue and dequeue do not block each other
42 *
43 * see:
44 * - Michael & Scott: two-lock concurrent queue
45 * - "An Unbounded Total Queue" in "The Art of MultiProcessor Programming"
46 */
47
48/*
49 * The implementation works wherever mutexes are available. If atomics and
50 * CAS/LL-SC are available, faster implementations could be used:
51 *
52 * - https://github.com/cameron314/concurrentqueue
53 *
54 * Michael & Scott's "lock-free" algorithm mentioned in the 1996 paper requires
55 * GC and hazard pointers, but there is also:
56 *
57 * - https://idea.popcount.org/2012-09-11-concurrent-queue-in-c/
58 */
59template <typename T>
61 public:
62 using value_type = T;
63
65
67 T item;
68
69 // free all items in the queue
70 while (dequeue(item)) {
71 }
72
73 // free the head node
74 delete head_;
75 }
76
77 /**
78 * enqueue an element.
79 *
80 * @param item item to enqueue
81 *
82 * @returns if item was enqueued
83 * @retval true item got assigned to queue
84 */
85 bool enqueue(const T &item) {
86 Node *node = new Node;
87 node->data = item;
88
89 {
90 std::unique_lock<std::mutex> lk(tail_mutex_);
91
92 tail_->next = node;
93 tail_ = node;
94 }
95
96 return true;
97 }
98
99 /**
100 * enqueue an element.
101 *
102 * @param item item to enqueue
103 *
104 * @returns if item was enqueued
105 * @retval true item got assigned to queue
106 */
107 bool enqueue(T &&item) {
108 Node *node = new Node;
109 node->data = std::move(item);
110
111 {
112 std::unique_lock<std::mutex> lk(tail_mutex_);
113
114 tail_->next = node;
115 tail_ = node;
116 }
117
118 return true;
119 }
120
121 /**
122 * try to dequeue element.
123 *
124 * @param item location of dequeued item if dequeue() was successful
125 *
126 * @returns if item was written
127 * @retval true first item removed from the queue and assigned to item
128 * @retval false queue is empty
129 */
130 bool dequeue(T &item) {
131 Node *node = nullptr;
132 {
133 std::unique_lock<std::mutex> lk(head_mutex_);
134
135 node = head_;
136 Node *new_head = node->next;
137
138 if (new_head == nullptr) {
139 return false;
140 }
141
142 item = std::move(new_head->data);
143
144 head_ = new_head;
145 }
146
147 delete node;
148
149 return true;
150 }
151
152 private:
153 struct Node {
155 std::atomic<Node *> next{nullptr};
156 };
157
158 std::mutex head_mutex_;
159 std::mutex tail_mutex_;
162};
163
164// allow to switch implementation to a lock-free implementation later
165template <typename T>
167
168template <typename T>
170
171} // namespace mysql_harness
172
173#endif
a unbounded multi-producer multi-consumer queue.
Definition: mpmc_queue.h:60
bool enqueue(T &&item)
enqueue an element.
Definition: mpmc_queue.h:107
std::mutex head_mutex_
Definition: mpmc_queue.h:158
bool dequeue(T &item)
try to dequeue element.
Definition: mpmc_queue.h:130
Node * tail_
Definition: mpmc_queue.h:161
~MPMCQueueMS2Lock()
Definition: mpmc_queue.h:66
Node * head_
Definition: mpmc_queue.h:160
bool enqueue(const T &item)
enqueue an element.
Definition: mpmc_queue.h:85
MPMCQueueMS2Lock()
Definition: mpmc_queue.h:64
std::mutex tail_mutex_
Definition: mpmc_queue.h:159
T value_type
Definition: mpmc_queue.h:62
provide waiting pop and push operator to thread-safe queues.
Definition: waiting_queue_adaptor.h:39
Definition: common.h:42
Definition: mpmc_queue.h:153
std::atomic< Node * > next
Definition: mpmc_queue.h:155
T data
Definition: mpmc_queue.h:154