MySQL 8.4.3
Source Code Documentation
gcs_xcom_communication_interface.h
Go to the documentation of this file.
1/* Copyright (c) 2015, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef GCS_XCOM_COMMUNICATION_INTERFACE_INCLUDED
25#define GCS_XCOM_COMMUNICATION_INTERFACE_INCLUDED
26
27#include <cstdlib>
28#include <map> // std::map
29#include <utility> // std::pair
30#include <vector> // std::vector
31
44
46
47/**
48 @class Gcs_xcom_communication_interface
49
50 Abstraction layer that adds XCom specific methods to the generic
51 communication interface.
52
53 This adds the following functionalities to the generic
54 Gcs_communication_interface:
55 - Ability to send messages without view safety and stats counter. This method
56 shall be used by the State Exchange algorithm when the high-level view
57 change is still occurring.
58 - Delegation method that will contain all the business logic related with
59 messages delivery to registered clients.
60*/
62 public:
63 /**
64 Sends a message that is internal to the binding implementation.
65 This message will not be subject to the same restrictions as send_message.
66 As such, it will not observe view safety nor will it count towards the
67 statistics of messages sent.
68
69 @param[in] message_to_send the message to send
70 @param[out] message_length the length of the message that was sent if GCS_OK,
71 unspecified otherwise
72 @param[in] cargo internal message header cargo type
73 @return the xcom broadcast message error
74 @retval GCS_OK message is transmitted successfully
75 @retval GCS_NOK error occurred while transmitting message
76 @retval GCS_MESSAGE_TOO_BIG message is bigger than xcom can handle
77
78 */
79
80 virtual enum_gcs_error do_send_message(const Gcs_message &message_to_send,
81 unsigned long long *message_length,
82 Cargo_type cargo) = 0;
83
85
86 /**
87 Buffer packets when a view is not installed yet and the state
88 exchange phase is being executed.
89
90 Note that this method must be executed by the same thread that
91 processes global view messages and data messages in order to
92 avoid any concurrency issue.
93
94 @param packet Packet to buffer.
95 @param xcom_nodes Membership at the time the packet was received
96 */
97
99 Gcs_packet &&packet, std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) = 0;
100
101 /**
102 The state exchange phase has been executed and the view has been
103 installed so this is used to send any buffered packet to upper
104 layers.
105
106 Note that this method must be executed by the same thread that
107 processes global view messages and data messages in order to
108 avoid any concurrency issue.
109 */
110
111 virtual void deliver_buffered_packets() = 0;
112
113 /**
114 Clean up possible buffered packets that were not delivered to
115 upper layers because the state exchange has not finished and a
116 new global view message was received triggering a new state
117 exchange phase.
118
119 Note that this method must be executed by the same thread that
120 processes global view messages and data messages in order to
121 avoid any concurrency issue.
122 */
123
124 virtual void cleanup_buffered_packets() = 0;
125
126 /**
127 Return the number of buffered packets.
128
129 Note that this method must be executed by the same thread that
130 processes global view messages and data messages in order to
131 avoid any concurrency issue.
132 */
133
134 virtual size_t number_buffered_packets() = 0;
135
136 /**
137 Notify the pipeline about the new XCom membership when a state exchange
138 begins.
139
140 Note that this method must be executed by the same thread that processes
141 global view messages and data messages in order to avoid any concurrency
142 issue.
143
144 @param me The identifier of this server
145 @param members The XCom membership
146 */
148 const Gcs_xcom_nodes &members) = 0;
149
150 /**
151 Attempts to recover the missing packets that are required for a node to
152 join the group successfully.
153 For example, the missing packets may be some fragments of a message that
154 have already been delivered by XCom to the existing members of the group.
155 The joining node needs those fragments in order to be able to deliver the
156 reassembled message when the final fragments are delivered by XCom.
157
158 Note that this method must be executed by the same thread that processes
159 global view messages and data messages in order to avoid any concurrency
160 issue.
161
162 @param synodes The synodes where the required packets were decided
163 @returns true If successful, false otherwise
164 */
165 virtual bool recover_packets(Gcs_xcom_synode_set const &synodes) = 0;
166
167 /**
168 Converts the packet into a message that can be delivered to the upper
169 layer.
170
171 @param packet The packet to convert
172 @param xcom_nodes The membership at the time the packet was delivered
173 @retval Gcs_message* if successful
174 @retval nullptr if unsuccessful
175 */
177 Gcs_packet &&packet, std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) = 0;
178
179 /**
180 This method is meant to be called when the Gcs_xcom_interface
181 callback method xcom_receive_data is invoked.
182
183 This allows, in terms of software architecture, to concentrate all the
184 message delivery logic and processing in a single place.
185
186 The deliver_message callback that is registered in XCom
187 (in gcs_xcom_interface.h) and that actually receives the low-level
188 messages, is implemented as a delegator to this method.
189
190 Note that the method will be responsible for deleting the message
191 passed as parameter and must be executed by the same thread that
192 processes global view messages and data messages in order to avoid
193 any concurrency issue.
194 */
195
197 Gcs_packet &&packet, std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) = 0;
198
200};
201
202/**
203 @class Gcs_xcom_communication
204
205 Implementation of the Gcs_communication_interface for xcom.
206*/
208 public:
209 /**
210 Gcs_xcom_communication constructor.
211
212 @param[in] stats a pointer to the statistics interface
213 @param[in] proxy a pointer to an implementation of
214 Gcs_xcom_proxy
215 @param[in] view_control a pointer to a
216 Gcs_xcom_view_change_control_interface implementation
217 @param[in] gcs_engine Pointer to gcs engine
218 @param[in] group_id reference to the group identifier
219 @param[in] comms_mgmt a unique_ptr to a
220 Network_provider_management_interface
221 */
222
223 explicit Gcs_xcom_communication(
227 std::unique_ptr<Network_provider_management_interface> comms_mgmt);
228
230
231 // Implementation of the Gcs_communication_interface
232
233 /**
234 Implementation of the public send_message method defined in
235 Gcs_communication_interface.
236 Besides sending a message to the group, this method does two extra things:
237 - Guarantees view safety, in which no messages can be sent when a view
238 change is occurring.
239 - Registers in the statistics interface that a message was sent.
240
241 @param[in] message_to_send the message to send
242 @return the xcom broadcast message error
243 @retval GCS_OK when message is transmitted successfully
244 @retval GCS_NOK when error occurred while transmitting message
245 @retval GCS_MESSAGE_TOO_BIG when message is bigger than
246 xcom can handle
247 */
248
249 enum_gcs_error send_message(const Gcs_message &message_to_send) override;
250
252 const Gcs_communication_event_listener &event_listener) override;
253
254 void remove_event_listener(int event_listener_handle) override;
255
256 // Implementation of the Gcs_xcom_communication_interface
257 enum_gcs_error do_send_message(const Gcs_message &message_to_send,
258 unsigned long long *message_length,
259 Cargo_type cargo) override;
260
261 // For unit testing purposes
262 std::map<int, const Gcs_communication_event_listener &>
264
266
268 Gcs_packet &&packet,
269 std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) override;
270
271 void deliver_buffered_packets() override;
272
273 void cleanup_buffered_packets() override;
274
275 size_t number_buffered_packets() override;
276
278 const Gcs_xcom_nodes &members) override;
279
280 bool recover_packets(Gcs_xcom_synode_set const &synodes) override;
281
283 Gcs_packet &&packet,
284 std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) override;
285
287 Gcs_packet &&packet,
288 std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) override;
289
291
292 std::pair<bool, std::future<void>> set_protocol_version(
293 Gcs_protocol_version new_version) override;
294
296
298
300
302
303 private:
304 // Registered event listeners
305 std::map<int, const Gcs_communication_event_listener &> event_listeners;
306
307 // Reference to the stats updater interface
309
310 // Reference to the xcom proxy interface
312
313 // Reference to the view change control object
315
316 /**
317 The pipeline of stages a message has to go through before it is delivered
318 to the application or sent to the network.
319 */
321
322 /**
323 Buffer that is used to store packets while the node is about to install
324 a view and is running the state exchange phase.
325 */
326 std::vector<std::pair<Gcs_packet, std::unique_ptr<Gcs_xcom_nodes>>>
328
329 /** Most recent XCom membership known. */
331
332 /** Hash of the group. */
333 unsigned int m_gid_hash;
334
335 /** Protocol changer. */
337
338 /** Network provider management interface owned by this object. */
339 std::unique_ptr<Network_provider_management_interface> m_comms_mgmt_interface;
340
341 /** Notify upper layers that a message has been received. */
342 void notify_received_message(std::unique_ptr<Gcs_message> &&message);
343
344 /** Delivers the packet to the upper layer. */
346 std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes);
347
348 /**
349 @returns the list of possible donors from which to recover the missing
350 packets this server requires to successfully join the group.
351 */
352 std::vector<Gcs_xcom_node_information> possible_packet_recovery_donors()
353 const;
354
355 /**
356 Error code for the packet recovery process.
357 */
359 OK,
361 NO_MEMORY,
365 ERROR
366 };
367
368 /**
369 Attempts to recover the packets delivered in @c synodes from @c donor.
370
371 @c recovered_data is an out parameter.
372 */
374 Gcs_xcom_node_information const &donor,
375 Gcs_xcom_synode_set const &synodes,
376 synode_app_data_array &recovered_data);
377
378 /**
379 Processes all the recovered packets.
380 */
382 synode_app_data_array const &recovered_data);
383
384 /**
385 Processes a single recovered packet.
386 */
388 synode_app_data const &recovered_data);
389
390 /**
391 Logs the packet recovery failure.
392 */
394 packet_recovery_result const &error_code,
395 Gcs_xcom_node_information const &donor) const;
396
397 /*
398 Disabling the copy constructor and assignment operator.
399 */
402};
403
404#endif /* GCS_XCOM_COMMUNICATION_INTERFACE_INCLUDED */
This interface is implemented by those who wish to receive messages.
Definition: gcs_communication_event_listener.h:36
This interface represents all the communication facilities that a binding implementation should provi...
Definition: gcs_communication_interface.h:90
This represents the unique identification of a group.
Definition: gcs_group_identifier.h:35
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:40
This is the pipeline that an outgoing or incoming message has to go through when being sent to or rec...
Definition: gcs_message_stages.h:365
Class that represents the data that is exchanged within a group.
Definition: gcs_message.h:357
This class is an abstraction for the packet concept.
Definition: gcs_internal_message.h:58
Abstraction layer that adds XCom specific methods to the generic communication interface.
Definition: gcs_xcom_communication_interface.h:61
virtual void process_user_data_packet(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes)=0
The purpose of this method is to be called when in Gcs_xcom_interface callback method xcom_receive_da...
virtual bool recover_packets(Gcs_xcom_synode_set const &synodes)=0
Attempts to recover the missing packets that are required for a node to join the group successfully.
virtual Gcs_message * convert_packet_to_message(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes)=0
Converts the packet into a message that can be delivered to the upper layer.
virtual void buffer_incoming_packet(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes)=0
Buffer packets when a view is not installed yet and the state exchange phase is being executed.
virtual void cleanup_buffered_packets()=0
~Gcs_xcom_communication_interface() override=default
virtual size_t number_buffered_packets()=0
Return the number of buffered packets.
virtual enum_gcs_error do_send_message(const Gcs_message &message_to_send, unsigned long long *message_length, Cargo_type cargo)=0
Sends a message that is internal to the binding implementation.
virtual Gcs_message_pipeline & get_msg_pipeline()=0
virtual void deliver_buffered_packets()=0
The state exchange phase has been executed and the view has been installed so this is used to send an...
virtual void update_members_information(const Gcs_member_identifier &me, const Gcs_xcom_nodes &members)=0
Notify the pipeline about the new XCom membership when a state exchange begins.
Implements the communication protocol change logic.
Definition: gcs_xcom_communication_protocol_changer.h:217
Definition: gcs_xcom_communication_interface.h:207
std::map< int, const Gcs_communication_event_listener & > event_listeners
Definition: gcs_xcom_communication_interface.h:305
Gcs_message_pipeline m_msg_pipeline
The pipeline of stages a message has to go through before it is delivered to the application or sent ...
Definition: gcs_xcom_communication_interface.h:320
size_t number_buffered_packets() override
Return the number of buffered packets.
Definition: gcs_xcom_communication_interface.cc:243
bool recover_packets(Gcs_xcom_synode_set const &synodes) override
Attempts to recover the missing packets that are required for a node to join the group successfully.
Definition: gcs_xcom_communication_interface.cc:440
Gcs_message_pipeline & get_msg_pipeline() override
Definition: gcs_xcom_communication_interface.h:265
Gcs_xcom_statistics_manager_interface * m_stats
Definition: gcs_xcom_communication_interface.h:308
Gcs_message * convert_packet_to_message(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes) override
Converts the packet into a message that can be delivered to the upper layer.
Definition: gcs_xcom_communication_interface.cc:483
packet_recovery_result process_recovered_packets(synode_app_data_array const &recovered_data)
Processes all the recovered packets.
Definition: gcs_xcom_communication_interface.cc:349
void notify_received_message(std::unique_ptr< Gcs_message > &&message)
Notify upper layers that a message has been received.
Definition: gcs_xcom_communication_interface.cc:198
void set_communication_protocol(enum_transport_protocol protocol) override
Sets the communication protocol to use.
Definition: gcs_xcom_communication_interface.cc:633
void deliver_user_data_packet(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes)
Delivers the packet to the upper layer.
Definition: gcs_xcom_communication_interface.cc:595
packet_recovery_result recover_packets_from_donor(Gcs_xcom_node_information const &donor, Gcs_xcom_synode_set const &synodes, synode_app_data_array &recovered_data)
Attempts to recover the packets delivered in synodes from donor.
Definition: gcs_xcom_communication_interface.cc:370
std::unique_ptr< Network_provider_management_interface > m_comms_mgmt_interface
Definition: gcs_xcom_communication_interface.h:339
void deliver_buffered_packets() override
The state exchange phase has been executed and the view has been installed so this is used to send an...
Definition: gcs_xcom_communication_interface.cc:225
Gcs_protocol_version get_protocol_version() const override
Retrieves the current GCS protocol version in use.
Definition: gcs_xcom_communication_interface.cc:614
void buffer_incoming_packet(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes) override
Buffer packets when a view is not installed yet and the state exchange phase is being executed.
Definition: gcs_xcom_communication_interface.cc:215
Gcs_protocol_version get_maximum_supported_protocol_version() const override
Get the maximum protocol version currently supported by the group.
Definition: gcs_xcom_communication_interface.cc:624
Gcs_xcom_proxy * m_xcom_proxy
Definition: gcs_xcom_communication_interface.h:311
void set_maximum_supported_protocol_version(Gcs_protocol_version version)
Definition: gcs_xcom_communication_interface.cc:628
enum_gcs_error send_message(const Gcs_message &message_to_send) override
Implementation of the public send_message method defined in Gcs_communication_interface.
Definition: gcs_xcom_communication_interface.cc:86
~Gcs_xcom_communication() override
std::vector< Gcs_xcom_node_information > possible_packet_recovery_donors() const
Definition: gcs_xcom_communication_interface.cc:254
Gcs_xcom_communication(Gcs_xcom_statistics_manager_interface *stats, Gcs_xcom_proxy *proxy, Gcs_xcom_view_change_control_interface *view_control, Gcs_xcom_engine *gcs_engine, Gcs_group_identifier const &group_id, std::unique_ptr< Network_provider_management_interface > comms_mgmt)
Gcs_xcom_communication constructor.
Definition: gcs_xcom_communication_interface.cc:59
void remove_event_listener(int event_listener_handle) override
Removes a previously registered event listener.
Definition: gcs_xcom_communication_interface.cc:194
Gcs_xcom_nodes m_xcom_nodes
Most recent XCom membership known.
Definition: gcs_xcom_communication_interface.h:330
Gcs_xcom_communication & operator=(const Gcs_xcom_communication &)
enum_gcs_error do_send_message(const Gcs_message &message_to_send, unsigned long long *message_length, Cargo_type cargo) override
Sends a message that is internal to the binding implementation.
Definition: gcs_xcom_communication_interface.cc:112
void update_members_information(const Gcs_member_identifier &me, const Gcs_xcom_nodes &members) override
Notify the pipeline about the new XCom membership when a state exchange begins.
Definition: gcs_xcom_communication_interface.cc:247
packet_recovery_result process_recovered_packet(synode_app_data const &recovered_data)
Processes a single recovered packet.
Definition: gcs_xcom_communication_interface.cc:279
void log_packet_recovery_failure(packet_recovery_result const &error_code, Gcs_xcom_node_information const &donor) const
Logs the packet recovery failure.
Definition: gcs_xcom_communication_interface.cc:397
Gcs_xcom_communication(const Gcs_xcom_communication &)
enum_transport_protocol get_incoming_connections_protocol() override
Get the incoming connections protocol which is currently active.
Definition: gcs_xcom_communication_interface.cc:639
void process_user_data_packet(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes) override
The purpose of this method is to be called when in Gcs_xcom_interface callback method xcom_receive_da...
Definition: gcs_xcom_communication_interface.cc:540
std::map< int, const Gcs_communication_event_listener & > * get_event_listeners()
Definition: gcs_xcom_communication_interface.cc:82
Gcs_xcom_communication_protocol_changer m_protocol_changer
Protocol changer.
Definition: gcs_xcom_communication_interface.h:336
packet_recovery_result
Error code for the packet recovery process.
Definition: gcs_xcom_communication_interface.h:358
void cleanup_buffered_packets() override
Definition: gcs_xcom_communication_interface.cc:239
unsigned int m_gid_hash
Hash of the group.
Definition: gcs_xcom_communication_interface.h:333
std::pair< bool, std::future< void > > set_protocol_version(Gcs_protocol_version new_version) override
Modifies the GCS protocol version in use.
Definition: gcs_xcom_communication_interface.cc:618
int add_event_listener(const Gcs_communication_event_listener &event_listener) override
Registers an implementation of a Gcs_communication_event_listener that will receive Communication Eve...
Definition: gcs_xcom_communication_interface.cc:181
std::vector< std::pair< Gcs_packet, std::unique_ptr< Gcs_xcom_nodes > > > m_buffered_packets
Buffer that is used to store packets while the node is about to install a view and is running the sta...
Definition: gcs_xcom_communication_interface.h:327
Gcs_xcom_view_change_control_interface * m_view_control
Definition: gcs_xcom_communication_interface.h:314
Definition: gcs_xcom_notification.h:94
It represents a node within a group and is identified by the member identifier, unique identifier and...
Definition: gcs_xcom_group_member_information.h:193
This class contains information on the configuration, i.e set of nodes or simply site definition.
Definition: gcs_xcom_group_member_information.h:391
Definition: gcs_xcom_proxy.h:53
This class is the storage and provider of all statistics coming from either XCom and GCS.
Definition: gcs_xcom_statistics_manager.h:65
Definition: gcs_xcom_state_exchange.h:716
Cargo_type
The different cargo type codes.
Definition: gcs_internal_message_headers.h:115
enum_gcs_error
This enumeration describes errors which can occur during group communication operations.
Definition: gcs_types.h:41
Gcs_protocol_version
The GCS protocol versions.
Definition: gcs_types.h:128
static Gcs_xcom_engine * gcs_engine
Definition: gcs_xcom_interface.cc:144
std::unordered_set< Gcs_xcom_synode > Gcs_xcom_synode_set
Definition: gcs_xcom_synode.h:84
enum_transport_protocol
Enum that describes the available XCom Communication Stacks.
Definition: network_provider.h:45
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: mysqlslap.cc:240