25#ifndef MYSQL_HARNESS_NET_TS_WIN32_NAMED_PIPE_H_
26#define MYSQL_HARNESS_NET_TS_WIN32_NAMED_PIPE_H_
30#include <system_error>
60 return {
static_cast<int>(GetLastError()), std::system_category()};
77class basic_named_pipe_impl_base {
/// Binds the object to @p io_ctx by storing its address in io_ctx_.
/// NOTE(review): only a pointer is kept, so io_ctx presumably has to outlive
/// this object -- confirm against the callers.
82 basic_named_pipe_impl_base(
net::io_context &io_ctx) : io_ctx_{&io_ctx} {}
/// Copy construction is disabled.
84 basic_named_pipe_impl_base(
const basic_named_pipe_impl_base &) =
delete;
85 basic_named_pipe_impl_base &operator=(
const basic_named_pipe_impl_base &) =
87 basic_named_pipe_impl_base(basic_named_pipe_impl_base &&rhs) noexcept
88 : native_handle_{std::exchange(rhs.native_handle_,
90 native_non_blocking_{std::exchange(rhs.native_non_blocking_, 0)},
91 io_ctx_{std::move(rhs.io_ctx_)} {}
93 basic_named_pipe_impl_base &operator=(
94 basic_named_pipe_impl_base &&rhs)
noexcept {
97 native_non_blocking_ = std::exchange(rhs.native_non_blocking_, 0);
98 io_ctx_ = rhs.io_ctx_;
/// Defaulted destructor: no explicit cleanup is written at this level.
/// NOTE(review): closing the handle appears to be left to derived classes
/// (the socket and acceptor destructors call close() when is_open()) -- confirm.
103 ~basic_named_pipe_impl_base() =
default;
106 return native_handle_;
111 bool is_open() const noexcept {
/// Returns the executor obtained from the io_context stored in io_ctx_.
115 executor_type get_executor() noexcept {
return io_ctx_->get_executor(); }
/// Non-blocking state, initialised to 0. The move operations of this class
/// reset it to 0 in the moved-from object via std::exchange().
134 int native_non_blocking_{0};
148template <
class Protocol>
149class basic_named_pipe_impl :
public basic_named_pipe_impl_base {
151 using __base = basic_named_pipe_impl_base;
155 using endpoint_type =
typename protocol_type::endpoint;
157 constexpr explicit basic_named_pipe_impl(
net::io_context &ctx) noexcept
/// Copy construction is disabled.
160 basic_named_pipe_impl(
const basic_named_pipe_impl &) =
delete;
/// Copy assignment is disabled.
161 basic_named_pipe_impl &operator=(
const basic_named_pipe_impl &) =
delete;
/// Move construction uses the compiler-generated member-wise move
/// (base class first, then the members of this class).
162 basic_named_pipe_impl(basic_named_pipe_impl &&rhs) =
default;
164 basic_named_pipe_impl &operator=(basic_named_pipe_impl &&rhs)
noexcept {
165 if (
this == std::addressof(rhs)) {
168 __base::operator=(std::move(rhs));
173 ~basic_named_pipe_impl() {}
/// Returns the executor by forwarding to basic_named_pipe_impl_base (__base).
175 executor_type get_executor() noexcept {
return __base::get_executor(); }
/// Reports whether the pipe is open by forwarding to
/// basic_named_pipe_impl_base::is_open() (__base).
179 constexpr bool is_open() const noexcept {
return __base::is_open(); }
186 protocol_ = protocol;
193 DWORD wait_mode = v ? PIPE_NOWAIT : PIPE_WAIT;
196 SetNamedPipeHandleState(native_handle_, &wait_mode,
nullptr,
nullptr);
210 using clock_type = std::chrono::steady_clock;
211 using namespace std::chrono_literals;
212 const auto retry_step = 10ms;
213 const auto end_time = clock_type::now() + 1s;
216 CreateFile(
TEXT(ep.path().c_str()), GENERIC_READ | GENERIC_WRITE, 0,
217 nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL,
nullptr);
223 const std::error_code ec_pipe_busy{ERROR_PIPE_BUSY,
224 std::system_category()};
226 if ((error_code == ec_pipe_busy) && clock_type::now() < end_time) {
227 std::this_thread::sleep_for(retry_step);
/// Protocol in use. Starts out as the protocol() of a default-constructed
/// endpoint and is assigned elsewhere in this class.
240 protocol_type protocol_{endpoint_type{}.protocol()};
251template <
class Protocol>
252class basic_named_pipe :
private basic_named_pipe_impl<Protocol> {
254 using __base = basic_named_pipe_impl<Protocol>;
258 using executor_type =
typename __base::executor_type;
260 using endpoint_type =
typename protocol_type::endpoint;
/// Re-exposes get_executor() of the privately inherited
/// basic_named_pipe_impl<Protocol> (__base).
264 executor_type get_executor() noexcept {
return __base::get_executor(); }
/// Re-exposes is_open() of the privately inherited
/// basic_named_pipe_impl<Protocol> (__base).
273 constexpr bool is_open() const noexcept {
return __base::is_open(); }
288 template <
class MutableBufferSequence>
290 const MutableBufferSequence &buffers) {
291 size_t transferred{};
296 if (!ReadFile(
native_handle(), bufcur->data(), bufcur->size(), &numRead,
301 transferred += numRead;
311 template <
class ConstBufferSequence>
313 const ConstBufferSequence &buffers) {
314 size_t transferred{};
321 if (!WriteFile(
native_handle(), bufcur->data(), bufcur->size(), &written,
326 transferred += written;
339 : __base{ctx, proto} {}
/// Copy construction is disabled.
347 basic_named_pipe(
const basic_named_pipe &) =
delete;
/// Copy assignment is disabled.
348 basic_named_pipe &operator=(
const basic_named_pipe &) =
delete;
/// Move construction is compiler-generated (delegates to the base class).
349 basic_named_pipe(basic_named_pipe &&) =
default;
/// Move assignment is compiler-generated (delegates to the base class).
350 basic_named_pipe &operator=(basic_named_pipe &&) =
default;
/// Defaulted destructor: no explicit cleanup is written at this level.
352 ~basic_named_pipe() =
default;
360template <
class Protocol>
361class basic_named_pipe_socket :
public basic_named_pipe<Protocol> {
363 using __base = basic_named_pipe<Protocol>;
368 using endpoint_type =
typename protocol_type::endpoint;
/// Constructs a socket bound to @p ctx by forwarding it to
/// basic_named_pipe<Protocol> (__base). Explicit, so an io_context does not
/// implicitly convert to a socket.
370 explicit basic_named_pipe_socket(
net::io_context &ctx) : __base(ctx) {}
/// Constructs a socket bound to @p ctx using protocol @p proto; both are
/// forwarded unchanged to basic_named_pipe<Protocol> (__base).
371 basic_named_pipe_socket(
net::io_context &ctx,
const protocol_type &proto)
372 : __base(ctx, proto) {}
374 basic_named_pipe_socket(
net::io_context &ctx,
const protocol_type &proto,
/// Copy construction is disabled.
378 basic_named_pipe_socket(
const basic_named_pipe_socket &) =
delete;
/// Copy assignment is disabled.
379 basic_named_pipe_socket &operator=(
const basic_named_pipe_socket &) =
delete;
/// Move construction is compiler-generated (member-wise move).
382 basic_named_pipe_socket(basic_named_pipe_socket &&other) =
default;
/// Move assignment is compiler-generated (member-wise move).
384 basic_named_pipe_socket &operator=(basic_named_pipe_socket &&) =
default;
386 ~basic_named_pipe_socket() {
387 if (is_open())
close();
/// Reports whether the socket is open by forwarding to
/// basic_named_pipe<Protocol>::is_open() (__base).
390 constexpr bool is_open() const noexcept {
return __base::is_open(); }
411 template <
class ConstBufferSequence>
413 const ConstBufferSequence &buffers) {
414 return __base::write_some(buffers);
421 native_non_blocking_ =
static_cast<int>(v);
429 if (!connect_res)
return connect_res;
431 if (native_non_blocking_ == 1) {
/// Requested non-blocking mode. Starts at -1; a setter in this class stores
/// static_cast<int>(v) here, and the connect path tests it for == 1.
/// NOTE(review): -1 presumably means "not set yet" -- confirm.
439 int native_non_blocking_{-1};
447template <
class Protocol>
448class basic_named_pipe_acceptor :
public basic_named_pipe_impl<Protocol> {
450 using __base = basic_named_pipe_impl<Protocol>;
454 using socket_type = basic_named_pipe_socket<protocol_type>;
455 using endpoint_type =
typename protocol_type::endpoint;
456 using executor_type =
typename __base::executor_type;
/// Constructs an acceptor bound to @p ctx by forwarding it to
/// basic_named_pipe_impl<Protocol> (__base). Explicit, so an io_context does
/// not implicitly convert to an acceptor.
459 explicit basic_named_pipe_acceptor(
net::io_context &ctx) : __base(ctx) {}
/// Constructs an acceptor bound to @p ctx using protocol @p proto; both are
/// forwarded unchanged to basic_named_pipe_impl<Protocol> (__base).
460 basic_named_pipe_acceptor(
net::io_context &ctx,
const protocol_type &proto)
461 : __base(ctx, proto) {}
/// Copy construction is disabled.
463 basic_named_pipe_acceptor(
const basic_named_pipe_acceptor &) =
delete;
464 basic_named_pipe_acceptor &operator=(
const basic_named_pipe_acceptor &) =
/// Move construction is compiler-generated (member-wise move).
468 basic_named_pipe_acceptor(basic_named_pipe_acceptor &&other) =
default;
/// Move assignment is compiler-generated (member-wise move).
470 basic_named_pipe_acceptor &operator=(basic_named_pipe_acceptor &&) =
default;
472 ~basic_named_pipe_acceptor() {
473 if (is_open())
close();
/// Returns the executor by forwarding to basic_named_pipe_impl<Protocol>
/// (__base).
476 executor_type get_executor() noexcept {
return __base::get_executor(); }
478 bool is_open()
const {
return __base::is_open(); }
498 return __base::release();
505 native_non_blocking_ =
static_cast<int>(v);
524 if (ep.path().empty()) {
529 if (!ep_.path().empty()) {
535 const auto protocol = protocol_type();
537 if (native_non_blocking_ == 1)
flags |= PIPE_NOWAIT;
540 auto handle = CreateNamedPipe(
541 TEXT(ep_.path().c_str()), PIPE_ACCESS_DUPLEX,
542 protocol.type() | protocol.protocol() | PIPE_REJECT_REMOTE_CLIENTS |
544 back_log_, 1024 * 16, 1024 * 16, NMPWAIT_USE_DEFAULT_WAIT,
nullptr);
561 int back_log = PIPE_UNLIMITED_INSTANCES) {
565 if (
back_log > PIPE_UNLIMITED_INSTANCES) {
587 if (ep_.path().empty()) {
591 const auto protocol = protocol_type();
597 const std::error_code ec_pipe_connected{ERROR_PIPE_CONNECTED,
598 std::system_category()};
599 const std::error_code ec_no_data{ERROR_NO_DATA,
600 std::system_category()};
605 if (last_ec == ec_pipe_connected || last_ec == ec_no_data) {
606 return ret_type{std::in_place, get_executor().context(), protocol,
613 return ret_type{std::in_place, get_executor().context(), protocol,
/// Instance limit handed to CreateNamedPipe() when the pipe is created;
/// defaults to PIPE_UNLIMITED_INSTANCES.
623 int back_log_{PIPE_UNLIMITED_INSTANCES};
/// Requested non-blocking mode. Starts at -1; a setter in this class stores
/// static_cast<int>(v) here, and PIPE_NOWAIT is added to the creation flags
/// when the value is 1.
624 int native_non_blocking_{-1};
630template <
typename Protocol>
631class basic_named_pipe_endpoint {
/// Default constructor, compiler-generated.
635 basic_named_pipe_endpoint() =
default;
636 basic_named_pipe_endpoint(std::string
path)
/// Returns the pipe path stored in path_. The result is returned by value,
/// so every call produces a copy of the string.
639 std::string
path()
const {
return path_; }
/// Returns a value-initialised protocol_type; the result does not depend on
/// any state of the endpoint.
641 constexpr protocol_type protocol() const noexcept {
return protocol_type(); }
643 size_t size()
const {
return path().size(); }
/// Returns the maximum path length, i.e. the value of max_path_len().
645 size_t capacity()
const {
return max_path_len(); }
/// Resizes the stored path to @p size characters via std::string::resize().
/// No check against max_path_len() is made here.
647 void resize(
size_t size) { path_.resize(
size); }
/// Upper bound reported for the path length: the constant 256.
650 constexpr size_t max_path_len()
const {
return 256; }
658class message_protocol {
660 using endpoint = basic_named_pipe_endpoint<message_protocol>;
661 using socket = basic_named_pipe_socket<message_protocol>;
662 using acceptor = basic_named_pipe_acceptor<message_protocol>;
/// Pipe type flag for this protocol: PIPE_TYPE_MESSAGE. The acceptor ORs it
/// into the pipe-mode argument of CreateNamedPipe().
664 int type()
const {
return PIPE_TYPE_MESSAGE; }
/// Read-mode flag for this protocol: PIPE_READMODE_MESSAGE. The acceptor ORs
/// it into the pipe-mode argument of CreateNamedPipe().
665 int protocol()
const {
return PIPE_READMODE_MESSAGE; }
673 using endpoint = basic_named_pipe_endpoint<byte_protocol>;
674 using socket = basic_named_pipe_socket<byte_protocol>;
675 using acceptor = basic_named_pipe_acceptor<byte_protocol>;
/// Pipe type flag for this protocol: PIPE_TYPE_BYTE. The acceptor ORs it
/// into the pipe-mode argument of CreateNamedPipe().
677 int type()
const {
return PIPE_TYPE_BYTE; }
/// Read-mode flag for this protocol: PIPE_READMODE_BYTE. The acceptor ORs it
/// into the pipe-mode argument of CreateNamedPipe().
678 int protocol()
const {
return PIPE_READMODE_BYTE; }
Definition: protocol.h:33
Definition: io_context.h:991
Definition: io_context.h:61
Definition: expected.h:284
static int flags[50]
Definition: hp_test1.cc:40
static bool connected
Definition: mysql.cc:169
ulong back_log
Definition: mysqld.cc:1340
static char * path
Definition: mysqldump.cc:149
Definition: http_server_component.cc:34
@ success
A grow operation succeeded.
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
size_t size(const char *const c)
Definition: base64.h:46
constexpr file_handle_type kInvalidHandle
Definition: file.h:54
std::error_code last_error_code()
Definition: file.h:57
stdx::expected< void, std::error_code > close(file_handle_type native_handle)
close file handle.
Definition: file.h:239
stdx::expected< void, error_type > listen(native_handle_type native_handle, int backlog)
Definition: socket.h:149
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
stdx::expected< void, error_type > bind(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap bind() in a portable way.
Definition: socket.h:339
stdx::expected< native_handle_type, error_type > accept(native_handle_type native_handle, struct sockaddr *addr, socklen_t *addr_len)
wrap accept() in a portable way.
Definition: socket.h:367
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
stdx::expected< void, error_type > connect(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap connect() in a portable way.
Definition: socket.h:353
int native_handle_type
Definition: socket_constants.h:51
const const_buffer * buffer_sequence_end(const const_buffer &b) noexcept
Definition: buffer.h:185
const const_buffer * buffer_sequence_begin(const const_buffer &b) noexcept
Definition: buffer.h:180
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
Definition: gcs_xcom_synode.h:64
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
native_handle_type native_handle()
Definition: process.h:56
unexpected(E) -> unexpected< E >
required string type
Definition: replication_group_member_actions.proto:34
#define HANDLE
Definition: violite.h:159