MySQL 8.2.0
Source Code Documentation
gcs_message_stages.h
/* Copyright (c) 2016, 2023, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef GCS_MSG_STAGES_H
#define GCS_MSG_STAGES_H

#include <atomic>
#include <cassert>
#include <initializer_list>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>

/**
 Error code for the pipeline's processing of incoming packets.
 */
enum class Gcs_pipeline_incoming_result {
  /** Successful, and returned a packet. */
  OK_PACKET,
  /**
   Successful, but produces no packet.
   E.g. the incoming packet is a fragment, so it was buffered until all
   fragments arrive and we reassemble the original message.
   */
  OK_NO_PACKET,
  /** Unsuccessful. */
  ERROR
};

/**
 This is a stage in the pipeline that processes messages when they are
 put through the send and receive code paths.

 A stage may apply a transformation to the payload of the message that it is
 handling. If it does morph the message, it will append a stage header to
 the message and change the payload accordingly. On the receiving side the
 GCS receiver thread will revert the transformation before delivering the
 message to the application.

 An example of a stage is the LZ4 stage that compresses the payload.

 Developers who want to create a new stage have to inherit from this class and
 implement its six pure virtual methods: skip_apply, get_stage_header,
 skip_revert, apply_transformation, revert_transformation and get_stage_code.
 Note, however, that the current semantics assume that each new stage added to
 the pipeline will allocate a new buffer and copy the payload, which may be
 transformed or not, to it.

 This copy assumption makes it easier to create a simple infrastructure to
 add new stages. Currently, this does not represent a performance bottleneck,
 but we may revisit this design if it becomes a problem. Note that a quick,
 though perhaps not so simple, way to overcome this limitation is to redefine
 the apply and revert methods.
 */
class Gcs_message_stage {
 public:
  enum class stage_status : unsigned int { apply, skip, abort };

  /**
   Check if the apply operation which affects outgoing packets should be
   executed (i.e. applied), skipped or aborted.

   If the outcome is stage_status::apply or stage_status::skip, the stage will
   process or skip the message, respectively. However, if the outcome is
   stage_status::abort, the message will be discarded and an error will be
   reported, thus stopping the pipeline execution.

   For example, if a packet's length is less than a pre-defined threshold the
   packet is not compressed.

   @param original_payload_size The size of the packet to which the
   transformation should be applied.
   @return a status specifying whether the transformation should be executed,
   skipped or aborted
   */
  virtual stage_status skip_apply(
      uint64_t const &original_payload_size) const = 0;

  virtual std::unique_ptr<Gcs_stage_metadata> get_stage_header() = 0;

 protected:
  /**
   Check if the revert operation which affects incoming packets should be
   executed (i.e. applied), skipped or aborted.

   If the outcome is stage_status::apply or stage_status::skip, the stage will
   process or skip the message, respectively. However, if the outcome is
   stage_status::abort, the message will be discarded and an error will be
   reported, thus stopping the pipeline execution.

   For example, if the packet length is greater than the maximum allowed
   size of compressed data, an error is returned.

   @param packet The packet upon which the transformation should be reverted
   @return a status specifying whether the transformation should be executed,
   skipped or aborted
   */
  virtual stage_status skip_revert(const Gcs_packet &packet) const = 0;

  /**
   Implements the logic of this stage's transformation to the packet, and
   returns a set of one, or more, transformed packets.

   @param[in] packet The packet upon which the transformation should be applied
   @retval {true, _} If there was an error applying the transformation
   @retval {false, P} If the transformation was successful, and produced the
   set of transformed packets P
   */
  virtual std::pair<bool, std::vector<Gcs_packet>> apply_transformation(
      Gcs_packet &&packet) = 0;

  /**
   Implements the logic to revert this stage's transformation to the packet,
   and returns one transformed packet, or none.

   @param[in] packet The packet upon which the transformation should be reverted
   @retval {ERROR, _} If there was an error reverting the transformation
   @retval {OK_NO_PACKET, NP} If the transformation was reverted, but produced
   no packet
   @retval {OK_PACKET, P} If the transformation was reverted, and produced the
   packet P
   */
  virtual std::pair<Gcs_pipeline_incoming_result, Gcs_packet>
  revert_transformation(Gcs_packet &&packet) = 0;

 public:
  explicit Gcs_message_stage() : m_is_enabled(true) {}

  explicit Gcs_message_stage(bool enabled) : m_is_enabled(enabled) {}

  virtual ~Gcs_message_stage() = default;

  /**
   Return the unique stage code.
   @return the stage code.
   */
  virtual Stage_code get_stage_code() const = 0;

  /**
   Apply some transformation to the outgoing packet, and return a set of one,
   or more, transformed packets.

   @param[in] packet The packet upon which the transformation should be applied
   @retval {true, _} If there was an error applying the transformation
   @retval {false, P} If the transformation was successful, and produced the
   set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> apply(Gcs_packet &&packet);

  /**
   Revert some transformation from the incoming packet, and return one
   transformed packet, or none.

   @param[in] packet The packet upon which the transformation should be reverted
   @retval {ERROR, _} If there was an error reverting the transformation
   @retval {OK_NO_PACKET, NP} If the transformation was reverted, but produced
   no packet
   @retval {OK_PACKET, P} If the transformation was reverted, and produced the
   packet P
   */
  std::pair<Gcs_pipeline_incoming_result, Gcs_packet> revert(
      Gcs_packet &&packet);

  /**
   Return whether the message stage is enabled or not.
   */
  bool is_enabled() const { return m_is_enabled; }

  /**
   Update the list of members in the group as this may be required by some
   stages in the communication pipeline. By default though, the call is simply
   ignored.

   @return If there is an error, true is returned. Otherwise, false is returned.
   */
  virtual bool update_members_information(const Gcs_member_identifier &,
                                          const Gcs_xcom_nodes &) {
    return false;
  }

  virtual Gcs_xcom_synode_set get_snapshot() const { return {}; }

  /**
   Enable or disable the message stage.

   @param is_enabled Whether the message stage is enabled or disabled.
   */
  void set_enabled(bool is_enabled) { m_is_enabled = is_enabled; }

 protected:
  /**
   Encode the fixed part of the associated dynamic header information into
   the header buffer.

   @param header Pointer to the header buffer.
   @param header_length Length of the header information.
   @param old_payload_length Length of previous stage payload.
   */
  void encode(unsigned char *header, unsigned short header_length,
              unsigned long long old_payload_length);

  /**
   Decode the fixed part of the associated dynamic header information from the
   header buffer.

   @param header Pointer to the header buffer
   @param[out] header_length Pointer to the length of the header information
   @param[out] old_payload_length Pointer to the length of previous stage
   payload
   */
  void decode(const unsigned char *header, unsigned short *header_length,
              unsigned long long *old_payload_length);

 private:
  bool m_is_enabled;
};
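The skip/apply/revert contract described for Gcs_message_stage can be sketched in isolation. The block below is a minimal illustration under stated assumptions, not the MySQL implementation: Toy_status, Toy_packet, Toy_stage and Reverse_stage are hypothetical names, the "stage header" is reduced to a boolean flag, and the way apply() and revert() consult the skip checks is an assumption, since their bodies are not part of this header.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-ins for illustration; these are not MySQL GCS types.
enum class Toy_status { apply, skip, abort };

struct Toy_packet {
  std::string payload;
  bool has_stage_header;  // true once a stage has transformed the payload
};

class Toy_stage {
 public:
  virtual ~Toy_stage() = default;

  // Outgoing path. Returns {true, _} on error and {false, P} on success.
  std::pair<bool, Toy_packet> apply(Toy_packet packet) {
    switch (skip_apply(packet.payload.size())) {
      case Toy_status::abort:
        return {true, Toy_packet{}};
      case Toy_status::skip:
        return {false, std::move(packet)};  // passed through unchanged
      case Toy_status::apply:
        break;
    }
    return {false, apply_transformation(std::move(packet))};
  }

  // Incoming path, mirroring apply().
  std::pair<bool, Toy_packet> revert(Toy_packet packet) {
    switch (skip_revert(packet)) {
      case Toy_status::abort:
        return {true, Toy_packet{}};
      case Toy_status::skip:
        return {false, std::move(packet)};
      case Toy_status::apply:
        break;
    }
    return {false, revert_transformation(std::move(packet))};
  }

 protected:
  virtual Toy_status skip_apply(std::uint64_t payload_size) const = 0;
  virtual Toy_status skip_revert(const Toy_packet &packet) const = 0;
  virtual Toy_packet apply_transformation(Toy_packet packet) = 0;
  virtual Toy_packet revert_transformation(Toy_packet packet) = 0;
};

// A toy transformation: reverse payloads that reach a size threshold.
class Reverse_stage : public Toy_stage {
 public:
  explicit Reverse_stage(std::uint64_t threshold) : m_threshold(threshold) {}

 protected:
  Toy_status skip_apply(std::uint64_t payload_size) const override {
    return payload_size < m_threshold ? Toy_status::skip : Toy_status::apply;
  }
  Toy_status skip_revert(const Toy_packet &packet) const override {
    return packet.has_stage_header ? Toy_status::apply : Toy_status::skip;
  }
  Toy_packet apply_transformation(Toy_packet packet) override {
    std::reverse(packet.payload.begin(), packet.payload.end());
    packet.has_stage_header = true;
    return packet;
  }
  Toy_packet revert_transformation(Toy_packet packet) override {
    assert(packet.has_stage_header);  // only transformed packets get here
    std::reverse(packet.payload.begin(), packet.payload.end());
    packet.has_stage_header = false;
    return packet;
  }

 private:
  std::uint64_t m_threshold;
};
```

With a threshold of 4, applying the stage to "abc" leaves the packet untouched, while "abcdef" becomes "fedcba" and is flagged; reverting the flagged packet restores "abcdef". Keeping the skip decision separate from the transformation lets a derived stage implement only the four hooks.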

/**
 Definitions of structures that store the possible message stages and their
 handlers.
 */
using Gcs_stages_list = std::vector<Stage_code>;
using Gcs_map_type_handler =
    std::map<Stage_code, std::unique_ptr<Gcs_message_stage>>;
using Gcs_map_version_stages = std::map<Gcs_protocol_version, Gcs_stages_list>;
using Gcs_pair_version_stages =
    std::pair<const Gcs_protocol_version, Gcs_stages_list>;

/**
 This is the pipeline that an outgoing or incoming message has to go through
 when being sent to or received from the group respectively.

 The message pipeline has stages registered and these are assembled in an
 outgoing pipeline. Then outgoing messages always have to traverse this
 pipeline. For incoming messages, the pipeline is built on the fly, according to
 the information contained in the message stage headers.

 The following rules are always enforced to guarantee safety:

 . A node always knows how to process protocol versions in the domain [initial
 version, max-version-known(node)] by keeping a complete versioned pipeline
 for the entire domain

 . Every time the pipeline or a message changes, the protocol version is
 incremented and a new pipeline version is also created accordingly, with new
 codes for all the stages

 . A running group can upgrade, but never downgrade, its protocol unless a user
 explicitly requests to downgrade it

 . Older nodes attempting to join a group running a newer protocol will discard
 all messages because the messages will either: (a) contain an unknown cargo
 type, or (b) contain an unknown type code

 --- Adding a new stage ---

 If a developer needs to add a new stage to the pipeline, the header protocol
 version number has to be incremented and the pipeline stage updated as follows:

 Gcs_message_old_stage *old_stage =
   pipeline.register_stage<Gcs_message_old_stage>();
 Gcs_message_modified_old_stage *modified_old_stage =
   pipeline.register_stage<Gcs_message_modified_old_stage>();
 Gcs_message_new_stage *new_stage =
   pipeline.register_stage<Gcs_message_new_stage>();

 pipeline.register_pipeline(
 {
   {
     1, {old_stage->get_stage_code()}
   },
   {
     X, {modified_old_stage->get_stage_code(), new_stage->get_stage_code()}
   }
 });

 where X is the header protocol version after the update.

 Note that the difference between the two old stages is only the type code.

 --- Changing stage format ---

 If a developer needs to change any stage format, i.e. replace an existing stage
 of the pipeline, the header protocol version number has to be incremented and
 the pipeline stage updated as follows:

 Gcs_message_old_stage *old_stage =
   pipeline.register_stage<Gcs_message_old_stage>();
 Gcs_message_new_stage *new_stage =
   pipeline.register_stage<Gcs_message_new_stage>();

 pipeline.register_pipeline(
 {
   {
     1, {old_stage->get_stage_code()}
   },
   {
     X, {new_stage->get_stage_code()}
   }
 });

 where X is the header protocol version after the update.

 Note that a new pipeline stage with a unique type code has to be created.
 Besides, every message carries the current protocol version in use. This
 information is available as part of the fixed header and can be read by any
 stage in order to decide how the message content shall be interpreted.

 --- Changing Cargo ---

 If a developer needs to change a cargo format or create a new one, a new cargo
 type must always be created, as the current cargo types are not prepared to be
 extended. The header protocol version number has to be incremented and the
 pipeline stage updated as follows:

 Gcs_message_old_stage *old_stage =
   pipeline.register_stage<Gcs_message_old_stage>();
 Gcs_message_modified_old_stage *modified_old_stage =
   pipeline.register_stage<Gcs_message_modified_old_stage>();

 pipeline.register_pipeline(
 {
   {
     1, {old_stage->get_stage_code()}
   },
   {
     X, {modified_old_stage->get_stage_code()}
   }
 });

 where X is the header protocol version after the update.

 Although the cargo type has no direct relation to the message pipeline stages,
 increasing the protocol version number will allow nodes to decide if versions
 are compatible. Note that the difference between the two old stages is only
 the type code.
 */
class Gcs_message_pipeline {
 private:
  /**
   The registered stages. These are all stages that are known by this version
   of MySQL GCS. This needs to contain an instance of all possible stages,
   since it needs to handle cross-version communication.
   */
  Gcs_map_type_handler m_handlers;

  /**
   These are the pre-assembled outgoing pipelines for the different versions
   that are currently supported, meaning that the stages are traversed in the
   given order.
   */
  Gcs_map_version_stages m_pipelines;

  /**
   The pipeline version in use.
   */
  std::atomic<Gcs_protocol_version> m_pipeline_version;

 public:
  explicit Gcs_message_pipeline()
      : m_handlers(),
        m_pipelines(),

  Gcs_message_pipeline(Gcs_message_pipeline &p) = delete;

  Gcs_message_pipeline &operator=(const Gcs_message_pipeline &p) = delete;

  Gcs_message_pipeline(Gcs_message_pipeline &&p) = delete;

  Gcs_message_pipeline &operator=(Gcs_message_pipeline &&p) = delete;

  virtual ~Gcs_message_pipeline() = default;

  /**
   This member function SHALL be called by the message sender. It makes the
   message go through the pipeline of stages before it is actually handed
   over to the group communication engine.

   Note that the fragmentation layer may produce more than one packet.

   @param[in] msg_data Message data to send.
   @param[in] cargo The cargo type of the message to send
   @retval {true, _} If there was an error in the pipeline
   @retval {false, P} If the pipeline was successful, and produced the
   set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> process_outgoing(
      Gcs_message_data const &msg_data, Cargo_type cargo) const;

  /**
   This member function SHALL be called by the receiver thread to process the
   packet through the stages it went through when it was sent. This reverts
   their effect on the receiving end.

   @param packet The packet to process.
   @retval {ERROR, _} If there was an error in the pipeline
   @retval {OK_NO_PACKET, NP} If the pipeline was successful, but produced no
   packet
   @retval {OK_PACKET, P} If the pipeline was successful, and produced the
   packet P
   */
  std::pair<Gcs_pipeline_incoming_result, Gcs_packet> process_incoming(
      Gcs_packet &&packet) const;

  /**
   Update the list of members in the group as this may be required by some
   stages in the communication pipeline. By default though, the call is simply
   ignored.

   @param me The local member identifier.
   @param xcom_nodes List of members in the group.
   */
  void update_members_information(const Gcs_member_identifier &me,
                                  const Gcs_xcom_nodes &xcom_nodes) const;

  Gcs_xcom_synode_set get_snapshot() const;

  /**
   Register a stage to be used by the pipeline.

   @tparam T Stage class type
   @tparam Args Type of the parameters to the stage constructor
   @param args Parameters to the stage constructor
   */
  template <class T, class... Args>
  void register_stage(Args... args) {
    std::unique_ptr<T> stage(new T(args...));

    if (stage != nullptr) {
      Stage_code code = stage->get_stage_code();
      Gcs_message_stage *ptr = retrieve_stage(code);
      if (ptr == nullptr) {
        m_handlers.insert(
            std::make_pair(stage->get_stage_code(), std::move(stage)));
      }
    }
  }

  /**
   Check whether a stage is registered or not.

   @param code Stage code
   @return whether a stage is registered or not.
   */
  bool contains_stage(Stage_code code) const {
    return retrieve_stage(code) != nullptr;
  }

  /*
   Return a reference to a stage. Note that the stage must exist, otherwise
   the call leads to undefined behavior.

   @param code Stage code
   @return a reference to a stage
   */
  Gcs_message_stage &get_stage(Stage_code code) const {
    Gcs_message_stage *ptr = retrieve_stage(code);
    assert(ptr != nullptr);
    return *ptr;
  }

  /**
   Register the stages per version that form the different pipelines.

   This method must be called after registering all the desired stages using
   register_stage.

   This method must only be called on an unregistered pipeline.
   If you want to reuse the pipeline, new calls to this method must be preceded
   by calls to cleanup and register_stage.

   @param stages Initialization list that contains a mapping between a
   version and the associated pipeline stages.

   @return false on success, true otherwise.
   */
  bool register_pipeline(std::initializer_list<Gcs_pair_version_stages> stages);

  /**
   Check whether a pipeline version is registered or not.

   @param pipeline_version Pipeline version
   @return whether a pipeline version is registered or not.
   */
  bool contains_pipeline(Gcs_protocol_version pipeline_version) const {
    return retrieve_pipeline(pipeline_version) != nullptr;
  }

  /*
   Return a reference to a pipeline version. Note that the pipeline version
   must exist, otherwise the call leads to undefined behavior.

   @param pipeline_version Pipeline version
   @return a reference to a pipeline
   */
  const Gcs_stages_list &get_pipeline(
      Gcs_protocol_version pipeline_version) const {
    const Gcs_stages_list *ptr = retrieve_pipeline(pipeline_version);
    assert(ptr != nullptr);
    return *ptr;
  }

  /**
   Clean all data structures and objects created.
   */
  void cleanup();

  /**
   Set the pipeline version in use.

   @param pipeline_version Pipeline version.
   @return false if successfully set, true otherwise
   */
  bool set_version(Gcs_protocol_version pipeline_version);

  /**
   Return the pipeline version in use.
   */
  Gcs_protocol_version get_version() const;

 private:
  /**
   Retrieve the stages associated with a pipeline version.

   @param pipeline_version Pipeline version
   */
  const Gcs_stages_list *retrieve_pipeline(
      Gcs_protocol_version pipeline_version) const;

  /**
   This member function SHALL retrieve the associated stage if there is any,
   otherwise a null pointer is returned.

   @param stage_code unique stage code
   */
  Gcs_message_stage *retrieve_stage(Stage_code stage_code) const;

  /**
   This member function SHALL retrieve the current stage type code of a packet.

   @param p the packet to process.
   */
  Gcs_message_stage *retrieve_stage(const Gcs_packet &p) const;

  /**
   Find out which stages should be applied to an outgoing message.

   @param pipeline_version The pipeline version to use
   @param original_payload_size The size of the outgoing message
   @retval {true, _} If there was an error
   @retval {false, S} If successful, and the message should go through the
   sequence of stages S
   */
  std::pair<bool, std::vector<Stage_code>> get_stages_to_apply(
      Gcs_protocol_version const &pipeline_version,
      uint64_t const &original_payload_size) const;

  /**
   Create a packet for a message with size original_payload_size and type
   cargo, that will go through the stages stages_to_apply from pipeline
   version current_version.

   @param cargo The message type
   @param current_version The pipeline version
   @param original_payload_size The payload size
   @param stages_to_apply The stages that will be applied to the packet
   @retval {true, _} If there was an error creating the packet
   @retval {false, P} If successful, and created packet P
   */
  std::pair<bool, Gcs_packet> create_packet(
      Cargo_type const &cargo, Gcs_protocol_version const &current_version,
      uint64_t const &original_payload_size,
      std::vector<Stage_code> const &stages_to_apply) const;

  /**
   Apply the given stages to the given outgoing packet.

   @param packet The packet to transform
   @param stages The stages to apply
   @retval {true, _} If there was an error applying the stages
   @retval {false, P} If the stages were successfully applied, and produced
   the set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> apply_stages(
      Gcs_packet &&packet, std::vector<Stage_code> const &stages) const;

  /**
   Apply the given stage to the given outgoing packets.

   @param packets The packets to transform
   @param stage The stage to apply
   @retval {true, _} If there was an error applying the stage
   @retval {false, P} If the stage was successfully applied, and produced the
   set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> apply_stage(
      std::vector<Gcs_packet> &&packets, Gcs_message_stage &stage) const;

  /**
   Revert the given stage on the given incoming packet.

   @param packet The packet to transform
   @param stage_code The stage to revert
   @retval {ERROR, _} If there was an error in the stage
   @retval {OK_NO_PACKET, NP} If the stage was successfully reverted, but
   produced no packet
   @retval {OK_PACKET, P} If the stage was successfully reverted, and produced
   the packet P
   */
  std::pair<Gcs_pipeline_incoming_result, Gcs_packet> revert_stage(
      Gcs_packet &&packet, Stage_code const &stage_code) const;
};

#endif
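The versioned-pipeline idea documented for Gcs_message_pipeline (stages registered once by code, one ordered stage list per protocol version, incoming messages reverted according to their own stage headers) can be sketched with toy types. Everything below is a hypothetical illustration, not the MySQL GCS API: Toy_code, Toy_message, Toy_handler, Toy_pipeline, bracket_stage and bang_stage are invented names, stage headers are reduced to a list of codes, and reverting in reverse order of application is an assumption of this sketch.

```cpp
#include <cassert>
#include <functional>
#include <initializer_list>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified names; this is not the MySQL GCS API.
enum class Toy_code { bracket = 1, bang = 2, unknown = 9 };
using Toy_version = int;
using Toy_codes = std::vector<Toy_code>;

struct Toy_message {
  std::string payload;
  Toy_codes stage_headers;  // codes of the stages applied, in order
};

struct Toy_handler {
  std::function<std::string(const std::string &)> apply;
  std::function<std::string(const std::string &)> revert;
};

// Two toy stages: wrap the payload in brackets, or append a '!'.
inline Toy_handler bracket_stage() {
  return {[](const std::string &s) { return "[" + s + "]"; },
          [](const std::string &s) { return s.substr(1, s.size() - 2); }};
}
inline Toy_handler bang_stage() {
  return {[](const std::string &s) { return s + "!"; },
          [](const std::string &s) { return s.substr(0, s.size() - 1); }};
}

class Toy_pipeline {
 public:
  // Registering an already known code is a no-op.
  void register_stage(Toy_code code, Toy_handler handler) {
    assert(handler.apply && handler.revert);
    m_handlers.emplace(code, std::move(handler));
  }

  // Returns false on success, true if pipelines were already registered or
  // a version refers to a stage code that was never registered.
  bool register_pipeline(
      std::initializer_list<std::pair<const Toy_version, Toy_codes>> stages) {
    if (!m_pipelines.empty()) return true;
    for (const auto &entry : stages)
      for (Toy_code code : entry.second)
        if (m_handlers.count(code) == 0) return true;
    m_pipelines.insert(stages);
    return false;
  }

  // Returns false if the version is known and now in use, true otherwise.
  bool set_version(Toy_version version) {
    if (m_pipelines.count(version) == 0) return true;
    m_version = version;
    return false;
  }

  // Outgoing: traverse the stage list of the version in use, in order.
  std::pair<bool, Toy_message> process_outgoing(std::string payload) const {
    auto it = m_pipelines.find(m_version);
    if (it == m_pipelines.end()) return {true, Toy_message{}};
    Toy_message msg{std::move(payload), {}};
    for (Toy_code code : it->second) {
      msg.payload = m_handlers.at(code).apply(msg.payload);
      msg.stage_headers.push_back(code);
    }
    return {false, std::move(msg)};
  }

  // Incoming: the stages to revert come from the message itself, so a
  // message sent under another known version can still be processed.
  std::pair<bool, std::string> process_incoming(Toy_message msg) const {
    for (auto it = msg.stage_headers.rbegin(); it != msg.stage_headers.rend();
         ++it) {
      auto handler = m_handlers.find(*it);
      if (handler == m_handlers.end()) return {true, std::string{}};
      msg.payload = handler->second.revert(msg.payload);
    }
    return {false, std::move(msg.payload)};
  }

 private:
  std::map<Toy_code, Toy_handler> m_handlers;
  std::map<Toy_version, Toy_codes> m_pipelines;
  Toy_version m_version = 0;
};
```

After registering both stages and the pipelines {1, {bracket}} and {2, {bracket, bang}}, sending "hi" under version 2 produces "[hi]!", and process_incoming recovers "hi" for messages produced under either version. A message whose headers name an unregistered stage code is rejected, which mirrors the rule that older nodes discard messages containing unknown type codes.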