MySQL  8.0.22
Source Code Documentation
gcs_xcom_communication_protocol_changer.h
1 /* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED
24 #define GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED
25 
26 #include <atomic> // std::atomic
27 #include <condition_variable> // std::condition_variable
28 #include <cstdlib>
29 #include <future> // std::future, std::promise
30 #include <mutex> // std::mutex
31 #include <utility> // std::pair
32 
41 
42 /**
43  Implements the communication protocol change logic.
44 
45  Design
46  =============================================================================
47  The algorithm to change the communication protocol is roughly as follows:
48 
49  1. Start buffering the node's outgoing messages.
50  2. Wait until all the node's outgoing messages have been delivered.
51  3. Modify the node's communication protocol version.
52  4. Stop buffering the node's outgoing messages and send any messages
53  buffered in step (1).
54 
55  Implementing the algorithm requires synchronising user threads, which send
56  messages, with the GCS thread, which performs communication protocol changes.
57 
58  The high level view of the synchronisation protocol between the user and GCS
59  threads is the following:
60 
61  when send-message(m) from user thread:
62  atomically:
63  if protocol_changing:
64  wait until protocol_changing = false
65  nr_msgs_in_transit++
66  ...
67 
68  when change-protocol(v) from GCS thread:
69  atomically:
70  protocol_changing := true
71  wait until nr_msgs_in_transit = 0
72  ...
73 
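 As a minimal sketch of the high level protocol above, assuming a single mutex
 and condition variable stand in for the "atomically" and "wait until" steps,
 one could write the following. The names (Blocking_protocol_gate, begin_send,
 message_delivered, change_protocol) are hypothetical and not part of GCS. The
 sketch only illustrates that every send takes a mutex in this scheme.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical illustration of the high level protocol; not the GCS code.
class Blocking_protocol_gate {
 public:
  // User thread: called before sending a message.
  void begin_send() {
    std::unique_lock<std::mutex> lock(m_mutex);
    // "if protocol_changing: wait until protocol_changing = false"
    m_cv.wait(lock, [this] { return !m_protocol_changing; });
    ++m_nr_msgs_in_transit;
  }

  // Called when one of our own messages has been delivered.
  void message_delivered() {
    std::lock_guard<std::mutex> lock(m_mutex);
    assert(m_nr_msgs_in_transit > 0);
    --m_nr_msgs_in_transit;
    m_cv.notify_all();
  }

  // "protocol_changing := true; wait until nr_msgs_in_transit = 0"
  void change_protocol(int new_version) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_protocol_changing = true;
    m_cv.wait(lock, [this] { return m_nr_msgs_in_transit == 0; });
    m_version = new_version;
    m_protocol_changing = false;
    m_cv.notify_all();  // wake up senders blocked in begin_send()
  }

  int version() const {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_version;
  }

  unsigned long nr_msgs_in_transit() const {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_nr_msgs_in_transit;
  }

 private:
  mutable std::mutex m_mutex;
  std::condition_variable m_cv;
  bool m_protocol_changing = false;
  unsigned long m_nr_msgs_in_transit = 0;
  int m_version = 1;
};
```

 Note that change_protocol blocks its caller in this sketch, so deliveries
 would have to be reported from a different thread.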
74  We expect that communication protocol changes are rare events, especially
75  when compared to sending messages.
76  As such, the actual implementation strives to minimise the overhead on the
77  code path that sends messages.
78 
79  To do this, we use an optimistic synchronisation protocol on the send message
80  side, that works as follows:
81 
82  Algorithm #0, User thread:
83  1. If no protocol change is ongoing, the user thread will optimistically
84  increment the number of messages in transit.
85  2.a If a protocol change did not start meanwhile, we are good to go.
86  2.b If a protocol change started meanwhile:
87  2.b.1. Rollback the increment to the number of messages in transit
88  2.b.2. Wait for the protocol change to finish.
89 
90  There is an additional action that needs to be performed on step (2.b), but
91  we will describe that action when we have the necessary context to understand
92  it.
93 
94  On the protocol change side, it works as follows:
95 
96  Algorithm #1, GCS thread:
97  1. Store that a protocol change is ongoing.
98  2. When the number of messages in transit is zero:
99  2.1. Change the protocol version
100  2.2. Wake up any user threads waiting for the protocol change
101  2.3. Deem the protocol change finished
102 
103  The central part of Algorithm #1 is step (2).
104  The question is: who triggers, and where, step (2)'s condition, i.e. the
105  number of in-transit messages is zero?
106  The obvious answer is the GCS thread itself, when it is
107  processing an incoming message.
108  If that message comes from us, then we decrease the number of in-transit
109  messages, which may set it to zero.
110 
111  However, recall that the user threads employ an optimistic synchronisation
112  protocol that "acts first, and asks for forgiveness later."
113  If the user thread rolls back its increment to the number of in-transit
114  messages, it may be the one to set it to zero---see Algorithm #0, step (2.b).
115  In this situation, it is the user thread that hits the condition required by
116  the GCS thread in Algorithm #1, step (2).
117  In order for the GCS thread to finish the protocol change, the user thread
118  must somehow signal the GCS thread to trigger its step (2).
119  This is the missing action of Algorithm #0, step (2.b).
120 
121  So, the final synchronisation protocol of the user thread's side looks like
122  this:
123 
124  Algorithm #2, User thread:
125  1. If no protocol change is ongoing, the user thread will optimistically
126  increment the number of messages in transit.
127  2.a If a protocol change did not start meanwhile, we are good to go.
128  2.b If a protocol change started meanwhile:
129  2.b.1. Rollback the increment to the number of messages in transit
130  2.b.2. If our rollback set the number of messages in transit to zero,
131  signal the GCS thread
132  2.b.3. Wait for the protocol change to finish.
133 
134  Implementation
135  =============================================================================
136  The implementation attempts to add as little overhead as possible to the
137  common case, which is that no protocol change is ongoing.
138  This is the fast path of Algorithm #2, step (2.a).
139  To achieve this goal, it employs a tagged lock.
140  For more details on the tagged lock implementation, see @c Gcs_tagged_lock.
141 
142  In a nutshell, the tagged lock is a read-write spin lock which offers the
143  following API:
144 
145  try_lock() -> bool
146  unlock()
147  optimistic_read() -> tag
148  validate_optimistic_read(tag) -> bool
149 
150  For the write-side section, one uses it as a typical spin lock, e.g.:
151 
152  do:
153  lock_acquired := try_lock()
154  while (not lock_acquired)
155  write-side section
156  unlock()
157 
158  For the read-side section, one can use it as follows:
159 
160  done := false
161  while (not done):
162  tag := optimistic_read()
163  unsynchronised read-side section
164  done := validate_optimistic_read(tag)
165  if (not done):
166  rollback unsynchronised read-side section
167 
168  The idea is to allow an optimistic read-side section that does not perform
169  any memory stores.
170  This is in contrast with a typical read-write lock, where the read side
171  performs some memory stores to account for the reader, e.g. keeping a reader
172  counter.
173  The trade-off is that:
174 
175  a. the execution of the read-side of a tagged lock may be concurrent with
176  the write-side section if meanwhile the tagged lock is acquired
177  b. the read-side of a tagged lock may fail if meanwhile the tagged lock is
178  acquired, in which case one may want to roll back the effects of the
179  failed read-side section
180 
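 The API and trade-offs above can be illustrated with a minimal sketch. It
 assumes a single 64-bit word whose lowest bit means "locked" and whose value
 changes on every lock and unlock. This is a hypothetical stand-in called
 Tagged_lock_sketch; the actual Gcs_tagged_lock may be implemented differently.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical sketch of a tagged lock; not the real Gcs_tagged_lock.
class Tagged_lock_sketch {
 public:
  using Tag = std::uint64_t;

  // Write side: succeeds only if the lock is currently free (even word).
  bool try_lock() {
    Tag current = m_word.load();
    if (is_locked(current)) return false;
    return m_word.compare_exchange_strong(current, current + 1);
  }

  // Write side: the word becomes even again, with a value never seen before.
  void unlock() { m_word.fetch_add(1); }

  // Read side: a plain load, no memory store to account for the reader.
  Tag optimistic_read() const { return m_word.load(); }

  // Read side: valid only if the lock was free when the tag was read and
  // nobody has acquired the lock since.
  bool validate_optimistic_read(Tag tag) const {
    return !is_locked(tag) && m_word.load() == tag;
  }

 private:
  static bool is_locked(Tag word) { return (word & 1) != 0; }

  std::atomic<Tag> m_word{0};
};
```

 A reader whose validation fails has possibly run concurrently with the
 write-side section, which is why it must undo its own effects before retrying.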
181  The algorithms of the design are implemented as follows:
182 
183  Algorithm #1 implementation, GCS thread:
184  1. Lock the tagged lock
185  2. When the number of messages in transit is zero:
186  2.1. Change the protocol version
187  2.2. Unlock the tagged lock, signal a condition variable to wake up any
188  user threads waiting for the protocol change
189  2.3. Deem the protocol change finished
190 
191  Algorithm #2 implementation, User thread:
192  1. If the tagged lock is unlocked:
193  1.1. Start an optimistic read-side section
194  1.2. Atomically increment the number of messages in transit
195  2.a If the optimistic read-side section validates, we are good to go.
196  2.b If the optimistic read-side section fails validation:
197  2.b.1. Atomically rollback the increment to the number of messages
198  in transit
199  2.b.2. If our rollback set the number of messages in transit to zero,
200  signal the GCS thread
201  2.b.3. Wait on a condition variable for the protocol change to finish.
202 
203  Note that we have concurrent access to the number of messages in transit
204  which needs to be synchronised.
205  This is done by using an std::atomic to implement the number of messages in
206  transit.
207 
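 The two algorithm implementations above can be sketched together as follows.
 This is a simplified illustration under stated assumptions, not the code of
 this class: all names are hypothetical, the tagged lock is reduced to one
 atomic word (odd means locked), and whichever thread hits the zero condition
 finishes the change itself under a mutex instead of signalling the GCS thread.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch of Algorithms #1 and #2; not the GCS implementation.
class Protocol_changer_sketch {
 public:
  explicit Protocol_changer_sketch(int version) : m_version(version) {}

  // Algorithm #2, user thread: account for one packet about to be sent.
  void increment_in_transit() {
    for (;;) {
      std::uint64_t const tag = m_lock_word.load();  // 1.1 optimistic read
      if (!is_locked(tag)) {                         // 1. lock is free
        m_in_transit.fetch_add(1);                   // 1.2 increment
        if (m_lock_word.load() == tag) return;       // 2.a validated
        if (m_in_transit.fetch_sub(1) == 1) {        // 2.b.1 roll back
          try_finish_change();                       // 2.b.2 we hit zero
        }
      }
      wait_for_change_to_finish();                   // 2.b.3
    }
  }

  // One of our own packets was delivered.
  void decrement_in_transit() {
    assert(m_in_transit.load() > 0);
    if (m_in_transit.fetch_sub(1) == 1) try_finish_change();
  }

  // Algorithm #1, GCS thread. Assumes no change is already ongoing.
  void begin_change(int new_version) {
    {
      std::lock_guard<std::mutex> guard(m_mutex);
      assert(!is_locked(m_lock_word.load()));
      m_tentative_version = new_version;
      m_lock_word.fetch_add(1);  // 1. lock: the word becomes odd
    }
    try_finish_change();  // 2. finishes now if nothing is in transit
  }

  bool is_change_ongoing() const { return is_locked(m_lock_word.load()); }
  int version() const { return m_version.load(); }
  unsigned long in_transit() const { return m_in_transit.load(); }

 private:
  static bool is_locked(std::uint64_t word) { return (word & 1) != 0; }

  // Commits only while the lock is held and the counter is zero.
  void try_finish_change() {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (!is_locked(m_lock_word.load()) || m_in_transit.load() != 0) return;
    m_version.store(m_tentative_version);  // 2.1 change the version
    m_lock_word.fetch_add(1);              // 2.2 unlock: word becomes even
    m_change_finished.notify_all();        //     and wake up waiting senders
  }

  void wait_for_change_to_finish() {
    std::unique_lock<std::mutex> guard(m_mutex);
    m_change_finished.wait(guard,
                           [this] { return !is_locked(m_lock_word.load()); });
  }

  std::atomic<std::uint64_t> m_lock_word{0};
  std::atomic<unsigned long> m_in_transit{0};
  std::atomic<int> m_version;
  int m_tentative_version = 0;
  std::mutex m_mutex;
  std::condition_variable m_change_finished;
};
```

 Rechecking "locked and zero packets in transit" under the mutex is the
 sketch's substitute for the tag that identifies the protocol change.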
208  Some final implementation pointers:
209 
210  a. Algorithm #1: see the code path that starts on @c set_protocol_version
211  and @c finish_protocol_version_change.
212  b. Algorithm #2: see the code paths that start on
213  @c atomically_increment_nr_packets_in_transit,
214  @c adjust_nr_packets_in_transit, and @c decrement_nr_packets_in_transit.
215  */
216 class Gcs_xcom_communication_protocol_changer {
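217  public:
218  Gcs_xcom_communication_protocol_changer(
219  Gcs_xcom_engine &gcs_engine, Gcs_message_pipeline &pipeline);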
220 
221  Gcs_xcom_communication_protocol_changer(
222  Gcs_xcom_communication_protocol_changer const &) = delete;
225 
226  Gcs_xcom_communication_protocol_changer &operator=(
227  Gcs_xcom_communication_protocol_changer const &) = delete;
230 
231  /**
232  Retrieves the current protocol version in use.
233  @returns the current protocol version in use
234  */
235  Gcs_protocol_version get_protocol_version() const;
236 
237  /**
238  Starts a protocol change.
239  The protocol change is asynchronous; the caller can wait for the change to
240  finish using the returned future.
241 
242  Note that for safety this method *must only* be called by the GCS engine
243  thread.
244 
245  @param new_version The desired protocol version to change to
246  @retval {true, future} If successful
247  @retval {false, _} If the group does not support the requested protocol
248  */
249  std::pair<bool, std::future<void>> set_protocol_version(
250  Gcs_protocol_version new_version);
251 
252  /**
253  Retrieves the greatest protocol version currently supported by the group.
254  @returns the greatest protocol version currently supported by the group
255  */
256  Gcs_protocol_version get_maximum_supported_protocol_version() const;
257 
258  /**
259  Sets the greatest protocol version currently supported by the group.
260  @param version The greatest protocol version supported by the group
261  */
262  void set_maximum_supported_protocol_version(Gcs_protocol_version version);
263 
264  /**
265  Checks whether a protocol change is ongoing.
266  @returns true if a protocol change is ongoing, false otherwise
267  */
268  bool is_protocol_change_ongoing() const;
269 
270  /**
271  Synchronises user threads, which send messages, with the GCS thread,
272  which performs protocol changes.
273 
274  This method should be called by user threads when sending a message, before
275  the message goes through the pipeline.
276 
277  @param cargo The type of message that will be sent
278  */
279  void atomically_increment_nr_packets_in_transit(Cargo_type const &cargo);
280 
281  /**
282  After an outgoing message goes through the pipeline, it may produce more
283  than one packet. This method adjusts the increment done by
284  atomically_increment_nr_packets_in_transit to take into account the
285  additional packets produced by the pipeline.
286 
287  @param cargo The type of message that will be sent
288  @param nr_additional_packets_to_send The number of additional packets that
289  will actually be sent
290  */
291  void adjust_nr_packets_in_transit(
292  Cargo_type const &cargo,
293  std::size_t const &nr_additional_packets_to_send);
294 
295  /**
296  Decrement the number of my in-transit packets.
297 
298  @param packet The incoming packet
299  @param xcom_nodes The XCom membership at the time of delivery
300  */
301  void decrement_nr_packets_in_transit(Gcs_packet const &packet,
302  Gcs_xcom_nodes const &xcom_nodes);
303 
304  /**
305  Due to the synchronisation protocol used between user threads, which send
306  messages, and the GCS thread, which performs protocol changes, a user
307  thread may be the one to hit the condition that triggers the protocol
308  change to finish.
309 
310  This function should be called by the user thread when it hits the
311  condition, to signal the GCS thread that the protocol change should finish.
312 
313  @param caller_tag Identifier of the protocol change
314  */
315  void finish_protocol_version_change(Gcs_tagged_lock::Tag const caller_tag);
316 
317  private:
318  /*
319  Returns how many packets of mine are in-transit.
320  */
321  unsigned long get_nr_packets_in_transit() const;
322 
323  /*
324  Begins a protocol change, and finishes it if the conditions are met, i.e.
325  we have no packets in-transit.
326  */
327  void begin_protocol_version_change(Gcs_protocol_version new_version);
328 
329  /*
330  Finishes the ongoing protocol change.
331 
332  This method must only be called when is_protocol_change_ongoing(), i.e. after
333  a call to begin_protocol_version_change(_).
334  */
335  void commit_protocol_version_change();
336 
337  /*
338  Releases the tagged lock and notifies threads waiting for the protocol change
339  to finish.
340  */
341  void release_tagged_lock_and_notify_waiters();
342 
343  /*
344  Auxiliary method to the implementation of
345  atomically_increment_nr_packets_in_transit.
346 
347  Optimistically assumes a protocol change will not start meanwhile, and
348  increments the number of packets in transit.
349  */
350  std::pair<bool, Gcs_tagged_lock::Tag>
351  optimistically_increment_nr_packets_in_transit();
352 
353  /*
354  Auxiliary method to the implementation of
355  atomically_increment_nr_packets_in_transit.
356 
357  Rolls back the effects of optimistically_increment_nr_packets_in_transit and
358  signals the GCS thread to finish the protocol change if necessary.
359  */
360  void rollback_increment_nr_packets_in_transit(
361  Gcs_tagged_lock::Tag const &tag);
362 
363  /*
364  Auxiliary method to the implementation of
365  atomically_increment_nr_packets_in_transit.
366 
367  Waits until the ongoing protocol change finishes.
368  */
369  void wait_for_protocol_change_to_finish();
370 
371  /**
372  Tagged lock used for the optimistic synchronisation protocol between user
373  threads, which send messages, and the GCS thread, which performs protocol
374  changes.
375  */
376  Gcs_tagged_lock m_tagged_lock;
377 
378  /**
379  For user threads to wait for an ongoing protocol change to finish.
380  */
381  std::mutex m_mutex;
382  std::condition_variable m_protocol_change_finished;
383 
384  /**
385  Stores the outcome of the protocol change operation.
386  */
387  std::promise<void> m_promise;
388 
389  /**
390  The protocol version we are going to change to when we start a protocol
391  change.
392  */
393  Gcs_protocol_version m_tentative_new_protocol;
394 
395  std::atomic<Gcs_protocol_version> m_max_supported_protocol;
396 
397  std::atomic<unsigned long> m_nr_packets_in_transit;
398 
399  Gcs_xcom_engine &m_gcs_engine;
400 
401  Gcs_message_pipeline &m_msg_pipeline;
402 };
403 
404 #endif /* GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED */
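
The `{bool, future}` return value documented for `set_protocol_version` can be illustrated with a small sketch of the promise/future handshake. Everything here is hypothetical: `Change_outcome_sketch`, `start_change`, `finish_change` and `is_ready` are illustrative names, protocol versions are plain integers, and "supported by the group" is assumed to mean "not greater than a stored maximum".

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <utility>

// Hypothetical stand-in for the changer; only the promise/future handshake
// is modelled here.
class Change_outcome_sketch {
 public:
  explicit Change_outcome_sketch(int max_supported)
      : m_max_supported(max_supported) {}

  // Mirrors the {bool, future} shape of set_protocol_version.
  std::pair<bool, std::future<void>> start_change(int new_version) {
    if (new_version > m_max_supported) {
      // {false, _}: the future carries no shared state.
      return std::make_pair(false, std::future<void>());
    }
    m_promise = std::promise<void>();  // fresh promise for this change
    return std::make_pair(true, m_promise.get_future());
  }

  // Called once the change has been committed.
  void finish_change() { m_promise.set_value(); }

 private:
  int m_max_supported;
  std::promise<void> m_promise;
};

// Helper: true if the future has already been fulfilled.
inline bool is_ready(std::future<void> const &f) {
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

A caller would check `first` before touching `second`, then call `second.get()` (or `wait()`) to block until the change has finished.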
void decrement_nr_packets_in_transit(Gcs_packet const &packet, Gcs_xcom_nodes const &xcom_nodes)
Decrement the number of my in-transit packets.
Definition: gcs_xcom_communication_protocol_changer.cc:296
void begin_protocol_version_change(Gcs_protocol_version new_version)
Definition: gcs_xcom_communication_protocol_changer.cc:78
Gcs_xcom_engine & m_gcs_engine
Definition: gcs_xcom_communication_protocol_changer.h:399
void commit_protocol_version_change()
Definition: gcs_xcom_communication_protocol_changer.cc:101
std::mutex m_mutex
For user threads to wait for an ongoing protocol change to finish.
Definition: gcs_xcom_communication_protocol_changer.h:381
Gcs_protocol_version
The GCS protocol versions.
Definition: gcs_types.h:127
Gcs_protocol_version m_tentative_new_protocol
The protocol version we are going to change to when we start a protocol change.
Definition: gcs_xcom_communication_protocol_changer.h:393
Gcs_xcom_communication_protocol_changer
Implements the communication protocol change logic.
Definition: gcs_xcom_communication_protocol_changer.h:216
Cargo_type
The different cargo type codes.
Definition: gcs_internal_message_headers.h:104
Gcs_xcom_communication_protocol_changer(Gcs_xcom_engine &gcs_engine, Gcs_message_pipeline &pipeline)
Definition: gcs_xcom_communication_protocol_changer.cc:30
Gcs_tagged_lock m_tagged_lock
Tagged lock used for the optimistic synchronisation protocol between user threads, which send messages, and the GCS thread, which performs protocol changes.
Definition: gcs_xcom_communication_protocol_changer.h:376
void rollback_increment_nr_packets_in_transit(Gcs_tagged_lock::Tag const &tag)
Definition: gcs_xcom_communication_protocol_changer.cc:194
void set_maximum_supported_protocol_version(Gcs_protocol_version version)
Sets the greatest protocol version currently supported by the group.
Definition: gcs_xcom_communication_protocol_changer.cc:157
Gcs_protocol_version get_maximum_supported_protocol_version() const
Retrieves the greatest protocol version currently supported by the group.
Definition: gcs_xcom_communication_protocol_changer.cc:152
Gcs_xcom_engine
Definition: gcs_xcom_notification.h:93
void adjust_nr_packets_in_transit(Cargo_type const &cargo, std::size_t const &nr_additional_packets_to_send)
After an outgoing message goes through the pipeline, it may produce more than one packet...
Definition: gcs_xcom_communication_protocol_changer.cc:275
std::condition_variable m_protocol_change_finished
Definition: gcs_xcom_communication_protocol_changer.h:382
std::atomic< Gcs_protocol_version > m_max_supported_protocol
Definition: gcs_xcom_communication_protocol_changer.h:395
void release_tagged_lock_and_notify_waiters()
Definition: gcs_xcom_communication_protocol_changer.cc:119
std::promise< void > m_promise
Stores the outcome of the protocol change operation.
Definition: gcs_xcom_communication_protocol_changer.h:387
Gcs_xcom_nodes
This class contains information on the configuration, i.e set of nodes or simply site definition...
Definition: gcs_xcom_group_member_information.h:390
The Gcs_tagged_lock class Implements a tagged lock for optimistic read-side sections.
Definition: gcs_tagged_lock.h:83
std::uint64_t Tag
Definition: gcs_tagged_lock.h:85
void atomically_increment_nr_packets_in_transit(Cargo_type const &cargo)
Synchronises user threads, which send messages, with the GCS thread, which performs protocol changes...
Definition: gcs_xcom_communication_protocol_changer.cc:239
Gcs_packet
This class is an abstraction for the packet concept.
Definition: gcs_internal_message.h:57
std::pair< bool, Gcs_tagged_lock::Tag > optimistically_increment_nr_packets_in_transit()
Definition: gcs_xcom_communication_protocol_changer.cc:177
void finish_protocol_version_change(Gcs_tagged_lock::Tag const caller_tag)
Due to the synchronisation protocol used between user threads, which send messages, and the GCS thread, which performs protocol changes, a user thread may be the one to hit the condition that triggers the protocol change to finish.
Definition: gcs_xcom_communication_protocol_changer.cc:127
Gcs_message_pipeline
This is the pipeline that an outgoing or incoming message has to go through when being sent to or rec...
Definition: gcs_message_stages.h:364
void wait_for_protocol_change_to_finish()
Definition: gcs_xcom_communication_protocol_changer.cc:228
std::pair< bool, std::future< void > > set_protocol_version(Gcs_protocol_version new_version)
Starts a protocol change.
Definition: gcs_xcom_communication_protocol_changer.cc:48
std::atomic< unsigned long > m_nr_packets_in_transit
Definition: gcs_xcom_communication_protocol_changer.h:397
Gcs_protocol_version get_protocol_version() const
Retrieves the current protocol version in use.
Definition: gcs_xcom_communication_protocol_changer.cc:43
Gcs_message_pipeline & m_msg_pipeline
Definition: gcs_xcom_communication_protocol_changer.h:401
bool is_protocol_change_ongoing() const
Checks whether a protocol change is ongoing.
Definition: gcs_xcom_communication_protocol_changer.cc:146
Gcs_xcom_communication_protocol_changer & operator=(Gcs_xcom_communication_protocol_changer const &)=delete
unsigned long get_nr_packets_in_transit() const
Definition: gcs_xcom_communication_protocol_changer.cc:166