25#ifndef MYSQL_HARNESS_NET_TS_WIN32_NAMED_PIPE_H_
26#define MYSQL_HARNESS_NET_TS_WIN32_NAMED_PIPE_H_
30#include <system_error>
60 return {
static_cast<int>(GetLastError()), std::system_category()};
77class basic_named_pipe_impl_base {
82 basic_named_pipe_impl_base(
net::io_context &io_ctx) : io_ctx_{&io_ctx} {}
84 basic_named_pipe_impl_base(
const basic_named_pipe_impl_base &) =
delete;
85 basic_named_pipe_impl_base &operator=(
const basic_named_pipe_impl_base &) =
87 basic_named_pipe_impl_base(basic_named_pipe_impl_base &&rhs) noexcept
88 : native_handle_{std::exchange(rhs.native_handle_,
90 native_non_blocking_{std::exchange(rhs.native_non_blocking_, 0)},
91 io_ctx_{std::move(rhs.io_ctx_)} {}
93 basic_named_pipe_impl_base &operator=(
94 basic_named_pipe_impl_base &&rhs)
noexcept {
97 native_non_blocking_ = std::exchange(rhs.native_non_blocking_, 0);
98 io_ctx_ = rhs.io_ctx_;
103 ~basic_named_pipe_impl_base() =
default;
106 return native_handle_;
111 bool is_open() const noexcept {
115 executor_type get_executor() noexcept {
return io_ctx_->get_executor(); }
135 int native_non_blocking_{0};
149template <
class Protocol>
150class basic_named_pipe_impl :
public basic_named_pipe_impl_base {
152 using __base = basic_named_pipe_impl_base;
156 using endpoint_type =
typename protocol_type::endpoint;
158 constexpr explicit basic_named_pipe_impl(
net::io_context &ctx) noexcept
161 basic_named_pipe_impl(
const basic_named_pipe_impl &) =
delete;
162 basic_named_pipe_impl &operator=(
const basic_named_pipe_impl &) =
delete;
163 basic_named_pipe_impl(basic_named_pipe_impl &&rhs) =
default;
165 basic_named_pipe_impl &operator=(basic_named_pipe_impl &&rhs)
noexcept {
166 if (
this == std::addressof(rhs)) {
169 __base::operator=(std::move(rhs));
174 ~basic_named_pipe_impl() {}
176 executor_type get_executor() noexcept {
return __base::get_executor(); }
180 constexpr bool is_open() const noexcept {
return __base::is_open(); }
188 protocol_ = protocol;
195 DWORD wait_mode = v ? PIPE_NOWAIT : PIPE_WAIT;
198 SetNamedPipeHandleState(native_handle_, &wait_mode,
nullptr,
nullptr);
213 using clock_type = std::chrono::steady_clock;
214 using namespace std::chrono_literals;
215 const auto retry_step = 10ms;
216 const auto end_time = clock_type::now() + 1s;
219 CreateFile(
TEXT(ep.path().c_str()), GENERIC_READ | GENERIC_WRITE, 0,
220 NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL,
NULL);
226 const std::error_code ec_pipe_busy{ERROR_PIPE_BUSY,
227 std::system_category()};
229 if ((error_code == ec_pipe_busy) && clock_type::now() < end_time) {
230 std::this_thread::sleep_for(retry_step);
243 protocol_type protocol_{endpoint_type{}.protocol()};
254template <
class Protocol>
255class basic_named_pipe :
private basic_named_pipe_impl<Protocol> {
257 using __base = basic_named_pipe_impl<Protocol>;
261 using executor_type =
typename __base::executor_type;
263 using endpoint_type =
typename protocol_type::endpoint;
267 executor_type get_executor() noexcept {
return __base::get_executor(); }
276 constexpr bool is_open() const noexcept {
return __base::is_open(); }
291 template <
class MutableBufferSequence>
293 const MutableBufferSequence &buffers) {
294 size_t transferred{};
299 if (!ReadFile(
native_handle(), bufcur->data(), bufcur->size(), &numRead,
304 transferred += numRead;
314 template <
class ConstBufferSequence>
316 const ConstBufferSequence &buffers) {
317 size_t transferred{};
324 if (!WriteFile(
native_handle(), bufcur->data(), bufcur->size(), &written,
329 transferred += written;
342 : __base{ctx, proto} {}
350 basic_named_pipe(
const basic_named_pipe &) =
delete;
351 basic_named_pipe &operator=(
const basic_named_pipe &) =
delete;
352 basic_named_pipe(basic_named_pipe &&) =
default;
353 basic_named_pipe &operator=(basic_named_pipe &&) =
default;
355 ~basic_named_pipe() =
default;
363template <
class Protocol>
364class basic_named_pipe_socket :
public basic_named_pipe<Protocol> {
366 using __base = basic_named_pipe<Protocol>;
371 using endpoint_type =
typename protocol_type::endpoint;
373 explicit basic_named_pipe_socket(
net::io_context &ctx) : __base(ctx) {}
374 basic_named_pipe_socket(
net::io_context &ctx,
const protocol_type &proto)
375 : __base(ctx, proto) {}
377 basic_named_pipe_socket(
net::io_context &ctx,
const protocol_type &proto,
381 basic_named_pipe_socket(
const basic_named_pipe_socket &) =
delete;
382 basic_named_pipe_socket &operator=(
const basic_named_pipe_socket &) =
delete;
385 basic_named_pipe_socket(basic_named_pipe_socket &&other) =
default;
387 basic_named_pipe_socket &operator=(basic_named_pipe_socket &&) =
default;
389 ~basic_named_pipe_socket() {
390 if (is_open())
close();
393 constexpr bool is_open() const noexcept {
return __base::is_open(); }
415 template <
class ConstBufferSequence>
417 const ConstBufferSequence &buffers) {
418 return __base::write_some(buffers);
425 native_non_blocking_ =
static_cast<int>(v);
433 if (!connect_res)
return connect_res;
435 if (native_non_blocking_ == 1) {
443 int native_non_blocking_{-1};
451template <
class Protocol>
452class basic_named_pipe_acceptor :
public basic_named_pipe_impl<Protocol> {
454 using __base = basic_named_pipe_impl<Protocol>;
458 using socket_type = basic_named_pipe_socket<protocol_type>;
459 using endpoint_type =
typename protocol_type::endpoint;
460 using executor_type =
typename __base::executor_type;
463 explicit basic_named_pipe_acceptor(
net::io_context &ctx) : __base(ctx) {}
464 basic_named_pipe_acceptor(
net::io_context &ctx,
const protocol_type &proto)
465 : __base(ctx, proto) {}
467 basic_named_pipe_acceptor(
const basic_named_pipe_acceptor &) =
delete;
468 basic_named_pipe_acceptor &operator=(
const basic_named_pipe_acceptor &) =
472 basic_named_pipe_acceptor(basic_named_pipe_acceptor &&other) =
default;
474 basic_named_pipe_acceptor &operator=(basic_named_pipe_acceptor &&) =
default;
476 ~basic_named_pipe_acceptor() {
477 if (is_open())
close();
480 executor_type get_executor() noexcept {
return __base::get_executor(); }
482 bool is_open()
const {
return __base::is_open(); }
502 return __base::release();
509 native_non_blocking_ =
static_cast<int>(v);
528 if (ep.path().empty()) {
534 if (!ep_.path().empty()) {
541 const auto protocol = protocol_type();
543 if (native_non_blocking_ == 1)
flags |= PIPE_NOWAIT;
546 auto handle = CreateNamedPipe(
547 TEXT(ep_.path().c_str()), PIPE_ACCESS_DUPLEX,
548 protocol.type() | protocol.protocol() | PIPE_REJECT_REMOTE_CLIENTS |
550 back_log_, 1024 * 16, 1024 * 16, NMPWAIT_USE_DEFAULT_WAIT,
NULL);
567 int back_log = PIPE_UNLIMITED_INSTANCES) {
572 if (
back_log > PIPE_UNLIMITED_INSTANCES) {
593 if (ep_.path().empty()) {
598 const auto protocol = protocol_type();
604 const std::error_code ec_pipe_connected{ERROR_PIPE_CONNECTED,
605 std::system_category()};
606 const std::error_code ec_no_data{ERROR_NO_DATA,
607 std::system_category()};
612 if (last_ec == ec_pipe_connected || last_ec == ec_no_data) {
613 return {std::in_place, get_executor().context(), protocol,
620 return {std::in_place, get_executor().context(), protocol,
native_handle()};
629 int back_log_{PIPE_UNLIMITED_INSTANCES};
630 int native_non_blocking_{-1};
636template <
typename Protocol>
637class basic_named_pipe_endpoint {
641 basic_named_pipe_endpoint() =
default;
642 basic_named_pipe_endpoint(std::string
path)
643 : path_{
path.substr(0,
std::min(max_path_len(),
path.size()))} {}
645 std::string
path()
const {
return path_; }
647 constexpr protocol_type protocol() const noexcept {
return protocol_type(); }
649 size_t size()
const {
return path().size(); }
651 size_t capacity()
const {
return max_path_len(); }
653 void resize(
size_t size) { path_.resize(size); }
656 constexpr size_t max_path_len()
const {
return 256; }
664class message_protocol {
666 using endpoint = basic_named_pipe_endpoint<message_protocol>;
667 using socket = basic_named_pipe_socket<message_protocol>;
668 using acceptor = basic_named_pipe_acceptor<message_protocol>;
670 int type()
const {
return PIPE_TYPE_MESSAGE; }
671 int protocol()
const {
return PIPE_READMODE_MESSAGE; }
679 using endpoint = basic_named_pipe_endpoint<byte_protocol>;
680 using socket = basic_named_pipe_socket<byte_protocol>;
681 using acceptor = basic_named_pipe_acceptor<byte_protocol>;
683 int type()
const {
return PIPE_TYPE_BYTE; }
684 int protocol()
const {
return PIPE_READMODE_BYTE; }
Definition: protocol.h:33
Definition: io_context.h:990
Definition: io_context.h:61
Definition: expected.h:944
static int flags[50]
Definition: hp_test1.cc:40
static bool connected
Definition: mysql.cc:156
ulong back_log
Definition: mysqld.cc:1329
static char * path
Definition: mysqldump.cc:137
Definition: authentication.cc:36
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
@ success
A grow operation succeeded.
constexpr file_handle_type kInvalidHandle
Definition: file.h:54
std::error_code last_error_code()
Definition: file.h:57
stdx::expected< void, std::error_code > close(file_handle_type native_handle)
close file handle.
Definition: file.h:239
stdx::expected< void, error_type > listen(native_handle_type native_handle, int backlog)
Definition: socket.h:149
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
stdx::expected< void, error_type > bind(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap bind() in a portable way.
Definition: socket.h:339
stdx::expected< native_handle_type, error_type > accept(native_handle_type native_handle, struct sockaddr *addr, socklen_t *addr_len)
wrap accept() in a portable way.
Definition: socket.h:367
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:106
stdx::expected< void, error_type > connect(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap connect() in a portable way.
Definition: socket.h:353
int native_handle_type
Definition: socket_constants.h:51
const const_buffer * buffer_sequence_end(const const_buffer &b) noexcept
Definition: buffer.h:185
const const_buffer * buffer_sequence_begin(const const_buffer &b) noexcept
Definition: buffer.h:180
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
Definition: gcs_xcom_synode.h:64
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
native_handle_type native_handle()
Definition: process.h:56
constexpr auto make_unexpected(E &&e) -> unexpected< std::decay_t< E > >
Definition: expected.h:125
required string type
Definition: replication_group_member_actions.proto:34
#define NULL
Definition: types.h:55
#define HANDLE
Definition: violite.h:159