26#ifndef ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
27#define ROUTER_SRC_HTTP_INCLUDE_HTTP_BASE_CONNECTION_H_
78 return &s->lower_layer();
88 s->set_parent(parent);
102template <
typename IOLayer>
116 CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION
version)
120 std::stringstream ss;
121 ss <<
"HTTP-" <<
this;
123 socket_.set_option(net::ip::tcp::no_delay{
true});
125 cno_init(&
cno_, kind);
137 bool send(
const uint32_t *stream_id_ptr,
const int status_code,
138 const std::string &method,
const std::string &
path,
140 cno_message_t message;
141 std::vector<cno_header_t> cno_header(headers.
size(), cno_header_t());
142 const bool only_header = 0 == data.
length();
144 auto output = cno_header.data();
146 for (
const auto &
entry : headers) {
147 output->name.size =
entry.first.length();
148 output->name.data =
entry.first.c_str();
150 output->value.size =
entry.second.length();
151 output->value.data =
entry.second.c_str();
155 message.code = status_code;
156 message.headers = cno_header.data();
157 message.headers_len = cno_header.size();
158 message.path.data =
path.c_str();
159 message.path.size =
path.length();
160 message.method.data = method.c_str();
161 message.method.size = method.length();
164 stream_id_ptr ? *stream_id_ptr : cno_next_stream(&
cno_);
165 if (CNO_OK != cno_write_head(&
cno_, stream_id, &message, only_header)) {
170 return CNO_OK == cno_write_data(&
cno_, stream_id, data.
get().c_str(),
179 if (
auto e = s->remote_endpoint()) {
180 return e->address().to_string();
188 if (
auto e = s->remote_endpoint()) {
202 [
this](std::error_code
error,
auto size) {
245 std::size_t bytes_transferred) {
259 const int result = cno_consume(
287 bool has_more =
true;
288 bool should_close =
false;
295 auto size_on_page = std::min(
page.size(),
size);
296 page += size_on_page;
297 size -= size_on_page;
360 bool was_first =
false;
365 bool expected =
false;
368 auto source_it = buffers.begin();
370 while (source_it != buffers.end()) {
377 if (0 == source_it->size()) {
382 if (0 == obuffer.space_left()) {
387 (*source_it) += obuffer.write(
388 static_cast<const uint8_t *
>(source_it->data()), source_it->size());
400 [[maybe_unused]]
const cno_tail_t *tail)
override {
Definition: connection_interface.h:41
Definition: connection_status_callbacks.h:35
virtual void on_connection_io_error(Connection *connection, const std::error_code &ec)=0
virtual void on_connection_close(Connection *connection)=0
Definition: connection.h:103
int on_cno_close() override
Definition: connection.h:409
void start() override
Definition: connection.h:197
int on_cno_message_tail(const uint32_t session_id, const cno_tail_t *tail) override
Definition: connection.h:399
bool stop_running()
Mark the connection that it should stop running.
Definition: connection.h:348
base::method::Bitset Methods
Definition: connection.h:107
virtual void on_output_buffer_empty()
Definition: connection.h:356
std::atomic< bool > suspend_
Definition: connection.h:429
Pending on_net_send(const std::error_code &ec, size_t size)
Definition: connection.h:286
Methods * allowed_method_
Definition: connection.h:417
ConnectionStatusCallbacks * connection_handler_
Definition: connection.h:431
int on_cno_writev(const cno_buffer_t *buffer, size_t count) override
Definition: connection.h:359
std::string get_peer_address() const override
Definition: connection.h:177
void do_net_recv()
Definition: connection.h:222
uint16_t get_peer_port() const override
Definition: connection.h:186
bool send(const uint32_t *stream_id_ptr, const int status_code, const std::string &method, const std::string &path, const Headers &headers, const IOBuffer &data) override
Definition: connection.h:137
IOLayer IO
Definition: connection.h:111
http::base::details::ref_buffers< std::list< owned_buffer > > ref_buffers
Definition: connection.h:110
void suspend()
Definition: connection.h:339
cno_connection_t cno_
Definition: connection.h:418
bool keep_alive_
Definition: connection.h:415
std::list< owned_buffer > output_buffers_
Definition: connection.h:424
Pending on_net_receive(const std::error_code &ec, std::size_t bytes_transferred)
Definition: connection.h:244
void resume()
Definition: connection.h:338
std::atomic< bool > output_pending_
Definition: connection.h:427
uint8_t input_buffer_[512]
Definition: connection.h:420
std::atomic< bool > processed_request_
Definition: connection.h:426
IOLayer & get_socket()
Definition: connection.h:195
std::mutex output_buffer_mutex_
Definition: connection.h:423
IOLayer socket_
Definition: connection.h:416
int on_cno_stream_start(const uint32_t id) override
Definition: connection.h:405
std::atomic< bool > running_
Definition: connection.h:428
~Connection() override
Definition: connection.h:131
net::mutable_buffer input_mutable_buffer_
Definition: connection.h:421
void do_net_send()
Definition: connection.h:200
Connection(IOLayer s, base::method::Bitset *allowed_method, ConnectionStatusCallbacks *connection_handler, CNO_CONNECTION_KIND kind, CNO_HTTP_VERSION version)
Definition: connection.h:114
Definition: io_buffer.h:41
virtual const std::string & get() const
Definition: io_buffer.h:82
virtual size_t length() const
Definition: io_buffer.h:52
Definition: owned_buffer.h:83
Definition: owned_buffer.h:68
Definition: buffer_sequence.h:53
Definition: cno_interface.h:36
Definition: socket.h:1090
int page
Definition: ctype-mb.cc:1224
static int count
Definition: myisam_ftdump.cc:45
void error(const char *format,...)
static char * path
Definition: mysqldump.cc:149
void set_socket_parent(net::ip::tcp::socket *, const char *)
Definition: connection.h:61
auto * get_socket1(T *s)
Definition: connection.h:77
net::ip::tcp::socket * get_socket(net::ip::tcp::socket *s)
Definition: connection.h:66
std::bitset< Pos::_LAST+1 > Bitset
Definition: method.h:57
Pending
Definition: connection.h:95
@ k_pending_none
Definition: connection.h:96
@ k_pending_reading
Definition: connection.h:98
@ k_pending_closing
Definition: connection.h:97
@ k_pending_writing
Definition: connection.h:99
Request::Headers Headers
Definition: request.cc:35
HTTP_COMMON_EXPORT void callback_init(cno_connection_t *cno, CnoInterface *)
Definition: callback_init.cc:91
Definition: connection.h:56
Definition: http_server_component.cc:34
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
mutable_buffer buffer(void *p, size_t n) noexcept
Definition: buffer.h:418
Definition: gcs_xcom_synode.h:64
static std::mutex lock
Definition: net_ns.cc:56
required uint64 version
Definition: replication_group_member_actions.proto:41
Definition: completion_hash.h:35