MySQL 9.1.0
Source Code Documentation
payload_event_buffer_istream.h
Go to the documentation of this file.
1/* Copyright (c) 2023, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef MYSQL_BINLOG_EVENT_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H
25#define MYSQL_BINLOG_EVENT_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H
26#include <memory>
27#include <string>
28
29#include "mysql/binlog/event/compression/decompressor.h" // mysqlns::compression::Decompressor
30#include "mysql/binlog/event/compression/factory.h" // mysqlns::compression::Factory
31#include "mysql/binlog/event/control_events.h" // Transaction_payload_event
32#include "mysql/binlog/event/event_reader.h" // Event_reader
33#include "mysql/containers/buffers/managed_buffer.h" // mysql::containers::buffers::Managed_buffer
34#include "mysql/utils/nodiscard.h" // NODISCARD
35
36/// @addtogroup GroupLibsMysqlBinlogEvent
37/// @{
38///
39/// @file
40///
41/// Stream class that yields decompressed event byte buffers from a
42/// Transaction_payload_log_event.
43
45
46/// Stream class that yields a stream of byte buffers, each holding the
47/// raw decompressed data of one event contained in a
48/// Transaction_payload_log_event.
49///
50/// The suggested use pattern is:
51///
52/// while (stream >> event_buffer) {
53/// // handle event
54/// }
55/// if (stream.error()) {
56/// // handle error
57/// }
59 public:
60 using Char_t = unsigned char;
68 using Buffer_ptr_t = std::shared_ptr<Buffer_view_t>;
69 using Managed_buffer_ptr_t = std::shared_ptr<Managed_buffer_t>;
71 template <class T>
73
74 /// Construct the stream from the raw compressed data.
75 ///
76 /// This stream will keep a pointer to the buffer, so the caller
77 /// must ensure that the buffer outlives the stream.
78 ///
79 /// @param compressed_buffer Input buffer (compressed bytes).
80 ///
81 /// @param compressed_buffer_size Size of input buffer.
82 ///
83 /// @param compression_algorithm The algorithm the input was
84 /// compressed with.
85 ///
86 /// @param default_buffer_size The default size of the internal
87 /// event buffer. To tune this, consider that bigger buffers reduce
88 /// allocations since one buffer will be reused for all smaller
89 /// events, whereas smaller buffers reduce memory footprint in case
90 /// all events fit within the buffer size.
91 ///
92 /// @param memory_resource @c Memory_resource object used to
93 /// allocate memory.
94 template <class String_char_t>
96 const String_char_t *compressed_buffer, Size_t compressed_buffer_size,
97 type compression_algorithm, Size_t default_buffer_size = 0,
98 const Memory_resource_t &memory_resource = Memory_resource_t())
99 : m_memory_resource(memory_resource),
101 reinterpret_cast<const Char_t *>(compressed_buffer)),
102 m_compressed_buffer_size(compressed_buffer_size),
103 m_compression_algorithm(compression_algorithm),
104 m_default_buffer_size(default_buffer_size) {
105 initialize();
106 }
107
108 /// Construct the stream from the raw compressed data.
109 ///
110 /// This stream will keep a pointer to the buffer, so the caller
111 /// must ensure that the buffer outlives the stream.
112 ///
113 /// @param compressed_data Input buffer (compressed bytes).
114 ///
115 /// @param compression_algorithm The algorithm the input was
116 /// compressed with.
117 ///
118 /// @param default_buffer_size The default size of the internal
119 /// event buffer. To tune this, consider that bigger buffers reduce
120 /// allocations since one buffer will be reused for all smaller
121 /// events, whereas smaller buffers reduce memory footprint in case
122 /// all events fit within the buffer size.
123 ///
124 /// @param memory_resource @c Memory_resource object used to
125 /// allocate memory.
126 // Nolint: clang-tidy does not recognize that m_default_buffer_size
127 // is initialized, despite it is initialized in the targed
128 // constructor.
129 // NOLINTBEGIN(cppcoreguidelines-pro-type-member-init)
130 template <class String_char_t>
132 const std::basic_string<String_char_t> &compressed_data,
133 type compression_algorithm, Size_t default_buffer_size = 0,
134 const Memory_resource_t &memory_resource = Memory_resource_t())
136 compressed_data.data(), compressed_data.size(),
137 compression_algorithm, default_buffer_size, memory_resource) {}
138 // NOLINTEND(cppcoreguidelines-pro-type-member-init)
139
140 /// Construct the stream from a (non-owned) Payload Event.
141 ///
142 /// This stream will keep a pointer to the buffer owned by the
143 /// event, so the caller must ensure that the event outlives the
144 /// stream.
145 ///
146 /// @param transaction_payload_log_event Event containing the
147 /// compressed data.
148 ///
149 /// @param default_buffer_size The default size of the internal
150 /// event buffer. To tune this, consider that bigger buffers reduce
151 /// allocations since one buffer will be reused for all smaller
152 /// events, whereas smaller buffers reduce memory footprint in case
153 /// all events fit within the buffer size.
154 ///
155 /// @param memory_resource @c Memory_resource object used to
156 /// allocate memory.
158 const Transaction_payload_event &transaction_payload_log_event,
159 Size_t default_buffer_size = 0,
160 const Memory_resource_t &memory_resource = Memory_resource_t());
161
162 /// Construct the stream from a shared pointer to an event.
163 ///
164 /// This stream will, as long as it lives, hold shared ownership of
165 /// the event. If, when this stream is deleted, it is the last
166 /// owner of the event, it will delete the event.
167 ///
168 /// @param tple Event containing the compressed data.
169 ///
170 /// @param default_buffer_size The default size of the internal
171 /// event buffer. To tune this, consider that bigger buffers reduce
172 /// allocations since one buffer will be reused for all smaller
173 /// events, whereas smaller buffers reduce memory footprint in case
174 /// all events fit within the buffer size.
175 ///
176 /// @param memory_resource @c Memory_resource object used to
177 /// allocate memory.
179 const std::shared_ptr<const Transaction_payload_event> &tple,
180 Size_t default_buffer_size = 0,
181 const Memory_resource_t &memory_resource = Memory_resource_t());
182
186 delete;
188 delete;
189
191
192 /// Read the next event from the stream and update the stream state.
193 ///
194 /// If the stream status is already something else than
195 /// `Decompress_status::success`, the function does not change the
196 /// status.
197 ///
198 /// If the function was able to read an event, it modifies `out` to
199 /// point to a buffer holding event data. This leaves the stream
200 /// state as `Decompress_status::success`, and subsequent
201 /// invocations of `operator bool` will return true.
202 ///
203 /// If an error occurred, or the end of the stream was reached, the
204 /// function resets `out` to nullptr. It also alters the stream
205 /// state to the relevant `Decompress_status`, and subsequent
206 /// invocations of `operator bool` will return false. If the
207 /// resulting status is not `Decompress_status::end`, an error
208 /// message can subsequently be obtained by calling `get_error_str`.
209 ///
210 /// @note This class attempts to protect against a common coding
211 /// mistake. The mistake occurs when a caller forgets to check the
212 /// reason for ending the stream; whether it actually reached the
213 /// end, or whether there was an error. Normally, the caller should
214 /// act differently in the two cases. The protection mechanism is
215 /// enabled in debug mode, and enforces that the user calls
216 /// `get_status` after the stream ends (whether it ends by reaching
217 /// the end or by an error). If the stream ends, and the user does
218 /// not call `get_status`, and then the stream object is destroyed,
219 /// the destructor raises an assertion.
220 ///
221 /// @note The output is a reference to a shared pointer, and the
222 /// stream is another owner of the same shared pointer. On the next
223 /// invocation of `operator>>`, the buffer will be reused if there
224 /// are no other owners than the stream and the output argument. If
225 /// there are other owners to it, a new buffer is allocated. So a
226 /// caller is allowed to keep a shared pointer to the output buffer
227 /// as long as it needs. If the caller does not keep any shared
228 /// pointer to the output buffer, it allows the stream to reduce
229 /// allocations and memory footprint.
230 ///
231 /// @note Compressed events never have a checksum, regardless of
232 /// configuration. If the event is to be decoded, you must disable
233 /// checksum checks first.
234 ///
235 /// @param[out] out The target buffer where this function will store
236 /// the event data.
238
239 /// Read the next event into a Managed_buffer.
240 ///
241 /// @see operator>>(Buffer_ptr_t &out)
243
244 /// Indicate if EOF or error has not happened.
245 ///
246 /// @retval true last read was successful, or no read has yet been
247 /// attempted).
248 ///
249 /// @retval false last read resulted in end-of-stream or error.
250 explicit operator bool() const;
251
252 /// Indicate if EOF or error has happened. This is the negation of
253 /// `operator bool`.
254 ///
255 /// @retval false last read was successful, or no read has yet been
256 /// attempted.
257 ///
258 /// @retval true last read resulted in end-of-stream or error.
259 bool operator!() const;
260
261 /// Return the stream status.
262 ///
263 /// @retval success Last read was successful
264 ///
265 /// @retval end Last read could not complete because the position
266 /// was at the end.
267 ///
268 /// @retval out_of_memory Error: memory allocation failed.
269 ///
270 /// @retval exceeds_max_size Error: the event was larger than
271 /// the configured maximum.
272 ///
273 /// @retval corrupted A corruption error was reported from ZSTD, or
274 /// the stream was truncated.
275 Status_t get_status() const;
276
277 /// Return true if there was an error.
278 bool has_error() const;
279
280 /// Return a string that describes the last error, or empty string.
281 ///
282 /// This corresponds to the return value from `has_error`. When
283 /// `has_error` returns success or eof, `get_error_str` returns an
284 /// empty string.
285 std::string get_error_str() const;
286
287 /// Return Grow_calculator used for output buffers.
289
290 /// Set a new Grow_calculator to use for output buffers.
291 void set_grow_calculator(const Grow_calculator_t &grow_calculator);
292
293 private:
294 /// Construct and initialize the decompressor.
295 ///
296 /// This will attempt to initialize the decompressor and feed it the
297 /// input buffer. If an error occurs, the stream error state is set
298 /// accordingly.
299 void initialize();
300
301 /// Decompress the next event into the internal buffer.
302 ///
303 /// If any error occurs, the stream error state is set accordingly.
304 void next();
305
306 /// Allocate the output buffer if needed.
307 ///
308 /// It reuses the existing buffer if this object holds the only
309 /// reference to it. Otherwise it allocates a new buffer. If
310 /// allocation fails, the stream error state is set accordingly.
311 void update_buffer();
312
313 // Worker function that actually reads the event, used by the
314 // implementation of `operator>>`.
315 void read_event();
316
317 /// Specify the string that subsequent calls to error_str will
318 /// return.
319 void set_error_str(const std::string &s);
320
321 /// Update the status.
323
324 /// Memory_resource to handle all allocations.
326
327 // Input.
328
329 /// The buffer we are reading from.
331 /// Size of the buffer we are reading from.
333 /// Compression algorithm we are using.
335 /// The event we are reading from. We don't use this, we only hold
336 /// the shared pointer to prevent that the caller destroys the
337 /// object that owns the buffer.
338 std::shared_ptr<const Transaction_payload_event> m_tple{nullptr};
339
340 // Output.
341
342 /// Grow calculator for the Managed_buffer.
344 /// Default buffer size for the Managed_buffer.
346 /// Shared pointer to Managed_buffer that holds the output. This
347 /// will be shared with API clients. Therefore, API clients can use
348 /// the returned buffer as long as they like. The next time this
349 /// objects needs a buffer to write the output, it uses the shared
350 /// object if the API clients have stopped using it; otherwise
351 /// allocates a new Managed_buffer.
353
354 // Operation and status.
355
356 /// Decompressor.
357 std::unique_ptr<Decompressor_t> m_decompressor{nullptr};
358 /// Error status.
360 /// Error string.
361 std::string m_error_str;
362#ifndef NDEBUG
363 /// True if a read has failed but neither `get_error_str`,
364 /// `has_error`, nor `get_status` has been called.
365 mutable bool m_outstanding_error{false};
366#endif
367};
368
369} // namespace mysql::binlog::event::compression
370
371/// @}
372
373#endif // ifndef
374 // MYSQL_BINLOG_EVENT_COMPRESSION_PAYLOAD_EVENT_BUFFER_ISTREAM_H
Allocator using a Memory_resource to do the allocator.
Definition: allocator.h:54
Polymorphism-free memory resource class with custom allocator and deallocator functions.
Definition: memory_resource.h:88
Event that encloses all the events of a transaction.
Definition: control_events.h:736
Abstract base class for decompressors.
Definition: decompressor.h:57
Stream class that yields a stream of byte buffers, each holding the raw decompressed data of one even...
Definition: payload_event_buffer_istream.h:58
mysql::containers::buffers::Buffer_view< Char_t >::Size_t Size_t
Definition: payload_event_buffer_istream.h:61
std::string get_error_str() const
Return a string that describes the last error, or empty string.
Definition: payload_event_buffer_istream.cpp:102
Payload_event_buffer_istream & operator=(Payload_event_buffer_istream &&)=delete
std::string m_error_str
Error string.
Definition: payload_event_buffer_istream.h:361
void set_status(Status_t status)
Update the status.
Definition: payload_event_buffer_istream.cpp:86
bool operator!() const
Indicate if EOF or error has happened.
Definition: payload_event_buffer_istream.cpp:70
Managed_buffer_ptr_t m_managed_buffer_ptr
Shared pointer to Managed_buffer that holds the output.
Definition: payload_event_buffer_istream.h:352
const Grow_calculator_t & get_grow_calculator() const
Return Grow_calculator used for output buffers.
Definition: payload_event_buffer_istream.cpp:334
Payload_event_buffer_istream & operator>>(Buffer_ptr_t &out)
Read the next event from the stream and update the stream state.
Definition: payload_event_buffer_istream.cpp:284
void initialize()
Construct and initialize the decompressor.
Definition: payload_event_buffer_istream.cpp:115
Payload_event_buffer_istream(const String_char_t *compressed_buffer, Size_t compressed_buffer_size, type compression_algorithm, Size_t default_buffer_size=0, const Memory_resource_t &memory_resource=Memory_resource_t())
Construct the stream from the raw compressed data.
Definition: payload_event_buffer_istream.h:95
Grow_calculator_t m_grow_calculator
Grow calculator for the Managed_buffer.
Definition: payload_event_buffer_istream.h:343
Memory_resource_t m_memory_resource
Memory_resource to handle all allocations.
Definition: payload_event_buffer_istream.h:325
Status_t m_status
Error status.
Definition: payload_event_buffer_istream.h:359
void set_grow_calculator(const Grow_calculator_t &grow_calculator)
Set a new Grow_calculator to use for output buffers.
Definition: payload_event_buffer_istream.cpp:338
mysql::allocators::Memory_resource Memory_resource_t
Definition: payload_event_buffer_istream.h:70
std::shared_ptr< const Transaction_payload_event > m_tple
The event we are reading from.
Definition: payload_event_buffer_istream.h:338
Payload_event_buffer_istream(Payload_event_buffer_istream &&)=delete
Size_t m_default_buffer_size
Default buffer size for the Managed_buffer.
Definition: payload_event_buffer_istream.h:345
bool has_error() const
Return true if there was an error.
Definition: payload_event_buffer_istream.cpp:95
bool m_outstanding_error
True if a read has failed but neither get_error_str, has_error, nor get_status has been called.
Definition: payload_event_buffer_istream.h:365
std::unique_ptr< Decompressor_t > m_decompressor
Decompressor.
Definition: payload_event_buffer_istream.h:357
void set_error_str(const std::string &s)
Specify the string that subsequent calls to error_str will return.
Definition: payload_event_buffer_istream.cpp:109
type m_compression_algorithm
Compression algorithm we are using.
Definition: payload_event_buffer_istream.h:334
Payload_event_buffer_istream & operator=(Payload_event_buffer_istream &)=delete
std::shared_ptr< Buffer_view_t > Buffer_ptr_t
Definition: payload_event_buffer_istream.h:68
~Payload_event_buffer_istream()
Definition: payload_event_buffer_istream.cpp:60
void read_event()
Definition: payload_event_buffer_istream.cpp:185
Status_t get_status() const
Return the stream status.
Definition: payload_event_buffer_istream.cpp:79
std::shared_ptr< Managed_buffer_t > Managed_buffer_ptr_t
Definition: payload_event_buffer_istream.h:69
void update_buffer()
Allocate the output buffer if needed.
Definition: payload_event_buffer_istream.cpp:134
Size_t m_compressed_buffer_size
Size of the buffer we are reading from.
Definition: payload_event_buffer_istream.h:332
Payload_event_buffer_istream(Payload_event_buffer_istream &)=delete
void next()
Decompress the next event into the internal buffer.
Definition: payload_event_buffer_istream.cpp:316
unsigned char Char_t
Definition: payload_event_buffer_istream.h:60
const Char_t * m_compressed_buffer
The buffer we are reading from.
Definition: payload_event_buffer_istream.h:330
Payload_event_buffer_istream(const std::basic_string< String_char_t > &compressed_data, type compression_algorithm, Size_t default_buffer_size=0, const Memory_resource_t &memory_resource=Memory_resource_t())
Construct the stream from the raw compressed data.
Definition: payload_event_buffer_istream.h:131
std::size_t Size_t
The 'size' type.
Definition: buffer_view.h:55
Description of a heuristic to determine how much memory to allocate.
Definition: grow_calculator.h:68
Owned, growable, contiguous memory buffer.
Definition: managed_buffer.h:108
Buffer_view< Char_t > Buffer_view_t
Definition: managed_buffer.h:111
Contains the classes representing events operating in the replication stream properties.
Contains the class responsible for deserializing fields of an event previously stored in a buffer.
Container class that provides a contiguous memory buffer to the caller, which the caller can request ...
Definition: base.cpp:27
@ NONE
Definition: base.h:45
Decompress_status
Definition: decompress_status.h:32
size_t size(const char *const c)
Definition: base64.h:46
required uint32 status
Definition: replication_asynchronous_connection_failover.proto:61