26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
37#include <system_error>
38#include <unordered_map>
55#if defined(HAVE_EPOLL)
73 std::unique_ptr<net::impl::socket::SocketServiceBase> &&
socket_service,
98 template <class Rep, class Period>
101 template <class Clock, class Duration>
103 const
std::chrono::time_point<Clock, Duration> &abs_time);
107 template <class Rep, class Period>
110 template <class Clock, class Duration>
112 const
std::chrono::time_point<Clock, Duration> &abs_time);
118 std::lock_guard<std::mutex> lk(
mtx_);
126 std::lock_guard<std::mutex> lk(
mtx_);
131 std::lock_guard<std::mutex> lk(
mtx_);
171 template <
class Func>
182 using op_type = std::unique_ptr<BasicCallable>;
199 std::lock_guard<std::mutex> lk(
work_mtx_);
201 if (
work_.empty())
return 0;
206 tmp.splice(tmp.begin(),
work_,
work_.begin());
210 tmp.front()->invoke();
220 template <
class Func,
class ProtoAllocator>
221 void post(Func &&f,
const ProtoAllocator & ) {
222 std::lock_guard<std::mutex> lk(
work_mtx_);
234 std::lock_guard<std::mutex> lk(
work_mtx_);
235 return !
work_.empty();
248 template <
class Func,
class ProtoAllocator>
256 template <
class Clock,
class Duration>
258 std::unique_lock<std::mutex> &lk,
259 const std::chrono::time_point<Clock, Duration> &abs_time);
262 std::chrono::milliseconds
timeout);
264 template <
typename _Clock,
typename _WaitTraits>
269 template <
class Protocol>
272 template <
class Protocol>
275 template <
class Protocol>
278 template <
class Protocol>
314 std::lock_guard<std::mutex> lk(
ctx_.
mtx_);
371 op_(std::error_code{});
384 std::lock_guard<std::mutex> lk(
mtx_);
386 return !
ops_.empty();
390 const auto handle = t->native_handle();
392 std::lock_guard<std::mutex> lk(
mtx_);
395 if (it !=
ops_.end()) {
396 it->second.push_back(std::move(t));
398 std::vector<element_type> v;
399 v.push_back(std::move(t));
406 return static_cast<short>(el->event()) & events;
424 std::list<element_type> ops_to_delete;
426 std::lock_guard<std::mutex> lk(
mtx_);
427 for (
auto &fd_ops :
ops_) {
428 for (
auto &fd_op : fd_ops.second) {
429 ops_to_delete.push_back(std::move(fd_op));
440 template <
class Pred>
442 std::lock_guard<std::mutex> lk(
mtx_);
444 const auto it =
ops_.find(fd);
445 if (it !=
ops_.end()) {
446 auto &async_ops = it->second;
448 const auto end = async_ops.end();
449 for (
auto cur = async_ops.begin(); cur != end; ++cur) {
452 if (el->native_handle() == fd && pred(el)) {
453 auto op = std::move(el);
455 if (async_ops.size() == 1) {
460 async_ops.erase(cur);
471 std::unordered_map<native_handle_type, std::vector<element_type>>
ops_{
493 std::cerr <<
"!! add_fd_interest(" << fd <<
", ..."
494 <<
") " << res.error() <<
" " << res.error().message()
500 std::lock_guard<std::mutex> lk(
mtx_);
521 virtual std::chrono::milliseconds
next()
const = 0;
524 template <
class Timer>
532 auto &io_ctx =
static_cast<io_context &
>(ctx);
538 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
539 io_ctx.timer_queues_.push_back(
this);
549 void push(
const Timer &timer, Op &&op) {
558 [](
const auto &a,
const auto &b) { return a < b->expiry(); }),
561 if (timer.id() ==
nullptr) abort();
568 if (timer.id() ==
nullptr) abort();
569 if (timer.expiry() == Timer::time_point::min()) abort();
573 std::make_pair(timer.expiry(), timer.id()));
577 std::chrono::milliseconds
next()
const override {
578 typename Timer::time_point expiry;
585 return std::chrono::milliseconds::max();
594 return std::chrono::milliseconds::min();
600 auto duration = Timer::traits_type::to_wait_duration(expiry);
601 if (duration < duration.zero()) {
602 duration = duration.zero();
611 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
613 using namespace std::chrono_literals;
616 if ((duration - duration_ms).
count() != 0) {
624 std::unique_ptr<pending_timer> pt;
644 auto min = Timer::time_point::min();
646 if (cur.first < min) abort();
651 const auto now = Timer::clock_type::now();
655 auto timepoint = pending_expiry_it->first;
657 if (timepoint > now) {
661 typename Timer::Id *timer_id = pending_expiry_it->second;
667 if (pending_it->second->id() != timer_id) {
670 if (pending_it->second->expiry() != pending_expiry_it->first) {
674 pt = std::move(pending_it->second);
703 auto &cur_timer = cur->second;
704 if (cur_timer->id() == t.id()) {
708 auto nxt = std::next(cur);
720 for (
auto cur = eq_range.first; cur != eq_range.second;) {
721 auto expiry_eq_range =
724 size_t erase_count{};
726 for (
auto expiry_cur = expiry_eq_range.first;
727 expiry_cur != expiry_eq_range.second;) {
728 if (expiry_cur->first == cur->second->expiry() &&
729 expiry_cur->second == cur->second->id() && erase_count == 0) {
738 if (erase_count == 0) abort();
740 cur->second->cancel();
793 op_(std::error_code{});
805 std::multimap<typename Timer::time_point, typename Timer::Id *>
807 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
819 template <
class Timer,
class Op>
821 auto &
queue = use_service<timer_queue<Timer>>(*this);
823 queue.push(timer, std::forward<Op>(op));
832 template <
class Timer>
838 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
847 template <
class Timer>
908 using namespace std::chrono_literals;
914 if (
n != std::numeric_limits<count_type>::max()) ++
n;
920 using namespace std::chrono_literals;
929template <
class Rep,
class Period>
931 const std::chrono::duration<Rep, Period> &rel_time) {
932 return run_until(std::chrono::steady_clock::now() + rel_time);
935template <
class Clock,
class Duration>
937 const std::chrono::time_point<Clock, Duration> &abs_time) {
942 using namespace std::chrono_literals;
948 if (
n != std::numeric_limits<count_type>::max()) ++
n;
953template <
class Rep,
class Period>
955 const std::chrono::duration<Rep, Period> &rel_time) {
956 return run_one_until(std::chrono::steady_clock::now() + rel_time);
959template <
class Clock,
class Duration>
961 const std::chrono::time_point<Clock, Duration> &abs_time) {
973 using namespace std::chrono_literals;
977 if (
n != std::numeric_limits<count_type>::max()) ++
n;
985 using namespace std::chrono_literals;
1017 template <
class Func,
class ProtoAllocator>
1018 void dispatch(Func &&f,
const ProtoAllocator &a)
const {
1021 std::decay_t<Func>(std::forward<Func>(f))();
1024 post(std::forward<Func>(f), a);
1038 template <
class Func,
class ProtoAllocator>
1039 void post(Func &&f,
const ProtoAllocator &a)
const {
1054 template <
class Func,
class ProtoAllocator>
1055 void defer(Func &&f,
const ProtoAllocator &a)
const {
1056 post(std::forward<Func>(f), a);
1069 return std::addressof(a.context()) == std::addressof(b.context());
1090 bool need_notify{
false};
1093 std::lock_guard<std::mutex> lk(
mtx_);
1105 if (
true || need_notify) {
1114template <
class Clock,
class Duration>
1116 std::unique_lock<std::mutex> &lk,
1117 const std::chrono::time_point<Clock, Duration> &abs_time) {
1118 using namespace std::chrono_literals;
1120 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1122 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1124 if (rel_time_ms < 0ms) {
1127 }
else if (rel_time_ms < rel_time) {
1132 return do_one(lk, rel_time_ms);
1143 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds
timeout) {
1162 if (timer_q->run_one()) {
1177 std::chrono::milliseconds min_duration{0};
1179 std::lock_guard<std::mutex>
lock(
mtx_);
1182 const auto duration =
q->next();
1184 if (duration == duration.zero()) {
1186 min_duration = duration;
1188 }
else if ((duration != duration.max()) &&
1190 (duration < min_duration || timer_q ==
nullptr)) {
1192 min_duration = duration;
1198 if (timer_q && min_duration <= min_duration.zero())
continue;
1200 if (
auto op = [
this]() -> std::unique_ptr<async_op> {
1202 std::lock_guard<std::mutex>
lock(
mtx_);
1211 return cancelled_op;
1240 if (timer_q ==
nullptr ||
1251 if (res.error() == std::errc::timed_out && min_duration !=
timeout &&
1252 timer_q !=
nullptr) {
1265 res.error() == std::errc::timed_out);
1276 short events) -> std::unique_ptr<async_op> {
1277 std::lock_guard<std::mutex>
lock(
mtx_);
1280 }(res->fd, res->event)) {
Definition: io_service_base.h:87
Definition: socket.h:1293
template-less base-class of basic_socket_impl.
Definition: socket.h:335
Definition: socket.h:1090
Definition: executor.h:291
execution_context & context() noexcept
Definition: executor.h:297
Definition: executor.h:154
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:283
void destroy() noexcept
Definition: executor.h:176
Definition: callstack.h:80
callstack of a thread.
Definition: callstack.h:71
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
Definition: socket_service_base.h:48
Definition: io_context.h:379
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:471
std::mutex mtx_
Definition: io_context.h:474
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:404
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:441
void release_all()
Definition: io_context.h:414
element_type extract_first(native_handle_type fd)
Definition: io_context.h:410
bool has_outstanding_work() const
Definition: io_context.h:383
void push_back(element_type &&t)
Definition: io_context.h:389
std::unique_ptr< async_op > element_type
Definition: io_context.h:381
Definition: io_context.h:164
virtual ~BasicCallable()=default
Definition: io_context.h:172
Callable(Func &&f)
Definition: io_context.h:174
void invoke() override
Definition: io_context.h:176
Func f_
Definition: io_context.h:179
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:158
std::list< op_type > work_
Definition: io_context.h:240
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:182
std::mutex work_mtx_
Definition: io_context.h:239
size_t run_one()
run a deferred work item.
Definition: io_context.h:190
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:233
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:221
async operation with callback.
Definition: io_context.h:362
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:364
Op op_
Definition: io_context.h:376
void run(io_context &) override
Definition: io_context.h:367
base class of async operation.
Definition: io_context.h:337
native_handle_type fd_
Definition: io_context.h:354
wait_type event() const
Definition: io_context.h:351
virtual ~async_op()=default
void cancel()
Definition: io_context.h:347
virtual void run(io_context &)=0
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:341
bool is_cancelled() const
Definition: io_context.h:348
native_handle_type native_handle() const
Definition: io_context.h:350
wait_type event_
Definition: io_context.h:355
Definition: io_context.h:991
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1018
io_context * io_ctx_
Definition: io_context.h:1064
void on_work_started() const noexcept
Definition: io_context.h:1005
executor_type(executor_type &&rhs) noexcept=default
executor_type(const executor_type &rhs) noexcept=default
friend io_context
Definition: io_context.h:1060
executor_type & operator=(const executor_type &rhs) noexcept=default
executor_type & operator=(executor_type &&rhs) noexcept=default
io_context & context() const noexcept
Definition: io_context.h:1003
executor_type(io_context &ctx)
Definition: io_context.h:1062
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1039
bool running_in_this_thread() const noexcept
Definition: io_context.h:1000
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1055
void on_work_finished() const noexcept
Definition: io_context.h:1006
Definition: io_context.h:306
io_context & ctx_
Definition: io_context.h:326
monitor(const monitor &)=delete
monitor(io_context &ctx)
Definition: io_context.h:308
~monitor()
Definition: io_context.h:313
monitor(monitor &&)=delete
Definition: io_context.h:784
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:786
Op op_
Definition: io_context.h:798
void run() override
Definition: io_context.h:789
Definition: io_context.h:755
void cancel()
Definition: io_context.h:766
pending_timer(const Timer &timer)
Definition: io_context.h:760
time_point expiry() const noexcept
Definition: io_context.h:773
typename Timer::time_point time_point
Definition: io_context.h:757
virtual ~pending_timer()=default
timer_id id() const
Definition: io_context.h:774
typename Timer::Id * timer_id
Definition: io_context.h:758
bool is_cancelled() const
Definition: io_context.h:765
time_point expiry_
Definition: io_context.h:779
timer_id id_
Definition: io_context.h:780
Definition: io_context.h:513
timer_queue_base(execution_context &ctx)
Definition: io_context.h:515
virtual std::chrono::milliseconds next() const =0
std::mutex queue_mtx_
Definition: io_context.h:517
Definition: io_context.h:525
std::chrono::milliseconds next() const override
Definition: io_context.h:577
size_t cancel(const Timer &t)
Definition: io_context.h:691
io_context & context() noexcept
Definition: io_context.h:544
bool run_one() override
Definition: io_context.h:623
void shutdown() noexcept override
Definition: io_context.h:542
timer_queue(execution_context &ctx)
Definition: io_context.h:529
void push(const Timer &timer, Op &&op)
Definition: io_context.h:549
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:806
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:808
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:802
Definition: io_context.h:61
count_type poll()
Definition: io_context.h:969
bool is_running_
Definition: io_context.h:876
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:872
std::condition_variable do_one_cond_
Definition: io_context.h:875
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:936
~io_context()
Definition: io_context.h:81
std::atomic< count_type > work_count_
Definition: io_context.h:282
count_type run_one()
Definition: io_context.h:919
io_context()
Definition: io_context.h:68
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:889
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:883
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:285
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:286
DeferredWork deferred_work_
Definition: io_context.h:243
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:249
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1115
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:930
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1135
void async_wait(const Timer &timer, Op &&op)
async wait for a timer expire.
Definition: io_context.h:820
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:150
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:954
size_t cancel_one(const Timer &)
Definition: io_context.h:848
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:66
std::mutex do_one_mtx_
Definition: io_context.h:874
io_context(int)
Definition: io_context.h:79
IoServiceBase * io_service() const
Definition: io_context.h:139
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:287
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1142
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:480
io_context(const io_context &)=delete
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:72
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:135
bool is_running() const
Definition: io_context.h:896
AsyncOps active_ops_
Definition: io_context.h:477
bool stopped_
Definition: io_context.h:281
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-contexts.
Definition: io_context.h:865
count_type poll_one()
Definition: io_context.h:982
void stop()
Definition: io_context.h:116
count_type run()
Definition: io_context.h:903
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:878
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:483
executor_type get_executor() noexcept
Definition: io_context.h:1081
void restart()
Definition: io_context.h:130
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:833
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1088
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:960
void is_running(bool v)
Definition: io_context.h:895
size_t count_type
Definition: io_context.h:65
bool has_outstanding_work() const
Definition: io_context.h:295
bool stopped() const noexcept
Definition: io_context.h:125
io_context & operator=(const io_context &)=delete
Definition: linux_epoll_io_service.h:58
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
Definition: expected.h:286
static int count
Definition: myisam_ftdump.cc:45
static QUEUE queue
Definition: myisampack.cc:210
static bool interrupted
Definition: mysqladmin.cc:72
Definition: http_server_component.cc:34
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:498
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
wait_type
Definition: socket_constants.h:86
int native_handle_type
Definition: socket_constants.h:51
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:551
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:547
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
Definition: gcs_xcom_synode.h:64
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
static std::mutex lock
Definition: net_ns.cc:56
Definition: executor.h:353
int n
Definition: xcom_base.cc:509
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4086