MySQL 8.3.0
Source Code Documentation
bulk_data_service.h
Go to the documentation of this file.
1/* Copyright (c) 2022, 2023, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23/**
24 @file
25 Services for bulk data conversion and load to SE.
26*/
27
28#pragma once
29
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <cstring>
#include <functional>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
39
40class THD;
41struct TABLE;
42struct CHARSET_INFO;
43
45 std::string filename;
46 size_t row_number;
47 std::string column_name;
48 std::string column_type;
49 std::string column_input_data;
50};
51
53 /** Column data. */
54 const char *m_data_ptr{};
55
56 /** Column data length. */
57 size_t m_data_len{};
58};
59
61 /** Column Data Type */
62 int16_t m_type{};
63
64 /** Column data length. */
65 uint16_t m_data_len{};
66
67 /** If column is NULL. */
68 bool m_is_null{false};
69
70 /** Column data */
71 char *m_data_ptr{};
72
73 /** Column data in integer format. Used only for specific datatype. */
74 uint64_t m_int_data;
75};
76
/** Implements the row and column memory management for parse and load
operations. We try to pre-allocate the memory contiguously as much as we can to
maximize the performance.

All columns of all rows live in one flat vector, row after row: the column
`c` of the row `r` is stored at index `r * get_num_cols() + c`. A "row offset"
in this interface is the index of the first column of a row in that vector.

@tparam Column_type Column_text when used in the CSV context, Column_sql when
used in the InnoDB context.
*/
template <typename Column_type>
class Row_bunch {
 public:
  /** Create a new row bunch. No column storage is allocated until
  set_num_rows() is called.
  @param[in]  n_cols  number of columns in each row */
  Row_bunch(size_t n_cols) : m_num_columns(n_cols) {}

  /** @return number of rows in the bunch. */
  size_t get_num_rows() const { return m_num_rows; }

  /** @return number of columns in each row. */
  size_t get_num_cols() const { return m_num_columns; }

  /** Process all columns of a row, invoking callback for each.
  @param[in]      row_index  index of the row, must be less than
                             get_num_rows()
  @param[in,out]  cbk        callable invoked as cbk(column, is_last_column);
                             returning false stops the iteration
  @return true if the callback returned true for every column. */
  template <typename F>
  bool process_columns(size_t row_index, F &&cbk) {
    assert(row_index < m_num_rows);

    const size_t row_offset = row_index * m_num_columns;
    /* cbk is a forwarding reference: forward it so that the value category
    the caller passed is preserved. */
    return process_columns_by_offset(row_offset, std::forward<F>(cbk));
  }

  /** Process all columns of the row starting at a row offset, invoking
  callback for each.
  @param[in]      row_offset  offset of the first column of the row in the
                              column vector
  @param[in,out]  cbk         callable invoked as cbk(column, is_last_column);
                              returning false stops the iteration
  @return true if the callback returned true for every column. */
  template <typename F>
  bool process_columns_by_offset(size_t row_offset, F &&cbk) {
    assert(row_offset + m_num_columns <= m_columns.size());

    for (size_t index = 0; index < m_num_columns; ++index) {
      const bool last_col = (index + 1 == m_num_columns);
      /* The callback is invoked repeatedly, so it is called as an lvalue and
      never forwarded here. */
      if (!cbk(m_columns[row_offset + index], last_col)) {
        return false;
      }
    }
    return true;
  }

  /** Get current row offset to access columns.
  @param[in]  row_index  row index, must be less than get_num_rows()
  @return row offset in column vector. */
  size_t get_row_offset(size_t row_index) const {
    assert(row_index < m_num_rows);
    return row_index * m_num_columns;
  }

  /** Get next row offset from current row offset.
  NOTE(review): the bound is the size of the allocated column vector, which
  set_num_rows() never shrinks, and not get_num_rows() * get_num_cols().
  After the row count has been lowered this can still report rows from an
  earlier, larger allocation - confirm this is what callers expect.
  @param[in,out]  offset  row offset; advanced by one row on return, also
                          when there is no next row
  @return true if the advanced offset is inside the column vector. */
  bool get_next_row_offset(size_t &offset) const {
    offset += m_num_columns;
    return offset < m_columns.size();
  }

  /** Get column using row offset and column index.
  @param[in]  row_offset  row offset in column vector
  @param[in]  col_index   index of the column within row
  @return column data */
  Column_type &get_column(size_t row_offset, size_t col_index) {
    assert(col_index < m_num_columns);
    assert(row_offset + col_index < m_columns.size());
    return m_columns[row_offset + col_index];
  }

