26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
37#include <system_error>
38#include <unordered_map>
55#if defined(HAVE_EPOLL)
73 std::unique_ptr<net::impl::socket::SocketServiceBase> &&
socket_service,
98 template <class Rep, class Period>
101 template <class Clock, class Duration>
103 const
std::chrono::time_point<Clock, Duration> &abs_time);
107 template <class Rep, class Period>
110 template <class Clock, class Duration>
112 const
std::chrono::time_point<Clock, Duration> &abs_time);
118 std::lock_guard<std::mutex> lk(
mtx_);
126 std::lock_guard<std::mutex> lk(
mtx_);
131 std::lock_guard<std::mutex> lk(
mtx_);
171 template <
class Func>
182 using op_type = std::unique_ptr<BasicCallable>;
199 std::lock_guard<std::mutex> lk(
work_mtx_);
201 if (
work_.empty())
return 0;
206 tmp.splice(tmp.begin(),
work_,
work_.begin());
210 tmp.front()->invoke();
220 template <
class Func,
class ProtoAllocator>
221 void post(Func &&f,
const ProtoAllocator & ) {
222 std::lock_guard<std::mutex> lk(
work_mtx_);
234 std::lock_guard<std::mutex> lk(
work_mtx_);
235 return !
work_.empty();
248 template <
class Func,
class ProtoAllocator>
256 template <
class Clock,
class Duration>
258 std::unique_lock<std::mutex> &lk,
259 const std::chrono::time_point<Clock, Duration> &abs_time);
262 std::chrono::milliseconds
timeout);
264 template <
typename _Clock,
typename _WaitTraits>
269 template <
class Protocol>
272 template <
class Protocol>
275 template <
class Protocol>
278 template <
class Protocol>
314 std::lock_guard<std::mutex> lk(
ctx_.
mtx_);
371 op_(std::error_code{});
384 std::lock_guard<std::mutex> lk(
mtx_);
386 return !
ops_.empty();
390 const auto handle = t->native_handle();
392 std::lock_guard<std::mutex> lk(
mtx_);
395 if (it !=
ops_.end()) {
396 it->second.push_back(std::move(t));
398 std::vector<element_type> v;
399 v.push_back(std::move(t));
406 return static_cast<short>(el->event()) & events;
424 std::list<element_type> ops_to_delete;
426 std::lock_guard<std::mutex> lk(
mtx_);
427 for (
auto &fd_ops :
ops_) {
428 for (
auto &fd_op : fd_ops.second) {
429 ops_to_delete.push_back(std::move(fd_op));
440 template <
class Pred>
442 std::lock_guard<std::mutex> lk(
mtx_);
444 const auto it =
ops_.find(fd);
445 if (it !=
ops_.end()) {
446 auto &async_ops = it->second;
448 const auto end = async_ops.end();
449 for (
auto cur = async_ops.begin(); cur != end; ++cur) {
452 if (el->native_handle() == fd && pred(el)) {
453 auto op = std::move(el);
455 if (async_ops.size() == 1) {
460 async_ops.erase(cur);
471 std::unordered_map<native_handle_type, std::vector<element_type>>
ops_{
493 std::cerr <<
"!! add_fd_interest(" << fd <<
", ..."
494 <<
") " << res.error() <<
" " << res.error().message()
500 std::lock_guard<std::mutex> lk(
mtx_);
520 virtual std::chrono::milliseconds
next()
const = 0;
523 template <
class Timer>
531 auto &io_ctx =
static_cast<io_context &
>(ctx);
537 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
538 io_ctx.timer_queues_.push_back(
this);
548 void push(
const Timer &timer, Op &&op) {
557 [](
const auto &a,
const auto &b) { return a < b->expiry(); }),
560 if (timer.id() ==
nullptr) abort();
567 if (timer.id() ==
nullptr) abort();
568 if (timer.expiry() == Timer::time_point::min()) abort();
572 std::make_pair(timer.expiry(), timer.id()));
576 std::chrono::milliseconds
next()
const override {
577 typename Timer::time_point expiry;
584 return std::chrono::milliseconds::max();
593 return std::chrono::milliseconds::min();
599 auto duration = Timer::traits_type::to_wait_duration(expiry);
600 if (duration < duration.zero()) {
601 duration = duration.zero();
610 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
612 using namespace std::chrono_literals;
615 if ((duration - duration_ms).
count() != 0) {
623 std::unique_ptr<pending_timer> pt;
643 auto min = Timer::time_point::min();
645 if (cur.first < min) abort();
650 const auto now = Timer::clock_type::now();
654 auto timepoint = pending_expiry_it->first;
656 if (timepoint > now) {
660 typename Timer::Id *timer_id = pending_expiry_it->second;
666 if (pending_it->second->id() != timer_id) {
669 if (pending_it->second->expiry() != pending_expiry_it->first) {
673 pt = std::move(pending_it->second);
702 auto &cur_timer = cur->second;
703 if (cur_timer->id() == t.id()) {
707 auto nxt = std::next(cur);
719 for (
auto cur = eq_range.first; cur != eq_range.second;) {
720 auto expiry_eq_range =
723 size_t erase_count{};
725 for (
auto expiry_cur = expiry_eq_range.first;
726 expiry_cur != expiry_eq_range.second;) {
727 if (expiry_cur->first == cur->second->expiry() &&
728 expiry_cur->second == cur->second->id() && erase_count == 0) {
737 if (erase_count == 0) abort();
739 cur->second->cancel();
792 op_(std::error_code{});
804 std::multimap<typename Timer::time_point, typename Timer::Id *>
806 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
818 template <
class Timer,
class Op>
820 auto &
queue = use_service<timer_queue<Timer>>(*this);
822 queue.push(timer, std::forward<Op>(op));
831 template <
class Timer>
837 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
846 template <
class Timer>
907 using namespace std::chrono_literals;
913 if (
n != std::numeric_limits<count_type>::max()) ++
n;
919 using namespace std::chrono_literals;
928template <
class Rep,
class Period>
930 const std::chrono::duration<Rep, Period> &rel_time) {
931 return run_until(std::chrono::steady_clock::now() + rel_time);
934template <
class Clock,
class Duration>
936 const std::chrono::time_point<Clock, Duration> &abs_time) {
941 using namespace std::chrono_literals;
947 if (
n != std::numeric_limits<count_type>::max()) ++
n;
952template <
class Rep,
class Period>
954 const std::chrono::duration<Rep, Period> &rel_time) {
955 return run_one_until(std::chrono::steady_clock::now() + rel_time);
958template <
class Clock,
class Duration>
960 const std::chrono::time_point<Clock, Duration> &abs_time) {
972 using namespace std::chrono_literals;
976 if (
n != std::numeric_limits<count_type>::max()) ++
n;
984 using namespace std::chrono_literals;
1016 template <
class Func,
class ProtoAllocator>
1017 void dispatch(Func &&f,
const ProtoAllocator &a)
const {
1020 std::decay_t<Func>(std::forward<Func>(f))();
1023 post(std::forward<Func>(f), a);
1037 template <
class Func,
class ProtoAllocator>
1038 void post(Func &&f,
const ProtoAllocator &a)
const {
1053 template <
class Func,
class ProtoAllocator>
1054 void defer(Func &&f,
const ProtoAllocator &a)
const {
1055 post(std::forward<Func>(f), a);
1068 return std::addressof(a.context()) == std::addressof(b.context());
1089 bool need_notify{
false};
1092 std::lock_guard<std::mutex> lk(
mtx_);
1104 if (
true || need_notify) {
1113template <
class Clock,
class Duration>
1115 std::unique_lock<std::mutex> &lk,
1116 const std::chrono::time_point<Clock, Duration> &abs_time) {
1117 using namespace std::chrono_literals;
1119 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1121 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1123 if (rel_time_ms < 0ms) {
1126 }
else if (rel_time_ms < rel_time) {
1131 return do_one(lk, rel_time_ms);
1142 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds
timeout) {
1161 if (timer_q->run_one()) {
1176 std::chrono::milliseconds min_duration{0};
1178 std::lock_guard<std::mutex> lk(
mtx_);
1181 const auto duration =
q->next();
1183 if (duration == duration.zero()) {
1185 min_duration = duration;
1187 }
else if ((duration != duration.max()) &&
1189 (duration < min_duration || timer_q ==
nullptr)) {
1191 min_duration = duration;
1197 if (timer_q && min_duration <= min_duration.zero())
continue;
1199 if (
auto op = [
this]() -> std::unique_ptr<async_op> {
1201 std::lock_guard<std::mutex> lk(
mtx_);
1239 if (timer_q ==
nullptr ||
1250 if (res.error() == std::errc::timed_out && min_duration !=
timeout &&
1251 timer_q !=
nullptr) {
1264 res.error() == std::errc::timed_out);
1275 short events) -> std::unique_ptr<async_op> {
1276 std::lock_guard<std::mutex> lk(
mtx_);
1279 }(res->fd, res->event)) {
Definition: io_service_base.h:87
Definition: socket.h:1412
template-less base-class of basic_socket_impl.
Definition: socket.h:335
Definition: socket.h:1144
Definition: executor.h:308
execution_context & context() noexcept
Definition: executor.h:314
Definition: executor.h:154
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:300
void destroy() noexcept
Definition: executor.h:193
Definition: callstack.h:80
callstack of a thread.
Definition: callstack.h:71
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
Definition: socket_service_base.h:48
Definition: io_context.h:379
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:471
std::mutex mtx_
Definition: io_context.h:474
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:404
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:441
void release_all()
Definition: io_context.h:414
element_type extract_first(native_handle_type fd)
Definition: io_context.h:410
bool has_outstanding_work() const
Definition: io_context.h:383
void push_back(element_type &&t)
Definition: io_context.h:389
std::unique_ptr< async_op > element_type
Definition: io_context.h:381
Definition: io_context.h:164
virtual ~BasicCallable()=default
Definition: io_context.h:172
Callable(Func &&f)
Definition: io_context.h:174
void invoke() override
Definition: io_context.h:176
Func f_
Definition: io_context.h:179
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:158
std::list< op_type > work_
Definition: io_context.h:240
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:182
std::mutex work_mtx_
Definition: io_context.h:239
size_t run_one()
run a deferred work item.
Definition: io_context.h:190
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:233
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:221
async operation with callback.
Definition: io_context.h:362
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:364
Op op_
Definition: io_context.h:376
void run(io_context &) override
Definition: io_context.h:367
base class of async operation.
Definition: io_context.h:337
native_handle_type fd_
Definition: io_context.h:354
wait_type event() const
Definition: io_context.h:351
virtual ~async_op()=default
void cancel()
Definition: io_context.h:347
virtual void run(io_context &)=0
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:341
bool is_cancelled() const
Definition: io_context.h:348
native_handle_type native_handle() const
Definition: io_context.h:350
wait_type event_
Definition: io_context.h:355
Definition: io_context.h:990
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1017
io_context * io_ctx_
Definition: io_context.h:1063
void on_work_started() const noexcept
Definition: io_context.h:1004
executor_type(executor_type &&rhs) noexcept=default
executor_type(const executor_type &rhs) noexcept=default
friend io_context
Definition: io_context.h:1059
executor_type & operator=(const executor_type &rhs) noexcept=default
executor_type & operator=(executor_type &&rhs) noexcept=default
io_context & context() const noexcept
Definition: io_context.h:1002
executor_type(io_context &ctx)
Definition: io_context.h:1061
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1038
bool running_in_this_thread() const noexcept
Definition: io_context.h:999
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1054
void on_work_finished() const noexcept
Definition: io_context.h:1005
Definition: io_context.h:306
io_context & ctx_
Definition: io_context.h:326
monitor(const monitor &)=delete
monitor(io_context &ctx)
Definition: io_context.h:308
~monitor()
Definition: io_context.h:313
monitor(monitor &&)=delete
Definition: io_context.h:783
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:785
Op op_
Definition: io_context.h:797
void run() override
Definition: io_context.h:788
Definition: io_context.h:754
void cancel()
Definition: io_context.h:765
pending_timer(const Timer &timer)
Definition: io_context.h:759
time_point expiry() const noexcept
Definition: io_context.h:772
typename Timer::time_point time_point
Definition: io_context.h:756
virtual ~pending_timer()=default
timer_id id() const
Definition: io_context.h:773
typename Timer::Id * timer_id
Definition: io_context.h:757
bool is_cancelled() const
Definition: io_context.h:764
time_point expiry_
Definition: io_context.h:778
timer_id id_
Definition: io_context.h:779
Definition: io_context.h:512
timer_queue_base(execution_context &ctx)
Definition: io_context.h:514
virtual std::chrono::milliseconds next() const =0
std::mutex queue_mtx_
Definition: io_context.h:516
Definition: io_context.h:524
std::chrono::milliseconds next() const override
Definition: io_context.h:576
size_t cancel(const Timer &t)
Definition: io_context.h:690
io_context & context() noexcept
Definition: io_context.h:543
bool run_one() override
Definition: io_context.h:622
void shutdown() noexcept override
Definition: io_context.h:541
timer_queue(execution_context &ctx)
Definition: io_context.h:528
void push(const Timer &timer, Op &&op)
Definition: io_context.h:548
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:805
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:807
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:801
Definition: io_context.h:61
count_type poll()
Definition: io_context.h:968
bool is_running_
Definition: io_context.h:875
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:871
std::condition_variable do_one_cond_
Definition: io_context.h:874
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:935
~io_context()
Definition: io_context.h:81
std::atomic< count_type > work_count_
Definition: io_context.h:282
count_type run_one()
Definition: io_context.h:918
io_context()
Definition: io_context.h:68
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:888
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:882
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:285
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:286
DeferredWork deferred_work_
Definition: io_context.h:243
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:249
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1114
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:929
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1134
void async_wait(const Timer &timer, Op &&op)
async wait for a timer to expire.
Definition: io_context.h:819
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:150
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:953
size_t cancel_one(const Timer &)
Definition: io_context.h:847
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:66
std::mutex do_one_mtx_
Definition: io_context.h:873
io_context(int)
Definition: io_context.h:79
IoServiceBase * io_service() const
Definition: io_context.h:139
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:287
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1141
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:480
io_context(const io_context &)=delete
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:72
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:135
bool is_running() const
Definition: io_context.h:895
AsyncOps active_ops_
Definition: io_context.h:477
bool stopped_
Definition: io_context.h:281
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-context.
Definition: io_context.h:864
count_type poll_one()
Definition: io_context.h:981
void stop()
Definition: io_context.h:116
count_type run()
Definition: io_context.h:902
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:877
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:483
executor_type get_executor() noexcept
Definition: io_context.h:1080
void restart()
Definition: io_context.h:130
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:832
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1087
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:959
void is_running(bool v)
Definition: io_context.h:894
size_t count_type
Definition: io_context.h:65
bool has_outstanding_work() const
Definition: io_context.h:295
bool stopped() const noexcept
Definition: io_context.h:125
io_context & operator=(const io_context &)=delete
Definition: linux_epoll_io_service.h:57
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
Definition: expected.h:944
static int count
Definition: myisam_ftdump.cc:43
static QUEUE queue
Definition: myisampack.cc:207
static bool interrupted
Definition: mysqladmin.cc:66
Definition: authentication.cc:36
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
wait_type
Definition: socket_constants.h:86
int native_handle_type
Definition: socket_constants.h:51
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:576
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:572
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
Definition: gcs_xcom_synode.h:64
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
Definition: executor.h:370
int n
Definition: xcom_base.cc:509
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4059