MySQL  8.0.22
Source Code Documentation
gcs_xcom_input_queue.h
Go to the documentation of this file.
1 /* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef GCS_XCOM_INPUT_QUEUE_INCLUDED
24 #define GCS_XCOM_INPUT_QUEUE_INCLUDED
25 
26 #include <future>
27 #include <memory>
31 #include "plugin/group_replication/libmysqlgcs/xdr_gen/xcom_vp.h"
32 
34  constexpr xcom_input_request_ptr_deleter() noexcept = default;
36  if (ptr != nullptr) {
37  /* By calling the reply function, the reply object is deleted. Either
38  because there is no one waiting (reply function deletes the object), or
39  someone is waiting (or will wait) for the reply's future, in which case
40  the object is deleted when the future's result is deleted. */
41  ::xcom_input_request_reply(ptr, nullptr);
43  }
44  }
45 };
46 
47 static void do_not_reply(void *reply, pax_msg *payload);
48 static void reply_by_resolving_future(void *reply, pax_msg *payload);
49 
50 /**
51  * MPSC queue with FIFO semantics used to send commands to XCom.
52  */
53 /* The Queue template is only overridden by tests. */
54 template <typename Queue = Gcs_mpsc_queue<xcom_input_request,
57  public:
58  class Reply;
59  using future_reply = std::future<std::unique_ptr<Reply>>;
60  /**
61  * Wraps XCom's reply to a queued request.
62  *
63  * A request sent to XCom contains three important pieces of data:
64  *
65  * 1. The payload, i.e. what we want XCom to do
66  * 2. A function instructing XCom how to reply to the request
67  * 3. A Reply object
68  * (For more details please see xcom_input_request.)
69  *
70  * When GCS pushes a request to XCom, GCS receives back a future pointer to
71  * the Reply object associated with the request. (future_reply)
72  * Basically, this future's result is equivalent to the reply GCS would get if
73  * it used a TCP socket to send the request to XCom.
74  * XCom will resolve this future when it receives and processes the request.
75  * When GCS retrieves the future's result, GCS obtains a std::unique_ptr to
76  * the Reply. Unless you do any shenanigans with this std::unique_ptr, when it
77  * goes out of scope the Reply object is destroyed.
78  * The Reply object owns the payload of XCom's reply (pax_msg), so the Reply
79  * object destroys the pax_msg when it is destroyed.
80  */
81  class Reply {
82  public:
83  Reply() noexcept : m_payload(nullptr), m_promise() {}
84  ~Reply() { replace_pax_msg(&m_payload, nullptr); }
85  /**
86  * Associates the payload of XCom's reply to this object.
87  * Resolves the future.
88  *
89  * @param payload XCom's reply
90  */
91  void resolve(pax_msg *payload) {
92  m_payload = payload;
93  m_promise.set_value(std::unique_ptr<Reply>(this));
94  }
95  /**
96  * Get the future that will hold a pointer to this object after it has been
97  * resolved.
98  *
99  * @returns a future pointer to this object when it already contains XCom's
100  * reply
101  */
102  future_reply get_future() { return m_promise.get_future(); }
103  /**
104  * Get XCom's reply.
105  *
106  * @returns XCom's reply
107  */
108  pax_msg const *get_payload() { return m_payload; }
109 
110  private:
111  /** XCom's reply. */
112  pax_msg *m_payload;
113  /** Simultaneously holds the future's shared state and takes care of the
114  * lifetime of *this. */
115  std::promise<std::unique_ptr<Reply>> m_promise;
116  };
122  delete;
124  /**
125  * Sends the command @c msg to XCom.
126  *
127  * This method has fire-and-forget semantics, i.e. we do not wait or have
128  * access to any potential reply from XCom.
129  * Takes ownership of @c msg.
130  *
131  * @param msg the app_data_ptr to send to XCom
132  * @retval false if there is no memory avaiable
133  * @retval true otherwise (operation was successful)
134  */
135  bool push(app_data_ptr msg) {
136  Reply *reply = push_internal(msg, do_not_reply);
137  /* reply is destroyed by XCom. */
138  bool const successful = (reply != nullptr);
139  return successful;
140  }
141  /**
142  * Sends the command @c msg to XCom.
143  *
144  * This method has request-response semantics, i.e. we get back a future on
145  * which we must wait for XCom's reply. Please note that you must retrieve the
146  * future's value, otherwise it will leak. If you do not care for a reply, use
147  * @c push instead.
148  * Takes ownership of @c msg.
149  *
150  * @param msg the app_data_ptr to send to XCom
151  * @returns a valid future (future.valid()) if successful, an invalid future
152  * (!future.valid()) otherwise.
153  */
155  future_reply future;
156  Reply *reply = push_internal(msg, reply_by_resolving_future);
157  /* reply is destroyed by XCom. */
158  bool const successful = (reply != nullptr);
159  if (successful) {
160  future = reply->get_future();
161  }
162  return future;
163  }
164  /**
165  * Attempts to retrieve all the queued app_data_ptr in one swoop.
166  *
167  * Transfers ownership of the returned pointer(s).
168  * Note that this method is non-blocking.
169  *
170  * @retval app_data_ptr linked list of the queued comamnds if the queue is
171  * not empty
172  * @retval nullptr if the queue is empty
173  */
175  xcom_input_request *payload = m_queue.pop();
176  if (payload == nullptr) return nullptr;
177  /* Process first. */
178  xcom_input_request_ptr first_in = payload; // Take ownership.
179  xcom_input_request_ptr last_in = first_in;
180  /* Process remaining. */
181  payload = m_queue.pop();
182  while (payload != nullptr) {
183  ::xcom_input_request_set_next(last_in, payload); // Take ownership.
184  last_in = payload;
185  payload = m_queue.pop();
186  }
187  return first_in;
188  }
189  /**
190  * Empties the queue.
191  */
192  void reset() {
193  xcom_input_request_ptr cursor = pop();
194  while (cursor != nullptr) {
195  xcom_input_request_ptr next_request =
198  cursor = next_request;
199  }
200  }
201 
202  private:
203  /**
204  * Internal helper to implement @c push and @ push_and_get_reply.
205  * Creates and pushes a request to XCom with the payload @c msg and using the
206  * reply strategy @c reply_function.
207  *
208  * @param msg the request's payload
209  * @param reply_function the reply strategy for this request
210  * @retval Reply* if successful
211  * @retval nullptr if unsuccessful
212  */
213  Reply *push_internal(app_data_ptr msg,
214  xcom_input_reply_function_ptr reply_function) {
215  xcom_input_request_ptr xcom_request = nullptr;
216  bool successful = false;
217  auto *xcom_reply = new (std::nothrow) Reply();
218  if (xcom_reply == nullptr) {
219  /* purecov: begin inspected */
220  // Because the app_data_ptr is allocated on the heap and not on the stack.
221  xdr_free((xdrproc_t)xdr_app_data_ptr, (char *)&msg);
222  goto end;
223  /* purecov: end */
224  }
225  // Takes ownership of msg if successful.
226  xcom_request = ::xcom_input_request_new(msg, reply_function, xcom_reply);
227  if (xcom_request == nullptr) {
228  /* purecov: begin inspected */
229  // Because the app_data_ptr is allocated on the heap and not on the stack.
230  xdr_free((xdrproc_t)xdr_app_data_ptr, (char *)&msg);
231  delete xcom_reply;
232  xcom_reply = nullptr;
233  goto end;
234  /* purecov: end */
235  }
236  /* If the push is successful, the queue takes ownership of xcom_request. */
237  successful = m_queue.push(xcom_request);
238  if (!successful) {
239  delete xcom_reply;
240  xcom_reply = nullptr;
241  ::xcom_input_request_free(xcom_request); // also destroys msg
242  }
243  end:
244  return xcom_reply;
245  }
246  Queue m_queue; // Wrapped implementation.
247 };
249 
250 static inline void do_not_reply(void *reply, pax_msg *payload) {
251  auto *xcom_reply = static_cast<Gcs_xcom_input_queue::Reply *>(reply);
252  delete xcom_reply;
253  replace_pax_msg(&payload, nullptr);
254 }
255 static inline void reply_by_resolving_future(void *reply, pax_msg *payload) {
256  auto *xcom_reply = static_cast<Gcs_xcom_input_queue::Reply *>(reply);
257  xcom_reply->resolve(payload); // Takes ownership of payload.
258  /* xcom_reply will be deleted when the associated future has been waited for,
259  its value retrieved, and its value is deleted. */
260 }
261 
262 #endif /* GCS_XCOM_INPUT_QUEUE_INCLUDED */
pax_msg const * get_payload()
Get XCom&#39;s reply.
Definition: gcs_xcom_input_queue.h:108
MPSC queue with FIFO semantics used to send commands to XCom.
Definition: gcs_xcom_input_queue.h:56
~Reply()
Definition: gcs_xcom_input_queue.h:84
xcom_input_request_ptr xcom_input_request_new(app_data_ptr a, xcom_input_reply_function_ptr reply_function, void *reply_arg)
Creates a new XCom request.
Definition: xcom_input_request.cc:34
void xcom_input_request_set_next(xcom_input_request_ptr request, xcom_input_request_ptr next)
Links request to the list of requests next.
Definition: xcom_input_request.cc:59
xcom_input_request_ptr xcom_input_request_extract_next(xcom_input_request_ptr request)
Unlinks request from its list.
Definition: xcom_input_request.cc:64
void xcom_input_request_reply(xcom_input_request_ptr request, pax_msg *payload)
Replies to the request using the strategy chosen by the request&#39;s origin.
Definition: xcom_input_request.cc:78
Reply * push_internal(app_data_ptr msg, xcom_input_reply_function_ptr reply_function)
Internal helper to implement push and @ push_and_get_reply.
Definition: gcs_xcom_input_queue.h:213
pax_msg * m_payload
XCom&#39;s reply.
Definition: gcs_xcom_input_queue.h:112
Gcs_xcom_input_queue_impl() noexcept
Definition: gcs_xcom_input_queue.h:117
void(* xcom_input_reply_function_ptr)(void *reply_arg, pax_msg *payload)
The function type that XCom will use to reply to a request.
Definition: xcom_input_request.h:37
Implements a persistent FIFO using server List method names.
Definition: sql_profile.h:75
Wraps XCom&#39;s reply to a queued request.
Definition: gcs_xcom_input_queue.h:81
Reply() noexcept
Definition: gcs_xcom_input_queue.h:83
constexpr xcom_input_request_ptr_deleter() noexcept=default
Definition: gcs_xcom_input_queue.h:33
std::future< std::unique_ptr< Reply > > future_reply
Definition: gcs_xcom_input_queue.h:59
future_reply push_and_get_reply(app_data_ptr msg)
Sends the command msg to XCom.
Definition: gcs_xcom_input_queue.h:154
MPSC queue with FIFO semantics.
Definition: gcs_mpsc_queue.h:39
~Gcs_xcom_input_queue_impl()
Definition: gcs_xcom_input_queue.h:118
std::promise< std::unique_ptr< Reply > > m_promise
Simultaneously holds the future&#39;s shared state and takes care of the lifetime of *this.
Definition: gcs_xcom_input_queue.h:115
T * pop()
Attempt to retrieve the first element from the queue.
Definition: gcs_mpsc_queue.h:159
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:188
Definition: xcom_input_request.cc:27
future_reply get_future()
Get the future that will hold a pointer to this object after it has been resolved.
Definition: gcs_xcom_input_queue.h:102
void reset()
Empties the queue.
Definition: gcs_xcom_input_queue.h:192
void operator()(xcom_input_request_ptr ptr) const
Definition: gcs_xcom_input_queue.h:35
char msg[1024]
Definition: test_sql_9_sessions.cc:281
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:142
xcom_input_request_ptr pop()
Attempts to retrieve all the queued app_data_ptr in one swoop.
Definition: gcs_xcom_input_queue.h:174
Queue m_queue
Definition: gcs_xcom_input_queue.h:246
static void reply_by_resolving_future(void *reply, pax_msg *payload)
Definition: gcs_xcom_input_queue.h:255
static void do_not_reply(void *reply, pax_msg *payload)
Definition: gcs_xcom_input_queue.h:250
void xdr_free(xdrproc_t __proc, char *__objp) __THROW
Definition: xdr.c:62
void resolve(pax_msg *payload)
Associates the payload of XCom&#39;s reply to this object.
Definition: gcs_xcom_input_queue.h:91
bool push(app_data_ptr msg)
Sends the command msg to XCom.
Definition: gcs_xcom_input_queue.h:135
#define replace_pax_msg(target, p)
Definition: pax_msg.h:62
void xcom_input_request_free(xcom_input_request_ptr request)
Frees the given request and its payload.
Definition: xcom_input_request.cc:47
struct xcom_input_request xcom_input_request