MySQL 8.4.7
Source Code Documentation
bulk_data_service.h
Go to the documentation of this file.
1/* Copyright (c) 2022, 2025, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24/**
25 @file
26 Services for bulk data conversion and load to SE.
27*/
28
29#pragma once
30
31#include <assert.h>
33#include <stddef.h>
34#include <cstdint>
35#include <cstring>
36#include <functional>
37#include <limits>
38#include <memory>
39#include <string>
40#include <vector>
41
42class THD;
43struct TABLE;
44struct CHARSET_INFO;
45
/** Details describing where in the input a bulk load error was hit.

NOTE(review): the `struct` header line is absent from this listing. The name is
taken from the mysql_format() signature, which takes a
`Bulk_load_error_location_details &` - confirm against the original header. */
struct Bulk_load_error_location_details {
  /** Input file name. */
  std::string filename;

  /** Row number. Zero-initialized so that a default constructed object never
  carries an indeterminate value. */
  size_t row_number{};

  /** Column name. */
  std::string column_name;

  /** Column type. */
  std::string column_type;

  /** Column input data. */
  std::string column_input_data;
};
53
/** A column value referenced as a read-only byte range.

NOTE(review): the `struct` header line is absent from this listing. The name
Column_text is inferred from the Row_bunch documentation and the Rows_text
usage - confirm against the original header. */
struct Column_text {
  /** Column data. */
  const char *m_data_ptr{};

  /** Column data length. */
  size_t m_data_len{};
};
61
/** A column value in MySQL column format.

NOTE(review): the `struct` header line is absent from this listing. The name
Column_mysql is inferred from the compare_keys() arguments and the Rows_mysql
usage - confirm against the original header. */
struct Column_mysql {
  /** Column Data Type */
  int16_t m_type{};

  /** Column data length. */
  uint16_t m_data_len{};

  /** If column is NULL. */
  bool m_is_null{false};

  /** Column data */
  char *m_data_ptr{};

  /** Column data in integer format. Used only for specific datatype.
  Zero-initialized like every other member, so that a default constructed
  column never exposes an indeterminate value. */
  uint64_t m_int_data{};
};
78
/** Implements the row and column memory management for parse and load
operations. The columns of all rows are kept in one vector, row after row, so
that the memory is contiguous and can be reused across calls.

@tparam Column_type Column_text when used in the CSV context, Column_mysql
when used in the InnoDB context.
*/
template <typename Column_type>
class Row_bunch {
 public:
  /** Create a new row bunch. No column storage is allocated until
  set_num_rows() is called.
  @param[in]  n_cols  number of columns */
  Row_bunch(size_t n_cols) : m_num_columns(n_cols) {}

  /** @return number of rows in the bunch. */
  size_t get_num_rows() const { return m_num_rows; }

  /** @return number of columns in each row. */
  size_t get_num_cols() const { return m_num_columns; }

  /** Process all columns of a row, invoking callback for each.
  @param[in]  row_index  index of the row
  @param[in]  cbk        callback function, invoked as cbk(column, is_last)
  @return true if successful, false as soon as the callback returns false. */
  template <typename F>
  bool process_columns(size_t row_index, F &&cbk) {
    assert(row_index < m_num_rows);

    /* The callee only invokes the callback, so hand it over as an lvalue.
    Casting a forwarding reference with std::move would wrongly suggest that
    a caller supplied lvalue may be consumed. */
    return process_columns_by_offset(row_index * m_num_columns, cbk);
  }

  /** Process all columns of the row starting at a column vector offset.
  @param[in]  row_offset  row offset in column vector
  @param[in]  cbk         callback function, invoked as cbk(column, is_last)
  @return true if successful, false as soon as the callback returns false. */
  template <typename F>
  bool process_columns_by_offset(size_t row_offset, F &&cbk) {
    assert(row_offset + m_num_columns <= m_columns.size());

    for (size_t index = 0; index < m_num_columns; ++index) {
      const bool last_col = (index + 1 == m_num_columns);

      if (!cbk(m_columns[row_offset + index], last_col)) {
        return false;
      }
    }
    return true;
  }

  /** Get current row offset to access columns.
  @param[in]  row_index  row index
  @return row offset in column vector. */
  size_t get_row_offset(size_t row_index) const {
    assert(row_index < m_num_rows);
    return row_index * m_num_columns;
  }

