MySQL 9.1.0
Source Code Documentation
gcs_xcom_state_exchange.h
1/* Copyright (c) 2015, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef GCS_XCOM_STATE_EXCHANGE_INCLUDED
25#define GCS_XCOM_STATE_EXCHANGE_INCLUDED
26
27#include <stdio.h>
28#include <sys/types.h>
29#include <map>
30#include <memory> // std::unique_ptr
31#include <set>
32#include <string>
33#include <vector>
34
44#include "plugin/group_replication/libmysqlgcs/xdr_gen/xcom_vp.h"
45
46/**
47 @class Xcom_member_state
48
49 Class that conveys the state to be exchanged between members, which is not
50 provided by XCom.
51
52 In the original GCS protocol (version 1), this message had the following wire
53 format:
54
55 +--------+---------------------+
56 | Header | Upper-layer payload |
57 +--------+---------------------+
58
59 Where the header has the following format:
60
61 +---------+------------+
62 | View ID | XCom synod |
63 +---------+------------+
64
65 With the introduction of the fragmentation stage into the message pipeline, it
66 is possible that an original message is fragmented into several fragments, and
67 each fragment is delivered by XCom individually.
68
69 This means that when a node joins, it is possible for the transmission of a
70 fragmented message to be ongoing, i.e. the existing group members have already
71 received some, but not all, of the fragments.
72 In this situation, the last fragment will arrive after the new node has joined
73 the group.
74 Since the original message is only delivered when all its fragments are
75 received, this means that the original message will need to be delivered by
76 the new node as well.
77 But the problem is that the new node does not have the fragments that were
78 received before it joined.
79
80 We solve this problem by augmenting the state exchange message with
81 information about the ongoing transmission of fragmented messages.
82 In particular, for every fragmented message whose transmission is ongoing, a
83 node will attach the XCom synods of the fragments it has already received.
84 The new node can use this information to fetch the fragments from XCom.
85
86 As such, in GCS protocol version 2, this message has the following wire
87 format:
88
89 +--------+---------------------+----------------+
90 | Header | Upper-layer payload | Recovery info. |
91 +--------+---------------------+----------------+
92
93 Where the recovery information has the following format:
94
95 +---------------+-...-+----------------+------------+
96 | XCom synod #1 |     | XCom synode #n | Nr. synods |
97 +---------------+-...-+----------------+------------+
98
99 Older GCS instances that only support protocol version 1 will deserialize the
100 recovery information as part of the upper-layer payload.
101 However, this works as long as the upper layer consumes only the portion it is
102 expecting (upper-layer payload), even though it is fed its expected payload
103 with the recovery information appended.
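
The trailing position of the synod count can be illustrated with a minimal
sketch. It is not the actual encoder: the Synod struct and both function
names are hypothetical, the field widths are borrowed from the WIRE_XCOM_*
constants of this class, and values are copied in host byte order for
brevity. With the count stored last, a reader can locate the recovery
information from the end of the buffer without knowing where the upper-layer
payload stops.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for an XCom synod (group id, message number, node).
struct Synod {
  uint32_t group_id;
  uint64_t msg_no;
  uint32_t node;
};

constexpr std::size_t kSynodSize = 4 + 8 + 4;  // group id + msg id + node id
constexpr std::size_t kCountSize = 8;          // number of synods

// Appends "synod #1 ... synod #n | count" after whatever `out` already holds.
void append_recovery_info(std::vector<unsigned char> &out,
                          const std::vector<Synod> &synods) {
  for (const Synod &s : synods) {
    unsigned char raw[kSynodSize];
    std::memcpy(raw, &s.group_id, 4);
    std::memcpy(raw + 4, &s.msg_no, 8);
    std::memcpy(raw + 12, &s.node, 4);
    out.insert(out.end(), raw, raw + kSynodSize);
  }
  const uint64_t count = synods.size();
  unsigned char raw_count[kCountSize];
  std::memcpy(raw_count, &count, kCountSize);
  out.insert(out.end(), raw_count, raw_count + kCountSize);
}

// Reads the count from the tail, then walks back to the first synod.
// Returns false if the buffer is too short for what the count claims.
bool read_recovery_info(const std::vector<unsigned char> &in,
                        std::vector<Synod> &synods) {
  if (in.size() < kCountSize) return false;
  uint64_t count = 0;
  std::memcpy(&count, in.data() + in.size() - kCountSize, kCountSize);
  if (count > (in.size() - kCountSize) / kSynodSize) return false;
  const unsigned char *p =
      in.data() + in.size() - kCountSize - count * kSynodSize;
  synods.clear();
  for (uint64_t i = 0; i < count; ++i, p += kSynodSize) {
    Synod s;
    std::memcpy(&s.group_id, p, 4);
    std::memcpy(&s.msg_no, p + 4, 8);
    std::memcpy(&s.node, p + 12, 4);
    synods.push_back(s);
  }
  return true;
}
```

A version 1 reader would simply never look at these trailing bytes.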
104*/
105class Xcom_member_state {
106 public:
107 /**
108 Xcom_member_state constructor.
109
110 @param[in] view_id the view identifier from the node
111 @param[in] configuration_id Configuration identifier in use when the state
112 exchange message was created
113 @param[in] version Protocol version used to represent the state
114 @param[in] snapshot Snapshot information currently in use
115 @param[in] data the generic data to be exchanged
116 @param[in] data_size data's size
117 */
118
119 explicit Xcom_member_state(const Gcs_xcom_view_identifier &view_id,
120 synode_no configuration_id,
121 Gcs_protocol_version version,
122 const Gcs_xcom_synode_set &snapshot,
123 const uchar *data, uint64_t data_size);
124
125 /**
126 Xcom_member_state constructor.
127
128 @param[in] version Protocol version used to represent the state
129 @param[in] data the generic data to be exchanged
130 @param[in] data_size data's size
131 */
132
133 explicit Xcom_member_state(Gcs_protocol_version version, const uchar *data,
134 uint64_t data_size);
135
136 /**
137 Member state destructor.
138 */
139
140 ~Xcom_member_state();
141
142 /**
143 Encodes the Member State's header to be sent through the network.
144
145 @param[out] buffer where the header will be stored.
146 @param[in,out] buffer_len pointer to the variable that will hold the
147 header's size and has the buffer's len as input.
148
149 @return True if there is no space to store the header.
150 Otherwise, false.
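
The calling convention above (in/out length, true on failure) can be
sketched as follows. This is an illustration under assumptions, not the
actual encoder: Header_fields and encode_header_sketch are hypothetical
names, the header is assumed to be the plain sum of the five WIRE_XCOM_*
field widths declared in this class, and values are copied in host byte
order.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Assumed header size: view id (8 + 4) plus synod (4 + 8 + 4).
constexpr uint64_t kHeaderSize = 8 + 4 + 4 + 8 + 4;

// Hypothetical container for the values that make up the header.
struct Header_fields {
  uint64_t view_fixed_part;
  uint32_t view_monotonic_part;
  uint32_t group_id;
  uint64_t msg_no;
  uint32_t node_no;
};

// Returns true on error (null argument or not enough space), false on
// success. On success, *buffer_len is overwritten with the bytes written.
bool encode_header_sketch(const Header_fields &h, unsigned char *buffer,
                          uint64_t *buffer_len) {
  if (buffer == nullptr || buffer_len == nullptr) return true;
  if (*buffer_len < kHeaderSize) return true;

  unsigned char *slider = buffer;
  std::memcpy(slider, &h.view_fixed_part, 8);
  slider += 8;
  std::memcpy(slider, &h.view_monotonic_part, 4);
  slider += 4;
  std::memcpy(slider, &h.group_id, 4);
  slider += 4;
  std::memcpy(slider, &h.msg_no, 8);
  slider += 8;
  std::memcpy(slider, &h.node_no, 4);
  slider += 4;

  *buffer_len = static_cast<uint64_t>(slider - buffer);
  return false;
}
```

Callers pass the capacity in and read the used size back, so the same
variable can drive the offset of whatever is written next.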
151 */
152 bool encode_header(uchar *buffer, uint64_t *buffer_len) const;
153
154 /**
155 Decodes the Member State's header that was sent through the network.
156
157 @param[in] buffer Buffer where the header is stored.
158 @param[in] buffer_len Size, in bytes, of the data held in the
159 buffer.
160
161 @return true if nothing went wrong. Otherwise, false.
162 */
163 bool decode_header(const uchar *buffer, uint64_t buffer_len);
164
165 /**
166 Encodes the Member State's snapshot to be sent through the network.
167
168 @param[out] buffer where the snapshot will be stored.
169 @param[in,out] buffer_len pointer to the variable that will hold the
170 snapshot's size and has the buffer's len as input.
171
172 @return True if there is no space to store the snapshot.
173 Otherwise, false.
174 */
175 bool encode_snapshot(uchar *buffer, uint64_t *buffer_len) const;
176
177 /**
178 Decodes the Member State's snapshot that was sent through the network.
179
180 @param[in] buffer Buffer where the snapshot is stored.
181 @param[in] buffer_len The snapshot's size.
182
183 @return true if nothing went wrong. Otherwise, false.
184 */
185 bool decode_snapshot(const uchar *buffer, uint64_t buffer_len);
186
187 /**
188 Decodes the Member State that was sent through the network.
189
190 @param[in] data Buffer where the data is stored.
191 @param[in] data_size Size, in bytes, of the data held in the
192 buffer.
193
194 @return True if for any reason the data could not be exchanged.
195 Otherwise, false.
196 */
197 bool decode(const uchar *data, uint64_t data_size);
198
199 /**
200 @return the size of the encoded payload when put on the wire.
201 */
202
203 uint64_t get_encode_payload_size() const;
204
205 /**
206 @return the size of the encoded header when put on the wire.
207 */
208
209 static constexpr uint64_t get_encode_header_size() {
213 }
214
215 /**
216 @return the size of the encoded snapshot when put on the wire.
217 */
218
219 uint64_t get_encode_snapshot_size() const;
220
221 /**
222 @return the view identifier
223 */
224
226
227 /**
228 @return the configuration identifier
229 */
230
231 synode_no get_configuration_id() const { return m_configuration_id; }
232
233 /**
234 @return the generic exchangeable data
235 */
236
237 const uchar *get_data() const { return m_data; }
238
239 /**
240 @return the size of the generic exchangeable data
241 */
242
243 uint64_t get_data_size() const { return m_data_size; }
244
245 const Gcs_xcom_synode_set &get_snapshot() const { return m_snapshot; }
246
247 private:
248 static constexpr auto WIRE_XCOM_VARIABLE_VIEW_ID_SIZE = 8;
249 static constexpr auto WIRE_XCOM_VIEW_ID_SIZE = 4;
250 static constexpr auto WIRE_XCOM_GROUP_ID_SIZE = 4;
251 static constexpr auto WIRE_XCOM_MSG_ID_SIZE = 8;
252 static constexpr auto WIRE_XCOM_NODE_ID_SIZE = 4;
253 static constexpr auto WIRE_XCOM_SNAPSHOT_NR_ELEMS_SIZE = 8;
254
255 static constexpr uint64_t get_encode_snapshot_elem_size() {
257 }
258
259 /*
260 View identifier installed by the current member if there
261 is any.
262 */
264
265 /*
266 Configuration identifier in use when the state exchange message
267 was created.
268 */
269 synode_no m_configuration_id;
270
271 /*
272 Data to be disseminated by the state exchange phase.
273 */
275
276 /*
277 Data's size disseminated by the state exchange phase.
278 */
279 uint64_t m_data_size;
280
281 /**
282 Recovery information which is currently a list of the XCom synods of the
283 fragments that are in buffers when a node is joining.
284
285 This is currently used only to transfer information on slice packets that
286 need to be fetched by the joining node before it may become ready to serve
287 requests.
288 */
289 Gcs_xcom_synode_set m_snapshot;
290
291 /**
292 GCS communication version number in use.
293 */
295
296 /*
297 Disabling the copy constructor and assignment operator.
298 */
301};
302
303/**
304 @class gcs_xcom_state_exchange_interface
305
306 Interface that defines the operations that state exchange will provide.
307 In what follows, we describe how the state exchange algorithm works and
308 the view change process where it is inserted and which is an essential
309 part of our system.
310
311 The view change process is comprised of two major parts:
312 - Adding or removing a node from the system, accomplished in the
313 XCom/Paxos layer: "The SMART Way to Migrate Replicated Stateful
314 Services"
315
316 - A state exchange phase in which all members distribute data among
317 themselves.
318
319 Whenever a node wants to add or remove itself from the group, or after
320 a failure when a healthy member expels the faulty node from the group, a
321 reconfiguration request is sent in the form of an add_node or remove_node
322 message.
323
324 After the request succeeds, XCom sends a global view message to all
325 non-faulty members. The message lists every node and tags it as alive
326 or faulty. MySQL GCS looks at this information and computes who has
327 joined and who has left the group. The computation is simple: it
328 compares the set of nodes received in the current view with the set of
329 nodes in the previous view:
330
331 . left nodes = (alive_members in old_set) - (alive_members in new_set)
332
333 . joined nodes = (alive_members in new_set) - (alive_members in old_set)
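
The two set differences above can be sketched with the standard library.
This is only an illustration: members are represented by plain strings, and
Members, Membership_change and compute_change are hypothetical names rather
than part of GCS.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <string>

using Members = std::set<std::string>;

// Returns the elements of `a` that are not in `b`.
Members difference(const Members &a, const Members &b) {
  Members out;
  std::set_difference(a.begin(), a.end(), b.begin(), b.end(),
                      std::inserter(out, out.begin()));
  return out;
}

struct Membership_change {
  Members left;
  Members joined;
};

// left = old - new, joined = new - old, computed over the alive members.
Membership_change compute_change(const Members &old_alive,
                                 const Members &new_alive) {
  return {difference(old_alive, new_alive), difference(new_alive, old_alive)};
}
```

For example, going from {a, b, c} to {b, c, d} yields left = {a} and
joined = {d}.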
334
335 However, the new view is only delivered to an upper layer after all
336 members exchange what we call a state message. While the view is being
337 processed and the state exchange is ongoing, incoming data messages are
338 not delivered to the application; they are put into a buffer. After
339 getting state messages from all members, the view change is delivered
340 to the upper layer along with the content of the state messages, and
341 any buffered messages are delivered afterwards.
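
The ordering described above (buffer data, wait for every state message,
deliver the view, then flush the buffer) can be sketched as follows. This is
a simplified single-threaded model under assumptions: Exchange_gate is a
hypothetical class, members and messages are plain strings, and the view
delivery is represented by the literal "VIEW".

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

class Exchange_gate {
 public:
  // Starts a round that awaits one state message from each member.
  void start(const std::set<std::string> &members) {
    m_awaited = members;
    m_active = true;
  }

  // Data messages are buffered during a round and delivered otherwise.
  void on_data(const std::string &message) {
    if (m_active)
      m_buffer.push_back(message);
    else
      m_delivered.push_back(message);
  }

  // Returns true when the last awaited state message arrives. At that point
  // the view is delivered first, followed by the buffered data messages.
  bool on_state(const std::string &member) {
    if (!m_active) return false;
    m_awaited.erase(member);
    if (!m_awaited.empty()) return false;
    m_active = false;
    m_delivered.push_back("VIEW");
    m_delivered.insert(m_delivered.end(), m_buffer.begin(), m_buffer.end());
    m_buffer.clear();
    return true;
  }

  const std::vector<std::string> &delivered() const { return m_delivered; }

 private:
  bool m_active = false;
  std::set<std::string> m_awaited;
  std::vector<std::string> m_buffer;
  std::vector<std::string> m_delivered;
};
```

Every member observes the same delivery order because each one applies the
same rule to the same totally ordered message stream.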
342
343 Why block the delivery of data messages, and why exchange these state
344 messages?
345
346 Recall that all messages are delivered atomically. Because all new data
347 messages are buffered while the state messages are being exchanged, we
348 can guarantee that all nodes have the same state, which encompasses
349 messages (e.g. transactions) in queues and in storage (e.g. the binary
350 log).
351
352 Blocking the delivery of new data messages gives us a synchronization
353 point.
354
355 But if all nodes have the same state, why gather a state message
356 from all members?
357
358 The power of choice. Let us use MySQL Group Replication as a concrete
359 example of an upper layer to understand why. Having information on all
360 members allows the new node to choose a member that is not lagging
361 behind (i.e. has a small queue) as a donor in a recovery phase. Besides,
362 the state message also carries information on the IPs and ports used to
363 access the MySQL instances. This information is necessary to start the
364 recovery, which is started asynchronously and dumps the missing data
365 from a donor.
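
As an illustration of the choice described above, an upper layer could pick
the member with the smallest queue. This sketch is hypothetical and is not
how MySQL Group Replication selects a donor: Member_state and choose_donor
are invented names, and the queue size is assumed to have been extracted
from the (opaque to GCS) state message.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical decoded content of one member's state message.
struct Member_state {
  std::string host;
  unsigned port;
  std::size_t queue_size;
};

// Returns the candidate with the smallest queue, or nullptr if there is
// no candidate at all.
const Member_state *choose_donor(const std::vector<Member_state> &candidates) {
  auto it = std::min_element(
      candidates.begin(), candidates.end(),
      [](const Member_state &a, const Member_state &b) {
        return a.queue_size < b.queue_size;
      });
  return it == candidates.end() ? nullptr : &*it;
}
```

The host and port of the chosen entry are what the joiner would then use to
connect to the donor.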
366
367 Note that the content of the state message is opaque to the MySQL GCS
368 layer which only provides a synchronization point.
369*/
370class Gcs_xcom_state_exchange_interface {
371 public:
372 virtual ~Gcs_xcom_state_exchange_interface() = default;
373
374 /**
375 Accomplishes all necessary initialization steps.
376 */
377
378 virtual void init() = 0;
379
380 /**
381 If messages were buffered during its processing, they are discarded
382 and internal structures needed are cleaned up.
383 */
384
385 virtual void reset() = 0;
386
387 /**
388 Has the same behavior as the reset but additionally flushes buffered
389 messages.
390 */
391
392 virtual void reset_with_flush() = 0;
393
394 /**
395 If messages were buffered during its processing, they are delivered
396 to upper layers and internal structures needed are cleaned up.
397 */
398
399 virtual void end() = 0;
400
401 /**
402 Signals the module to start a State Exchange.
403
404 @param[in] configuration_id Configuration identifier in use when the state
405 exchange phase started
406 @param[in] total xcom total members in the new view
407 @param[in] left xcom members that left in the new view
408 @param[in] joined xcom members that joined in the new view
409 @param[in] exchangeable_data generic exchanged data
410 @param[in] current_view the currently installed view
411 @param[in] group group name
412 @param[in] local_info the local GCS member identifier
413 @param[in] xcom_nodes list of nodes
414
415 @return true if the member is leaving
416 */
417
418 virtual bool state_exchange(
419 synode_no configuration_id, std::vector<Gcs_member_identifier *> &total,
420 std::vector<Gcs_member_identifier *> &left,
421 std::vector<Gcs_member_identifier *> &joined,
422 std::vector<std::unique_ptr<Gcs_message_data>> &exchangeable_data,
423 Gcs_view *current_view, std::string *group,
424 const Gcs_member_identifier &local_info,
425 const Gcs_xcom_nodes &xcom_nodes) = 0;
426
427 /**
428 Processes a member state message on an ongoing State Exchange round.
429
430 @param[in] ms_info received Member State
431 @param[in] p_id the node that the Member State pertains to
432 @param[in] maximum_supported_protocol_version maximum supported protocol
433 version
434 @param[in] used_protocol_version protocol version in use by a member during
435 the state exchange phase
436
437 @return true if the State Exchange is to be finished and the view can be
438 installed
439 */
440
441 virtual bool process_member_state(
442 Xcom_member_state *ms_info, const Gcs_member_identifier &p_id,
443 Gcs_protocol_version maximum_supported_protocol_version,
444 Gcs_protocol_version used_protocol_version) = 0;
445
446 /**
447 Compute the set of incompatible members after the state exchange has
448 finished.
449 A member M is incompatible if it is attempting to join a group that is
450 using protocol X, but M is using protocol Y s.t. X != Y.
451 @returns the set of incompatible members
452 */
453 virtual std::vector<Gcs_xcom_node_information>
454 compute_incompatible_members() = 0;
455
456 /**
457 Recovers any missing packets required for the member to join the group.
458 @retval true if successful
459 @retval false otherwise
460 */
461 virtual bool process_recovery_state() = 0;
462
463 /**
464 Retrieves the new view identifier after a State Exchange.
465
466 @return the new view identifier
467 */
468
469 virtual Gcs_xcom_view_identifier *get_new_view_id() = 0;
470
471 /**
472 @return the members that joined in this State Exchange round
473 */
474
475 virtual std::set<Gcs_member_identifier *> *get_joined() = 0;
476
477 /**
478 @return the members that left in this State Exchange round
479 */
480
481 virtual std::set<Gcs_member_identifier *> *get_left() = 0;
482
483 /**
484 @return All the members in this State Exchange round
485 */
486
487 virtual std::set<Gcs_member_identifier *> *get_total() = 0;
488
489 /**
490 @return the group in which this State Exchange is occurring
491 */
492
493 virtual std::string *get_group() = 0;
494
495 /**
496 @return the saved states
497 */
498
499 virtual std::map<Gcs_member_identifier, Xcom_member_state *>
500 *get_member_states() = 0;
501
502 /**
503 Computes the maximum protocol version supported by the group.
504 */
505 virtual void compute_maximum_supported_protocol_version() = 0;
506};
507
508/**
509 Implementation of the gcs_xcom_state_exchange_interface.
510*/
511class Gcs_xcom_state_exchange : public Gcs_xcom_state_exchange_interface {
512 public:
513 /**
514 State Exchange constructor.
515
516 @param[in] comm Communication interface reference to allow broadcasting of
517 member states
518 */
519
520 Gcs_xcom_state_exchange(Gcs_communication_interface *comm);
521
522 ~Gcs_xcom_state_exchange() override;
523
524 // Implementation of gcs_xcom_state_exchange_interface
525 void init() override;
526
527 void reset() override;
528
529 void reset_with_flush() override;
530
531 void end() override;
532
533 bool state_exchange(
534 synode_no configuration_id, std::vector<Gcs_member_identifier *> &total,
535 std::vector<Gcs_member_identifier *> &left,
536 std::vector<Gcs_member_identifier *> &joined,
537 std::vector<std::unique_ptr<Gcs_message_data>> &exchangeable_data,
538 Gcs_view *current_view, std::string *group,
539 const Gcs_member_identifier &local_info,
540 const Gcs_xcom_nodes &xcom_nodes) override;
541
542 bool process_member_state(
543 Xcom_member_state *ms_info, const Gcs_member_identifier &p_id,
544 Gcs_protocol_version maximum_supported_protocol_version,
545 Gcs_protocol_version used_protocol_version) override;
546
547 std::vector<Gcs_xcom_node_information> compute_incompatible_members()
548 override;
549
550 bool process_recovery_state() override;
551
552 Gcs_xcom_view_identifier *get_new_view_id() override;
553
554 std::set<Gcs_member_identifier *> *get_joined() override {
555 return &m_ms_joined;
556 }
557
558 std::set<Gcs_member_identifier *> *get_left() override { return &m_ms_left; }
559
560 std::set<Gcs_member_identifier *> *get_total() override {
561 return &m_ms_total;
562 }
563
564 std::map<Gcs_member_identifier, Xcom_member_state *> *get_member_states()
565 override {
566 return &m_member_states;
567 }
568
569 std::string *get_group() override { return m_group_name; }
570
571 void compute_maximum_supported_protocol_version() override;
572
573 private:
574 /**
575 Computes if the local member is leaving.
576
577 @return true if the local member is leaving
578 */
579
580 bool is_leaving();
581
582 /**
583 Computes if the local member is joining.
584
585 @return true if the local member is joining
586 */
587
588 bool is_joining();
589
590 /**
591 Update the communication system with information on membership.
592
593 @param xcom_nodes List of nodes that belong to the current membership.
594 */
595 void update_communication_channel(const Gcs_xcom_nodes &xcom_nodes);
596
597 /**
598 Broadcasts the local state to all nodes in the Cluster.
599
600 @param[in] proposed_view proposed view to broadcast
601 @param[in] exchangeable_data List with exchangeable messages
602 */
603
604 enum_gcs_error broadcast_state(
605 const Gcs_xcom_view_identifier &proposed_view,
606 std::vector<std::unique_ptr<Gcs_message_data>> &exchangeable_data);
607
608 /**
609 Updates the structure that waits for State Exchanges.
610 */
611
612 void update_awaited_vector();
613
614 /**
615 Converts an XCom member list into a set in the internal representation.
616
617 @param[in] in xcom list
618 @param[out] pset Set where the converted member ids will be written
619 */
620
621 void fill_member_set(std::vector<Gcs_member_identifier *> &in,
622 std::set<Gcs_member_identifier *> &pset);
623
624 /**
625 Stores the member's state and protocol version.
626
627 @param ms_info state
628 @param p_id member
629 @param[in] maximum_supported_protocol_version maximum supported protocol
630 version
631 @param used_protocol_version protocol version
632 */
633 void save_member_state(
634 Xcom_member_state *ms_info, const Gcs_member_identifier &p_id,
635 Gcs_protocol_version maximum_supported_protocol_version,
636 Gcs_protocol_version used_protocol_version);
637
638 /**
639 Auxiliary method that checks whether @c snapshot_to_recover contains all
640 the synodes required.
641 */
642 bool snapshot_is_enough(Gcs_xcom_synode_set const &snapshot_to_recover) const;
643
644 /**
645 Checks whether all the existing group members, myself excluded, announce the
646 same protocol version.
647 @retval {false, _} if they do not all announce the same protocol version
648 @retval {true, Gcs_protocol_version} if they all announce the same protocol
649 version, which is returned as the second element
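
The pair-returning check described above can be sketched as follows. This is
an illustration under assumptions, not the actual implementation: Version is
a hypothetical integer stand-in for Gcs_protocol_version, members are keyed
by plain strings, and returning {false, 0} when there is no other member is
a choice made for this sketch only.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

using Version = int;  // hypothetical stand-in for Gcs_protocol_version

// Returns {true, v} if every member other than `myself` announced version v.
// Returns {false, 0} if they disagree or if there is no other member.
std::pair<bool, Version> announce_same_version(
    const std::map<std::string, Version> &announced,
    const std::string &myself) {
  bool seen_one = false;
  Version common = 0;
  for (const auto &entry : announced) {
    if (entry.first == myself) continue;  // myself excluded
    if (!seen_one) {
      common = entry.second;
      seen_one = true;
    } else if (entry.second != common) {
      return {false, 0};
    }
  }
  return {seen_one, seen_one ? common : 0};
}
```

A joiner could then compare its own version against the second element to
decide whether it is compatible with the group.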
650 */
651 std::pair<bool, Gcs_protocol_version> members_announce_same_version() const;
652
653 /**
654 Checks whether this server is incompatible with the group.
655 @retval true If it is incompatible
656 @retval false If it is compatible
657 */
658 bool incompatible_with_group() const;
659
660 /**
661 Computes the set of incompatible nodes that are trying to join the group.
662 @returns the set of incompatible joiners
663 */
664 std::vector<Gcs_xcom_node_information> compute_incompatible_joiners();
665
666 Gcs_communication_interface *m_broadcaster;
667
668 std::map<Gcs_member_identifier, uint> m_awaited_vector;
669
670 std::map<Gcs_member_identifier, uint> m_recover_vector;
671
672 /* Set of ids in GCS native format as reported by View-change handler. */
673 std::set<Gcs_member_identifier *> m_ms_total, m_ms_left, m_ms_joined;
674
675 /* Collection of State Message contents to facilitate view installation. */
676 std::map<Gcs_member_identifier, Xcom_member_state *> m_member_states;
677
678 /* Collection of protocol version in use per member. */
679 std::map<Gcs_member_identifier, Gcs_protocol_version> m_member_versions;
680
681 /* Collection of maximum protocol version supported per member. */
682 std::map<Gcs_member_identifier, Gcs_protocol_version> m_member_max_versions;
683
684 // Group name to exchange state
685 std::string *m_group_name;
686
687 // Local GCS member identification
688 Gcs_member_identifier m_local_information;
689
690 /* Configuration identifier in use when the state exchange phase started */
691 synode_no m_configuration_id;
692
693 std::vector<synode_no> cached_ids;
694
695 /* XCom identifiers of the members in m_ms_total */
696 Gcs_xcom_nodes m_ms_xcom_nodes;
697
698 /*
699 Disabling the copy constructor and assignment operator.
700 */
701 Gcs_xcom_state_exchange(Gcs_xcom_state_exchange const &);
702 Gcs_xcom_state_exchange &operator=(Gcs_xcom_state_exchange const &);
703};
704
705/*
706 @interface gcs_xcom_view_change_control_interface
707
708 This interface will serve as a synchronization point to all those that are
709 interested in maintaining view safety. This will guarantee that no actions are
710 accomplished while a view change procedure is ongoing.
711
712 The promoters of view change will indicate via start_view_exchange() and
713 end_view_exchange() the boundaries of the process. Those that want to wait
714 for the end will synchronize on wait_for_view_change_end().
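
The start/end/wait protocol described above can be sketched with a mutex and
a condition variable. This is a minimal illustration, not the actual
implementation: View_change_gate is a hypothetical class and it models only
the three calls named in this comment plus is_view_changing().

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

class View_change_gate {
 public:
  // Marks the beginning of a view change.
  void start_view_exchange() {
    std::lock_guard<std::mutex> guard(m_lock);
    m_view_changing = true;
  }

  // Marks the end of a view change and wakes up every waiter.
  void end_view_exchange() {
    {
      std::lock_guard<std::mutex> guard(m_lock);
      m_view_changing = false;
    }
    m_cond.notify_all();
  }

  // Blocks the caller until no view change is in progress.
  void wait_for_view_change_end() {
    std::unique_lock<std::mutex> guard(m_lock);
    m_cond.wait(guard, [this] { return !m_view_changing; });
  }

  bool is_view_changing() {
    std::lock_guard<std::mutex> guard(m_lock);
    return m_view_changing;
  }

 private:
  std::mutex m_lock;
  std::condition_variable m_cond;
  bool m_view_changing = false;
};
```

Waiting on a predicate rather than on a bare notification keeps the wait
correct under spurious wake-ups and when the view change has already ended.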
715*/
716class Gcs_xcom_view_change_control_interface {
717 public:
718 virtual ~Gcs_xcom_view_change_control_interface() = default;
719
720 virtual void start_view_exchange() = 0;
721 virtual void end_view_exchange() = 0;
722 virtual void wait_for_view_change_end() = 0;
723 virtual bool is_view_changing() = 0;
724
725 // Leave related information
726 virtual bool start_leave() = 0;
727 virtual void end_leave() = 0;
728 virtual bool is_leaving() = 0;
729
730 // Join related information
731 virtual bool start_join() = 0;
732 virtual void end_join() = 0;
733 virtual bool is_joining() = 0;
734
735 // Keep track of delivered views
736 virtual void set_current_view(Gcs_view *current_view) = 0;
737 virtual Gcs_view *get_current_view() = 0;
738 virtual bool belongs_to_group() = 0;
739 virtual void set_belongs_to_group(bool belong) = 0;
740 virtual void set_unsafe_current_view(Gcs_view *current_view) = 0;
741 virtual Gcs_view *get_unsafe_current_view() = 0;
742
743 // Keep track of whether GCS as a whole has been ordered to finalize.
744 virtual void finalize() = 0;
745 virtual bool is_finalized() = 0;
746};
747
748/*
749 @class gcs_xcom_view_change_control
750
751 Implementation of gcs_xcom_view_change_control_interface.
752*/
753class Gcs_xcom_view_change_control
754 : public Gcs_xcom_view_change_control_interface {
755 public:
758
759 void start_view_exchange() override;
760 void end_view_exchange() override;
761 void wait_for_view_change_end() override;
762 bool is_view_changing() override;
763
764 bool start_leave() override;
765 void end_leave() override;
766 bool is_leaving() override;
767
768 bool start_join() override;
769 void end_join() override;
770 bool is_joining() override;
771
772 void set_current_view(Gcs_view *current_view) override;
773 Gcs_view *get_current_view() override;
774 bool belongs_to_group() override;
775 void set_belongs_to_group(bool belong) override;
776 void set_unsafe_current_view(Gcs_view *current_view) override;
777 Gcs_view *get_unsafe_current_view() override;
778
779 void finalize() override;
780 bool is_finalized() override;
781
782 private:
786
790
791 /*
792 Reference to the currently installed view.
793 */
795
796 /*
797 Protect access to the current view so that it can be
798 copied and returned.
799 */
801
802 /*
803 Whether the current node belongs to a group or not.
804 */
806
807 /*
808 */
809 std::atomic<bool> m_finalized;
810
811 /*
812 Disabling the copy constructor and assignment operator.
813 */
816};
817#endif /* GCS_XCOM_STATE_EXCHANGE_INCLUDED */
This interface represents all the communication facilities that a binding implementation should provi...
Definition: gcs_communication_interface.h:90
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:40
This represents the membership view that a member has from a group.
Definition: gcs_view.h:55
This class contains information on the configuration, i.e set of nodes or simply site definition.
Definition: gcs_xcom_group_member_information.h:391
Definition: gcs_xcom_state_exchange.h:370
virtual std::map< Gcs_member_identifier, Xcom_member_state * > * get_member_states()=0
virtual std::string * get_group()=0
virtual std::vector< Gcs_xcom_node_information > compute_incompatible_members()=0
Compute the set of incompatible members after the state exchange has finished.
virtual void init()=0
Accomplishes all necessary initialization steps.
virtual bool process_recovery_state()=0
Recovers any missing packets required for the member to join the group.
virtual std::set< Gcs_member_identifier * > * get_total()=0
virtual void end()=0
If messages were buffered during its processing, they are delivered to upper layers and internal stru...
virtual Gcs_xcom_view_identifier * get_new_view_id()=0
Retrieves the new view identifier after a State Exchange.
virtual std::set< Gcs_member_identifier * > * get_left()=0
virtual std::set< Gcs_member_identifier * > * get_joined()=0
virtual ~Gcs_xcom_state_exchange_interface()=default
virtual void reset()=0
If messages were buffered during its processing, they are discarded and internal structures needed ar...
virtual void reset_with_flush()=0
Has the same behavior as the reset but additionally flushes buffered messages.
virtual void compute_maximum_supported_protocol_version()=0
Computes the maximum protocol version supported by the group.
virtual bool state_exchange(synode_no configuration_id, std::vector< Gcs_member_identifier * > &total, std::vector< Gcs_member_identifier * > &left, std::vector< Gcs_member_identifier * > &joined, std::vector< std::unique_ptr< Gcs_message_data > > &exchangeable_data, Gcs_view *current_view, std::string *group, const Gcs_member_identifier &local_info, const Gcs_xcom_nodes &xcom_nodes)=0
Signals the module to start a State Exchange.
virtual bool process_member_state(Xcom_member_state *ms_info, const Gcs_member_identifier &p_id, Gcs_protocol_version maximum_supported_protocol_version, Gcs_protocol_version used_protocol_version)=0
Processes a member state message on an ongoing State Exchange round.
Implementation of the gcs_xcom_state_exchange_interface.
Definition: gcs_xcom_state_exchange.h:511
void init() override
Accomplishes all necessary initialization steps.
Definition: gcs_xcom_state_exchange.cc:334
std::map< Gcs_member_identifier, uint > m_awaited_vector
Definition: gcs_xcom_state_exchange.h:668
void update_communication_channel(const Gcs_xcom_nodes &xcom_nodes)
Update the communication system with information on membership.
Definition: gcs_xcom_state_exchange.cc:656
std::pair< bool, Gcs_protocol_version > members_announce_same_version() const
Checks whether all the existing group members, myself excluded, announce the same protocol version.
Definition: gcs_xcom_state_exchange.cc:754
std::string * get_group() override
Definition: gcs_xcom_state_exchange.h:569
void compute_maximum_supported_protocol_version() override
Computes the maximum protocol version supported by the group.
Definition: gcs_xcom_state_exchange.cc:941
Gcs_member_identifier m_local_information
Definition: gcs_xcom_state_exchange.h:688
std::string * m_group_name
Definition: gcs_xcom_state_exchange.h:685
std::set< Gcs_member_identifier * > * get_joined() override
Definition: gcs_xcom_state_exchange.h:554
Gcs_xcom_view_identifier * get_new_view_id() override
Retrieves the new view identifier after a State Exchange.
Definition: gcs_xcom_state_exchange.cc:1030
Gcs_communication_interface * m_broadcaster
Definition: gcs_xcom_state_exchange.h:666
std::map< Gcs_member_identifier, Xcom_member_state * > * get_member_states() override
Definition: gcs_xcom_state_exchange.h:564
bool is_joining()
Computes if the local member is joining.
Definition: gcs_xcom_state_exchange.cc:494
std::map< Gcs_member_identifier, Gcs_protocol_version > m_member_versions
Definition: gcs_xcom_state_exchange.h:679
Gcs_xcom_nodes m_ms_xcom_nodes
Definition: gcs_xcom_state_exchange.h:696
enum_gcs_error broadcast_state(const Gcs_xcom_view_identifier &proposed_view, std::vector< std::unique_ptr< Gcs_message_data > > &exchangeable_data)
Broadcasts the local state to all nodes in the Cluster.
Definition: gcs_xcom_state_exchange.cc:517
bool process_member_state(Xcom_member_state *ms_info, const Gcs_member_identifier &p_id, Gcs_protocol_version maximum_supported_protocol_version, Gcs_protocol_version used_protocol_version) override
Processes a member state message on an ongoing State Exchange round.
Definition: gcs_xcom_state_exchange.cc:665
std::vector< Gcs_xcom_node_information > compute_incompatible_members() override
Compute the set of incompatible members after the state exchange has finished.
Definition: gcs_xcom_state_exchange.cc:721
std::set< Gcs_member_identifier * > * get_left() override
Definition: gcs_xcom_state_exchange.h:558
void fill_member_set(std::vector< Gcs_member_identifier * > &in, std::set< Gcs_member_identifier * > &pset)
Converts xcom data to a set of internal representation.
Definition: gcs_xcom_state_exchange.cc:1010
bool incompatible_with_group() const
Checks whether this server is incompatible with the group.
Definition: gcs_xcom_state_exchange.cc:788
void update_awaited_vector()
Updates the structure that waits for State Exchanges.
Definition: gcs_xcom_state_exchange.cc:637
bool state_exchange(synode_no configuration_id, std::vector< Gcs_member_identifier * > &total, std::vector< Gcs_member_identifier * > &left, std::vector< Gcs_member_identifier * > &joined, std::vector< std::unique_ptr< Gcs_message_data > > &exchangeable_data, Gcs_view *current_view, std::string *group, const Gcs_member_identifier &local_info, const Gcs_xcom_nodes &xcom_nodes) override
Signals the module to start a State Exchange.
Definition: gcs_xcom_state_exchange.cc:409
bool is_leaving()
Computes if the local member is leaving.
Definition: gcs_xcom_state_exchange.cc:506
std::map< Gcs_member_identifier, uint > m_recover_vector
Definition: gcs_xcom_state_exchange.h:670
void reset_with_flush() override
Has the same behavior as the reset but additionally flushes buffered messages.
Definition: gcs_xcom_state_exchange.cc:336
std::set< Gcs_member_identifier * > m_ms_total
Definition: gcs_xcom_state_exchange.h:673
std::set< Gcs_member_identifier * > m_ms_left
Definition: gcs_xcom_state_exchange.h:673
bool snapshot_is_enough(Gcs_xcom_synode_set const &snapshot_to_recover) const
Auxiliary method that checks whether snapshot_to_recover contains all the synodes required.
void save_member_state(Xcom_member_state *ms_info, const Gcs_member_identifier &p_id, Gcs_protocol_version maximum_supported_protocol_version, Gcs_protocol_version used_protocol_version)
Stores the member's state and protocol version.
Definition: gcs_xcom_state_exchange.cc:1016
~Gcs_xcom_state_exchange() override
Definition: gcs_xcom_state_exchange.cc:325
std::map< Gcs_member_identifier, Xcom_member_state * > m_member_states
Definition: gcs_xcom_state_exchange.h:676
std::vector< synode_no > cached_ids
Definition: gcs_xcom_state_exchange.h:693
std::map< Gcs_member_identifier, Gcs_protocol_version > m_member_max_versions
Definition: gcs_xcom_state_exchange.h:682
void reset() override
If messages were buffered during its processing, they are discarded and internal structures needed ar...
Definition: gcs_xcom_state_exchange.cc:361
std::vector< Gcs_xcom_node_information > compute_incompatible_joiners()
Computes the set of incompatible nodes that are trying to join the group.
Definition: gcs_xcom_state_exchange.cc:885
std::set< Gcs_member_identifier * > m_ms_joined
Definition: gcs_xcom_state_exchange.h:673
Gcs_xcom_state_exchange(Gcs_xcom_state_exchange const &)
synode_no m_configuration_id
Definition: gcs_xcom_state_exchange.h:691
Gcs_xcom_state_exchange(Gcs_communication_interface *comm)
State Exchange constructor.
Definition: gcs_xcom_state_exchange.cc:309
void end() override
If messages were buffered during its processing, they are delivered to upper layers and internal stru...
Definition: gcs_xcom_state_exchange.cc:400
bool process_recovery_state() override
Recovers any missing packets required for the member to join the group.
Definition: gcs_xcom_state_exchange.cc:963
Gcs_xcom_state_exchange & operator=(Gcs_xcom_state_exchange const &)
std::set< Gcs_member_identifier * > * get_total() override
Definition: gcs_xcom_state_exchange.h:560
Gcs_xcom_view_change_control_interface
Definition: gcs_xcom_state_exchange.h:716
virtual void set_current_view(Gcs_view *current_view)=0
virtual void set_unsafe_current_view(Gcs_view *current_view)=0
virtual void set_belongs_to_group(bool belong)=0
virtual Gcs_view * get_current_view()=0
virtual ~Gcs_xcom_view_change_control_interface()=default
virtual Gcs_view * get_unsafe_current_view()=0
Gcs_xcom_view_change_control
Definition: gcs_xcom_state_exchange.h:754
void end_join() override
Definition: gcs_xcom_state_exchange.cc:1214
bool start_join() override
Definition: gcs_xcom_state_exchange.cc:1203
bool is_finalized() override
Definition: gcs_xcom_state_exchange.cc:1232
bool m_view_changing
Definition: gcs_xcom_state_exchange.h:783
My_xp_mutex_impl m_current_view_mutex
Definition: gcs_xcom_state_exchange.h:800
Gcs_view * get_current_view() override
Definition: gcs_xcom_state_exchange.cc:1123
void end_view_exchange() override
Definition: gcs_xcom_state_exchange.cc:1151
My_xp_cond_impl m_wait_for_view_cond
Definition: gcs_xcom_state_exchange.h:787
Gcs_xcom_view_change_control(Gcs_xcom_view_change_control const &)
~Gcs_xcom_view_change_control() override
Definition: gcs_xcom_state_exchange.cc:1102
bool is_view_changing() override
Definition: gcs_xcom_state_exchange.cc:1158
bool is_joining() override
Definition: gcs_xcom_state_exchange.cc:1220
void finalize() override
Definition: gcs_xcom_state_exchange.cc:1230
bool is_leaving() override
Definition: gcs_xcom_state_exchange.cc:1193
void start_view_exchange() override
Definition: gcs_xcom_state_exchange.cc:1145
void wait_for_view_change_end() override
Definition: gcs_xcom_state_exchange.cc:1167
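The members m_view_changing, m_wait_for_view_cond, and m_wait_for_view_mutex suggest a flag guarded by a mutex and signalled through a condition variable. The sketch below illustrates that general pattern with the standard library. It is an assumption about how such a gate could work, not the MySQL code: `View_change_gate` is a hypothetical class, and std::mutex and std::condition_variable stand in for the My_xp_* wrappers.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical, simplified analogue of a view-change gate.
class View_change_gate {
 public:
  // Marks a view change as in progress.
  void start_view_exchange() {
    std::lock_guard<std::mutex> guard(m_mutex);
    m_view_changing = true;
  }

  // Clears the flag and wakes every thread blocked in
  // wait_for_view_change_end().
  void end_view_exchange() {
    {
      std::lock_guard<std::mutex> guard(m_mutex);
      m_view_changing = false;
    }
    m_cond.notify_all();
  }

  bool is_view_changing() {
    std::lock_guard<std::mutex> guard(m_mutex);
    return m_view_changing;
  }

  // Blocks until no view change is in progress. The predicate protects
  // against spurious wakeups.
  void wait_for_view_change_end() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this] { return !m_view_changing; });
  }

 private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  bool m_view_changing = false;
};
```

In this sketch a caller that arrives when no view change is in progress returns from wait_for_view_change_end() immediately, because the predicate is checked before waiting.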
Gcs_xcom_view_change_control & operator=(Gcs_xcom_view_change_control const &)
bool belongs_to_group() override
Definition: gcs_xcom_state_exchange.cc:1137
void set_belongs_to_group(bool belong) override
Definition: gcs_xcom_state_exchange.cc:1141
void set_current_view(Gcs_view *current_view) override
Definition: gcs_xcom_state_exchange.cc:1109
bool start_leave() override
Definition: gcs_xcom_state_exchange.cc:1176
bool m_joining
Definition: gcs_xcom_state_exchange.h:785
My_xp_mutex_impl m_joining_leaving_mutex
Definition: gcs_xcom_state_exchange.h:789
void end_leave() override
Definition: gcs_xcom_state_exchange.cc:1187
Gcs_xcom_view_change_control()
Definition: gcs_xcom_state_exchange.cc:1079
bool m_belongs_to_group
Definition: gcs_xcom_state_exchange.h:805
std::atomic< bool > m_finalized
Definition: gcs_xcom_state_exchange.h:809
Gcs_view * m_current_view
Definition: gcs_xcom_state_exchange.h:794
My_xp_mutex_impl m_wait_for_view_mutex
Definition: gcs_xcom_state_exchange.h:788
Gcs_view * get_unsafe_current_view() override
Definition: gcs_xcom_state_exchange.cc:1133
bool m_leaving
Definition: gcs_xcom_state_exchange.h:784
void set_unsafe_current_view(Gcs_view *current_view) override
Definition: gcs_xcom_state_exchange.cc:1117
Gcs_xcom_view_identifier
Definition: gcs_xcom_view_identifier.h:33
My_xp_cond_impl
Definition: my_xp_cond.h:136
My_xp_mutex_impl
Definition: my_xp_mutex.h:123
Class that conveys the state to be exchanged between members, which is not provided by XCom.
Definition: gcs_xcom_state_exchange.h:105
Xcom_member_state & operator=(Xcom_member_state const &)=delete
Xcom_member_state(Xcom_member_state const &)=delete
Gcs_xcom_view_identifier * m_view_id
Definition: gcs_xcom_state_exchange.h:263
synode_no m_configuration_id
Definition: gcs_xcom_state_exchange.h:269
const Gcs_xcom_synode_set & get_snapshot() const
Definition: gcs_xcom_state_exchange.h:245
Gcs_protocol_version m_version
GCS communication version number in use.
Definition: gcs_xcom_state_exchange.h:294
static constexpr uint64_t get_encode_snapshot_elem_size()
Definition: gcs_xcom_state_exchange.h:255
Gcs_xcom_view_identifier * get_view_id()
Definition: gcs_xcom_state_exchange.h:225
uint64_t get_encode_payload_size() const
Definition: gcs_xcom_state_exchange.cc:74
static constexpr uint64_t get_encode_header_size()
Definition: gcs_xcom_state_exchange.h:209
bool decode_header(const uchar *buffer, uint64_t buffer_len)
Decodes the Member State's header that was sent through the network.
Definition: gcs_xcom_state_exchange.cc:206
uchar * m_data
Definition: gcs_xcom_state_exchange.h:274
static constexpr auto WIRE_XCOM_SNAPSHOT_NR_ELEMS_SIZE
Definition: gcs_xcom_state_exchange.h:253
static constexpr auto WIRE_XCOM_VARIABLE_VIEW_ID_SIZE
Definition: gcs_xcom_state_exchange.h:248
static constexpr auto WIRE_XCOM_VIEW_ID_SIZE
Definition: gcs_xcom_state_exchange.h:249
static constexpr auto WIRE_XCOM_GROUP_ID_SIZE
Definition: gcs_xcom_state_exchange.h:250
uint64_t get_data_size() const
Definition: gcs_xcom_state_exchange.h:243
uint64_t m_data_size
Definition: gcs_xcom_state_exchange.h:279
bool encode_header(uchar *buffer, uint64_t *buffer_len) const
Encodes the Member State's header to be sent through the network.
Definition: gcs_xcom_state_exchange.cc:92
bool encode_snapshot(uchar *buffer, uint64_t *buffer_len) const
Encodes the Member State's snapshot to be sent through the network.
Definition: gcs_xcom_state_exchange.cc:159
static constexpr auto WIRE_XCOM_NODE_ID_SIZE
Definition: gcs_xcom_state_exchange.h:252
~Xcom_member_state()
Member state destructor.
Definition: gcs_xcom_state_exchange.cc:69
static constexpr auto WIRE_XCOM_MSG_ID_SIZE
Definition: gcs_xcom_state_exchange.h:251
bool decode(const uchar *data, uint64_t data_size)
Decodes Member State that was sent through the network.
Definition: gcs_xcom_state_exchange.cc:281
Xcom_member_state(const Gcs_xcom_view_identifier &view_id, synode_no configuration_id, Gcs_protocol_version version, const Gcs_xcom_synode_set &snapshot, const uchar *data, uint64_t data_size)
Xcom_member_state constructor.
Definition: gcs_xcom_state_exchange.cc:38
bool decode_snapshot(const uchar *buffer, uint64_t buffer_len)
Decodes the Member State's snapshot that was sent through the network.
Definition: gcs_xcom_state_exchange.cc:239
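The encode_snapshot and decode_snapshot pair can be pictured as a length-prefixed round trip over a byte buffer. The sketch below is illustrative only and does not reproduce the actual wire format. The layout (an 8-byte little-endian count, then an 8-byte message number and a 4-byte node number per element), the helper names, and the choice to return true on success are all assumptions.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical element: (message number, node number).
using Elem = std::pair<std::uint64_t, std::uint32_t>;

// Assumed sizes for this sketch only.
constexpr std::uint64_t kCountSize = 8;
constexpr std::uint64_t kElemSize = 8 + 4;

// Appends the low `bytes` bytes of v in little-endian order.
static void put_le(std::vector<unsigned char> &out, std::uint64_t v,
                   int bytes) {
  for (int i = 0; i < bytes; ++i)
    out.push_back(static_cast<unsigned char>((v >> (8 * i)) & 0xff));
}

// Reads `bytes` little-endian bytes starting at p.
static std::uint64_t get_le(const unsigned char *p, int bytes) {
  std::uint64_t v = 0;
  for (int i = 0; i < bytes; ++i)
    v |= static_cast<std::uint64_t>(p[i]) << (8 * i);
  return v;
}

std::vector<unsigned char> encode_snapshot_sketch(
    const std::vector<Elem> &elems) {
  std::vector<unsigned char> out;
  put_le(out, elems.size(), 8);
  for (const Elem &e : elems) {
    put_le(out, e.first, 8);
    put_le(out, e.second, 4);
  }
  return out;
}

// Returns true on success (a choice made for this sketch).
bool decode_snapshot_sketch(const unsigned char *buffer,
                            std::uint64_t buffer_len,
                            std::vector<Elem> *elems) {
  if (buffer == nullptr || elems == nullptr || buffer_len < kCountSize)
    return false;
  const std::uint64_t count = get_le(buffer, 8);
  const std::uint64_t body_len = buffer_len - kCountSize;
  // Reject counts that do not match the remaining bytes exactly; the
  // division guards the multiplication against overflow.
  if (count > body_len / kElemSize || count * kElemSize != body_len)
    return false;
  elems->clear();
  const unsigned char *p = buffer + kCountSize;
  for (std::uint64_t i = 0; i < count; ++i, p += kElemSize) {
    const std::uint64_t msgno = get_le(p, 8);
    const std::uint32_t node = static_cast<std::uint32_t>(get_le(p + 8, 4));
    elems->emplace_back(msgno, node);
  }
  return true;
}
```

Validating the element count against the buffer length before reading keeps a truncated or corrupted buffer from causing an out-of-bounds read.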
synode_no get_configuration_id() const
Definition: gcs_xcom_state_exchange.h:231
Gcs_xcom_synode_set m_snapshot
Recovery information which is currently a list of the XCom synodes of the fragments that are in buffer...
Definition: gcs_xcom_state_exchange.h:289
const uchar * get_data() const
Definition: gcs_xcom_state_exchange.h:237
uint64_t get_encode_snapshot_size() const
Definition: gcs_xcom_state_exchange.cc:78
enum_gcs_error
This enumeration describes errors which can occur during group communication operations.
Definition: gcs_types.h:41
Gcs_protocol_version
The GCS protocol versions.
Definition: gcs_types.h:128
std::unordered_set< Gcs_xcom_synode > Gcs_xcom_synode_set
Definition: gcs_xcom_synode.h:84
unsigned char uchar
Definition: my_inttypes.h:52