26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_ 
   27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_ 
   37#include <system_error>   
   38#include <unordered_map> 
   55#if defined(HAVE_EPOLL) 
   73      std::unique_ptr<net::impl::socket::SocketServiceBase> &&
socket_service,
 
   98  template <class Rep, class Period>
 
  101  template <class Clock, class Duration>
 
  103      const 
std::chrono::time_point<Clock, Duration> &abs_time);
 
  107  template <class Rep, class Period>
 
  110  template <class Clock, class Duration>
 
  112      const 
std::chrono::time_point<Clock, Duration> &abs_time);
 
  118      std::lock_guard<std::mutex> lk(
mtx_);
 
  126    std::lock_guard<std::mutex> lk(
mtx_);
 
  131    std::lock_guard<std::mutex> lk(
mtx_);
 
  171    template <
class Func>
 
  182    using op_type = std::unique_ptr<BasicCallable>;
 
  199        std::lock_guard<std::mutex> lk(
work_mtx_);
 
  201        if (
work_.empty()) 
return 0;
 
  206        tmp.splice(tmp.begin(), 
work_, 
work_.begin());
 
  210      tmp.front()->invoke();
 
  220    template <
class Func, 
class ProtoAllocator>
 
  221    void post(Func &&f, 
const ProtoAllocator & ) {
 
  222      std::lock_guard<std::mutex> lk(
work_mtx_);
 
  234      std::lock_guard<std::mutex> lk(
work_mtx_);
 
  235      return !
work_.empty();
 
  248  template <
class Func, 
class ProtoAllocator>
 
  256  template <
class Clock, 
class Duration>
 
  258      std::unique_lock<std::mutex> &lk,
 
  259      const std::chrono::time_point<Clock, Duration> &abs_time);
 
  264  template <
typename _Clock, 
typename _WaitTraits>
 
  269  template <
class Protocol>
 
  272  template <
class Protocol>
 
  275  template <
class Protocol>
 
  278  template <
class Protocol>
 
  314      std::lock_guard<std::mutex> lk(
ctx_.
mtx_);
 
  371        op_(std::error_code{});
 
  384      std::lock_guard<std::mutex> lk(
mtx_);
 
  386      return !
