MySQL 8.4.0
Source Code Documentation
bulk_data_service.h
Go to the documentation of this file.
1/* Copyright (c) 2022, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24/**
25 @file
26 Services for bulk data conversion and load to SE.
27*/
28
29#pragma once
30
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <cstring>
#include <functional>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
40
41class THD;
42struct TABLE;
43struct CHARSET_INFO;
44
46 std::string filename;
47 size_t row_number;
48 std::string column_name;
49 std::string column_type;
50 std::string column_input_data;
51};
52
54 /** Column data. */
55 const char *m_data_ptr{};
56
57 /** Column data length. */
58 size_t m_data_len{};
59};
60
62 /** Column Data Type */
63 int16_t m_type{};
64
65 /** Column data length. */
66 uint16_t m_data_len{};
67
68 /** If column is NULL. */
69 bool m_is_null{false};
70
71 /** Column data */
72 char *m_data_ptr{};
73
74 /** Column data in integer format. Used only for specific datatype. */
75 uint64_t m_int_data;
76};
77
/** Implements the row and column memory management for parse and load
operations. All columns of all rows are kept in one contiguous vector, laid out
row after row, so a column is addressed as row_offset + col_index where
row_offset = row_index * (number of columns per row).

@tparam Column_type Column_text when used in the CSV context, Column_mysql when
used in the InnoDB context.
*/
template <typename Column_type>
class Row_bunch {
 public:
  /** Create a new, empty row bunch. Column storage is only allocated by
  set_num_rows().
  @param[in]  n_cols  number of columns in each row */
  Row_bunch(size_t n_cols) : m_num_columns(n_cols) {}

  /** @return number of rows in the bunch. */
  size_t get_num_rows() const { return m_num_rows; }

  /** @return number of columns in each row. */
  size_t get_num_cols() const { return m_num_columns; }

  /** Process all columns of one row, invoking callback for each.
  @param[in]  row_index  index of the row, must be less than get_num_rows()
  @param[in]  cbk        callback invoked as cbk(column, is_last_column);
                         returning false stops the iteration
  @return true if the callback returned true for every column. */
  template <typename F>
  bool process_columns(size_t row_index, F &&cbk) {
    assert(row_index < m_num_rows);

    /* cbk is a forwarding reference and may be bound to an lvalue the caller
    keeps using, so it is forwarded rather than unconditionally moved. */
    return process_columns_by_offset(row_index * m_num_columns,
                                     std::forward<F>(cbk));
  }

  /** Process all columns of the row starting at a row offset, invoking
  callback for each.
  @param[in]  row_offset  offset of the first column of the row in the
                          column vector
  @param[in]  cbk         callback invoked as cbk(column, is_last_column);
                          returning false stops the iteration
  @return true if the callback returned true for every column. */
  template <typename F>
  bool process_columns_by_offset(size_t row_offset, F &&cbk) {
    assert(row_offset + m_num_columns <= m_columns.size());

    for (size_t col = 0; col < m_num_columns; ++col) {
      const bool is_last = (col + 1 == m_num_columns);

      if (!cbk(m_columns[row_offset + col], is_last)) {
        return false;
      }
    }
    return true;
  }

  /** Get current row offset to access columns.
  @param[in]  row_index  row index, must be less than get_num_rows()
  @return row offset in column vector. */
  size_t get_row_offset(size_t row_index) const {
    assert(row_index < m_num_rows);
    return row_index * m_num_columns;
  }

  /** Get next row offset from current row offset.
  @param[in,out]  offset  row offset; always advanced by one row
  @return non-zero (1) if the advanced offset is still inside the allocated
  column storage, 0 otherwise.
  @note The bound checked is the size of the column vector, which
  set_num_rows() only ever grows; it is not get_num_rows() * get_num_cols(). */
  size_t get_next_row_offset(size_t &offset) const {
    offset += m_num_columns;
    return (offset < m_columns.size());
  }

  /** Get column using row offset and column index.
  @param[in]  row_offset  row offset in column vector
  @param[in]  col_index   index of the column within row
  @return modifiable reference to the column data. */
  Column_type &get_column(size_t row_offset, size_t col_index) {
    assert(col_index < m_num_columns);
    assert(row_offset + col_index < m_columns.size());
    return m_columns[row_offset + col_index];
  }

