MySQL  8.0.22
Source Code Documentation
gcs_message_stage_split.h
Go to the documentation of this file.
1 /* Copyright (c) 2018, 2020, Oracle and/or its affiliates.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef GCS_MESSAGE_STAGE_SPLIT_H
24 #define GCS_MESSAGE_STAGE_SPLIT_H
25 
#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
32 
35 
/** Type of the identifier that distinguishes each original message. */
using Gcs_message_id = unsigned long long int;
/** Type of the identifier that distinguishes the original sender. */
using Gcs_sender_id = std::uint64_t;
38 
39 /**
40  This class is responsible for controlling message fragmentation and bundling
41  and produces messages with the following format:
42 
43  ---------------------------------------------------------------
44  | Sender Id | Message Id | Part Id | Num Msg | Size | Payload |
45  ---------------------------------------------------------------
46 
47  . Sender Id - Member Identifier (i.e. incarnation UUID) that is used to
48  uniquely identify the original sender.
49 
50  . Message Id - Message Identifier (i.e. transaction counter) that is used to
51  uniquely identify the original message.
52 
53  . Part Id - Fragment of the original message that the payload corresponds to.
54 
55  . Num Msg - The total number of messages that together compose the original
56  message.
57 
58  . Size - The size of the payload carried by this message.
59 
60  . Payload - The part of the original message that this message
61  carries.
62 */
64  public:
/**
  Width, in bytes, of the "number of messages" field on the wire.
*/
static const unsigned short WIRE_HD_NUM_MESSAGES_SIZE{4};

/**
  Width, in bytes, of the sender identification field on the wire.
*/
static const unsigned short WIRE_HD_SENDER_ID_SIZE{8};

/**
  Width, in bytes, of the message sequence (i.e. identification) field on the
  wire.
*/
static const unsigned short WIRE_HD_MESSAGE_ID_SIZE{8};

/**
  Width, in bytes, of the message part sequence (i.e. identification) field on
  the wire.
*/
static const unsigned short WIRE_HD_MESSAGE_PART_ID_SIZE{4};