  /** Get constant column for reading using row offset and column index.
  @param[in]  row_offset  row offset in column vector
  @param[in]  col_index   index of the column within row
  @return column data */
  const Column_type &read_column(size_t row_offset, size_t col_index) const {
    assert(col_index < m_num_columns);
    assert(row_offset + col_index < m_columns.size());
    return m_columns[row_offset + col_index];
  }

  /** Set the number of rows and make sure there is column storage for them.
  The request is rejected, and the bunch is left unchanged, if the total
  number of columns would exceed S_MAX_TOTAL_COLS.
  @param[in]  n_rows  number of rows
  @return true if successful, false if too many rows or columns. */
  bool set_num_rows(size_t n_rows) {
    constexpr auto max_u32 = std::numeric_limits<uint32_t>::max();

    /* Both factors must fit in 32 bits so that the 64 bit product below
    cannot overflow. */
    if (n_rows > max_u32 || m_num_columns > max_u32) {
      return false;
    }
    const auto total_cols = static_cast<uint64_t>(n_rows) * m_num_columns;

    if (total_cols > S_MAX_TOTAL_COLS) {
      return false;
    }

    m_num_rows = n_rows;

    /* Extend columns if needed. The storage only grows; a larger existing
    allocation is kept as it is. */
    if (m_columns.size() < total_cols) {
      m_columns.resize(static_cast<size_t>(total_cols));
    }
    return true;
  }

  /** Limit allocation up to 600M columns. This number is rounded up from an
   * estimate of the number of columns with the max chunk size (1024M). In the
   * worst case we can have 2 bytes per column so a chunk can contain around
   * 512M columns, and because of rows that spill over chunk boundaries we
   * assume we can append a full additional row (which should have at most 4096
   * columns). Rounded up to 600M. */
  static constexpr size_t S_MAX_TOTAL_COLS = size_t{600} * 1024 * 1024;

 private:
  /** All the columns. */
  std::vector<Column_type> m_columns;

  /** Number of rows. */
  size_t m_num_rows{};

