MySQL 9.3.0
Source Code Documentation
connection.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2025, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
27#define ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
28
29#include <algorithm>
30#include <atomic>
31#include <bitset>
32#include <list>
33#include <map>
34#include <sstream>
35#include <string>
36#include <utility>
37#include <vector>
38
39#include "cno/core.h"
42#include "http/base/details/owned_buffer.h"
43#include "http/base/http_time.h"
44#include "http/base/method.h"
49#include "http/cno/error_code.h"
50#include "http/cno/string.h"
51
54
55namespace http {
56namespace base {
57
58namespace impl {
59
60inline void set_socket_parent(net::ip::tcp::socket *, const char *) {
61 // Do nothing, net::ip::tcp::socket, is missing the custom
62 // `set_parent` method.
63}
64
67 return s;
68}
69
70template <typename T>
72 return get_socket(&s->lower_layer());
73}
74
/**
 * Get the layer that sits directly below `s`.
 *
 * @param s object that exposes a `lower_layer()` accessor.
 *
 * @returns address of the object `s->lower_layer()` refers to.
 */
template <typename T>
auto *get_socket1(T *s) {
  auto &next_layer = s->lower_layer();

  return &next_layer;
}
79
80template <typename T>
82 return get_socket(&s->lower_layer());
83}
84
85template <typename T>
86void set_socket_parent(T *s, const char *parent) {
87 s->set_parent(parent);
88
90}
91
92} // namespace impl
93
94enum Pending {
98 k_pending_writing = 1 << 3
99};
100
101template <typename IOLayer>
103 public:
110 using IO = IOLayer;
111
112 public:
113 Connection(IOLayer s, base::method::Bitset *allowed_method,
114 ConnectionStatusCallbacks *connection_handler,
115 CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
116 : socket_(std::move(s)),
117 allowed_method_(allowed_method),
118 connection_handler_{connection_handler} {
119 std::stringstream ss;
120 ss << "HTTP-" << this;
121
122 socket_.set_option(net::ip::tcp::no_delay{true});
123 impl::set_socket_parent(&socket_, ss.str().c_str());
124 cno_init(&cno_, kind);
125 cno::callback_init(&cno_, this);
126 output_buffers_.emplace_back(4096);
127 cno_begin(&cno_, version);
128 }
129
130 ~Connection() override {
131 cno_fini(&cno_);
132 socket_.close();
133 }
134
135 public: // ConnectionInterface implementation
136 void shutdown(bool) override {
138 }
139
140 bool send(const uint32_t *stream_id_ptr, const int status_code,
141 const std::string &method, const std::string &path,
142 const Headers &headers, const IOBuffer &data) override {
143 cno_message_t message;
144 std::vector<cno_header_t> cno_header(headers.size(), cno_header_t());
145 const bool only_header = 0 == data.length();
146
147 auto output = cno_header.data();
148
149 for (const auto &entry : headers) {
150 output->name.size = entry.first.length();
151 output->name.data = entry.first.c_str();
152
153 output->value.size = entry.second.length();
154 output->value.data = entry.second.c_str();
155 ++output;
156 }
157
158 message.code = status_code;
159 message.headers = cno_header.data();
160 message.headers_len = cno_header.size();
161 message.path.data = path.c_str();
162 message.path.size = path.length();
163 message.method.data = method.c_str();
164 message.method.size = method.length();
165
166 uint32_t stream_id =
167 stream_id_ptr ? *stream_id_ptr : cno_next_stream(&cno_);
168 if (CNO_OK != cno_write_head(&cno_, stream_id, &message, only_header)) {
169 return false;
170 }
171
172 if (!only_header) {
173 return CNO_OK == cno_write_data(&cno_, stream_id, data.get().c_str(),
174 data.length(), true);
175 }
176
177 return true;
178 }
179
180 std::string get_peer_address() const override {
181 auto *s = impl::get_socket(&socket_);
182 if (auto e = s->remote_endpoint()) {
183 return e->address().to_string();
184 }
185
186 return {};
187 }
188
189 uint16_t get_peer_port() const override {
190 auto *s = impl::get_socket(&socket_);
191 if (auto e = s->remote_endpoint()) {
192 return e->port();
193 }
194
195 return 0;
196 }
197
198 IOLayer &get_socket() { return socket_; }
199
200 void start() override { do_net_recv(); }
201
202 protected:
203 void do_net_send() {
205 [this](std::error_code error, auto size) {
206 switch (on_net_send(error, size)) {
207 case k_pending_none:
208 break;
209
211 break;
212
214 do_net_send();
215 break;
216
220 break;
221 }
222 });
223 }
224
225 void do_net_recv() {
226 reading_pending_ = true;
227 socket_.async_receive(
228 input_mutable_buffer_, [this](std::error_code error, auto size) {
229 switch (on_net_receive(error, size)) {
231 do_net_recv();
232 break;
233
235 case k_pending_none:
236 reading_pending_ = false;
237 break;
238
240 reading_pending_ = false;
243 break;
244 }
245 });
246 }
247
248 Pending on_net_receive(const std::error_code &ec,
249 std::size_t bytes_transferred) {
250 if (!running_) {
252 }
253
254 if (ec) {
257 }
258
259 const int result = cno_consume(
260 &cno_, reinterpret_cast<char *>(input_buffer_), bytes_transferred);
261
262 if (result < 0) {
263 auto ec = make_error_code(cno_error());
266 }
267
268 if (!keep_alive_) {
270 }
271
272 if (!running_) {
274 }
275
276 if (suspend_) return k_pending_none;
277
278 return k_pending_reading;
279 }
280
281 Pending on_net_send(const std::error_code &ec, size_t size) {
282 bool has_more = true;
283 bool should_close = false;
284 {
285 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
286
287 if (!ec) {
288 while (size) {
289 auto &page = output_buffers_.front();
290 auto size_on_page = std::min(page.size(), size);
291 page += size_on_page;
292 size -= size_on_page;
293
294 if (page.empty()) {
295 if (1 == output_buffers_.size()) {
296 page.reset();
297 } else {
298 output_buffers_.pop_front();
299 }
300 }
301 }
302 }
303
304 if (0 == output_buffers_.front().size()) {
305 has_more = false;
306 output_pending_ = false;
307
308 if (!running_) {
309 should_close = true;
310 }
311 }
312 }
313
314 if (ec) {
315 stop_running();
316 output_pending_ = false;
317
320 }
321 if (has_more) return k_pending_writing;
322
324
325 if (should_close) {
327 }
328 if (suspend_) return k_pending_none;
329
330 return k_pending_reading;
331 }
332
333 void resume() { suspend_ = false; }
334 void suspend() { suspend_ = true; }
335
336 /**
337 * Mark the connection so that it stops running.
338 *
339 * @returns information whether the object may be deleted
340 * @retval 'false' Connection object can be removed immediately
341 * @retval 'true' Connection object must wait until IO is finished.
342 */
344 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
345 running_ = false;
346
347 return output_pending_;
348 }
349
350 protected:
351 virtual void on_output_buffer_empty() {}
352
353 protected: // CnoInterface implementation
354 int on_cno_writev(const cno_buffer_t *buffer, size_t count) override {
355 bool was_first = false;
356 {
357 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
359
360 bool expected = false;
361 if (impl::get_socket(&socket_)->is_open())
362 was_first = output_pending_.compare_exchange_weak(expected, true);
363 auto source_it = buffers.begin();
364
365 while (source_it != buffers.end()) {
366 // The constructor fills the output buffer with a single page
367 // and all algorithms that clear unused pages leave at
368 // least one page,
369 // thus we do not need to check if there are no pages.
370 auto &obuffer = output_buffers_.back();
371
372 if (0 == source_it->size()) {
373 ++source_it;
374 continue;
375 }
376
377 if (0 == obuffer.space_left()) {
378 output_buffers_.emplace_back(4096);
379 continue;
380 }
381
382 (*source_it) += obuffer.write(
383 static_cast<const uint8_t *>(source_it->data()), source_it->size());
384 }
385 }
386
387 if (was_first) {
388 do_net_send();
389 }
390
391 return 0;
392 }
393
394 int on_cno_message_tail([[maybe_unused]] const uint32_t session_id,
395 [[maybe_unused]] const cno_tail_t *tail) override {
396 // processed_request_ = true;
397 return 0;
398 }
399
400 int on_cno_stream_start([[maybe_unused]] const uint32_t id) override {
401 return 0;
402 }
403
404 int on_cno_close() override {
405 keep_alive_ = false;
406 return 0;
407 }
408
409 protected:
410 bool keep_alive_{true};
411 IOLayer socket_;
413 cno_connection_t cno_;
414
415 uint8_t input_buffer_[512];
417
419 std::list<owned_buffer> output_buffers_;
420
421 std::atomic<bool> output_pending_{false};
422 std::atomic<bool> reading_pending_{false};
423 std::atomic<bool> running_{true};
424 std::atomic<bool> suspend_{false};
425
427};
428
429} // namespace base
430} // namespace http
431
432#endif // ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
Definition: connection_interface.h:41
Definition: connection_status_callbacks.h:35
virtual void on_connection_io_error(Connection *connection, const std::error_code &ec)=0
virtual void on_connection_close(Connection *connection)=0
Definition: connection.h:102
int on_cno_close() override
Definition: connection.h:404
void shutdown(bool) override
Definition: connection.h:136
void start() override
Definition: connection.h:200
int on_cno_message_tail(const uint32_t session_id, const cno_tail_t *tail) override
Definition: connection.h:394
bool stop_running()
Mark the connection that it should stop running.
Definition: connection.h:343
base::method::Bitset Methods
Definition: connection.h:106
virtual void on_output_buffer_empty()
Definition: connection.h:351
std::atomic< bool > suspend_
Definition: connection.h:424
Pending on_net_send(const std::error_code &ec, size_t size)
Definition: connection.h:281
Methods * allowed_method_
Definition: connection.h:412
ConnectionStatusCallbacks * connection_handler_
Definition: connection.h:426
int on_cno_writev(const cno_buffer_t *buffer, size_t count) override
Definition: connection.h:354
std::string get_peer_address() const override
Definition: connection.h:180
void do_net_recv()
Definition: connection.h:225
uint16_t get_peer_port() const override
Definition: connection.h:189
bool send(const uint32_t *stream_id_ptr, const int status_code, const std::string &method, const std::string &path, const Headers &headers, const IOBuffer &data) override
Definition: connection.h:140
IOLayer IO
Definition: connection.h:110
http::base::details::ref_buffers< std::list< owned_buffer > > ref_buffers
Definition: connection.h:109
void suspend()
Definition: connection.h:334
cno_connection_t cno_
Definition: connection.h:413
std::atomic< bool > reading_pending_
Definition: connection.h:422
bool keep_alive_
Definition: connection.h:410
std::list< owned_buffer > output_buffers_
Definition: connection.h:419
Pending on_net_receive(const std::error_code &ec, std::size_t bytes_transferred)
Definition: connection.h:248
void resume()
Definition: connection.h:333
std::atomic< bool > output_pending_
Definition: connection.h:421
uint8_t input_buffer_[512]
Definition: connection.h:415
IOLayer & get_socket()
Definition: connection.h:198
std::mutex output_buffer_mutex_
Definition: connection.h:418
IOLayer socket_
Definition: connection.h:411
int on_cno_stream_start(const uint32_t id) override
Definition: connection.h:400
std::atomic< bool > running_
Definition: connection.h:423
~Connection() override
Definition: connection.h:130
net::mutable_buffer input_mutable_buffer_
Definition: connection.h:416
void do_net_send()
Definition: connection.h:203
Connection(IOLayer s, base::method::Bitset *allowed_method, ConnectionStatusCallbacks *connection_handler, CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
Definition: connection.h:113
headers of a HTTP response/request.
Definition: headers.h:43
virtual uint32_t size() const
Definition: headers.cc:84
Definition: io_buffer.h:41
virtual const std::string & get() const
Definition: io_buffer.h:82
virtual size_t length() const
Definition: io_buffer.h:52
Definition: owned_buffer.h:83
Definition: owned_buffer.h:68
Definition: buffer_sequence.h:53
Definition: cno_interface.h:36
Definition: socket.h:1090
Definition: buffer.h:113
static constexpr shutdown_type shutdown_send
Definition: socket.h:185
int page
Definition: ctype-mb.cc:1226
#define T
Definition: jit_executor_value.cc:373
static int count
Definition: myisam_ftdump.cc:45
void error(const char *format,...)
static char * path
Definition: mysqldump.cc:150
void set_socket_parent(net::ip::tcp::socket *, const char *)
Definition: connection.h:60
auto * get_socket1(T *s)
Definition: connection.h:76
net::ip::tcp::socket * get_socket(net::ip::tcp::socket *s)
Definition: connection.h:65
std::bitset< Pos::_LAST+1 > Bitset
Definition: method.h:57
Pending
Definition: connection.h:94
@ k_pending_none
Definition: connection.h:95
@ k_pending_reading
Definition: connection.h:97
@ k_pending_closing
Definition: connection.h:96
@ k_pending_writing
Definition: connection.h:98
Request::Headers Headers
Definition: request.cc:34
HTTP_COMMON_EXPORT void callback_init(cno_connection_t *cno, CnoInterface *)
Definition: callback_init.cc:90
Definition: connection.h:55
Definition: http_server_component.cc:34
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Definition: gcs_xcom_synode.h:64
static std::mutex lock
Definition: net_ns.cc:56
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: completion_hash.h:35
Definition: result.h:30