MySQL 8.4.0
Source Code Documentation
bulk_load_service.h
Go to the documentation of this file.
1/* Copyright (c) 2022, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#pragma once
25
26/**
27 @file
28 This service provides an interface for loading data in bulk from CSV files.
29
30*/
31
32#include "my_rapidjson_size_t.h"
33
35#include <rapidjson/document.h>
36#include <rapidjson/error/en.h>
37#include <rapidjson/rapidjson.h>
38#include <rapidjson/stringbuffer.h>
39#include <rapidjson/writer.h>
40
41#include <algorithm>
42#include <cctype>
43#include <cstdlib>
44#include <optional>
45#include <sstream>
46#include <string>
47#include <unordered_set>
48#include "m_string.h"
49#include "my_thread_local.h"
50
51/* Forward declaration for opaque types. */
52class THD;
53struct TABLE;
54struct CHARSET_INFO;
55
56using Bulk_loader = void;
57
/** Origin of the data consumed by the bulk loader. */
enum class Bulk_source {
  LOCAL, /**< Local file system. */
  OCI,   /**< OCI object store. */
  S3     /**< Amazon S3. */
};
67
/** Remove leading whitespace (space, newline, carriage return, tab).
@param[in]  s  string to trim
@return copy of s without its leading whitespace; an empty string when s is
empty or consists of whitespace only. */
inline std::string trim_left(const std::string &s) {
  const auto pos = s.find_first_not_of(" \n\r\t");
  /* npos means there is nothing but whitespace (or nothing at all). Calling
  substr(npos) would throw std::out_of_range, so return early. */
  if (pos == std::string::npos) {
    return {};
  }
  return s.substr(pos);
}
72
75 const std::string &input_string, const size_t &n_files)
77 m_input_string(trim_left(input_string)),
79
80 std::string m_file_prefix;
81 std::optional<std::string> m_file_suffix;
83 size_t m_start_index{1};
84 bool m_is_dryrun{false};
85
86 bool parse(std::string &error);
87
88 std::ostream &print(std::ostream &out) const;
89
90 /** Check if the COUNT clause has been explicitly specified.
91 @return true if COUNT is specified explicitly, false otherwise. */
92 bool is_count_specified() const { return m_n_files > 0; }
93
94 private:
95 rapidjson::Document m_doc;
97 const std::string m_input_string;
98
99 /* This value can be 0, only if COUNT clause is not specified. If COUNT
100 clause is specified, this value will be greater than 0. */
101 const size_t m_n_files{0};
102};
103
104inline std::ostream &Bulk_load_file_info::print(std::ostream &out) const {
105 std::string suffix = m_file_suffix.has_value() ? m_file_suffix.value() : "";
106 out << "[Bulk_load_file_info: m_file_prefix=" << m_file_prefix << ", "
107 << "m_file_suffix=" << suffix << ", "
108 << "m_appendtolastprefix=" << m_appendtolastprefix << ", "
109 << "m_start_index=" << m_start_index << ", "
110 << "m_is_dryrun=" << m_is_dryrun << "]";
111 return out;
112}
113
114inline std::ostream &operator<<(std::ostream &out,
115 const Bulk_load_file_info &obj) {
116 return obj.print(out);
117}
118
119/** Check whether the specified argument is a valid JSON object. Used to check
120whether the user specified JSON or a regular filename as the LOAD location
121argument.
122@param[in] file_name_arg filename argument provided by the user.
123@return true if the arg is a JSON object. */
124inline static bool is_json_object(const std::string &file_name_arg) {
125 rapidjson::Document doc;
126 doc.Parse(file_name_arg.c_str());
127 return !doc.HasParseError() && doc.IsObject();
128}
129
130/** Validates whether the json argument matches the expected schema for bulk
131load, if it matches it fills out the Bulk_load_input structure, sets error
132and returns false otherwise.
133@param[out] error contains the appropriate error message.
134@param[out] info parsed structure of file information (containing prefix and
135optional suffix)
136@param[in] doc rapidjson document
137@return false if configuration contains unknown or unsupported values. */
138inline static bool parse_input_arg(std::string &error,
140 const rapidjson::Document &doc) {
141 constexpr char PREFIX_KEY[] = "url-prefix";
142 constexpr char SUFFIX_KEY[] = "url-suffix";
143 constexpr char APPENDTOLASTPREFIX_KEY[] = "url-prefix-last-append";
144 constexpr char SEQUENCE_START_KEY[] = "url-sequence-start";
145 constexpr char DRYRUN_KEY[] = "is-dryrun";
146 static const std::unordered_set<std::string> all_keys = {
147 PREFIX_KEY, SUFFIX_KEY, APPENDTOLASTPREFIX_KEY, SEQUENCE_START_KEY,
148 DRYRUN_KEY};
149
150 if (!doc.IsObject()) {
151 error = "Invalid JSON object used for filename argument!";
152 return false;
153 }
154
155 for (const auto &child : doc.GetObject()) {
156 std::string key = child.name.GetString();
157 if (all_keys.find(key) == all_keys.end()) {
158 std::stringstream ss;
159 ss << "Unsupported JSON key: " << key;
160 error = ss.str();
161 return false;
162 }
163 }
164
165 if (!doc.HasMember(PREFIX_KEY)) {
166 error = "Missing url-prefix in JSON filename argument!";
167 return false;
168 }
169
170 if (!doc[PREFIX_KEY].IsString()) {
171 std::stringstream ss;
172 ss << "The value of key " << PREFIX_KEY << " must be a string";
173 error = ss.str();
174 return false;
175 }
176
177 info.m_file_prefix = doc[PREFIX_KEY].GetString();
178
179 if (doc.HasMember(SUFFIX_KEY)) {
180 if (!info.is_count_specified()) {
182 sout << "Cannot specify " << SUFFIX_KEY << " without COUNT clause";
183 error = sout.str();
184 return false;
185 }
186
187 if (!doc[SUFFIX_KEY].IsString()) {
188 std::stringstream ss;
189 ss << "The value of key " << SUFFIX_KEY << " must be a string";
190 error = ss.str();
191 return false;
192 }
193 info.m_file_suffix = doc[SUFFIX_KEY].GetString();
194 }
195
196 if (doc.HasMember(APPENDTOLASTPREFIX_KEY)) {
197 if (!info.is_count_specified()) {
199 sout << "Cannot specify " << APPENDTOLASTPREFIX_KEY
200 << " without COUNT clause";
201 error = sout.str();
202 return false;
203 }
204 if (!doc[APPENDTOLASTPREFIX_KEY].IsString()) {
205 std::stringstream ss;
206 ss << "The value of key " << APPENDTOLASTPREFIX_KEY
207 << " must be a string";
208 error = ss.str();
209 return false;
210 }
211 info.m_appendtolastprefix = doc[APPENDTOLASTPREFIX_KEY].GetString();
212 }
213
214 if (doc.HasMember(SEQUENCE_START_KEY)) {
215 if (!info.is_count_specified()) {
217 sout << "Cannot specify " << SEQUENCE_START_KEY
218 << " without COUNT clause";
219 error = sout.str();
220 return false;
221 }
222 if (doc[SEQUENCE_START_KEY].IsInt64()) {
223 /* Check for -ve numbers and report error */
224 const int64_t val = doc[SEQUENCE_START_KEY].GetInt64();
225 if (val < 0) {
227 sout << "The value of key " << SEQUENCE_START_KEY
228 << " cannot be negative: (" << val << ")";
229 error = sout.str();
230 return false;
231 }
232 }
233 if (doc[SEQUENCE_START_KEY].IsUint64()) {
234 info.m_start_index = doc[SEQUENCE_START_KEY].GetUint64();
235 } else if (doc[SEQUENCE_START_KEY].IsString()) {
236 const std::string val = doc[SEQUENCE_START_KEY].GetString();
237 if (val.empty()) {
239 sout << "The value of key " << SEQUENCE_START_KEY << " cannot be empty";
240 error = sout.str();
241 return false;
242 } else if ((val.length() == 7) &&
243 (native_strncasecmp(val.c_str(), "default", 7) == 0)) {
244 info.m_start_index = 1;
245 } else if (std::all_of(val.begin(), val.end(),
246 [](unsigned char c) { return std::isdigit(c); })) {
247 info.m_start_index = std::strtoull(val.c_str(), nullptr, 10);
248 } else {
250 sout << "The value of key " << SEQUENCE_START_KEY << " is invalid ("
251 << val << ")";
252 error = sout.str();
253 return false;
254 }
255 } else {
257 sout << "Invalid value for key " << SEQUENCE_START_KEY;
258 error = sout.str();
259 return false;
260 }
261 }
262
263 if (doc.HasMember(DRYRUN_KEY)) {
264 if (doc[DRYRUN_KEY].IsBool()) {
265 info.m_is_dryrun = doc[DRYRUN_KEY].GetBool();
266 } else if (doc[DRYRUN_KEY].IsString()) {
267 const std::string val = doc[DRYRUN_KEY].GetString();
268 if (val == "1" || (native_strncasecmp(val.c_str(), "on", 2) == 0) ||
269 (native_strncasecmp(val.c_str(), "true", 4) == 0)) {
270 info.m_is_dryrun = true;
271 } else if (val == "0" ||
272 (native_strncasecmp(val.c_str(), "off", 3) == 0) ||
273 (native_strncasecmp(val.c_str(), "false", 5)) == 0) {
274 info.m_is_dryrun = false;
275 } else {
277 sout << "Unsupported " << DRYRUN_KEY << " value: " << val;
278 error = sout.str();
279 return false;
280 }
281 } else if (doc[DRYRUN_KEY].IsUint64()) {
282 const uint64_t val = doc[DRYRUN_KEY].GetUint64();
283 if (val == 0) {
284 info.m_is_dryrun = false;
285 } else if (val == 1) {
286 info.m_is_dryrun = true;
287 } else {
289 sout << "Unsupported " << DRYRUN_KEY << " value: " << val;
290 error = sout.str();
291 return false;
292 }
293 } else {
294 std::stringstream ss;
295 ss << "Invalid value for key " << DRYRUN_KEY;
296 error = ss.str();
297 return false;
298 }
299 } else {
300 info.m_is_dryrun = false;
301 }
302
303 error = "";
304 return true;
305}
306
307inline bool Bulk_load_file_info::parse(std::string &error) {
308 rapidjson::ParseResult ok = m_doc.Parse(m_input_string.c_str());
309 std::string parse_error;
310
311 if (!ok) {
312 parse_error = rapidjson::GetParseError_En(ok.Code());
313 }
314
315 if (!m_doc.HasParseError()) {
316 if (!parse_input_arg(error, *this, m_doc)) {
317 return false;
318 }
319 } else {
320 if (m_source == Bulk_source::OCI) {
321 auto pos = m_input_string.find(":");
322 std::string protocol = m_input_string.substr(0, pos);
323 std::for_each(protocol.begin(), protocol.end(),
324 [](unsigned char c) { return std::tolower(c); });
325 if (protocol == "http" || protocol == "https") {
326 /* Protocol is supported. */
327 } else {
329 if (protocol.starts_with('{')) {
330 sout << "Could be malformed JSON (" << parse_error << ") or ";
331 }
332 sout << "Unsupported protocol in URL";
333 error = sout.str();
334 return false;
335 }
336 } else if (m_source == Bulk_source::LOCAL) {
337 /* Nothing yet. */
338 }
339
340 /* In case json parsing failed, the file_name_arg only contains the file
341 * prefix. */
343 m_file_suffix = std::nullopt;
344 }
345 return true;
346}
347
348/** Bulk data compression algorithm. */
350
/** Bulk loader string attributes. */
enum class Bulk_string {
  /** Schema name */
  SCHEMA_NAME,
  /** Table name.
  NOTE(review): enumerator name reconstructed, confirm against upstream. */
  TABLE_NAME,
  /** File prefix URL.
  NOTE(review): enumerator name reconstructed, confirm against upstream. */
  FILE_PREFIX,
  /** File suffix.
  NOTE(review): enumerator name reconstructed, confirm against upstream. */
  FILE_SUFFIX,
  /** Column terminator */
  COLUMN_TERM,
  /** Row terminator */
  ROW_TERM,
  /** String to append to last file prefix. */
  APPENDTOLASTPREFIX
};
368
/** Bulk loader boolean attributes. */
enum class Bulk_condition {
  /** The algorithm used is different based on whether the data is in sorted
  primary key order. This option tells whether to expect sorted input. */
  ORDERED_DATA,
  /** If enclosing is optional. */
  OPTIONAL_ENCLOSE,
  /** If true, the current execution is only a dry run.  No need to load data
  into the table. */
  DRYRUN
};
380
/** Bulk loader size attributes. */
enum class Bulk_size {
  /** Number of input files. */
  COUNT_FILES,
  /** Number of rows to skip. */
  COUNT_ROW_SKIP,
  /** Number of columns in the table. */
  COUNT_COLUMNS,
  /** Number of concurrent loaders to use. */
  CONCURRENCY,
  /** Total memory size to use for LOAD in bytes. */
  MEMORY,
  /** Index of the first file. */
  START_INDEX
};
396
/** Bulk loader single byte attributes. */
enum class Bulk_char {
  /** Escape character. */
  ESCAPE_CHAR,
  /** Column enclosing character. */
  ENCLOSE_CHAR
};
404
405/** Bulk load driver service. */
407
408/**
409 Create bulk loader.
410 @param[in] thd mysql THD
411 @param[in] table mysql TABLE object
412 @param[in] src bulk loader source
413 @param[in] charset source data character set
414 @return bulk loader object, opaque type.
415*/
416DECLARE_METHOD(Bulk_loader *, create_bulk_loader,
417 (THD * thd, my_thread_id connection_id, const TABLE *table,
419/**
420 Set string attribute for loading data.
421 @param[in,out] loader bulk loader
422 @param[in] type attribute type
423 @param[in] value attribute value
424*/
425DECLARE_METHOD(void, set_string,
426 (Bulk_loader * loader, Bulk_string type, std::string value));
427/**
428 Set single byte character attribute for loading data.
429 @param[in,out] loader bulk loader
430 @param[in] type attribute type
431 @param[in] value attribute value
432*/
433DECLARE_METHOD(void, set_char,
434 (Bulk_loader * loader, Bulk_char type, unsigned char value));
435/**
436 Set size attribute for loading data.
437 @param[in,out] loader bulk loader
438 @param[in] type attribute type
439 @param[in] value attribute value
440*/
441DECLARE_METHOD(void, set_size,
442 (Bulk_loader * loader, Bulk_size type, size_t value));
443/**
444 Set boolean condition attribute for loading data.
445 @param[in,out] loader bulk loader
446 @param[in] type attribute type
447 @param[in] value attribute value
448*/
449DECLARE_METHOD(void, set_condition,
451
452/**
453 Set the compression algorithm used for loading data.
454 @param[in,out] loader bulk loader
455 @param[in] algorithm the compression algorithm used
456*/
457DECLARE_METHOD(void, set_compression_algorithm,
459
460/**
461 Load data from CSV files.
462 @param[in,out] loader bulk loader
463 @return true if successful.
464*/
465DECLARE_METHOD(bool, load, (Bulk_loader * loader, size_t &affected_rows));
466
467/**
468 Drop bulk loader.
469 @param[in,out] thd mysql THD
470 @param[in,out] loader loader object to drop
471*/
472DECLARE_METHOD(void, drop_bulk_loader, (THD * thd, Bulk_loader *loader));
473
474END_SERVICE_DEFINITION(bulk_load_driver)
Bulk_char
Bulk loader single byte attributes.
Definition: bulk_load_service.h:398
@ ENCLOSE_CHAR
Column enclosing character.
@ ESCAPE_CHAR
Escape character.
static bool parse_input_arg(std::string &error, Bulk_load_file_info &info, const rapidjson::Document &doc)
Validates whether the json argument matches the expected schema for bulk load, if it matches it fills...
Definition: bulk_load_service.h:138
Bulk_condition
Bulk loader boolean attributes.
Definition: bulk_load_service.h:370
@ OPTIONAL_ENCLOSE
If enclosing is optional.
@ ORDERED_DATA
The algorithm used is different based on whether the data is in sorted primary key order.
@ DRYRUN
If true, the current execution is only a dry run.
Bulk_source
Bulk loader source.
Definition: bulk_load_service.h:59
@ LOCAL
Local file system.
@ OCI
OCI object store.
@ S3
Amazon S3.
Bulk_compression_algorithm
Bulk data compression algorithm.
Definition: bulk_load_service.h:349
std::ostream & operator<<(std::ostream &out, const Bulk_load_file_info &obj)
Definition: bulk_load_service.h:114
static bool is_json_object(const std::string &file_name_arg)
Check whether the specified argument is a valid JSON object.
Definition: bulk_load_service.h:124
std::string trim_left(const std::string &s)
Definition: bulk_load_service.h:68
void Bulk_loader
Definition: bulk_load_service.h:56
Bulk_string
Bulk loader string attributes.
Definition: bulk_load_service.h:352
@ COLUMN_TERM
Column terminator.
@ ROW_TERM
Row terminator.
@ APPENDTOLASTPREFIX
String to append to last file prefix.
@ SCHEMA_NAME
Schema name.
Bulk_size
Bulk loader size attributes.
Definition: bulk_load_service.h:382
@ COUNT_COLUMNS
Number of columns in the table.
@ MEMORY
Total memory size to use for LOAD in bytes.
@ START_INDEX
Index of the first file.
@ CONCURRENCY
Number of concurrent loaders to use,.
@ COUNT_ROW_SKIP
Number of rows to skip.
@ COUNT_FILES
Number of input files.
static Mysys_charset_loader * loader
Definition: charset.cc:185
For each client connection we create a separate thread with THD serving as a thread/connection descri...
Definition: sql_lexer_thd.h:36
static int native_strncasecmp(const char *s1, const char *s2, size_t n)
Definition: m_string.h:216
Define rapidjson::SizeType to be std::size_t.
uint32 my_thread_id
Definition: my_thread_local.h:34
void error(const char *format,...)
void for_each(const Shards< COUNT > &shards, Function &&f) noexcept
Iterate over the shards.
Definition: ut0counter.h:323
static PFS_engine_table_share_proxy table
Definition: pfs.cc:61
const std::string charset("charset")
ulong n_files
Number of files to use for the double write buffer.
Definition: buf0dblwr.cc:79
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:308
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2870
required string key
Definition: replication_asynchronous_connection_failover.proto:60
repeated Source source
Definition: replication_asynchronous_connection_failover.proto:42
required string type
Definition: replication_group_member_actions.proto:34
#define DECLARE_METHOD(retval, name, args)
Declares a method as a part of the Service definition.
Definition: service.h:103
#define END_SERVICE_DEFINITION(name)
A macro to end the last Service definition started with the BEGIN_SERVICE_DEFINITION macro.
Definition: service.h:91
#define BEGIN_SERVICE_DEFINITION(name)
Declares a new Service.
Definition: service.h:86
Definition: bulk_load_service.h:73
size_t m_start_index
Definition: bulk_load_service.h:83
std::string m_appendtolastprefix
Definition: bulk_load_service.h:82
std::ostream & print(std::ostream &out) const
Definition: bulk_load_service.h:104
std::string m_file_prefix
Definition: bulk_load_service.h:80
const std::string m_input_string
Definition: bulk_load_service.h:97
rapidjson::Document m_doc
Definition: bulk_load_service.h:95
bool is_count_specified() const
Check if the COUNT clause has been explicitly specified.
Definition: bulk_load_service.h:92
bool m_is_dryrun
Definition: bulk_load_service.h:84
const size_t m_n_files
Definition: bulk_load_service.h:101
const Bulk_source m_source
Definition: bulk_load_service.h:96
Bulk_load_file_info(const Bulk_source &source, const std::string &input_string, const size_t &n_files)
Definition: bulk_load_service.h:74
bool parse(std::string &error)
Definition: bulk_load_service.h:307
std::optional< std::string > m_file_suffix
Definition: bulk_load_service.h:81
Definition: m_ctype.h:423
Definition: table.h:1405