MySQL  8.0.22
Source Code Documentation
gcs_xcom_communication_interface.h
Go to the documentation of this file.
1 /* Copyright (c) 2015, 2020, Oracle and/or its affiliates.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef GCS_XCOM_COMMUNICATION_INTERFACE_INCLUDED
24 #define GCS_XCOM_COMMUNICATION_INTERFACE_INCLUDED
25 
26 #include <cstdlib>
27 #include <map> // std::map
28 #include <utility> // std::pair
29 #include <vector> // std::vector
30 
42 
43 /**
44  @class Gcs_xcom_communication_interface
45 
46  Abstraction layer that adds XCom specific methods to the generic
47  communication interface.
48 
49  This adds the following functionalities to the generic
50  Gcs_communication_interface:
51  - Ability to send messages without view safety and stats counter. This method
52  shall be used by the State Exchange algorithm when the high-level view
53  change is still occurring.
54  - Delegation method that will contain all the business logic related with
55  messages delivery to registered clients.
56 */
58  public:
59  /**
60  Sends a message that is internal to the binding implementation.
61  This message will not be subject to the same restrictions of send_message.
62  As such, it will not observe view safety nor will count for the statistics
63  of messages sent.
64 
65  @param[in] message_to_send the message to send
66  @param[out] message_length the length of message which was send if GCS_OK,
67  unspecified otherwise
68  @param[in] cargo internal message header cargo type
69  @return the xcom broadcast message error
70  @retval GCS_OK message is transmitted successfully
71  @retval GCS_NOK error occurred while transmitting message
72  @retval GCS_MESSAGE_TOO_BIG message is bigger then xcom can handle
73 
74  */
75 
76  virtual enum_gcs_error do_send_message(const Gcs_message &message_to_send,
77  unsigned long long *message_length,
78  Cargo_type cargo) = 0;
79 
81 
82  /**
83  Buffer packets when a view is not installed yet and the state
84  exchange phase is being executed.
85 
86  Note that this method must be excuted by the same thread that
87  processes global view messages and data message in order to
88  avoid any concurrency issue.
89 
90  @param packet Packet to buffer.
91  @param xcom_nodes Membership at the time the packet was received
92  */
93 
94  virtual void buffer_incoming_packet(
95  Gcs_packet &&packet, std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) = 0;
96 
97  /**
98  The state exchange phase has been executed and the view has been
99  installed so this is used to send any buffered packet to upper
100  layers.
101 
102  Note that this method must be excuted by the same thread that
103  processes global view messages and data message in order to
104  avoid any concurrency issue.
105  */
106 
107  virtual void deliver_buffered_packets() = 0;
108 
109  /*
110  Clean up possible buffered packets that were not delivered to
111  upper layers because the state exchange has not finished and a
112  new global view message was received triggering a new state
113  exchange phase.
114 
115  Note that this method must be excuted by the same thread that
116  processes global view messages and data message in order to
117  avoid any concurrency issue.
118  */
119 
120  virtual void cleanup_buffered_packets() = 0;
121 
122  /**
123  Return the number of buffered packets.
124 
125  Note that this method must be excuted by the same thread that
126  processes global view messages and data message in order to
127  avoid any concurrency issue.
128  */
129 
130  virtual size_t number_buffered_packets() = 0;
131 
132  /**
133  Notify the pipeline about the new XCom membership when a state exchange
134  begins.
135 
136  Note that this method must be executed by the same thread that processes
137  global view messages and data message in order to avoid any concurrency
138  issue.
139 
140  @param me The identifier of this server
141  @param members The XCom membership
142  */
143  virtual void update_members_information(const Gcs_member_identifier &me,
144  const Gcs_xcom_nodes &members) = 0;
145 
146  /**
147  Attempts to recover the missing packets that are required for a node to
148  join the group successfully.
149  For example, the missing packets may be some fragments of a message that
150  have already been delivered by XCom to the existing members of the group.
151  The joining node needs those fragments in order to be able to deliver the
152  reassembled message when the final fragments are delivered by XCom.
153 
154  Note that this method must be executed by the same thread that processes
155  global view messages and data message in order to avoid any concurrency
156  issue.
157 
158  @param synodes The synodes where the required packets were decided
159  @returns true If successful, false otherwise
160  */
161  virtual bool recover_packets(Gcs_xcom_synode_set const &synodes) = 0;
162 
163  /**
164  Converts the packet into a message that can be delivered to the upper
165  layer.
166 
167  @param packet The packet to convert
168  @param xcom_nodes The membership at the time the packet was delivered
169  @retval Gcs_message* if successful
170  @retval nullptr if unsuccessful
171  */
173  Gcs_packet &&packet, std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) = 0;
174 
175  /**
176  The purpose of this method is to be called when in Gcs_xcom_interface
177  callback method xcom_receive_data is invoked.
178 
179  This allows, in terms of software architecture, to concentrate all the
180  message delivery logic and processing in a single place.
181 
182  The deliver_message callback that is registered in XCom
183  (in gcs_xcom_interface.h) and that actually receives the low-level
184  messages, is implemented as a delegator to this method.
185 
186  Note that the method will be responsible for deleting the message
187  passed as parameter and must be executed by the same thread that
188  processes global view messages and data message in order to avoid
189  any concurrency issue.
190  */
191 
192  virtual void process_user_data_packet(
193  Gcs_packet &&packet, std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) = 0;
194 
196 };
197 
198 /**
199  @class Gcs_xcom_communication_interface
200 
201  Implementation of the Gcs_communication_interface for xcom.
202 */
204  public:
205  /**
206  Gcs_xcom_communication_interface constructor.
207 
208  @param[in] stats a reference to the statistics interface
209  @param[in] proxy a reference to an implementation of
210  Gcs_xcom_communication_proxy
211  @param[in] view_control a reference to a
212  gcs_xcom_view_change_control_interface implementation
213  @param[in] gcs_engine Pointer to gcs engine
214  @param[in] group_id reference to the group identifier
215  */
216 
217  explicit Gcs_xcom_communication(
221 
222  ~Gcs_xcom_communication() override;
223 
224  // Implementation of the Gcs_communication_interface
225 
226  /**
227  Implementation of the public send_message method defined in
228  Gcs_xcom_communication.
229  Besides sending a message to the group, this method does two extra things:
230  - Guarantees view safety, in which no messages can be sent when a view
231  change is occurring.
232  - Registers in the statistics interface that a message was sent.
233 
234  @param[in] message_to_send the message to send
235  @return the xcom broadcast message error
236  @retval GCS_OK when message is transmitted successfully
237  @retval GCS_NOK when error occurred while transmitting message
238  @retval GCS_MESSAGE_TOO_BIG when message is bigger then
239  xcom can handle
240  */
241 
242  enum_gcs_error send_message(const Gcs_message &message_to_send) override;
243 
244  int add_event_listener(
245  const Gcs_communication_event_listener &event_listener) override;
246 
247  void remove_event_listener(int event_listener_handle) override;
248 
249  // Implementation of the Gcs_xcom_communication_interface
250  enum_gcs_error do_send_message(const Gcs_message &message_to_send,
251  unsigned long long *message_length,
252  Cargo_type cargo) override;
253 
254  // For unit testing purposes
255  std::map<int, const Gcs_communication_event_listener &>
256  *get_event_listeners();
257 
258  Gcs_message_pipeline &get_msg_pipeline() override { return m_msg_pipeline; }
259 
261  Gcs_packet &&packet,
262  std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) override;
263 
264  void deliver_buffered_packets() override;
265 
266  void cleanup_buffered_packets() override;
267 
268  size_t number_buffered_packets() override;
269 
271  const Gcs_xcom_nodes &members) override;
272 
273  bool recover_packets(Gcs_xcom_synode_set const &synodes) override;
274 
276  Gcs_packet &&packet,
277  std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) override;
278 
280  Gcs_packet &&packet,
281  std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes) override;
282 
284 
285  std::pair<bool, std::future<void>> set_protocol_version(
286  Gcs_protocol_version new_version) override;
287 
289 
290  void set_maximum_supported_protocol_version(Gcs_protocol_version version);
291 
292  private:
293  // Registered event listeners
294  std::map<int, const Gcs_communication_event_listener &> event_listeners;
295 
296  // Reference to the stats updater interface
298 
299  // Reference to the xcom proxy interface
301 
302  // Reference to the view change control object
304 
305  /**
306  The pipeline of stages a message has to go through before it is delivered
307  to the application or sent to the network.
308  */
310 
311  /**
312  Buffer that is used to store packets while the node is about to install
313  a view and is running the state exchange phase.
314  */
315  std::vector<std::pair<Gcs_packet, std::unique_ptr<Gcs_xcom_nodes>>>
317 
318  /** Most recent XCom membership known. */
320 
321  /** Hash of the group. */
322  unsigned int m_gid_hash;
323 
324  /** Protocol changer. */
326 
327  /** Notify upper layers that a message has been received. */
328  void notify_received_message(std::unique_ptr<Gcs_message> &&message);
329 
330  /** Delivers the packet to the upper layer. */
331  void deliver_user_data_packet(Gcs_packet &&packet,
332  std::unique_ptr<Gcs_xcom_nodes> &&xcom_nodes);
333 
334  /**
335  @returns the list of possible donors from which to recover the missing
336  packets this server requires to successfully join the group.
337  */
338  std::vector<Gcs_xcom_node_information> possible_packet_recovery_donors()
339  const;
340 
341  /**
342  Error code for the packet recovery proceess.
343  */
345  OK,
346  PACKETS_UNRECOVERABLE,
347  NO_MEMORY,
348  PIPELINE_ERROR,
349  PIPELINE_UNEXPECTED_OUTPUT,
350  PACKET_UNEXPECTED_CARGO,
351  ERROR
352  };
353 
354  /**
355  Attempts to recover the packets delivered in @c synodes from @c donor.
356 
357  @c recovered_data is an out parameter.
358  */
359  packet_recovery_result recover_packets_from_donor(
360  Gcs_xcom_node_information const &donor,
361  Gcs_xcom_synode_set const &synodes,
362  synode_app_data_array &recovered_data);
363 
364  /**
365  Processes all the recovered packets.
366  */
367  packet_recovery_result process_recovered_packets(
368  synode_app_data_array const &recovered_data);
369 
370  /**
371  Processes a single recovered packet.
372  */
373  packet_recovery_result process_recovered_packet(
374  synode_app_data const &recovered_data);
375 
376  /**
377  Logs the packet recovery failure.
378  */
379  void log_packet_recovery_failure(
380  packet_recovery_result const &error_code,
381  Gcs_xcom_node_information const &donor) const;
382 
383  /*
384  Disabling the copy constructor and assignment operator.
385  */
388 };
389 
390 #endif /* GCS_XCOM_COMMUNICATION_INTERFACE_INCLUDED */
virtual std::pair< bool, std::future< void > > set_protocol_version(Gcs_protocol_version new_version)=0
Modifies the GCS protocol version in use.
Gcs_message_pipeline m_msg_pipeline
The pipeline of stages a message has to go through before it is delivered to the application or sent ...
Definition: gcs_xcom_communication_interface.h:309
virtual bool recover_packets(Gcs_xcom_synode_set const &synodes)=0
Attempts to recover the missing packets that are required for a node to join the group successfully...
Abstraction layer that adds XCom specific methods to the generic communication interface.
Definition: gcs_xcom_communication_interface.h:57
It represents a node within a group and is identified by the member identifier, unique identifier and...
Definition: gcs_xcom_group_member_information.h:192
Gcs_xcom_communication_protocol_changer m_protocol_changer
Protocol changer.
Definition: gcs_xcom_communication_interface.h:325
std::map< int, const Gcs_communication_event_listener & > event_listeners
Definition: gcs_xcom_communication_interface.h:294
packet_recovery_result
Error code for the packet recovery proceess.
Definition: gcs_xcom_communication_interface.h:344
virtual Gcs_message * convert_packet_to_message(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes)=0
Converts the packet into a message that can be delivered to the upper layer.
virtual void update_members_information(const Gcs_member_identifier &me, const Gcs_xcom_nodes &members)=0
Notify the pipeline about the new XCom membership when a state exchange begins.
This interface contains the public methods that the implementation of the Gcs_statistics_interface wi...
Definition: gcs_xcom_statistics_interface.h:38
Gcs_xcom_nodes m_xcom_nodes
Most recent XCom membership known.
Definition: gcs_xcom_communication_interface.h:319
enum_gcs_error
This enumeration describes errors which can occur during group communication operations.
Definition: gcs_types.h:40
std::unordered_set< Gcs_xcom_synode > Gcs_xcom_synode_set
Definition: gcs_xcom_synode.h:83
virtual void remove_event_listener(int event_listener_handle)=0
Removes a previously registered event listener.
Class that represents the data that is exchanged within a group.
Definition: gcs_message.h:338
Gcs_protocol_version
The GCS protocol versions.
Definition: gcs_types.h:127
Implements the communication protocol change logic.
Definition: gcs_xcom_communication_protocol_changer.h:216
Cargo_type
The different cargo type codes.
Definition: gcs_internal_message_headers.h:104
std::vector< std::pair< Gcs_packet, std::unique_ptr< Gcs_xcom_nodes > > > m_buffered_packets
Buffer that is used to store packets while the node is about to install a view and is running the sta...
Definition: gcs_xcom_communication_interface.h:316
This interface is implemented by those who wish to receive messages.
Definition: gcs_communication_event_listener.h:35
virtual void deliver_buffered_packets()=0
The state exchange phase has been executed and the view has been installed so this is used to send an...
virtual void cleanup_buffered_packets()=0
virtual void buffer_incoming_packet(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes)=0
Buffer packets when a view is not installed yet and the state exchange phase is being executed...
Definition: gcs_xcom_notification.h:93
Definition: gcs_xcom_proxy.h:49
This interface represents all the communication facilities that a binding implementation should provi...
Definition: gcs_communication_interface.h:87
Gcs_xcom_statistics_updater * stats
Definition: gcs_xcom_communication_interface.h:297
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:39
virtual Gcs_protocol_version get_protocol_version() const =0
Retrieves the current GCS protocol version in use.
virtual Gcs_message_pipeline & get_msg_pipeline()=0
const string version("\ersion\)
Gcs_xcom_view_change_control_interface * m_view_control
Definition: gcs_xcom_communication_interface.h:303
This class contains information on the configuration, i.e set of nodes or simply site definition...
Definition: gcs_xcom_group_member_information.h:390
~Gcs_xcom_communication_interface() override
Definition: gcs_xcom_communication_interface.h:195
This class is an abstraction for the packet concept.
Definition: gcs_internal_message.h:57
static Gcs_xcom_engine * gcs_engine
Definition: gcs_xcom_interface.cc:110
virtual Gcs_protocol_version get_maximum_supported_protocol_version() const =0
Get the maximum protocol version currently supported by the group.
Global stats.
Definition: mysqlslap.cc:233
Definition: gcs_xcom_communication_interface.h:203
Gcs_xcom_proxy * m_xcom_proxy
Definition: gcs_xcom_communication_interface.h:300
virtual void process_user_data_packet(Gcs_packet &&packet, std::unique_ptr< Gcs_xcom_nodes > &&xcom_nodes)=0
The purpose of this method is to be called when in Gcs_xcom_interface callback method xcom_receive_da...
virtual size_t number_buffered_packets()=0
Return the number of buffered packets.
This is the pipeline that an outgoing or incoming message has to go through when being sent to or rec...
Definition: gcs_message_stages.h:364
Gcs_message_pipeline & get_msg_pipeline() override
Definition: gcs_xcom_communication_interface.h:258
Definition: gcs_xcom_state_exchange.h:715
This represents the unique identification of a group.
Definition: gcs_group_identifier.h:34
virtual enum_gcs_error do_send_message(const Gcs_message &message_to_send, unsigned long long *message_length, Cargo_type cargo)=0
Sends a message that is internal to the binding implementation.
virtual int add_event_listener(const Gcs_communication_event_listener &event_listener)=0
Registers an implementation of a Gcs_communication_event_listener that will receive Communication Eve...
unsigned int m_gid_hash
Hash of the group.
Definition: gcs_xcom_communication_interface.h:322
virtual enum_gcs_error send_message(const Gcs_message &message_to_send)=0
Method used to broadcast a message to a group in which this interface pertains.