  /** Get next row offset from current row offset.
  NOTE(review): the bound is the size of the column vector, which
  set_num_rows() never shrinks, so after the row count is reduced this can
  still report rows beyond get_num_rows() - confirm callers expect that.
  @param[in,out]  offset  row offset, advanced by one row
  @return non-zero (true) if there is a next row, zero otherwise. */
  size_t get_next_row_offset(size_t &offset) const {
    offset += m_num_columns;
    return (offset < m_columns.size());
  }

  /** Get column using row offset and column index.
  @param[in]  row_offset  row offset in column vector
  @param[in]  col_index   index of the column within row
  @return column data */
  Column_type &get_column(size_t row_offset, size_t col_index) {
    assert(col_index < m_num_columns);
    assert(row_offset + col_index < m_columns.size());
    return m_columns[row_offset + col_index];
  }

  /** Get constant column for reading using row offset and column index.
  @param[in]  row_offset  row offset in column vector
  @param[in]  col_index   index of the column within row
  @return column data */
  const Column_type &read_column(size_t row_offset, size_t col_index) const {
    assert(col_index < m_num_columns);
    assert(row_offset + col_index < m_columns.size());
    return m_columns[row_offset + col_index];
  }

  /** Set the number of rows. The column storage is extended when needed and
  is never shrunk.
  @param[in]  n_rows  number of rows
  @return true if successful, false if too many rows or columns. */
  bool set_num_rows(size_t n_rows) {
    /* Keep both factors within 32 bits so that the 64 bit product below
    cannot overflow. */
    if (n_rows > std::numeric_limits<uint32_t>::max() ||
        m_num_columns > std::numeric_limits<uint32_t>::max()) {
      return false;
    }
    const auto total_cols = static_cast<uint64_t>(n_rows) * m_num_columns;

    if (total_cols > S_MAX_TOTAL_COLS) {
      return false;
    }

    m_num_rows = n_rows;

    /* Extend columns if needed. */
    if (m_columns.size() < total_cols) {
      m_columns.resize(static_cast<size_t>(total_cols));
    }
    return true;
  }

  /** Limit allocation up to 600M columns. This number is rounded up from an
   * estimate of the number of columns with the max chunk size (1024M). In the
   * worst case we can have 2 bytes per column so a chunk can contain around
   * 512M columns, and because of rows that spill over chunk boundaries we
   * assume we can append a full additional row (which should have at most 4096
   * columns). Rounded up to 600M. */
  static constexpr size_t S_MAX_TOTAL_COLS = 600 * 1024 * 1024;

 private:
  /** All the columns. */
  std::vector<Column_type> m_columns;

  /** Number of rows. */
  size_t m_num_rows{};

  /** Number of columns in each row. */
  size_t m_num_columns{};
};
203
206
207/** Column metadata information. */
209 /** Data comparison method. */
210 enum class Compare {
211 /* Integer comparison */
213 /* Unsigned Integer comparison */
215 /* Binary comparison (memcmp) */
216 BINARY,
217 /* Need to callback to use appropriate comparison function in server. */
218 MYSQL
219 };
220
221 /** @return true if integer type. */
222 bool is_integer() const {
225 }
226
227 /** Field type */
229
230 /** If column could be NULL. */
232
233 /** If column is part of primary key. */
235
236 /** If the key is descending. */
238
239 /** If the key is prefix of the column. */
241
242 /** If it is fixed length type. */
244
245 /** If it is integer type. */
247
248 /** If it is unsigned integer type. */
250
251 /** Check the row header to find out if it is fixed length. For
252 character data type the row header indicates fixed length. */
254
255 /** If character column length can be kept in one byte. */
257
258 /** The length of column data if fixed. */
259 uint16_t m_fixed_len;
260
261 /** Maximum length of data in bytes. */
262 uint16_t m_max_len;
263
264 /** Index of column in row. */
265 uint16_t m_index;
266
267 /** Byte index in NULL bitmap. */
268 uint16_t m_null_byte;
269
270 /** BIT number in NULL bitmap. */
271 uint16_t m_null_bit;
272
273 /** Character set for char & varchar columns. */
274 const void *m_charset;
275};
276
277/** Row metadata */
278struct Row_meta {
279 /** Key type for fast comparison. */
280 enum class Key_type {
281 /* All keys are signed integers and ascending. */
283 /* All keys are integer. */
284 INT,
285 /* Keys are of any supported type. */
286 ANY
287 };
288 /** All columns in a row are arranged with key columns first. */
289 std::vector<Column_meta> m_columns;
290
291 /** Total bitmap header length for the row. */
292 size_t m_bitmap_length = 0;
293
294 /** Total header length. */
295 size_t m_header_length = 0;
296
297 /** Length of the first key column. Helps to get the row pointer from first
298 key data pointer. */
299 size_t m_first_key_len = 0;
300
301 /** Key length in bytes for non-integer keys. This is required to estimate
302 the space required to save keys. */
303 size_t m_key_length = 0;
304
305 /** Number of columns used in primary key. */
306 uint32_t m_keys = 0;
307
308 /** Number of columns not used in primary Key. */
309 uint32_t m_non_keys = 0;
310
311 /** Key type for comparison. */
313
314 /** Total number of columns. A key could be on a column prefix.
315 m_columns <= m_keys + m_non_keys */
316 uint32_t m_num_columns = 0;
317
318 /** Approximate row length. */
320};
321
namespace Bulk_load {

/** Callbacks for collecting time statistics.

NOTE(review): the `struct Stat_callbacks {` line is absent from this listing.
The name is taken from the `Bulk_load::Stat_callbacks &wait_cbks` argument of
the bulk data load method - confirm against the original header. */
struct Stat_callbacks {
  /* Operation begin. */
  std::function<void()> m_fn_begin;

