26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_ 
   27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_ 
   37#include <system_error>   
   38#include <unordered_map> 
   55#if defined(HAVE_EPOLL) 
   73      std::unique_ptr<net::impl::socket::SocketServiceBase> &&
socket_service,
 
   98  template <class Rep, class Period>
 
  101  template <class Clock, class Duration>
 
  103      const 
std::chrono::time_point<Clock, Duration> &abs_time);
 
  107  template <class Rep, class Period>
 
  110  template <class Clock, class Duration>
 
  112      const 
std::chrono::time_point<Clock, Duration> &abs_time);
 
  118      std::lock_guard<std::mutex> lk(
mtx_);
 
  126    std::lock_guard<std::mutex> lk(
mtx_);
 
  131    std::lock_guard<std::mutex> lk(
mtx_);
 
  171    template <
class Func>
 
  182    using op_type = std::unique_ptr<BasicCallable>;
 
  199        std::lock_guard<std::mutex> lk(
work_mtx_);
 
  201        if (
work_.empty()) 
return 0;
 
  206        tmp.splice(tmp.begin(), 
work_, 
work_.begin());
 
  210      tmp.front()->invoke();
 
  220    template <
class Func, 
class ProtoAllocator>
 
  221    void post(Func &&f, 
const ProtoAllocator & ) {
 
  222      std::lock_guard<std::mutex> lk(
work_mtx_);
 
  234      std::lock_guard<std::mutex> lk(
work_mtx_);
 
  235      return !
work_.empty();
 
  248  template <
class Func, 
class ProtoAllocator>
 
  256  template <
class Clock, 
class Duration>
 
  258      std::unique_lock<std::mutex> &lk,
 
  259      const std::chrono::time_point<Clock, Duration> &abs_time);
 
  262                    std::chrono::milliseconds 
timeout);
 
  264  template <
typename _Clock, 
typename _WaitTraits>
 
  269  template <
class Protocol>
 
  272  template <
class Protocol>
 
  275  template <
class Protocol>
 
  278  template <
class Protocol>
 
  314      std::lock_guard<std::mutex> lk(
ctx_.
mtx_);
 
  371        op_(std::error_code{});
 
  384      std::lock_guard<std::mutex> lk(
mtx_);
 
  386      return !
