MySQL 8.2.0
Source Code Documentation
mpmc_queue.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2018, 2023, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23*/
24
25#ifndef MYSQL_HARNESS_MPMC_UNBOUND_QUEUE_INCLUDED
26#define MYSQL_HARNESS_MPMC_UNBOUND_QUEUE_INCLUDED
27
28#include <atomic>
29#include <mutex>
30
32
33namespace mysql_harness {
34
35/**
36 * a unbounded multi-producer multi-consumer queue.
37 *
38 * multiple threads can read and write at the same time into the queue
39 *
40 * - enqueue and dequeue do not block each other
41 *
42 * see:
43 * - Micheal & Scott: two-lock concurrent queue
44 * - "An Unbounded Total Queue" in "The Art of MultiProcessor Programming"
45 */
46
47/*
48 * Implementation works everywhere, where mutex are available. If atomics and
49 * CAS/LL_SC are available faster implementations could be used:
50 *
51 * - https://github.com/cameron314/concurrentqueue
52 *
53 * Micheal & Scott's "lock-free" algorithm mentioned in the 1996 paper requires
54 * GC and hazard pointers, but here is also:
55 *
56 * - https://idea.popcount.org/2012-09-11-concurrent-queue-in-c/
57 */
58template <typename T>
60 public:
61 using value_type = T;
62
64
66 T item;
67
68 // free all items in the queue
69 while (dequeue(item)) {
70 }
71
72 // free the head node
73 delete head_;
74 }
75
76 /**
77 * enqueue an element.
78 *
79 * @param item item to enqueue
80 *
81 * @returns if item was enqueued
82 * @retval true item got assigned to queue
83 */
84 bool enqueue(const T &item) {
85 Node *node = new Node;
86 node->data = item;
87
88 {
89 std::unique_lock<std::mutex> lk(tail_mutex_);
90
91 tail_->next = node;
92 tail_ = node;
93 }
94
95 return true;
96 }
97
98 /**
99 * enqueue an element.
100 *
101 * @param item item to enqueue
102 *
103 * @returns if item was enqueued
104 * @retval true item got assigned to queue
105 */
106 bool enqueue(T &&item) {
107 Node *node = new Node;
108 node->data = std::move(item);
109
110 {
111 std::unique_lock<std::mutex> lk(tail_mutex_);
112
113 tail_->next = node;
114 tail_ = node;
115 }
116
117 return true;
118 }
119
120 /**
121 * try to dequeue element.
122 *
123 * @param item location of dequeued item if dequeue() was successful
124 *
125 * @returns if item was written
126 * @retval true first item removed from the queue and assigned to item
127 * @retval false queue is empty
128 */
129 bool dequeue(T &item) {
130 Node *node = nullptr;
131 {
132 std::unique_lock<std::mutex> lk(head_mutex_);
133
134 node = head_;
135 Node *new_head = node->next;
136
137 if (new_head == nullptr) {
138 return false;
139 }
140
141 item = std::move(new_head->data);
142
143 head_ = new_head;
144 }
145
146 delete node;
147
148 return true;
149 }
150
151 private:
152 struct Node {
154 std::atomic<Node *> next{nullptr};
155 };
156
157 std::mutex head_mutex_;
158 std::mutex tail_mutex_;
161};
162
163// allow to switch implementation to a lock-free implementation later
164template <typename T>
166
167template <typename T>
169
170} // namespace mysql_harness
171
172#endif
a unbounded multi-producer multi-consumer queue.
Definition: mpmc_queue.h:59
bool enqueue(T &&item)
enqueue an element.
Definition: mpmc_queue.h:106
std::mutex head_mutex_
Definition: mpmc_queue.h:157
bool dequeue(T &item)
try to dequeue element.
Definition: mpmc_queue.h:129
Node * tail_
Definition: mpmc_queue.h:160
~MPMCQueueMS2Lock()
Definition: mpmc_queue.h:65
Node * head_
Definition: mpmc_queue.h:159
bool enqueue(const T &item)
enqueue an element.
Definition: mpmc_queue.h:84
MPMCQueueMS2Lock()
Definition: mpmc_queue.h:63
std::mutex tail_mutex_
Definition: mpmc_queue.h:158
T value_type
Definition: mpmc_queue.h:61
provide waiting pop and push operator to thread-safe queues.
Definition: waiting_queue_adaptor.h:38
Definition: common.h:41
Definition: mpmc_queue.h:152
std::atomic< Node * > next
Definition: mpmc_queue.h:154
T data
Definition: mpmc_queue.h:153