  /* Operation end. */
  std::function<void()> m_fn_end;
};

}  // namespace Bulk_load
333
334/** Bulk Data conversion. */
335BEGIN_SERVICE_DEFINITION(bulk_data_convert)
336/** Convert row from text format to MySQL column format. Convert as many
337rows as possible consuming the data buffer starting from next_index. On
338output next_index is the next row index that is not yet consumed. If it
339matches the size of input text_rows, then all rows are consumed.
340@param[in,out] thd session THD
341@param[in] table MySQL TABLE
342@param[in] text_rows rows with column in text
343@param[in,out] next_index next_index in text_rows to be processed
344@param[in,out] buffer data buffer for keeping sql row data
345@param[in,out] buffer_length length of the data buffer
346@param[in] charset input row data character set
347@param[in] metadata row metadata
348@param[out] sql_rows rows with column in MySQL column format
349@return error code. */
351 (THD * thd, const TABLE *table, const Rows_text &text_rows,
352 size_t &next_index, char *buffer, size_t &buffer_length,
353 const CHARSET_INFO *charset, const Row_meta &metadata,
354 Rows_mysql &sql_rows,
356
357/** Convert row to MySQL column format from raw form
358@param[in,out] buffer input raw data buffer
359@param[in] buffer_length buffer length
360@param[in] metadata row metadata
361@param[in] start_index start row index in row bunch
362@param[out] consumed_length length of buffer consumed
363@param[in,out] sql_rows row bunch to fill data
364@return error code. */
366 (char *buffer, size_t buffer_length, const Row_meta &metadata,
367 size_t start_index, size_t &consumed_length,
368 Rows_mysql &sql_rows));
369
370/** Convert row to MySQL column format using the key
371@param[in] metadata row metadata
372@param[in] sql_keys Key bunch
373@param[in] key_offset offset for the key
374@param[in,out] sql_rows row bunch to fill data
375@param[in] sql_index index of the row to be filled
376@return error code. */
378 (const Row_meta &metadata, const Rows_mysql &sql_keys,
379 size_t key_offset, Rows_mysql &sql_rows, size_t sql_index));
380
381/** Check if session is interrupted.
382@param[in,out] thd session THD
383@return true if connection or statement is killed. */
385
386/** Compare two key columns
387@param[in] key1 first key
388@param[in] key2 second key
389@param[in] col_meta column meta information
390@return positive, 0, negative, if key_1 is greater, equal, less than key_2 */
392 (const Column_mysql &key1, const Column_mysql &key2,
393 const Column_meta &col_meta));
394
395/** Get Table row metadata.
396@param[in,out] thd session THD
397@param[in] table MySQL TABLE
398@param[in] have_key include Primary Key metadata
399@param[out] metadata Metadata
400@return true if successful. */
402 (THD * thd, const TABLE *table, bool have_key,
403 Row_meta &metadata));
404
405END_SERVICE_DEFINITION(bulk_data_convert)
406
407/** Column metadata information. */
408/* Bulk data load to SE. */
410/** Begin Loading bulk data to SE.
411@param[in,out] thd session THD
412@param[in] table MySQL TABLE
413@param[in] data_size total data size to load
414@param[in] memory SE memory to be used
415@param[in] num_threads Number of concurrent threads
416@return SE bulk load context or nullptr in case of an error. */
417DECLARE_METHOD(void *, begin,
418 (THD * thd, const TABLE *table, size_t data_size, size_t memory,
419 size_t num_threads));
420
421/** Load a set of rows to SE table by one thread.
422@param[in,out] thd session THD
423@param[in,out] ctx SE load context returned by begin()
424@param[in] table MySQL TABLE
425@param[in] sql_rows row data to load
426@param[in] thread current thread number
427@param[in] wait_cbks wait stat callbacks
428@return true if successful. */
429DECLARE_METHOD(bool, load,
430 (THD * thd, void *ctx, const TABLE *table,
431 const Rows_mysql &sql_rows, size_t thread,
432 Bulk_load::Stat_callbacks &wait_cbks));
433
434/** End Loading bulk data to SE.
435
436Called at the end of bulk load execution, even if begin or load calls failed.
437
438@param[in,out] thd session THD
439@param[in,out] ctx SE load context
440@param[in] table MySQL TABLE
441@param[in] error true, if exiting after error
442@return true if successful. */
443DECLARE_METHOD(bool, end,
444 (THD * thd, void *ctx, const TABLE *table, bool error));
445
446/** Check if a table is supported by the bulk load implementation.
447@param[in,out] thd session THD
448@param[in] table MySQL TABLE
449@return true if table is supported. */
451
452/** Get available buffer pool memory for bulk load operations.
453@param[in,out] thd session THD
454@param[in] table MySQL TABLE
455@return buffer pool memory available for bulk load. */
457
Implements the row and column memory management for parse and load operations.
Definition: bulk_data_service.h:87
bool set_num_rows(size_t n_rows)
Set the number of rows.
Definition: bulk_data_service.h:164
std::vector< Column_type > m_columns
All the columns.
Definition: bulk_data_service.h:195
size_t get_next_row_offset(size_t &offset) const
Get next row offset from current row offset.
Definition: bulk_data_service.h:135
bool process_columns(size_t row_index, F &&cbk)
Process all columns, invoking callback for each.
Definition: bulk_data_service.h:104
bool process_columns_by_offset(size_t row_offset, F &&cbk)
Definition: bulk_data_service.h:112
size_t get_num_cols() const
Definition: bulk_data_service.h:97
size_t m_num_rows
Number of rows.
Definition: bulk_data_service.h:198
size_t get_row_offset(size_t row_index) const
Get current row offset to access columns.
Definition: bulk_data_service.h:127
size_t get_num_rows() const
Definition: bulk_data_service.h:94
const Column_type & read_column(size_t row_offset, size_t col_index) const
Get constant column for reading using row offset and column index.
Definition: bulk_data_service.h:154
Row_bunch(size_t n_cols)
Create a new row bunch.
Definition: bulk_data_service.h:91
static const size_t S_MAX_TOTAL_COLS
Limit allocation up to 600M columns.
Definition: bulk_data_service.h:191
size_t m_num_columns
Number of columns in each row.
Definition: bulk_data_service.h:201
Column_type & get_column(size_t row_offset, size_t col_index)
Get column using row offset and column index.
Definition: bulk_data_service.h:144
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:36
static int compare_keys(PFS_table_share *pfs, const TABLE_SHARE *share)
Definition: pfs_instr_class.cc:2187
static uint16 key1[1001]
Definition: hp_test2.cc:50
void error(const char *format,...)
int mysql_format_from_raw(char *buffer, size_t buffer_length, const Row_meta &metadata, size_t start_index, size_t &consumed_length, Rows_mysql &sql_rows) noexcept
Definition: bulk_data_service.cc:1160
int mysql_format(THD *thd, const TABLE *table, const Rows_text &text_rows, size_t &next_index, char *buffer, size_t &buffer_length, const CHARSET_INFO *charset, const Row_meta &metadata, Rows_mysql &sql_rows, Bulk_load_error_location_details &error_details) noexcept
Definition: bulk_data_service.cc:1189
bool get_row_metadata(THD *, const TABLE *table, bool have_key, Row_meta &metadata) noexcept
Definition: bulk_data_service.cc:1392
int mysql_format_using_key(const Row_meta &metadata, const Rows_mysql &sql_keys, size_t key_offset, Rows_mysql &sql_rows, size_t sql_index) noexcept
Definition: bulk_data_service.cc:1139
bool is_killed(THD *thd) noexcept
Definition: bulk_data_service.cc:1241
size_t get_se_memory_size(THD *thd, const TABLE *table) noexcept
Definition: bulk_data_service.cc:1633
bool is_table_supported(THD *thd, const TABLE *table) noexcept
Definition: bulk_data_service.cc:1637
Definition: bulk_data_service.h:322
static PFS_engine_table_share_proxy table
Definition: pfs.cc:61
const std::string charset("charset")
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:308
Definition: aligned_atomic.h:44
const char * begin(const char *const c)
Definition: base64.h:44
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
#define DECLARE_METHOD(retval, name, args)
Declares a method as a part of the Service definition.
Definition: service.h:103
#define END_SERVICE_DEFINITION(name)
A macro to end the last Service definition started with the BEGIN_SERVICE_DEFINITION macro.
Definition: service.h:91
#define BEGIN_SERVICE_DEFINITION(name)
Declares a new Service.
Definition: service.h:86
Callbacks for collecting time statistics.
Definition: bulk_data_service.h:325
std::function< void()> m_fn_begin
Definition: bulk_data_service.h:327
std::function< void()> m_fn_end
Definition: bulk_data_service.h:329
Definition: bulk_data_service.h:46
std::string filename
Definition: bulk_data_service.h:47
size_t row_number
Definition: bulk_data_service.h:48
std::string column_input_data
Definition: bulk_data_service.h:51
std::string column_name
Definition: bulk_data_service.h:49
std::string column_type
Definition: bulk_data_service.h:50
Definition: m_ctype.h:423
Column metadata information.
Definition: bulk_data_service.h:208
bool m_is_prefix_key
If the key is prefix of the column.
Definition: bulk_data_service.h:240
uint16_t m_index
Index of column in row.
Definition: bulk_data_service.h:265
bool m_is_single_byte_len
If character column length can be kept in one byte.
Definition: bulk_data_service.h:256
uint16_t m_null_byte
Byte index in NULL bitmap.
Definition: bulk_data_service.h:268
bool m_is_desc_key
If the key is descending.
Definition: bulk_data_service.h:237
Compare m_compare
If it is integer type.
Definition: bulk_data_service.h:246
uint16_t m_fixed_len
The length of column data if fixed.
Definition: bulk_data_service.h:259
bool is_integer() const
Definition: bulk_data_service.h:222
Compare
Data comparison method.
Definition: bulk_data_service.h:210
uint16_t m_max_len
Maximum length of data in bytes.
Definition: bulk_data_service.h:262
bool m_is_fixed_len
If it is fixed length type.
Definition: bulk_data_service.h:243
int m_type
Field type.
Definition: bulk_data_service.h:228
bool m_is_key
If column is part of primary key.
Definition: bulk_data_service.h:234
uint16_t m_null_bit
BIT number in NULL bitmap.
Definition: bulk_data_service.h:271
bool m_fixed_len_if_set_in_row
Check the row header to find out if it is fixed length.
Definition: bulk_data_service.h:253
bool m_is_nullable
If column could be NULL.
Definition: bulk_data_service.h:231
bool m_is_unsigned
If it is unsigned integer type.
Definition: bulk_data_service.h:249
const void * m_charset
Character set for char & varchar columns.
Definition: bulk_data_service.h:274
Definition: bulk_data_service.h:62
uint64_t m_int_data
Column data in integer format.
Definition: bulk_data_service.h:76
bool m_is_null
If column is NULL.
Definition: bulk_data_service.h:70
char * m_data_ptr
Column data.
Definition: bulk_data_service.h:73
int16_t m_type
Column Data Type.
Definition: bulk_data_service.h:64
uint16_t m_data_len
Column data length.
Definition: bulk_data_service.h:67
Definition: bulk_data_service.h:54
const char * m_data_ptr
Column data.
Definition: bulk_data_service.h:56
size_t m_data_len
Column data length.
Definition: bulk_data_service.h:59
Definition: mysql.h:300
Row metadata.
Definition: bulk_data_service.h:278
size_t m_bitmap_length
Total bitmap header length for the row.
Definition: bulk_data_service.h:292
size_t m_first_key_len
Length of the first key column.
Definition: bulk_data_service.h:299
size_t m_header_length
Total header length.
Definition: bulk_data_service.h:295
Key_type
Key type for fast comparison.
Definition: bulk_data_service.h:280
uint32_t m_non_keys
Number of columns not used in primary Key.
Definition: bulk_data_service.h:309
uint32_t m_num_columns
Total number of columns.
Definition: bulk_data_service.h:316
uint32_t m_keys
Number of columns used in primary key.
Definition: bulk_data_service.h:306
size_t m_key_length
Key length in bytes for non-integer keys.
Definition: bulk_data_service.h:303
std::vector< Column_meta > m_columns
All columns in a row are arranged with key columns first.
Definition: bulk_data_service.h:289
Key_type m_key_type
Key type for comparison.
Definition: bulk_data_service.h:312
size_t m_approx_row_len
Approximate row length.
Definition: bulk_data_service.h:319
Definition: table.h:1407