MySQL 8.1.0
Source Code Documentation
gcs_xcom_communication_protocol_changer.h
/* Copyright (c) 2018, 2023, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#ifndef GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED
#define GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED

#include <atomic>              // std::atomic
#include <condition_variable>  // std::condition_variable
#include <cstdlib>
#include <future>   // std::future, std::promise
#include <mutex>    // std::mutex
#include <utility>  // std::pair

/**
  Implements the communication protocol change logic.

  Design
  =============================================================================
  The algorithm to change the communication protocol is roughly as follows:

      1. Start buffering the node's outgoing messages.
      2. Wait until all the node's outgoing messages have been delivered.
      3. Modify the node's communication protocol version.
      4. Stop buffering the node's outgoing messages and send any messages
         buffered in step (1).

  Implementing the algorithm requires synchronising user threads, which send
  messages, with the GCS thread, which performs communication protocol changes.

  The high level view of the synchronisation protocol between the user and GCS
  threads is the following:

      when send-message(m) from user thread:
        atomically:
          if protocol_changing:
            wait until protocol_changing = false
          nr_msgs_in_transit++
        ...

      when change-protocol(v) from GCS thread:
        atomically:
          protocol_changing := true
        wait until nr_msgs_in_transit = 0
        ...
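
  The high-level view above can be sketched pessimistically with one mutex and
  one condition variable. This is only an illustration under stated
  assumptions: Naive_protocol_changer and all of its members are hypothetical
  names that do not exist in GCS, the protocol version is modelled as an int,
  and change_protocol blocks its caller, so deliveries would have to be
  reported from another thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical illustration only; none of these names exist in GCS.
class Naive_protocol_changer {
 public:
  // User thread: call before sending a message.
  void before_send() {
    std::unique_lock<std::mutex> lock(m_mutex);
    // "if protocol_changing: wait until protocol_changing = false"
    m_cond.wait(lock, [this] { return !m_protocol_changing; });
    ++m_nr_msgs_in_transit;
  }

  // Call when one of our own messages has been delivered.
  void on_own_message_delivered() {
    std::lock_guard<std::mutex> lock(m_mutex);
    assert(m_nr_msgs_in_transit > 0);
    if (--m_nr_msgs_in_transit == 0) m_cond.notify_all();
  }

  // Protocol changer: blocks until nothing is in transit, then switches.
  void change_protocol(int new_version) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_protocol_changing = true;
    m_cond.wait(lock, [this] { return m_nr_msgs_in_transit == 0; });
    m_version = new_version;
    m_protocol_changing = false;
    m_cond.notify_all();  // release senders blocked in before_send()
  }

  int version() const {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_version;
  }

  unsigned long nr_msgs_in_transit() const {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_nr_msgs_in_transit;
  }

 private:
  mutable std::mutex m_mutex;
  std::condition_variable m_cond;
  bool m_protocol_changing{false};
  unsigned long m_nr_msgs_in_transit{0};
  int m_version{1};
};
```

  In this sketch every send acquires the mutex, even when no protocol change
  is ongoing. That per-send cost is what an optimistic scheme tries to avoid.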

  We expect that communication protocol changes are rare events, especially
  when compared to sending messages.
  As such, the actual implementation strives to minimise the overhead on the
  code path that sends messages.

  To do this, we use an optimistic synchronisation protocol on the send-message
  side, which works as follows:

  Algorithm #0, User thread:
      1. If no protocol change is ongoing, the user thread will optimistically
         increment the number of messages in transit.
      2.a If a protocol change did not start meanwhile, we are good to go.
      2.b If a protocol change started meanwhile:
          2.b.1. Roll back the increment to the number of messages in transit
          2.b.2. Wait for the protocol change to finish.

  There is an additional action that needs to be performed on step (2.b), but
  we will describe that action when we have the necessary context to understand
  it.

  On the protocol change side, it works as follows:

  Algorithm #1, GCS thread:
      1. Store that a protocol change is ongoing.
      2. When the number of messages in transit is zero:
         2.1. Change the protocol version
         2.2. Wake up any user threads waiting for the protocol change
         2.3. Deem the protocol change finished

  The central part of Algorithm #1 is step (2).
  The question is: who triggers, and where, step (2)'s condition, i.e. the
  number of in-transit messages is zero?
  The obvious answer is the GCS thread itself, when it is processing an
  incoming message.
  If that message comes from us, then we decrease the number of in-transit
  messages, which may set it to zero.

  However, recall that the user threads employ an optimistic synchronisation
  protocol that "acts first, and asks for forgiveness later."
  If the user thread rolls back its increment to the number of in-transit
  messages, it may be the one to set it to zero (see Algorithm #0, step (2.b)).
  In this situation, it is the user thread that hits the condition required by
  the GCS thread in Algorithm #1, step (2).
  In order for the GCS thread to finish the protocol change, the user thread
  must somehow signal the GCS thread to trigger its step (2).
  This is the missing action of Algorithm #0, step (2.b).

  So, the final synchronisation protocol of the user thread's side looks like
  this:

  Algorithm #2, User thread:
      1. If no protocol change is ongoing, the user thread will optimistically
         increment the number of messages in transit.
      2.a If a protocol change did not start meanwhile, we are good to go.
      2.b If a protocol change started meanwhile:
          2.b.1. Roll back the increment to the number of messages in transit
          2.b.2. If our rollback set the number of messages in transit to zero,
                 signal the GCS thread
          2.b.3. Wait for the protocol change to finish.

  Implementation
  =============================================================================
  The implementation attempts to add as little overhead as possible to the
  common case, which is that no protocol change is ongoing.
  This is the fast path of Algorithm #2, step (2.a).
  To achieve this goal, it employs a tagged lock.
  For more details on the tagged lock implementation, see @c Gcs_tagged_lock.

  In a nutshell, the tagged lock is a read-write spin lock which offers the
  following API:

      try_lock() -> bool
      unlock()
      optimistic_read() -> tag
      validate_optimistic_read(tag) -> bool

  For the write-side section, one uses it as a typical spin lock, e.g.:

      do:
        lock_acquired := try_lock()
      while (not lock_acquired)
      write-side section
      unlock()

  For the read-side section, one can use it as follows:

      done := false
      while (not done):
        tag := optimistic_read()
        unsynchronised read-side section
        done := validate_optimistic_read(tag)
        if (not done):
          roll back unsynchronised read-side section
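
  To make the four-call API above concrete, here is a minimal sketch of a
  tagged lock built on a single atomic word. It is an assumption-laden
  illustration, not a copy of @c Gcs_tagged_lock: the class name
  Sketch_tagged_lock, the "lowest bit means locked" encoding, and the chosen
  memory orders are all hypothetical.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical sketch; see Gcs_tagged_lock for the real implementation.
class Sketch_tagged_lock {
 public:
  using Tag = std::uint64_t;

  // Write side: succeeds only if the word is even (unlocked); makes it odd.
  bool try_lock() {
    Tag word = m_word.load(std::memory_order_acquire);
    if (word & 1) return false;  // already locked
    return m_word.compare_exchange_strong(word, word + 1,
                                          std::memory_order_acq_rel);
  }

  // Write side: makes the word even again, with a value never seen before.
  void unlock() { m_word.fetch_add(1, std::memory_order_acq_rel); }

  // Read side: performs no store. Clearing the lowest bit yields a tag that
  // cannot validate while the lock is held.
  Tag optimistic_read() const {
    return m_word.load(std::memory_order_acquire) & ~Tag{1};
  }

  // Read side: true only if the lock was not acquired since the tag was read.
  bool validate_optimistic_read(Tag tag) const {
    return m_word.load(std::memory_order_acquire) == tag;
  }

 private:
  std::atomic<Tag> m_word{0};
};
```

  With this encoding, a tag taken before a lock/unlock cycle never validates
  afterwards, because each cycle advances the word by two.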

  The idea is to allow an optimistic read-side section that does not perform
  any memory stores.
  This is in contrast with a typical read-write lock, where the read side
  performs some memory stores to account for the reader, e.g. keeping a reader
  counter.
  The trade-off is that:

    a. the execution of the read-side of a tagged lock may be concurrent with
       the write-side section if meanwhile the tagged lock is acquired
    b. the read-side of a tagged lock may fail if meanwhile the tagged lock is
       acquired, in which case one may want to roll back the effects of the
       failed read-side section

  The algorithms of the design are implemented as follows:

  Algorithm #1 implementation, GCS thread:
      1. Lock the tagged lock
      2. When the number of messages in transit is zero:
         2.1. Change the protocol version
         2.2. Unlock the tagged lock, signal a condition variable to wake up
              any user threads waiting for the protocol change
         2.3. Deem the protocol change finished

  Algorithm #2 implementation, User thread:
      1. If the tagged lock is unlocked:
         1.1. Start an optimistic read-side section
         1.2. Atomically increment the number of messages in transit
      2.a If the optimistic read-side section validates, we are good to go.
      2.b If the optimistic read-side section fails validation:
          2.b.1. Atomically roll back the increment to the number of messages
                 in transit
          2.b.2. If our rollback set the number of messages in transit to zero,
                 signal the GCS thread
          2.b.3. Wait on a condition variable for the protocol change to finish.
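
  The two implementation outlines above can be sketched together as follows.
  This is a simplified illustration, not the code in this class: Sketch_changer
  and its members are hypothetical names, the tagged lock is inlined as a
  single atomic word whose lowest bit means "locked", the protocol version is
  an int, and signal_gcs_thread is a caller-supplied placeholder for the real
  notification mechanism. The sketch also omits the tag that the real signal
  carries to identify the protocol change.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch; names are illustrative and not part of GCS.
class Sketch_changer {
 public:
  // Algorithm #1, step 1: an odd lock word means "change ongoing".
  void begin_change() { m_lock_word.fetch_add(1); }

  // Algorithm #1, step 2: finish only if no packets are in transit.
  bool try_finish_change(int new_version) {
    if (m_nr_in_transit.load() != 0) return false;
    m_version.store(new_version);  // 2.1
    {
      std::lock_guard<std::mutex> guard(m_mutex);
      m_lock_word.fetch_add(1);  // 2.2: unlock, the word is even again
    }
    m_finished.notify_all();  // 2.2: wake up waiting user threads
    return true;
  }

  // One of our own packets was delivered back to us.
  void on_own_packet_delivered() { m_nr_in_transit.fetch_sub(1); }

  // Algorithm #2. signal_gcs_thread is a placeholder for whatever asks the
  // GCS thread to retry finishing the change.
  template <typename Signal>
  void before_send(Signal signal_gcs_thread) {
    for (;;) {
      // 1.1: the tag is the lock word with the "locked" bit cleared.
      std::uint64_t const tag = m_lock_word.load() & ~std::uint64_t{1};
      m_nr_in_transit.fetch_add(1);           // 1.2
      if (m_lock_word.load() == tag) return;  // 2.a
      // 2.b.1 and 2.b.2: fetch_sub returns the value before the decrement.
      if (m_nr_in_transit.fetch_sub(1) == 1) signal_gcs_thread();
      // 2.b.3: wait until the lock word is even, then retry.
      std::unique_lock<std::mutex> lock(m_mutex);
      m_finished.wait(lock, [this] { return (m_lock_word.load() & 1) == 0; });
    }
  }

  int version() const { return m_version.load(); }
  unsigned long nr_in_transit() const { return m_nr_in_transit.load(); }

 private:
  std::atomic<std::uint64_t> m_lock_word{0};
  std::atomic<unsigned long> m_nr_in_transit{0};
  std::atomic<int> m_version{1};
  std::mutex m_mutex;
  std::condition_variable m_finished;
};
```

  The sketch uses the default sequentially consistent atomics so that a user
  thread incrementing the counter and a GCS thread locking the word cannot
  both miss each other's store. The fast path (2.a) touches neither the mutex
  nor the condition variable.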

  Note that we have concurrent access to the number of messages in transit
  which needs to be synchronised.
  This is done by using an std::atomic to implement the number of messages in
  transit.

  Some final implementation pointers:

    a. Algorithm #1: see the code path that starts on @c set_protocol_version
       and @c finish_protocol_version_change.
    b. Algorithm #2: see the code paths that start on
       @c atomically_increment_nr_packets_in_transit,
       @c adjust_nr_packets_in_transit, and @c decrement_nr_packets_in_transit.
 */
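class Gcs_xcom_communication_protocol_changer {
 public:
  Gcs_xcom_communication_protocol_changer(Gcs_xcom_engine &gcs_engine,
                                          Gcs_message_pipeline &pipeline);

  Gcs_xcom_communication_protocol_changer(
      Gcs_xcom_communication_protocol_changer const &) = delete;
  Gcs_xcom_communication_protocol_changer(
      Gcs_xcom_communication_protocol_changer &&) = delete;

  Gcs_xcom_communication_protocol_changer &operator=(
      Gcs_xcom_communication_protocol_changer const &) = delete;
  Gcs_xcom_communication_protocol_changer &operator=(
      Gcs_xcom_communication_protocol_changer &&) = delete;

  /**
    Retrieves the current protocol version in use.
    @returns the current protocol version in use
   */
  Gcs_protocol_version get_protocol_version() const;

  /**
    Starts a protocol change.
    The protocol change is asynchronous; the caller can wait for the change to
    finish using the returned future.

    Note that for safety this method *must only* be called by the GCS engine
    thread.

    @param new_version The desired protocol version to change to
    @retval {true, future} If successful
    @retval {false, _} If the group does not support the requested protocol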
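
    The {bool, future} return shape can be illustrated with a small
    promise/future sketch. Everything here is hypothetical and only mirrors
    the contract documented above: Sketch_async_changer, start, finish and
    is_ready are invented names, versions are plain ints, and the maximum
    supported version is passed in by the caller.

```cpp
#include <chrono>
#include <future>
#include <utility>

// Hypothetical sketch; not the class declared in this header.
class Sketch_async_changer {
 public:
  // Starts a change. On success, the future becomes ready when finish() runs.
  std::pair<bool, std::future<void>> start(int new_version, int max_supported) {
    if (new_version > max_supported) {
      // Rejected: the returned future has no shared state.
      return std::make_pair(false, std::future<void>());
    }
    m_promise = std::promise<void>();  // fresh promise for this change
    m_tentative = new_version;
    return std::make_pair(true, m_promise.get_future());
  }

  // Commits the change and releases whoever waits on the future.
  void finish() {
    m_version = m_tentative;
    m_promise.set_value();
  }

  int version() const { return m_version; }

 private:
  std::promise<void> m_promise;
  int m_tentative{1};
  int m_version{1};
};

// Non-blocking readiness check, useful when polling instead of waiting.
inline bool is_ready(std::future<void> const &f) {
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

    A caller of such an interface would check the bool first and only then
    wait on, or poll, the future.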
   */
  std::pair<bool, std::future<void>> set_protocol_version(
      Gcs_protocol_version new_version);

  /**
    Retrieves the greatest protocol version currently supported by the group.
    @returns the greatest protocol version currently supported by the group
   */
  Gcs_protocol_version get_maximum_supported_protocol_version() const;

  /**
    Sets the greatest protocol version currently supported by the group.
    @param version The greatest protocol version supported by the group
   */
  void set_maximum_supported_protocol_version(Gcs_protocol_version version);

  /**
    Checks whether a protocol change is ongoing.
    @returns true if a protocol change is ongoing, false otherwise
   */
  bool is_protocol_change_ongoing() const;

  /**
    Synchronises user threads, which send messages, with the GCS thread,
    which performs protocol changes.

    This method should be called by user threads when sending a message, before
    the message goes through the pipeline.

    @param cargo The type of message that will be sent
   */
  void atomically_increment_nr_packets_in_transit(Cargo_type const &cargo);

  /**
    After an outgoing message goes through the pipeline, it may produce more
    than one packet. This method adjusts the increment done by
    atomically_increment_nr_packets_in_transit to take into account the
    additional packets produced by the pipeline.

    @param cargo The type of message that will be sent
    @param nr_additional_packets_to_send The number of additional packets that
    will actually be sent
   */
  void adjust_nr_packets_in_transit(
      Cargo_type const &cargo,
      std::size_t const &nr_additional_packets_to_send);

  /**
    Decrements the number of my in-transit packets.

    @param packet The incoming packet
    @param xcom_nodes The XCom membership at the time of delivery
   */
  void decrement_nr_packets_in_transit(Gcs_packet const &packet,
                                       Gcs_xcom_nodes const &xcom_nodes);

  /**
    Due to the synchronisation protocol used between user threads, which send
    messages, and the GCS thread, which performs protocol changes, a user
    thread may be the one to hit the condition that triggers the protocol
    change to finish.

    This function should be called by the user thread when it hits the
    condition, to signal the GCS thread that the protocol change should finish.

    @param caller_tag Identifier of the protocol change
   */
  void finish_protocol_version_change(Gcs_tagged_lock::Tag const caller_tag);

 private:
  /*
    Returns how many packets of mine are in-transit.
   */
  unsigned long get_nr_packets_in_transit() const;

  /*
    Begins a protocol change, and finishes it if the conditions are met, i.e.
    we have no packets in-transit.
   */
  void begin_protocol_version_change(Gcs_protocol_version new_version);

  /*
    Finishes the ongoing protocol change.

    This method must only be called when is_protocol_change_ongoing(), i.e.
    after a call to begin_protocol_version_change(_).
   */
  void commit_protocol_version_change();

  /*
    Releases the tagged lock and notifies threads waiting for the protocol
    change to finish.
   */
  void release_tagged_lock_and_notify_waiters();

  /*
    Auxiliary method to the implementation of
    atomically_increment_nr_packets_in_transit.

    Optimistically assumes a protocol change will not start meanwhile, and
    increments the number of packets in transit.
   */
  std::pair<bool, Gcs_tagged_lock::Tag>
  optimistically_increment_nr_packets_in_transit();

  /*
    Auxiliary method to the implementation of
    atomically_increment_nr_packets_in_transit.

    Rolls back the effects of optimistically_increment_nr_packets_in_transit and
    signals the GCS thread to finish the protocol change if necessary.
   */
  void rollback_increment_nr_packets_in_transit(
      Gcs_tagged_lock::Tag const &tag);

  /*
    Auxiliary method to the implementation of
    atomically_increment_nr_packets_in_transit.

    Waits until the ongoing protocol change finishes.
   */
  void wait_for_protocol_change_to_finish();

  /**
    Tagged lock used for the optimistic synchronisation protocol between user
    threads, which send messages, and the GCS thread, which performs protocol
    changes.
   */
  Gcs_tagged_lock m_tagged_lock;

  /**
    For user threads to wait for an ongoing protocol change to finish.
   */
  std::mutex m_mutex;
  std::condition_variable m_protocol_change_finished;

  /**
    Stores the outcome of the protocol change operation.
   */
  std::promise<void> m_promise;

  /**
    The protocol version we are going to change to when we start a protocol
    change.
   */
  Gcs_protocol_version m_tentative_new_protocol;

  std::atomic<Gcs_protocol_version> m_max_supported_protocol;

  std::atomic<unsigned long> m_nr_packets_in_transit;

  Gcs_xcom_engine &m_gcs_engine;

  Gcs_message_pipeline &m_msg_pipeline;
};

#endif /* GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED */