25#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
26#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
36#include <system_error>
37#include <unordered_map>
54#if defined(HAVE_EPOLL)
72 std::unique_ptr<net::impl::socket::SocketServiceBase> &&
socket_service,
97 template <class Rep, class Period>
100 template <class Clock, class Duration>
102 const
std::chrono::time_point<Clock, Duration> &abs_time);
106 template <class Rep, class Period>
109 template <class Clock, class Duration>
111 const
std::chrono::time_point<Clock, Duration> &abs_time);
117 std::lock_guard<std::mutex> lk(
mtx_);
125 std::lock_guard<std::mutex> lk(
mtx_);
130 std::lock_guard<std::mutex> lk(
mtx_);
170 template <
class Func>
181 using op_type = std::unique_ptr<BasicCallable>;
198 std::lock_guard<std::mutex> lk(
work_mtx_);
200 if (
work_.empty())
return 0;
205 tmp.splice(tmp.begin(),
work_,
work_.begin());
209 tmp.front()->invoke();
219 template <
class Func,
class ProtoAllocator>
220 void post(Func &&f,
const ProtoAllocator & ) {
221 std::lock_guard<std::mutex> lk(
work_mtx_);
233 std::lock_guard<std::mutex> lk(
work_mtx_);
234 return !
work_.empty();
247 template <
class Func,
class ProtoAllocator>
255 template <
class Clock,
class Duration>
257 std::unique_lock<std::mutex> &lk,
258 const std::chrono::time_point<Clock, Duration> &abs_time);
261 std::chrono::milliseconds
timeout);
263 template <
typename _Clock,
typename _WaitTraits>
268 template <
class Protocol>
271 template <
class Protocol>
274 template <
class Protocol>
277 template <
class Protocol>
313 std::lock_guard<std::mutex> lk(
ctx_.
mtx_);
370 op_(std::error_code{});
383 std::lock_guard<std::mutex> lk(
mtx_);
385 return !
ops_.empty();
389 const auto handle = t->native_handle();
391 std::lock_guard<std::mutex> lk(
mtx_);
394 if (it !=
ops_.end()) {
395 it->second.push_back(std::move(t));
397 std::vector<element_type> v;
398 v.push_back(std::move(t));
405 return static_cast<short>(el->event()) & events;
423 std::list<element_type> ops_to_delete;
425 std::lock_guard<std::mutex> lk(
mtx_);
426 for (
auto &fd_ops :
ops_) {
427 for (
auto &fd_op : fd_ops.second) {
428 ops_to_delete.push_back(std::move(fd_op));
439 template <
class Pred>
441 std::lock_guard<std::mutex> lk(
mtx_);
443 const auto it =
ops_.find(fd);
444 if (it !=
ops_.end()) {
445 auto &async_ops = it->second;
447 const auto end = async_ops.end();
448 for (
auto cur = async_ops.begin(); cur != end; ++cur) {
451 if (el->native_handle() == fd && pred(el)) {
452 auto op = std::move(el);
454 if (async_ops.size() == 1) {
459 async_ops.erase(cur);
470 std::unordered_map<native_handle_type, std::vector<element_type>>
ops_{
492 std::cerr <<
"!! add_fd_interest(" << fd <<
", ..."
493 <<
") " << res.error() <<
" " << res.error().message()
499 std::lock_guard<std::mutex> lk(
mtx_);
519 virtual std::chrono::milliseconds
next()
const = 0;
522 template <
class Timer>
530 auto &io_ctx =
static_cast<io_context &
>(ctx);
536 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
537 io_ctx.timer_queues_.push_back(
this);
547 void push(
const Timer &timer, Op &&op) {
556 [](
const auto &a,
const auto &b) { return a < b->expiry(); }),
559 if (timer.id() ==
nullptr) abort();
566 if (timer.id() ==
nullptr) abort();
567 if (timer.expiry() == Timer::time_point::min()) abort();
571 std::make_pair(timer.expiry(), timer.id()));
575 std::chrono::milliseconds
next()
const override {
576 typename Timer::time_point expiry;
583 return std::chrono::milliseconds::max();
592 return std::chrono::milliseconds::min();
598 auto duration = Timer::traits_type::to_wait_duration(expiry);
599 if (duration < duration.zero()) {
600 duration = duration.zero();
609 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
611 using namespace std::chrono_literals;
614 if ((duration - duration_ms).
count() != 0) {
622 std::unique_ptr<pending_timer> pt;
642 auto min = Timer::time_point::min();
644 if (cur.first < min) abort();
649 const auto now = Timer::clock_type::now();
653 auto timepoint = pending_expiry_it->first;
655 if (timepoint > now) {
659 typename Timer::Id *timer_id = pending_expiry_it->second;
665 if (pending_it->second->id() != timer_id) {
668 if (pending_it->second->expiry() != pending_expiry_it->first) {
672 pt = std::move(pending_it->second);
701 auto &cur_timer = cur->second;
702 if (cur_timer->id() == t.id()) {
706 auto nxt = std::next(cur);
718 for (
auto cur = eq_range.first; cur != eq_range.second;) {
719 auto expiry_eq_range =
722 size_t erase_count{};
724 for (
auto expiry_cur = expiry_eq_range.first;
725 expiry_cur != expiry_eq_range.second;) {
726 if (expiry_cur->first == cur->second->expiry() &&
727 expiry_cur->second == cur->second->id() && erase_count == 0) {
736 if (erase_count == 0) abort();
738 cur->second->cancel();
791 op_(std::error_code{});
803 std::multimap<typename Timer::time_point, typename Timer::Id *>
805 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
817 template <
class Timer,
class Op>
819 auto &
queue = use_service<timer_queue<Timer>>(*this);
821 queue.push(timer, std::forward<Op>(op));
830 template <
class Timer>
836 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
845 template <
class Timer>
906 using namespace std::chrono_literals;
912 if (
n != std::numeric_limits<count_type>::max()) ++
n;
918 using namespace std::chrono_literals;
927template <
class Rep,
class Period>
929 const std::chrono::duration<Rep, Period> &rel_time) {
930 return run_until(std::chrono::steady_clock::now() + rel_time);
933template <
class Clock,
class Duration>
935 const std::chrono::time_point<Clock, Duration> &abs_time) {
940 using namespace std::chrono_literals;
946 if (
n != std::numeric_limits<count_type>::max()) ++
n;
951template <
class Rep,
class Period>
953 const std::chrono::duration<Rep, Period> &rel_time) {
954 return run_one_until(std::chrono::steady_clock::now() + rel_time);
957template <
class Clock,
class Duration>
959 const std::chrono::time_point<Clock, Duration> &abs_time) {
971 using namespace std::chrono_literals;
975 if (
n != std::numeric_limits<count_type>::max()) ++
n;
983 using namespace std::chrono_literals;
1015 template <
class Func,
class ProtoAllocator>
1016 void dispatch(Func &&f,
const ProtoAllocator &a)
const {
1019 std::decay_t<Func>(std::forward<Func>(f))();
1022 post(std::forward<Func>(f), a);
1036 template <
class Func,
class ProtoAllocator>
1037 void post(Func &&f,
const ProtoAllocator &a)
const {
1052 template <
class Func,
class ProtoAllocator>
1053 void defer(Func &&f,
const ProtoAllocator &a)
const {
1054 post(std::forward<Func>(f), a);
1067 return std::addressof(a.context()) == std::addressof(b.context());
1088 bool need_notify{
false};
1091 std::lock_guard<std::mutex> lk(
mtx_);
1103 if (
true || need_notify) {
1112template <
class Clock,
class Duration>
1114 std::unique_lock<std::mutex> &lk,
1115 const std::chrono::time_point<Clock, Duration> &abs_time) {
1116 using namespace std::chrono_literals;
1118 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1120 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1122 if (rel_time_ms < 0ms) {
1125 }
else if (rel_time_ms < rel_time) {
1130 return do_one(lk, rel_time_ms);
1141 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds
timeout) {
1160 if (timer_q->run_one()) {
1175 std::chrono::milliseconds min_duration{0};
1177 std::lock_guard<std::mutex> lk(
mtx_);
1180 const auto duration =
q->next();
1182 if (duration == duration.zero()) {
1184 min_duration = duration;
1186 }
else if ((duration != duration.max()) &&
1188 (duration < min_duration || timer_q ==
nullptr)) {
1190 min_duration = duration;
1196 if (timer_q && min_duration == min_duration.zero())
continue;
1198 if (
auto op = [
this]() -> std::unique_ptr<async_op> {
1200 std::lock_guard<std::mutex> lk(
mtx_);
1238 if (timer_q ==
nullptr ||
1249 if (res.error() == std::errc::timed_out && min_duration !=
timeout &&
1250 timer_q !=
nullptr) {
1263 res.error() == std::errc::timed_out);
1274 short events) -> std::unique_ptr<async_op> {
1275 std::lock_guard<std::mutex> lk(
mtx_);
1278 }(res->fd, res->event)) {
Definition: io_service_base.h:86
Definition: socket.h:1411
template-less base-class of basic_socket_impl.
Definition: socket.h:334
Definition: socket.h:1143
Definition: executor.h:307
execution_context & context() noexcept
Definition: executor.h:313
Definition: executor.h:153
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:299
void destroy() noexcept
Definition: executor.h:192
Definition: callstack.h:79
callstack of a thread.
Definition: callstack.h:70
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:150
Definition: socket_service_base.h:47
Definition: io_context.h:378
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:470
std::mutex mtx_
Definition: io_context.h:473
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:403
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:440
void release_all()
Definition: io_context.h:413
element_type extract_first(native_handle_type fd)
Definition: io_context.h:409
bool has_outstanding_work() const
Definition: io_context.h:382
void push_back(element_type &&t)
Definition: io_context.h:388
std::unique_ptr< async_op > element_type
Definition: io_context.h:380
Definition: io_context.h:163
virtual ~BasicCallable()=default
Definition: io_context.h:171
Callable(Func &&f)
Definition: io_context.h:173
void invoke() override
Definition: io_context.h:175
Func f_
Definition: io_context.h:178
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:157
std::list< op_type > work_
Definition: io_context.h:239
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:181
std::mutex work_mtx_
Definition: io_context.h:238
size_t run_one()
run a deferred work item.
Definition: io_context.h:189
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:232
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:220
async operation with callback.
Definition: io_context.h:361
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:363
Op op_
Definition: io_context.h:375
void run(io_context &) override
Definition: io_context.h:366
base class of async operation.
Definition: io_context.h:336
native_handle_type fd_
Definition: io_context.h:353
wait_type event() const
Definition: io_context.h:350
virtual ~async_op()=default
void cancel()
Definition: io_context.h:346
virtual void run(io_context &)=0
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:340
bool is_cancelled() const
Definition: io_context.h:347
native_handle_type native_handle() const
Definition: io_context.h:349
wait_type event_
Definition: io_context.h:354
Definition: io_context.h:989
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1016
io_context * io_ctx_
Definition: io_context.h:1062
void on_work_started() const noexcept
Definition: io_context.h:1003
executor_type(executor_type &&rhs) noexcept=default
executor_type(const executor_type &rhs) noexcept=default
friend io_context
Definition: io_context.h:1058
executor_type & operator=(const executor_type &rhs) noexcept=default
executor_type & operator=(executor_type &&rhs) noexcept=default
io_context & context() const noexcept
Definition: io_context.h:1001
executor_type(io_context &ctx)
Definition: io_context.h:1060
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1037
bool running_in_this_thread() const noexcept
Definition: io_context.h:998
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1053
void on_work_finished() const noexcept
Definition: io_context.h:1004
Definition: io_context.h:305
io_context & ctx_
Definition: io_context.h:325
monitor(const monitor &)=delete
monitor(io_context &ctx)
Definition: io_context.h:307
~monitor()
Definition: io_context.h:312
monitor(monitor &&)=delete
Definition: io_context.h:782
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:784
Op op_
Definition: io_context.h:796
void run() override
Definition: io_context.h:787
Definition: io_context.h:753
void cancel()
Definition: io_context.h:764
pending_timer(const Timer &timer)
Definition: io_context.h:758
time_point expiry() const noexcept
Definition: io_context.h:771
typename Timer::time_point time_point
Definition: io_context.h:755
virtual ~pending_timer()=default
timer_id id() const
Definition: io_context.h:772
typename Timer::Id * timer_id
Definition: io_context.h:756
bool is_cancelled() const
Definition: io_context.h:763
time_point expiry_
Definition: io_context.h:777
timer_id id_
Definition: io_context.h:778
Definition: io_context.h:511
timer_queue_base(execution_context &ctx)
Definition: io_context.h:513
virtual std::chrono::milliseconds next() const =0
std::mutex queue_mtx_
Definition: io_context.h:515
Definition: io_context.h:523
std::chrono::milliseconds next() const override
Definition: io_context.h:575
size_t cancel(const Timer &t)
Definition: io_context.h:689
io_context & context() noexcept
Definition: io_context.h:542
bool run_one() override
Definition: io_context.h:621
void shutdown() noexcept override
Definition: io_context.h:540
timer_queue(execution_context &ctx)
Definition: io_context.h:527
void push(const Timer &timer, Op &&op)
Definition: io_context.h:547
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:804
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:806
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:800
Definition: io_context.h:60
count_type poll()
Definition: io_context.h:967
bool is_running_
Definition: io_context.h:874
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:870
std::condition_variable do_one_cond_
Definition: io_context.h:873
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:934
~io_context()
Definition: io_context.h:80
std::atomic< count_type > work_count_
Definition: io_context.h:281
count_type run_one()
Definition: io_context.h:917
io_context()
Definition: io_context.h:67
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:887
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:881
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:284
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:285
DeferredWork deferred_work_
Definition: io_context.h:242
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:248
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1113
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:928
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1133
void async_wait(const Timer &timer, Op &&op)
async wait for a timer expire.
Definition: io_context.h:818
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:149
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:952
size_t cancel_one(const Timer &)
Definition: io_context.h:846
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:65
std::mutex do_one_mtx_
Definition: io_context.h:872
io_context(int)
Definition: io_context.h:78
IoServiceBase * io_service() const
Definition: io_context.h:138
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:286
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1140
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:479
io_context(const io_context &)=delete
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:71
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:134
bool is_running() const
Definition: io_context.h:894
AsyncOps active_ops_
Definition: io_context.h:476
bool stopped_
Definition: io_context.h:280
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-contexts.
Definition: io_context.h:863
count_type poll_one()
Definition: io_context.h:980
void stop()
Definition: io_context.h:115
count_type run()
Definition: io_context.h:901
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:876
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:482
executor_type get_executor() noexcept
Definition: io_context.h:1079
void restart()
Definition: io_context.h:129
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:831
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1086
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:958
void is_running(bool v)
Definition: io_context.h:893
size_t count_type
Definition: io_context.h:64
bool has_outstanding_work() const
Definition: io_context.h:294
bool stopped() const noexcept
Definition: io_context.h:124
io_context & operator=(const io_context &)=delete
Definition: linux_epoll_io_service.h:56
io_service based on the poll() system-call.
Definition: poll_io_service.h:51
Definition: expected.h:943
static int count
Definition: myisam_ftdump.cc:44
static QUEUE queue
Definition: myisampack.cc:209
static bool interrupted
Definition: mysqladmin.cc:70
Definition: authentication.cc:35
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:497
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:62
wait_type
Definition: socket_constants.h:85
int native_handle_type
Definition: socket_constants.h:50
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:51
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:575
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:571
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:102
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:63
Definition: varlen_sort.h:183
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:78
Definition: executor.h:369
int n
Definition: xcom_base.cc:508
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4085