  /** Get constant column for reading using row offset and column index.
  @param[in]  row_offset  row offset in column vector
  @param[in]  col_index   index of the column within row
  @return read-only reference to the column data. */
  const Column_type &read_column(size_t row_offset, size_t col_index) const {
    assert(col_index < m_num_columns);
    assert(row_offset + col_index < m_columns.size());
    return m_columns[row_offset + col_index];
  }

  /** Set the number of rows, growing the column storage if required. The
  storage is never shrunk. A request exceeding the maximum column storage
  limit is rejected and leaves the bunch unchanged.
  @param[in]  n_rows  number of rows
  @return true if successful, false if too many rows or columns. */
  bool set_num_rows(size_t n_rows) {
    /* Both factors must fit in 32 bits so that the 64-bit product below
    cannot overflow. */
    constexpr size_t max_factor = std::numeric_limits<uint32_t>::max();

    if (n_rows > max_factor || m_num_columns > max_factor) {
      return false;
    }

    const auto total_cols = static_cast<uint64_t>(n_rows) * m_num_columns;

    if (total_cols > S_MAX_TOTAL_COLS) {
      return false;
    }

    m_num_rows = n_rows;

    /* Extend columns if needed. */
    if (m_columns.size() < total_cols) {
      m_columns.resize(static_cast<size_t>(total_cols));
    }
    return true;
  }

  /** Limit allocation up to 600M columns. This number is rounded up from an
   * estimate of the number of columns with the max chunk size (1024M). In the
   * worst case we can have 2 bytes per column so a chunk can contain around
   * 512M columns, and because of rows that spill over chunk boundaries we
   * assume we can append a full additional row (which should have at most 4096
   * columns). Rounded up to 600M. */
  static constexpr size_t S_MAX_TOTAL_COLS = 600 * 1024 * 1024;

 private:
  /** All the columns. */
  std::vector<Column_type> m_columns;

  /** Number of rows. */
  size_t m_num_rows{};

