MySQL 8.4.6
Source Code Documentation
connection.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2025, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
27#define ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
28
29#include <algorithm>
30#include <atomic>
31#include <bitset>
32#include <cctype>
33#include <list>
34#include <map>
35#include <sstream>
36#include <string>
37#include <utility>
38#include <vector>
39
40#include "cno/core.h"
44#include "http/base/http_time.h"
45#include "http/base/method.h"
50#include "http/cno/error_code.h"
51#include "http/cno/string.h"
52
56
57namespace http {
58namespace base {
59
60namespace impl {
61
62inline void set_socket_parent(net::ip::tcp::socket *, const char *) {
63 // Do nothing, net::ip::tcp::socket, is missing the custom
64 // `set_parent` method.
65}
66
69 return s;
70}
71
72template <typename T>
74 return get_socket(&s->lower_layer());
75}
76
77template <typename T>
78auto *get_socket1(T *s) {
79 return &s->lower_layer();
80}
81
82template <typename T>
83const net::ip::tcp::socket *get_socket(const T *s) {
84 return get_socket(&s->lower_layer());
85}
86
87template <typename T>
88void set_socket_parent(T *s, const char *parent) {
89 s->set_parent(parent);
90
92}
93
94} // namespace impl
95
96enum Pending {
100 k_pending_writing = 1 << 3
102
103template <typename IOLayer>
105 public:
112 using IO = IOLayer;
113
114 public:
115 Connection(IOLayer s, base::method::Bitset *allowed_method,
116 ConnectionStatusCallbacks *connection_handler,
117 CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
118 : socket_(std::move(s)),
119 allowed_method_(allowed_method),
120 connection_handler_{connection_handler} {
121 std::stringstream ss;
122 ss << "HTTP-" << this;
123
124 socket_.set_option(net::ip::tcp::no_delay{true});
125 impl::set_socket_parent(&socket_, ss.str().c_str());
126 cno_init(&cno_, kind);
127 cno_.disallow_h2_prior_knowledge = 1;
128 cno::callback_init(&cno_, this);
129 output_buffers_.emplace_back(4096);
130 cno_begin(&cno_, version);
131 }
132
133 ~Connection() override {
134 cno_fini(&cno_);
135 socket_.close();
136 }
137
138 public: // ConnectionInterface implementation
139 void shutdown(bool) override {
141 }
142
143 bool send(const uint32_t *stream_id_ptr, const int status_code,
144 const std::string &method, const std::string &path,
145 const Headers &headers, const IOBuffer &data) override {
146 cno_message_t message;
147 std::vector<cno_header_t> cno_header(headers.size(), cno_header_t());
148 std::vector<std::string> http2_headers_names;
149 const bool only_header = 0 == data.length();
150
151 if (CNO_HTTP2 == cno_.mode) {
152 http2_headers_names.reserve(headers.size());
153 }
154
155 auto output = cno_header.data();
156
157 for (const auto &entry : headers) {
158 if (CNO_HTTP2 == cno_.mode) {
159 auto &header_name =
160 http2_headers_names.emplace_back(make_lower(entry.first));
161 output->name.size = header_name.length();
162 output->name.data = header_name.c_str();
163 } else {
164 output->name.size = entry.first.length();
165 output->name.data = entry.first.c_str();
166 }
167
168 output->value.size = entry.second.length();
169 output->value.data = entry.second.c_str();
170 ++output;
171 }
172
173 message.code = status_code;
174 message.headers = cno_header.data();
175 message.headers_len = cno_header.size();
176 message.path.data = path.c_str();
177 message.path.size = path.length();
178 message.method.data = method.c_str();
179 message.method.size = method.length();
180
181 uint32_t stream_id =
182 stream_id_ptr ? *stream_id_ptr : cno_next_stream(&cno_);
183 if (CNO_OK != cno_write_head(&cno_, stream_id, &message, only_header)) {
184 return false;
185 }
186
187 if (!only_header) {
188 return CNO_OK == cno_write_data(&cno_, stream_id, data.get().c_str(),
189 data.length(), true);
190 }
191
192 return true;
193 }
194
195 std::string get_peer_address() const override {
196 auto *s = impl::get_socket(&socket_);
197 if (auto e = s->remote_endpoint()) {
198 return e->address().to_string();
199 }
200
201 return {};
202 }
203
204 uint16_t get_peer_port() const override {
205 auto *s = impl::get_socket(&socket_);
206 if (auto e = s->remote_endpoint()) {
207 return e->port();
208 }
209
210 return 0;
211 }
212
213 IOLayer &get_socket() { return socket_; }
214
215 void start() override { do_net_recv(); }
216
217 protected:
218 std::string make_lower(std::string s) {
219 std::transform(s.begin(), s.end(), s.begin(), ::tolower);
220 return s;
221 }
222
223 void do_net_send() {
225 [this](std::error_code error, auto size) {
226 switch (on_net_send(error, size)) {
227 case k_pending_none:
228 break;
229
231 break;
232
234 do_net_send();
235 break;
236
240 break;
241 }
242 });
243 }
244
245 void do_net_recv() {
246 reading_pending_ = true;
247 socket_.async_receive(
248 input_mutable_buffer_, [this](std::error_code error, auto size) {
249 switch (on_net_receive(error, size)) {
251 do_net_recv();
252 break;
253
255 case k_pending_none:
256 reading_pending_ = false;
257 break;
258
260 reading_pending_ = false;
263 break;
264 }
265 });
266 }
267
268 Pending on_net_receive(const std::error_code &ec,
269 std::size_t bytes_transferred) {
270 if (!running_) {
272 }
273
274 if (ec) {
277 }
278
279 const int result = cno_consume(
280 &cno_, reinterpret_cast<char *>(input_buffer_), bytes_transferred);
281
282 if (result < 0) {
283 auto ec = make_error_code(cno_error());
286 }
287
288 if (!keep_alive_) {
290 }
291
292 if (!running_) {
294 }
295
296 if (suspend_) return k_pending_none;
297
298 return k_pending_reading;
299 }
300
301 Pending on_net_send(const std::error_code &ec, size_t size) {
302 bool has_more = true;
303 bool should_close = false;
304 {
305 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
306
307 if (!ec) {
308 while (size) {
309 auto &page = output_buffers_.front();
310 auto size_on_page = std::min(page.size(), size);
311 page += size_on_page;
312 size -= size_on_page;
313
314 if (page.empty()) {
315 if (1 == output_buffers_.size()) {
316 page.reset();
317 } else {
318 output_buffers_.pop_front();
319 }
320 }
321 }
322 }
323
324 if (0 == output_buffers_.front().size()) {
325 has_more = false;
326 output_pending_ = false;
327
328 if (!running_) {
329 should_close = true;
330 }
331 }
332 }
333
334 if (ec) {
335 stop_running();
336 output_pending_ = false;
337
340 }
341 if (has_more) return k_pending_writing;
342
344
345 if (should_close) {
347 }
348 if (suspend_) return k_pending_none;
349
350 return k_pending_reading;
351 }
352
353 void resume() { suspend_ = false; }
354 void suspend() { suspend_ = true; }
355
356 /**
357 * Mark the connection that it should stop running
358 *
359 * @returns information if the object may be delete
360 * @retval 'false' Connection object can be removed immediately
361 * @retval 'true' Connection object must wait until IO is finished.
362 */
364 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
365 running_ = false;
366
367 return output_pending_;
368 }
369
370 protected:
371 virtual void on_output_buffer_empty() {}
372
373 protected: // CnoInterface implementation
374 int on_cno_writev(const cno_buffer_t *buffer, size_t count) override {
375 bool was_first = false;
376 {
377 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
379
380 bool expected = false;
381 if (impl::get_socket(&socket_)->is_open())
382 was_first = output_pending_.compare_exchange_weak(expected, true);
383 auto source_it = buffers.begin();
384
385 while (source_it != buffers.end()) {
386 // The constructor fills the output buffer with single page
387 // and all algorithms that clear not used pages, leave at
388 // last one page.
389 // thus we do not need to check if there are no pages.
390 auto &obuffer = output_buffers_.back();
391
392 if (0 == source_it->size()) {
393 ++source_it;
394 continue;
395 }
396
397 if (0 == obuffer.space_left()) {
398 output_buffers_.emplace_back(4096);
399 continue;
400 }
401
402 (*source_it) += obuffer.write(
403 static_cast<const uint8_t *>(source_it->data()), source_it->size());
404 }
405 }
406
407 if (was_first) {
408 do_net_send();
409 }
410
411 return 0;
412 }
413
414 int on_cno_message_tail([[maybe_unused]] const uint32_t session_id,
415 [[maybe_unused]] const cno_tail_t *tail) override {
416 // processed_request_ = true;
417 return 0;
418 }
419
420 int on_cno_stream_start([[maybe_unused]] const uint32_t id) override {
421 return 0;
422 }
423
424 int on_cno_close() override {
425 keep_alive_ = false;
426 return 0;
427 }
428
429 protected:
430 bool keep_alive_{true};
431 IOLayer socket_;
433 cno_connection_t cno_;
434
435 uint8_t input_buffer_[512];
437
439 std::list<owned_buffer> output_buffers_;
440
441 std::atomic<bool> output_pending_{false};
442 std::atomic<bool> reading_pending_{false};
443 std::atomic<bool> running_{true};
444 std::atomic<bool> suspend_{false};
445
447};
448
449} // namespace base
450} // namespace http
451
452#endif // ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
Definition: connection_interface.h:41
Definition: connection_status_callbacks.h:35
virtual void on_connection_io_error(Connection *connection, const std::error_code &ec)=0
virtual void on_connection_close(Connection *connection)=0
Definition: connection.h:104
int on_cno_close() override
Definition: connection.h:424
void shutdown(bool) override
Definition: connection.h:139
void start() override
Definition: connection.h:215
int on_cno_message_tail(const uint32_t session_id, const cno_tail_t *tail) override
Definition: connection.h:414
std::string make_lower(std::string s)
Definition: connection.h:218
bool stop_running()
Mark the connection that it should stop running.
Definition: connection.h:363
base::method::Bitset Methods
Definition: connection.h:108
virtual void on_output_buffer_empty()
Definition: connection.h:371
std::atomic< bool > suspend_
Definition: connection.h:444
Pending on_net_send(const std::error_code &ec, size_t size)
Definition: connection.h:301
Methods * allowed_method_
Definition: connection.h:432
ConnectionStatusCallbacks * connection_handler_
Definition: connection.h:446
int on_cno_writev(const cno_buffer_t *buffer, size_t count) override
Definition: connection.h:374
std::string get_peer_address() const override
Definition: connection.h:195
void do_net_recv()
Definition: connection.h:245
uint16_t get_peer_port() const override
Definition: connection.h:204
bool send(const uint32_t *stream_id_ptr, const int status_code, const std::string &method, const std::string &path, const Headers &headers, const IOBuffer &data) override
Definition: connection.h:143
IOLayer IO
Definition: connection.h:112
http::base::details::ref_buffers< std::list< owned_buffer > > ref_buffers
Definition: connection.h:111
void suspend()
Definition: connection.h:354
cno_connection_t cno_
Definition: connection.h:433
std::atomic< bool > reading_pending_
Definition: connection.h:442
bool keep_alive_
Definition: connection.h:430
std::list< owned_buffer > output_buffers_
Definition: connection.h:439
Pending on_net_receive(const std::error_code &ec, std::size_t bytes_transferred)
Definition: connection.h:268
void resume()
Definition: connection.h:353
std::atomic< bool > output_pending_
Definition: connection.h:441
uint8_t input_buffer_[512]
Definition: connection.h:435
IOLayer & get_socket()
Definition: connection.h:213
std::mutex output_buffer_mutex_
Definition: connection.h:438
IOLayer socket_
Definition: connection.h:431
int on_cno_stream_start(const uint32_t id) override
Definition: connection.h:420
std::atomic< bool > running_
Definition: connection.h:443
~Connection() override
Definition: connection.h:133
net::mutable_buffer input_mutable_buffer_
Definition: connection.h:436
void do_net_send()
Definition: connection.h:223
Connection(IOLayer s, base::method::Bitset *allowed_method, ConnectionStatusCallbacks *connection_handler, CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
Definition: connection.h:115
headers of a HTTP response/request.
Definition: headers.h:43
virtual uint32_t size() const
Definition: headers.cc:85
Definition: io_buffer.h:41
virtual const std::string & get() const
Definition: io_buffer.h:82
virtual size_t length() const
Definition: io_buffer.h:52
Definition: owned_buffer.h:83
Definition: owned_buffer.h:68
Definition: buffer_sequence.h:53
Definition: cno_interface.h:36
Definition: socket.h:1090
Definition: buffer.h:113
static constexpr shutdown_type shutdown_send
Definition: socket.h:185
int page
Definition: ctype-mb.cc:1234
static int count
Definition: myisam_ftdump.cc:45
void error(const char *format,...)
static char * path
Definition: mysqldump.cc:149
bool transform(const dd::Spatial_reference_system *source_srs, const Geometry &in, const dd::Spatial_reference_system *target_srs, const char *func_name, std::unique_ptr< Geometry > *out) noexcept
Transforms a geometry from one SRS to another.
Definition: transform.cc:216
void set_socket_parent(net::ip::tcp::socket *, const char *)
Definition: connection.h:62
auto * get_socket1(T *s)
Definition: connection.h:78
net::ip::tcp::socket * get_socket(net::ip::tcp::socket *s)
Definition: connection.h:67
std::bitset< Pos::_LAST+1 > Bitset
Definition: method.h:57
Pending
Definition: connection.h:96
@ k_pending_none
Definition: connection.h:97
@ k_pending_reading
Definition: connection.h:99
@ k_pending_closing
Definition: connection.h:98
@ k_pending_writing
Definition: connection.h:100
Request::Headers Headers
Definition: request.cc:35
HTTP_COMMON_EXPORT void callback_init(cno_connection_t *cno, CnoInterface *)
Definition: callback_init.cc:91
Definition: connection.h:57
Definition: http_server_component.cc:34
char tolower(const char &ch)
Definition: parsing_helpers.h:41
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Definition: gcs_xcom_synode.h:64
static std::mutex lock
Definition: net_ns.cc:56
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: completion_hash.h:35
Definition: result.h:30