MySQL 9.4.0
Source Code Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
connection.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2025, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
27#define ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
28
29#include <algorithm>
30#include <atomic>
31#include <bitset>
32#include <list>
33#include <map>
34#include <sstream>
35#include <string>
36#include <utility>
37#include <vector>
38
39#include "cno/core.h"
42#include "http/base/details/owned_buffer.h"
43#include "http/base/http_time.h"
44#include "http/base/method.h"
49#include "http/cno/error_code.h"
50#include "http/cno/string.h"
51
55
56namespace http {
57namespace base {
58
59namespace impl {
60
// Overload selected for a plain net::ip::tcp::socket: both arguments are
// accepted and ignored.
61inline void set_socket_parent(net::ip::tcp::socket *, const char *) {
62 // Nothing to do: net::ip::tcp::socket does not provide the custom
63 // `set_parent` method.
64}
65
68 return s;
69}
70
71template <typename T>
73 return get_socket(&s->lower_layer());
74}
75
// Returns a pointer to the object `s->lower_layer()` refers to, i.e. exactly
// one layer below `s` (no further unwrapping is done here).
76template <typename T>
77auto *get_socket1(T *s) {
78 return &s->lower_layer();
79}
80
81template <typename T>
83 return get_socket(&s->lower_layer());
84}
85
86template <typename T>
87void set_socket_parent(T *s, const char *parent) {
88 s->set_parent(parent);
89
91}
92
93} // namespace impl
94
95enum Pending {
99 k_pending_writing = 1 << 3
101
102template <typename IOLayer>
104 public:
111 using IO = IOLayer;
112
113 public:
// Builds a connection around an already-created IO layer object.
//
// @param s                   IO layer object; moved into socket_.
// @param allowed_method      stored as-is in allowed_method_ (not dereferenced
//                            here).
// @param connection_handler  stored as-is in connection_handler_.
// @param kind                forwarded to cno_init().
// @param version             forwarded to cno_begin().
114 Connection(IOLayer s, base::method::Bitset *allowed_method,
115 ConnectionStatusCallbacks *connection_handler,
116 CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
117 : socket_(std::move(s)),
118 allowed_method_(allowed_method),
119 connection_handler_{connection_handler} {
// Label of the form "HTTP-<address of this object>", handed to
// set_socket_parent() below.
120 std::stringstream ss;
121 ss << "HTTP-" << this;
122
123 socket_.set_option(net::ip::tcp::no_delay{true});
124 impl::set_socket_parent(&socket_, ss.str().c_str());
125 cno_init(&cno_, kind);
126 cno_.disallow_h2_prior_knowledge = 1;
// Registers this object with cno so that the on_cno_*() overrides are used.
127 cno::callback_init(&cno_, this);
// Seed the output list with one page; the output code paths rely on the list
// never being empty (front()/back() are called without an emptiness check).
128 output_buffers_.emplace_back(4096);
129 cno_begin(&cno_, version);
130 }
131
// Finalizes the cno state first, then closes the socket.
132 ~Connection() override {
133 cno_fini(&cno_);
134 socket_.close();
135 }
136
137 public: // ConnectionInterface implementation
138 void shutdown(bool) override {
140 }
141
// Writes one HTTP message (head plus optional body) through cno.
//
// @param stream_id_ptr  when non-null, the stream id to write on; when null,
//                       an id is obtained from cno_next_stream().
// @param status_code    assigned to message.code.
// @param method         referenced by message.method (pointer + length, no
//                       copy).
// @param path           referenced by message.path (pointer + length, no
//                       copy).
// @param headers        iterated as (name, value) pairs; one cno_header_t is
//                       filled per pair.
// @param data           message body; a length of zero means "head only".
// @returns false when cno_write_head() or cno_write_data() does not return
//          CNO_OK, true otherwise.
142 bool send(const uint32_t *stream_id_ptr, const int status_code,
143 const std::string &method, const std::string &path,
144 const Headers &headers, const IOBuffer &data) override {
// NOTE(review): `message` is not value-initialized; only the fields assigned
// below are set before it is passed to cno_write_head().
145 cno_message_t message;
146 std::vector<cno_header_t> cno_header(headers.size(), cno_header_t());
147 std::vector<std::string> http2_headers_names;
148 const bool only_header = 0 == data.length();
149
// In HTTP/2 mode the header names come from strings owned by
// http2_headers_names. Reserving up front keeps those strings from being
// relocated while cno_header still holds pointers into them.
150 if (CNO_HTTP2 == cno_.mode) {
151 http2_headers_names.reserve(headers.size());
152 }
153
154 auto output = cno_header.data();
155
156 for (const auto &entry : headers) {
157 if (CNO_HTTP2 == cno_.mode) {
// NOTE(review): the emplace_back argument (listing line 159) is not present in
// this listing; presumably a transformed copy of entry.first - confirm.
158 auto &header_name = http2_headers_names.emplace_back(
160 output->name.size = header_name.length();
161 output->name.data = header_name.c_str();
162 } else {
// Otherwise the name is referenced directly from the caller's headers.
163 output->name.size = entry.first.length();
164 output->name.data = entry.first.c_str();
165 }
166
167 output->value.size = entry.second.length();
168 output->value.data = entry.second.c_str();
169 ++output;
170 }
171
172 message.code = status_code;
173 message.headers = cno_header.data();
174 message.headers_len = cno_header.size();
175 message.path.data = path.c_str();
176 message.path.size = path.length();
177 message.method.data = method.c_str();
178 message.method.size = method.length();
179
180 uint32_t stream_id =
181 stream_id_ptr ? *stream_id_ptr : cno_next_stream(&cno_);
// The last argument is true exactly when there is no body to follow.
182 if (CNO_OK != cno_write_head(&cno_, stream_id, &message, only_header)) {
183 return false;
184 }
185
186 if (!only_header) {
// The whole body is written in a single call, with `true` as last argument.
187 return CNO_OK == cno_write_data(&cno_, stream_id, data.get().c_str(),
188 data.length(), true);
189 }
190
191 return true;
192 }
193
// Returns the remote endpoint's address formatted as a string, or an empty
// string when remote_endpoint() yields a result that tests false.
194 std::string get_peer_address() const override {
195 auto *s = impl::get_socket(&socket_);
196 if (auto e = s->remote_endpoint()) {
197 return e->address().to_string();
198 }
199
200 return {};
201 }
202
// Returns the remote endpoint's port, or 0 when remote_endpoint() yields a
// result that tests false.
203 uint16_t get_peer_port() const override {
204 auto *s = impl::get_socket(&socket_);
205 if (auto e = s->remote_endpoint()) {
206 return e->port();
207 }
208
209 return 0;
210 }
211
// Gives mutable access to the IO layer object owned by this connection.
212 IOLayer &get_socket() { return socket_; }
213
// Starts the connection by issuing the first asynchronous receive.
214 void start() override { do_net_recv(); }
215
216 protected:
217 void do_net_send() {
219 [this](std::error_code error, auto size) {
220 switch (on_net_send(error, size)) {
221 case k_pending_none:
222 break;
223
225 break;
226
228 do_net_send();
229 break;
230
234 break;
235 }
236 });
237 }
238
239 void do_net_recv() {
240 reading_pending_ = true;
241 socket_.async_receive(
242 input_mutable_buffer_, [this](std::error_code error, auto size) {
243 switch (on_net_receive(error, size)) {
245 do_net_recv();
246 break;
247
249 case k_pending_none:
250 reading_pending_ = false;
251 break;
252
254 reading_pending_ = false;
257 break;
258 }
259 });
260 }
261
262 Pending on_net_receive(const std::error_code &ec,
263 std::size_t bytes_transferred) {
264 if (!running_) {
266 }
267
268 if (ec) {
271 }
272
273 const int result = cno_consume(
274 &cno_, reinterpret_cast<char *>(input_buffer_), bytes_transferred);
275
276 if (result < 0) {
277 auto ec = make_error_code(cno_error());
280 }
281
282 if (!keep_alive_) {
284 }
285
286 if (!running_) {
288 }
289
290 if (suspend_) return k_pending_none;
291
292 return k_pending_reading;
293 }
294
// Handles the completion of an asynchronous send and decides what should
// happen next.
//
// @param ec    result of the send operation.
// @param size  number of bytes reported as sent; consumed from the front of
//              output_buffers_ when `ec` signals success.
// @returns the next pending state; the visible paths return
//          k_pending_writing while data remains, k_pending_none while
//          suspended and k_pending_reading otherwise.
// NOTE(review): listing lines 332-333, 337 and 340 are not present in this
// listing, so the error path and the close path are only partly visible.
295 Pending on_net_send(const std::error_code &ec, size_t size) {
296 bool has_more = true;
297 bool should_close = false;
298 {
// output_buffers_ is shared with on_cno_writev(), hence the lock.
299 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
300
301 if (!ec) {
// Walk the pages from the front, consuming the bytes that were sent.
302 while (size) {
303 auto &page = output_buffers_.front();
304 auto size_on_page = std::min(page.size(), size);
305 page += size_on_page;
306 size -= size_on_page;
307
308 if (page.empty()) {
// Keep at least one page in the list: the last page is reset for reuse
// instead of being removed.
309 if (1 == output_buffers_.size()) {
310 page.reset();
311 } else {
312 output_buffers_.pop_front();
313 }
314 }
315 }
316 }
317
// Nothing left on the front page: output is no longer pending, and a
// connection that has been told to stop is marked for closing.
318 if (0 == output_buffers_.front().size()) {
319 has_more = false;
320 output_pending_ = false;
321
322 if (!running_) {
323 should_close = true;
324 }
325 }
326 }
327
328 if (ec) {
329 stop_running();
330 output_pending_ = false;
331
// (remainder of the error path is not present in this listing)
334 }
335 if (has_more) return k_pending_writing;
336
338
339 if (should_close) {
// (body is not present in this listing)
341 }
342 if (suspend_) return k_pending_none;
343
344 return k_pending_reading;
345 }
346
// Clear / set the suspend flag. While suspend_ is set, on_net_receive() and
// on_net_send() return k_pending_none instead of k_pending_reading.
347 void resume() { suspend_ = false; }
348 void suspend() { suspend_ = true; }
349
350 /**
351 * Mark the connection so that it stops running.
352 *
353 * @returns whether deletion of the object must wait for pending IO
354 * @retval 'false' Connection object can be removed immediately
355 * @retval 'true' Connection object must wait until IO is finished.
356 */
358 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
359 running_ = false;
360
361 return output_pending_;
362 }
363
364 protected:
// Overridable hook with an empty default implementation.
// NOTE(review): the call site is not visible in this listing; judging by the
// name it is invoked once all queued output has been sent - confirm.
365 virtual void on_output_buffer_empty() {}
366
367 protected: // CnoInterface implementation
// cno callback: queues the data cno wants to send into output_buffers_ and,
// when no send is pending yet, starts one.
//
// @param buffer  array of cno buffers to queue.
// @param count   number of entries in `buffer`.
// @returns always 0.
// NOTE(review): `buffers` is declared on listing line 372, which is not
// present in this listing; presumably it wraps `buffer`/`count` - confirm.
368 int on_cno_writev(const cno_buffer_t *buffer, size_t count) override {
369 bool was_first = false;
370 {
371 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
373
// Try to flip output_pending_ from false to true; the caller that succeeds
// is the one responsible for starting the send below.
// NOTE(review): compare_exchange_weak is allowed to fail spuriously and is
// used here outside a retry loop; a spurious failure would leave the data
// queued without a send being started. compare_exchange_strong looks like
// the intended call - confirm.
374 bool expected = false;
375 if (impl::get_socket(&socket_)->is_open())
376 was_first = output_pending_.compare_exchange_weak(expected, true);
377 auto source_it = buffers.begin();
378
379 while (source_it != buffers.end()) {
380 // The constructor seeds the output buffer list with a single page,
381 // and every algorithm that discards unused pages leaves at
382 // least one page in place,
383 // so there is no need to check for an empty list here.
384 auto &obuffer = output_buffers_.back();
385
// Skip source buffers that are (or have become) empty.
386 if (0 == source_it->size()) {
387 ++source_it;
388 continue;
389 }
390
// Last page is full: append a fresh one and retry with it.
391 if (0 == obuffer.space_left()) {
392 output_buffers_.emplace_back(4096);
393 continue;
394 }
395
// Copy as much as fits and advance the source by what write() reports.
396 (*source_it) += obuffer.write(
397 static_cast<const uint8_t *>(source_it->data()), source_it->size());
398 }
399 }
400
// Start sending only after the lock has been released.
401 if (was_first) {
402 do_net_send();
403 }
404
405 return 0;
406 }
407
// cno callback for the end of a message: both arguments are unused and 0 is
// returned.
408 int on_cno_message_tail([[maybe_unused]] const uint32_t session_id,
409 [[maybe_unused]] const cno_tail_t *tail) override {
410 // processed_request_ = true;
411 return 0;
412 }
413
// cno callback for a newly started stream: the id is unused and 0 is returned.
414 int on_cno_stream_start([[maybe_unused]] const uint32_t id) override {
415 return 0;
416 }
417
// cno callback for connection close: clears keep_alive_ (checked by
// on_net_receive() after consuming input) and returns 0.
418 int on_cno_close() override {
419 keep_alive_ = false;
420 return 0;
421 }
422
423 protected:
// Cleared by on_cno_close().
424 bool keep_alive_{true};
// IO layer object moved in by the constructor; closed in the destructor.
425 IOLayer socket_;
// cno protocol state: cno_init() in the constructor, cno_fini() in the
// destructor.
427 cno_connection_t cno_;
428
// Receive buffer handed to cno_consume() in on_net_receive().
429 uint8_t input_buffer_[512];
431
// Queue of output pages; written by on_cno_writev(), drained by
// on_net_send(). The code keeps at least one page in it at all times.
433 std::list<owned_buffer> output_buffers_;
434
// True while queued output is being sent.
435 std::atomic<bool> output_pending_{false};
// Set by do_net_recv() when a receive is issued.
436 std::atomic<bool> reading_pending_{false};
// Cleared when the connection is told to stop running.
437 std::atomic<bool> running_{true};
// Toggled by suspend()/resume().
438 std::atomic<bool> suspend_{false};
439
441};
442
443} // namespace base
444} // namespace http
445
446#endif // ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
Definition: connection_interface.h:41
Definition: connection_status_callbacks.h:35
virtual void on_connection_io_error(Connection *connection, const std::error_code &ec)=0
virtual void on_connection_close(Connection *connection)=0
Definition: connection.h:103
int on_cno_close() override
Definition: connection.h:418
void shutdown(bool) override
Definition: connection.h:138
void start() override
Definition: connection.h:214
int on_cno_message_tail(const uint32_t session_id, const cno_tail_t *tail) override
Definition: connection.h:408
bool stop_running()
Mark the connection that it should stop running.
Definition: connection.h:357
base::method::Bitset Methods
Definition: connection.h:107
virtual void on_output_buffer_empty()
Definition: connection.h:365
std::atomic< bool > suspend_
Definition: connection.h:438
Pending on_net_send(const std::error_code &ec, size_t size)
Definition: connection.h:295
Methods * allowed_method_
Definition: connection.h:426
ConnectionStatusCallbacks * connection_handler_
Definition: connection.h:440
int on_cno_writev(const cno_buffer_t *buffer, size_t count) override
Definition: connection.h:368
std::string get_peer_address() const override
Definition: connection.h:194
void do_net_recv()
Definition: connection.h:239
uint16_t get_peer_port() const override
Definition: connection.h:203
bool send(const uint32_t *stream_id_ptr, const int status_code, const std::string &method, const std::string &path, const Headers &headers, const IOBuffer &data) override
Definition: connection.h:142
IOLayer IO
Definition: connection.h:111
http::base::details::ref_buffers< std::list< owned_buffer > > ref_buffers
Definition: connection.h:110
void suspend()
Definition: connection.h:348
cno_connection_t cno_
Definition: connection.h:427
std::atomic< bool > reading_pending_
Definition: connection.h:436
bool keep_alive_
Definition: connection.h:424
std::list< owned_buffer > output_buffers_
Definition: connection.h:433
Pending on_net_receive(const std::error_code &ec, std::size_t bytes_transferred)
Definition: connection.h:262
void resume()
Definition: connection.h:347
std::atomic< bool > output_pending_
Definition: connection.h:435
uint8_t input_buffer_[512]
Definition: connection.h:429
IOLayer & get_socket()
Definition: connection.h:212
std::mutex output_buffer_mutex_
Definition: connection.h:432
IOLayer socket_
Definition: connection.h:425
int on_cno_stream_start(const uint32_t id) override
Definition: connection.h:414
std::atomic< bool > running_
Definition: connection.h:437
~Connection() override
Definition: connection.h:132
net::mutable_buffer input_mutable_buffer_
Definition: connection.h:430
void do_net_send()
Definition: connection.h:217
Connection(IOLayer s, base::method::Bitset *allowed_method, ConnectionStatusCallbacks *connection_handler, CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
Definition: connection.h:114
Headers of an HTTP response/request.
Definition: headers.h:43
virtual uint32_t size() const
Definition: headers.cc:84
Definition: io_buffer.h:41
virtual const std::string & get() const
Definition: io_buffer.h:82
virtual size_t length() const
Definition: io_buffer.h:52
Definition: owned_buffer.h:83
Definition: owned_buffer.h:68
Definition: buffer_sequence.h:53
Definition: cno_interface.h:36
Definition: socket.h:1090
Definition: buffer.h:113
static constexpr shutdown_type shutdown_send
Definition: socket.h:185
int page
Definition: ctype-mb.cc:1226
#define T
Definition: jit_executor_value.cc:373
static int count
Definition: myisam_ftdump.cc:45
void error(const char *format,...)
static char * path
Definition: mysqldump.cc:150
void set_socket_parent(net::ip::tcp::socket *, const char *)
Definition: connection.h:61
auto * get_socket1(T *s)
Definition: connection.h:77
net::ip::tcp::socket * get_socket(net::ip::tcp::socket *s)
Definition: connection.h:66
std::bitset< Pos::_LAST+1 > Bitset
Definition: method.h:57
Pending
Definition: connection.h:95
@ k_pending_none
Definition: connection.h:96
@ k_pending_reading
Definition: connection.h:98
@ k_pending_closing
Definition: connection.h:97
@ k_pending_writing
Definition: connection.h:99
Request::Headers Headers
Definition: request.cc:34
HTTP_COMMON_EXPORT void callback_init(cno_connection_t *cno, CnoInterface *)
Definition: callback_init.cc:90
Definition: connection.h:56
Definition: http_server_component.cc:34
HARNESS_EXPORT std::string make_lower(std::string s)
lower-case a string.
Definition: string_utils.cc:99
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Definition: gcs_xcom_synode.h:64
static std::mutex lock
Definition: net_ns.cc:56
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: completion_hash.h:35
Definition: result.h:30