26#ifndef ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
27#define ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
42#include "http/base/details/owned_buffer.h"
77 return &s->lower_layer();
87 s->set_parent(parent);
101template <
typename IOLayer>
115 CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION
version)
119 std::stringstream ss;
120 ss <<
"HTTP-" <<
this;
122 socket_.set_option(net::ip::tcp::no_delay{
true});
124 cno_init(&
cno_, kind);
140 bool send(
const uint32_t *stream_id_ptr,
const int status_code,
141 const std::string &method,
const std::string &
path,
143 cno_message_t message;
144 std::vector<cno_header_t> cno_header(headers.
size(), cno_header_t());
145 const bool only_header = 0 == data.
length();
147 auto output = cno_header.data();
149 for (
const auto &
entry : headers) {
150 output->name.size =
entry.first.length();
151 output->name.data =
entry.first.c_str();
153 output->value.size =
entry.second.length();
154 output->value.data =
entry.second.c_str();
158 message.code = status_code;
159 message.headers = cno_header.data();
160 message.headers_len = cno_header.size();
161 message.path.data =
path.c_str();
162 message.path.size =
path.length();
163 message.method.data = method.c_str();
164 message.method.size = method.length();
167 stream_id_ptr ? *stream_id_ptr : cno_next_stream(&
cno_);
168 if (CNO_OK != cno_write_head(&
cno_, stream_id, &message, only_header)) {
173 return CNO_OK == cno_write_data(&
cno_, stream_id, data.
get().c_str(),
182 if (
auto e = s->remote_endpoint()) {
183 return e->address().to_string();
191 if (
auto e = s->remote_endpoint()) {
205 [
this](std::error_code
error,
auto size) {
249 std::size_t bytes_transferred) {
259 const int result = cno_consume(
282 bool has_more =
true;
283 bool should_close =
false;
290 auto size_on_page = std::min(
page.size(),
size);
291 page += size_on_page;
292 size -= size_on_page;
355 bool was_first =
false;
360 bool expected =
false;
363 auto source_it = buffers.begin();
365 while (source_it != buffers.end()) {
372 if (0 == source_it->size()) {
377 if (0 == obuffer.space_left()) {
382 (*source_it) += obuffer.write(
383 static_cast<const uint8_t *
>(source_it->data()), source_it->size());
395 [[maybe_unused]]
const cno_tail_t *tail)
override {
Definition: connection_interface.h:41
Definition: connection_status_callbacks.h:35
virtual void on_connection_io_error(Connection *connection, const std::error_code &ec)=0
virtual void on_connection_close(Connection *connection)=0
Definition: connection.h:102
int on_cno_close() override
Definition: connection.h:404
void shutdown(bool) override
Definition: connection.h:136
void start() override
Definition: connection.h:200
int on_cno_message_tail(const uint32_t session_id, const cno_tail_t *tail) override
Definition: connection.h:394
bool stop_running()
Mark the connection that it should stop running.
Definition: connection.h:343
base::method::Bitset Methods
Definition: connection.h:106
virtual void on_output_buffer_empty()
Definition: connection.h:351
std::atomic< bool > suspend_
Definition: connection.h:424
Pending on_net_send(const std::error_code &ec, size_t size)
Definition: connection.h:281
Methods * allowed_method_
Definition: connection.h:412
ConnectionStatusCallbacks * connection_handler_
Definition: connection.h:426
int on_cno_writev(const cno_buffer_t *buffer, size_t count) override
Definition: connection.h:354
std::string get_peer_address() const override
Definition: connection.h:180
void do_net_recv()
Definition: connection.h:225
uint16_t get_peer_port() const override
Definition: connection.h:189
bool send(const uint32_t *stream_id_ptr, const int status_code, const std::string &method, const std::string &path, const Headers &headers, const IOBuffer &data) override
Definition: connection.h:140
IOLayer IO
Definition: connection.h:110
http::base::details::ref_buffers< std::list< owned_buffer > > ref_buffers
Definition: connection.h:109
void suspend()
Definition: connection.h:334
cno_connection_t cno_
Definition: connection.h:413
std::atomic< bool > reading_pending_
Definition: connection.h:422
bool keep_alive_
Definition: connection.h:410
std::list< owned_buffer > output_buffers_
Definition: connection.h:419
Pending on_net_receive(const std::error_code &ec, std::size_t bytes_transferred)
Definition: connection.h:248
void resume()
Definition: connection.h:333
std::atomic< bool > output_pending_
Definition: connection.h:421
uint8_t input_buffer_[512]
Definition: connection.h:415
IOLayer & get_socket()
Definition: connection.h:198
std::mutex output_buffer_mutex_
Definition: connection.h:418
IOLayer socket_
Definition: connection.h:411
int on_cno_stream_start(const uint32_t id) override
Definition: connection.h:400
std::atomic< bool > running_
Definition: connection.h:423
~Connection() override
Definition: connection.h:130
net::mutable_buffer input_mutable_buffer_
Definition: connection.h:416
void do_net_send()
Definition: connection.h:203
Connection(IOLayer s, base::method::Bitset *allowed_method, ConnectionStatusCallbacks *connection_handler, CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
Definition: connection.h:113
Definition: io_buffer.h:41
virtual const std::string & get() const
Definition: io_buffer.h:82
virtual size_t length() const
Definition: io_buffer.h:52
Definition: owned_buffer.h:83
Definition: owned_buffer.h:68
Definition: buffer_sequence.h:53
Definition: cno_interface.h:36
Definition: socket.h:1090
static constexpr shutdown_type shutdown_send
Definition: socket.h:185
int page
Definition: ctype-mb.cc:1226
#define T
Definition: jit_executor_value.cc:373
static int count
Definition: myisam_ftdump.cc:45
void error(const char *format,...)
static char * path
Definition: mysqldump.cc:150
void set_socket_parent(net::ip::tcp::socket *, const char *)
Definition: connection.h:60
auto * get_socket1(T *s)
Definition: connection.h:76
net::ip::tcp::socket * get_socket(net::ip::tcp::socket *s)
Definition: connection.h:65
std::bitset< Pos::_LAST+1 > Bitset
Definition: method.h:57
Pending
Definition: connection.h:94
@ k_pending_none
Definition: connection.h:95
@ k_pending_reading
Definition: connection.h:97
@ k_pending_closing
Definition: connection.h:96
@ k_pending_writing
Definition: connection.h:98
Request::Headers Headers
Definition: request.cc:34
HTTP_COMMON_EXPORT void callback_init(cno_connection_t *cno, CnoInterface *)
Definition: callback_init.cc:90
Definition: connection.h:55
Definition: http_server_component.cc:34
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Definition: gcs_xcom_synode.h:64
static std::mutex lock
Definition: net_ns.cc:56
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: completion_hash.h:35