  /** Number of columns in each row. */
  size_t m_num_columns{};
};
201
204
205/** Column metadata information. */
207 /** Data comparison method. */
208 enum class Compare {
209 /* Integer comparison */
211 /* Unsigned Integer comparison */
213 /* Binary comparison (memcmp) */
214 BINARY,
215 /* Need to callback to use appropriate comparison function in server. */
216 MYSQL
217 };
218
219 /** @return true if integer type. */
220 bool is_integer() const {
223 }
224
225 /** Field type */
227
228 /** If column could be NULL. */
230
231 /** If column is part of primary key. */
233
234 /** If the key is descending. */
236
237 /** If the key is prefix of the column. */
239
240 /** If it is fixed length type. */
242
243 /** If it is integer type. */
245
246 /** If it is unsigned integer type. */
248
249 /** Check the row header to find out if it is fixed length. For
250 character data type the row header indicates fixed length. */
252
253 /** If character column length can be kept in one byte. */
255
256 /** The length of column data if fixed. */
257 uint16_t m_fixed_len;
258
259 /** Maximum length of data in bytes. */
260 uint16_t m_max_len;
261
262 /** Index of column in row. */
263 uint16_t m_index;
264
265 /** Byte index in NULL bitmap. */
266 uint16_t m_null_byte;
267
268 /** BIT number in NULL bitmap. */
269 uint16_t m_null_bit;
270
271 /** Character set for char & varchar columns. */
272 const void *m_charset;
273};
274
275/** Row metadata */
276struct Row_meta {
277 /** Key type for fast comparison. */
278 enum class Key_type {
279 /* All keys are signed integers and ascending. */
281 /* All keys are integer. */
282 INT,
283 /* Keys are of any supported type. */
284 ANY
285 };
286 /** All columns in a row are arranged with key columns first. */
287 std::vector<Column_meta> m_columns;
288
289 /** Total bitmap header length for the row. */
290 size_t m_bitmap_length = 0;
291
292 /** Total header length. */
293 size_t m_header_length = 0;
294
295 /** Length of the first key column. Helps to get the row pointer from first
296 key data pointer. */
297 size_t m_first_key_len = 0;
298
299 /** Key length in bytes for non-integer keys. This is required to estimate
300 the space required to save keys. */
301 size_t m_key_length = 0;
302
303 /** Number of columns used in primary key. */
304 uint32_t m_keys = 0;
305
306 /** Number of columns not used in primary Key. */
307 uint32_t m_non_keys = 0;
308
309 /** Key type for comparison. */
311
312 /** Total number of columns. A key could be on a column prefix.
313 m_columns <= m_keys + m_non_keys */
314 uint32_t m_num_columns = 0;
315
316 /** Approximate row length. */
318};
319
320namespace Bulk_load {
321
322/** Callbacks for collecting time statistics */
324 /* Operation begin. */
325 std::function<void()> m_fn_begin;
326 /* Operation end. */
327 std::function<void()> m_fn_end;
328};
329
330} // namespace Bulk_load
331
332/** Bulk Data conversion. */
333BEGIN_SERVICE_DEFINITION(bulk_data_convert)
334/** Convert rows from text format to MySQL column format. Convert as many
335rows as possible consuming the data buffer starting from next_index. On
336output next_index is the next row index that is not yet consumed. If it
337matches the size of input text_rows, then all rows are consumed.
338@param[in,out] thd session THD
339@param[in] table MySQL TABLE
340@param[in] text_rows rows with column in text
341@param[in,out] next_index next_index in text_rows to be processed
342@param[in,out] buffer data buffer for keeping sql row data
343@param[in,out] buffer_length length of the data buffer
344@param[in] charset input row data character set
345@param[in] metadata row metadata
346@param[out] sql_rows rows with column in MySQL column format
347@return error code. */
349 (THD * thd, const TABLE *table, const Rows_text &text_rows,
350 size_t &next_index, char *buffer, size_t &buffer_length,
351 const CHARSET_INFO *charset, const Row_meta &metadata,
352 Rows_mysql &sql_rows,
354
355/** Convert row to MySQL column format from raw form
356@param[in,out] buffer input raw data buffer
357@param[in] buffer_length buffer length
358@param[in] metadata row metadata
359@param[in] start_index start row index in row bunch
360@param[out] consumed_length length of buffer consumed
361@param[in,out] sql_rows row bunch to fill data
362@return error code. */
364 (char *buffer, size_t buffer_length, const Row_meta &metadata,
365 size_t start_index, size_t &consumed_length,
366 Rows_mysql &sql_rows));
367
368/** Convert row to MySQL column format using the key
369@param[in] metadata row metadata
370@param[in] sql_keys Key bunch
371@param[in] key_offset offset for the key
372@param[in,out] sql_rows row bunch to fill data
373@param[in] sql_index index of the row to be filled
374@return error code. */
376 (const Row_meta &metadata, const Rows_mysql &sql_keys,
377 size_t key_offset, Rows_mysql &sql_rows, size_t sql_index));
378
379/** Check if session is interrupted.
380@param[in,out] thd session THD
381@return true if connection or statement is killed. */
383
384/** Compare two key columns
385@param[in] key1 first key
386@param[in] key2 second key
387@param[in] col_meta column meta information
388@return positive, 0 or negative if key1 is greater than, equal to or less than key2 */
390 (const Column_mysql &key1, const Column_mysql &key2,
391 const Column_meta &col_meta));
392
393/** Get Table row metadata.
394@param[in,out] thd session THD
395@param[in] table MySQL TABLE
396@param[in] have_key include Primary Key metadata
397@param[out] metadata Metadata
398@return true if successful. */
400 (THD * thd, const TABLE *table, bool have_key,
401 Row_meta &metadata));
402
403END_SERVICE_DEFINITION(bulk_data_convert)
404
406/** Bulk data load to SE. */
408/** Begin Loading bulk data to SE.
409@param[in,out] thd session THD
410@param[in] table MySQL TABLE
411@param[in] data_size total data size to load
412@param[in] memory SE memory to be used
413@param[in] num_threads Number of concurrent threads
414@return SE bulk load context or nullptr in case of an error. */
415DECLARE_METHOD(void *, begin,
416 (THD * thd, const TABLE *table, size_t data_size, size_t memory,
417 size_t num_threads));
418
419/** Load a set of rows to SE table by one thread.
420@param[in,out] thd session THD
421@param[in,out] ctx SE load context returned by begin()
422@param[in] table MySQL TABLE
423@param[in] sql_rows row data to load
424@param[in] thread current thread number
425@param[in] wait_cbks wait stat callbacks
426@return true if successful. */
427DECLARE_METHOD(bool, load,
428 (THD * thd, void *ctx, const TABLE *table,
429 const Rows_mysql &sql_rows, size_t thread,
430 Bulk_load::Stat_callbacks &wait_cbks));
431
432/** End Loading bulk data to SE.
433
434Called at the end of bulk load execution, even if begin or load calls failed.
435
436@param[in,out] thd session THD
437@param[in,out] ctx SE load context
438@param[in] table MySQL TABLE
439@param[in] error true, if exiting after error
440@return true if successful. */
441DECLARE_METHOD(bool, end,
442 (THD * thd, void *ctx, const TABLE *table, bool error));
443
444/** Check if a table is supported by the bulk load implementation.
445@param[in,out] thd session THD
446@param[in] table MySQL TABLE
447@return true if table is supported. */
449
450/** Get available buffer pool memory for bulk load operations.
451@param[in,out] thd session THD
452@param[in] table MySQL TABLE
453@return buffer pool memory available for bulk load. */
455
Implements the row and column memory management for parse and load operations.
Definition: bulk_data_service.h:85
bool set_num_rows(size_t n_rows)
Set the number of rows.
Definition: bulk_data_service.h:162
std::vector< Column_type > m_columns
All the columns.
Definition: bulk_data_service.h:193
size_t get_next_row_offset(size_t &offset) const
Get next row offset from current row offset.
Definition: bulk_data_service.h:133
bool process_columns(size_t row_index, F &&cbk)
Process all columns, invoking callback for each.
Definition: bulk_data_service.h:102
bool process_columns_by_offset(size_t row_offset, F &&cbk)
Definition: bulk_data_service.h:110
size_t get_num_cols() const
Definition: bulk_data_service.h:95
size_t m_num_rows
Number of rows.
Definition: bulk_data_service.h:196
size_t get_row_offset(size_t row_index) const
Get current row offset to access columns.
Definition: bulk_data_service.h:125
size_t get_num_rows() const
Definition: bulk_data_service.h:92
const Column_type & read_column(size_t row_offset, size_t col_index) const
Get constant column for reading using row offset and column index.
Definition: bulk_data_service.h:152
Row_bunch(size_t n_cols)
Create a new row bunch.
Definition: bulk_data_service.h:89
static const size_t S_MAX_TOTAL_COLS
Limit allocation up to 600M columns.
Definition: bulk_data_service.h:189
size_t m_num_columns
Number of columns in each row.
Definition: bulk_data_service.h:199
Column_type & get_column(size_t row_offset, size_t col_index)
Get column using row offset and column index.
Definition: bulk_data_service.h:142
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:35
static int compare_keys(PFS_table_share *pfs, const TABLE_SHARE *share)
Definition: pfs_instr_class.cc:2186
static uint16 key1[1001]
Definition: hp_test2.cc:49
int mysql_format_from_raw(char *buffer, size_t buffer_length, const Row_meta &metadata, size_t start_index, size_t &consumed_length, Rows_mysql &sql_rows) noexcept
Definition: bulk_data_service.cc:1159
int mysql_format(THD *thd, const TABLE *table, const Rows_text &text_rows, size_t &next_index, char *buffer, size_t &buffer_length, const CHARSET_INFO *charset, const Row_meta &metadata, Rows_mysql &sql_rows, Bulk_load_error_location_details &error_details) noexcept
Definition: bulk_data_service.cc:1188
bool get_row_metadata(THD *, const TABLE *table, bool have_key, Row_meta &metadata) noexcept
Definition: bulk_data_service.cc:1391
int mysql_format_using_key(const Row_meta &metadata, const Rows_mysql &sql_keys, size_t key_offset, Rows_mysql &sql_rows, size_t sql_index) noexcept
Definition: bulk_data_service.cc:1138
bool is_killed(THD *thd) noexcept
Definition: bulk_data_service.cc:1240
void * begin(THD *thd, const TABLE *table, size_t data_size, size_t memory, size_t num_threads) noexcept
Definition: bulk_data_service.cc:1533
size_t get_se_memory_size(THD *thd, const TABLE *table) noexcept
Definition: bulk_data_service.cc:1632
bool is_table_supported(THD *thd, const TABLE *table) noexcept
Definition: bulk_data_service.cc:1636
Definition: bulk_data_service.h:320
static PFS_engine_table_share_proxy table
Definition: pfs.cc:60
const std::string charset("charset")
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:307
Definition: aligned_atomic.h:43
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:417
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:191
#define DECLARE_METHOD(retval, name, args)
Declares a method as a part of the Service definition.
Definition: service.h:102
#define END_SERVICE_DEFINITION(name)
A macro to end the last Service definition started with the BEGIN_SERVICE_DEFINITION macro.
Definition: service.h:90
#define BEGIN_SERVICE_DEFINITION(name)
Declares a new Service.
Definition: service.h:85
Callbacks for collecting time statistics.
Definition: bulk_data_service.h:323
std::function< void()> m_fn_begin
Definition: bulk_data_service.h:325
std::function< void()> m_fn_end
Definition: bulk_data_service.h:327
Definition: bulk_data_service.h:44
std::string filename
Definition: bulk_data_service.h:45
size_t row_number
Definition: bulk_data_service.h:46
std::string column_input_data
Definition: bulk_data_service.h:49
std::string column_name
Definition: bulk_data_service.h:47
std::string column_type
Definition: bulk_data_service.h:48
Definition: m_ctype.h:422
Column metadata information.
Definition: bulk_data_service.h:206
bool m_is_prefix_key
If the key is prefix of the column.
Definition: bulk_data_service.h:238
uint16_t m_index
Index of column in row.
Definition: bulk_data_service.h:263
bool m_is_single_byte_len
If character column length can be kept in one byte.
Definition: bulk_data_service.h:254
uint16_t m_null_byte
Byte index in NULL bitmap.
Definition: bulk_data_service.h:266
bool m_is_desc_key
If the key is descending.
Definition: bulk_data_service.h:235
Compare m_compare
If it is integer type.
Definition: bulk_data_service.h:244
uint16_t m_fixed_len
The length of column data if fixed.
Definition: bulk_data_service.h:257
bool is_integer() const
Definition: bulk_data_service.h:220
Compare
Data comparison method.
Definition: bulk_data_service.h:208
uint16_t m_max_len
Maximum length of data in bytes.
Definition: bulk_data_service.h:260
bool m_is_fixed_len
If it is fixed length type.
Definition: bulk_data_service.h:241
int m_type
Field type.
Definition: bulk_data_service.h:226
bool m_is_key
If column is part of primary key.
Definition: bulk_data_service.h:232
uint16_t m_null_bit
BIT number in NULL bitmap.
Definition: bulk_data_service.h:269
bool m_fixed_len_if_set_in_row
Check the row header to find out if it is fixed length.
Definition: bulk_data_service.h:251
bool m_is_nullable
If column could be NULL.
Definition: bulk_data_service.h:229
bool m_is_unsigned
If it is unsigned integer type.
Definition: bulk_data_service.h:247
const void * m_charset
Character set for char & varchar columns.
Definition: bulk_data_service.h:272
Definition: bulk_data_service.h:60
uint64_t m_int_data
Column data in integer format.
Definition: bulk_data_service.h:74
bool m_is_null
If column is NULL.
Definition: bulk_data_service.h:68
char * m_data_ptr
Column data.
Definition: bulk_data_service.h:71
int16_t m_type
Column Data Type.
Definition: bulk_data_service.h:62
uint16_t m_data_len
Column data length.
Definition: bulk_data_service.h:65
Definition: bulk_data_service.h:52
const char * m_data_ptr
Column data.
Definition: bulk_data_service.h:54
size_t m_data_len
Column data length.
Definition: bulk_data_service.h:57
Definition: mysql.h:297
Row metadata.
Definition: bulk_data_service.h:276
size_t m_bitmap_length
Total bitmap header length for the row.
Definition: bulk_data_service.h:290
size_t m_first_key_len
Length of the first key column.
Definition: bulk_data_service.h:297
size_t m_header_length
Total header length.
Definition: bulk_data_service.h:293
Key_type
Key type for fast comparison.
Definition: bulk_data_service.h:278
uint32_t m_non_keys
Number of columns not used in primary Key.
Definition: bulk_data_service.h:307
uint32_t m_num_columns
Total number of columns.
Definition: bulk_data_service.h:314
uint32_t m_keys
Number of columns used in primary key.
Definition: bulk_data_service.h:304
size_t m_key_length
Key length in bytes for non-integer keys.
Definition: bulk_data_service.h:301
std::vector< Column_meta > m_columns
All columns in a row are arranged with key columns first.
Definition: bulk_data_service.h:287
Key_type m_key_type
Key type for comparison.
Definition: bulk_data_service.h:310
size_t m_approx_row_len
Approximate row length.
Definition: bulk_data_service.h:317
Definition: table.h:1403