  /** Number of columns in each row. */
  size_t m_num_columns{};
};
202
205
206/** Column metadata information. */
208 /** Data comparison method. */
209 enum class Compare {
210 /* Integer comparison */
212 /* Unsigned Integer comparison */
214 /* Binary comparison (memcmp) */
215 BINARY,
216 /* Need to callback to use appropriate comparison function in server. */
217 MYSQL
218 };
219
220 /** @return true if integer type. */
221 bool is_integer() const {
224 }
225
226 /** Field type */
228
229 /** If column could be NULL. */
231
232 /** If column is part of primary key. */
234
235 /** If the key is descending. */
237
238 /** If the key is prefix of the column. */
240
241 /** If it is fixed length type. */
243
244 /** Comparison method to use for the column data. */
246
247 /** If it is unsigned integer type. */
249
250 /** Check the row header to find out if it is fixed length. For
251 character data type the row header indicates fixed length. */
253
254 /** If character column length can be kept in one byte. */
256
257 /** The length of column data if fixed. */
258 uint16_t m_fixed_len;
259
260 /** Maximum length of data in bytes. */
261 uint16_t m_max_len;
262
263 /** Index of column in row. */
264 uint16_t m_index;
265
266 /** Byte index in NULL bitmap. */
267 uint16_t m_null_byte;
268
269 /** BIT number in NULL bitmap. */
270 uint16_t m_null_bit;
271
272 /** Character set for char & varchar columns. */
273 const void *m_charset;
274};
275
276/** Row metadata */
277struct Row_meta {
278 /** Key type for fast comparison. */
279 enum class Key_type {
280 /* All keys are signed integers and ascending. */
282 /* All keys are integer. */
283 INT,
284 /* Keys are of any supported type. */
285 ANY
286 };
287 /** All columns in a row are arranged with key columns first. */
288 std::vector<Column_meta> m_columns;
289
290 /** Total bitmap header length for the row. */
291 size_t m_bitmap_length = 0;
292
293 /** Total header length. */
294 size_t m_header_length = 0;
295
296 /** Length of the first key column. Helps to get the row pointer from first
297 key data pointer. */
298 size_t m_first_key_len = 0;
299
300 /** Key length in bytes for non-integer keys. This is required to estimate
301 the space required to save keys. */
302 size_t m_key_length = 0;
303
304 /** Number of columns used in primary key. */
305 uint32_t m_keys = 0;
306
307 /** Number of columns not used in primary Key. */
308 uint32_t m_non_keys = 0;
309
310 /** Key type for comparison. */
312
313 /** Total number of columns. A key could be on a column prefix.
314 m_num_columns <= m_keys + m_non_keys */
315 uint32_t m_num_columns = 0;
316
317 /** Approximate row length. */
319};
320
321namespace Bulk_load {
322
323/** Callbacks for collecting time statistics */
325 /* Operation begin. */
326 std::function<void()> m_fn_begin;
327 /* Operation end. */
328 std::function<void()> m_fn_end;
329};
330
331} // namespace Bulk_load
332
333/** Bulk Data conversion. */
334BEGIN_SERVICE_DEFINITION(bulk_data_convert)
335/** Convert rows from text format to MySQL column format. Convert as many
336rows as possible, consuming the data buffer starting from next_index. On
337output, next_index is the next row index that is not yet consumed. If it
338matches the size of input text_rows, then all rows are consumed.
339@param[in,out] thd session THD
340@param[in] table MySQL TABLE
341@param[in] text_rows rows with column in text
342@param[in,out] next_index next_index in text_rows to be processed
343@param[in,out] buffer data buffer for keeping sql row data
344@param[in,out] buffer_length length of the data buffer
345@param[in] charset input row data character set
346@param[in] metadata row metadata
347@param[out] sql_rows rows with column in MySQL column format
348@return error code. */
350 (THD * thd, const TABLE *table, const Rows_text &text_rows,
351 size_t &next_index, char *buffer, size_t &buffer_length,
352 const CHARSET_INFO *charset, const Row_meta &metadata,
353 Rows_mysql &sql_rows,
355
356/** Convert row to MySQL column format from raw form
357@param[in,out] buffer input raw data buffer
358@param[in] buffer_length buffer length
359@param[in] metadata row metadata
360@param[in] start_index start row index in row bunch
361@param[out] consumed_length length of buffer consumed
362@param[in,out] sql_rows row bunch to fill data
363@return error code. */
365 (char *buffer, size_t buffer_length, const Row_meta &metadata,
366 size_t start_index, size_t &consumed_length,
367 Rows_mysql &sql_rows));
368
369/** Convert row to MySQL column format using the key
370@param[in] metadata row metadata
371@param[in] sql_keys Key bunch
372@param[in] key_offset offset for the key
373@param[in,out] sql_rows row bunch to fill data
374@param[in] sql_index index of the row to be filled
375@return error code. */
377 (const Row_meta &metadata, const Rows_mysql &sql_keys,
378 size_t key_offset, Rows_mysql &sql_rows, size_t sql_index));
379
380/** Check if session is interrupted.
381@param[in,out] thd session THD
382@return true if connection or statement is killed. */
384
385/** Compare two key columns
386@param[in] key1 first key
387@param[in] key2 second key
388@param[in] col_meta column meta information
389@return positive, 0, negative, if key1 is greater than, equal to, less than key2 */
391 (const Column_mysql &key1, const Column_mysql &key2,
392 const Column_meta &col_meta));
393
394/** Get Table row metadata.
395@param[in,out] thd session THD
396@param[in] table MySQL TABLE
397@param[in] have_key include Primary Key metadata
398@param[out] metadata Metadata
399@return true if successful. */
401 (THD * thd, const TABLE *table, bool have_key,
402 Row_meta &metadata));
403
404END_SERVICE_DEFINITION(bulk_data_convert)
405
406/** Bulk data load to SE: begin, load and end a bulk load, and query
407storage engine support and available memory. */
409/** Begin Loading bulk data to SE.
410@param[in,out] thd session THD
411@param[in] table MySQL TABLE
412@param[in] data_size total data size to load
413@param[in] memory SE memory to be used
414@param[in] num_threads Number of concurrent threads
415@return SE bulk load context or nullptr in case of an error. */
416DECLARE_METHOD(void *, begin,
417 (THD * thd, const TABLE *table, size_t data_size, size_t memory,
418 size_t num_threads));
419
420/** Load a set of rows to SE table by one thread.
421@param[in,out] thd session THD
422@param[in,out] ctx SE load context returned by begin()
423@param[in] table MySQL TABLE
424@param[in] sql_rows row data to load
425@param[in] thread current thread number
426@param[in] wait_cbks wait stat callbacks
427@return true if successful. */
428DECLARE_METHOD(bool, load,
429 (THD * thd, void *ctx, const TABLE *table,
430 const Rows_mysql &sql_rows, size_t thread,
431 Bulk_load::Stat_callbacks &wait_cbks));
432
433/** End Loading bulk data to SE.
434
435Called at the end of bulk load execution, even if begin or load calls failed.
436
437@param[in,out] thd session THD
438@param[in,out] ctx SE load context
439@param[in] table MySQL TABLE
440@param[in] error true, if exiting after error
441@return true if successful. */
442DECLARE_METHOD(bool, end,
443 (THD * thd, void *ctx, const TABLE *table, bool error));
444
445/** Check if a table is supported by the bulk load implementation.
446@param[in,out] thd session THD
447@param[in] table MySQL TABLE
448@return true if table is supported. */
450
451/** Get available buffer pool memory for bulk load operations.
452@param[in,out] thd session THD
453@param[in] table MySQL TABLE
454@return buffer pool memory available for bulk load. */
456
Implements the row and column memory management for parse and load operations.
Definition: bulk_data_service.h:86
bool set_num_rows(size_t n_rows)
Set the number of rows.
Definition: bulk_data_service.h:163
std::vector< Column_type > m_columns
All the columns.
Definition: bulk_data_service.h:194
size_t get_next_row_offset(size_t &offset) const
Get next row offset from current row offset.
Definition: bulk_data_service.h:134
bool process_columns(size_t row_index, F &&cbk)
Process all columns, invoking callback for each.
Definition: bulk_data_service.h:103
bool process_columns_by_offset(size_t row_offset, F &&cbk)
Definition: bulk_data_service.h:111
size_t get_num_cols() const
Definition: bulk_data_service.h:96
size_t m_num_rows
Number of rows.
Definition: bulk_data_service.h:197
size_t get_row_offset(size_t row_index) const
Get current row offset to access columns.
Definition: bulk_data_service.h:126
size_t get_num_rows() const
Definition: bulk_data_service.h:93
const Column_type & read_column(size_t row_offset, size_t col_index) const
Get constant column for reading using row offset and column index.
Definition: bulk_data_service.h:153
Row_bunch(size_t n_cols)
Create a new row bunch.
Definition: bulk_data_service.h:90
static const size_t S_MAX_TOTAL_COLS
Limit allocation up to 600M columns.
Definition: bulk_data_service.h:190
size_t m_num_columns
Number of columns in each row.
Definition: bulk_data_service.h:200
Column_type & get_column(size_t row_offset, size_t col_index)
Get column using row offset and column index.
Definition: bulk_data_service.h:143
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:36
static int compare_keys(PFS_table_share *pfs, const TABLE_SHARE *share)
Definition: pfs_instr_class.cc:2187
static uint16 key1[1001]
Definition: hp_test2.cc:50
void error(const char *format,...)
int mysql_format_from_raw(char *buffer, size_t buffer_length, const Row_meta &metadata, size_t start_index, size_t &consumed_length, Rows_mysql &sql_rows) noexcept
Definition: bulk_data_service.cc:1160
int mysql_format(THD *thd, const TABLE *table, const Rows_text &text_rows, size_t &next_index, char *buffer, size_t &buffer_length, const CHARSET_INFO *charset, const Row_meta &metadata, Rows_mysql &sql_rows, Bulk_load_error_location_details &error_details) noexcept
Definition: bulk_data_service.cc:1189
bool get_row_metadata(THD *, const TABLE *table, bool have_key, Row_meta &metadata) noexcept
Definition: bulk_data_service.cc:1392
int mysql_format_using_key(const Row_meta &metadata, const Rows_mysql &sql_keys, size_t key_offset, Rows_mysql &sql_rows, size_t sql_index) noexcept
Definition: bulk_data_service.cc:1139
bool is_killed(THD *thd) noexcept
Definition: bulk_data_service.cc:1241
size_t get_se_memory_size(THD *thd, const TABLE *table) noexcept
Definition: bulk_data_service.cc:1633
bool is_table_supported(THD *thd, const TABLE *table) noexcept
Definition: bulk_data_service.cc:1637
Definition: bulk_data_service.h:321
static PFS_engine_table_share_proxy table
Definition: pfs.cc:61
const std::string charset("charset")
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:308
Definition: aligned_atomic.h:44
const char * begin(const char *const c)
Definition: base64.h:44
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
#define DECLARE_METHOD(retval, name, args)
Declares a method as a part of the Service definition.
Definition: service.h:103
#define END_SERVICE_DEFINITION(name)
A macro to end the last Service definition started with the BEGIN_SERVICE_DEFINITION macro.
Definition: service.h:91
#define BEGIN_SERVICE_DEFINITION(name)
Declares a new Service.
Definition: service.h:86
Callbacks for collecting time statistics.
Definition: bulk_data_service.h:324
std::function< void()> m_fn_begin
Definition: bulk_data_service.h:326
std::function< void()> m_fn_end
Definition: bulk_data_service.h:328
Definition: bulk_data_service.h:45
std::string filename
Definition: bulk_data_service.h:46
size_t row_number
Definition: bulk_data_service.h:47
std::string column_input_data
Definition: bulk_data_service.h:50
std::string column_name
Definition: bulk_data_service.h:48
std::string column_type
Definition: bulk_data_service.h:49
Definition: m_ctype.h:423
Column metadata information.
Definition: bulk_data_service.h:207
bool m_is_prefix_key
If the key is prefix of the column.
Definition: bulk_data_service.h:239
uint16_t m_index
Index of column in row.
Definition: bulk_data_service.h:264
bool m_is_single_byte_len
If character column length can be kept in one byte.
Definition: bulk_data_service.h:255
uint16_t m_null_byte
Byte index in NULL bitmap.
Definition: bulk_data_service.h:267
bool m_is_desc_key
If the key is descending.
Definition: bulk_data_service.h:236
Compare m_compare
If it is integer type.
Definition: bulk_data_service.h:245
uint16_t m_fixed_len
The length of column data if fixed.
Definition: bulk_data_service.h:258
bool is_integer() const
Definition: bulk_data_service.h:221
Compare
Data comparison method.
Definition: bulk_data_service.h:209
uint16_t m_max_len
Maximum length of data in bytes.
Definition: bulk_data_service.h:261
bool m_is_fixed_len
If it is fixed length type.
Definition: bulk_data_service.h:242
int m_type
Field type.
Definition: bulk_data_service.h:227
bool m_is_key
If column is part of primary key.
Definition: bulk_data_service.h:233
uint16_t m_null_bit
BIT number in NULL bitmap.
Definition: bulk_data_service.h:270
bool m_fixed_len_if_set_in_row
Check the row header to find out if it is fixed length.
Definition: bulk_data_service.h:252
bool m_is_nullable
If column could be NULL.
Definition: bulk_data_service.h:230
bool m_is_unsigned
If it is unsigned integer type.
Definition: bulk_data_service.h:248
const void * m_charset
Character set for char & varchar columns.
Definition: bulk_data_service.h:273
Definition: bulk_data_service.h:61
uint64_t m_int_data
Column data in integer format.
Definition: bulk_data_service.h:75
bool m_is_null
If column is NULL.
Definition: bulk_data_service.h:69
char * m_data_ptr
Column data.
Definition: bulk_data_service.h:72
int16_t m_type
Column Data Type.
Definition: bulk_data_service.h:63
uint16_t m_data_len
Column data length.
Definition: bulk_data_service.h:66
Definition: bulk_data_service.h:53
const char * m_data_ptr
Column data.
Definition: bulk_data_service.h:55
size_t m_data_len
Column data length.
Definition: bulk_data_service.h:58
Definition: mysql.h:300
Row metadata.
Definition: bulk_data_service.h:277
size_t m_bitmap_length
Total bitmap header length for the row.
Definition: bulk_data_service.h:291
size_t m_first_key_len
Length of the first key column.
Definition: bulk_data_service.h:298
size_t m_header_length
Total header length.
Definition: bulk_data_service.h:294
Key_type
Key type for fast comparison.
Definition: bulk_data_service.h:279
uint32_t m_non_keys
Number of columns not used in primary Key.
Definition: bulk_data_service.h:308
uint32_t m_num_columns
Total number of columns.
Definition: bulk_data_service.h:315
uint32_t m_keys
Number of columns used in primary key.
Definition: bulk_data_service.h:305
size_t m_key_length
Key length in bytes for non-integer keys.
Definition: bulk_data_service.h:302
std::vector< Column_meta > m_columns
All columns in a row are arranged with key columns first.
Definition: bulk_data_service.h:288
Key_type m_key_type
Key type for comparison.
Definition: bulk_data_service.h:311
size_t m_approx_row_len
Approximate row length.
Definition: bulk_data_service.h:318
Definition: table.h:1405