MySQL 8.4.0
Source Code Documentation
gcs_message_stage_split.h
Go to the documentation of this file.
1/* Copyright (c) 2018, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef GCS_MESSAGE_STAGE_SPLIT_H
25#define GCS_MESSAGE_STAGE_SPLIT_H
26
27#include <cstdint>
28#include <map>
29#include <string>
30#include <tuple>
31#include <unordered_map>
32#include <utility>
33
36
37using Gcs_message_id = unsigned long long;
38using Gcs_sender_id = uint64_t;
39
40/**
41 This class is responsible for controlling message fragmentation and bundling
42 and produces messages with the following format:
43
44 ---------------------------------------------------------------
45 | Sender Id | Message Id | Part Id | Num Msg | Size | Payload |
46 ---------------------------------------------------------------
47
48 . Sender Id - Member Identifier (i.e. incarnation UUID) that is used to
49 uniquely identify the original sender.
50
51 . Message Id - Message Identifier (i.e. transaction counter) that is used to
52 uniquely identify the original message.
53
54 . Part Id - Fragment of the original message that the payload corresponds to.
55
56 . Num Msg - The total number of messages that together compose the original
57 message.
58
59 . Size - The payload size that is being carried on by this message.
60
61 . Payload - Payload that corresponds to the part of the original message that
62 it is carrying on.
63*/
65 public:
66 /**
67 On-the-wire field size for the number of messages.
68 */
69 static const unsigned short WIRE_HD_NUM_MESSAGES_SIZE = 4;
70
71 /**
72 On-the-wire field size for the sender identification size.
73 */
74 static const unsigned short WIRE_HD_SENDER_ID_SIZE = 8;
75
76 /**
77 On-the-wire field size for the message sequence (i.e. identification).
78 */
79 static const unsigned short WIRE_HD_MESSAGE_ID_SIZE = 8;
80
81 /**
82 On-the-wire field size for the message part sequence (i.e. identification).
83 */
84 static const unsigned short WIRE_HD_MESSAGE_PART_ID_SIZE = 4;
85
86 /**
87 On-the-wire field size for payload length.
88 */
89 static const unsigned short WIRE_HD_PAYLOAD_SIZE = 8;
90
91 private:
92 /**
93 Uniquely identify the sender which the message belongs to.
94 */
96
97 /**
98 Uniquely identify the message so that we can reassemble split messages.
99 */
101 static_assert(sizeof(decltype(m_message_id)) == WIRE_HD_MESSAGE_ID_SIZE,
102 "The m_message_id size does not match the storage capacity");
103
104 /**
105 Determine the number of original messages that are included here.
106 */
107 unsigned int m_num_messages{1};
108 static_assert(sizeof(decltype(m_num_messages)) == WIRE_HD_NUM_MESSAGES_SIZE,
109 "The m_num_messages size does not match the storage capacity");
110
111 /**
112 Determine the part in the original message that the current payload
113 corresponds to. Note that the value starts at 0.
114 */
115 unsigned int m_message_part_id{0};
116 static_assert(
118 "The m_message_part_id size does not match the storage capacity");
119
120 /**
121 Size of the current payload which is a full message or part of a message.
122 */
123 unsigned long long m_payload_length{0};
124 static_assert(sizeof(decltype(m_payload_length)) == WIRE_HD_PAYLOAD_SIZE,
125 "The m_payload_len size does not match the storage capacity");
126
127 public:
128 explicit Gcs_split_header_v2() = default;
129
130 explicit Gcs_split_header_v2(const Gcs_sender_id &sender_id,
131 Gcs_message_id message_id,
132 unsigned int num_messages,
133 unsigned int message_part_id,
134 unsigned long long payload_length) noexcept
135 : m_sender_id(sender_id),
136 m_message_id(message_id),
137 m_num_messages(num_messages),
138 m_message_part_id(message_part_id),
139 m_payload_length(payload_length) {}
140
141 std::unique_ptr<Gcs_stage_metadata> clone() override {
142 return std::unique_ptr<Gcs_split_header_v2>(new Gcs_split_header_v2(*this));
143 }
144
145 /**
146 Set the sender identifier.
147
148 @param sender_id Sender identification.
149 */
150 void set_sender_id(const Gcs_sender_id &sender_id) {
151 m_sender_id = sender_id;
152 }
153
154 /**
155 Return the sender identifier.
156 */
157 const Gcs_sender_id &get_sender_id() const { return m_sender_id; }
158
159 /**
160 Set the message identifier.
161
162 @param message_id Message identification.
163 */
164 void set_message_id(Gcs_message_id message_id) { m_message_id = message_id; }
165
166 /**
167 Return the message identifier.
168 */
170
171 /**
172 Set the number of messages bundled together.
173
174 @param num_messages Number of messages bundled together.
175 */
176 void set_num_messages(unsigned int num_messages) {
177 m_num_messages = num_messages;
178 }
179
180 /**
181 Return the number of messages bundled together.
182 */
183 unsigned int get_num_messages() const { return m_num_messages; }
184
185 /**
186 Set the part that identifies this message.
187
188 @param message_part_id Part that identifies this message.
189 */
190 void set_message_part_id(unsigned int message_part_id) {
191 m_message_part_id = message_part_id;
192 }
193
194 /**
195 Return the part that identifies this message.
196 */
197 unsigned int get_message_part_id() const { return m_message_part_id; }
198
199 /**
200 Set the payload length.
201
202 @param payload_length Payload buffer length.
203 */
204 void set_payload_length(unsigned long long payload_length) {
205 m_payload_length = payload_length;
206 }
207
208 /**
209 Return the payload length.
210 */
211 unsigned long long get_payload_length() const { return m_payload_length; }
212
213 /**
214 Decodes the contents of the buffer and sets the field values according to
215 the values decoded. The buffer MUST be encoded in little endian format.
216
217 @param buffer The buffer to decode from.
218 @return Length of the encoded information.
219 */
220 unsigned long long decode(const unsigned char *buffer) override;
221
222 /**
223 Encode the contents of this instance into the buffer. The encoding SHALL
224 be done in little endian format.
225
226 @param buffer The buffer to encode to.
227 @return Length of the encoded information.
228 */
229 unsigned long long encode(unsigned char *buffer) const override;
230
231 /**
232 Calculate the length used to store the stage header information.
233 */
234 unsigned long long calculate_encode_length() const override {
235 return fixed_encode_length();
236 }
237
238 /**
239 Create a string representation of the header to be logged.
240
241 @param output Reference to the output stream where the string will be
242 created.
243 */
244 void dump(std::ostringstream &output) const override;
245
246 private:
247 /**
248 Helper method to calculate the length used to store the stage header
249 information.
250 */
251 static unsigned long long fixed_encode_length() {
255 }
256};
257
258using Gcs_packets_list = std::vector<Gcs_packet>;
260 std::unordered_map<Gcs_message_id, Gcs_packets_list>;
262 std::unordered_map<Gcs_sender_id, Gcs_packets_per_content>;
263
265 public:
266 /**
267 Default split threshold.
268 */
269 static constexpr unsigned long long DEFAULT_THRESHOLD = 1048576;
270
271 /*
272 Methods inherited from the Gcs_message_stage class.
273 */
275 uint64_t const &original_payload_size) const override;
276
277 std::unique_ptr<Gcs_stage_metadata> get_stage_header() override;
278
279 protected:
280 std::pair<bool, std::vector<Gcs_packet>> apply_transformation(
281 Gcs_packet &&packet) override;
282
283 std::pair<Gcs_pipeline_incoming_result, Gcs_packet> revert_transformation(
284 Gcs_packet &&packet) override;
285
287 const Gcs_packet &packet) const override;
288
289 private:
290 /*
291 Set of packets received that cannot be immediately delivered because its
292 related fragments were not received yet.
293 */
295
296 /**
297 Unique sender identifier that is dynamically generated when a node rejoins
298 the group.
299 */
301
302 /**
303 This marks the threshold in bytes above which a message shall be split.
304 */
306
307 /**
308 Unique message identifier per sender.
309 */
310 std::atomic<Gcs_message_id> m_next_message_number{1};
311
312 public:
313 /**
314 Creates an instance of the stage.
315
316 @param enabled enables this message stage
317 @param split_threshold messages with the payload larger
318 than split_threshold in bytes are split.
319 */
321 unsigned long long split_threshold)
324 m_split_threshold(split_threshold) {}
325
327
328 /**
329 Return the stage code.
330 */
332
333 /**
334 Update the list of members in the group as this is required to process split
335 messages.
336
337 @param me The local member identifier.
338 @param xcom_nodes List of members in the group.
339 @return If there is an error, true is returned. Otherwise, false is returned.
340 */
342 const Gcs_xcom_nodes &xcom_nodes) override;
343
344 /**
345 Sets the threshold in bytes after which messages are split.
346
347 @param split_threshold If the payload exceeds these many bytes, then
348 the message is split.
349 */
350 void set_threshold(unsigned long long split_threshold) {
351 m_split_threshold = split_threshold;
352 }
353
354 private:
355 /**
356 Insert a packet into the mapping that keeps track of fragments.
357
358 This method must only called when the packet received is part of a fragmented
359 message.
360
361 @param packet fragment Fragment that will be collected to reconstruct the
362 original
363 @returns true if successful, false otherwise
364 */
365 bool insert_fragment(Gcs_packet &&packet);
366
367 /**
368 Insert a sender into the mapping that keeps track of sliced packets.
369
370 @param sender_id Source identification
371 */
372 void insert_sender(const Gcs_sender_id &sender_id);
373
374 /**
375 Remove a sender from the mapping that keeps track of sliced packets.
376
377 @param sender_id Source identification
378 */
379 void remove_sender(const Gcs_sender_id &sender_id);
380
381 Gcs_xcom_synode_set get_snapshot() const override;
382
384
385 std::pair<bool, std::vector<Gcs_packet>> create_fragments(
386 Gcs_packet &&packet, unsigned int const &nr_fragments) const;
387
388 std::pair<bool, Gcs_packet> create_fragment(
389 unsigned int const &fragment_part_id, Gcs_packet const &other_fragment,
390 unsigned char const *const original_payload_pointer,
391 unsigned long long const &fragment_size) const;
392
393 bool unknown_sender(Gcs_split_header_v2 const &fragment_header) const;
394
395 bool is_final_fragment(Gcs_split_header_v2 const &fragment_header) const;
396
397 /**
398 Fetch the fragments associated with the given metadata.
399 Removes the fragments from the table of ongoing tranmissions.
400
401 This method must only be called if there were previous calls to @c
402 insert_fragment, i.e. if given metadata is about a fragmented message.
403
404 @param fragment_header Fragmentation metadata
405 @returns the list of already received fragments.
406 */
407 Gcs_packets_list get_fragments(Gcs_split_header_v2 const &fragment_header);
408
409 /**
410 Reassembles the given fragment list into the original, whole packet.
411
412 This method must only be called with a non-empty packet list.
413
414 @param fragments The list of packet to reassemble
415 @retval {true, Gcs_packet} If reassembled successfully
416 @retval {false, _} If we could not allocate memory for the reassembled packet
417 */
418 std::pair<bool, Gcs_packet> reassemble_fragments(
419 Gcs_packets_list &fragments) const;
420};
421
423 public:
424 /**
425 Creates an instance of the stage.
426
427 @param enabled enables this message stage
428 @param split_threshold messages with the payload larger
429 than split_threshold in bytes are split.
430 */
432 unsigned long long split_threshold)
433 : Gcs_message_stage_split_v2(enabled, split_threshold) {}
434
436
437 /**
438 Return the stage code.
439 */
441};
442
443#endif /* GCS_MESSAGE_STAGE_SPLIT_H */
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:40
Definition: gcs_message_stage_split.h:264
~Gcs_message_stage_split_v2() override
Definition: gcs_message_stage_split.h:326
static constexpr unsigned long long DEFAULT_THRESHOLD
Default split threshold.
Definition: gcs_message_stage_split.h:269
std::pair< bool, Gcs_packet > create_fragment(unsigned int const &fragment_part_id, Gcs_packet const &other_fragment, unsigned char const *const original_payload_pointer, unsigned long long const &fragment_size) const
Definition: gcs_message_stage_split.cc:374
std::pair< bool, std::vector< Gcs_packet > > create_fragments(Gcs_packet &&packet, unsigned int const &nr_fragments) const
Definition: gcs_message_stage_split.cc:278
std::unique_ptr< Gcs_stage_metadata > get_stage_header() override
Definition: gcs_message_stage_split.cc:216
bool update_members_information(const Gcs_member_identifier &me, const Gcs_xcom_nodes &xcom_nodes) override
Update the list of members in the group as this is required to process split messages.
Definition: gcs_message_stage_split.cc:133
void apply_transformation_single_fragment(Gcs_packet &packet) const
Definition: gcs_message_stage_split.cc:251
Gcs_sender_id m_sender_id
Unique sender identifier that is dynamically generated when a node rejoins the group.
Definition: gcs_message_stage_split.h:300
unsigned long long m_split_threshold
This marks the threshold in bytes above which a message shall be split.
Definition: gcs_message_stage_split.h:305
std::pair< bool, Gcs_packet > reassemble_fragments(Gcs_packets_list &fragments) const
Reassembles the given fragment list into the original, whole packet.
Definition: gcs_message_stage_split.cc:536
void insert_sender(const Gcs_sender_id &sender_id)
Insert a sender into the mapping that keeps track of sliced packets.
Definition: gcs_message_stage_split.cc:686
bool is_final_fragment(Gcs_split_header_v2 const &fragment_header) const
Definition: gcs_message_stage_split.cc:487
Gcs_message_stage_split_v2(bool enabled, unsigned long long split_threshold)
Creates an instance of the stage.
Definition: gcs_message_stage_split.h:320
Gcs_message_stage::stage_status skip_revert(const Gcs_packet &packet) const override
Check if the revert operation which affects incoming packets should be executed (i....
Definition: gcs_message_stage_split.cc:626
std::atomic< Gcs_message_id > m_next_message_number
Unique message identifier per sender.
Definition: gcs_message_stage_split.h:310
Gcs_message_stage::stage_status skip_apply(uint64_t const &original_payload_size) const override
Check if the apply operation which affects outgoing packets should be executed (i....
Definition: gcs_message_stage_split.cc:181
Gcs_packets_list get_fragments(Gcs_split_header_v2 const &fragment_header)
Fetch the fragments associated with the given metadata.
Definition: gcs_message_stage_split.cc:519
std::pair< bool, std::vector< Gcs_packet > > apply_transformation(Gcs_packet &&packet) override
Implements the logic of this stage's transformation to the packet, and returns a set of one,...
Definition: gcs_message_stage_split.cc:225
Gcs_xcom_synode_set get_snapshot() const override
Definition: gcs_message_stage_split.cc:694
bool insert_fragment(Gcs_packet &&packet)
Insert a packet into the mapping that keeps track of fragments.
Definition: gcs_message_stage_split.cc:631
Stage_code get_stage_code() const override
Return the stage code.
Definition: gcs_message_stage_split.h:331
std::pair< Gcs_pipeline_incoming_result, Gcs_packet > revert_transformation(Gcs_packet &&packet) override
Implements the logic to revert this stage's transformation to the packet, and returns one,...
Definition: gcs_message_stage_split.cc:429
bool unknown_sender(Gcs_split_header_v2 const &fragment_header) const
Definition: gcs_message_stage_split.cc:480
void remove_sender(const Gcs_sender_id &sender_id)
Remove a sender from the mapping that keeps track of sliced packets.
Definition: gcs_message_stage_split.cc:706
Gcs_packets_per_sender m_packets_per_source
Definition: gcs_message_stage_split.h:294
void set_threshold(unsigned long long split_threshold)
Sets the threshold in bytes after which messages are split.
Definition: gcs_message_stage_split.h:350
Definition: gcs_message_stage_split.h:422
~Gcs_message_stage_split_v3() override
Definition: gcs_message_stage_split.h:435
Gcs_message_stage_split_v3(bool enabled, unsigned long long split_threshold)
Creates an instance of the stage.
Definition: gcs_message_stage_split.h:431
Stage_code get_stage_code() const override
Return the stage code.
Definition: gcs_message_stage_split.h:440
This is a stage in the pipeline that processes messages when they are put through the send and receiv...
Definition: gcs_message_stages.h:82
stage_status
Definition: gcs_message_stages.h:84
This class is an abstraction for the packet concept.
Definition: gcs_internal_message.h:58
This class is responsible for controlling message fragmentation and bundling and produces messages wi...
Definition: gcs_message_stage_split.h:64
unsigned long long m_payload_length
Size of the current payload which is a full message or part of a message.
Definition: gcs_message_stage_split.h:123
Gcs_message_id m_message_id
Uniquely identify the message so that we can reassemble split messages.
Definition: gcs_message_stage_split.h:100
void set_sender_id(const Gcs_sender_id &sender_id)
Set the sender identifier.
Definition: gcs_message_stage_split.h:150
static const unsigned short WIRE_HD_NUM_MESSAGES_SIZE
On-the-wire field size for the number of messages.
Definition: gcs_message_stage_split.h:69
unsigned long long get_payload_length() const
Return the payload length.
Definition: gcs_message_stage_split.h:211
unsigned long long calculate_encode_length() const override
Calculate the length used to store the stage header information.
Definition: gcs_message_stage_split.h:234
void set_num_messages(unsigned int num_messages)
Set the number of messages bundled together.
Definition: gcs_message_stage_split.h:176
static const unsigned short WIRE_HD_MESSAGE_ID_SIZE
On-the-wire field size for the message sequence (i.e.
Definition: gcs_message_stage_split.h:79
unsigned int m_num_messages
Determine the number of original messages that are included here.
Definition: gcs_message_stage_split.h:107
unsigned int get_message_part_id() const
Return the part that identifies this message.
Definition: gcs_message_stage_split.h:197
static const unsigned short WIRE_HD_MESSAGE_PART_ID_SIZE
On-the-wire field size for the message part sequence (i.e.
Definition: gcs_message_stage_split.h:84
std::unique_ptr< Gcs_stage_metadata > clone() override
Creates a deep copy of this object.
Definition: gcs_message_stage_split.h:141
unsigned long long decode(const unsigned char *buffer) override
Decodes the contents of the buffer and sets the field values according to the values decoded.
Definition: gcs_message_stage_split.cc:84
void dump(std::ostringstream &output) const override
Create a string representation of the header to be logged.
Definition: gcs_message_stage_split.cc:110
Gcs_sender_id m_sender_id
Uniquely identify the sender which the message belongs to.
Definition: gcs_message_stage_split.h:95
unsigned int get_num_messages() const
Return the number of messages bundled together.
Definition: gcs_message_stage_split.h:183
unsigned long long encode(unsigned char *buffer) const override
Encode the contents of this instance into the buffer.
Definition: gcs_message_stage_split.cc:43
unsigned int m_message_part_id
Determine the part in the original message that the current payload corresponds to.
Definition: gcs_message_stage_split.h:115
static unsigned long long fixed_encode_length()
Helper method to calculate the length used to store the stage header information.
Definition: gcs_message_stage_split.h:251
Gcs_message_id get_message_id() const
Return the message identifier.
Definition: gcs_message_stage_split.h:169
static const unsigned short WIRE_HD_PAYLOAD_SIZE
On-the-wire field size for payload length.
Definition: gcs_message_stage_split.h:89
void set_message_part_id(unsigned int message_part_id)
Set the part that identifies this message.
Definition: gcs_message_stage_split.h:190
const Gcs_sender_id & get_sender_id() const
Return the sender identifier.
Definition: gcs_message_stage_split.h:157
void set_message_id(Gcs_message_id message_id)
Set the message identifier.
Definition: gcs_message_stage_split.h:164
Gcs_split_header_v2()=default
void set_payload_length(unsigned long long payload_length)
Set the payload length.
Definition: gcs_message_stage_split.h:204
static const unsigned short WIRE_HD_SENDER_ID_SIZE
On-the-wire field size for the sender identification size.
Definition: gcs_message_stage_split.h:74
Gcs_split_header_v2(const Gcs_sender_id &sender_id, Gcs_message_id message_id, unsigned int num_messages, unsigned int message_part_id, unsigned long long payload_length) noexcept
Definition: gcs_message_stage_split.h:130
Abstract class that defines specific metadata associated to a stage if it decides to extend it.
Definition: gcs_internal_message_headers.h:562
This class contains information on the configuration, i.e set of nodes or simply site definition.
Definition: gcs_xcom_group_member_information.h:391
Stage_code
The different stages that are currently available.
Definition: gcs_internal_message_headers.h:73
std::unordered_map< Gcs_message_id, Gcs_packets_list > Gcs_packets_per_content
Definition: gcs_message_stage_split.h:260
unsigned long long Gcs_message_id
Definition: gcs_message_stage_split.h:37
uint64_t Gcs_sender_id
Definition: gcs_message_stage_split.h:38
std::unordered_map< Gcs_sender_id, Gcs_packets_per_content > Gcs_packets_per_sender
Definition: gcs_message_stage_split.h:262
std::vector< Gcs_packet > Gcs_packets_list
Definition: gcs_message_stage_split.h:258
std::unordered_set< Gcs_xcom_synode > Gcs_xcom_synode_set
Definition: gcs_xcom_synode.h:84
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2870
required bool enabled
Definition: replication_group_member_actions.proto:33