24#ifndef MYSQL_HARNESS_NET_TS_WIN32_NAMED_PIPE_H_
25#define MYSQL_HARNESS_NET_TS_WIN32_NAMED_PIPE_H_
29#include <system_error>
59 return {
static_cast<int>(GetLastError()), std::system_category()};
76class basic_named_pipe_impl_base {
81 basic_named_pipe_impl_base(
net::io_context &io_ctx) : io_ctx_{&io_ctx} {}
83 basic_named_pipe_impl_base(
const basic_named_pipe_impl_base &) =
delete;
84 basic_named_pipe_impl_base &operator=(
const basic_named_pipe_impl_base &) =
86 basic_named_pipe_impl_base(basic_named_pipe_impl_base &&rhs) noexcept
87 : native_handle_{std::exchange(rhs.native_handle_,
89 native_non_blocking_{std::exchange(rhs.native_non_blocking_, 0)},
90 io_ctx_{std::move(rhs.io_ctx_)} {}
92 basic_named_pipe_impl_base &operator=(
93 basic_named_pipe_impl_base &&rhs)
noexcept {
96 native_non_blocking_ = std::exchange(rhs.native_non_blocking_, 0);
97 io_ctx_ = rhs.io_ctx_;
102 ~basic_named_pipe_impl_base() =
default;
105 return native_handle_;
110 bool is_open() const noexcept {
114 executor_type get_executor() noexcept {
return io_ctx_->get_executor(); }
134 int native_non_blocking_{0};
148template <
class Protocol>
149class basic_named_pipe_impl :
public basic_named_pipe_impl_base {
151 using __base = basic_named_pipe_impl_base;
155 using endpoint_type =
typename protocol_type::endpoint;
157 constexpr explicit basic_named_pipe_impl(
net::io_context &ctx) noexcept
160 basic_named_pipe_impl(
const basic_named_pipe_impl &) =
delete;
161 basic_named_pipe_impl &operator=(
const basic_named_pipe_impl &) =
delete;
162 basic_named_pipe_impl(basic_named_pipe_impl &&rhs) =
default;
164 basic_named_pipe_impl &operator=(basic_named_pipe_impl &&rhs)
noexcept {
165 if (
this == std::addressof(rhs)) {
168 __base::operator=(std::move(rhs));
173 ~basic_named_pipe_impl() {}
175 executor_type get_executor() noexcept {
return __base::get_executor(); }
179 constexpr bool is_open() const noexcept {
return __base::is_open(); }
187 protocol_ = protocol;
194 DWORD wait_mode = v ? PIPE_NOWAIT : PIPE_WAIT;
197 SetNamedPipeHandleState(native_handle_, &wait_mode,
nullptr,
nullptr);
212 using clock_type = std::chrono::steady_clock;
213 using namespace std::chrono_literals;
214 const auto retry_step = 10ms;
215 const auto end_time = clock_type::now() + 1s;
218 CreateFile(
TEXT(ep.path().c_str()), GENERIC_READ | GENERIC_WRITE, 0,
219 nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL,
nullptr);
225 const std::error_code ec_pipe_busy{ERROR_PIPE_BUSY,
226 std::system_category()};
228 if ((error_code == ec_pipe_busy) && clock_type::now() < end_time) {
229 std::this_thread::sleep_for(retry_step);
242 protocol_type protocol_{endpoint_type{}.protocol()};
253template <
class Protocol>
254class basic_named_pipe :
private basic_named_pipe_impl<Protocol> {
256 using __base = basic_named_pipe_impl<Protocol>;
260 using executor_type =
typename __base::executor_type;
262 using endpoint_type =
typename protocol_type::endpoint;
266 executor_type get_executor() noexcept {
return __base::get_executor(); }
275 constexpr bool is_open() const noexcept {
return __base::is_open(); }
290 template <
class MutableBufferSequence>
292 const MutableBufferSequence &buffers) {
293 size_t transferred{};
298 if (!ReadFile(
native_handle(), bufcur->data(), bufcur->size(), &numRead,
303 transferred += numRead;
313 template <
class ConstBufferSequence>
315 const ConstBufferSequence &buffers) {
316 size_t transferred{};
323 if (!WriteFile(
native_handle(), bufcur->data(), bufcur->size(), &written,
328 transferred += written;
341 : __base{ctx, proto} {}
349 basic_named_pipe(
const basic_named_pipe &) =
delete;
350 basic_named_pipe &operator=(
const basic_named_pipe &) =
delete;
351 basic_named_pipe(basic_named_pipe &&) =
default;
352 basic_named_pipe &operator=(basic_named_pipe &&) =
default;
354 ~basic_named_pipe() =
default;
362template <
class Protocol>
363class basic_named_pipe_socket :
public basic_named_pipe<Protocol> {
365 using __base = basic_named_pipe<Protocol>;
370 using endpoint_type =
typename protocol_type::endpoint;
372 explicit basic_named_pipe_socket(
net::io_context &ctx) : __base(ctx) {}
373 basic_named_pipe_socket(
net::io_context &ctx,
const protocol_type &proto)
374 : __base(ctx, proto) {}
376 basic_named_pipe_socket(
net::io_context &ctx,
const protocol_type &proto,
380 basic_named_pipe_socket(
const basic_named_pipe_socket &) =
delete;
381 basic_named_pipe_socket &operator=(
const basic_named_pipe_socket &) =
delete;
384 basic_named_pipe_socket(basic_named_pipe_socket &&other) =
default;
386 basic_named_pipe_socket &operator=(basic_named_pipe_socket &&) =
default;
388 ~basic_named_pipe_socket() {
389 if (is_open())
close();
392 constexpr bool is_open() const noexcept {
return __base::is_open(); }
414 template <
class ConstBufferSequence>
416 const ConstBufferSequence &buffers) {
417 return __base::write_some(buffers);
424 native_non_blocking_ =
static_cast<int>(v);
432 if (!connect_res)
return connect_res;
434 if (native_non_blocking_ == 1) {
442 int native_non_blocking_{-1};
450template <
class Protocol>
451class basic_named_pipe_acceptor :
public basic_named_pipe_impl<Protocol> {
453 using __base = basic_named_pipe_impl<Protocol>;
457 using socket_type = basic_named_pipe_socket<protocol_type>;
458 using endpoint_type =
typename protocol_type::endpoint;
459 using executor_type =
typename __base::executor_type;
462 explicit basic_named_pipe_acceptor(
net::io_context &ctx) : __base(ctx) {}
463 basic_named_pipe_acceptor(
net::io_context &ctx,
const protocol_type &proto)
464 : __base(ctx, proto) {}
466 basic_named_pipe_acceptor(
const basic_named_pipe_acceptor &) =
delete;
467 basic_named_pipe_acceptor &operator=(
const basic_named_pipe_acceptor &) =
471 basic_named_pipe_acceptor(basic_named_pipe_acceptor &&other) =
default;
473 basic_named_pipe_acceptor &operator=(basic_named_pipe_acceptor &&) =
default;
475 ~basic_named_pipe_acceptor() {
476 if (is_open())
close();
479 executor_type get_executor() noexcept {
return __base::get_executor(); }
481 bool is_open()
const {
return __base::is_open(); }
501 return __base::release();
508 native_non_blocking_ =
static_cast<int>(v);
527 if (ep.path().empty()) {
533 if (!ep_.path().empty()) {
540 const auto protocol = protocol_type();
542 if (native_non_blocking_ == 1)
flags |= PIPE_NOWAIT;
545 auto handle = CreateNamedPipe(
546 TEXT(ep_.path().c_str()), PIPE_ACCESS_DUPLEX,
547 protocol.type() | protocol.protocol() | PIPE_REJECT_REMOTE_CLIENTS |
549 back_log_, 1024 * 16, 1024 * 16, NMPWAIT_USE_DEFAULT_WAIT,
nullptr);
566 int back_log = PIPE_UNLIMITED_INSTANCES) {
571 if (
back_log > PIPE_UNLIMITED_INSTANCES) {
592 if (ep_.path().empty()) {
597 const auto protocol = protocol_type();
603 const std::error_code ec_pipe_connected{ERROR_PIPE_CONNECTED,
604 std::system_category()};
605 const std::error_code ec_no_data{ERROR_NO_DATA,
606 std::system_category()};
611 if (last_ec == ec_pipe_connected || last_ec == ec_no_data) {
612 return {std::in_place, get_executor().context(), protocol,
619 return {std::in_place, get_executor().context(), protocol,
native_handle()};
628 int back_log_{PIPE_UNLIMITED_INSTANCES};
629 int native_non_blocking_{-1};
635template <
typename Protocol>
636class basic_named_pipe_endpoint {
640 basic_named_pipe_endpoint() =
default;
641 basic_named_pipe_endpoint(std::string
path)
642 : path_{
path.substr(0,
std::min(max_path_len(),
path.size()))} {}
644 std::string
path()
const {
return path_; }
646 constexpr protocol_type protocol() const noexcept {
return protocol_type(); }
648 size_t size()
const {
return path().size(); }
650 size_t capacity()
const {
return max_path_len(); }
652 void resize(
size_t size) { path_.resize(size); }
655 constexpr size_t max_path_len()
const {
return 256; }
663class message_protocol {
665 using endpoint = basic_named_pipe_endpoint<message_protocol>;
666 using socket = basic_named_pipe_socket<message_protocol>;
667 using acceptor = basic_named_pipe_acceptor<message_protocol>;
669 int type()
const {
return PIPE_TYPE_MESSAGE; }
670 int protocol()
const {
return PIPE_READMODE_MESSAGE; }
678 using endpoint = basic_named_pipe_endpoint<byte_protocol>;
679 using socket = basic_named_pipe_socket<byte_protocol>;
680 using acceptor = basic_named_pipe_acceptor<byte_protocol>;
682 int type()
const {
return PIPE_TYPE_BYTE; }
683 int protocol()
const {
return PIPE_READMODE_BYTE; }
Definition: protocol.h:32
Definition: io_context.h:989
Definition: io_context.h:60
Definition: expected.h:943
static int flags[50]
Definition: hp_test1.cc:39
static bool connected
Definition: mysql.cc:168
ulong back_log
Definition: mysqld.cc:1342
static char * path
Definition: mysqldump.cc:148
Definition: authentication.cc:35
@ success
A grow operation succeeded.
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:78
constexpr file_handle_type kInvalidHandle
Definition: file.h:53
std::error_code last_error_code()
Definition: file.h:56
stdx::expected< void, std::error_code > close(file_handle_type native_handle)
close file handle.
Definition: file.h:238
stdx::expected< void, error_type > listen(native_handle_type native_handle, int backlog)
Definition: socket.h:148
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:62
stdx::expected< void, error_type > bind(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap bind() in a portable way.
Definition: socket.h:338
stdx::expected< native_handle_type, error_type > accept(native_handle_type native_handle, struct sockaddr *addr, socklen_t *addr_len)
wrap accept() in a portable way.
Definition: socket.h:366
stdx::expected< bool, error_type > native_non_blocking(native_handle_type native_handle)
Definition: socket.h:105
stdx::expected< void, error_type > connect(native_handle_type native_handle, const struct sockaddr *addr, size_t addr_len)
wrap connect() in a portable way.
Definition: socket.h:352
int native_handle_type
Definition: socket_constants.h:50
const const_buffer * buffer_sequence_end(const const_buffer &b) noexcept
Definition: buffer.h:184
const const_buffer * buffer_sequence_begin(const const_buffer &b) noexcept
Definition: buffer.h:179
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:63
Definition: varlen_sort.h:174
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:78
native_handle_type native_handle()
Definition: process.h:55
constexpr auto make_unexpected(E &&e) -> unexpected< std::decay_t< E > >
Definition: expected.h:124
required string type
Definition: replication_group_member_actions.proto:33
#define HANDLE
Definition: violite.h:158