MySQL 8.4.0
Source Code Documentation
gcs_xcom_input_queue.h
Go to the documentation of this file.
1/* Copyright (c) 2018, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef GCS_XCOM_INPUT_QUEUE_INCLUDED
25#define GCS_XCOM_INPUT_QUEUE_INCLUDED
26
27#include <future>
28#include <memory>
32#include "plugin/group_replication/libmysqlgcs/xdr_gen/xcom_vp.h"
33
35 constexpr xcom_input_request_ptr_deleter() noexcept = default;
36 void operator()(xcom_input_request_ptr ptr) const {
37 if (ptr != nullptr) {
38 /* By calling the reply function, the reply object is deleted. Either
39 because there is no one waiting (reply function deletes the object), or
40 someone is waiting (or will wait) for the reply's future, in which case
41 the object is deleted when the future's result is deleted. */
42 ::xcom_input_request_reply(ptr, nullptr);
44 }
45 }
46};
47
48static void do_not_reply(void *reply, pax_msg *payload);
49static void reply_by_resolving_future(void *reply, pax_msg *payload);
50
51/**
52 * MPSC queue with FIFO semantics used to send commands to XCom.
53 */
54/* The Queue template is only overridden by tests. */
55template <typename Queue = Gcs_mpsc_queue<xcom_input_request,
58 public:
59 class Reply;
60 using future_reply = std::future<std::unique_ptr<Reply>>;
61 /**
62 * Wraps XCom's reply to a queued request.
63 *
64 * A request sent to XCom contains three important pieces of data:
65 *
66 * 1. The payload, i.e. what we want XCom to do
67 * 2. A function instructing XCom how to reply to the request
68 * 3. A Reply object
69 * (For more details please see xcom_input_request.)
70 *
71 * When GCS pushes a request to XCom, GCS receives back a future pointer to
72 * the Reply object associated with the request. (future_reply)
73 * Basically, this future's result is equivalent to the reply GCS would get if
74 * it used a TCP socket to send the request to XCom.
75 * XCom will resolve this future when it receives and processes the request.
76 * When GCS retrieves the future's result, GCS obtains a std::unique_ptr to
77 * the Reply. Unless you do any shenanigans with this std::unique_ptr, when it
78 * goes out of scope the Reply object is destroyed.
79 * The Reply object owns the payload of XCom's reply (pax_msg), so the Reply
80 * object destroys the pax_msg when it is destroyed.
81 */
82 class Reply {
83 public:
84 Reply() noexcept : m_payload(nullptr), m_promise() {}
86 /**
87 * Associates the payload of XCom's reply to this object.
88 * Resolves the future.
89 *
90 * @param payload XCom's reply
91 */
92 void resolve(pax_msg *payload) {
93 m_payload = payload;
94 m_promise.set_value(std::unique_ptr<Reply>(this));
95 }
96 /**
97 * Get the future that will hold a pointer to this object after it has been
98 * resolved.
99 *
100 * @returns a future pointer to this object when it already contains XCom's
101 * reply
102 */
103 future_reply get_future() { return m_promise.get_future(); }
104 /**
105 * Get XCom's reply.
106 *
107 * @returns XCom's reply
108 */
109 pax_msg const *get_payload() { return m_payload; }
110
111 private:
112 /** XCom's reply. */
114 /** Simultaneously holds the future's shared state and takes care of the
115 * lifetime of *this. */
116 std::promise<std::unique_ptr<Reply>> m_promise;
117 };
118 // NOLINTNEXTLINE(modernize-use-equals-default)
120 // NOLINTNEXTLINE(modernize-use-equals-default)
125 delete;
127 /**
128 * Sends the command @c msg to XCom.
129 *
130 * This method has fire-and-forget semantics, i.e. we do not wait or have
131 * access to any potential reply from XCom.
132 * Takes ownership of @c msg.
133 *
134 * @param msg the app_data_ptr to send to XCom
135 * @retval false if there is no memory available
136 * @retval true otherwise (operation was successful)
137 */
138 bool push(app_data_ptr msg) {
139 Reply *reply = push_internal(msg, do_not_reply);
140 /* reply is destroyed by XCom. */
141 bool const successful = (reply != nullptr);
142 return successful;
143 }
144 /**
145 * Sends the command @c msg to XCom.
146 *
147 * This method has request-response semantics, i.e. we get back a future on
148 * which we must wait for XCom's reply. Please note that you must retrieve the
149 * future's value, otherwise it will leak. If you do not care for a reply, use
150 * @c push instead.
151 * Takes ownership of @c msg.
152 *
153 * @param msg the app_data_ptr to send to XCom
154 * @returns a valid future (future.valid()) if successful, an invalid future
155 * (!future.valid()) otherwise.
156 */
158 future_reply future;
160 /* reply is destroyed by XCom. */
161 bool const successful = (reply != nullptr);
162 if (successful) {
163 future = reply->get_future();
164 }
165 return future;
166 }
167 /**
168 * Attempts to retrieve all the queued app_data_ptr in one swoop.
169 *
170 * Transfers ownership of the returned pointer(s).
171 * Note that this method is non-blocking.
172 *
173 * @retval app_data_ptr linked list of the queued commands if the queue is
174 * not empty
175 * @retval nullptr if the queue is empty
176 */
178 xcom_input_request *payload = m_queue.pop();
179 if (payload == nullptr) return nullptr;
180 /* Process first. */
181 xcom_input_request_ptr first_in = payload; // Take ownership.
182 xcom_input_request_ptr last_in = first_in;
183 /* Process remaining. */
184 payload = m_queue.pop();
185 while (payload != nullptr) {
186 ::xcom_input_request_set_next(last_in, payload); // Take ownership.
187 last_in = payload;
188 payload = m_queue.pop();
189 }
190 return first_in;
191 }
192 /**
193 * Empties the queue.
194 */
195 void reset() {
196 xcom_input_request_ptr cursor = pop();
197 while (cursor != nullptr) {
198 xcom_input_request_ptr next_request =
201 cursor = next_request;
202 }
203 }
204
205 private:
206 /**
207 * Internal helper to implement @c push and @ push_and_get_reply.
208 * Creates and pushes a request to XCom with the payload @c msg and using the
209 * reply strategy @c reply_function.
210 *
211 * @param msg the request's payload
212 * @param reply_function the reply strategy for this request
213 * @retval Reply* if successful
214 * @retval nullptr if unsuccessful
215 */
216 Reply *push_internal(app_data_ptr msg,
217 xcom_input_reply_function_ptr reply_function) {
218 xcom_input_request_ptr xcom_request = nullptr;
219 bool successful = false;
220 auto *xcom_reply = new (std::nothrow) Reply();
221 if (xcom_reply == nullptr) {
222 /* purecov: begin inspected */
223 // Because the app_data_ptr is allocated on the heap and not on the stack.
224 xdr_free((xdrproc_t)xdr_app_data_ptr, (char *)&msg);
225 goto end;
226 /* purecov: end */
227 }
228 // Takes ownership of msg if successful.
229 xcom_request = ::xcom_input_request_new(msg, reply_function, xcom_reply);
230 if (xcom_request == nullptr) {
231 /* purecov: begin inspected */
232 // Because the app_data_ptr is allocated on the heap and not on the stack.
233 xdr_free((xdrproc_t)xdr_app_data_ptr, (char *)&msg);
234 delete xcom_reply;
235 xcom_reply = nullptr;
236 goto end;
237 /* purecov: end */
238 }
239 /* If the push is successful, the queue takes ownership of xcom_request. */
240 successful = m_queue.push(xcom_request);
241 if (!successful) {
242 delete xcom_reply;
243 xcom_reply = nullptr;
244 ::xcom_input_request_free(xcom_request); // also destroys msg
245 }
246 end:
247 return xcom_reply;
248 }
249 Queue m_queue; // Wrapped implementation.
250};
252
253static inline void do_not_reply(void *reply, pax_msg *payload) {
254 auto *xcom_reply = static_cast<Gcs_xcom_input_queue::Reply *>(reply);
255 delete xcom_reply;
256 replace_pax_msg(&payload, nullptr);
257}
258static inline void reply_by_resolving_future(void *reply, pax_msg *payload) {
259 auto *xcom_reply = static_cast<Gcs_xcom_input_queue::Reply *>(reply);
260 xcom_reply->resolve(payload); // Takes ownership of payload.
261 /* xcom_reply will be deleted when the associated future has been waited for,
262 its value retrieved, and its value is deleted. */
263}
264
265#endif /* GCS_XCOM_INPUT_QUEUE_INCLUDED */
MPSC queue with FIFO semantics.
Definition: gcs_mpsc_queue.h:40
Wraps XCom's reply to a queued request.
Definition: gcs_xcom_input_queue.h:82
pax_msg const * get_payload()
Get XCom's reply.
Definition: gcs_xcom_input_queue.h:109
std::promise< std::unique_ptr< Reply > > m_promise
Simultaneously holds the future's shared state and takes care of the lifetime of *this.
Definition: gcs_xcom_input_queue.h:116
~Reply()
Definition: gcs_xcom_input_queue.h:85
pax_msg * m_payload
XCom's reply.
Definition: gcs_xcom_input_queue.h:113
void resolve(pax_msg *payload)
Associates the payload of XCom's reply to this object.
Definition: gcs_xcom_input_queue.h:92
future_reply get_future()
Get the future that will hold a pointer to this object after it has been resolved.
Definition: gcs_xcom_input_queue.h:103
Reply() noexcept
Definition: gcs_xcom_input_queue.h:84
MPSC queue with FIFO semantics used to send commands to XCom.
Definition: gcs_xcom_input_queue.h:57
~Gcs_xcom_input_queue_impl()
Definition: gcs_xcom_input_queue.h:121
Gcs_xcom_input_queue_impl(Gcs_xcom_input_queue_impl &&)=delete
Gcs_xcom_input_queue_impl(Gcs_xcom_input_queue_impl const &)=delete
void reset()
Empties the queue.
Definition: gcs_xcom_input_queue.h:195
bool push(app_data_ptr msg)
Sends the command msg to XCom.
Definition: gcs_xcom_input_queue.h:138
xcom_input_request_ptr pop()
Attempts to retrieve all the queued app_data_ptr in one swoop.
Definition: gcs_xcom_input_queue.h:177
std::future< std::unique_ptr< Reply > > future_reply
Definition: gcs_xcom_input_queue.h:60
Gcs_xcom_input_queue_impl() noexcept
Definition: gcs_xcom_input_queue.h:119
future_reply push_and_get_reply(app_data_ptr msg)
Sends the command msg to XCom.
Definition: gcs_xcom_input_queue.h:157
Reply * push_internal(app_data_ptr msg, xcom_input_reply_function_ptr reply_function)
Internal helper to implement push and push_and_get_reply.
Definition: gcs_xcom_input_queue.h:216
Gcs_xcom_input_queue_impl & operator=(Gcs_xcom_input_queue_impl const &)=delete
Queue m_queue
Definition: gcs_xcom_input_queue.h:249
Gcs_xcom_input_queue_impl & operator=(Gcs_xcom_input_queue_impl &&)=delete
Implements a persistent FIFO using server List method names.
Definition: sql_profile.h:76
T * pop()
Definition: sql_profile.h:122
static void reply_by_resolving_future(void *reply, pax_msg *payload)
Definition: gcs_xcom_input_queue.h:258
static void do_not_reply(void *reply, pax_msg *payload)
Definition: gcs_xcom_input_queue.h:253
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
#define replace_pax_msg(target, p)
Definition: pax_msg.h:63
struct pax_msg pax_msg
Definition: site_struct.h:37
Definition: gcs_xcom_input_queue.h:34
constexpr xcom_input_request_ptr_deleter() noexcept=default
Definition: xcom_input_request.cc:31
struct xcom_input_request xcom_input_request
void xcom_input_request_set_next(xcom_input_request_ptr request, xcom_input_request_ptr next)
Links request to the list of requests next.
Definition: xcom_input_request.cc:63
xcom_input_request_ptr xcom_input_request_extract_next(xcom_input_request_ptr request)
Unlinks request from its list.
Definition: xcom_input_request.cc:68
void xcom_input_request_reply(xcom_input_request_ptr request, pax_msg *payload)
Replies to the request using the strategy chosen by the request's origin.
Definition: xcom_input_request.cc:82
xcom_input_request_ptr xcom_input_request_new(app_data_ptr a, xcom_input_reply_function_ptr reply_function, void *reply_arg)
Creates a new XCom request.
Definition: xcom_input_request.cc:38
void xcom_input_request_free(xcom_input_request_ptr request)
Frees the given request and its payload.
Definition: xcom_input_request.cc:51
void(* xcom_input_reply_function_ptr)(void *reply_arg, pax_msg *payload)
The function type that XCom will use to reply to a request.
Definition: xcom_input_request.h:38
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:143
void xdr_free(xdrproc_t __proc, char *__objp) __THROW
Definition: xdr.c:63