MySQL 8.4.0
Source Code Documentation
payload_event_buffer_istream.h
Go to the documentation of this file.
1/* Copyright (c) 2023, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef BINARY_LOG_TRANSACTION_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H_
25#define BINARY_LOG_TRANSACTION_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H_
26
27#include <memory>
28#include <string>
29
30#include "mysql/binlog/event/compression/buffer/managed_buffer.h" // mysql::binlog::event::compression::buffer::Managed_buffer
31#include "mysql/binlog/event/compression/decompressor.h" // mysqlns::compression::Decompressor
32#include "mysql/binlog/event/compression/factory.h" // mysqlns::compression::Factory
33#include "mysql/binlog/event/control_events.h" // Transaction_payload_event
34#include "mysql/binlog/event/event_reader.h" // Event_reader
35#include "mysql/binlog/event/nodiscard.h" // NODISCARD
36
37/// @addtogroup GroupLibsMysqlBinlogEvent
38/// @{
39///
40/// @file payload_event_buffer_istream.h
41///
42/// Stream class that yields decompressed event byte buffers from a
43/// Transaction_payload_log_event.
44
46
47/// Stream class that yields a stream of byte buffers, each holding the
48/// raw decompressed data of one event contained in a
49/// Transaction_payload_log_event.
50///
51/// The suggested use pattern is:
52///
53/// while (stream >> event_buffer) {
54/// // handle event
55/// }
56/// if (stream.error()) {
57/// // handle error
58/// }
60 public:
61 using Char_t = unsigned char;
62 using Size_t =
72 using Buffer_ptr_t = std::shared_ptr<Buffer_view_t>;
73 using Managed_buffer_ptr_t = std::shared_ptr<Managed_buffer_t>;
75 template <class T>
77
78 /// Construct the stream from the raw compressed data.
79 ///
80 /// This stream will keep a pointer to the buffer, so the caller
81 /// must ensure that the buffer outlives the stream.
82 ///
83 /// @param compressed_buffer Input buffer (compressed bytes).
84 ///
85 /// @param compressed_buffer_size Size of input buffer.
86 ///
87 /// @param compression_algorithm The algorithm the input was
88 /// compressed with.
89 ///
90 /// @param default_buffer_size The default size of the internal
91 /// event buffer. To tune this, consider that bigger buffers reduce
92 /// allocations since one buffer will be reused for all smaller
93 /// events, whereas smaller buffers reduce memory footprint in case
94 /// all events fit within the buffer size.
95 ///
96 /// @param memory_resource @c Memory_resource object used to
97 /// allocate memory.
98 template <class String_char_t>
100 const String_char_t *compressed_buffer, Size_t compressed_buffer_size,
101 type compression_algorithm, Size_t default_buffer_size = 0,
102 const Memory_resource_t &memory_resource = Memory_resource_t())
103 : m_memory_resource(memory_resource),
105 reinterpret_cast<const Char_t *>(compressed_buffer)),
106 m_compressed_buffer_size(compressed_buffer_size),
107 m_compression_algorithm(compression_algorithm),
108 m_default_buffer_size(default_buffer_size) {
109 initialize();
110 }
111
112 /// Construct the stream from the raw compressed data.
113 ///
114 /// This stream will keep a pointer to the buffer, so the caller
115 /// must ensure that the buffer outlives the stream.
116 ///
117 /// @param compressed_data Input buffer (compressed bytes).
118 ///
119 /// @param compression_algorithm The algorithm the input was
120 /// compressed with.
121 ///
122 /// @param default_buffer_size The default size of the internal
123 /// event buffer. To tune this, consider that bigger buffers reduce
124 /// allocations since one buffer will be reused for all smaller
125 /// events, whereas smaller buffers reduce memory footprint in case
126 /// all events fit within the buffer size.
127 ///
128 /// @param memory_resource @c Memory_resource object used to
129 /// allocate memory.
130 // Nolint: clang-tidy does not recognize that m_default_buffer_size
131 // is initialized, despite it is initialized in the targed
132 // constructor.
133 // NOLINTBEGIN(cppcoreguidelines-pro-type-member-init)
134 template <class String_char_t>
136 const std::basic_string<String_char_t> &compressed_data,
137 type compression_algorithm, Size_t default_buffer_size = 0,
138 const Memory_resource_t &memory_resource = Memory_resource_t())
140 compressed_data.data(), compressed_data.size(),
141 compression_algorithm, default_buffer_size, memory_resource) {}
142 // NOLINTEND(cppcoreguidelines-pro-type-member-init)
143
144 /// Construct the stream from a (non-owned) Payload Event.
145 ///
146 /// This stream will keep a pointer to the buffer owned by the
147 /// event, so the caller must ensure that the event outlives the
148 /// stream.
149 ///
150 /// @param transaction_payload_log_event Event containing the
151 /// compressed data.
152 ///
153 /// @param default_buffer_size The default size of the internal
154 /// event buffer. To tune this, consider that bigger buffers reduce
155 /// allocations since one buffer will be reused for all smaller
156 /// events, whereas smaller buffers reduce memory footprint in case
157 /// all events fit within the buffer size.
158 ///
159 /// @param memory_resource @c Memory_resource object used to
160 /// allocate memory.
162 const Transaction_payload_event &transaction_payload_log_event,
163 Size_t default_buffer_size = 0,
164 const Memory_resource_t &memory_resource = Memory_resource_t());
165
166 /// Construct the stream from a shared pointer to an event.
167 ///
168 /// This stream will, as long as it lives, hold shared ownership of
169 /// the event. If, when this stream is deleted, it is the last
170 /// owner of the event, it will delete the event.
171 ///
172 /// @param tple Event containing the compressed data.
173 ///
174 /// @param default_buffer_size The default size of the internal
175 /// event buffer. To tune this, consider that bigger buffers reduce
176 /// allocations since one buffer will be reused for all smaller
177 /// events, whereas smaller buffers reduce memory footprint in case
178 /// all events fit within the buffer size.
179 ///
180 /// @param memory_resource @c Memory_resource object used to
181 /// allocate memory.
183 const std::shared_ptr<const Transaction_payload_event> &tple,
184 Size_t default_buffer_size = 0,
185 const Memory_resource_t &memory_resource = Memory_resource_t());
186
190 delete;
192 delete;
193
195
196 /// Read the next event from the stream and update the stream state.
197 ///
198 /// If the stream status is already something else than
199 /// `Decompress_status::success`, the function does not change the
200 /// status.
201 ///
202 /// If the function was able to read an event, it modifies `out` to
203 /// point to a buffer holding event data. This leaves the stream
204 /// state as `Decompress_status::success`, and subsequent
205 /// invocations of `operator bool` will return true.
206 ///
207 /// If an error occurred, or the end of the stream was reached, the
208 /// function resets `out` to nullptr. It also alters the stream
209 /// state to the relevant `Decompress_status`, and subsequent
210 /// invocations of `operator bool` will return false. If the
211 /// resulting status is not `Decompress_status::end`, an error
212 /// message can subsequently be obtained by calling `get_error_str`.
213 ///
214 /// @note This class attempts to protect against a common coding
215 /// mistake. The mistake occurs when a caller forgets to check the
216 /// reason for ending the stream; whether it actually reached the
217 /// end, or whether there was an error. Normally, the caller should
218 /// act differently in the two cases. The protection mechanism is
219 /// enabled in debug mode, and enforces that the user calls
220 /// `get_status` after the stream ends (whether it ends by reaching
221 /// the end or by an error). If the stream ends, and the user does
222 /// not call `get_status`, and then the stream object is destroyed,
223 /// the destructor raises an assertion.
224 ///
225 /// @note The output is a reference to a shared pointer, and the
226 /// stream is another owner of the same shared pointer. On the next
227 /// invocation of `operator>>`, the buffer will be reused if there
228 /// are no other owners than the stream and the output argument. If
229 /// there are other owners to it, a new buffer is allocated. So a
230 /// caller is allowed to keep a shared pointer to the output buffer
231 /// as long as it needs. If the caller does not keep any shared
232 /// pointer to the output buffer, it allows the stream to reduce
233 /// allocations and memory footprint.
234 ///
235 /// @note Compressed events never have a checksum, regardless of
236 /// configuration. If the event is to be decoded, you must disable
237 /// checksum checks first.
238 ///
239 /// @param[out] out The target buffer where this function will store
240 /// the event data.
242
243 /// Read the next event into a Managed_buffer.
244 ///
245 /// @see operator>>(Buffer_ptr_t &out)
247
248 /// Indicate if EOF or error has not happened.
249 ///
250 /// @retval true last read was successful, or no read has yet been
251 /// attempted).
252 ///
253 /// @retval false last read resulted in end-of-stream or error.
254 explicit operator bool() const;
255
256 /// Indicate if EOF or error has happened. This is the negation of
257 /// `operator bool`.
258 ///
259 /// @retval false last read was successful, or no read has yet been
260 /// attempted.
261 ///
262 /// @retval true last read resulted in end-of-stream or error.
263 bool operator!() const;
264
265 /// Return the stream status.
266 ///
267 /// @retval success Last read was successful
268 ///
269 /// @retval end Last read could not complete because the position
270 /// was at the end.
271 ///
272 /// @retval out_of_memory Error: memory allocation failed.
273 ///
274 /// @retval exceeds_max_size Error: the event was larger than
275 /// the configured maximum.
276 ///
277 /// @retval corrupted A corruption error was reported from ZSTD, or
278 /// the stream was truncated.
279 Status_t get_status() const;
280
281 /// Return true if there was an error.
282 bool has_error() const;
283
284 /// Return a string that describes the last error, or empty string.
285 ///
286 /// This corresponds to the return value from `has_error`. When
287 /// `has_error` returns success or eof, `get_error_str` returns an
288 /// empty string.
289 std::string get_error_str() const;
290
291 /// Return Grow_calculator used for output buffers.
293
294 /// Set a new Grow_calculator to use for output buffers.
295 void set_grow_calculator(const Grow_calculator_t &grow_calculator);
296
297 private:
298 /// Construct and initialize the decompressor.
299 ///
300 /// This will attempt to initialize the decompressor and feed it the
301 /// input buffer. If an error occurs, the stream error state is set
302 /// accordingly.
303 void initialize();
304
305 /// Decompress the next event into the internal buffer.
306 ///
307 /// If any error occurs, the stream error state is set accordingly.
308 void next();
309
310 /// Allocate the output buffer if needed.
311 ///
312 /// It reuses the existing buffer if this object holds the only
313 /// reference to it. Otherwise it allocates a new buffer. If
314 /// allocation fails, the stream error state is set accordingly.
315 void update_buffer();
316
317 // Worker function that actually reads the event, used by the
318 // implementation of `operator>>`.
319 void read_event();
320
321 /// Specify the string that subsequent calls to error_str will
322 /// return.
323 void set_error_str(const std::string &s);
324
325 /// Update the status.
327
328 /// Memory_resource to handle all allocations.
330
331 // Input.
332
333 /// The buffer we are reading from.
335 /// Size of the buffer we are reading from.
337 /// Compression algorithm we are using.
339 /// The event we are reading from. We don't use this, we only hold
340 /// the shared pointer to prevent that the caller destroys the
341 /// object that owns the buffer.
342 std::shared_ptr<const Transaction_payload_event> m_tple{nullptr};
343
344 // Output.
345
346 /// Grow calculator for the Managed_buffer.
348 /// Default buffer size for the Managed_buffer.
350 /// Shared pointer to Managed_buffer that holds the output. This
351 /// will be shared with API clients. Therefore, API clients can use
352 /// the returned buffer as long as they like. The next time this
353 /// objects needs a buffer to write the output, it uses the shared
354 /// object if the API clients have stopped using it; otherwise
355 /// allocates a new Managed_buffer.
357
358 // Operation and status.
359
360 /// Decompressor.
361 std::unique_ptr<Decompressor_t> m_decompressor{nullptr};
362 /// Error status.
364 /// Error string.
365 std::string m_error_str;
366#ifndef NDEBUG
367 /// True if a read has failed but neither `get_error_str`,
368 /// `has_error`, nor `get_status` has been called.
369 mutable bool m_outstanding_error{false};
370#endif
371};
372
373} // namespace mysql::binlog::event::compression
374
375/// @}
376
377#endif // ifndef
378 // BINARY_LOG_TRANSACTION_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H_
Event that encloses all the events of a transaction.
Definition: control_events.h:735
Abstract base class for decompressors.
Definition: decompressor.h:57
Stream class that yields a stream of byte buffers, each holding the raw decompressed data of one even...
Definition: payload_event_buffer_istream.h:59
std::string get_error_str() const
Return a string that describes the last error, or empty string.
Definition: payload_event_buffer_istream.cpp:102
Payload_event_buffer_istream & operator=(Payload_event_buffer_istream &&)=delete
std::string m_error_str
Error string.
Definition: payload_event_buffer_istream.h:365
void set_status(Status_t status)
Update the status.
Definition: payload_event_buffer_istream.cpp:86
bool operator!() const
Indicate if EOF or error has happened.
Definition: payload_event_buffer_istream.cpp:70
Managed_buffer_ptr_t m_managed_buffer_ptr
Shared pointer to Managed_buffer that holds the output.
Definition: payload_event_buffer_istream.h:356
const Grow_calculator_t & get_grow_calculator() const
Return Grow_calculator used for output buffers.
Definition: payload_event_buffer_istream.cpp:334
Payload_event_buffer_istream & operator>>(Buffer_ptr_t &out)
Read the next event from the stream and update the stream state.
Definition: payload_event_buffer_istream.cpp:284
void initialize()
Construct and initialize the decompressor.
Definition: payload_event_buffer_istream.cpp:115
Payload_event_buffer_istream(const String_char_t *compressed_buffer, Size_t compressed_buffer_size, type compression_algorithm, Size_t default_buffer_size=0, const Memory_resource_t &memory_resource=Memory_resource_t())
Construct the stream from the raw compressed data.
Definition: payload_event_buffer_istream.h:99
Grow_calculator_t m_grow_calculator
Grow calculator for the Managed_buffer.
Definition: payload_event_buffer_istream.h:347
mysql::binlog::event::compression::buffer::Buffer_view< Char_t >::Size_t Size_t
Definition: payload_event_buffer_istream.h:63
Memory_resource_t m_memory_resource
Memory_resource to handle all allocations.
Definition: payload_event_buffer_istream.h:329
Status_t m_status
Error status.
Definition: payload_event_buffer_istream.h:363
void set_grow_calculator(const Grow_calculator_t &grow_calculator)
Set a new Grow_calculator to use for output buffers.
Definition: payload_event_buffer_istream.cpp:338
std::shared_ptr< const Transaction_payload_event > m_tple
The event we are reading from.
Definition: payload_event_buffer_istream.h:342
Payload_event_buffer_istream(Payload_event_buffer_istream &&)=delete
Size_t m_default_buffer_size
Default buffer size for the Managed_buffer.
Definition: payload_event_buffer_istream.h:349
bool has_error() const
Return true if there was an error.
Definition: payload_event_buffer_istream.cpp:95
mysql::binlog::event::resource::Memory_resource Memory_resource_t
Definition: payload_event_buffer_istream.h:74
bool m_outstanding_error
True if a read has failed but neither get_error_str, has_error, nor get_status has been called.
Definition: payload_event_buffer_istream.h:369
std::unique_ptr< Decompressor_t > m_decompressor
Decompressor.
Definition: payload_event_buffer_istream.h:361
void set_error_str(const std::string &s)
Specify the string that subsequent calls to error_str will return.
Definition: payload_event_buffer_istream.cpp:109
type m_compression_algorithm
Compression algorithm we are using.
Definition: payload_event_buffer_istream.h:338
Payload_event_buffer_istream & operator=(Payload_event_buffer_istream &)=delete
std::shared_ptr< Buffer_view_t > Buffer_ptr_t
Definition: payload_event_buffer_istream.h:72
~Payload_event_buffer_istream()
Definition: payload_event_buffer_istream.cpp:60
void read_event()
Definition: payload_event_buffer_istream.cpp:185
Status_t get_status() const
Return the stream status.
Definition: payload_event_buffer_istream.cpp:79
std::shared_ptr< Managed_buffer_t > Managed_buffer_ptr_t
Definition: payload_event_buffer_istream.h:73
void update_buffer()
Allocate the output buffer if needed.
Definition: payload_event_buffer_istream.cpp:134
Size_t m_compressed_buffer_size
Size of the buffer we are reading from.
Definition: payload_event_buffer_istream.h:336
Payload_event_buffer_istream(Payload_event_buffer_istream &)=delete
void next()
Decompress the next event into the internal buffer.
Definition: payload_event_buffer_istream.cpp:316
unsigned char Char_t
Definition: payload_event_buffer_istream.h:61
const Char_t * m_compressed_buffer
The buffer we are reading from.
Definition: payload_event_buffer_istream.h:334
Payload_event_buffer_istream(const std::basic_string< String_char_t > &compressed_data, type compression_algorithm, Size_t default_buffer_size=0, const Memory_resource_t &memory_resource=Memory_resource_t())
Construct the stream from the raw compressed data.
Definition: payload_event_buffer_istream.h:135
std::size_t Size_t
The 'size' type.
Definition: buffer_view.h:55
Description of a heuristic to determine how much memory to allocate.
Definition: grow_calculator.h:68
Owned, growable, contiguous memory buffer.
Definition: managed_buffer.h:109
Buffer_view< Char_t > Buffer_view_t
Definition: managed_buffer.h:112
Allocator using a Memory_resource to do the allocator.
Definition: allocator.h:54
Polymorphism-free memory resource class with custom allocator and deallocator functions.
Definition: memory_resource.h:88
Contains the classes representing events operating in the replication stream properties.
Contains the class responsible for deserializing fields of an event previously stored in a buffer.
Container class that provides a contiguous memory buffer to the caller, which the caller can request ...
Definition: base.cpp:27
@ NONE
Definition: base.h:45
Decompress_status
Definition: decompress_status.h:32
size_t size(const char *const c)
Definition: base64.h:46
required uint32 status
Definition: replication_asynchronous_connection_failover.proto:61