ops_.empty();
 
  390      const auto handle = t->native_handle();
 
  392      std::lock_guard<std::mutex> lk(
mtx_);
 
  395      if (it != 
ops_.end()) {
 
  396        it->second.push_back(std::move(t));
 
  398        std::vector<element_type> v;
 
  399        v.push_back(std::move(t));
 
  406        return static_cast<short>(el->event()) & events;
 
  424      std::list<element_type> ops_to_delete;
 
  426        std::lock_guard<std::mutex> lk(
mtx_);
 
  427        for (
auto &fd_ops : 
ops_) {
 
  428          for (
auto &fd_op : fd_ops.second) {
 
  429            ops_to_delete.push_back(std::move(fd_op));
 
  440    template <
class Pred>
 
  442      std::lock_guard<std::mutex> lk(
mtx_);
 
  444      const auto it = 
ops_.find(fd);
 
  445      if (it != 
ops_.end()) {
 
  446        auto &async_ops = it->second;
 
  448        const auto end = async_ops.end();
 
  449        for (
auto cur = async_ops.begin(); cur != end; ++cur) {
 
  452          if (el->native_handle() == fd && pred(el)) {
 
  453            auto op = std::move(el);
 
  455            if (async_ops.size() == 1) {
 
  460              async_ops.erase(cur);
 
  471    std::unordered_map<native_handle_type, std::vector<element_type>> 
ops_{
 
  493        std::cerr << 
"!! add_fd_interest(" << fd << 
", ..." 
  494                  << 
") " << res.error() << 
" " << res.error().message()
 
  500        std::lock_guard<std::mutex> lk(
mtx_);
 
  520    virtual std::chrono::milliseconds 
next() 
const = 0;
 
  523  template <
class Timer>
 
  531      auto &io_ctx = 
static_cast<io_context &
>(ctx);
 
  537      std::lock_guard<std::mutex> lk(io_ctx.mtx_);
 
  538      io_ctx.timer_queues_.push_back(
this);
 
  561    void push(
const Timer &timer, Op &&op) {
 
  570              [](
const auto &a, 
const auto &b) { return a < b->expiry(); }),
 
  573      if (timer.id() == 
nullptr) abort();
 
  580      if (timer.id() == 
nullptr) abort();
 
  581      if (timer.expiry() == Timer::time_point::min()) abort();
 
  585          std::make_pair(timer.expiry(), timer.id()));
 
  589    std::chrono::milliseconds 
next()
 const override {
 
  590      typename Timer::time_point expiry;
 
  597            return std::chrono::milliseconds::max();
 
  606          return std::chrono::milliseconds::min();
 
  612      auto duration = Timer::traits_type::to_wait_duration(expiry);
 
  613      if (duration < duration.zero()) {
 
  614        duration = duration.zero();
 
  623          std::chrono::duration_cast<std::chrono::milliseconds>(duration);
 
  625      using namespace std::chrono_literals;
 
  628      if ((duration - duration_ms).
count() != 0) {
 
  636      std::unique_ptr<pending_timer> pt;
 
  656          auto min = Timer::time_point::min();
 
  658            if (cur.first < min) abort();
 
  663          const auto now = Timer::clock_type::now();
 
  667          auto timepoint = pending_expiry_it->first;
 
  669          if (timepoint > now) {
 
  673          typename Timer::Id *timer_id = pending_expiry_it->second;
 
  679          if (pending_it->second->id() != timer_id) {
 
  682          if (pending_it->second->expiry() != pending_expiry_it->first) {
 
  686          pt = std::move(pending_it->second);
 
  715          auto &cur_timer = cur->second;
 
  716          if (cur_timer->id() == t.id()) {
 
  720            auto nxt = std::next(cur);
 
  732        for (
auto cur = eq_range.first; cur != eq_range.second;) {
 
  733          auto expiry_eq_range =
 
  736          size_t erase_count{};
 
  738          for (
auto expiry_cur = expiry_eq_range.first;
 
  739               expiry_cur != expiry_eq_range.second;) {
 
  740            if (expiry_cur->first == cur->second->expiry() &&
 
  741                expiry_cur->second == cur->second->id() && erase_count == 0) {
 
  750          if (erase_count == 0) abort();
 
  752          cur->second->cancel();
 
  805          op_(std::error_code{});
 
  817    std::multimap<typename Timer::time_point, typename Timer::Id *>
 
  819    std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
 
  831  template <
class Timer, 
class Op>
 
  833    auto &
queue = use_service<timer_queue<Timer>>(*this);
 
  835    queue.push(timer, std::forward<Op>(op));
 
  844  template <
class Timer>
 
  850    const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
 
  859  template <
class Timer>
 
  920  using namespace std::chrono_literals;
 
  926    if (
n != std::numeric_limits<count_type>::max()) ++
n;
 
  932  using namespace std::chrono_literals;
 
  941template <
class Rep, 
class Period>
 
  943    const std::chrono::duration<Rep, Period> &rel_time) {
 
  944  return run_until(std::chrono::steady_clock::now() + rel_time);
 
  947template <
class Clock, 
class Duration>
 
  949    const std::chrono::time_point<Clock, Duration> &abs_time) {
 
  954  using namespace std::chrono_literals;
 
  960    if (
n != std::numeric_limits<count_type>::max()) ++
n;
 
  965template <
class Rep, 
class Period>
 
  967    const std::chrono::duration<Rep, Period> &rel_time) {
 
  968  return run_one_until(std::chrono::steady_clock::now() + rel_time);
 
  971template <
class Clock, 
class Duration>
 
  973    const std::chrono::time_point<Clock, Duration> &abs_time) {
 
  985  using namespace std::chrono_literals;
 
  989    if (
n != std::numeric_limits<count_type>::max()) ++
n;
 
  997  using namespace std::chrono_literals;
 
 1029  template <
class Func, 
class ProtoAllocator>
 
 1030  void dispatch(Func &&f, 
const ProtoAllocator &a)
 const {
 
 1033      std::decay_t<Func>(std::forward<Func>(f))();
 
 1036      post(std::forward<Func>(f), a);
 
 1050  template <
class Func, 
class ProtoAllocator>
 
 1051  void post(Func &&f, 
const ProtoAllocator &a)
 const {
 
 1066  template <
class Func, 
class ProtoAllocator>
 
 1067  void defer(Func &&f, 
const ProtoAllocator &a)
 const {
 
 1068    post(std::forward<Func>(f), a);
 
 1081  return std::addressof(a.context()) == std::addressof(b.context());
 
 1102  bool need_notify{
false};
 
 1105    std::lock_guard<std::mutex> lk(
mtx_);
 
 1117  if (
true || need_notify) {
 
 1126template <
class Clock, 
class Duration>
 
 1128    std::unique_lock<std::mutex> &lk,
 
 1129    const std::chrono::time_point<Clock, Duration> &abs_time) {
 
 1130  using namespace std::chrono_literals;
 
 1132  const auto rel_time = abs_time - std::chrono::steady_clock::now();
 
 1134      std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
 
 1136  if (rel_time_ms < 0ms) {
 
 1139  } 
else if (rel_time_ms < rel_time) {
 
 1144  return do_one(lk, rel_time_ms);
 
 1155    std::unique_lock<std::mutex> &lk, std::chrono::milliseconds 
timeout) {
 
 1174      if (timer_q->run_one()) {
 
 1189    std::chrono::milliseconds min_duration{0};
 
 1191      std::lock_guard<std::mutex> lk(
mtx_);
 
 1194        const auto duration = 
q->next();
 
 1196        if (duration == duration.zero()) {
 
 1198          min_duration = duration;
 
 1200        } 
else if ((duration != duration.max()) &&
 
 1202                   (duration < min_duration || timer_q == 
nullptr)) {
 
 1204          min_duration = duration;
 
 1210    if (timer_q && min_duration <= min_duration.zero()) 
continue;
 
 1212    if (
auto op = [
this]() -> std::unique_ptr<async_op> {
 
 1214          std::lock_guard<std::mutex> lk(
mtx_);
 
 1252    if (timer_q == 
nullptr ||
 
 1263      if (res.error() == std::errc::timed_out && min_duration != 
timeout &&
 
 1264          timer_q != 
nullptr) {
 
 1277             res.error() == std::errc::timed_out);
 
 1288                         short events) -> std::unique_ptr<async_op> {
 
 1289          std::lock_guard<std::mutex> lk(
mtx_);
 
 1292        }(res->fd, res->event)) {
 
Definition: io_service_base.h:87
 
Definition: socket.h:1412
 
template-less base-class of basic_socket_impl.
Definition: socket.h:335
 
Definition: socket.h:1144
 
Definition: executor.h:308
 
execution_context & context() noexcept
Definition: executor.h:314
 
Definition: executor.h:154
 
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:300
 
void destroy() noexcept
Definition: executor.h:193
 
Definition: callstack.h:80
 
callstack of a thread.
Definition: callstack.h:71
 
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
 
Definition: socket_service_base.h:48
 
Definition: io_context.h:379
 
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:471
 
std::mutex mtx_
Definition: io_context.h:474
 
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:404
 
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:441
 
void release_all()
Definition: io_context.h:414
 
element_type extract_first(native_handle_type fd)
Definition: io_context.h:410
 
bool has_outstanding_work() const
Definition: io_context.h:383
 
void push_back(element_type &&t)
Definition: io_context.h:389
 
std::unique_ptr< async_op > element_type
Definition: io_context.h:381
 
Definition: io_context.h:164
 
virtual ~BasicCallable()=default
 
Definition: io_context.h:172
 
Callable(Func &&f)
Definition: io_context.h:174
 
void invoke() override
Definition: io_context.h:176
 
Func f_
Definition: io_context.h:179
 
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:158
 
std::list< op_type > work_
Definition: io_context.h:240
 
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:182
 
std::mutex work_mtx_
Definition: io_context.h:239
 
size_t run_one()
run a deferred work item.
Definition: io_context.h:190
 
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:233
 
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:221
 
async operation with callback.
Definition: io_context.h:362
 
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:364
 
Op op_
Definition: io_context.h:376
 
void run(io_context &) override
Definition: io_context.h:367
 
base class of async operation.
Definition: io_context.h:337
 
native_handle_type fd_
Definition: io_context.h:354
 
wait_type event() const
Definition: io_context.h:351
 
virtual ~async_op()=default
 
void cancel()
Definition: io_context.h:347
 
virtual void run(io_context &)=0
 
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:341
 
bool is_cancelled() const
Definition: io_context.h:348
 
native_handle_type native_handle() const
Definition: io_context.h:350
 
wait_type event_
Definition: io_context.h:355
 
Definition: io_context.h:1003
 
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1030
 
io_context * io_ctx_
Definition: io_context.h:1076
 
void on_work_started() const noexcept
Definition: io_context.h:1017
 
executor_type(executor_type &&rhs) noexcept=default
 
executor_type(const executor_type &rhs) noexcept=default
 
friend io_context
Definition: io_context.h:1072
 
executor_type & operator=(const executor_type &rhs) noexcept=default
 
executor_type & operator=(executor_type &&rhs) noexcept=default
 
io_context & context() const noexcept
Definition: io_context.h:1015
 
executor_type(io_context &ctx)
Definition: io_context.h:1074
 
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1051
 
bool running_in_this_thread() const noexcept
Definition: io_context.h:1012
 
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1067
 
void on_work_finished() const noexcept
Definition: io_context.h:1018
 
Definition: io_context.h:306
 
io_context & ctx_
Definition: io_context.h:326
 
monitor(const monitor &)=delete
 
monitor(io_context &ctx)
Definition: io_context.h:308
 
~monitor()
Definition: io_context.h:313
 
monitor(monitor &&)=delete
 
Definition: io_context.h:796
 
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:798
 
Op op_
Definition: io_context.h:810
 
void run() override
Definition: io_context.h:801
 
Definition: io_context.h:767
 
void cancel()
Definition: io_context.h:778
 
pending_timer(const Timer &timer)
Definition: io_context.h:772
 
time_point expiry() const noexcept
Definition: io_context.h:785
 
typename Timer::time_point time_point
Definition: io_context.h:769
 
virtual ~pending_timer()=default
 
timer_id id() const
Definition: io_context.h:786
 
typename Timer::Id * timer_id
Definition: io_context.h:770
 
bool is_cancelled() const
Definition: io_context.h:777
 
time_point expiry_
Definition: io_context.h:791
 
timer_id id_
Definition: io_context.h:792
 
Definition: io_context.h:512
 
timer_queue_base(execution_context &ctx)
Definition: io_context.h:514
 
virtual std::chrono::milliseconds next() const =0
 
std::mutex queue_mtx_
Definition: io_context.h:516
 
Definition: io_context.h:524
 
std::chrono::milliseconds next() const override
Definition: io_context.h:589
 
size_t cancel(const Timer &t)
Definition: io_context.h:703
 
io_context & context() noexcept
Definition: io_context.h:556
 
bool run_one() override
Definition: io_context.h:635
 
void shutdown() noexcept override
Definition: io_context.h:554
 
timer_queue(execution_context &ctx)
Definition: io_context.h:528
 
~timer_queue() override
Definition: io_context.h:541
 
void push(const Timer &timer, Op &&op)
Definition: io_context.h:561
 
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:818
 
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:820
 
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:814
 
Definition: io_context.h:61
 
count_type poll()
Definition: io_context.h:981
 
bool is_running_
Definition: io_context.h:888
 
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:884
 
std::condition_variable do_one_cond_
Definition: io_context.h:887
 
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:948
 
~io_context()
Definition: io_context.h:81
 
std::atomic< count_type > work_count_
Definition: io_context.h:282
 
count_type run_one()
Definition: io_context.h:931
 
io_context()
Definition: io_context.h:68
 
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:901
 
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:895
 
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:285
 
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:286
 
DeferredWork deferred_work_
Definition: io_context.h:243
 
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:249
 
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1127
 
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:942
 
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1147
 
void async_wait(const Timer &timer, Op &&op)
async wait for a timer to expire.
Definition: io_context.h:832
 
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:150
 
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:966
 
size_t cancel_one(const Timer &)
Definition: io_context.h:860
 
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:66
 
std::mutex do_one_mtx_
Definition: io_context.h:886
 
io_context(int)
Definition: io_context.h:79
 
IoServiceBase * io_service() const
Definition: io_context.h:139
 
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:287
 
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1154
 
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:480
 
io_context(const io_context &)=delete
 
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:72
 
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:135
 
bool is_running() const
Definition: io_context.h:908
 
AsyncOps active_ops_
Definition: io_context.h:477
 
bool stopped_
Definition: io_context.h:281
 
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-context.
Definition: io_context.h:877
 
count_type poll_one()
Definition: io_context.h:994
 
void stop()
Definition: io_context.h:116
 
count_type run()
Definition: io_context.h:915
 
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:890
 
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:483
 
executor_type get_executor() noexcept
Definition: io_context.h:1093
 
void restart()
Definition: io_context.h:130
 
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:845
 
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1100
 
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:972
 
void is_running(bool v)
Definition: io_context.h:907
 
size_t count_type
Definition: io_context.h:65
 
bool has_outstanding_work() const
Definition: io_context.h:295
 
bool stopped() const noexcept
Definition: io_context.h:125
 
io_context & operator=(const io_context &)=delete
 
Definition: linux_epoll_io_service.h:57
 
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
 
Definition: expected.h:944
 
static int count
Definition: myisam_ftdump.cc:43
 
static QUEUE queue
Definition: myisampack.cc:207
 
static bool interrupted
Definition: mysqladmin.cc:66
 
Definition: authentication.cc:36
 
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
 
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
 
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
 
wait_type
Definition: socket_constants.h:86
 
int native_handle_type
Definition: socket_constants.h:51
 
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
 
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:576
 
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:572
 
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
 
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
 
Definition: gcs_xcom_synode.h:64
 
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
 
Definition: executor.h:370
 
int n
Definition: xcom_base.cc:509
 
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4065