MySQL 9.1.0
Source Code Documentation
io_context.h
1/*
2 Copyright (c) 2020, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
28
29#include <atomic>
30#include <chrono>
31#include <iterator>
32#include <limits> // numeric_limits
33#include <list>
34#include <map>
35#include <memory> // unique_ptr
36#include <mutex>
37#include <system_error> // error_code
38#include <unordered_map>
39#include <utility>
40#include <vector>
41
42#include "my_config.h" // HAVE_EPOLL
52
53namespace net {
54
55#if defined(HAVE_EPOLL)
56using IoService = linux_epoll_io_service;
57#else
58using IoService = poll_io_service;
59#endif
60
61class io_context : public execution_context {
62 public:
63 class executor_type;
64
65 using count_type = size_t;
66 using native_handle_type = impl::socket::native_handle_type;
67
68 io_context()
69 : io_context{std::make_unique<net::impl::socket::SocketService>(),
70 std::make_unique<IoService>()} {}
71
72 io_context(
73 std::unique_ptr<net::impl::socket::SocketServiceBase> &&socket_service,
74 std::unique_ptr<IoServiceBase> &&io_service)
75 : socket_service_{std::move(socket_service)},
76 io_service_{std::move(io_service)},
77 io_service_open_res_{io_service_->open()} {}
78
79 explicit io_context(int /* concurrency_hint */) : io_context() {}
80
81 ~io_context() {
82 active_ops_.release_all();
83 cancelled_ops_.clear();
84 // Make sure the services are destroyed before our internal fields. The
85 // services own the timers that can indirectly call our methods when
86 // destructed. See UT NetTS_io_context.pending_timer_on_destroy for an
87 // example.
88 destroy();
89 }
90
91 io_context(const io_context &) = delete;
92 io_context &operator=(const io_context &) = delete;
93
94 executor_type get_executor() noexcept;
95
96 count_type run();
97
98 template <class Rep, class Period>
99 count_type run_for(const std::chrono::duration<Rep, Period> &rel_time);
100
101 template <class Clock, class Duration>
102 count_type run_until(
103 const std::chrono::time_point<Clock, Duration> &abs_time);
104
105 count_type run_one();
106
107 template <class Rep, class Period>
108 count_type run_one_for(const std::chrono::duration<Rep, Period> &rel_time);
109
110 template <class Clock, class Duration>
111 count_type run_one_until(
112 const std::chrono::time_point<Clock, Duration> &abs_time);
113
114 count_type poll();
115 count_type poll_one();
116 void stop() {
117 {
118 std::lock_guard<std::mutex> lk(mtx_);
119 stopped_ = true;
120 }
121
122 io_service_->notify();
123 }
124
125 bool stopped() const noexcept {
126 std::lock_guard<std::mutex> lk(mtx_);
127 return stopped_;
128 }
129
130 void restart() {
131 std::lock_guard<std::mutex> lk(mtx_);
132 stopped_ = false;
133 }
134
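 // Illustrative usage sketch (editorial example, not taken from this file):
 // stop() makes a concurrently running event-loop return early, and
 // restart() clears the stopped-flag so the io_context can be run() again.
 //
 //   net::io_context io_ctx;
 //   std::thread io_thread([&io_ctx]() { io_ctx.run(); });
 //
 //   io_ctx.stop();      // wake the loop; run() returns
 //   io_thread.join();
 //
 //   io_ctx.restart();   // required before calling run() again
 //   io_ctx.run();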
135 impl::socket::SocketServiceBase *socket_service() const {
136 return socket_service_.get();
137 }
138
139 IoServiceBase *io_service() const { return io_service_.get(); }
140
141 /**
142 * get the status of the implicit open() call of the io-service.
143 *
144 * the io_service_->open() call may fail, e.g. due to running out of file-descriptors.
145 *
146 * run() will fail silently if the io-service failed to open.
147 *
148 * @returns std::error_code on error
149 */
150 stdx::expected<void, std::error_code> open_res() const noexcept {
151 return io_service_open_res_;
152 }
153
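 // Illustrative usage sketch (editorial example, not taken from this file):
 // since run() fails silently when the io-service could not be opened,
 // check open_res() before entering the event-loop.
 //
 //   net::io_context io_ctx;
 //   if (!io_ctx.open_res()) {
 //     std::error_code ec = io_ctx.open_res().error();
 //     // handle the error (e.g. too many open file-descriptors) and bail out
 //     return;
 //   }
 //   io_ctx.run();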
154 private:
155 /**
156 * queued work from io_context::executor_type::dispatch()/post()/defer().
157 */
158 class DeferredWork {
159 public:
160 // simple, generic storage of callable.
161 //
162 // std::function<void()> is similar, but doesn't work for move-only
163 // callables like lambdas that capture a move-only type.
164 class BasicCallable {
165 public:
166 virtual ~BasicCallable() = default;
167
168 virtual void invoke() = 0;
169 };
170
171 template <class Func>
172 class Callable : public BasicCallable {
173 public:
174 Callable(Func &&f) : f_{std::forward<Func>(f)} {}
175
176 void invoke() override { f_(); }
177
178 private:
179 Func f_;
180 };
181
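 // Illustrative sketch of why BasicCallable/Callable is used instead of
 // std::function<void()> (editorial example, not taken from this file):
 // a lambda that captures a move-only type is itself move-only, while
 // std::function requires a copyable callable.
 //
 //   auto payload = std::make_unique<int>(42);
 //   auto work = [p = std::move(payload)]() { /* use *p */ };
 //
 //   // std::function<void()> f{std::move(work)};     // does not compile
 //   // Callable<decltype(work)> c{std::move(work)};  // fine: only needs move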
182 using op_type = std::unique_ptr<BasicCallable>;
183
184 /**
185 * run a deferred work item.
186 *
187 * @returns number of work items run.
188 * @retval 0 work queue was empty, nothing was run.
189 */
190 size_t run_one() {
191 // tmp list to hold the current operation to run.
192 //
193 // makes it simple and fast to move the head element and shorten the time
194 // the lock is held.
195 decltype(work_) tmp;
196
197 // lock is only needed as long as we modify the work-queue.
198 {
199 std::lock_guard<std::mutex> lk(work_mtx_);
200
201 if (work_.empty()) return 0;
202
203 // move the head of the work queue out and release the lock.
204 //
205 // note: std::list.splice() moves pointers.
206 tmp.splice(tmp.begin(), work_, work_.begin());
207 }
208
209 // run the deferred work.
210 tmp.front()->invoke();
211
212 // and destruct the list at the end.
213
214 return 1;
215 }
216
217 /**
218 * queue work for later execution.
219 */
220 template <class Func, class ProtoAllocator>
221 void post(Func &&f, const ProtoAllocator & /* a */) {
222 std::lock_guard<std::mutex> lk(work_mtx_);
223
224 work_.emplace_back(
225 std::make_unique<Callable<Func>>(std::forward<Func>(f)));
226 }
227
228 /**
229 * check if work is queued for later execution.
230 *
231 * @retval true if some work is queued.
232 */
233 bool has_outstanding_work() const {
234 std::lock_guard<std::mutex> lk(work_mtx_);
235 return !work_.empty();
236 }
237
238 private:
239 mutable std::mutex work_mtx_;
240 std::list<op_type> work_;
241 };
242
243 DeferredWork deferred_work_;
244
245 /**
246 * defer work for later execution.
247 */
248 template <class Func, class ProtoAllocator>
249 void defer_work(Func &&f, const ProtoAllocator &a) {
250 deferred_work_.post(std::forward<Func>(f), a);
251
252 // wake up the possibly blocked io-thread.
253 notify_io_service_if_not_running_in_this_thread();
254 }
255
256 template <class Clock, class Duration>
257 count_type do_one_until(
258 std::unique_lock<std::mutex> &lk,
259 const std::chrono::time_point<Clock, Duration> &abs_time);
260
261 count_type do_one(std::unique_lock<std::mutex> &lk,
262 std::chrono::milliseconds timeout);
263
264 template <typename _Clock, typename _WaitTraits>
265 friend class basic_waitable_timer;
266
267 friend class basic_socket_impl_base;
268
269 template <class Protocol>
270 friend class basic_socket_impl;
271
272 template <class Protocol>
273 friend class basic_socket;
274
275 template <class Protocol>
276 friend class basic_stream_socket;
277
278 template <class Protocol>
279 friend class basic_socket_acceptor;
280
281 bool stopped_{false};
282 std::atomic<count_type> work_count_{};
283
284 // must be first member-var to ensure it is destroyed last
285 std::unique_ptr<impl::socket::SocketServiceBase> socket_service_;
286 std::unique_ptr<IoServiceBase> io_service_;
287 stdx::expected<void, std::error_code> io_service_open_res_;
288
289 // has outstanding work
290 //
291 // work is outstanding when
292 //
293 // - work-count from on_work_started()/on_work_finished() is more than 0, or
294 // - any active or cancelled operations are still ongoing
295 bool has_outstanding_work() const {
296 if (!cancelled_ops_.empty()) return true;
297 if (active_ops_.has_outstanding_work()) return true;
298 if (deferred_work_.has_outstanding_work()) return true;
299
300 if (work_count_ > 0) return true;
301
302 return false;
303 }
304
305 // monitors a .run() and stops the io-context when no outstanding work is left.
306 class monitor {
307 public:
308 monitor(io_context &ctx) : ctx_{ctx} {}
309
310 monitor(const monitor &) = delete;
311 monitor(monitor &&) = delete;
312
313 ~monitor() {
314 std::lock_guard<std::mutex> lk(ctx_.mtx_);
315
316 // ctx_.call_stack_.pop_back();
317
318 if (!ctx_.has_outstanding_work()) {
319 // like stop(), just that we already have the mutex
320 ctx_.stopped_ = true;
321 ctx_.io_service_->notify();
322 }
323 }
324
325 private:
326 io_context &ctx_;
327 };
328
330
331 /**
332 * base class of async operation.
333 *
334 * - file-descriptor
335 * - wait-event
336 */
337 class async_op {
338 public:
339 using wait_type = impl::socket::wait_type;
340
341 async_op(native_handle_type fd, wait_type ev) : fd_{fd}, event_{ev} {}
342
343 virtual ~async_op() = default;
344
345 virtual void run(io_context &) = 0;
346
347 void cancel() { fd_ = impl::socket::kInvalidSocket; }
348 bool is_cancelled() const { return fd_ == impl::socket::kInvalidSocket; }
349
350 native_handle_type native_handle() const { return fd_; }
351 wait_type event() const { return event_; }
352
353 private:
354 native_handle_type fd_;
355 wait_type event_;
356 };
357
358 /**
359 * async operation with callback.
360 */
361 template <class Op>
362 class async_op_impl : public async_op {
363 public:
364 async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
365 : async_op{fd, wt}, op_{std::forward<Op>(op)} {}
366
367 void run(io_context & /* io_ctx */) override {
368 if (is_cancelled()) {
369 op_(make_error_code(std::errc::operation_canceled));
370 } else {
371 op_(std::error_code{});
372 }
373 }
374
375 private:
376 Op op_;
377 };
378
379 class AsyncOps {
380 public:
381 using element_type = std::unique_ptr<async_op>;
382
383 bool has_outstanding_work() const {
384 std::lock_guard<std::mutex> lk(mtx_);
385
386 return !ops_.empty();
387 }
388
390 const auto handle = t->native_handle();
391
392 std::lock_guard<std::mutex> lk(mtx_);
393
394 auto it = ops_.find(handle);
395 if (it != ops_.end()) {
396 it->second.push_back(std::move(t));
397 } else {
398 std::vector<element_type> v;
399 v.push_back(std::move(t));
400 ops_.emplace(handle, std::move(v));
401 }
402 }
403
404 element_type extract_first(native_handle_type fd, short events) {
405 return extract_first(fd, [events](auto const &el) {
406 return static_cast<short>(el->event()) & events;
407 });
408 }
409
410 element_type extract_first(native_handle_type fd) {
411 return extract_first(fd, [](auto const &) { return true; });
412 }
413
414 void release_all() {
415 // We expect this method to be called before the AsyncOps destructor, to
416 // make sure the ops_ map is empty when the destructor executes. If the
417 // ops_ map is not empty when destroyed, the destructors of its elements
418 // can trigger a method that tries to access the map being destroyed.
419 // Example: we have an AsyncOp that captures some Socket object with a
420 // smart pointer. When the destructor of this AsyncOp is called, it can
421 // also call the destructor of that Socket, which in turn will call
422 // socket.close(), causing the Socket to unregister its operations in
423 // its respective io_context object which is us (calls extract_first()).
424 std::list<element_type> ops_to_delete;
425 {
426 std::lock_guard<std::mutex> lk(mtx_);
427 for (auto &fd_ops : ops_) {
428 for (auto &fd_op : fd_ops.second) {
429 ops_to_delete.push_back(std::move(fd_op));
430 }
431 }
432 ops_.clear();
433 // It is important that we release the mtx_ here before the
434 // ops_to_delete go out of scope and are deleted. AsyncOp destructor can
435 // indirectly call extract_first which would lead to a deadlock.
436 }
437 }
438
439 private:
440 template <class Pred>
441 element_type extract_first(native_handle_type fd, Pred &&pred) {
442 std::lock_guard<std::mutex> lk(mtx_);
443
444 const auto it = ops_.find(fd);
445 if (it != ops_.end()) {
446 auto &async_ops = it->second;
447
448 const auto end = async_ops.end();
449 for (auto cur = async_ops.begin(); cur != end; ++cur) {
450 auto &el = *cur;
451
452 if (el->native_handle() == fd && pred(el)) {
453 auto op = std::move(el);
454
455 if (async_ops.size() == 1) {
456 // remove the current container and with it its only element
457 ops_.erase(it);
458 } else {
459 // remove the current entry
460 async_ops.erase(cur);
461 }
462
463 return op;
464 }
465 }
466 }
467
468 return {};
469 }
470
471 std::unordered_map<native_handle_type, std::vector<element_type>> ops_{
472 16 * 1024};
473
474 mutable std::mutex mtx_;
475 };
476
477 AsyncOps active_ops_;
478
479 // cancelled async operations.
480 std::list<std::unique_ptr<async_op>> cancelled_ops_;
481
482 template <class Op>
483 void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op) {
484 // add the socket-wait op to the queue
485 active_ops_.push_back(
486 std::make_unique<async_op_impl<Op>>(std::forward<Op>(op), fd, wt));
487
488 {
489 auto res = io_service_->add_fd_interest(fd, wt);
490 if (!res) {
491#if 0
492 // fd may be -1 or so
493 std::cerr << "!! add_fd_interest(" << fd << ", ..."
494 << ") " << res.error() << " " << res.error().message()
495 << std::endl;
496#endif
497 // adding failed. Cancel it again.
498 //
499 // code should be similar to ::cancel(fd)
500 std::lock_guard<std::mutex> lk(mtx_);
501
502 if (auto as_op =
503 active_ops_.extract_first(fd, static_cast<short>(wt))) {
504 as_op->cancel();
505 cancelled_ops_.push_back(std::move(as_op));
506 }
507 }
508 }
509
510 notify_io_service_if_not_running_in_this_thread();
511 }
512
513 class timer_queue_base : public execution_context::service {
514 protected:
515 timer_queue_base(execution_context &ctx) : execution_context::service{ctx} {}
516
517 mutable std::mutex queue_mtx_;
518
519 public:
520 virtual bool run_one() = 0;
521 virtual std::chrono::milliseconds next() const = 0;
522 };
523
524 template <class Timer>
525 class timer_queue : public timer_queue_base {
526 public:
528
529 timer_queue(execution_context &ctx) : timer_queue_base{ctx} {
530 // add timer_queue to io_context
531
532 auto &io_ctx = static_cast<io_context &>(ctx);
533
534 // @note: don't move this lock+push into the timer_queue_base constructor
535 //
536 // @see
537 // https://github.com/google/sanitizers/wiki/ThreadSanitizerPopularDataRaces#data-race-on-vptr-during-construction
538 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
539 io_ctx.timer_queues_.push_back(this);
540 }
541
542 void shutdown() noexcept override {}
543
544 io_context &context() noexcept {
545 return static_cast<io_context &>(service::context());
546 }
547
548 template <class Op>
549 void push(const Timer &timer, Op &&op) {
551
552 std::lock_guard<std::mutex> lk(queue_mtx_);
553
554#if 0
555 pending_timers_.insert(
556 std::upper_bound(
557 pending_timers_.begin(), pending_timers_.end(), timer.expiry(),
558 [](const auto &a, const auto &b) { return a < b->expiry(); }),
559 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op)));
560#else
561 if (timer.id() == nullptr) abort();
562
563 // add timer
564 pending_timers_.emplace(std::make_pair(
565 timer.id(),
566 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op))));
567
568 if (timer.id() == nullptr) abort();
569 if (timer.expiry() == Timer::time_point::min()) abort();
570
571 // sorted timer ids by expiry
572 pending_timer_expiries_.emplace(
573 std::make_pair(timer.expiry(), timer.id()));
574#endif
575 }
576
577 std::chrono::milliseconds next() const override {
578 typename Timer::time_point expiry;
579 {
580 std::lock_guard<std::mutex> lk(queue_mtx_);
581
582 // no pending timers, return the max-timeout
583 if (cancelled_timers_.empty()) {
584 if (pending_timer_expiries_.empty())
585 return std::chrono::milliseconds::max();
586
587#if 0
588 expiry = pending_timers_.front()->expiry();
589#else
590 expiry = pending_timer_expiries_.begin()->first;
591#endif
592 } else {
593 // cancelled timers should be executed directly
594 return std::chrono::milliseconds::min();
595 }
596
597 // the lock isn't needed anymore.
598 }
599
600 auto duration = Timer::traits_type::to_wait_duration(expiry);
601 if (duration < duration.zero()) {
602 duration = duration.zero();
603 }
604
605 // round up the next wait-duration to wait /at least/ the expected time.
606 //
607 // In case the expiry is 990us, wait 1ms
608 // If it is 0ns, leave it at 0ms;
609
610 auto duration_ms =
611 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
612
613 using namespace std::chrono_literals;
614
615 // round up to the next millisecond.
616 if ((duration - duration_ms).count() != 0) {
617 duration_ms += 1ms;
618 }
619
620 return duration_ms;
621 }
622
623 bool run_one() override {
624 std::unique_ptr<pending_timer> pt;
625
626 {
627 std::lock_guard<std::mutex> lk(queue_mtx_);
628
629 // if the pending-timers queue is empty, leave
630 // if the top is cancelled or expired, run it
631 if (cancelled_timers_.empty()) {
632 if (pending_timers_.empty()) return false;
633
634#if 0
635 // list
636 if (pending_timers_.front()->expiry() > Timer::clock_type::now()) {
637 return false;
638 }
639 pt = std::move(pending_timers_.front());
640 pending_timers_.pop_front();
641#else
642 if (pending_timers_.size() != pending_timer_expiries_.size()) abort();
643
644 auto min = Timer::time_point::min();
645 for (const auto &cur : pending_timer_expiries_) {
646 if (cur.first < min) abort();
647
648 min = cur.first;
649 }
650
651 const auto now = Timer::clock_type::now();
652
653 // multimap
654 auto pending_expiry_it = pending_timer_expiries_.begin();
655 auto timepoint = pending_expiry_it->first;
656
657 if (timepoint > now) {
658 // not expired yet. leave
659 return false;
660 }
661 typename Timer::Id *timer_id = pending_expiry_it->second;
662
663 auto pending_it = pending_timers_.find(timer_id);
664 if (pending_it == pending_timers_.end()) {
665 abort();
666 }
667 if (pending_it->second->id() != timer_id) {
668 abort();
669 }
670 if (pending_it->second->expiry() != pending_expiry_it->first) {
671 abort();
672 }
673
674 pt = std::move(pending_it->second);
675 pending_timer_expiries_.erase(pending_expiry_it);
676 pending_timers_.erase(pending_it);
677#endif
678 } else {
679 pt = std::move(cancelled_timers_.front());
680 cancelled_timers_.pop_front();
681 }
682 }
683
684 pt->run();
685
687
688 return true;
689 }
690
691 size_t cancel(const Timer &t) {
692 size_t count{};
693
694 {
695 std::lock_guard<std::mutex> lk(queue_mtx_);
696
697#if 0
698 const auto end = pending_timers_.end();
699
700 // the same timer may be pushed multiple times to the queue
701 // therefore, check all entries
702 for (auto cur = pending_timers_.begin(); cur != end;) {
703 auto &cur_timer = cur->second;
704 if (cur_timer->id() == t.id()) {
705 cur_timer->cancel();
706 ++count;
707
708 auto nxt = std::next(cur);
709 // move the timer over to the cancelled timers
711 cur);
712 cur = nxt;
713 } else {
714 ++cur;
715 }
716 }
717#else
718 auto eq_range = pending_timers_.equal_range(t.id());
719
720 for (auto cur = eq_range.first; cur != eq_range.second;) {
721 auto expiry_eq_range =
722 pending_timer_expiries_.equal_range(cur->second->expiry());
723
724 size_t erase_count{};
725
726 for (auto expiry_cur = expiry_eq_range.first;
727 expiry_cur != expiry_eq_range.second;) {
728 if (expiry_cur->first == cur->second->expiry() &&
729 expiry_cur->second == cur->second->id() && erase_count == 0) {
730 expiry_cur = pending_timer_expiries_.erase(expiry_cur);
731 ++erase_count;
732 } else {
733 ++expiry_cur;
734 }
735 }
736
737 // nothing found ... boom
738 if (erase_count == 0) abort();
739
740 cur->second->cancel();
741
742 // move timer to cancelled timers
743 cancelled_timers_.emplace_back(std::move(cur->second));
744
745 ++count;
746
747 cur = pending_timers_.erase(cur);
748 }
749#endif
750 }
751
752 return count;
753 }
754
755 class pending_timer {
756 public:
757 using time_point = typename Timer::time_point;
758 using timer_id = typename Timer::Id *;
759
760 pending_timer(const Timer &timer)
761 : expiry_{timer.expiry()}, id_{timer.id()} {}
762
763 virtual ~pending_timer() = default;
764
765 bool is_cancelled() const { return id_ == nullptr; }
766 void cancel() {
767 id_ = nullptr;
768
769 // ensure that it bubbles up to the top
770 expiry_ = expiry_.min();
771 }
772
773 time_point expiry() const noexcept { return expiry_; }
774 timer_id id() const { return id_; }
775
776 virtual void run() = 0;
777
778 private:
779 time_point expiry_;
780 timer_id id_;
781 };
782
783 template <class Op>
784 class pending_timer_op : public pending_timer {
785 public:
786 pending_timer_op(const Timer &timer, Op &&op)
787 : pending_timer(timer), op_{std::move(op)} {}
788
789 void run() override {
790 if (this->is_cancelled()) {
791 op_(make_error_code(std::errc::operation_canceled));
792 } else {
793 op_(std::error_code{});
794 }
795 }
796
797 private:
798 Op op_;
799 };
800
801 // cancelled timers, earliest cancelled timer first
802 std::list<std::unique_ptr<pending_timer>> cancelled_timers_;
803
804 // active timers, smallest time-point first
805 std::multimap<typename Timer::time_point, typename Timer::Id *>
806 pending_timer_expiries_;
807 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
808 pending_timers_;
809 };
810
811 /**
812 * async wait for a timer to expire.
813 *
814 * adds the op and timer to the timer_queue
815 *
816 * @param timer timer
817 * @param op completion handler to call when timer is triggered
818 */
819 template <class Timer, class Op>
820 void async_wait(const Timer &timer, Op &&op) {
821 auto &queue = use_service<timer_queue<Timer>>(*this);
822
823 queue.push(timer, std::forward<Op>(op));
824
825 // wake up the blocked poll_one() to handle possible timer events.
826 notify_io_service_if_not_running_in_this_thread();
827 }
828
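 // Illustrative usage sketch (editorial example; assumes a TS-style timer
 // from timer.h such as net::steady_timer with expires_after(), names not
 // defined in this file):
 //
 //   net::io_context io_ctx;
 //   net::steady_timer timer{io_ctx};
 //
 //   timer.expires_after(std::chrono::milliseconds(100));
 //   timer.async_wait([](std::error_code ec) {
 //     // ec is std::errc::operation_canceled if the timer was cancelled,
 //     // otherwise a default-constructed (success) error_code.
 //   });
 //
 //   io_ctx.run();  // returns after the handler ran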
829 /**
830 * cancel all async-ops of a timer.
831 */
832 template <class Timer>
833 size_t cancel(const Timer &timer) {
834 if (!has_service<timer_queue<Timer>>(*this)) {
835 return 0;
836 }
837
838 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
839 if (count) {
840 // if a timer was canceled, interrupt the io-service
841 notify_io_service_if_not_running_in_this_thread();
842 }
843 return count;
844 }
845
846 // cancel oldest
847 template <class Timer>
848 size_t cancel_one(const Timer & /* timer */) {
849 // TODO: implement if async_wait is implemented
850 return 0;
851 }
852
853 /** pointers to the timer-queues of this io-context.
854 *
855 * timer-queues are one per timer-type (std::chrono::steady_clock,
856 * std::chrono::system_clock, ...)
857 *
858 * timer_queue_base is the base class of the timer-queues
859 *
860 * the timer-queues themselves are owned by the io_context's executor via
861 * execution_context::add_service()
862 *
863 * protected via 'mtx_'
864 */
865 std::vector<timer_queue_base *> timer_queues_;
866
867 /**
868 * mutex that protects the core parts of the io-context.
869 *
870 * - timer_queues_
871 */
872 mutable std::mutex mtx_{};
873
874 mutable std::mutex do_one_mtx_{};
875 mutable std::condition_variable do_one_cond_{};
876 bool is_running_{false};
877
878 void wait_no_runner_(std::unique_lock<std::mutex> &lk) {
879 lk.lock();
880 wait_no_runner_unlocked_(lk);
881 }
882
883 void wait_no_runner_unlocked_(std::unique_lock<std::mutex> &lk) {
884 do_one_cond_.wait(lk, [this]() { return is_running_ == false; });
885
886 is_running(true);
887 }
888
889 void wake_one_runner_(std::unique_lock<std::mutex> &lk) {
890 is_running(false);
891 lk.unlock();
892 do_one_cond_.notify_one();
893 }
894
895 void is_running(bool v) { is_running_ = v; }
896 bool is_running() const { return is_running_; }
897
899};
900} // namespace net
901
902namespace net {
903inline io_context::count_type io_context::run() {
904 count_type n = 0;
905
906 std::unique_lock<std::mutex> lk(do_one_mtx_);
907
908 using namespace std::chrono_literals;
909
910 // in the first round, we already have the lock; in all other rounds we
911 // need to take the lock first.
912 for (wait_no_runner_unlocked_(lk); do_one(lk, -1ms) != 0;
913 wait_no_runner_(lk)) {
914 if (n != std::numeric_limits<count_type>::max()) ++n;
915 }
916 return n;
917}
918
919inline io_context::count_type io_context::run_one() {
920 using namespace std::chrono_literals;
921
922 std::unique_lock<std::mutex> lk(do_one_mtx_);
923
924 wait_no_runner_unlocked_(lk);
925
926 return do_one(lk, -1ms);
927}
928
929template <class Rep, class Period>
930io_context::count_type io_context::run_for(
931 const std::chrono::duration<Rep, Period> &rel_time) {
932 return run_until(std::chrono::steady_clock::now() + rel_time);
933}
934
935template <class Clock, class Duration>
936io_context::count_type io_context::run_until(
937 const std::chrono::time_point<Clock, Duration> &abs_time) {
938 count_type n = 0;
939
940 std::unique_lock<std::mutex> lk(do_one_mtx_);
941
942 using namespace std::chrono_literals;
943
944 // in the first round, we already have the lock; in all other rounds we
945 // need to take the lock first.
946 for (wait_no_runner_unlocked_(lk); do_one_until(lk, abs_time) != 0;
947 wait_no_runner_(lk)) {
948 if (n != std::numeric_limits<count_type>::max()) ++n;
949 }
950 return n;
951}
952
953template <class Rep, class Period>
954io_context::count_type io_context::run_one_for(
955 const std::chrono::duration<Rep, Period> &rel_time) {
956 return run_one_until(std::chrono::steady_clock::now() + rel_time);
957}
958
959template <class Clock, class Duration>
960io_context::count_type io_context::run_one_until(
961 const std::chrono::time_point<Clock, Duration> &abs_time) {
962 std::unique_lock<std::mutex> lk(do_one_mtx_);
963
964 wait_no_runner_unlocked_(lk);
965
966 return do_one_until(lk, abs_time);
967}
968
969inline io_context::count_type io_context::poll() {
970 count_type n = 0;
971 std::unique_lock<std::mutex> lk(do_one_mtx_);
972
973 using namespace std::chrono_literals;
974
975 for (wait_no_runner_unlocked_(lk); do_one(lk, 0ms) != 0;
976 wait_no_runner_(lk)) {
977 if (n != std::numeric_limits<count_type>::max()) ++n;
978 }
979 return n;
980}
981
982inline io_context::count_type io_context::poll_one() {
983 std::unique_lock<std::mutex> lk(do_one_mtx_);
984
985 using namespace std::chrono_literals;
986
987 wait_no_runner_unlocked_(lk);
988 return do_one(lk, 0ms);
989}
990
991class io_context::executor_type {
992 public:
993 executor_type(const executor_type &rhs) noexcept = default;
994 executor_type(executor_type &&rhs) noexcept = default;
995 executor_type &operator=(const executor_type &rhs) noexcept = default;
996 executor_type &operator=(executor_type &&rhs) noexcept = default;
997
998 ~executor_type() = default;
999
1000 bool running_in_this_thread() const noexcept {
1001 return impl::Callstack<io_context>::contains(io_ctx_) != nullptr;
1002 }
1003 io_context &context() const noexcept { return *io_ctx_; }
1004
1005 void on_work_started() const noexcept { ++io_ctx_->work_count_; }
1006 void on_work_finished() const noexcept { --io_ctx_->work_count_; }
1007
1008 /**
1009 * execute function.
1010 *
1011 * Effect:
1012 *
1013 * The executor
1014 *
1015 * - MAY block forward progress of the caller until f() finishes.
1016 */
1017 template <class Func, class ProtoAllocator>
1018 void dispatch(Func &&f, const ProtoAllocator &a) const {
1019 if (running_in_this_thread()) {
1020 // run it in this thread.
1021 std::decay_t<Func>(std::forward<Func>(f))();
1022 } else {
1023 // queue function call for later execution.
1024 post(std::forward<Func>(f), a);
1025 }
1026 }
1027
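 // Illustrative sketch (editorial example, not taken from this file):
 // dispatch() runs the function inline when running_in_this_thread() is
 // true, i.e. the caller is on a thread currently executing this
 // io_context; otherwise it behaves like post() and queues the call.
 //
 //   net::io_context io_ctx;
 //   auto ex = io_ctx.get_executor();
 //
 //   ex.post([ex]() {
 //     // this lambda runs from within io_ctx.run() ...
 //     ex.dispatch([]() { /* ... so this runs immediately, same thread */ },
 //                 std::allocator<void>{});
 //   }, std::allocator<void>{});
 //
 //   io_ctx.run();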
1028 /**
1029 * queue function for execution.
1030 *
1031 * Effects:
1032 *
1033 * The executor
1034 *
1035 * - SHALL NOT block forward progress of the caller pending completion of f().
1036 * - MAY begin f() progress before the call to post completes.
1037 */
1038 template <class Func, class ProtoAllocator>
1039 void post(Func &&f, const ProtoAllocator &a) const {
1040 io_ctx_->defer_work(std::forward<Func>(f), a);
1041 }
1042
1043 /**
1044 * defer function call for later execution.
1045 *
1046 * Effect:
1047 *
1048 * The executor:
1049 *
1050 * - SHALL NOT block forward progress of the caller pending completion of f().
1051 * - SHOULD NOT begin f()'s progress before the call to defer()
1052 * completes.
1053 */
1054 template <class Func, class ProtoAllocator>
1055 void defer(Func &&f, const ProtoAllocator &a) const {
1056 post(std::forward<Func>(f), a);
1057 }
1058
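 // Illustrative sketch (editorial example, not taken from this file):
 // post() and defer() never run the function inline; both enqueue it as
 // deferred work and wake a blocked run()/run_one().
 //
 //   net::io_context io_ctx;
 //   io_ctx.get_executor().post([]() { /* runs from within run_one() */ },
 //                              std::allocator<void>{});
 //   io_ctx.run_one();  // executes the queued item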
1059 private:
1060 friend io_context;
1061
1062 explicit executor_type(io_context &ctx) : io_ctx_{std::addressof(ctx)} {}
1063
1064 io_context *io_ctx_;
1065};
1066
1067inline bool operator==(const io_context::executor_type &a,
1068 const io_context::executor_type &b) noexcept {
1069 return std::addressof(a.context()) == std::addressof(b.context());
1070}
1071inline bool operator!=(const io_context::executor_type &a,
1072 const io_context::executor_type &b) noexcept {
1073 return !(a == b);
1074}
1075
1076// io_context::executor_type is an executor even though it doesn't have a
1077// default constructor.
1078template <>
1079struct is_executor<io_context::executor_type> : std::true_type {};
1080
1081inline io_context::executor_type io_context::get_executor() noexcept {
1082 return executor_type(*this);
1083}
1084
1085/**
1086 * cancel all async-ops of a file-descriptor.
1087 */
1088inline stdx::expected<void, std::error_code> io_context::cancel(
1089 native_handle_type fd) {
1090 bool need_notify{false};
1091 {
1092 // check all async-ops
1093 std::lock_guard<std::mutex> lk(mtx_);
1094
1095 while (auto op = active_ops_.extract_first(fd)) {
1096 op->cancel();
1097
1098 cancelled_ops_.push_back(std::move(op));
1099
1100 need_notify = true;
1101 }
1102 }
1103
1104 // wake up the loop to deliver the cancelled fds
1105 if (true || need_notify) {
1106 io_service_->remove_fd(fd);
1107
1108 notify_io_service_if_not_running_in_this_thread();
1109 }
1110
1111 return {};
1112}
1113
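 // Illustrative sketch (editorial example, not taken from this file;
 // async_wait() for fds is private and reachable from the socket classes
 // declared as friends, and the wait_read enumerator name is assumed from
 // socket_constants.h): cancelling a file-descriptor completes its pending
 // operations with operation_canceled on the next run_one()/poll_one().
 //
 //   io_ctx.async_wait(fd, net::impl::socket::wait_type::wait_read,
 //                     [](std::error_code ec) {
 //                       // ec == std::errc::operation_canceled after cancel()
 //                     });
 //   io_ctx.cancel(fd);  // moves the op to cancelled_ops_, removes fd interest
 //   io_ctx.run_one();   // delivers the cancelled completion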
1114template <class Clock, class Duration>
1115io_context::count_type io_context::do_one_until(
1116 std::unique_lock<std::mutex> &lk,
1117 const std::chrono::time_point<Clock, Duration> &abs_time) {
1118 using namespace std::chrono_literals;
1119
1120 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1121 auto rel_time_ms =
1122 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1123
1124 if (rel_time_ms < 0ms) {
1125 // expired already.
1126 rel_time_ms = 0ms;
1127 } else if (rel_time_ms < rel_time) {
1128 // std::chrono::ceil()
1129 rel_time_ms += 1ms;
1130 }
1131
1132 return do_one(lk, rel_time_ms);
1133}
1134
1135inline void io_context::notify_io_service_if_not_running_in_this_thread() {
1136 if (impl::Callstack<io_context>::contains(this) == nullptr) {
1137 io_service_->notify();
1138 }
1139}
1140
1141// precond: lk MUST be locked
1142inline io_context::count_type io_context::do_one(
1143 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds timeout) {
1145
1146 timer_queue_base *timer_q{nullptr};
1147
1148 monitor mon(*this);
1149
1150 if (!has_outstanding_work()) {
1151 wake_one_runner_(lk);
1152 return 0;
1153 }
1154
1155 while (true) {
1156 // 1. deferred work.
1157 // 2. timer
1158 // 3. triggered events.
1159
1160 // timer (2nd round)
1161 if (timer_q) {
1162 if (timer_q->run_one()) {
1163 wake_one_runner_(lk);
1164 return 1;
1165 } else {
1166 timer_q = nullptr;
1167 }
1168 }
1169
1170 // deferred work
1171 if (deferred_work_.run_one()) {
1172 wake_one_runner_(lk);
1173 return 1;
1174 }
1175
1176 // timer
1177 std::chrono::milliseconds min_duration{0};
1178 {
1179 std::lock_guard<std::mutex> lock(mtx_);
1180 // check the smallest timestamp of all timer-queues
1181 for (auto q : timer_queues_) {
1182 const auto duration = q->next();
1183
1184 if (duration == duration.zero()) {
1185 timer_q = q;
1186 min_duration = duration;
1187 break;
1188 } else if ((duration != duration.max()) &&
1189 (timeout != timeout.zero()) &&
1190 (duration < min_duration || timer_q == nullptr)) {
1191 timer_q = q;
1192 min_duration = duration;
1193 }
1194 }
1195 }
1196
1197 // if we have a timer that has fired or was cancelled, run it right away
1198 if (timer_q && min_duration <= min_duration.zero()) continue;
1199
1200 if (auto op = [this]() -> std::unique_ptr<async_op> {
1201 // handle all the cancelled ops without polling first
1202 std::lock_guard<std::mutex> lock(mtx_);
1203
1204 // ops have all cancelled operators at the front
1205 if (!cancelled_ops_.empty() &&
1206 cancelled_ops_.front()->is_cancelled()) {
1207 auto cancelled_op = std::move(cancelled_ops_.front());
1208
1209 cancelled_ops_.pop_front();
1210
1211 return cancelled_op;
1212 }
1213
1214 return {};
1215 }()) {
1216 // before we unlock the concurrent io-context-thread-lock, increment the
1217 // work-count to ensure the next waiting thread exits in case:
1218 //
1219 // - no io-events registered
1220 // - no timers registered
1222 wake_one_runner_(lk);
1223 op->run(*this);
1225
1226 return 1;
1227 }
1228
1229 if (stopped() || !io_service_open_res_) {
1230 break;
1231 }
1232
1233 // adjust min-duration according to caller's timeout
1234 //
1235 // - if there is no timer queued, use the caller's timeout
1236 // - if there is a timer queued, reduce min_duration to callers timeout if
1237 // it is lower and non-negative.
1238 //
1239 // note: negative timeout == infinite.
1240 if (timer_q == nullptr ||
1241 (timeout > timeout.zero() && timeout < min_duration)) {
1242 min_duration = timeout;
1243 }
1244
1245 auto res = io_service_->poll_one(min_duration);
1246 if (!res) {
1247 if (res.error() == std::errc::interrupted) {
1248 // poll again as it got interrupted
1249 continue;
1250 }
1251 if (res.error() == std::errc::timed_out && min_duration != timeout &&
1252 timer_q != nullptr) {
1253 // poll_one() timed out, we have a timer-queue and the timer's expiry is
1254 // less than the global timeout or there is no timeout.
1255 continue;
1256 }
1257
1258 wake_one_runner_(lk);
1259
1260#if 0
1261 // if the poll returns another error, it is ok that we don't check it
1262 // further, as we exit cleanly. Still, it would be nice to be aware of it
1263 // in debug builds.
1264 assert(res.error() == io_service_errc::no_fds ||
1265 res.error() == std::errc::timed_out);
1266#endif
1267
1268 // either poll() timed out or there were no file-descriptors that fired
1269 return 0;
1270 }
1271
1272 // std::cerr << __LINE__ << ": " << res.value().fd << " - "
1273 // << res.value().event << std::endl;
1274
1275 if (auto op = [this](native_handle_type fd,
1276 short events) -> std::unique_ptr<async_op> {
1277 std::lock_guard<std::mutex> lock(mtx_);
1278
1279 return active_ops_.extract_first(fd, events);
1280 }(res->fd, res->event)) {
1282 wake_one_runner_(lk);
1283 op->run(*this);
1285 return 1;
1286 }
1287 // we may not find an async-op for this event if it already has been
1288 // cancelled. Loop around and let the "is-cancelled" check handle it.
1289 }
1290
1291 wake_one_runner_(lk);
1292 return 0;
1293}
1294
1295} // namespace net
1296
1297#endif