MySQL 8.1.0
Source Code Documentation
gcs_message_stage_split.h
Go to the documentation of this file.
1/* Copyright (c) 2018, 2023, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23#ifndef GCS_MESSAGE_STAGE_SPLIT_H
24#define GCS_MESSAGE_STAGE_SPLIT_H
25
26#include <cstdint>
27#include <map>
28#include <string>
29#include <tuple>
30#include <unordered_map>
31#include <utility>
32
35
36using Gcs_message_id = unsigned long long;
37using Gcs_sender_id = uint64_t;
38
39/**
40 This class is responsible for controlling message fragmentation and bundling
41 and produces messages with the following format:
42
43 ---------------------------------------------------------------
44 | Sender Id | Message Id | Part Id | Num Msg | Size | Payload |
45 ---------------------------------------------------------------
46
47 . Sender Id - Member Identifier (i.e. incarnation UUID) that is used to
48 uniquely identify the original sender.
49
50 . Message Id - Message Identifier (i.e. transaction counter) that is used to
51 uniquely identify the original message.
52
53 . Part Id - Fragment of the original message that the payload corresponds to.
54
55 . Num Msg - The total number of messages that together compose the original
56 message.
57
58 . Size - The payload size that is being carried on by this message.
59
60 . Payload - Payload that corresponds to the part of the original message that
61 it is carrying on.
62*/
64 public:
65 /**
66 On-the-wire field size for the number of messages.
67 */
68 static const unsigned short WIRE_HD_NUM_MESSAGES_SIZE = 4;
69
70 /**
71 On-the-wire field size for the sender identification size.
72 */
73 static const unsigned short WIRE_HD_SENDER_ID_SIZE = 8;
74
75 /**
76 On-the-wire field size for the message sequence (i.e. identification).
77 */
78 static const unsigned short WIRE_HD_MESSAGE_ID_SIZE = 8;
79
80 /**
81 On-the-wire field size for the message part sequence (i.e. identification).
82 */
83 static const unsigned short WIRE_HD_MESSAGE_PART_ID_SIZE = 4;
84
85 /**
86 On-the-wire field size for payload length.
87 */
88 static const unsigned short WIRE_HD_PAYLOAD_SIZE = 8;
89
90 private:
91 /**
92 Uniquely identify the sender which the message belongs to.
93 */
95
96 /**
97 Uniquely identify the message so that we can reassemble split messages.
98 */
100 static_assert(sizeof(decltype(m_message_id)) == WIRE_HD_MESSAGE_ID_SIZE,
101 "The m_message_id size does not match the storage capacity");
102
103 /**
104 Determine the number of original messages that are included here.
105 */
106 unsigned int m_num_messages{1};
107 static_assert(sizeof(decltype(m_num_messages)) == WIRE_HD_NUM_MESSAGES_SIZE,
108 "The m_num_messages size does not match the storage capacity");
109
110 /**
111 Determine the part in the original message that the current payload
112 corresponds to. Note that the value starts at 0.
113 */
114 unsigned int m_message_part_id{0};
115 static_assert(
117 "The m_message_part_id size does not match the storage capacity");
118
119 /**
120 Size of the current payload which is a full message or part of a message.
121 */
122 unsigned long long m_payload_length{0};
123 static_assert(sizeof(decltype(m_payload_length)) == WIRE_HD_PAYLOAD_SIZE,
124 "The m_payload_len size does not match the storage capacity");
125
126 public:
127 explicit Gcs_split_header_v2() = default;
128
129 explicit Gcs_split_header_v2(const Gcs_sender_id &sender_id,
130 Gcs_message_id message_id,
131 unsigned int num_messages,
132 unsigned int message_part_id,
133 unsigned long long payload_length) noexcept
134 : m_sender_id(sender_id),
135 m_message_id(message_id),
136 m_num_messages(num_messages),
137 m_message_part_id(message_part_id),
138 m_payload_length(payload_length) {}
139
140 std::unique_ptr<Gcs_stage_metadata> clone() override {
141 return std::unique_ptr<Gcs_split_header_v2>(new Gcs_split_header_v2(*this));
142 }
143
144 /**
145 Set the sender identifier.
146
147 @param sender_id Sender identification.
148 */
149 void set_sender_id(const Gcs_sender_id &sender_id) {
150 m_sender_id = sender_id;
151 }
152
153 /**
154 Return the sender identifier.
155 */
156 const Gcs_sender_id &get_sender_id() const { return m_sender_id; }
157
158 /**
159 Set the message identifier.
160
161 @param message_id Message identification.
162 */
163 void set_message_id(Gcs_message_id message_id) { m_message_id = message_id; }
164
165 /**
166 Return the message identifier.
167 */
169
170 /**
171 Set the number of messages bundled together.
172
173 @param num_messages Number of messages bundled together.
174 */
175 void set_num_messages(unsigned int num_messages) {
176 m_num_messages = num_messages;
177 }
178
179 /**
180 Return the number of messages bundled together.
181 */
182 unsigned int get_num_messages() const { return m_num_messages; }
183
184 /**
185 Set the part that identifies this message.
186
187 @param message_part_id Part that identifies this message.
188 */
189 void set_message_part_id(unsigned int message_part_id) {
190 m_message_part_id = message_part_id;
191 }
192
193 /**
194 Return the part that identifies this message.
195 */
196 unsigned int get_message_part_id() const { return m_message_part_id; }
197
198 /**
199 Set the payload length.
200
201 @param payload_length Payload buffer length.
202 */
203 void set_payload_length(unsigned long long payload_length) {
204 m_payload_length = payload_length;
205 }
206
207 /**
208 Return the payload length.
209 */
210 unsigned long long get_payload_length() const { return m_payload_length; }
211
212 /**
213 Decodes the contents of the buffer and sets the field values according to
214 the values decoded. The buffer MUST be encoded in little endian format.
215
216 @param buffer The buffer to decode from.
217 @return Length of the encoded information.
218 */
219 unsigned long long decode(const unsigned char *buffer) override;
220
221 /**
222 Encode the contents of this instance into the buffer. The encoding SHALL
223 be done in little endian format.
224
225 @param buffer The buffer to encode to.
226 @return Length of the encoded information.
227 */
228 unsigned long long encode(unsigned char *buffer) const override;
229
230 /**
231 Calculate the length used to store the stage header information.
232 */
233 unsigned long long calculate_encode_length() const override {
234 return fixed_encode_length();
235 }
236
237 /**
238 Create a string representation of the header to be logged.
239
240 @param output Reference to the output stream where the string will be
241 created.
242 */
243 void dump(std::ostringstream &output) const override;
244
245 private:
246 /**
247 Helper method to calculate the length used to store the stage header
248 information.
249 */
250 static unsigned long long fixed_encode_length() {
254 }
255};
256
257using Gcs_packets_list = std::vector<Gcs_packet>;
259 std::unordered_map<Gcs_message_id, Gcs_packets_list>;
261 std::unordered_map<Gcs_sender_id, Gcs_packets_per_content>;
262
264 public:
265 /**
266 Default split threshold.
267 */
268 static constexpr unsigned long long DEFAULT_THRESHOLD = 1048576;
269
270 /*
271 Methods inherited from the Gcs_message_stage class.
272 */
274 uint64_t const &original_payload_size) const override;
275
276 std::unique_ptr<Gcs_stage_metadata> get_stage_header() override;
277
278 protected:
279 std::pair<bool, std::vector<Gcs_packet>> apply_transformation(
280 Gcs_packet &&packet) override;
281
282 std::pair<Gcs_pipeline_incoming_result, Gcs_packet> revert_transformation(
283 Gcs_packet &&packet) override;
284
286 const Gcs_packet &packet) const override;
287
288 private:
289 /*
290 Set of packets received that cannot be immediately delivered because its
291 related fragments were not received yet.
292 */
294
295 /**
296 Unique sender identifier that is dynamically generated when a node rejoins
297 the group.
298 */
300
301 /**
302 This marks the threshold in bytes above which a message shall be split.
303 */
305
306 /**
307 Unique message identifier per sender.
308 */
309 std::atomic<Gcs_message_id> m_next_message_number{1};
310
311 public:
312 /**
313 Creates an instance of the stage.
314
315 @param enabled enables this message stage
316 @param split_threshold messages with the payload larger
317 than split_threshold in bytes are split.
318 */
320 unsigned long long split_threshold)
323 m_split_threshold(split_threshold) {}
324
326
327 /**
328 Return the stage code.
329 */
331
332 /**
333 Update the list of members in the group as this is required to process split
334 messages.
335
336 @param me The local member identifier.
337 @param xcom_nodes List of members in the group.
338 @return If there is an error, true is returned. Otherwise, false is returned.
339 */
341 const Gcs_xcom_nodes &xcom_nodes) override;
342
343 /**
344 Sets the threshold in bytes after which messages are split.
345
346 @param split_threshold If the payload exceeds these many bytes, then
347 the message is split.
348 */
349 void set_threshold(unsigned long long split_threshold) {
350 m_split_threshold = split_threshold;
351 }
352
353 private:
354 /**
355 Insert a packet into the mapping that keeps track of fragments.
356
357 This method must only called when the packet received is part of a fragmented
358 message.
359
360 @param packet fragment Fragment that will be collected to reconstruct the
361 original
362 @returns true if successful, false otherwise
363 */
364 bool insert_fragment(Gcs_packet &&packet);
365
366 /**
367 Insert a sender into the mapping that keeps track of sliced packets.
368
369 @param sender_id Source identification
370 */
371 void insert_sender(const Gcs_sender_id &sender_id);
372
373 /**
374 Remove a sender from the mapping that keeps track of sliced packets.
375
376 @param sender_id Source identification
377 */
378 void remove_sender(const Gcs_sender_id &sender_id);
379
380 Gcs_xcom_synode_set get_snapshot() const override;
381
383
384 std::pair<bool, std::vector<Gcs_packet>> create_fragments(
385 Gcs_packet &&packet, unsigned int const &nr_fragments) const;
386
387 std::pair<bool, Gcs_packet> create_fragment(
388 unsigned int const &fragment_part_id, Gcs_packet const &other_fragment,
389 unsigned char const *const original_payload_pointer,
390 unsigned long long const &fragment_size) const;
391
392 bool unknown_sender(Gcs_split_header_v2 const &fragment_header) const;
393
394 bool is_final_fragment(Gcs_split_header_v2 const &fragment_header) const;
395
396 /**
397 Fetch the fragments associated with the given metadata.
398 Removes the fragments from the table of ongoing tranmissions.
399
400 This method must only be called if there were previous calls to @c
401 insert_fragment, i.e. if given metadata is about a fragmented message.
402
403 @param fragment_header Fragmentation metadata
404 @returns the list of already received fragments.
405 */
406 Gcs_packets_list get_fragments(Gcs_split_header_v2 const &fragment_header);
407
408 /**
409 Reassembles the given fragment list into the original, whole packet.
410
411 This method must only be called with a non-empty packet list.
412
413 @param fragments The list of packet to reassemble
414 @retval {true, Gcs_packet} If reassembled successfully
415 @retval {false, _} If we could not allocate memory for the reassembled packet
416 */
417 std::pair<bool, Gcs_packet> reassemble_fragments(
418 Gcs_packets_list &fragments) const;
419};
420
422 public:
423 /**
424 Creates an instance of the stage.
425
426 @param enabled enables this message stage
427 @param split_threshold messages with the payload larger
428 than split_threshold in bytes are split.
429 */
431 unsigned long long split_threshold)
432 : Gcs_message_stage_split_v2(enabled, split_threshold) {}
433
435
436 /**
437 Return the stage code.
438 */
440};
441
442#endif /* GCS_MESSAGE_STAGE_SPLIT_H */
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:39
Definition: gcs_message_stage_split.h:263
~Gcs_message_stage_split_v2() override
Definition: gcs_message_stage_split.h:325
static constexpr unsigned long long DEFAULT_THRESHOLD
Default split threshold.
Definition: gcs_message_stage_split.h:268
std::pair< bool, Gcs_packet > create_fragment(unsigned int const &fragment_part_id, Gcs_packet const &other_fragment, unsigned char const *const original_payload_pointer, unsigned long long const &fragment_size) const
Definition: gcs_message_stage_split.cc:373
std::pair< bool, std::vector< Gcs_packet > > create_fragments(Gcs_packet &&packet, unsigned int const &nr_fragments) const
Definition: gcs_message_stage_split.cc:277
std::unique_ptr< Gcs_stage_metadata > get_stage_header() override
Definition: gcs_message_stage_split.cc:215
bool update_members_information(const Gcs_member_identifier &me, const Gcs_xcom_nodes &xcom_nodes) override
Update the list of members in the group as this is required to process split messages.
Definition: gcs_message_stage_split.cc:132
void apply_transformation_single_fragment(Gcs_packet &packet) const
Definition: gcs_message_stage_split.cc:250
Gcs_sender_id m_sender_id
Unique sender identifier that is dynamically generated when a node rejoins the group.
Definition: gcs_message_stage_split.h:299
unsigned long long m_split_threshold
This marks the threshold in bytes above which a message shall be split.
Definition: gcs_message_stage_split.h:304
std::pair< bool, Gcs_packet > reassemble_fragments(Gcs_packets_list &fragments) const
Reassembles the given fragment list into the original, whole packet.
Definition: gcs_message_stage_split.cc:535
void insert_sender(const Gcs_sender_id &sender_id)
Insert a sender into the mapping that keeps track of sliced packets.
Definition: gcs_message_stage_split.cc:685
bool is_final_fragment(Gcs_split_header_v2 const &fragment_header) const
Definition: gcs_message_stage_split.cc:486
Gcs_message_stage_split_v2(bool enabled, unsigned long long split_threshold)
Creates an instance of the stage.
Definition: gcs_message_stage_split.h:319
Gcs_message_stage::stage_status skip_revert(const Gcs_packet &packet) const override
Check if the revert operation which affects incoming packets should be executed (i....
Definition: gcs_message_stage_split.cc:625
std::atomic< Gcs_message_id > m_next_message_number
Unique message identifier per sender.
Definition: gcs_message_stage_split.h:309
Gcs_message_stage::stage_status skip_apply(uint64_t const &original_payload_size) const override
Check if the apply operation which affects outgoing packets should be executed (i....
Definition: gcs_message_stage_split.cc:180
Gcs_packets_list get_fragments(Gcs_split_header_v2 const &fragment_header)
Fetch the fragments associated with the given metadata.
Definition: gcs_message_stage_split.cc:518
std::pair< bool, std::vector< Gcs_packet > > apply_transformation(Gcs_packet &&packet) override
Implements the logic of this stage's transformation to the packet, and returns a set of one,...
Definition: gcs_message_stage_split.cc:224
Gcs_xcom_synode_set get_snapshot() const override
Definition: gcs_message_stage_split.cc:693
bool insert_fragment(Gcs_packet &&packet)
Insert a packet into the mapping that keeps track of fragments.
Definition: gcs_message_stage_split.cc:630
Stage_code get_stage_code() const override
Return the stage code.
Definition: gcs_message_stage_split.h:330
std::pair< Gcs_pipeline_incoming_result, Gcs_packet > revert_transformation(Gcs_packet &&packet) override
Implements the logic to revert this stage's transformation to the packet, and returns one,...
Definition: gcs_message_stage_split.cc:428
bool unknown_sender(Gcs_split_header_v2 const &fragment_header) const
Definition: gcs_message_stage_split.cc:479
void remove_sender(const Gcs_sender_id &sender_id)
Remove a sender from the mapping that keeps track of sliced packets.
Definition: gcs_message_stage_split.cc:705
Gcs_packets_per_sender m_packets_per_source
Definition: gcs_message_stage_split.h:293
void set_threshold(unsigned long long split_threshold)
Sets the threshold in bytes after which messages are split.
Definition: gcs_message_stage_split.h:349
Definition: gcs_message_stage_split.h:421
~Gcs_message_stage_split_v3() override
Definition: gcs_message_stage_split.h:434
Gcs_message_stage_split_v3(bool enabled, unsigned long long split_threshold)
Creates an instance of the stage.
Definition: gcs_message_stage_split.h:430
Stage_code get_stage_code() const override
Return the stage code.
Definition: gcs_message_stage_split.h:439
This is a stage in the pipeline that processes messages when they are put through the send and receiv...
Definition: gcs_message_stages.h:81
stage_status
Definition: gcs_message_stages.h:83
This class is an abstraction for the packet concept.
Definition: gcs_internal_message.h:57
This class is responsible for controlling message fragmentation and bundling and produces messages wi...
Definition: gcs_message_stage_split.h:63
unsigned long long m_payload_length
Size of the current payload which is a full message or part of a message.
Definition: gcs_message_stage_split.h:122
Gcs_message_id m_message_id
Uniquely identify the message so that we can reassemble split messages.
Definition: gcs_message_stage_split.h:99
void set_sender_id(const Gcs_sender_id &sender_id)
Set the sender identifier.
Definition: gcs_message_stage_split.h:149
static const unsigned short WIRE_HD_NUM_MESSAGES_SIZE
On-the-wire field size for the number of messages.
Definition: gcs_message_stage_split.h:68
unsigned long long get_payload_length() const
Return the payload length.
Definition: gcs_message_stage_split.h:210
unsigned long long calculate_encode_length() const override
Calculate the length used to store the stage header information.
Definition: gcs_message_stage_split.h:233
void set_num_messages(unsigned int num_messages)
Set the number of messages bundled together.
Definition: gcs_message_stage_split.h:175
static const unsigned short WIRE_HD_MESSAGE_ID_SIZE
On-the-wire field size for the message sequence (i.e.
Definition: gcs_message_stage_split.h:78
unsigned int m_num_messages
Determine the number of original messages that are included here.
Definition: gcs_message_stage_split.h:106
unsigned int get_message_part_id() const
Return the part that identifies this message.
Definition: gcs_message_stage_split.h:196
static const unsigned short WIRE_HD_MESSAGE_PART_ID_SIZE
On-the-wire field size for the message part sequence (i.e.
Definition: gcs_message_stage_split.h:83
std::unique_ptr< Gcs_stage_metadata > clone() override
Creates a deep copy of this object.
Definition: gcs_message_stage_split.h:140
unsigned long long decode(const unsigned char *buffer) override
Decodes the contents of the buffer and sets the field values according to the values decoded.
Definition: gcs_message_stage_split.cc:83
void dump(std::ostringstream &output) const override
Create a string representation of the header to be logged.
Definition: gcs_message_stage_split.cc:109
Gcs_sender_id m_sender_id
Uniquely identify the sender which the message belongs to.
Definition: gcs_message_stage_split.h:94
unsigned int get_num_messages() const
Return the number of messages bundled together.
Definition: gcs_message_stage_split.h:182
unsigned long long encode(unsigned char *buffer) const override
Encode the contents of this instance into the buffer.
Definition: gcs_message_stage_split.cc:42
unsigned int m_message_part_id
Determine the part in the original message that the current payload corresponds to.
Definition: gcs_message_stage_split.h:114
static unsigned long long fixed_encode_length()
Helper method to calculate the length used to store the stage header information.
Definition: gcs_message_stage_split.h:250
Gcs_message_id get_message_id() const
Return the message identifier.
Definition: gcs_message_stage_split.h:168
static const unsigned short WIRE_HD_PAYLOAD_SIZE
On-the-wire field size for payload length.
Definition: gcs_message_stage_split.h:88
void set_message_part_id(unsigned int message_part_id)
Set the part that identifies this message.
Definition: gcs_message_stage_split.h:189
const Gcs_sender_id & get_sender_id() const
Return the sender identifier.
Definition: gcs_message_stage_split.h:156
void set_message_id(Gcs_message_id message_id)
Set the message identifier.
Definition: gcs_message_stage_split.h:163
Gcs_split_header_v2()=default
void set_payload_length(unsigned long long payload_length)
Set the payload length.
Definition: gcs_message_stage_split.h:203
static const unsigned short WIRE_HD_SENDER_ID_SIZE
On-the-wire field size for the sender identification size.
Definition: gcs_message_stage_split.h:73
Gcs_split_header_v2(const Gcs_sender_id &sender_id, Gcs_message_id message_id, unsigned int num_messages, unsigned int message_part_id, unsigned long long payload_length) noexcept
Definition: gcs_message_stage_split.h:129
Abstract class that defines specific metadata associated to a stage if it decides to extend it.
Definition: gcs_internal_message_headers.h:561
This class contains information on the configuration, i.e set of nodes or simply site definition.
Definition: gcs_xcom_group_member_information.h:390
Stage_code
The different stages that are currently available.
Definition: gcs_internal_message_headers.h:72
std::unordered_map< Gcs_message_id, Gcs_packets_list > Gcs_packets_per_content
Definition: gcs_message_stage_split.h:259
unsigned long long Gcs_message_id
Definition: gcs_message_stage_split.h:36
uint64_t Gcs_sender_id
Definition: gcs_message_stage_split.h:37
std::unordered_map< Gcs_sender_id, Gcs_packets_per_content > Gcs_packets_per_sender
Definition: gcs_message_stage_split.h:261
std::vector< Gcs_packet > Gcs_packets_list
Definition: gcs_message_stage_split.h:257
std::unordered_set< Gcs_xcom_synode > Gcs_xcom_synode_set
Definition: gcs_xcom_synode.h:83
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:419
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2869
required bool enabled
Definition: replication_group_member_actions.proto:32