26#ifndef ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
27#define ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
42#include "http/base/details/owned_buffer.h"
78 return &s->lower_layer();
88 s->set_parent(parent);
102template <
typename IOLayer>
116 CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION
version)
120 std::stringstream ss;
121 ss <<
"HTTP-" <<
this;
123 socket_.set_option(net::ip::tcp::no_delay{
true});
125 cno_init(&
cno_, kind);
126 cno_.disallow_h2_prior_knowledge = 1;
142 bool send(
const uint32_t *stream_id_ptr,
const int status_code,
143 const std::string &method,
const std::string &
path,
145 cno_message_t message;
146 std::vector<cno_header_t> cno_header(headers.
size(), cno_header_t());
147 std::vector<std::string> http2_headers_names;
148 const bool only_header = 0 == data.
length();
150 if (CNO_HTTP2 ==
cno_.mode) {
151 http2_headers_names.reserve(headers.
size());
154 auto output = cno_header.data();
156 for (
const auto &
entry : headers) {
157 if (CNO_HTTP2 ==
cno_.mode) {
158 auto &header_name = http2_headers_names.emplace_back(
160 output->name.size = header_name.length();
161 output->name.data = header_name.c_str();
163 output->name.size =
entry.first.length();
164 output->name.data =
entry.first.c_str();
167 output->value.size =
entry.second.length();
168 output->value.data =
entry.second.c_str();
172 message.code = status_code;
173 message.headers = cno_header.data();
174 message.headers_len = cno_header.size();
175 message.path.data =
path.c_str();
176 message.path.size =
path.length();
177 message.method.data = method.c_str();
178 message.method.size = method.length();
181 stream_id_ptr ? *stream_id_ptr : cno_next_stream(&
cno_);
182 if (CNO_OK != cno_write_head(&
cno_, stream_id, &message, only_header)) {
187 return CNO_OK == cno_write_data(&
cno_, stream_id, data.
get().c_str(),
196 if (
auto e = s->remote_endpoint()) {
197 return e->address().to_string();
205 if (
auto e = s->remote_endpoint()) {
219 [
this](std::error_code
error,
auto size) {
263 std::size_t bytes_transferred) {
273 const int result = cno_consume(
296 bool has_more =
true;
297 bool should_close =
false;
304 auto size_on_page = std::min(
page.size(),
size);
305 page += size_on_page;
306 size -= size_on_page;
369 bool was_first =
false;
374 bool expected =
false;
377 auto source_it = buffers.begin();
379 while (source_it != buffers.end()) {
386 if (0 == source_it->size()) {
391 if (0 == obuffer.space_left()) {
396 (*source_it) += obuffer.write(
397 static_cast<const uint8_t *
>(source_it->data()), source_it->size());
409 [[maybe_unused]]
const cno_tail_t *tail)
override {
Definition: connection_interface.h:41
Definition: connection_status_callbacks.h:35
virtual void on_connection_io_error(Connection *connection, const std::error_code &ec)=0
virtual void on_connection_close(Connection *connection)=0
Definition: connection.h:103
int on_cno_close() override
Definition: connection.h:418
void shutdown(bool) override
Definition: connection.h:138
void start() override
Definition: connection.h:214
int on_cno_message_tail(const uint32_t session_id, const cno_tail_t *tail) override
Definition: connection.h:408
bool stop_running()
Mark the connection that it should stop running.
Definition: connection.h:357
base::method::Bitset Methods
Definition: connection.h:107
virtual void on_output_buffer_empty()
Definition: connection.h:365
std::atomic< bool > suspend_
Definition: connection.h:438
Pending on_net_send(const std::error_code &ec, size_t size)
Definition: connection.h:295
Methods * allowed_method_
Definition: connection.h:426
ConnectionStatusCallbacks * connection_handler_
Definition: connection.h:440
int on_cno_writev(const cno_buffer_t *buffer, size_t count) override
Definition: connection.h:368
std::string get_peer_address() const override
Definition: connection.h:194
void do_net_recv()
Definition: connection.h:239
uint16_t get_peer_port() const override
Definition: connection.h:203
bool send(const uint32_t *stream_id_ptr, const int status_code, const std::string &method, const std::string &path, const Headers &headers, const IOBuffer &data) override
Definition: connection.h:142
IOLayer IO
Definition: connection.h:111
http::base::details::ref_buffers< std::list< owned_buffer > > ref_buffers
Definition: connection.h:110
void suspend()
Definition: connection.h:348
cno_connection_t cno_
Definition: connection.h:427
std::atomic< bool > reading_pending_
Definition: connection.h:436
bool keep_alive_
Definition: connection.h:424
std::list< owned_buffer > output_buffers_
Definition: connection.h:433
Pending on_net_receive(const std::error_code &ec, std::size_t bytes_transferred)
Definition: connection.h:262
void resume()
Definition: connection.h:347
std::atomic< bool > output_pending_
Definition: connection.h:435
uint8_t input_buffer_[512]
Definition: connection.h:429
IOLayer & get_socket()
Definition: connection.h:212
std::mutex output_buffer_mutex_
Definition: connection.h:432
IOLayer socket_
Definition: connection.h:425
int on_cno_stream_start(const uint32_t id) override
Definition: connection.h:414
std::atomic< bool > running_
Definition: connection.h:437
~Connection() override
Definition: connection.h:132
net::mutable_buffer input_mutable_buffer_
Definition: connection.h:430
void do_net_send()
Definition: connection.h:217
Connection(IOLayer s, base::method::Bitset *allowed_method, ConnectionStatusCallbacks *connection_handler, CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
Definition: connection.h:114
Definition: io_buffer.h:41
virtual const std::string & get() const
Definition: io_buffer.h:82
virtual size_t length() const
Definition: io_buffer.h:52
Definition: owned_buffer.h:83
Definition: owned_buffer.h:68
Definition: buffer_sequence.h:53
Definition: cno_interface.h:36
Definition: socket.h:1090
static constexpr shutdown_type shutdown_send
Definition: socket.h:185
int page
Definition: ctype-mb.cc:1226
#define T
Definition: jit_executor_value.cc:373
static int count
Definition: myisam_ftdump.cc:45
void error(const char *format,...)
static char * path
Definition: mysqldump.cc:150
void set_socket_parent(net::ip::tcp::socket *, const char *)
Definition: connection.h:61
auto * get_socket1(T *s)
Definition: connection.h:77
net::ip::tcp::socket * get_socket(net::ip::tcp::socket *s)
Definition: connection.h:66
std::bitset< Pos::_LAST+1 > Bitset
Definition: method.h:57
Pending
Definition: connection.h:95
@ k_pending_none
Definition: connection.h:96
@ k_pending_reading
Definition: connection.h:98
@ k_pending_closing
Definition: connection.h:97
@ k_pending_writing
Definition: connection.h:99
Request::Headers Headers
Definition: request.cc:34
HTTP_COMMON_EXPORT void callback_init(cno_connection_t *cno, CnoInterface *)
Definition: callback_init.cc:90
Definition: connection.h:56
Definition: http_server_component.cc:34
HARNESS_EXPORT std::string make_lower(std::string s)
lower-case a string.
Definition: string_utils.cc:99
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Definition: gcs_xcom_synode.h:64
static std::mutex lock
Definition: net_ns.cc:56
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: completion_hash.h:35