/**
  Width, in bytes, of the payload length field on the wire.
*/
static const unsigned short WIRE_HD_PAYLOAD_SIZE{8};
89 
90  private:
91  /**
92  Uniquely identify the sender which the message belongs to.
93  */
95 
96  /**
97  Uniquely identify the message so that split messages can be reassembled.
98  */
100  static_assert(sizeof(decltype(m_message_id)) == WIRE_HD_MESSAGE_ID_SIZE,
101  "The m_message_id size does not match the storage capacity");
102 
103  /**
104  Determine the number of original messages that are included here.
105  */
106  unsigned int m_num_messages{1};
107  static_assert(sizeof(decltype(m_num_messages)) == WIRE_HD_NUM_MESSAGES_SIZE,
108  "The m_num_messages size does not match the storage capacity");
109 
110  /**
111  Determine the part in the original message that the current payload
112  corresponds to. Note that the value starts at 0.
113  */
114  unsigned int m_message_part_id{0};
115  static_assert(
116  sizeof(decltype(m_message_part_id)) == WIRE_HD_MESSAGE_PART_ID_SIZE,
117  "The m_message_part_id size does not match the storage capacity");
118 
119  /**
120  Size of the current payload which is a full message or part of a message.
121  */
122  unsigned long long m_payload_length{0};
123  static_assert(sizeof(decltype(m_payload_length)) == WIRE_HD_PAYLOAD_SIZE,
124  "The m_payload_len size does not match the storage capacity");
125 
126  public:
127  explicit Gcs_split_header_v2() = default;
128 
129  explicit Gcs_split_header_v2(const Gcs_sender_id &sender_id,
130  Gcs_message_id message_id,
131  unsigned int num_messages,
132  unsigned int message_part_id,
133  unsigned long long payload_length) noexcept
134  : m_sender_id(sender_id),
135  m_message_id(message_id),
136  m_num_messages(num_messages),
137  m_message_part_id(message_part_id),
138  m_payload_length(payload_length) {}
139 
140  std::unique_ptr<Gcs_stage_metadata> clone() override {
141  return std::unique_ptr<Gcs_split_header_v2>(new Gcs_split_header_v2(*this));
142  }
143 
144  /**
145  Set the sender identifier.
146 
147  @param sender_id Sender identification.
148  */
149  void set_sender_id(const Gcs_sender_id &sender_id) {
150  m_sender_id = sender_id;
151  }
152 
153  /**
154  Return the sender identifier.
155  */
156  const Gcs_sender_id &get_sender_id() const { return m_sender_id; }
157 
158  /**
159  Set the message identifier.
160 
161  @param message_id Message identification.
162  */
163  void set_message_id(Gcs_message_id message_id) { m_message_id = message_id; }
164 
165  /**
166  Return the message identifier.
167  */
169 
170  /**
171  Set the number of messages bundled together.
172 
173  @param num_messages Number of messages bundled together.
174  */
175  void set_num_messages(unsigned int num_messages) {
176  m_num_messages = num_messages;
177  }
178 
179  /**
180  Return the number of messages bundled together.
181  */
182  unsigned int get_num_messages() const { return m_num_messages; }
183 
184  /**
185  Set the part that identifies this message.
186 
187  @param message_part_id Part that identifies this message.
188  */
189  void set_message_part_id(unsigned int message_part_id) {
190  m_message_part_id = message_part_id;
191  }
192 
193  /**
194  Return the part that identifies this message.
195  */
196  unsigned int get_message_part_id() const { return m_message_part_id; }
197 
198  /**
199  Set the payload length.
200 
201  @param payload_length Payload buffer length.
202  */
203  void set_payload_length(unsigned long long payload_length) {
204  m_payload_length = payload_length;
205  }
206 
207  /**
208  Return the payload length.
209  */
210  unsigned long long get_payload_length() const { return m_payload_length; }
211 
212  /**
213  Decodes the contents of the buffer and sets the field values according to
214  the values decoded. The buffer MUST be encoded in little endian format.
215 
216  @param buffer The buffer to decode from.
217  @return Length of the encoded information.
218  */
219  unsigned long long decode(const unsigned char *buffer) override;
220 
221  /**
222  Encode the contents of this instance into the buffer. The encoding SHALL
223  be done in little endian format.
224 
225  @param buffer The buffer to encode to.
226  @return Length of the encoded information.
227  */
228  unsigned long long encode(unsigned char *buffer) const override;
229 
230  /**
231  Calculate the length used to store the stage header information.
232  */
233  unsigned long long calculate_encode_length() const override {
234  return fixed_encode_length();
235  }
236 
237  /**
238  Create a string representation of the header to be logged.
239 
240  @param output Reference to the output stream where the string will be
241  created.
242  */
243  void dump(std::ostringstream &output) const override;
244 
245  private:
246  /**
247  Helper method to calculate the length used to store the stage header
248  information.
249  */
250  static unsigned long long fixed_encode_length() {
251  return WIRE_HD_NUM_MESSAGES_SIZE + WIRE_HD_SENDER_ID_SIZE +
252  WIRE_HD_MESSAGE_ID_SIZE + WIRE_HD_MESSAGE_PART_ID_SIZE +
254  }
255 };
256 
257 using Gcs_packets_list = std::vector<Gcs_packet>;  // Sequence of packets, e.g. the fragments received so far for one message.
259  std::unordered_map<Gcs_message_id, Gcs_packets_list>;
261  std::unordered_map<Gcs_sender_id, Gcs_packets_per_content>;
262 
264  public:
/**
  Default split threshold, in bytes: 1 MiB.
*/
static constexpr unsigned long long DEFAULT_THRESHOLD = 1024ULL * 1024ULL;
269 
270  /*
271  Methods inherited from the Gcs_message_stage class.
272  */
274  uint64_t const &original_payload_size) const override;
275 
276  std::unique_ptr<Gcs_stage_metadata> get_stage_header() override;
277 
278  protected:
279  std::pair<bool, std::vector<Gcs_packet>> apply_transformation(
280  Gcs_packet &&packet) override;
281 
282  std::pair<Gcs_pipeline_incoming_result, Gcs_packet> revert_transformation(
283  Gcs_packet &&packet) override;
284 
286  const Gcs_packet &packet) const override;
287 
288  private:
289  /*
290  Set of received packets that cannot be delivered yet because their
291  related fragments have not been received.
292  */
294 
295  /**
296  Unique sender identifier that is dynamically generated when a node rejoins
297  the group.
298  */
300 
301  /**
302  This marks the threshold in bytes above which a message shall be split.
303  */
304  unsigned long long m_split_threshold{DEFAULT_THRESHOLD};
305 
306  /**
307  Unique message identifier per sender.
308  */
309  std::atomic<Gcs_message_id> m_next_message_number{1};
310 
311  public:
312  /**
313  Creates an instance of the stage.
314 
315  @param enabled enables this message stage
316  @param split_threshold messages with the payload larger
317  than split_threshold in bytes are split.
318  */
320  unsigned long long split_threshold)
321  : Gcs_message_stage(enabled),
322  m_packets_per_source(),
323  m_split_threshold(split_threshold) {}
324 
325  ~Gcs_message_stage_split_v2() override { m_packets_per_source.clear(); }
326 
327  /**
328  Return the stage code.
329  */
330  Stage_code get_stage_code() const override { return Stage_code::ST_SPLIT_V2; }
331 
332  /**
333  Update the list of members in the group as this is required to process split
334  messages.
335 
336  @param me The local member identifier.
337  @param xcom_nodes List of members in the group.
338  @return If there is an error, true is returned. Otherwise, false is returned.
339  */
340  bool update_members_information(const Gcs_member_identifier &me,
341  const Gcs_xcom_nodes &xcom_nodes) override;
342 
343  /**
344  Sets the threshold in bytes after which messages are split.
345 
346  @param split_threshold If the payload exceeds these many bytes, then
347  the message is split.
348  */
349  void set_threshold(unsigned long long split_threshold) {
350  m_split_threshold = split_threshold;
351  }
352 
353  private:
354  /**
355  Insert a packet into the mapping that keeps track of fragments.
356 
357  This method must only called when the packet received is part of a fragmented
358  message.
359 
360  @param packet fragment Fragment that will be collected to reconstruct the
361  orignal
362  @returns true if successful, false otherwise
363  */
364  bool insert_fragment(Gcs_packet &&packet);
365 
366  /**
367  Insert a sender into the mapping that keeps track of sliced packets.
368 
369  @param sender_id Source identification
370  */
371  void insert_sender(const Gcs_sender_id &sender_id);
372 
373  /**
374  Remove a sender from the mapping that keeps track of sliced packets.
375 
376  @param sender_id Source identification
377  */
378  void remove_sender(const Gcs_sender_id &sender_id);
379 
380  Gcs_xcom_synode_set get_snapshot() const override;
381 
382  void apply_transformation_single_fragment(Gcs_packet &packet) const;
383 
384  std::pair<bool, std::vector<Gcs_packet>> create_fragments(
385  Gcs_packet &&packet, unsigned int const &nr_fragments) const;
386 
387  std::pair<bool, Gcs_packet> create_fragment(
388  unsigned int const &fragment_part_id, Gcs_packet const &other_fragment,
389  unsigned char const *const original_payload_pointer,
390  unsigned long long const &fragment_size) const;
391 
392  bool unknown_sender(Gcs_split_header_v2 const &fragment_header) const;
393 
394  bool is_final_fragment(Gcs_split_header_v2 const &fragment_header) const;
395 
396  /**
397  Fetch the fragments associated with the given metadata.
398  Removes the fragments from the table of ongoing tranmissions.
399 
400  This method must only be called if there were previous calls to @c
401  insert_fragment, i.e. if given metadata is about a fragmented message.
402 
403  @param fragment_header Fragmentation metadata
404  @returns the list of already received fragments.
405  */
406  Gcs_packets_list get_fragments(Gcs_split_header_v2 const &fragment_header);
407 
408  /**
409  Reassembles the given fragment list into the orginal, whole packet.
410 
411  This method must only be called with a non-empty packet list.
412 
413  @param fragments The list of packet to reassemble
414  @retval {true, Gcs_packet} If reassembled successfully
415  @retval {false, _} If we could not allocate memory for the reassembled packet
416  */
417  std::pair<bool, Gcs_packet> reassemble_fragments(
418  Gcs_packets_list &fragments) const;
419 };
420 
421 #endif /* GCS_MESSAGE_STAGE_SPLIT_H */
This class is responsible for controlling message fragmentation and bundling and produces messages wi...
Definition: gcs_message_stage_split.h:63
void set_message_id(Gcs_message_id message_id)
Set the message identifier.
Definition: gcs_message_stage_split.h:163
char buffer[STRING_BUFFER]
Definition: test_sql_9_sessions.cc:57
uint64_t Gcs_sender_id
Definition: gcs_message_stage_split.h:37
unsigned long long encode(unsigned char *buffer) const override
Encode the contents of this instance into the buffer.
Definition: gcs_message_stage_split.cc:42
Gcs_packets_per_sender m_packets_per_source
Definition: gcs_message_stage_split.h:293
Gcs_split_header_v2()=default
unsigned long long calculate_encode_length() const override
Calculate the length used to store the stage header information.
Definition: gcs_message_stage_split.h:233
std::unordered_set< Gcs_xcom_synode > Gcs_xcom_synode_set
Definition: gcs_xcom_synode.h:83
bool enabled
true if enabled.
Definition: buf0dblwr.cc:80
unsigned int m_message_part_id
Determine the part in the original message that the current payload corresponds to.
Definition: gcs_message_stage_split.h:114
static const unsigned short WIRE_HD_MESSAGE_ID_SIZE
On-the-wire field size for the message sequence (i.e.
Definition: gcs_message_stage_split.h:78
Stage_code
The different stages that are currently available.
Definition: gcs_internal_message_headers.h:72
void dump(std::ostringstream &output) const override
Create a string representation of the header to be logged.
Definition: gcs_message_stage_split.cc:109
unsigned int get_num_messages() const
Return the number of messages bundled together.
Definition: gcs_message_stage_split.h:182
Gcs_message_stage_split_v2(bool enabled, unsigned long long split_threshold)
Creates an instance of the stage.
Definition: gcs_message_stage_split.h:319
static const unsigned short WIRE_HD_NUM_MESSAGES_SIZE
On-the-wire field size for the number of messages.
Definition: gcs_message_stage_split.h:68
std::vector< Gcs_packet > Gcs_packets_list
Definition: gcs_message_stage_split.h:257
std::unique_ptr< Gcs_stage_metadata > clone() override
Creates a deep copy of this object.
Definition: gcs_message_stage_split.h:140
unsigned long long m_payload_length
Size of the current payload which is a full message or part of a message.
Definition: gcs_message_stage_split.h:122
unsigned int m_num_messages
Determine the number of original messages that are included here.
Definition: gcs_message_stage_split.h:106
void set_num_messages(unsigned int num_messages)
Set the number of messages bundled together.
Definition: gcs_message_stage_split.h:175
unsigned long long Gcs_message_id
Definition: gcs_message_stage_split.h:36
Gcs_message_id m_message_id
Uniquely identify the message so that split messages can be reassembled.
Definition: gcs_message_stage_split.h:99
Gcs_sender_id m_sender_id
Uniquely identify the sender which the message belongs to.
Definition: gcs_message_stage_split.h:94
unsigned long long decode(const unsigned char *buffer) override
Decodes the contents of the buffer and sets the field values according to the values decoded...
Definition: gcs_message_stage_split.cc:83
It represents the identity of a group member within a certain group.
Definition: gcs_member_identifier.h:39
static const unsigned short WIRE_HD_SENDER_ID_SIZE
On-the-wire field size for the sender identification size.
Definition: gcs_message_stage_split.h:73
static const unsigned short WIRE_HD_PAYLOAD_SIZE
On-the-wire field size for payload length.
Definition: gcs_message_stage_split.h:88
Abstract class that defines specific metadata associated to a stage if it decides to extend it...
Definition: gcs_internal_message_headers.h:551
void set_message_part_id(unsigned int message_part_id)
Set the part that identifies this message.
Definition: gcs_message_stage_split.h:189
stage_status
Definition: gcs_message_stages.h:83
std::unordered_map< Gcs_message_id, Gcs_packets_list > Gcs_packets_per_content
Definition: gcs_message_stage_split.h:259
This class contains information on the configuration, i.e set of nodes or simply site definition...
Definition: gcs_xcom_group_member_information.h:390
void set_sender_id(const Gcs_sender_id &sender_id)
Set the sender identifier.
Definition: gcs_message_stage_split.h:149
This class is an abstraction for the packet concept.
Definition: gcs_internal_message.h:57
std::unordered_map< Gcs_sender_id, Gcs_packets_per_content > Gcs_packets_per_sender
Definition: gcs_message_stage_split.h:261
~Gcs_message_stage_split_v2() override
Definition: gcs_message_stage_split.h:325
static unsigned long long fixed_encode_length()
Helper method to calculate the length used to store the stage header information. ...
Definition: gcs_message_stage_split.h:250
const Gcs_sender_id & get_sender_id() const
Return the sender identifier.
Definition: gcs_message_stage_split.h:156
std::basic_ostringstream< char, std::char_traits< char >, ut_allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut_allocator.
Definition: ut0new.h:1302
unsigned int get_message_part_id() const
Return the part that identifies this message.
Definition: gcs_message_stage_split.h:196
void set_payload_length(unsigned long long payload_length)
Set the payload length.
Definition: gcs_message_stage_split.h:203
Gcs_message_id get_message_id() const
Return the message identifier.
Definition: gcs_message_stage_split.h:168
Gcs_split_header_v2(const Gcs_sender_id &sender_id, Gcs_message_id message_id, unsigned int num_messages, unsigned int message_part_id, unsigned long long payload_length) noexcept
Definition: gcs_message_stage_split.h:129
void set_threshold(unsigned long long split_threshold)
Sets the threshold in bytes after which messages are split.
Definition: gcs_message_stage_split.h:349
This is a stage in the pipeline that processes messages when they are put through the send and receiv...
Definition: gcs_message_stages.h:81
Stage_code get_stage_code() const override
Return the stage code.
Definition: gcs_message_stage_split.h:330
static const unsigned short WIRE_HD_MESSAGE_PART_ID_SIZE
On-the-wire field size for the message part sequence (i.e.
Definition: gcs_message_stage_split.h:83
unsigned long long get_payload_length() const
Return the payload length.
Definition: gcs_message_stage_split.h:210
Definition: gcs_message_stage_split.h:263