MySQL 9.1.0
Source Code Documentation
connection.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
27#define ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
28
29#include <algorithm>
30#include <atomic>
31#include <bitset>
32#include <list>
33#include <map>
34#include <sstream>
35#include <string>
36#include <utility>
37#include <vector>
38
39#include "cno/core.h"
43#include "http/base/http_time.h"
44#include "http/base/method.h"
49#include "http/cno/error_code.h"
50#include "http/cno/string.h"
51
53
55
56namespace http {
57namespace base {
58
59namespace impl {
60
61inline void set_socket_parent(net::ip::tcp::socket *, const char *) {
62 // Do nothing, net::ip::tcp::socket, is missing the custom
63 // `set_parent` method.
64}
65
68 return s;
69}
70
71template <typename T>
73 return get_socket(&s->lower_layer());
74}
75
76template <typename T>
77auto *get_socket1(T *s) {
78 return &s->lower_layer();
79}
80
81template <typename T>
82const net::ip::tcp::socket *get_socket(const T *s) {
83 return get_socket(&s->lower_layer());
84}
85
86template <typename T>
87void set_socket_parent(T *s, const char *parent) {
88 s->set_parent(parent);
89
91}
92
93} // namespace impl
94
95enum Pending {
99 k_pending_writing = 1 << 3
101
102template <typename IOLayer>
104 public:
111 using IO = IOLayer;
112
113 public:
114 Connection(IOLayer s, base::method::Bitset *allowed_method,
115 ConnectionStatusCallbacks *connection_handler,
116 CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
117 : socket_(std::move(s)),
118 allowed_method_(allowed_method),
119 connection_handler_{connection_handler} {
120 std::stringstream ss;
121 ss << "HTTP-" << this;
122
123 socket_.set_option(net::ip::tcp::no_delay{true});
124 impl::set_socket_parent(&socket_, ss.str().c_str());
125 cno_init(&cno_, kind);
126 cno::callback_init(&cno_, this);
127 output_buffers_.emplace_back(4096);
128 cno_begin(&cno_, version);
129 }
130
131 ~Connection() override {
132 cno_fini(&cno_);
133 socket_.close();
134 }
135
136 public: // ConnectionInterface implementation
137 bool send(const uint32_t *stream_id_ptr, const int status_code,
138 const std::string &method, const std::string &path,
139 const Headers &headers, const IOBuffer &data) override {
140 cno_message_t message;
141 std::vector<cno_header_t> cno_header(headers.size(), cno_header_t());
142 const bool only_header = 0 == data.length();
143
144 auto output = cno_header.data();
145
146 for (const auto &entry : headers) {
147 output->name.size = entry.first.length();
148 output->name.data = entry.first.c_str();
149
150 output->value.size = entry.second.length();
151 output->value.data = entry.second.c_str();
152 ++output;
153 }
154
155 message.code = status_code;
156 message.headers = cno_header.data();
157 message.headers_len = cno_header.size();
158 message.path.data = path.c_str();
159 message.path.size = path.length();
160 message.method.data = method.c_str();
161 message.method.size = method.length();
162
163 uint32_t stream_id =
164 stream_id_ptr ? *stream_id_ptr : cno_next_stream(&cno_);
165 if (CNO_OK != cno_write_head(&cno_, stream_id, &message, only_header)) {
166 return false;
167 }
168
169 if (!only_header) {
170 return CNO_OK == cno_write_data(&cno_, stream_id, data.get().c_str(),
171 data.length(), true);
172 }
173
174 return true;
175 }
176
177 std::string get_peer_address() const override {
178 auto *s = impl::get_socket(&socket_);
179 if (auto e = s->remote_endpoint()) {
180 return e->address().to_string();
181 }
182
183 return {};
184 }
185
186 uint16_t get_peer_port() const override {
187 auto *s = impl::get_socket(&socket_);
188 if (auto e = s->remote_endpoint()) {
189 return e->port();
190 }
191
192 return 0;
193 }
194
195 IOLayer &get_socket() { return socket_; }
196
197 void start() override { do_net_recv(); }
198
199 protected:
200 void do_net_send() {
202 [this](std::error_code error, auto size) {
203 switch (on_net_send(error, size)) {
204 case k_pending_none:
205 break;
206
208 break;
209
211 do_net_send();
212 break;
213
217 break;
218 }
219 });
220 }
221
222 void do_net_recv() {
223 socket_.async_receive(
224 input_mutable_buffer_, [this](std::error_code error, auto size) {
225 switch (on_net_receive(error, size)) {
227 do_net_recv();
228 break;
229
231 break;
232
233 case k_pending_none:
234 break;
235
239 break;
240 }
241 });
242 }
243
244 Pending on_net_receive(const std::error_code &ec,
245 std::size_t bytes_transferred) {
246 if (!running_) {
248 }
249
250 if (ec) {
251 stop_running();
252 processed_request_ = false;
253 output_pending_ = false;
254
256 return k_pending_closing;
257 }
258
259 const int result = cno_consume(
260 &cno_, reinterpret_cast<char *>(input_buffer_), bytes_transferred);
261
262 if (result < 0) {
263 processed_request_ = false;
264 output_pending_ = false;
265 stop_running();
266 auto ec = make_error_code(cno_error());
268 return k_pending_closing;
269 }
270
271 if (!keep_alive_) {
273 }
274
275 if (!running_) return k_pending_closing;
276 if (suspend_) return k_pending_none;
277
278 if (processed_request_) {
280 return k_pending_none;
281 }
282
283 return k_pending_reading;
284 }
285
286 Pending on_net_send(const std::error_code &ec, size_t size) {
287 bool has_more = true;
288 bool should_close = false;
289 {
290 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
291
292 if (!ec) {
293 while (size) {
294 auto &page = output_buffers_.front();
295 auto size_on_page = std::min(page.size(), size);
296 page += size_on_page;
297 size -= size_on_page;
298
299 if (page.empty()) {
300 if (1 == output_buffers_.size()) {
301 page.reset();
302 } else {
303 output_buffers_.pop_front();
304 }
305 }
306 }
307 }
308
309 if (0 == output_buffers_.front().size()) {
310 has_more = false;
311 processed_request_ = false;
312 output_pending_ = false;
313
314 if (!running_) {
315 should_close = true;
316 }
317 }
318 }
319
320 if (ec) {
321 stop_running();
322 processed_request_ = false;
323 output_pending_ = false;
324
326 return k_pending_closing;
327 }
328 if (has_more) return k_pending_writing;
329
331
332 if (should_close) return k_pending_closing;
333 if (suspend_) return k_pending_none;
334
335 return k_pending_reading;
336 }
337
338 void resume() { suspend_ = false; }
339 void suspend() { suspend_ = true; }
340
341 /**
342 * Mark the connection that it should stop running
343 *
344 * @returns information if the object may be delete
345 * @retval 'false' Connection object can be removed immediately
346 * @retval 'true' Connection object must wait until IO is finished.
347 */
349 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
350 running_ = false;
351
352 return output_pending_;
353 }
354
355 protected:
356 virtual void on_output_buffer_empty() {}
357
358 protected: // CnoInterface implementation
359 int on_cno_writev(const cno_buffer_t *buffer, size_t count) override {
360 bool was_first = false;
361 {
362 std::unique_lock<std::mutex> lock(output_buffer_mutex_);
364
365 bool expected = false;
366 if (impl::get_socket(&socket_)->is_open())
367 was_first = output_pending_.compare_exchange_weak(expected, true);
368 auto source_it = buffers.begin();
369
370 while (source_it != buffers.end()) {
371 // The constructor fills the output buffer with single page
372 // and all algorithms that clear not used pages, leave at
373 // last one page.
374 // thus we do not need to check if there are no pages.
375 auto &obuffer = output_buffers_.back();
376
377 if (0 == source_it->size()) {
378 ++source_it;
379 continue;
380 }
381
382 if (0 == obuffer.space_left()) {
383 output_buffers_.emplace_back(4096);
384 continue;
385 }
386
387 (*source_it) += obuffer.write(
388 static_cast<const uint8_t *>(source_it->data()), source_it->size());
389 }
390 }
391
392 if (was_first) {
393 do_net_send();
394 }
395
396 return 0;
397 }
398
399 int on_cno_message_tail([[maybe_unused]] const uint32_t session_id,
400 [[maybe_unused]] const cno_tail_t *tail) override {
401 processed_request_ = true;
402 return 0;
403 }
404
405 int on_cno_stream_start([[maybe_unused]] const uint32_t id) override {
406 return 0;
407 }
408
409 int on_cno_close() override {
410 keep_alive_ = false;
411 return 0;
412 }
413
414 protected:
415 bool keep_alive_{true};
416 IOLayer socket_;
418 cno_connection_t cno_;
419
420 uint8_t input_buffer_[512];
422
424 std::list<owned_buffer> output_buffers_;
425
426 std::atomic<bool> processed_request_{false};
427 std::atomic<bool> output_pending_{false};
428 std::atomic<bool> running_{true};
429 std::atomic<bool> suspend_{false};
430
432};
433
434} // namespace base
435} // namespace http
436
437#endif // ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
Definition: connection_interface.h:41
Definition: connection_status_callbacks.h:35
virtual void on_connection_io_error(Connection *connection, const std::error_code &ec)=0
virtual void on_connection_close(Connection *connection)=0
Definition: connection.h:103
int on_cno_close() override
Definition: connection.h:409
void start() override
Definition: connection.h:197
int on_cno_message_tail(const uint32_t session_id, const cno_tail_t *tail) override
Definition: connection.h:399
bool stop_running()
Mark the connection that it should stop running.
Definition: connection.h:348
base::method::Bitset Methods
Definition: connection.h:107
virtual void on_output_buffer_empty()
Definition: connection.h:356
std::atomic< bool > suspend_
Definition: connection.h:429
Pending on_net_send(const std::error_code &ec, size_t size)
Definition: connection.h:286
Methods * allowed_method_
Definition: connection.h:417
ConnectionStatusCallbacks * connection_handler_
Definition: connection.h:431
int on_cno_writev(const cno_buffer_t *buffer, size_t count) override
Definition: connection.h:359
std::string get_peer_address() const override
Definition: connection.h:177
void do_net_recv()
Definition: connection.h:222
uint16_t get_peer_port() const override
Definition: connection.h:186
bool send(const uint32_t *stream_id_ptr, const int status_code, const std::string &method, const std::string &path, const Headers &headers, const IOBuffer &data) override
Definition: connection.h:137
IOLayer IO
Definition: connection.h:111
http::base::details::ref_buffers< std::list< owned_buffer > > ref_buffers
Definition: connection.h:110
void suspend()
Definition: connection.h:339
cno_connection_t cno_
Definition: connection.h:418
bool keep_alive_
Definition: connection.h:415
std::list< owned_buffer > output_buffers_
Definition: connection.h:424
Pending on_net_receive(const std::error_code &ec, std::size_t bytes_transferred)
Definition: connection.h:244
void resume()
Definition: connection.h:338
std::atomic< bool > output_pending_
Definition: connection.h:427
uint8_t input_buffer_[512]
Definition: connection.h:420
std::atomic< bool > processed_request_
Definition: connection.h:426
IOLayer & get_socket()
Definition: connection.h:195
std::mutex output_buffer_mutex_
Definition: connection.h:423
IOLayer socket_
Definition: connection.h:416
int on_cno_stream_start(const uint32_t id) override
Definition: connection.h:405
std::atomic< bool > running_
Definition: connection.h:428
~Connection() override
Definition: connection.h:131
net::mutable_buffer input_mutable_buffer_
Definition: connection.h:421
void do_net_send()
Definition: connection.h:200
Connection(IOLayer s, base::method::Bitset *allowed_method, ConnectionStatusCallbacks *connection_handler, CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
Definition: connection.h:114
headers of a HTTP response/request.
Definition: headers.h:43
virtual uint32_t size() const
Definition: headers.cc:85
Definition: io_buffer.h:41
virtual const std::string & get() const
Definition: io_buffer.h:82
virtual size_t length() const
Definition: io_buffer.h:52
Definition: owned_buffer.h:83
Definition: owned_buffer.h:68
Definition: buffer_sequence.h:53
Definition: cno_interface.h:36
Definition: socket.h:1090
Definition: buffer.h:113
int page
Definition: ctype-mb.cc:1224
static int count
Definition: myisam_ftdump.cc:45
void error(const char *format,...)
static char * path
Definition: mysqldump.cc:149
void set_socket_parent(net::ip::tcp::socket *, const char *)
Definition: connection.h:61
auto * get_socket1(T *s)
Definition: connection.h:77
net::ip::tcp::socket * get_socket(net::ip::tcp::socket *s)
Definition: connection.h:66
std::bitset< Pos::_LAST+1 > Bitset
Definition: method.h:57
Pending
Definition: connection.h:95
@ k_pending_none
Definition: connection.h:96
@ k_pending_reading
Definition: connection.h:98
@ k_pending_closing
Definition: connection.h:97
@ k_pending_writing
Definition: connection.h:99
Request::Headers Headers
Definition: request.cc:35
HTTP_COMMON_EXPORT void callback_init(cno_connection_t *cno, CnoInterface *)
Definition: callback_init.cc:91
Definition: connection.h:56
Definition: http_server_component.cc:34
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Definition: gcs_xcom_synode.h:64
static std::mutex lock
Definition: net_ns.cc:56
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: completion_hash.h:35
Definition: result.h:30