MySQL 8.0.32
Source Code Documentation
gcs_xcom_input_queue.h
Go to the documentation of this file.
1/* Copyright (c) 2018, 2022, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23#ifndef GCS_XCOM_INPUT_QUEUE_INCLUDED
24#define GCS_XCOM_INPUT_QUEUE_INCLUDED
25
26#include <future>
27#include <memory>
31#include "plugin/group_replication/libmysqlgcs/xdr_gen/xcom_vp.h"
32
34 constexpr xcom_input_request_ptr_deleter() noexcept = default;
35 void operator()(xcom_input_request_ptr ptr) const {
36 if (ptr != nullptr) {
37 /* By calling the reply function, the reply object is deleted. Either
38 because there is no one waiting (reply function deletes the object), or
39 someone is waiting (or will wait) for the reply's future, in which case
40 the object is deleted when the future's result is deleted. */
41 ::xcom_input_request_reply(ptr, nullptr);
43 }
44 }
45};
46
/* Forward declarations of the two reply callbacks. They are declared here so
 * that the queue class below can refer to them (push() passes do_not_reply to
 * push_internal()); both are defined as static inline functions near the end
 * of this header. */
47static void do_not_reply(void *reply, pax_msg *payload);
48static void reply_by_resolving_future(void *reply, pax_msg *payload);
49
50/**
51 * MPSC queue with FIFO semantics used to send commands to XCom.
52 */
53/* The Queue template is only overridden by tests. */
54template <typename Queue = Gcs_mpsc_queue<xcom_input_request,
57 public:
58 class Reply;
59 using future_reply = std::future<std::unique_ptr<Reply>>;
60 /**
61 * Wraps XCom's reply to a queued request.
62 *
63 * A request sent to XCom contains three important pieces of data:
64 *
65 * 1. The payload, i.e. what we want XCom to do
66 * 2. A function instructing XCom how to reply to the request
67 * 3. A Reply object
68 * (For more details please see xcom_input_request.)
69 *
70 * When GCS pushes a request to XCom, GCS receives back a future pointer to
71 * the Reply object associated with the request. (future_reply)
72 * Basically, this future's result is equivalent to the reply GCS would get if
73 * it used a TCP socket to send the request to XCom.
74 * XCom will resolve this future when it receives and processes the request.
75 * When GCS retrieves the future's result, GCS obtains a std::unique_ptr to
76 * the Reply. Unless you do any shenanigans with this std::unique_ptr, when it
77 * goes out of scope the Reply object is destroyed.
78 * The Reply object owns the payload of XCom's reply (pax_msg), so the Reply
79 * object destroys the pax_msg when it is destroyed.
80 */
81 class Reply {
82 public:
83 Reply() noexcept : m_payload(nullptr), m_promise() {}
85 /**
86 * Associates the payload of XCom's reply to this object.
87 * Resolves the future.
88 *
89 * @param payload XCom's reply
90 */
91 void resolve(pax_msg *payload) {
92 m_payload = payload;
93 m_promise.set_value(std::unique_ptr<Reply>(this));
94 }
95 /**
96 * Get the future that will hold a pointer to this object after it has been
97 * resolved.
98 *
99 * @returns a future pointer to this object when it already contains XCom's
100 * reply
101 */
102 future_reply get_future() { return m_promise.get_future(); }
103 /**
104 * Get XCom's reply.
105 *
106 * @returns XCom's reply
107 */
108 pax_msg const *get_payload() { return m_payload; }
109
110 private:
111 /** XCom's reply. */
113 /** Simultaneously holds the future's shared state and takes care of the
114 * lifetime of *this. */
115 std::promise<std::unique_ptr<Reply>> m_promise;
116 };
117 // NOLINTNEXTLINE(modernize-use-equals-default)
119 // NOLINTNEXTLINE(modernize-use-equals-default)
124 delete;
126 /**
127 * Sends the command @c msg to XCom.
128 *
129 * This method has fire-and-forget semantics, i.e. we do not wait or have
130 * access to any potential reply from XCom.
131 * Takes ownership of @c msg.
132 *
133 * @param msg the app_data_ptr to send to XCom
134 * @retval false if there is no memory available
135 * @retval true otherwise (operation was successful)
136 */
137 bool push(app_data_ptr msg) {
138 Reply *reply = push_internal(msg, do_not_reply);
139 /* reply is destroyed by XCom. */
140 bool const successful = (reply != nullptr);
141 return successful;
142 }
143 /**
144 * Sends the command @c msg to XCom.
145 *
146 * This method has request-response semantics, i.e. we get back a future on
147 * which we must wait for XCom's reply. Please note that you must retrieve the
148 * future's value, otherwise it will leak. If you do not care for a reply, use
149 * @c push instead.
150 * Takes ownership of @c msg.
151 *
152 * @param msg the app_data_ptr to send to XCom
153 * @returns a valid future (future.valid()) if successful, an invalid future
154 * (!future.valid()) otherwise.
155 */
157 future_reply future;
159 /* reply is destroyed by XCom. */
160 bool const successful = (reply != nullptr);
161 if (successful) {
162 future = reply->get_future();
163 }
164 return future;
165 }
166 /**
167 * Attempts to retrieve all the queued app_data_ptr in one swoop.
168 *
169 * Transfers ownership of the returned pointer(s).
170 * Note that this method is non-blocking.
171 *
172 * @retval app_data_ptr linked list of the queued commands if the queue is
173 * not empty
174 * @retval nullptr if the queue is empty
175 */
177 xcom_input_request *payload = m_queue.pop();
178 if (payload == nullptr) return nullptr;
179 /* Process first. */
180 xcom_input_request_ptr first_in = payload; // Take ownership.
181 xcom_input_request_ptr last_in = first_in;
182 /* Process remaining. */
183 payload = m_queue.pop();
184 while (payload != nullptr) {
185 ::xcom_input_request_set_next(last_in, payload); // Take ownership.
186 last_in = payload;
187 payload = m_queue.pop();
188 }
189 return first_in;
190 }
191 /**
192 * Empties the queue.
193 */
194 void reset() {
195 xcom_input_request_ptr cursor = pop();
196 while (cursor != nullptr) {
197 xcom_input_request_ptr next_request =
200 cursor = next_request;
201 }
202 }
203
204 private:
205 /**
206 * Internal helper to implement @c push and @c push_and_get_reply.
207 * Creates and pushes a request to XCom with the payload @c msg and using the
208 * reply strategy @c reply_function.
209 *
210 * @param msg the request's payload
211 * @param reply_function the reply strategy for this request
212 * @retval Reply* if successful
213 * @retval nullptr if unsuccessful
214 */
215 Reply *push_internal(app_data_ptr msg,
216 xcom_input_reply_function_ptr reply_function) {
217 xcom_input_request_ptr xcom_request = nullptr;
218 bool successful = false;
219 auto *xcom_reply = new (std::nothrow) Reply();
220 if (xcom_reply == nullptr) {
221 /* purecov: begin inspected */
222 // Because the app_data_ptr is allocated on the heap and not on the stack.
223 xdr_free((xdrproc_t)xdr_app_data_ptr, (char *)&msg);
224 goto end;
225 /* purecov: end */
226 }
227 // Takes ownership of msg if successful.
228 xcom_request = ::xcom_input_request_new(msg, reply_function, xcom_reply);
229 if (xcom_request == nullptr) {
230 /* purecov: begin inspected */
231 // Because the app_data_ptr is allocated on the heap and not on the stack.
232 xdr_free((xdrproc_t)xdr_app_data_ptr, (char *)&msg);
233 delete xcom_reply;
234 xcom_reply = nullptr;
235 goto end;
236 /* purecov: end */
237 }
238 /* If the push is successful, the queue takes ownership of xcom_request. */
239 successful = m_queue.push(xcom_request);
240 if (!successful) {
241 delete xcom_reply;
242 xcom_reply = nullptr;
243 ::xcom_input_request_free(xcom_request); // also destroys msg
244 }
245 end:
246 return xcom_reply;
247 }
248 Queue m_queue; // Wrapped implementation.
249};
251
252static inline void do_not_reply(void *reply, pax_msg *payload) {
253 auto *xcom_reply = static_cast<Gcs_xcom_input_queue::Reply *>(reply);
254 delete xcom_reply;
255 replace_pax_msg(&payload, nullptr);
256}
257static inline void reply_by_resolving_future(void *reply, pax_msg *payload) {
258 auto *xcom_reply = static_cast<Gcs_xcom_input_queue::Reply *>(reply);
259 xcom_reply->resolve(payload); // Takes ownership of payload.
260 /* xcom_reply will be deleted when the associated future has been waited for,
261 its value retrieved, and its value is deleted. */
262}
263
264#endif /* GCS_XCOM_INPUT_QUEUE_INCLUDED */
MPSC queue with FIFO semantics.
Definition: gcs_mpsc_queue.h:39
Wraps XCom's reply to a queued request.
Definition: gcs_xcom_input_queue.h:81
pax_msg const * get_payload()
Get XCom's reply.
Definition: gcs_xcom_input_queue.h:108
std::promise< std::unique_ptr< Reply > > m_promise
Simultaneously holds the future's shared state and takes care of the lifetime of *this.
Definition: gcs_xcom_input_queue.h:115
~Reply()
Definition: gcs_xcom_input_queue.h:84
pax_msg * m_payload
XCom's reply.
Definition: gcs_xcom_input_queue.h:112
void resolve(pax_msg *payload)
Associates the payload of XCom's reply to this object.
Definition: gcs_xcom_input_queue.h:91
future_reply get_future()
Get the future that will hold a pointer to this object after it has been resolved.
Definition: gcs_xcom_input_queue.h:102
Reply() noexcept
Definition: gcs_xcom_input_queue.h:83
MPSC queue with FIFO semantics used to send commands to XCom.
Definition: gcs_xcom_input_queue.h:56
~Gcs_xcom_input_queue_impl()
Definition: gcs_xcom_input_queue.h:120
Gcs_xcom_input_queue_impl(Gcs_xcom_input_queue_impl &&)=delete
Gcs_xcom_input_queue_impl(Gcs_xcom_input_queue_impl const &)=delete
void reset()
Empties the queue.
Definition: gcs_xcom_input_queue.h:194
bool push(app_data_ptr msg)
Sends the command msg to XCom.
Definition: gcs_xcom_input_queue.h:137
xcom_input_request_ptr pop()
Attempts to retrieve all the queued app_data_ptr in one swoop.
Definition: gcs_xcom_input_queue.h:176
std::future< std::unique_ptr< Reply > > future_reply
Definition: gcs_xcom_input_queue.h:59
Gcs_xcom_input_queue_impl() noexcept
Definition: gcs_xcom_input_queue.h:118
future_reply push_and_get_reply(app_data_ptr msg)
Sends the command msg to XCom.
Definition: gcs_xcom_input_queue.h:156
Reply * push_internal(app_data_ptr msg, xcom_input_reply_function_ptr reply_function)
Internal helper to implement push and push_and_get_reply.
Definition: gcs_xcom_input_queue.h:215
Gcs_xcom_input_queue_impl & operator=(Gcs_xcom_input_queue_impl const &)=delete
Queue m_queue
Definition: gcs_xcom_input_queue.h:248
Gcs_xcom_input_queue_impl & operator=(Gcs_xcom_input_queue_impl &&)=delete
Implements a persistent FIFO using server List method names.
Definition: sql_profile.h:75
T * pop()
Definition: sql_profile.h:121
static void reply_by_resolving_future(void *reply, pax_msg *payload)
Definition: gcs_xcom_input_queue.h:257
static void do_not_reply(void *reply, pax_msg *payload)
Definition: gcs_xcom_input_queue.h:252
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:191
#define replace_pax_msg(target, p)
Definition: pax_msg.h:62
struct pax_msg pax_msg
Definition: site_struct.h:36
Definition: gcs_xcom_input_queue.h:33
constexpr xcom_input_request_ptr_deleter() noexcept=default
Definition: xcom_input_request.cc:30
struct xcom_input_request xcom_input_request
void xcom_input_request_set_next(xcom_input_request_ptr request, xcom_input_request_ptr next)
Links request to the list of requests next.
Definition: xcom_input_request.cc:62
xcom_input_request_ptr xcom_input_request_extract_next(xcom_input_request_ptr request)
Unlinks request from its list.
Definition: xcom_input_request.cc:67
void xcom_input_request_reply(xcom_input_request_ptr request, pax_msg *payload)
Replies to the request using the strategy chosen by the request's origin.
Definition: xcom_input_request.cc:81
xcom_input_request_ptr xcom_input_request_new(app_data_ptr a, xcom_input_reply_function_ptr reply_function, void *reply_arg)
Creates a new XCom request.
Definition: xcom_input_request.cc:37
void xcom_input_request_free(xcom_input_request_ptr request)
Frees the given request and its payload.
Definition: xcom_input_request.cc:50
void(* xcom_input_reply_function_ptr)(void *reply_arg, pax_msg *payload)
The function type that XCom will use to reply to a request.
Definition: xcom_input_request.h:37
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:142
void xdr_free(xdrproc_t __proc, char *__objp) __THROW
Definition: xdr.c:62