ops_.empty();
 
  390      const auto handle = t->native_handle();
 
  392      std::lock_guard<std::mutex> lk(
mtx_);
 
  395      if (it != 
ops_.end()) {
 
  396        it->second.push_back(std::move(t));
 
  398        std::vector<element_type> v;
 
  399        v.push_back(std::move(t));
 
  406        return static_cast<short>(el->event()) & events;
 
  424      std::list<element_type> ops_to_delete;
 
  426        std::lock_guard<std::mutex> lk(
mtx_);
 
  427        for (
auto &fd_ops : 
ops_) {
 
  428          for (
auto &fd_op : fd_ops.second) {
 
  429            ops_to_delete.push_back(std::move(fd_op));
 
  440    template <
class Pred>
 
  442      std::lock_guard<std::mutex> lk(
mtx_);
 
  444      const auto it = 
ops_.find(fd);
 
  445      if (it != 
ops_.end()) {
 
  446        auto &async_ops = it->second;
 
  448        const auto end = async_ops.end();
 
  449        for (
auto cur = async_ops.begin(); cur != end; ++cur) {
 
  452          if (el->native_handle() == fd && pred(el)) {
 
  453            auto op = std::move(el);
 
  455            if (async_ops.size() == 1) {
 
  460              async_ops.erase(cur);
 
  471    std::unordered_map<native_handle_type, std::vector<element_type>> 
ops_{
 
  493        std::cerr << 
"!! add_fd_interest(" << fd << 
", ..." 
  494                  << 
") " << res.error() << 
" " << res.error().message()
 
  500        std::lock_guard<std::mutex> lk(
mtx_);
 
  524  template <
class Timer>
 
  532      auto &io_ctx = 
static_cast<io_context &
>(ctx);
 
  538      std::lock_guard<std::mutex> lk(io_ctx.mtx_);
 
  539      io_ctx.timer_queues_.push_back(
this);
 
  562    void push(
const Timer &timer, Op &&op) {
 
  571              [](
const auto &a, 
const auto &b) { return a < b->expiry(); }),
 
  574      if (timer.id() == 
nullptr) abort();
 
  581      if (timer.id() == 
nullptr) abort();
 
  582      if (timer.expiry() == Timer::time_point::min()) abort();
 
  586          std::make_pair(timer.expiry(), timer.id()));
 
  591      typename Timer::time_point expiry;
 
  607          return std::chrono::milliseconds::min();
 
  613      auto duration = Timer::traits_type::to_wait_duration(expiry);
 
  614      if (duration < duration.zero()) {
 
  615        duration = duration.zero();
 
  624          std::chrono::duration_cast<std::chrono::milliseconds>(duration);
 
  626      using namespace std::chrono_literals;
 
  629      if ((duration - duration_ms).
count() != 0) {
 
  637      std::unique_ptr<pending_timer> pt;
 
  657          auto min = Timer::time_point::min();
 
  659            if (cur.first < min) abort();
 
  664          const auto now = Timer::clock_type::now();
 
  668          auto timepoint = pending_expiry_it->first;
 
  670          if (timepoint > now) {
 
  674          typename Timer::Id *timer_id = pending_expiry_it->second;
 
  680          if (pending_it->second->id() != timer_id) {
 
  683          if (pending_it->second->expiry() != pending_expiry_it->first) {
 
  687          pt = std::move(pending_it->second);
 
  716          auto &cur_timer = cur->second;
 
  717          if (cur_timer->id() == t.id()) {
 
  721            auto nxt = std::next(cur);
 
  733        for (
auto cur = eq_range.first; cur != eq_range.second;) {
 
  734          auto expiry_eq_range =
 
  737          size_t erase_count{};
 
  739          for (
auto expiry_cur = expiry_eq_range.first;
 
  740               expiry_cur != expiry_eq_range.second;) {
 
  741            if (expiry_cur->first == cur->second->expiry() &&
 
  742                expiry_cur->second == cur->second->id() && erase_count == 0) {
 
  751          if (erase_count == 0) abort();
 
  753          cur->second->cancel();
 
  806          op_(std::error_code{});
 
  818    std::multimap<typename Timer::time_point, typename Timer::Id *>
 
  820    std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
 
  832  template <
class Timer, 
class Op>
 
  834    auto &
queue = use_service<timer_queue<Timer>>(*this);
 
  836    queue.push(timer, std::forward<Op>(op));
 
  845  template <
class Timer>
 
  851    const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
 
  860  template <
class Timer>
 
  921  using namespace std::chrono_literals;
 
  933  using namespace std::chrono_literals;
 
  942template <
class Rep, 
class Period>
 
  944    const std::chrono::duration<Rep, Period> &rel_time) {
 
  945  return run_until(std::chrono::steady_clock::now() + rel_time);
 
  948template <
class Clock, 
class Duration>
 
  950    const std::chrono::time_point<Clock, Duration> &abs_time) {
 
  955  using namespace std::chrono_literals;
 
  966template <
class Rep, 
class Period>
 
  968    const std::chrono::duration<Rep, Period> &rel_time) {
 
  969  return run_one_until(std::chrono::steady_clock::now() + rel_time);
 
  972template <
class Clock, 
class Duration>
 
  974    const std::chrono::time_point<Clock, Duration> &abs_time) {
 
  986  using namespace std::chrono_literals;
 
  998  using namespace std::chrono_literals;
 
 1030  template <
class Func, 
class ProtoAllocator>
 
 1031  void dispatch(Func &&f, 
const ProtoAllocator &a)
 const {
 
 1034      std::decay_t<Func>(std::forward<Func>(f))();
 
 1037      post(std::forward<Func>(f), a);
 
 1051  template <
class Func, 
class ProtoAllocator>
 
 1052  void post(Func &&f, 
const ProtoAllocator &a)
 const {
 
 1067  template <
class Func, 
class ProtoAllocator>
 
 1068  void defer(Func &&f, 
const ProtoAllocator &a)
 const {
 
 1069    post(std::forward<Func>(f), a);
 
 1082  return std::addressof(a.context()) == std::addressof(b.context());
 
 1103  bool need_notify{
false};
 
 1106    std::lock_guard<std::mutex> lk(
mtx_);
 
 1118  if (
true || need_notify) {
 
 1127template <
class Clock, 
class Duration>
 
 1129    std::unique_lock<std::mutex> &lk,
 
 1130    const std::chrono::time_point<Clock, Duration> &abs_time) {
 
 1131  using namespace std::chrono_literals;
 
 1133  const auto rel_time = abs_time - std::chrono::steady_clock::now();
 
 1135      std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
 
 1137  if (rel_time_ms < 0ms) {
 
 1140  } 
else if (rel_time_ms < rel_time) {
 
 1145  return do_one(lk, rel_time_ms);
 
 1175      if (timer_q->run_one()) {
 
 1192      std::lock_guard<std::mutex> 
lock(
mtx_);
 
 1195        const auto duration = 
q->next();
 
 1197        if (duration == duration.zero()) {
 
 1199          min_duration = duration;
 
 1201        } 
else if ((duration != duration.max()) &&
 
 1203                   (duration < min_duration || timer_q == 
nullptr)) {
 
 1205          min_duration = duration;
 
 1211    if (timer_q && min_duration <= min_duration.zero()) 
continue;
 
 1213    if (
auto op = [
this]() -> std::unique_ptr<async_op> {
 
 1215          std::lock_guard<std::mutex> 
lock(
mtx_);
 
 1224            return cancelled_op;
 
 1253    if (timer_q == 
nullptr ||
 
 1264      if (res.error() == std::errc::timed_out && min_duration != 
timeout &&
 
 1265          timer_q != 
nullptr) {
 
 1278             res.error() == std::errc::timed_out);
 
 1289                         short events) -> std::unique_ptr<async_op> {
 
 1290          std::lock_guard<std::mutex> 
lock(
mtx_);
 
 1293        }(res->fd, res->event)) {
 
Definition: io_service_base.h:87
 
Definition: socket.h:1292
 
template-less base-class of basic_socket_impl.
Definition: socket.h:335
 
Definition: socket.h:1090
 
Definition: executor.h:291
 
execution_context & context() noexcept
Definition: executor.h:297
 
Definition: executor.h:154
 
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:283
 
void destroy() noexcept
Definition: executor.h:176
 
Definition: callstack.h:80
 
callstack of a thread.
Definition: callstack.h:71
 
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
 
Definition: socket_service_base.h:48
 
Definition: io_context.h:379
 
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:471
 
std::mutex mtx_
Definition: io_context.h:474
 
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:404
 
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:441
 
void release_all()
Definition: io_context.h:414
 
element_type extract_first(native_handle_type fd)
Definition: io_context.h:410
 
bool has_outstanding_work() const
Definition: io_context.h:383
 
void push_back(element_type &&t)
Definition: io_context.h:389
 
std::unique_ptr< async_op > element_type
Definition: io_context.h:381
 
Definition: io_context.h:164
 
virtual ~BasicCallable()=default
 
Definition: io_context.h:172
 
Callable(Func &&f)
Definition: io_context.h:174
 
void invoke() override
Definition: io_context.h:176
 
Func f_
Definition: io_context.h:179
 
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:158
 
std::list< op_type > work_
Definition: io_context.h:240
 
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:182
 
std::mutex work_mtx_
Definition: io_context.h:239
 
size_t run_one()
run a deferred work item.
Definition: io_context.h:190
 
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:233
 
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:221
 
async operation with callback.
Definition: io_context.h:362
 
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:364
 
Op op_
Definition: io_context.h:376
 
void run(io_context &) override
Definition: io_context.h:367
 
base class of async operation.
Definition: io_context.h:337
 
native_handle_type fd_
Definition: io_context.h:354
 
wait_type event() const
Definition: io_context.h:351
 
virtual ~async_op()=default
 
void cancel()
Definition: io_context.h:347
 
virtual void run(io_context &)=0
 
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:341
 
bool is_cancelled() const
Definition: io_context.h:348
 
native_handle_type native_handle() const
Definition: io_context.h:350
 
wait_type event_
Definition: io_context.h:355
 
Definition: io_context.h:1004
 
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1031
 
io_context * io_ctx_
Definition: io_context.h:1077
 
void on_work_started() const noexcept
Definition: io_context.h:1018
 
executor_type(executor_type &&rhs) noexcept=default
 
executor_type(const executor_type &rhs) noexcept=default
 
friend io_context
Definition: io_context.h:1073
 
executor_type & operator=(const executor_type &rhs) noexcept=default
 
executor_type & operator=(executor_type &&rhs) noexcept=default
 
io_context & context() const noexcept
Definition: io_context.h:1016
 
executor_type(io_context &ctx)
Definition: io_context.h:1075
 
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1052
 
bool running_in_this_thread() const noexcept
Definition: io_context.h:1013
 
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1068
 
void on_work_finished() const noexcept
Definition: io_context.h:1019
 
Definition: io_context.h:306
 
io_context & ctx_
Definition: io_context.h:326
 
monitor(const monitor &)=delete
 
monitor(io_context &ctx)
Definition: io_context.h:308
 
~monitor()
Definition: io_context.h:313
 
monitor(monitor &&)=delete
 
Definition: io_context.h:797
 
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:799
 
Op op_
Definition: io_context.h:811
 
void run() override
Definition: io_context.h:802
 
Definition: io_context.h:768
 
void cancel()
Definition: io_context.h:779
 
pending_timer(const Timer &timer)
Definition: io_context.h:773
 
time_point expiry() const noexcept
Definition: io_context.h:786
 
typename Timer::time_point time_point
Definition: io_context.h:770
 
virtual ~pending_timer()=default
 
timer_id id() const
Definition: io_context.h:787
 
typename Timer::Id * timer_id
Definition: io_context.h:771
 
bool is_cancelled() const
Definition: io_context.h:778
 
time_point expiry_
Definition: io_context.h:792
 
timer_id id_
Definition: io_context.h:793
 
Definition: io_context.h:513
 
timer_queue_base(execution_context &ctx)
Definition: io_context.h:515
 
virtual std::chrono::milliseconds next() const =0
 
std::mutex queue_mtx_
Definition: io_context.h:517
 
Definition: io_context.h:525
 
std::chrono::milliseconds next() const override
Definition: io_context.h:590
 
size_t cancel(const Timer &t)
Definition: io_context.h:704
 
io_context & context() noexcept
Definition: io_context.h:557
 
bool run_one() override
Definition: io_context.h:636
 
void shutdown() noexcept override
Definition: io_context.h:555
 
timer_queue(execution_context &ctx)
Definition: io_context.h:529
 
~timer_queue() override
Definition: io_context.h:542
 
void push(const Timer &timer, Op &&op)
Definition: io_context.h:562
 
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:819
 
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:821
 
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:815
 
Definition: io_context.h:61
 
count_type poll()
Definition: io_context.h:982
 
bool is_running_
Definition: io_context.h:889
 
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:885
 
std::condition_variable do_one_cond_
Definition: io_context.h:888
 
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:949
 
~io_context()
Definition: io_context.h:81
 
std::atomic< count_type > work_count_
Definition: io_context.h:282
 
count_type run_one()
Definition: io_context.h:932
 
io_context()
Definition: io_context.h:68
 
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:902
 
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:896
 
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:285
 
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:286
 
DeferredWork deferred_work_
Definition: io_context.h:243
 
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:249
 
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1128
 
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:943
 
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1148
 
void async_wait(const Timer &timer, Op &&op)
async wait for a timer expire.
Definition: io_context.h:833
 
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:150
 
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:967
 
size_t cancel_one(const Timer &)
Definition: io_context.h:861
 
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:66
 
std::mutex do_one_mtx_
Definition: io_context.h:887
 
io_context(int)
Definition: io_context.h:79
 
IoServiceBase * io_service() const
Definition: io_context.h:139
 
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:287
 
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1155
 
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:480
 
io_context(const io_context &)=delete
 
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:72
 
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:135
 
bool is_running() const
Definition: io_context.h:909
 
AsyncOps active_ops_
Definition: io_context.h:477
 
bool stopped_
Definition: io_context.h:281
 
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-contexts.
Definition: io_context.h:878
 
count_type poll_one()
Definition: io_context.h:995
 
void stop()
Definition: io_context.h:116
 
count_type run()
Definition: io_context.h:916
 
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:891
 
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:483
 
executor_type get_executor() noexcept
Definition: io_context.h:1094
 
void restart()
Definition: io_context.h:130
 
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:846
 
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1101
 
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:973
 
void is_running(bool v)
Definition: io_context.h:908
 
size_t count_type
Definition: io_context.h:65
 
bool has_outstanding_work() const
Definition: io_context.h:295
 
bool stopped() const noexcept
Definition: io_context.h:125
 
io_context & operator=(const io_context &)=delete
 
Definition: linux_epoll_io_service.h:58
 
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
 
Definition: expected.h:286
 
static int count
Definition: myisam_ftdump.cc:45
 
static QUEUE queue
Definition: myisampack.cc:210
 
static bool interrupted
Definition: mysqladmin.cc:73
 
Definition: http_server_component.cc:34
 
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:498
 
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
 
std::chrono::milliseconds milliseconds
Definition: authorize_manager.cc:67
 
ValueType max(X &&first)
Definition: gtid.h:103
 
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
 
wait_type
Definition: socket_constants.h:86
 
int native_handle_type
Definition: socket_constants.h:51
 
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
 
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:551
 
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:547
 
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
 
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
 
Definition: gcs_xcom_synode.h:64
 
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
 
static std::mutex lock
Definition: net_ns.cc:56
 
Definition: executor.h:353
 
int n
Definition: xcom_base.cc:509
 
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4112