26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
37#include <system_error>
38#include <unordered_map>
55#if defined(HAVE_EPOLL)
73 std::unique_ptr<net::impl::socket::SocketServiceBase> &&
socket_service,
98 template <class Rep, class Period>
101 template <class Clock, class Duration>
103 const
std::chrono::time_point<Clock, Duration> &abs_time);
107 template <class Rep, class Period>
110 template <class Clock, class Duration>
112 const
std::chrono::time_point<Clock, Duration> &abs_time);
118 std::lock_guard<std::mutex> lk(
mtx_);
126 std::lock_guard<std::mutex> lk(
mtx_);
131 std::lock_guard<std::mutex> lk(
mtx_);
171 template <
class Func>
182 using op_type = std::unique_ptr<BasicCallable>;
199 std::lock_guard<std::mutex> lk(
work_mtx_);
201 if (
work_.empty())
return 0;
206 tmp.splice(tmp.begin(),
work_,
work_.begin());
210 tmp.front()->invoke();
220 template <
class Func,
class ProtoAllocator>
221 void post(Func &&f,
const ProtoAllocator & ) {
222 std::lock_guard<std::mutex> lk(
work_mtx_);
234 std::lock_guard<std::mutex> lk(
work_mtx_);
235 return !
work_.empty();
248 template <
class Func,
class ProtoAllocator>
256 template <
class Clock,
class Duration>
258 std::unique_lock<std::mutex> &lk,
259 const std::chrono::time_point<Clock, Duration> &abs_time);
262 std::chrono::milliseconds
timeout);
264 template <
typename _Clock,
typename _WaitTraits>
269 template <
class Protocol>
272 template <
class Protocol>
275 template <
class Protocol>
278 template <
class Protocol>
314 std::lock_guard<std::mutex> lk(
ctx_.
mtx_);
371 op_(std::error_code{});
384 std::lock_guard<std::mutex> lk(
mtx_);
386 return !
ops_.empty();
390 const auto handle = t->native_handle();
392 std::lock_guard<std::mutex> lk(
mtx_);
395 if (it !=
ops_.end()) {
396 it->second.push_back(std::move(t));
398 std::vector<element_type> v;
399 v.push_back(std::move(t));
406 return static_cast<short>(el->event()) & events;
424 std::list<element_type> ops_to_delete;
426 std::lock_guard<std::mutex> lk(
mtx_);
427 for (
auto &fd_ops :
ops_) {
428 for (
auto &fd_op : fd_ops.second) {
429 ops_to_delete.push_back(std::move(fd_op));
440 template <
class Pred>
442 std::lock_guard<std::mutex> lk(
mtx_);
444 const auto it =
ops_.find(fd);
445 if (it !=
ops_.end()) {
446 auto &async_ops = it->second;
448 const auto end = async_ops.end();
449 for (
auto cur = async_ops.begin(); cur != end; ++cur) {
452 if (el->native_handle() == fd && pred(el)) {
453 auto op = std::move(el);
455 if (async_ops.size() == 1) {
460 async_ops.erase(cur);
471 std::unordered_map<native_handle_type, std::vector<element_type>>
ops_{
493 std::cerr <<
"!! add_fd_interest(" << fd <<
", ..."
494 <<
") " << res.error() <<
" " << res.error().message()
500 std::lock_guard<std::mutex> lk(
mtx_);
520 virtual std::chrono::milliseconds
next()
const = 0;
523 template <
class Timer>
531 auto &io_ctx =
static_cast<io_context &
>(ctx);
537 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
538 io_ctx.timer_queues_.push_back(
this);
561 void push(
const Timer &timer, Op &&op) {
570 [](
const auto &a,
const auto &b) { return a < b->expiry(); }),
573 if (timer.id() ==
nullptr) abort();
580 if (timer.id() ==
nullptr) abort();
581 if (timer.expiry() == Timer::time_point::min()) abort();
585 std::make_pair(timer.expiry(), timer.id()));
589 std::chrono::milliseconds
next()
const override {
590 typename Timer::time_point expiry;
597 return std::chrono::milliseconds::max();
606 return std::chrono::milliseconds::min();
612 auto duration = Timer::traits_type::to_wait_duration(expiry);
613 if (duration < duration.zero()) {
614 duration = duration.zero();
623 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
625 using namespace std::chrono_literals;
628 if ((duration - duration_ms).
count() != 0) {
636 std::unique_ptr<pending_timer> pt;
656 auto min = Timer::time_point::min();
658 if (cur.first < min) abort();
663 const auto now = Timer::clock_type::now();
667 auto timepoint = pending_expiry_it->first;
669 if (timepoint > now) {
673 typename Timer::Id *timer_id = pending_expiry_it->second;
679 if (pending_it->second->id() != timer_id) {
682 if (pending_it->second->expiry() != pending_expiry_it->first) {
686 pt = std::move(pending_it->second);
715 auto &cur_timer = cur->second;
716 if (cur_timer->id() == t.id()) {
720 auto nxt = std::next(cur);
732 for (
auto cur = eq_range.first; cur != eq_range.second;) {
733 auto expiry_eq_range =
736 size_t erase_count{};
738 for (
auto expiry_cur = expiry_eq_range.first;
739 expiry_cur != expiry_eq_range.second;) {
740 if (expiry_cur->first == cur->second->expiry() &&
741 expiry_cur->second == cur->second->id() && erase_count == 0) {
750 if (erase_count == 0) abort();
752 cur->second->cancel();
805 op_(std::error_code{});
817 std::multimap<typename Timer::time_point, typename Timer::Id *>
819 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
831 template <
class Timer,
class Op>
833 auto &
queue = use_service<timer_queue<Timer>>(*this);
835 queue.push(timer, std::forward<Op>(op));
844 template <
class Timer>
850 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
859 template <
class Timer>
920 using namespace std::chrono_literals;
926 if (
n != std::numeric_limits<count_type>::max()) ++
n;
932 using namespace std::chrono_literals;
941template <
class Rep,
class Period>
943 const std::chrono::duration<Rep, Period> &rel_time) {
944 return run_until(std::chrono::steady_clock::now() + rel_time);
947template <
class Clock,
class Duration>
949 const std::chrono::time_point<Clock, Duration> &abs_time) {
954 using namespace std::chrono_literals;
960 if (
n != std::numeric_limits<count_type>::max()) ++
n;
965template <
class Rep,
class Period>
967 const std::chrono::duration<Rep, Period> &rel_time) {
968 return run_one_until(std::chrono::steady_clock::now() + rel_time);
971template <
class Clock,
class Duration>
973 const std::chrono::time_point<Clock, Duration> &abs_time) {
985 using namespace std::chrono_literals;
989 if (
n != std::numeric_limits<count_type>::max()) ++
n;
997 using namespace std::chrono_literals;
1029 template <
class Func,
class ProtoAllocator>
1030 void dispatch(Func &&f,
const ProtoAllocator &a)
const {
1033 std::decay_t<Func>(std::forward<Func>(f))();
1036 post(std::forward<Func>(f), a);
1050 template <
class Func,
class ProtoAllocator>
1051 void post(Func &&f,
const ProtoAllocator &a)
const {
1066 template <
class Func,
class ProtoAllocator>
1067 void defer(Func &&f,
const ProtoAllocator &a)
const {
1068 post(std::forward<Func>(f), a);
1081 return std::addressof(a.context()) == std::addressof(b.context());
1102 bool need_notify{
false};
1105 std::lock_guard<std::mutex> lk(
mtx_);
1117 if (
true || need_notify) {
1126template <
class Clock,
class Duration>
1128 std::unique_lock<std::mutex> &lk,
1129 const std::chrono::time_point<Clock, Duration> &abs_time) {
1130 using namespace std::chrono_literals;
1132 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1134 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1136 if (rel_time_ms < 0ms) {
1139 }
else if (rel_time_ms < rel_time) {
1144 return do_one(lk, rel_time_ms);
1155 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds
timeout) {
1174 if (timer_q->run_one()) {
1189 std::chrono::milliseconds min_duration{0};
1191 std::lock_guard<std::mutex> lk(
mtx_);
1194 const auto duration =
q->next();
1196 if (duration == duration.zero()) {
1198 min_duration = duration;
1200 }
else if ((duration != duration.max()) &&
1202 (duration < min_duration || timer_q ==
nullptr)) {
1204 min_duration = duration;
1210 if (timer_q && min_duration <= min_duration.zero())
continue;
1212 if (
auto op = [
this]() -> std::unique_ptr<async_op> {
1214 std::lock_guard<std::mutex> lk(
mtx_);
1252 if (timer_q ==
nullptr ||
1263 if (res.error() == std::errc::timed_out && min_duration !=
timeout &&
1264 timer_q !=
nullptr) {
1277 res.error() == std::errc::timed_out);
1288 short events) -> std::unique_ptr<async_op> {
1289 std::lock_guard<std::mutex> lk(
mtx_);
1292 }(res->fd, res->event)) {
Definition: io_service_base.h:87
Definition: socket.h:1412
template-less base-class of basic_socket_impl.
Definition: socket.h:335
Definition: socket.h:1144
Definition: executor.h:308
execution_context & context() noexcept
Definition: executor.h:314
Definition: executor.h:154
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:300
void destroy() noexcept
Definition: executor.h:193
Definition: callstack.h:80
callstack of a thread.
Definition: callstack.h:71
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
Definition: socket_service_base.h:48
Definition: io_context.h:379
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:471
std::mutex mtx_
Definition: io_context.h:474
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:404
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:441
void release_all()
Definition: io_context.h:414
element_type extract_first(native_handle_type fd)
Definition: io_context.h:410
bool has_outstanding_work() const
Definition: io_context.h:383
void push_back(element_type &&t)
Definition: io_context.h:389
std::unique_ptr< async_op > element_type
Definition: io_context.h:381
Definition: io_context.h:164
virtual ~BasicCallable()=default
Definition: io_context.h:172
Callable(Func &&f)
Definition: io_context.h:174
void invoke() override
Definition: io_context.h:176
Func f_
Definition: io_context.h:179
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:158
std::list< op_type > work_
Definition: io_context.h:240
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:182
std::mutex work_mtx_
Definition: io_context.h:239
size_t run_one()
run a deferred work item.
Definition: io_context.h:190
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:233
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:221
async operation with callback.
Definition: io_context.h:362
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:364
Op op_
Definition: io_context.h:376
void run(io_context &) override
Definition: io_context.h:367
base class of async operation.
Definition: io_context.h:337
native_handle_type fd_
Definition: io_context.h:354
wait_type event() const
Definition: io_context.h:351
virtual ~async_op()=default
void cancel()
Definition: io_context.h:347
virtual void run(io_context &)=0
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:341
bool is_cancelled() const
Definition: io_context.h:348
native_handle_type native_handle() const
Definition: io_context.h:350
wait_type event_
Definition: io_context.h:355
Definition: io_context.h:1003
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1030
io_context * io_ctx_
Definition: io_context.h:1076
void on_work_started() const noexcept
Definition: io_context.h:1017
executor_type(executor_type &&rhs) noexcept=default
executor_type(const executor_type &rhs) noexcept=default
friend io_context
Definition: io_context.h:1072
executor_type & operator=(const executor_type &rhs) noexcept=default
executor_type & operator=(executor_type &&rhs) noexcept=default
io_context & context() const noexcept
Definition: io_context.h:1015
executor_type(io_context &ctx)
Definition: io_context.h:1074
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1051
bool running_in_this_thread() const noexcept
Definition: io_context.h:1012
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1067
void on_work_finished() const noexcept
Definition: io_context.h:1018
Definition: io_context.h:306
io_context & ctx_
Definition: io_context.h:326
monitor(const monitor &)=delete
monitor(io_context &ctx)
Definition: io_context.h:308
~monitor()
Definition: io_context.h:313
monitor(monitor &&)=delete
Definition: io_context.h:796
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:798
Op op_
Definition: io_context.h:810
void run() override
Definition: io_context.h:801
Definition: io_context.h:767
void cancel()
Definition: io_context.h:778
pending_timer(const Timer &timer)
Definition: io_context.h:772
time_point expiry() const noexcept
Definition: io_context.h:785
typename Timer::time_point time_point
Definition: io_context.h:769
virtual ~pending_timer()=default
timer_id id() const
Definition: io_context.h:786
typename Timer::Id * timer_id
Definition: io_context.h:770
bool is_cancelled() const
Definition: io_context.h:777
time_point expiry_
Definition: io_context.h:791
timer_id id_
Definition: io_context.h:792
Definition: io_context.h:512
timer_queue_base(execution_context &ctx)
Definition: io_context.h:514
virtual std::chrono::milliseconds next() const =0
std::mutex queue_mtx_
Definition: io_context.h:516
Definition: io_context.h:524
std::chrono::milliseconds next() const override
Definition: io_context.h:589
size_t cancel(const Timer &t)
Definition: io_context.h:703
io_context & context() noexcept
Definition: io_context.h:556
bool run_one() override
Definition: io_context.h:635
void shutdown() noexcept override
Definition: io_context.h:554
timer_queue(execution_context &ctx)
Definition: io_context.h:528
~timer_queue() override
Definition: io_context.h:541
void push(const Timer &timer, Op &&op)
Definition: io_context.h:561
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:818
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:820
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:814
Definition: io_context.h:61
count_type poll()
Definition: io_context.h:981
bool is_running_
Definition: io_context.h:888
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:884
std::condition_variable do_one_cond_
Definition: io_context.h:887
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:948
~io_context()
Definition: io_context.h:81
std::atomic< count_type > work_count_
Definition: io_context.h:282
count_type run_one()
Definition: io_context.h:931
io_context()
Definition: io_context.h:68
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:901
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:895
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:285
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:286
DeferredWork deferred_work_
Definition: io_context.h:243
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:249
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1127
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:942
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1147
void async_wait(const Timer &timer, Op &&op)
async wait for a timer expire.
Definition: io_context.h:832
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:150
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:966
size_t cancel_one(const Timer &)
Definition: io_context.h:860
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:66
std::mutex do_one_mtx_
Definition: io_context.h:886
io_context(int)
Definition: io_context.h:79
IoServiceBase * io_service() const
Definition: io_context.h:139
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:287
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1154
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:480
io_context(const io_context &)=delete
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:72
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:135
bool is_running() const
Definition: io_context.h:908
AsyncOps active_ops_
Definition: io_context.h:477
bool stopped_
Definition: io_context.h:281
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-contexts.
Definition: io_context.h:877
count_type poll_one()
Definition: io_context.h:994
void stop()
Definition: io_context.h:116
count_type run()
Definition: io_context.h:915
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:890
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:483
executor_type get_executor() noexcept
Definition: io_context.h:1093
void restart()
Definition: io_context.h:130
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:845
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1100
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:972
void is_running(bool v)
Definition: io_context.h:907
size_t count_type
Definition: io_context.h:65
bool has_outstanding_work() const
Definition: io_context.h:295
bool stopped() const noexcept
Definition: io_context.h:125
io_context & operator=(const io_context &)=delete
Definition: linux_epoll_io_service.h:57
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
Definition: expected.h:944
static int count
Definition: myisam_ftdump.cc:43
static QUEUE queue
Definition: myisampack.cc:207
static bool interrupted
Definition: mysqladmin.cc:66
Definition: authentication.cc:36
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
wait_type
Definition: socket_constants.h:86
int native_handle_type
Definition: socket_constants.h:51
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:576
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:572
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
Definition: gcs_xcom_synode.h:64
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
Definition: executor.h:370
int n
Definition: xcom_base.cc:509
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4065