MySQL 9.1.0
Source Code Documentation
gcs_message_stages.h
/* Copyright (c) 2016, 2024, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is designed to work with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have either included with
   the program or referenced in the documentation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef GCS_MSG_STAGES_H
#define GCS_MSG_STAGES_H

#include <atomic>
#include <cassert>
#include <initializer_list>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>

/**
 Error code for the pipeline's processing of incoming packets.
 */
enum class Gcs_pipeline_incoming_result {
  /** Successful, and returned a packet. */
  OK_PACKET,
  /**
   Successful, but produces no packet.
   E.g. the incoming packet is a fragment, so it was buffered until all
   fragments arrive and we reassemble the original message.
   */
  OK_NO_PACKET,
  /** Unsuccessful. */
  ERROR
};

/**
 This is a stage in the pipeline that processes messages when they are
 put through the send and receive code paths.

 A stage may apply a transformation to the payload of the message that it is
 handling. If it does morph the message, it will append a stage header to
 the message and change the payload accordingly. On the receiving side the
 GCS receiver thread will revert the transformation before delivering the
 message to the application.

 An example of a stage is the LZ4 stage that compresses the payload.

 Developers who want to create a new stage have to inherit from this class
 and implement six virtual methods that are self-explanatory. Note, however,
 that the current semantics assume that each new stage added to the pipeline
 will allocate a new buffer and copy the payload, which may be transformed or
 not, to it.

 This copy assumption makes it easier to create a simple infrastructure to
 add new stages. Currently, this does not represent a performance bottleneck
 but we may revisit this design if it becomes a problem. Note that a quick,
 but maybe not so simple, way to overcome this limitation is through the
 redefinition of the apply and revert methods.
 */
class Gcs_message_stage {
 public:
  enum class stage_status : unsigned int { apply, skip, abort };

  /**
   Check if the apply operation which affects outgoing packets should be
   executed (i.e. applied), skipped or aborted.

   If the outcome is @c apply or @c skip, the stage will process or skip
   the message, respectively. However, if the outcome is @c abort, the
   message will be discarded and an error will be reported thus stopping the
   pipeline execution.

   For example, if a packet's length is less than a pre-defined threshold the
   packet is not compressed.

   @param original_payload_size The size of the packet to which the
   transformation should be applied.
   @return a status specifying whether the transformation should be executed,
   skipped or aborted
   */
  virtual stage_status skip_apply(
      uint64_t const &original_payload_size) const = 0;

  virtual std::unique_ptr<Gcs_stage_metadata> get_stage_header() = 0;

 protected:
  /**
   Check if the revert operation which affects incoming packets should be
   executed (i.e. applied), skipped or aborted.

   If the outcome is @c apply or @c skip, the stage will process or skip
   the message, respectively. However, if the outcome is @c abort, the
   message will be discarded and an error will be reported thus stopping the
   pipeline execution.

   For example, if the packet length is greater than the maximum allowed
   compressed information an error is returned.

   @param packet The packet upon which the transformation should be applied
   @return a status specifying whether the transformation should be executed,
   skipped or aborted
   */
  virtual stage_status skip_revert(const Gcs_packet &packet) const = 0;

  /**
   Implements the logic of this stage's transformation to the packet, and
   returns a set of one, or more, transformed packets.

   @param[in] packet The packet upon which the transformation should be applied
   @retval {true, _} If there was an error applying the transformation
   @retval {false, P} If the transformation was successful, and produced the
   set of transformed packets P
   */
  virtual std::pair<bool, std::vector<Gcs_packet>> apply_transformation(
      Gcs_packet &&packet) = 0;

  /**
   Implements the logic to revert this stage's transformation to the packet,
   and returns one, or none, transformed packet.

   @param[in] packet The packet upon which the transformation should be reverted
   @retval {ERROR, _} If there was an error reverting the transformation
   @retval {OK_NO_PACKET, NP} If the transformation was reverted, but produced
   no packet
   @retval {OK_PACKET, P} If the transformation was reverted, and produced the
   packet P
   */
  virtual std::pair<Gcs_pipeline_incoming_result, Gcs_packet>
  revert_transformation(Gcs_packet &&packet) = 0;

 public:
  explicit Gcs_message_stage() : m_is_enabled(true) {}

  explicit Gcs_message_stage(bool enabled) : m_is_enabled(enabled) {}

  virtual ~Gcs_message_stage() = default;

  /**
   Return the unique stage code.
   @return the stage code.
   */
  virtual Stage_code get_stage_code() const = 0;

  /**
   Apply some transformation to the outgoing packet, and return a set of one,
   or more, transformed packets.

   @param[in] packet The packet upon which the transformation should be applied
   @retval {true, _} If there was an error applying the transformation
   @retval {false, P} If the transformation was successful, and produced the
   set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> apply(Gcs_packet &&packet);

  /**
   Revert some transformation from the incoming packet, and return one, or
   none, transformed packet.

   @param[in] packet The packet upon which the transformation should be reverted
   @retval {ERROR, _} If there was an error reverting the transformation
   @retval {OK_NO_PACKET, NP} If the transformation was reverted, but produced
   no packet
   @retval {OK_PACKET, P} If the transformation was reverted, and produced the
   packet P
   */
  std::pair<Gcs_pipeline_incoming_result, Gcs_packet> revert(
      Gcs_packet &&packet);

  /**
   Return whether the message stage is enabled or not.
   */
  bool is_enabled() const { return m_is_enabled; }

  /**
   Update the list of members in the group as this may be required by some
   stages in the communication pipeline. By default though, the call is simply
   ignored.

   @return If there is an error, true is returned. Otherwise, false is returned.
   */
  virtual bool update_members_information(const Gcs_member_identifier &,
                                          const Gcs_xcom_nodes &) {
    return false;
  }

  virtual Gcs_xcom_synode_set get_snapshot() const { return {}; }

  /**
   Enable or disable the message stage.

   @param is_enabled Whether the message stage is enabled or disabled.
   */
  void set_enabled(bool is_enabled) { m_is_enabled = is_enabled; }

 protected:
  /**
   Encode the fixed part of the associated dynamic header information into
   the header buffer.

   @param header Pointer to the header buffer.
   @param header_length Length of the header information.
   @param old_payload_length Length of previous stage payload.
   */
  void encode(unsigned char *header, unsigned short header_length,
              unsigned long long old_payload_length);

  /**
   Decode the fixed part of the associated dynamic header information from the
   header buffer.

   @param header Pointer to the header buffer
   @param[out] header_length Pointer to the length of the header information
   @param[out] old_payload_length Pointer to the length of previous stage
   payload
   */
  void decode(const unsigned char *header, unsigned short *header_length,
              unsigned long long *old_payload_length);

 private:
  bool m_is_enabled;
};

/**
 Definitions of structures that store the possible message stages and their
 handlers.
 */
using Gcs_stages_list = std::vector<Stage_code>;
using Gcs_map_type_handler =
    std::map<Stage_code, std::unique_ptr<Gcs_message_stage>>;
using Gcs_map_version_stages = std::map<Gcs_protocol_version, Gcs_stages_list>;
using Gcs_pair_version_stages =
    std::pair<const Gcs_protocol_version, Gcs_stages_list>;

/**
 This is the pipeline that an outgoing or incoming message has to go through
 when being sent to or received from the group respectively.

 The message pipeline has stages registered and these are assembled in an
 outgoing pipeline. Then outgoing messages always have to traverse this
 pipeline. For incoming messages, the pipeline is built on the fly, according to
 the information contained in the message stage headers.

 The following rules are always enforced to guarantee safety:

 . A node always knows how to process protocol versions in the domain [initial
   version, max-version-known(node)] by keeping a complete versioned pipeline
   for the entire domain

 . Every time the pipeline or a message changes, the protocol version is
   incremented and a new pipeline version is also created accordingly with new
   codes for all the stages

 . A running group can upgrade, but never downgrade, its protocol unless a user
   explicitly requests a downgrade

 . Older nodes attempting to join a group running a newer protocol will discard
   all messages because the messages will either: (a) contain an unknown cargo
   type, or (b) contain an unknown type code

 --- Adding a new stage ---

 If a developer needs to add a new stage to the pipeline, the header protocol
 version number has to be incremented and the pipeline stage updated as follows:

 Gcs_message_old_stage *old_stage =
   pipeline.register_stage<Gcs_message_old_stage>();
 Gcs_message_modified_old_stage *modified_old_stage =
   pipeline.register_stage<Gcs_message_modified_old_stage>();
 Gcs_message_new_stage *new_stage =
   pipeline.register_stage<Gcs_message_new_stage>();

 pipeline.register_pipeline(
 {
   {
     1, {old_stage->get_stage_code()}
   },
   {
     X, {modified_old_stage->get_stage_code(), new_stage->get_stage_code()}
   }
 });

 where X is the header protocol version after the update.

 Note that the difference between the two old stages is only the type code.

 --- Changing stage format ---

 If a developer needs to change any stage format, i.e. replace an existing stage
 of the pipeline, the header protocol version number has to be incremented and
 the pipeline stage updated as follows:

 Gcs_message_old_stage *old_stage =
   pipeline.register_stage<Gcs_message_old_stage>();
 Gcs_message_new_stage *new_stage =
   pipeline.register_stage<Gcs_message_new_stage>();

 pipeline.register_pipeline(
 {
   {
     1, {old_stage->get_stage_code()}
   },
   {
     X, {new_stage->get_stage_code()}
   }
 });

 where X is the header protocol version after the update.

 Note that a new pipeline stage with a unique type code has to be created.
 Besides, every message will carry the current protocol version in use and this
 information is available as part of the fixed header and can be read by any
 stage in order to decide how the message content shall be interpreted.

 --- Changing Cargo ---

 If a developer needs to change a cargo format or create a new one, a new cargo
 type must always be created, as the current cargo types are not prepared to be
 extended. The header protocol version number also has to be incremented and the
 pipeline stage updated as follows:

 Gcs_message_old_stage *old_stage =
   pipeline.register_stage<Gcs_message_old_stage>();
 Gcs_message_modified_old_stage *modified_old_stage =
   pipeline.register_stage<Gcs_message_modified_old_stage>();

 pipeline.register_pipeline(
 {
   {
     1, {old_stage->get_stage_code()}
   },
   {
     X, {modified_old_stage->get_stage_code()}
   }
 });

 where X is the header protocol version after the update.

 Although the cargo type has no direct relation to the message pipeline stages,
 increasing the protocol version number will allow nodes to decide if versions
 are compatible. Note that the difference between the two old stages is only
 the type code.
 */
class Gcs_message_pipeline {
 private:
  /**
   The registered stages. These are all stages that are known by this version
   of MySQL GCS. This needs to contain an instance of all possible stages,
   since it needs to handle cross-version communication.
   */
  Gcs_map_type_handler m_handlers;

  /**
   These are the pre-assembled outgoing pipelines for the different versions
   that are currently supported, meaning that the stages are traversed in the
   given order.
   */
  Gcs_map_version_stages m_pipelines;

  /**
   The pipeline version in use.
   */
  std::atomic<Gcs_protocol_version> m_pipeline_version;

 public:
  explicit Gcs_message_pipeline()
      : m_handlers(),
        m_pipelines(),
        m_pipeline_version(Gcs_protocol_version::V1) {}

  Gcs_message_pipeline(Gcs_message_pipeline &p) = delete;

  Gcs_message_pipeline &operator=(const Gcs_message_pipeline &p) = delete;

  Gcs_message_pipeline(Gcs_message_pipeline &&p) = delete;

  Gcs_message_pipeline &operator=(Gcs_message_pipeline &&p) = delete;

  virtual ~Gcs_message_pipeline() = default;

  /**
   This member function SHALL be called by the message sender. It makes the
   message go through the pipeline of stages before it is actually handed
   over to the group communication engine.

   Note that the fragmentation layer may produce more than one packet.

   @param[in] msg_data Message data to send.
   @param[in] cargo The cargo type of the message to send
   @retval {true, _} If there was an error in the pipeline
   @retval {false, P} If the pipeline was successful, and produced the
   set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> process_outgoing(
      Gcs_message_data const &msg_data, Cargo_type cargo) const;

  /**
   This member function SHALL be called by the receiver thread to process the
   packet through the stages it was processed when it was sent. This reverts
   the effect on the receiving end.

   @param packet The packet to process.
   @retval {ERROR, _} If there was an error in the pipeline
   @retval {OK_NO_PACKET, NP} If the pipeline was successful, but produced no
   packet
   @retval {OK_PACKET, P} If the pipeline was successful, and produced the
   packet P
   */
  std::pair<Gcs_pipeline_incoming_result, Gcs_packet> process_incoming(
      Gcs_packet &&packet) const;

  /**
   Update the list of members in the group as this may be required by some
   stages in the communication pipeline. By default though, the call is simply
   ignored.

   @param me The local member identifier.
   @param xcom_nodes List of members in the group.
   */
  void update_members_information(const Gcs_member_identifier &me,
                                  const Gcs_xcom_nodes &xcom_nodes) const;

  Gcs_xcom_synode_set get_snapshot() const;

  /**
   Register a stage to be used by the pipeline.

   @tparam T Stage class type
   @tparam Args Type of Parameters to the stage constructor
   @param args Parameters to the stage constructor
   */
  template <class T, class... Args>
  void register_stage(Args... args) {
    std::unique_ptr<T> stage(new T(args...));

    if (stage != nullptr) {
      Stage_code code = stage->get_stage_code();
      Gcs_message_stage *ptr = retrieve_stage(code);
      if (ptr == nullptr) {
        m_handlers.insert(
            std::make_pair(stage->get_stage_code(), std::move(stage)));
      }
    }
  }

  /**
   Check whether a stage is registered or not.

   @param code Stage code
   @return whether a stage is registered or not.
   */
  bool contains_stage(Stage_code code) const {
    return retrieve_stage(code) != nullptr;
  }

  /*
   Return a reference to a stage. Note that the stage must exist, otherwise,
   the call will lead to undefined behavior.

   @param code Stage code
   @return a reference to a stage
   */
  Gcs_message_stage &get_stage(Stage_code code) const {
    Gcs_message_stage *ptr = retrieve_stage(code);
    assert(ptr != nullptr);
    return *ptr;
  }

  /**
   Register the stages per version that form the different pipelines.

   This method must be called after registering all the desired stages using
   register_stage.

   This method must only be called on an unregistered pipeline.
   If you want to reuse the pipeline, new calls to this method must be preceded
   by calls to cleanup and register_stage.

   @param stages Initialization list that contains a mapping between a
                 version and the associated pipeline stages.

   @return false on success, true otherwise.
   */
  bool register_pipeline(std::initializer_list<Gcs_pair_version_stages> stages);

  /**
   Check whether a pipeline version is registered or not.

   @param pipeline_version Pipeline version
   @return whether a pipeline version is registered or not.
   */
  bool contains_pipeline(Gcs_protocol_version pipeline_version) const {
    return retrieve_pipeline(pipeline_version) != nullptr;
  }

  /*
   Return a reference to a pipeline version. Note that the pipeline version
   must exist, otherwise, the call will lead to undefined behavior.

   @param pipeline_version Pipeline version
   @return a reference to a pipeline
   */
  const Gcs_stages_list &get_pipeline(
      Gcs_protocol_version pipeline_version) const {
    const Gcs_stages_list *ptr = retrieve_pipeline(pipeline_version);
    assert(ptr != nullptr);
    return *ptr;
  }

  /**
   Clean all data structures and objects created.
   */
  void cleanup();

  /**
   Set the pipeline version in use.

   @param pipeline_version Pipeline version.
   @return false if successfully set, true otherwise
   */
  bool set_version(Gcs_protocol_version pipeline_version);

  /**
   Return the pipeline version in use.
   */
  Gcs_protocol_version get_version() const;

 private:
  /**
   Retrieve the stages associated with a pipeline version.

   @param pipeline_version Pipeline version
   */
  const Gcs_stages_list *retrieve_pipeline(
      Gcs_protocol_version pipeline_version) const;

  /**
   This member function SHALL retrieve the associated stage if there is any,
   otherwise a null pointer is returned.

   @param stage_code unique stage code
   */
  Gcs_message_stage *retrieve_stage(Stage_code stage_code) const;

  /**
   This member function SHALL retrieve the current stage type code of a packet.

   @param p the packet to process.
   */
  Gcs_message_stage *retrieve_stage(const Gcs_packet &p) const;

  /**
   Find out which stages should be applied to an outgoing message.

   @param pipeline_version The pipeline version to use
   @param original_payload_size The size of the outgoing message
   @retval {true, _} If there was an error
   @retval {false, S} If successful, and the message should go through the
   sequence of stages S
   */
  std::pair<bool, std::vector<Stage_code>> get_stages_to_apply(
      Gcs_protocol_version const &pipeline_version,
      uint64_t const &original_payload_size) const;

  /**
   Create a packet for a message with size original_payload_size and type
   cargo, that will go through the stages stages_to_apply from pipeline
   version current_version.

   @param cargo The message type
   @param current_version The pipeline version
   @param original_payload_size The payload size
   @param stages_to_apply The stages that will be applied to the packet
   @retval {true, _} If there was an error creating the packet
   @retval {false, P} If successful, and created packet P
   */
  std::pair<bool, Gcs_packet> create_packet(
      Cargo_type const &cargo, Gcs_protocol_version const &current_version,
      uint64_t const &original_payload_size,
      std::vector<Stage_code> const &stages_to_apply) const;

  /**
   Apply the given stages to the given outgoing packet.

   @param packet The packet to transform
   @param stages The stages to apply
   @retval {true, _} If there was an error applying the stages
   @retval {false, P} If the stages were successfully applied, and produced
   the set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> apply_stages(
      Gcs_packet &&packet, std::vector<Stage_code> const &stages) const;

  /**
   Apply the given stage to the given outgoing packets.

   @param packets The packets to transform
   @param stage The stage to apply
   @retval {true, _} If there was an error applying the stage
   @retval {false, P} If the stage was successfully applied, and produced the
   set of transformed packets P
   */
  std::pair<bool, std::vector<Gcs_packet>> apply_stage(
      std::vector<Gcs_packet> &&packets, Gcs_message_stage &stage) const;

  /**
   Revert the given stage on the given incoming packet.

   @param packet The packet to transform
   @param stage_code The stage to revert
   @retval {ERROR, _} If there was an error in the stage
   @retval {OK_NO_PACKET, NP} If the stage was successfully reverted, but
   produced no packet
   @retval {OK_PACKET, P} If the stage was successfully reverted, and produced
   the packet P
   */
  std::pair<Gcs_pipeline_incoming_result, Gcs_packet> revert_stage(
      Gcs_packet &&packet, Stage_code const &stage_code) const;
};

#endif
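
The header describes three ideas that are easier to follow in a small, self-contained model: each stage decides per message whether to apply, skip or abort; stages are registered once by their code; and each protocol version maps to an ordered list of stage codes that an outgoing message traverses. The sketch below illustrates only those ideas. Every name in it (Toy_stage, Toy_bracket_stage, Toy_suffix_stage, Toy_pipeline) is hypothetical and is not part of the GCS API, payloads are plain strings rather than Gcs_packet objects, and stage headers, fragmentation and the revert path are left out. It keeps the header's convention that a bool result of true signals an error.

```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration; not the real GCS types.
enum class Toy_status { apply, skip, abort };

class Toy_stage {
 public:
  virtual ~Toy_stage() = default;
  virtual int code() const = 0;
  // Same idea as skip_apply: decide from the payload size alone.
  virtual Toy_status skip_apply(std::uint64_t size) const = 0;
  virtual std::string transform(std::string payload) const = 0;
};

// Wraps the payload in brackets, but skips payloads below a threshold.
class Toy_bracket_stage : public Toy_stage {
 public:
  explicit Toy_bracket_stage(std::uint64_t threshold) : m_threshold(threshold) {}
  int code() const override { return 1; }
  Toy_status skip_apply(std::uint64_t size) const override {
    return size < m_threshold ? Toy_status::skip : Toy_status::apply;
  }
  std::string transform(std::string payload) const override {
    return "[" + payload + "]";
  }

 private:
  std::uint64_t m_threshold;
};

// Appends '!', but aborts the pipeline for payloads above a maximum size.
class Toy_suffix_stage : public Toy_stage {
 public:
  explicit Toy_suffix_stage(std::uint64_t max) : m_max(max) {}
  int code() const override { return 2; }
  Toy_status skip_apply(std::uint64_t size) const override {
    return size > m_max ? Toy_status::abort : Toy_status::apply;
  }
  std::string transform(std::string payload) const override {
    return payload + "!";
  }

 private:
  std::uint64_t m_max;
};

class Toy_pipeline {
 public:
  // Register a stage once; a second stage with the same code is ignored.
  template <class T, class... Args>
  void register_stage(Args... args) {
    std::unique_ptr<T> stage(new T(args...));
    int code = stage->code();
    if (m_handlers.find(code) == m_handlers.end())
      m_handlers.emplace(code, std::move(stage));
  }

  // Map each version to its ordered stage codes. Returns true on error:
  // pipelines already registered, or a code with no registered stage.
  bool register_pipeline(
      std::initializer_list<std::pair<const int, std::vector<int>>> stages) {
    if (!m_pipelines.empty()) return true;
    for (auto const &entry : stages)
      for (int code : entry.second)
        if (m_handlers.count(code) == 0) return true;
    m_pipelines = stages;
    return false;
  }

  // Send the payload through the stages of the given version, in order.
  std::pair<bool, std::string> process_outgoing(int version,
                                                std::string payload) const {
    auto it = m_pipelines.find(version);
    if (it == m_pipelines.end()) return {true, std::string()};
    for (int code : it->second) {
      Toy_stage const &stage = *m_handlers.at(code);
      switch (stage.skip_apply(payload.size())) {
        case Toy_status::abort:
          return {true, std::string()};
        case Toy_status::skip:
          break;
        case Toy_status::apply:
          payload = stage.transform(std::move(payload));
          break;
      }
    }
    return {false, payload};
  }

 private:
  std::map<int, std::unique_ptr<Toy_stage>> m_handlers;
  std::map<int, std::vector<int>> m_pipelines;
};
```

In this model, registering version 1 as {1} and version 2 as {1, 2} mirrors the "adding a new stage" case from the class comment: the older version keeps its own stage list, so a node can still produce messages for either version. A short payload sent through version 2 skips the bracket stage and only receives the suffix, while an oversized payload makes the suffix stage abort and the whole call report an error.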