35#include <rapidjson/document.h>
36#include <rapidjson/error/en.h>
37#include <rapidjson/rapidjson.h>
38#include <rapidjson/stringbuffer.h>
39#include <rapidjson/writer.h>
47#include <unordered_set>
69 auto pos = s.find_first_not_of(
" \n\r\t");
75 const std::string &input_string,
const size_t &
n_files)
88 std::ostream &
print(std::ostream &out)
const;
106 out <<
"[Bulk_load_file_info: m_file_prefix=" <<
m_file_prefix <<
", "
107 <<
"m_file_suffix=" << suffix <<
", "
116 return obj.
print(out);
125 rapidjson::Document doc;
126 doc.Parse(file_name_arg.c_str());
127 return !doc.HasParseError() && doc.IsObject();
140 const rapidjson::Document &doc) {
141 constexpr char PREFIX_KEY[] =
"url-prefix";
142 constexpr char SUFFIX_KEY[] =
"url-suffix";
143 constexpr char APPENDTOLASTPREFIX_KEY[] =
"url-prefix-last-append";
144 constexpr char SEQUENCE_START_KEY[] =
"url-sequence-start";
145 constexpr char DRYRUN_KEY[] =
"is-dryrun";
146 static const std::unordered_set<std::string> all_keys = {
147 PREFIX_KEY, SUFFIX_KEY, APPENDTOLASTPREFIX_KEY, SEQUENCE_START_KEY,
150 if (!doc.IsObject()) {
151 error =
"Invalid JSON object used for filename argument!";
155 for (
const auto &child : doc.GetObject()) {
156 std::string
key = child.name.GetString();
157 if (all_keys.find(
key) == all_keys.end()) {
158 std::stringstream ss;
159 ss <<
"Unsupported JSON key: " <<
key;
165 if (!doc.HasMember(PREFIX_KEY)) {
166 error =
"Missing url-prefix in JSON filename argument!";
170 if (!doc[PREFIX_KEY].IsString()) {
171 std::stringstream ss;
172 ss <<
"The value of key " << PREFIX_KEY <<
" must be a string";
179 if (doc.HasMember(SUFFIX_KEY)) {
182 sout <<
"Cannot specify " << SUFFIX_KEY <<
" without COUNT clause";
187 if (!doc[SUFFIX_KEY].IsString()) {
188 std::stringstream ss;
189 ss <<
"The value of key " << SUFFIX_KEY <<
" must be a string";
196 if (doc.HasMember(APPENDTOLASTPREFIX_KEY)) {
199 sout <<
"Cannot specify " << APPENDTOLASTPREFIX_KEY
200 <<
" without COUNT clause";
204 if (!doc[APPENDTOLASTPREFIX_KEY].IsString()) {
205 std::stringstream ss;
206 ss <<
"The value of key " << APPENDTOLASTPREFIX_KEY
207 <<
" must be a string";
214 if (doc.HasMember(SEQUENCE_START_KEY)) {
217 sout <<
"Cannot specify " << SEQUENCE_START_KEY
218 <<
" without COUNT clause";
222 if (doc[SEQUENCE_START_KEY].IsInt64()) {
224 const int64_t val = doc[SEQUENCE_START_KEY].GetInt64();
227 sout <<
"The value of key " << SEQUENCE_START_KEY
228 <<
" cannot be negative: (" << val <<
")";
233 if (doc[SEQUENCE_START_KEY].IsUint64()) {
235 }
else if (doc[SEQUENCE_START_KEY].IsString()) {
236 const std::string val = doc[SEQUENCE_START_KEY].GetString();
239 sout <<
"The value of key " << SEQUENCE_START_KEY <<
" cannot be empty";
242 }
else if ((val.length() == 7) &&
245 }
else if (std::all_of(val.begin(), val.end(),
246 [](
unsigned char c) { return std::isdigit(c); })) {
247 info.
m_start_index = std::strtoull(val.c_str(),
nullptr, 10);
250 sout <<
"The value of key " << SEQUENCE_START_KEY <<
" is invalid ("
257 sout <<
"Invalid value for key " << SEQUENCE_START_KEY;
263 if (doc.HasMember(DRYRUN_KEY)) {
264 if (doc[DRYRUN_KEY].IsBool()) {
266 }
else if (doc[DRYRUN_KEY].IsString()) {
267 const std::string val = doc[DRYRUN_KEY].GetString();
271 }
else if (val ==
"0" ||
277 sout <<
"Unsupported " << DRYRUN_KEY <<
" value: " << val;
281 }
else if (doc[DRYRUN_KEY].IsUint64()) {
282 const uint64_t val = doc[DRYRUN_KEY].GetUint64();
285 }
else if (val == 1) {
289 sout <<
"Unsupported " << DRYRUN_KEY <<
" value: " << val;
294 std::stringstream ss;
295 ss <<
"Invalid value for key " << DRYRUN_KEY;
309 std::string parse_error;
312 parse_error = rapidjson::GetParseError_En(
ok.Code());
315 if (!
m_doc.HasParseError()) {
324 [](
unsigned char c) { return std::tolower(c); });
325 if (protocol ==
"http" || protocol ==
"https") {
329 if (protocol.starts_with(
'{')) {
330 sout <<
"Could be malformed JSON (" << parse_error <<
") or ";
332 sout <<
"Unsupported protocol in URL";
Bulk_char
Bulk loader single byte attributes.
Definition: bulk_load_service.h:398
@ ENCLOSE_CHAR
Column enclosing character.
@ ESCAPE_CHAR
Escape character.
static bool parse_input_arg(std::string &error, Bulk_load_file_info &info, const rapidjson::Document &doc)
Validates whether the JSON argument matches the expected schema for bulk load; if it matches, it fills...
Definition: bulk_load_service.h:138
Bulk_condition
Bulk loader boolean attributes.
Definition: bulk_load_service.h:370
@ OPTIONAL_ENCLOSE
If enclosing is optional.
@ ORDERED_DATA
The algorithm used is different based on whether the data is in sorted primary key order.
@ DRYRUN
If true, the current execution is only a dry run.
Bulk_source
Bulk loader source.
Definition: bulk_load_service.h:59
@ LOCAL
Local file system.
Bulk_compression_algorithm
Bulk data compression algorithm.
Definition: bulk_load_service.h:349
std::ostream & operator<<(std::ostream &out, const Bulk_load_file_info &obj)
Definition: bulk_load_service.h:114
static bool is_json_object(const std::string &file_name_arg)
Check whether the specified argument is a valid JSON object.
Definition: bulk_load_service.h:124
std::string trim_left(const std::string &s)
Definition: bulk_load_service.h:68
void Bulk_loader
Definition: bulk_load_service.h:56
Bulk_string
Bulk loader string attributes.
Definition: bulk_load_service.h:352
@ COLUMN_TERM
Column terminator.
@ ROW_TERM
Row terminator.
@ APPENDTOLASTPREFIX
String to append to last file prefix.
@ SCHEMA_NAME
Schema name.
Bulk_size
Bulk loader size attributes.
Definition: bulk_load_service.h:382
@ COUNT_COLUMNS
Number of columns in the table.
@ MEMORY
Total memory size to use for LOAD in bytes.
@ START_INDEX
Index of the first file.
@ CONCURRENCY
Number of concurrent loaders to use.
@ COUNT_ROW_SKIP
Number of rows to skip.
@ COUNT_FILES
Number of input files.
static Mysys_charset_loader * loader
Definition: charset.cc:185
For each client connection we create a separate thread with THD serving as a thread/connection descriptor.
Definition: sql_lexer_thd.h:36
static int native_strncasecmp(const char *s1, const char *s2, size_t n)
Definition: m_string.h:216
Define rapidjson::SizeType to be std::uint64_t.
uint32 my_thread_id
Definition: my_thread_local.h:34
void error(const char *format,...)
void for_each(const Shards< COUNT > &shards, Function &&f) noexcept
Iterate over the shards.
Definition: ut0counter.h:323
static PFS_engine_table_share_proxy table
Definition: pfs.cc:61
const std::string charset("charset")
ulong n_files
Number of files to use for the double write buffer.
Definition: buf0dblwr.cc:79
bool load(THD *, const dd::String_type &fname, dd::String_type *buf)
Read an sdi file from disk and store in a buffer.
Definition: sdi_file.cc:308
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2871
required string key
Definition: replication_asynchronous_connection_failover.proto:60
repeated Source source
Definition: replication_asynchronous_connection_failover.proto:42
required string type
Definition: replication_group_member_actions.proto:34
#define DECLARE_METHOD(retval, name, args)
Declares a method as a part of the Service definition.
Definition: service.h:103
#define END_SERVICE_DEFINITION(name)
A macro to end the last Service definition started with the BEGIN_SERVICE_DEFINITION macro.
Definition: service.h:91
#define BEGIN_SERVICE_DEFINITION(name)
Declares a new Service.
Definition: service.h:86
Definition: bulk_load_service.h:73
size_t m_start_index
Definition: bulk_load_service.h:83
std::string m_appendtolastprefix
Definition: bulk_load_service.h:82
std::ostream & print(std::ostream &out) const
Definition: bulk_load_service.h:104
std::string m_file_prefix
Definition: bulk_load_service.h:80
const std::string m_input_string
Definition: bulk_load_service.h:97
rapidjson::Document m_doc
Definition: bulk_load_service.h:95
bool is_count_specified() const
Check if the COUNT clause has been explicitly specified.
Definition: bulk_load_service.h:92
bool m_is_dryrun
Definition: bulk_load_service.h:84
const size_t m_n_files
Definition: bulk_load_service.h:101
const Bulk_source m_source
Definition: bulk_load_service.h:96
Bulk_load_file_info(const Bulk_source &source, const std::string &input_string, const size_t &n_files)
Definition: bulk_load_service.h:74
bool parse(std::string &error)
Definition: bulk_load_service.h:307
std::optional< std::string > m_file_suffix
Definition: bulk_load_service.h:81
Definition: m_ctype.h:423