MySQL 8.4.2
Source Code Documentation
gcs_xcom_communication_protocol_changer.h
Go to the documentation of this file.
1/* Copyright (c) 2018, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED
25#define GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED
26
27#include <atomic> // std::atomic
28#include <condition_variable> // std::condition_variable
29#include <cstdlib>
30#include <future> // std::future, std::promise
31#include <mutex> // std::mutex
32#include <utility> // std::pair
33
42
43/**
44 Implements the communication protocol change logic.
45
46 Design
47 =============================================================================
48 The algorithm to change the communication protocol is roughly as follows:
49
50 1. Start buffering the node's outgoing messages.
51 2. Wait until all the node's outgoing messages have been delivered.
52 3. Modify the node's communication protocol version.
53 4. Stop buffering the node's outgoing messages and send any messages
54 buffered in step (1).
55
56 Implementing the algorithm requires synchronising user threads, which send
57 messages, with the GCS thread, which performs communication protocol changes.
58
59 The high level view of the synchronisation protocol between the user and GCS
60 threads is the following:
61
62 when send-message(m) from user thread:
63 atomically:
64 if protocol_changing:
65 wait until protocol_changing = false
66 nr_msgs_in_transit++
67 ...
68
69 when change-protocol(v) from GCS thread:
70 atomically:
71 protocol_changing := true
72 wait until nr_msgs_in_transit = 0
73 ...
74
75 We expect that communication protocol changes are rare events, especially
76 when compared to sending messages.
77 As such, the actual implementation strives to minimise the overhead on the
78 code path that sends messages.
79
80 To do this, we use an optimistic synchronisation protocol on the send message
81 side, that works as follows:
82
83 Algorithm #0, User thread:
84 1. If no protocol change is ongoing, the user thread will optimistically
85 increment the number of messages in transit.
86 2.a If a protocol change did not start meanwhile, we are good to go.
87 2.b If a protocol change started meanwhile:
88 2.b.1. Rollback the increment to the number of messages in transit
89 2.b.2. Wait for the protocol change to finish.
90
91 There is an additional action that needs to be performed on step (2.b), but
92 we will describe that action when we have the necessary context to understand
93 it.
94
95 On the protocol change side, it works as follows:
96
97 Algorithm #1, GCS thread:
98 1. Store that a protocol change is ongoing.
99 2. When the number of messages in transit is zero:
100 2.1. Change the protocol version
101 2.2. Wake up any user threads waiting for the protocol change
102 2.3. Deem the protocol change finished
103
104 The central part of the Algorithm #1 is step (2).
105 The question is: who triggers, and where, step (2)'s condition, i.e. the
106 number of in-transit messages is zero?
107 Well, the obvious place is that it is the GCS thread itself, when it is
108 processing an incoming message.
109 If that message comes from us, then we decrease the number of in-transit
110 messages, which may set it to zero.
111
112 However, recall that the user threads employ an optimistic synchronisation
113 protocol that "acts first, and asks for forgiveness later."
114 If the user thread rolls back its increment to the number of in-transit
115 messages, it may be the one to set it to zero---see Algorithm #0, step (2.b).
116 In this situation, it is the user thread that hits the condition required by
117 the GCS thread in Algorithm #1, step (2).
118 In order for the GCS thread to finish the protocol change, the user thread
119 must somehow signal the GCS thread to trigger its step (2).
120 This is the missing action of Algorithm #0, step (2.b).
121
122 So, the final synchronisation protocol of the user thread's side looks like
123 this:
124
125 Algorithm #2, User thread:
126 1. If no protocol change is ongoing, the user thread will optimistically
127 increment the number of messages in transit.
128 2.a If a protocol change did not start meanwhile, we are good to go.
129 2.b If a protocol change started meanwhile:
130 2.b.1. Rollback the the increment to the number of messages in transit
131 2.b.2. If our rollback set the number of messages in transit to zero,
132 signal the GCS thread
133 2.b.3. Wait for the protocol change to finish.
134
135 Implementation
136 =============================================================================
137 The implementation attempts to add as little overhead as possible to the
138 common case, which is that no protocol change is ongoing.
139 This is the fast path of Algorithm #2, step (2.a).
140 To achieve this goal, it employs a tagged lock.
141 For more details on the tagged lock implementation, see @c Gcs_tagged_lock.
142
143 In a nutshell, the tagged lock is a read-write spin lock which offers the
144 following API:
145
146 try_lock() -> bool
147 unlock()
148 optimistic_read() -> tag
149 validate_optimistic_read(tag) -> bool
150
151 For the write-side section, one uses it as a typical spin lock, e.g.:
152
153 do:
154 lock_acquired := try_lock()
155 while (not lock_acquired)
156 write-side section
157 unlock()
158
159 For the read-side section, one can use it as follows:
160
161 done := false
162 while (not done):
163 tag := optimistic_read()
164 unsynchronised read-side section
165 done := validate_optimistic_read(tag)
166 if (not done):
167 rollback unsynchronized read-side section
168
169 The idea is to allow an optimistic read-side section that does not perform
170 any memory stores.
171 This is in contrast with a typical read-write lock, where the read side
172 performs some memory stores to account for the reader, e.g. keeping a reader
173 counter.
174 The trade off is that:
175
176 a. the execution of the read-side of a tagged lock may be concurrent with
177 the write-side section if meanwhile the tagged lock is acquired
178 b. the read-side of a tagged lock may fail if meanwhile the tagged lock is
179 acquired, in which case one may want to rollback the effects of the
180 failed read-side section
181
182 The algorithms of the design are implemented as follows:
183
184 Algorithm #1 implementation, GCS thread:
185 1. Lock the tagged lock
186 2. When the number of messages in transit is zero:
187 2.1. Change the protocol version
188 2.2. Unlock the tagged lock, signal a condition variable to wake up any
189 user threads waiting for the protocol change
190 2.3. Deem the protocol change finished
191
192 Algorithm #2 implementation, User thread:
193 1. If the tagged lock is unlocked:
194 1.1. Start an optimistic read-side section
195 1.2. Atomically increment the number of messages in transit
196 2.a If the optimistic read-side section validates, we are good to go.
197 2.b If the optimistic read-side section fails validation:
198 2.b.1. Atomically rollback the increment to the number of messages
199 in transit
200 2.b.2. If our rollback set the number of messages in transit to zero,
201 signal the GCS thread
202 2.b.3. Wait on a condition variable for the protocol change to finish.
203
204 Note that we have concurrent access to the number of messages in transit
205 which needs to be synchronised.
206 This is done by using an std::atomic to implement the number of messages in
207 transit.
208
209 Some final implementation pointers:
210
211 a. Algorithm #1: see the code path that starts on @c set_protocol_version
212 and @c finish_protocol_version_change.
213 b. Algorithm #2: see the code paths that start on
214 @c atomically_increment_nr_packets_in_transit,
215 @c adjust_nr_packets_in_transit, and @c decrement_nr_packets_in_transit.
216 */
218 public:
221
226
231
232 /**
233 Retrieves the current protocol version in use.
234 @returns the current protocol version in use
235 */
237
238 /**
239 Starts a protocol change.
240 The protocol change is asynchronous, the caller can wait for the change to
241 finish using the returned future.
242
243 Note that for safety this method *must only* be called by the GCS engine
244 thread.
245
246 @param new_version The desired protocol version to change to
247 @retval {true, future} If successful
248 @retval {false, _} If the group does not support the requested protocol
249 */
250 std::pair<bool, std::future<void>> set_protocol_version(
251 Gcs_protocol_version new_version);
252
253 /**
254 Retrieves the greatest protocol version currently supported by the group.
255 @returns the greatest protocol version currently supported by the group
256 */
258
259 /**
260 Sets the greatest protocol version currently supported by the group.
261 @param version protocol
262 */
264
265 /**
266 Checks whether a protocol change is ongoing.
267 @returns true if a protocol change is ongoing, false otherwise
268 */
269 bool is_protocol_change_ongoing() const;
270
271 /**
272 Synchronises user threads, which send messages, with the GCS thread,
273 which performs protocol changes.
274
275 This method should be called by user threads when sending a message, before
276 the message goes through the pipeline.
277
278 @param cargo The type of message that will be sent
279 */
281
282 /**
283 After an outgoing message goes through the pipeline, it may produce more
284 than one packet. This method adjusts the increment done by
285 atomically_increment_nr_packets_in_transit to take into account the
286 additional packets produced by the pipeline.
287
288 @param cargo The type of message that will be sent
289 @param nr_additional_packets_to_send The number of additional packets that
290 will actually be sent
291 */
293 Cargo_type const &cargo,
294 std::size_t const &nr_additional_packets_to_send);
295
296 /**
297 Decrement the number of my in-transit packets.
298
299 @param packet The incoming packet
300 @param xcom_nodes The XCom membership at the time of delivery
301 */
303 Gcs_xcom_nodes const &xcom_nodes);
304
305 /**
306 Due to the synchronisation protocol used between user threads, which send
307 messages, and the GCS thread, which performs protocol changes, a user
308 thread may be the one to hit the condition that triggers the protocol
309 change to finish.
310
311 This function should be called by the user thread when it hits the
312 condition, to signal the GCS thread that the protocol change should finish.
313
314 @param caller_tag Identifier of the protocol change
315 */
317
318 private:
319 /*
320 Returns how many packets of mine are in-transit.
321 */
322 unsigned long get_nr_packets_in_transit() const;
323
324 /*
325 Begins a protocol change, and finishes it if the conditions are met, i.e.
326 we have no packets in-transit.
327 */
329
330 /*
331 Finishes the ongoing protocol change.
332
333 This method must only be called when is_protocol_change_ongoing(), i.e. after
334 a call to begin_protocol_version_change(_).
335 */
337
338 /*
339 Releases the tagged lock and notifies threads waiting for the protocol change
340 to finish.
341 */
343
344 /*
345 Auxiliary method to the implementation of
346 atomically_increment_nr_packets_in_transit.
347
348 Optimistically assumes a protocol change will not start meanwhile, and
349 increments the number of packets in transit.
350 */
351 std::pair<bool, Gcs_tagged_lock::Tag>
353
354 /*
355 Auxiliary method to the implementation of
356 atomically_increment_nr_packets_in_transit.
357
358 Rolls back the effects of optimistically_increment_nr_packets_in_transit and
359 signals the GCS thread to finish the protocol change if necessary.
360 */
362 Gcs_tagged_lock::Tag const &tag);
363
364 /*
365 Auxiliary method to the implementation of
366 atomically_increment_nr_packets_in_transit.
367
368 Waits until the ongoing protocol change finishes.
369 */
371
372 /**
373 Tagged lock used for the optimistic synchronisation protocol between user
374 threads, which send messages, and the GCS thread, which performs protocol
375 changes.
376 */
378
379 /**
380 For user threads to wait for an ongoing protocol change to finish.
381 */
382 std::mutex m_mutex;
383 std::condition_variable m_protocol_change_finished;
384
385 /**
386 Stores the outcome of the protocol change operation.
387 */
388 std::promise<void> m_promise;
389
390 /**
391 The protocol version we are going to change to when we start a protocol
392 change.
393 */
395
396 std::atomic<Gcs_protocol_version> m_max_supported_protocol;
397
398 std::atomic<unsigned long> m_nr_packets_in_transit;
399
401
403};
404
405#endif /* GCS_XCOM_COMMUNICATION_PROTOCOL_CHANGER_INCLUDED */
This is the pipeline that an outgoing or incoming message has to go through when being sent to or rec...
Definition: gcs_message_stages.h:365
This class is an abstraction for the packet concept.
Definition: gcs_internal_message.h:58
The Gcs_tagged_lock class Implements a tagged lock for optimistic read-side sections.
Definition: gcs_tagged_lock.h:84
std::uint64_t Tag
Definition: gcs_tagged_lock.h:86
Implements the communication protocol change logic.
Definition: gcs_xcom_communication_protocol_changer.h:217
void finish_protocol_version_change(Gcs_tagged_lock::Tag const caller_tag)
Due to the synchronisation protocol used between user threads, which send messages,...
Definition: gcs_xcom_communication_protocol_changer.cc:128
std::pair< bool, Gcs_tagged_lock::Tag > optimistically_increment_nr_packets_in_transit()
Definition: gcs_xcom_communication_protocol_changer.cc:178
Gcs_message_pipeline & m_msg_pipeline
Definition: gcs_xcom_communication_protocol_changer.h:402
std::atomic< unsigned long > m_nr_packets_in_transit
Definition: gcs_xcom_communication_protocol_changer.h:398
unsigned long get_nr_packets_in_transit() const
Definition: gcs_xcom_communication_protocol_changer.cc:167
bool is_protocol_change_ongoing() const
Checks whether a protocol change is ongoing.
Definition: gcs_xcom_communication_protocol_changer.cc:147
Gcs_xcom_communication_protocol_changer(Gcs_xcom_communication_protocol_changer &&)=delete
Gcs_tagged_lock m_tagged_lock
Tagged lock used for the optimistic synchronisation protocol between user threads,...
Definition: gcs_xcom_communication_protocol_changer.h:377
Gcs_xcom_communication_protocol_changer & operator=(Gcs_xcom_communication_protocol_changer const &)=delete
void wait_for_protocol_change_to_finish()
Definition: gcs_xcom_communication_protocol_changer.cc:229
std::condition_variable m_protocol_change_finished
Definition: gcs_xcom_communication_protocol_changer.h:383
Gcs_xcom_communication_protocol_changer & operator=(Gcs_xcom_communication_protocol_changer &&)=delete
std::mutex m_mutex
For user threads to wait for an ongoing protocol change to finish.
Definition: gcs_xcom_communication_protocol_changer.h:382
std::atomic< Gcs_protocol_version > m_max_supported_protocol
Definition: gcs_xcom_communication_protocol_changer.h:396
Gcs_protocol_version m_tentative_new_protocol
The protocol version we are going to change to when we start a protocol change.
Definition: gcs_xcom_communication_protocol_changer.h:394
Gcs_xcom_communication_protocol_changer(Gcs_xcom_communication_protocol_changer const &)=delete
void commit_protocol_version_change()
Definition: gcs_xcom_communication_protocol_changer.cc:102
void set_maximum_supported_protocol_version(Gcs_protocol_version version)
Sets the greatest protocol version currently supported by the group.
Definition: gcs_xcom_communication_protocol_changer.cc:158
Gcs_xcom_engine & m_gcs_engine
Definition: gcs_xcom_communication_protocol_changer.h:400
void rollback_increment_nr_packets_in_transit(Gcs_tagged_lock::Tag const &tag)
Definition: gcs_xcom_communication_protocol_changer.cc:195
void release_tagged_lock_and_notify_waiters()
Definition: gcs_xcom_communication_protocol_changer.cc:120
Gcs_protocol_version get_maximum_supported_protocol_version() const
Retrieves the greatest protocol version currently supported by the group.
Definition: gcs_xcom_communication_protocol_changer.cc:153
std::pair< bool, std::future< void > > set_protocol_version(Gcs_protocol_version new_version)
Starts a protocol change.
Definition: gcs_xcom_communication_protocol_changer.cc:49
Gcs_xcom_communication_protocol_changer(Gcs_xcom_engine &gcs_engine, Gcs_message_pipeline &pipeline)
Definition: gcs_xcom_communication_protocol_changer.cc:31
std::promise< void > m_promise
Stores the outcome of the protocol change operation.
Definition: gcs_xcom_communication_protocol_changer.h:388
Gcs_protocol_version get_protocol_version() const
Retrieves the current protocol version in use.
Definition: gcs_xcom_communication_protocol_changer.cc:44
void atomically_increment_nr_packets_in_transit(Cargo_type const &cargo)
Synchronises user threads, which send messages, with the GCS thread, which performs protocol changes.
Definition: gcs_xcom_communication_protocol_changer.cc:240
void begin_protocol_version_change(Gcs_protocol_version new_version)
Definition: gcs_xcom_communication_protocol_changer.cc:79
void adjust_nr_packets_in_transit(Cargo_type const &cargo, std::size_t const &nr_additional_packets_to_send)
After an outgoing message goes through the pipeline, it may produce more than one packet.
Definition: gcs_xcom_communication_protocol_changer.cc:276
void decrement_nr_packets_in_transit(Gcs_packet const &packet, Gcs_xcom_nodes const &xcom_nodes)
Decrement the number of my in-transit packets.
Definition: gcs_xcom_communication_protocol_changer.cc:297
Definition: gcs_xcom_notification.h:94
This class contains information on the configuration, i.e set of nodes or simply site definition.
Definition: gcs_xcom_group_member_information.h:391
Cargo_type
The different cargo type codes.
Definition: gcs_internal_message_headers.h:115
Gcs_protocol_version
The GCS protocol versions.
Definition: gcs_types.h:128
static Gcs_xcom_engine * gcs_engine
Definition: gcs_xcom_interface.cc:144
required uint64 version
Definition: replication_group_member_actions.proto:41