MySQL 8.0.37
Source Code Documentation
payload_event_buffer_istream.h
Go to the documentation of this file.
1/* Copyright (c) 2023, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef BINARY_LOG_TRANSACTION_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H_
25#define BINARY_LOG_TRANSACTION_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H_
26
27#include <memory>
28#include <string>
29
30#include "libbinlogevents/include/buffer/managed_buffer.h" // mysqlns::buffer::Managed_buffer
31#include "libbinlogevents/include/compression/decompressor.h" // mysqlns::compression::Decompressor
32#include "libbinlogevents/include/compression/factory.h" // mysqlns::compression::Factory
33#include "libbinlogevents/include/control_events.h" // Transaction_payload_event
34#include "libbinlogevents/include/event_reader.h" // Event_reader
35#include "libbinlogevents/include/nodiscard.h" // NODISCARD
36
37/// @addtogroup Replication
38/// @{
39///
40/// @file payload_event_buffer_istream.h
41///
42/// Stream class that yields decompressed event byte buffers from a
43/// Transaction_payload_log_event.
44
46
47/// Stream class that yields a stream of byte buffers, each holding the
48/// raw decompressed data of one event contained in a
49/// Transaction_payload_log_event.
50///
51/// The suggested use pattern is:
52///
53/// while (stream >> event_buffer) {
54/// // handle event
55/// }
56/// if (stream.error()) {
57/// // handle error
58/// }
60 public:
61 using Char_t = unsigned char;
69 using Buffer_ptr_t = std::shared_ptr<Buffer_view_t>;
70 using Managed_buffer_ptr_t = std::shared_ptr<Managed_buffer_t>;
72 template <class T>
74
75 /// Construct the stream from the raw compressed data.
76 ///
77 /// This stream will keep a pointer to the buffer, so the caller
78 /// must ensure that the buffer outlives the stream.
79 ///
80 /// @param compressed_buffer Input buffer (compressed bytes).
81 ///
82 /// @param compressed_buffer_size Size of input buffer.
83 ///
84 /// @param compression_algorithm The algorithm the input was
85 /// compressed with.
86 ///
87 /// @param default_buffer_size The default size of the internal
88 /// event buffer. To tune this, consider that bigger buffers reduce
89 /// allocations since one buffer will be reused for all smaller
90 /// events, whereas smaller buffers reduce memory footprint in case
91 /// all events fit within the buffer size.
92 ///
93 /// @param memory_resource @c Memory_resource object used to
94 /// allocate memory.
95 template <class String_char_t>
97 const String_char_t *compressed_buffer, Size_t compressed_buffer_size,
98 type compression_algorithm, Size_t default_buffer_size = 0,
99 const Memory_resource_t &memory_resource = Memory_resource_t())
100 : m_memory_resource(memory_resource),
102 reinterpret_cast<const Char_t *>(compressed_buffer)),
103 m_compressed_buffer_size(compressed_buffer_size),
104 m_compression_algorithm(compression_algorithm),
105 m_default_buffer_size(default_buffer_size) {
106 initialize();
107 }
108
109 /// Construct the stream from the raw compressed data.
110 ///
111 /// This stream will keep a pointer to the buffer, so the caller
112 /// must ensure that the buffer outlives the stream.
113 ///
114 /// @param compressed_data Input buffer (compressed bytes).
115 ///
116 /// @param compression_algorithm The algorithm the input was
117 /// compressed with.
118 ///
119 /// @param default_buffer_size The default size of the internal
120 /// event buffer. To tune this, consider that bigger buffers reduce
121 /// allocations since one buffer will be reused for all smaller
122 /// events, whereas smaller buffers reduce memory footprint in case
123 /// all events fit within the buffer size.
124 ///
125 /// @param memory_resource @c Memory_resource object used to
126 /// allocate memory.
127 // Nolint: clang-tidy does not recognize that m_default_buffer_size
128 // is initialized, despite it is initialized in the targed
129 // constructor.
130 // NOLINTBEGIN(cppcoreguidelines-pro-type-member-init)
131 template <class String_char_t>
133 const std::basic_string<String_char_t> &compressed_data,
134 type compression_algorithm, Size_t default_buffer_size = 0,
135 const Memory_resource_t &memory_resource = Memory_resource_t())
137 compressed_data.data(), compressed_data.size(),
138 compression_algorithm, default_buffer_size, memory_resource) {}
139 // NOLINTEND(cppcoreguidelines-pro-type-member-init)
140
141 /// Construct the stream from a (non-owned) Payload Event.
142 ///
143 /// This stream will keep a pointer to the buffer owned by the
144 /// event, so the caller must ensure that the event outlives the
145 /// stream.
146 ///
147 /// @param transaction_payload_log_event Event containing the
148 /// compressed data.
149 ///
150 /// @param default_buffer_size The default size of the internal
151 /// event buffer. To tune this, consider that bigger buffers reduce
152 /// allocations since one buffer will be reused for all smaller
153 /// events, whereas smaller buffers reduce memory footprint in case
154 /// all events fit within the buffer size.
155 ///
156 /// @param memory_resource @c Memory_resource object used to
157 /// allocate memory.
159 const Transaction_payload_event &transaction_payload_log_event,
160 Size_t default_buffer_size = 0,
161 const Memory_resource_t &memory_resource = Memory_resource_t());
162
163 /// Construct the stream from a shared pointer to an event.
164 ///
165 /// This stream will, as long as it lives, hold shared ownership of
166 /// the event. If, when this stream is deleted, it is the last
167 /// owner of the event, it will delete the event.
168 ///
169 /// @param tple Event containing the compressed data.
170 ///
171 /// @param default_buffer_size The default size of the internal
172 /// event buffer. To tune this, consider that bigger buffers reduce
173 /// allocations since one buffer will be reused for all smaller
174 /// events, whereas smaller buffers reduce memory footprint in case
175 /// all events fit within the buffer size.
176 ///
177 /// @param memory_resource @c Memory_resource object used to
178 /// allocate memory.
180 const std::shared_ptr<const Transaction_payload_event> &tple,
181 Size_t default_buffer_size = 0,
182 const Memory_resource_t &memory_resource = Memory_resource_t());
183
187 delete;
189 delete;
190
192
193 /// Read the next event from the stream and update the stream state.
194 ///
195 /// If the stream status is already something else than
196 /// `Decompress_status::success`, the function does not change the
197 /// status.
198 ///
199 /// If the function was able to read an event, it modifies `out` to
200 /// point to a buffer holding event data. This leaves the stream
201 /// state as `Decompress_status::success`, and subsequent
202 /// invocations of `operator bool` will return true.
203 ///
204 /// If an error occurred, or the end of the stream was reached, the
205 /// function resets `out` to nullptr. It also alters the stream
206 /// state to the relevant `Decompress_status`, and subsequent
207 /// invocations of `operator bool` will return false. If the
208 /// resulting status is not `Decompress_status::end`, an error
209 /// message can subsequently be obtained by calling `get_error_str`.
210 ///
211 /// @note This class attempts to protect against a common coding
212 /// mistake. The mistake occurs when a caller forgets to check the
213 /// reason for ending the stream; whether it actually reached the
214 /// end, or whether there was an error. Normally, the caller should
215 /// act differently in the two cases. The protection mechanism is
216 /// enabled in debug mode, and enforces that the user calls
217 /// `get_status` after the stream ends (whether it ends by reaching
218 /// the end or by an error). If the stream ends, and the user does
219 /// not call `get_status`, and then the stream object is destroyed,
220 /// the destructor raises an assertion.
221 ///
222 /// @note The output is a reference to a shared pointer, and the
223 /// stream is another owner of the same shared pointer. On the next
224 /// invocation of `operator>>`, the buffer will be reused if there
225 /// are no other owners than the stream and the output argument. If
226 /// there are other owners to it, a new buffer is allocated. So a
227 /// caller is allowed to keep a shared pointer to the output buffer
228 /// as long as it needs. If the caller does not keep any shared
229 /// pointer to the output buffer, it allows the stream to reduce
230 /// allocations and memory footprint.
231 ///
232 /// @note Compressed events never have a checksum, regardless of
233 /// configuration. If the event is to be decoded, you must disable
234 /// checksum checks first.
235 ///
236 /// @param[out] out The target buffer where this function will store
237 /// the event data.
238 ///
239 /// @retval this
241
242 /// Read the next event into a Managed_buffer.
243 ///
244 /// @see operator>>(Buffer_ptr_t &out)
246
247 /// Indicate if EOF or error has not happened.
248 ///
249 /// @retval true last read was successful, or no read has yet been
250 /// attempted).
251 ///
252 /// @retval false last read resulted in end-of-stream or error.
253 explicit operator bool() const;
254
255 /// Indicate if EOF or error has happened. This is the negation of
256 /// `operator bool`.
257 ///
258 /// @retval false last read was successful, or no read has yet been
259 /// attempted.
260 ///
261 /// @retval true last read resulted in end-of-stream or error.
262 bool operator!() const;
263
264 /// Return the stream status.
265 ///
266 /// @retval success Last read was successful
267 ///
268 /// @retval end Last read could not complete because the position
269 /// was at the end.
270 ///
271 /// @retval out_of_memory Error: memory allocation failed.
272 ///
273 /// @retval exceeds_max_size Error: the event was larger than
274 /// the configured maximum.
275 ///
276 /// @retval corrupted A corruption error was reported from ZSTD, or
277 /// the stream was truncated.
278 Status_t get_status() const;
279
280 /// Return true if there was an error.
281 bool has_error() const;
282
283 /// Return a string that describes the last error, or empty string.
284 ///
285 /// This corresponds to the return value from `has_error`. When
286 /// `has_error` returns success or eof, `get_error_str` returns an
287 /// empty string.
288 std::string get_error_str() const;
289
290 /// Return Grow_calculator used for output buffers.
292
293 /// Set a new Grow_calculator to use for output buffers.
294 void set_grow_calculator(const Grow_calculator_t &grow_calculator);
295
296 private:
297 /// Construct and initialize the decompressor.
298 ///
299 /// This will attempt to initialize the decompressor and feed it the
300 /// input buffer. If an error occurs, the stream error state is set
301 /// accordingly.
302 void initialize();
303
304 /// Decompress the next event into the internal buffer.
305 ///
306 /// If any error occurs, the stream error state is set accordingly.
307 void next();
308
309 /// Allocate the output buffer if needed.
310 ///
311 /// It reuses the existing buffer if this object holds the only
312 /// reference to it. Otherwise it allocates a new buffer. If
313 /// allocation fails, the stream error state is set accordingly.
314 void update_buffer();
315
316 // Worker function that actually reads the event, used by the
317 // implementation of `operator>>`.
318 void read_event();
319
320 /// Specify the string that subsequent calls to error_str will
321 /// return.
322 void set_error_str(const std::string &s);
323
324 /// Update the status.
326
327 /// Memory_resource to handle all allocations.
329
330 // Input.
331
332 /// The buffer we are reading from.
334 /// Size of the buffer we are reading from.
336 /// Compression algorithm we are using.
338 /// The event we are reading from. We don't use this, we only hold
339 /// the shared pointer to prevent that the caller destroys the
340 /// object that owns the buffer.
341 std::shared_ptr<const Transaction_payload_event> m_tple{nullptr};
342
343 // Output.
344
345 /// Grow calculator for the Managed_buffer.
347 /// Default buffer size for the Managed_buffer.
349 /// Shared pointer to Managed_buffer that holds the output. This
350 /// will be shared with API clients. Therefore, API clients can use
351 /// the returned buffer as long as they like. The next time this
352 /// objects needs a buffer to write the output, it uses the shared
353 /// object if the API clients have stopped using it; otherwise
354 /// allocates a new Managed_buffer.
356
357 // Operation and status.
358
359 /// Decompressor.
360 std::unique_ptr<Decompressor_t> m_decompressor{nullptr};
361 /// Error status.
363 /// Error string.
364 std::string m_error_str;
365#ifndef NDEBUG
366 /// True if a read has failed but neither `get_error_str`,
367 /// `has_error`, nor `get_status` has been called.
368 mutable bool m_outstanding_error{false};
369#endif
370};
371
372} // namespace binary_log::transaction::compression
373
374/// @} (end of group Replication)
375
376#endif // ifndef
377 // BINARY_LOG_TRANSACTION_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H_
Event that encloses all the events of a transaction.
Definition: control_events.h:727
Abstract base class for decompressors.
Definition: decompressor.h:57
Stream class that yields a stream of byte buffers, each holding the raw decompressed data of one even...
Definition: payload_event_buffer_istream.h:59
Status_t m_status
Error status.
Definition: payload_event_buffer_istream.h:362
Grow_calculator_t m_grow_calculator
Grow calculator for the Managed_buffer.
Definition: payload_event_buffer_istream.h:346
unsigned char Char_t
Definition: payload_event_buffer_istream.h:61
void set_grow_calculator(const Grow_calculator_t &grow_calculator)
Set a new Grow_calculator to use for output buffers.
Definition: payload_event_buffer_istream.cpp:337
void update_buffer()
Allocate the output buffer if needed.
Definition: payload_event_buffer_istream.cpp:133
std::shared_ptr< const Transaction_payload_event > m_tple
The event we are reading from.
Definition: payload_event_buffer_istream.h:341
Managed_buffer_ptr_t m_managed_buffer_ptr
Shared pointer to Managed_buffer that holds the output.
Definition: payload_event_buffer_istream.h:355
Size_t m_default_buffer_size
Default buffer size for the Managed_buffer.
Definition: payload_event_buffer_istream.h:348
void next()
Decompress the next event into the internal buffer.
Definition: payload_event_buffer_istream.cpp:315
Payload_event_buffer_istream & operator=(Payload_event_buffer_istream &)=delete
Payload_event_buffer_istream(Payload_event_buffer_istream &&)=delete
Memory_resource_t m_memory_resource
Memory_resource to handle all allocations.
Definition: payload_event_buffer_istream.h:328
bool operator!() const
Indicate if EOF or error has happened.
Definition: payload_event_buffer_istream.cpp:70
std::string get_error_str() const
Return a string that describes the last error, or empty string.
Definition: payload_event_buffer_istream.cpp:102
const Grow_calculator_t & get_grow_calculator() const
Return Grow_calculator used for output buffers.
Definition: payload_event_buffer_istream.cpp:333
Payload_event_buffer_istream & operator>>(Buffer_ptr_t &out)
Read the next event from the stream and update the stream state.
Definition: payload_event_buffer_istream.cpp:283
Payload_event_buffer_istream(const std::basic_string< String_char_t > &compressed_data, type compression_algorithm, Size_t default_buffer_size=0, const Memory_resource_t &memory_resource=Memory_resource_t())
Construct the stream from the raw compressed data.
Definition: payload_event_buffer_istream.h:132
std::string m_error_str
Error string.
Definition: payload_event_buffer_istream.h:364
Size_t m_compressed_buffer_size
Size of the buffer we are reading from.
Definition: payload_event_buffer_istream.h:335
void initialize()
Construct and initialize the decompressor.
Definition: payload_event_buffer_istream.cpp:115
std::shared_ptr< Managed_buffer_t > Managed_buffer_ptr_t
Definition: payload_event_buffer_istream.h:70
void read_event()
Definition: payload_event_buffer_istream.cpp:184
bool has_error() const
Return true if there was an error.
Definition: payload_event_buffer_istream.cpp:95
const Char_t * m_compressed_buffer
The buffer we are reading from.
Definition: payload_event_buffer_istream.h:333
Payload_event_buffer_istream(Payload_event_buffer_istream &)=delete
type m_compression_algorithm
Compression algorithm we are using.
Definition: payload_event_buffer_istream.h:337
void set_status(Status_t status)
Update the status.
Definition: payload_event_buffer_istream.cpp:86
void set_error_str(const std::string &s)
Specify the string that subsequent calls to error_str will return.
Definition: payload_event_buffer_istream.cpp:109
~Payload_event_buffer_istream()
Definition: payload_event_buffer_istream.cpp:60
mysqlns::resource::Memory_resource Memory_resource_t
Definition: payload_event_buffer_istream.h:71
std::unique_ptr< Decompressor_t > m_decompressor
Decompressor.
Definition: payload_event_buffer_istream.h:360
Status_t get_status() const
Return the stream status.
Definition: payload_event_buffer_istream.cpp:79
bool m_outstanding_error
True if a read has failed but neither get_error_str, has_error, nor get_status has been called.
Definition: payload_event_buffer_istream.h:368
std::shared_ptr< Buffer_view_t > Buffer_ptr_t
Definition: payload_event_buffer_istream.h:69
Payload_event_buffer_istream & operator=(Payload_event_buffer_istream &&)=delete
Payload_event_buffer_istream(const String_char_t *compressed_buffer, Size_t compressed_buffer_size, type compression_algorithm, Size_t default_buffer_size=0, const Memory_resource_t &memory_resource=Memory_resource_t())
Construct the stream from the raw compressed data.
Definition: payload_event_buffer_istream.h:96
mysqlns::buffer::Buffer_view< Char_t >::Size_t Size_t
Definition: payload_event_buffer_istream.h:62
std::size_t Size_t
The 'size' type.
Definition: buffer_view.h:55
Description of a heuristic to determine how much memory to allocate.
Definition: grow_calculator.h:67
Owned, growable, contiguous memory buffer.
Definition: managed_buffer.h:111
Buffer_view< Char_t > Buffer_view_t
Definition: managed_buffer.h:114
Allocator using a Memory_resource to do the allocator.
Definition: allocator.h:54
Polymorphism-free memory resource class with custom allocator and deallocator functions.
Definition: memory_resource.h:88
Contains the classes representing events operating in the replication stream properties.
Contains the class responsible for deserializing fields of an event previously stored in a buffer.
Container class that provides a contiguous memory buffer to the caller, which the caller can request ...
Decompress_status
Definition: decompress_status.h:32
required uint32 status
Definition: replication_asynchronous_connection_failover.proto:61