MySQL 8.3.0
Source Code Documentation
io_context.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2020, 2023, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23*/
24
25#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
26#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27
28#include <atomic>
29#include <chrono>
30#include <iterator>
31#include <limits> // numeric_limits
32#include <list>
33#include <map>
34#include <memory> // unique_ptr
35#include <mutex>
36#include <system_error> // error_code
37#include <unordered_map>
38#include <utility>
39#include <vector>
40
41#include "my_config.h" // HAVE_EPOLL
51
52namespace net {
53
54#if defined(HAVE_EPOLL)
56#else
58#endif
59
61 public:
62 class executor_type;
63
64 using count_type = size_t;
66
68 : io_context{std::make_unique<net::impl::socket::SocketService>(),
70
72 std::unique_ptr<net::impl::socket::SocketServiceBase> &&socket_service,
73 std::unique_ptr<IoServiceBase> &&io_service)
77
78 explicit io_context(int /* concurrency_hint */) : io_context() {}
79
82 cancelled_ops_.clear();
83 // Make sure the services are destroyed before our internal fields. The
84 // services own the timers that can indirectly call our methods when
85 // destructed. See UT NetTS_io_context.pending_timer_on_destroy for an
86 // example.
87 destroy();
88 }
89
90 io_context(const io_context &) = delete;
91 io_context &operator=(const io_context &) = delete;
92
93 executor_type get_executor() noexcept;
94
96
97 template <class Rep, class Period>
98 count_type run_for(const std::chrono::duration<Rep, Period> &rel_time);
99
100 template <class Clock, class Duration>
102 const std::chrono::time_point<Clock, Duration> &abs_time);
103
105
106 template <class Rep, class Period>
107 count_type run_one_for(const std::chrono::duration<Rep, Period> &rel_time);
108
109 template <class Clock, class Duration>
111 const std::chrono::time_point<Clock, Duration> &abs_time);
112
115 void stop() {
116 {
117 std::lock_guard<std::mutex> lk(mtx_);
118 stopped_ = true;
119 }
120
122 }
123
// true once stop() was called; reset by restart().
// Thread-safe: reads stopped_ under mtx_.
124 bool stopped() const noexcept {
125 std::lock_guard<std::mutex> lk(mtx_);
126 return stopped_;
127 }
128
// clear the stopped-flag so run()/run_one() can be called again after a
// stop(). Thread-safe: writes stopped_ under mtx_.
129 void restart() {
130 std::lock_guard<std::mutex> lk(mtx_);
131 stopped_ = false;
132 }
133
135 return socket_service_.get();
136 }
137
138 IoServiceBase *io_service() const { return io_service_.get(); }
139
140 /**
141 * get the status of the implicit open() call of the io-service.
142 *
143 * the io_service_.open() may fail due to out-of-file-descriptors.
144 *
145 * run() will fail silently if the io-service failed to open.
146 *
147 * @returns std::error_code on error
148 */
151 }
152
153 private:
154 /**
155 * queued work from io_context::executor_type::dispatch()/post()/defer().
156 */
158 public:
159 // simple, generic storage of callable.
160 //
161 // std::function<void()> is similar, but doesn't work for move-only
162 // callables like lambda's that capture a move-only type
164 public:
165 virtual ~BasicCallable() = default;
166
167 virtual void invoke() = 0;
168 };
169
170 template <class Func>
171 class Callable : public BasicCallable {
172 public:
173 Callable(Func &&f) : f_{std::forward<Func>(f)} {}
174
175 void invoke() override { f_(); }
176
177 private:
178 Func f_;
179 };
180
181 using op_type = std::unique_ptr<BasicCallable>;
182
183 /**
184 * run a deferred work item.
185 *
186 * @returns number work items run.
187 * @retval 0 work queue was empty, nothing was run.
188 */
189 size_t run_one() {
190 // tmp list to hold the current operation to run.
191 //
192 // makes it simple and fast to move the head element and shorten the time
193 // the lock is held.
194 decltype(work_) tmp;
195
196 // lock is only needed as long as we modify the work-queue.
197 {
198 std::lock_guard<std::mutex> lk(work_mtx_);
199
200 if (work_.empty()) return 0;
201
202 // move the head of the work queue out and release the lock.
203 //
204 // note: std::list.splice() moves pointers.
205 tmp.splice(tmp.begin(), work_, work_.begin());
206 }
207
208 // run the deferred work.
209 tmp.front()->invoke();
210
211 // and destruct the list at the end.
212
213 return 1;
214 }
215
216 /**
217 * queue work for later execution.
218 */
219 template <class Func, class ProtoAllocator>
220 void post(Func &&f, const ProtoAllocator & /* a */) {
221 std::lock_guard<std::mutex> lk(work_mtx_);
222
223 work_.emplace_back(
224 std::make_unique<Callable<Func>>(std::forward<Func>(f)));
225 }
226
227 /**
228 * check if work is queued for later execution.
229 *
230 * @retval true if some work is queued.
231 */
// note: thread-safe; takes work_mtx_ while inspecting the queue.
232 bool has_outstanding_work() const {
233 std::lock_guard<std::mutex> lk(work_mtx_);
234 return !work_.empty();
235 }
236
237 private:
238 mutable std::mutex work_mtx_;
239 std::list<op_type> work_;
240 };
241
243
244 /**
245 * defer work for later execution.
246 */
247 template <class Func, class ProtoAllocator>
248 void defer_work(Func &&f, const ProtoAllocator &a) {
249 deferred_work_.post(std::forward<Func>(f), a);
250
251 // wakeup the possibly blocked io-thread.
253 }
254
255 template <class Clock, class Duration>
257 std::unique_lock<std::mutex> &lk,
258 const std::chrono::time_point<Clock, Duration> &abs_time);
259
260 count_type do_one(std::unique_lock<std::mutex> &lk,
261 std::chrono::milliseconds timeout);
262
263 template <typename _Clock, typename _WaitTraits>
265
267
268 template <class Protocol>
269 friend class basic_socket_impl;
270
271 template <class Protocol>
272 friend class basic_socket;
273
274 template <class Protocol>
276
277 template <class Protocol>
279
280 bool stopped_{false};
281 std::atomic<count_type> work_count_{};
282
283 // must be first member-var to ensure it is destroyed last
284 std::unique_ptr<impl::socket::SocketServiceBase> socket_service_;
285 std::unique_ptr<IoServiceBase> io_service_;
287
288 // has outstanding work
289 //
290 // work is outstanding when
291 //
292 // - work-count from on_work_started()/on_work_finished() is more than 0 and
293 // - any active or cancelled operations are still ongoing
294 bool has_outstanding_work() const {
295 if (!cancelled_ops_.empty()) return true;
296 if (active_ops_.has_outstanding_work()) return true;
297 if (deferred_work_.has_outstanding_work()) return true;
298
299 if (work_count_ > 0) return true;
300
301 return false;
302 }
303
304 // monitor all .run()s in the io-context needs to be stopped.
305 class monitor {
306 public:
307 monitor(io_context &ctx) : ctx_{ctx} {}
308
309 monitor(const monitor &) = delete;
310 monitor(monitor &&) = delete;
311
313 std::lock_guard<std::mutex> lk(ctx_.mtx_);
314
315 // ctx_.call_stack_.pop_back();
316
317 if (!ctx_.has_outstanding_work()) {
318 // like stop(), just that we already have the mutex
319 ctx_.stopped_ = true;
320 ctx_.io_service_->notify();
321 }
322 }
323
324 private:
326 };
327
329
330 /**
331 * base class of async operation.
332 *
333 * - file-descriptor
334 * - wait-event
335 */
336 class async_op {
337 public:
339
341
342 virtual ~async_op() = default;
343
344 virtual void run(io_context &) = 0;
345
348
350 wait_type event() const { return event_; }
351
352 private:
355 };
356
357 /**
358 * async operation with callback.
359 */
360 template <class Op>
361 class async_op_impl : public async_op {
362 public:
364 : async_op{fd, wt}, op_{std::forward<Op>(op)} {}
365
366 void run(io_context & /* io_ctx */) override {
367 if (is_cancelled()) {
368 op_(make_error_code(std::errc::operation_canceled));
369 } else {
370 op_(std::error_code{});
371 }
372 }
373
374 private:
375 Op op_;
376 };
377
378 class AsyncOps {
379 public:
380 using element_type = std::unique_ptr<async_op>;
381
// true if any async-op is registered for any file-descriptor.
// Thread-safe: takes mtx_.
382 bool has_outstanding_work() const {
383 std::lock_guard<std::mutex> lk(mtx_);
384
385 return !ops_.empty();
386 }
387
389 const auto handle = t->native_handle();
390
391 std::lock_guard<std::mutex> lk(mtx_);
392
393 auto it = ops_.find(handle);
394 if (it != ops_.end()) {
395 it->second.push_back(std::move(t));
396 } else {
397 std::vector<element_type> v;
398 v.push_back(std::move(t));
399 ops_.emplace(handle, std::move(v));
400 }
401 }
402
404 return extract_first(fd, [events](auto const &el) {
405 return static_cast<short>(el->event()) & events;
406 });
407 }
408
410 return extract_first(fd, [](auto const &) { return true; });
411 }
412
// drain all registered async-ops and destroy them after mtx_ was released.
413 void release_all() {
414 // We expect that this method is called before AsyncOps destructor, to
415 // make sure that ops_ map is empty when the destructor executes. If the
416 // ops_ is not empty when destructed, the destructor of its element can
417 // trigger a method that will try to access that map (that is destructed).
418 // Example: we have an AsyncOp that captures some Socket object with a
419 // smart pointer. When the destructor of this AsyncOp is called, it can
420 // also call the destructor of that Socket, which in turn will call
421 // socket.close(), causing the Socket to unregister its operations in
422 // its respective io_context object which is us (calls extract_first()).
423 std::list<element_type> ops_to_delete;
424 {
425 std::lock_guard<std::mutex> lk(mtx_);
426 for (auto &fd_ops : ops_) {
427 for (auto &fd_op : fd_ops.second) {
428 ops_to_delete.push_back(std::move(fd_op));
429 }
430 }
431 ops_.clear();
432 // It is important that we release the mtx_ here before the
433 // ops_to_delete go out of scope and are deleted. AsyncOp destructor can
434 // indirectly call extract_first which would lead to a deadlock.
435 }
// ops_to_delete is destroyed here, after the lock-scope above has ended.
436 }
437
438 private:
439 template <class Pred>
441 std::lock_guard<std::mutex> lk(mtx_);
442
443 const auto it = ops_.find(fd);
444 if (it != ops_.end()) {
445 auto &async_ops = it->second;
446
447 const auto end = async_ops.end();
448 for (auto cur = async_ops.begin(); cur != end; ++cur) {
449 auto &el = *cur;
450
451 if (el->native_handle() == fd && pred(el)) {
452 auto op = std::move(el);
453
454 if (async_ops.size() == 1) {
455 // remove the current container and with it its only element
456 ops_.erase(it);
457 } else {
458 // remove the current entry
459 async_ops.erase(cur);
460 }
461
462 return op;
463 }
464 }
465 }
466
467 return {};
468 }
469
470 std::unordered_map<native_handle_type, std::vector<element_type>> ops_{
471 16 * 1024};
472
473 mutable std::mutex mtx_;
474 };
475
477
478 // cancelled async operators.
479 std::list<std::unique_ptr<async_op>> cancelled_ops_;
480
481 template <class Op>
483 // add the socket-wait op to the queue
485 std::make_unique<async_op_impl<Op>>(std::forward<Op>(op), fd, wt));
486
487 {
488 auto res = io_service_->add_fd_interest(fd, wt);
489 if (!res) {
490#if 0
491 // fd may be -1 or so
492 std::cerr << "!! add_fd_interest(" << fd << ", ..."
493 << ") " << res.error() << " " << res.error().message()
494 << std::endl;
495#endif
496 // adding failed. Cancel it again.
497 //
498 // code should be similar to ::cancel(fd)
499 std::lock_guard<std::mutex> lk(mtx_);
500
501 if (auto op = active_ops_.extract_first(fd, static_cast<short>(wt))) {
502 op->cancel();
503 cancelled_ops_.push_back(std::move(op));
504 }
505 }
506 }
507
509 }
510
512 protected:
514
515 mutable std::mutex queue_mtx_;
516
517 public:
518 virtual bool run_one() = 0;
519 virtual std::chrono::milliseconds next() const = 0;
520 };
521
522 template <class Timer>
524 public:
526
528 // add timer_queue to io_context
529
530 auto &io_ctx = static_cast<io_context &>(ctx);
531
532 // @note: don't move this lock+push into the timer_queue_base constructor
533 //
534 // @see
535 // https://github.com/google/sanitizers/wiki/ThreadSanitizerPopularDataRaces#data-race-on-vptr-during-construction
536 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
537 io_ctx.timer_queues_.push_back(this);
538 }
539
540 void shutdown() noexcept override {}
541
// the io_context this timer-queue was added to. The static_cast mirrors
// the cast done at construction time, where ctx is an io_context.
542 io_context &context() noexcept {
543 return static_cast<io_context &>(service::context());
544 }
545
546 template <class Op>
547 void push(const Timer &timer, Op &&op) {
549
550 std::lock_guard<std::mutex> lk(queue_mtx_);
551
552#if 0
553 pending_timers_.insert(
554 std::upper_bound(
555 pending_timers_.begin(), pending_timers_.end(), timer.expiry(),
556 [](const auto &a, const auto &b) { return a < b->expiry(); }),
557 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op)));
558#else
559 if (timer.id() == nullptr) abort();
560
561 // add timer
562 pending_timers_.emplace(std::make_pair(
563 timer.id(),
564 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op))));
565
566 if (timer.id() == nullptr) abort();
567 if (timer.expiry() == Timer::time_point::min()) abort();
568
569 // sorted timer ids by expiry
571 std::make_pair(timer.expiry(), timer.id()));
572#endif
573 }
574
575 std::chrono::milliseconds next() const override {
576 typename Timer::time_point expiry;
577 {
578 std::lock_guard<std::mutex> lk(queue_mtx_);
579
580 // no pending timers, return the max-timeout
581 if (cancelled_timers_.empty()) {
582 if (pending_timer_expiries_.empty())
583 return std::chrono::milliseconds::max();
584
585#if 0
586 expiry = pending_timers_.front()->expiry();
587#else
588 expiry = pending_timer_expiries_.begin()->first;
589#endif
590 } else {
591 // cancelled timers should be executed directly
592 return std::chrono::milliseconds::min();
593 }
594
595 // the lock isn't needed anymore.
596 }
597
598 auto duration = Timer::traits_type::to_wait_duration(expiry);
599 if (duration < duration.zero()) {
600 duration = duration.zero();
601 }
602
603 // round up the next wait-duration to wait /at least/ the expected time.
604 //
605 // In case the expiry is 990us, wait 1ms
606 // If it is 0ns, leave it at 0ms;
607
608 auto duration_ms =
609 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
610
611 using namespace std::chrono_literals;
612
613 // round up to the next millisecond.
614 if ((duration - duration_ms).count() != 0) {
615 duration_ms += 1ms;
616 }
617
618 return duration_ms;
619 }
620
621 bool run_one() override {
622 std::unique_ptr<pending_timer> pt;
623
624 {
625 std::lock_guard<std::mutex> lk(queue_mtx_);
626
627 // if the pending-timers queue is empty, leave
628 // if the top is cancelled or expired, run it
629 if (cancelled_timers_.empty()) {
630 if (pending_timers_.empty()) return false;
631
632#if 0
633 // list
634 if (pending_timers_.front()->expiry() > Timer::clock_type::now()) {
635 return false;
636 }
637 pt = std::move(pending_timers_.front());
638 pending_timers_.pop_front();
639#else
640 if (pending_timers_.size() != pending_timer_expiries_.size()) abort();
641
642 auto min = Timer::time_point::min();
643 for (const auto &cur : pending_timer_expiries_) {
644 if (cur.first < min) abort();
645
646 min = cur.first;
647 }
648
649 const auto now = Timer::clock_type::now();
650
651 // multimap
652 auto pending_expiry_it = pending_timer_expiries_.begin();
653 auto timepoint = pending_expiry_it->first;
654
655 if (timepoint > now) {
656 // not expired yet. leave
657 return false;
658 }
659 typename Timer::Id *timer_id = pending_expiry_it->second;
660
661 auto pending_it = pending_timers_.find(timer_id);
662 if (pending_it == pending_timers_.end()) {
663 abort();
664 }
665 if (pending_it->second->id() != timer_id) {
666 abort();
667 }
668 if (pending_it->second->expiry() != pending_expiry_it->first) {
669 abort();
670 }
671
672 pt = std::move(pending_it->second);
673 pending_timer_expiries_.erase(pending_expiry_it);
674 pending_timers_.erase(pending_it);
675#endif
676 } else {
677 pt = std::move(cancelled_timers_.front());
678 cancelled_timers_.pop_front();
679 }
680 }
681
682 pt->run();
683
685
686 return true;
687 }
688
689 size_t cancel(const Timer &t) {
690 size_t count{};
691
692 {
693 std::lock_guard<std::mutex> lk(queue_mtx_);
694
695#if 0
696 const auto end = pending_timers_.end();
697
698 // the same timer may be pushed multiple times to the queue
699 // therefore, check all entries
700 for (auto cur = pending_timers_.begin(); cur != end;) {
701 auto &cur_timer = cur->second;
702 if (cur_timer->id() == t.id()) {
703 cur_timer->cancel();
704 ++count;
705
706 auto nxt = std::next(cur);
707 // move the timer over to the cancelled timers
709 cur);
710 cur = nxt;
711 } else {
712 ++cur;
713 }
714 }
715#else
716 auto eq_range = pending_timers_.equal_range(t.id());
717
718 for (auto cur = eq_range.first; cur != eq_range.second;) {
719 auto expiry_eq_range =
720 pending_timer_expiries_.equal_range(cur->second->expiry());
721
722 size_t erase_count{};
723
724 for (auto expiry_cur = expiry_eq_range.first;
725 expiry_cur != expiry_eq_range.second;) {
726 if (expiry_cur->first == cur->second->expiry() &&
727 expiry_cur->second == cur->second->id() && erase_count == 0) {
728 expiry_cur = pending_timer_expiries_.erase(expiry_cur);
729 ++erase_count;
730 } else {
731 ++expiry_cur;
732 }
733 }
734
735 // nothing found ... boom
736 if (erase_count == 0) abort();
737
738 cur->second->cancel();
739
740 // move timer to cancelled timers
741 cancelled_timers_.emplace_back(std::move(cur->second));
742
743 ++count;
744
745 cur = pending_timers_.erase(cur);
746 }
747#endif
748 }
749
750 return count;
751 }
752
754 public:
// aliases for the timer's time-point and its identity handle.
755 using time_point = typename Timer::time_point;
756 using timer_id = typename Timer::Id *;
757
// snapshot expiry and id of the timer at construction time.
758 pending_timer(const Timer &timer)
759 : expiry_{timer.expiry()}, id_{timer.id()} {}
760
761 virtual ~pending_timer() = default;
762
// a cancelled pending-timer has its id reset to nullptr.
763 bool is_cancelled() const { return id_ == nullptr; }
764 void cancel() {
765 id_ = nullptr;
766
767 // ensure that it bubbles up to the top
768 expiry_ = expiry_.min();
769 }
770
771 time_point expiry() const noexcept { return expiry_; }
772 timer_id id() const { return id_; }
773
// run the completion handler; implemented by pending_timer_op.
774 virtual void run() = 0;
775
776 private:
779 };
780
781 template <class Op>
783 public:
// capture expiry/id through the pending_timer base and take ownership of
// the completion handler.
784 pending_timer_op(const Timer &timer, Op &&op)
785 : pending_timer(timer), op_{std::move(op)} {}
786
// invoke the completion handler: operation_canceled if the timer was
// cancelled, success (empty error_code) otherwise.
787 void run() override {
788 if (this->is_cancelled()) {
789 op_(make_error_code(std::errc::operation_canceled));
790 } else {
791 op_(std::error_code{});
792 }
793 }
794
795 private:
796 Op op_;
797 };
798
799 // cancelled timers, earliest cancelled timer first
800 std::list<std::unique_ptr<pending_timer>> cancelled_timers_;
801
802 // active timers, smallest time-point first
803 std::multimap<typename Timer::time_point, typename Timer::Id *>
805 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
807 };
808
809 /**
810 * async wait for a timer expire.
811 *
812 * adds the op and timer to the timer_queue
813 *
814 * @param timer timer
815 * @param op completion handler to call when timer is triggered
816 */
817 template <class Timer, class Op>
818 void async_wait(const Timer &timer, Op &&op) {
819 auto &queue = use_service<timer_queue<Timer>>(*this);
820
821 queue.push(timer, std::forward<Op>(op));
822
823 // wakeup the blocked poll_one() to handle possible timer events.
825 }
826
827 /**
828 * cancel all async-ops of a timer.
829 */
830 template <class Timer>
831 size_t cancel(const Timer &timer) {
832 if (!has_service<timer_queue<Timer>>(*this)) {
833 return 0;
834 }
835
836 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
837 if (count) {
838 // if a timer was canceled, interrupt the io-service
840 }
841 return count;
842 }
843
844 // cancel oldest
845 template <class Timer>
846 size_t cancel_one(const Timer & /* timer */) {
847 // TODO: implement if async_wait is implemented
848 return 0;
849 }
850
851 /** pointers to the timer-queues of this io-contexts.
852 *
853 * timer-queues are one per timer-type (std::chrono::steady_clock,
854 * std::chrono::system_clock, ...)
855 *
856 * timer_queue_base is the base class of the timer-queues
857 *
858 * the timer-queue's themselves are ownered by the io_context's executor via
859 * execution_context::add_service()
860 *
861 * protected via 'mtx_'
862 */
863 std::vector<timer_queue_base *> timer_queues_;
864
865 /**
866 * mutex that protects the core parts of the io-context.
867 *
868 * - timer_queues_
869 */
870 mutable std::mutex mtx_{};
871
872 mutable std::mutex do_one_mtx_{};
873 mutable std::condition_variable do_one_cond_{};
874 bool is_running_{false};
875
876 void wait_no_runner_(std::unique_lock<std::mutex> &lk) {
877 lk.lock();
879 }
880
881 void wait_no_runner_unlocked_(std::unique_lock<std::mutex> &lk) {
882 do_one_cond_.wait(lk, [this]() { return is_running_ == false; });
883
884 is_running(true);
885 }
886
887 void wake_one_runner_(std::unique_lock<std::mutex> &lk) {
888 is_running(false);
889 lk.unlock();
890 do_one_cond_.notify_one();
891 }
892
893 void is_running(bool v) { is_running_ = v; }
894 bool is_running() const { return is_running_; }
895
897};
898} // namespace net
899
900namespace net {
902 count_type n = 0;
903
904 std::unique_lock<std::mutex> lk(do_one_mtx_);
905
906 using namespace std::chrono_literals;
907
908 // in the first round, we already have the lock, the all other rounds we
909 // need to take the lock first
910 for (wait_no_runner_unlocked_(lk); do_one(lk, -1ms) != 0;
911 wait_no_runner_(lk)) {
912 if (n != std::numeric_limits<count_type>::max()) ++n;
913 }
914 return n;
915}
916
918 using namespace std::chrono_literals;
919
920 std::unique_lock<std::mutex> lk(do_one_mtx_);
921
923
924 return do_one(lk, -1ms);
925}
926
927template <class Rep, class Period>
929 const std::chrono::duration<Rep, Period> &rel_time) {
930 return run_until(std::chrono::steady_clock::now() + rel_time);
931}
932
933template <class Clock, class Duration>
935 const std::chrono::time_point<Clock, Duration> &abs_time) {
936 count_type n = 0;
937
938 std::unique_lock<std::mutex> lk(do_one_mtx_);
939
940 using namespace std::chrono_literals;
941
942 // in the first round, we already have the lock, the all other rounds we
943 // need to take the lock first
944 for (wait_no_runner_unlocked_(lk); do_one_until(lk, abs_time) != 0;
945 wait_no_runner_(lk)) {
946 if (n != std::numeric_limits<count_type>::max()) ++n;
947 }
948 return n;
949}
950
951template <class Rep, class Period>
953 const std::chrono::duration<Rep, Period> &rel_time) {
954 return run_one_until(std::chrono::steady_clock::now() + rel_time);
955}
956
957template <class Clock, class Duration>
959 const std::chrono::time_point<Clock, Duration> &abs_time) {
960 std::unique_lock<std::mutex> lk(do_one_mtx_);
961
963
964 return do_one_until(lk, abs_time);
965}
966
968 count_type n = 0;
969 std::unique_lock<std::mutex> lk(do_one_mtx_);
970
971 using namespace std::chrono_literals;
972
973 for (wait_no_runner_unlocked_(lk); do_one(lk, 0ms) != 0;
974 wait_no_runner_(lk)) {
975 if (n != std::numeric_limits<count_type>::max()) ++n;
976 }
977 return n;
978}
979
981 std::unique_lock<std::mutex> lk(do_one_mtx_);
982
983 using namespace std::chrono_literals;
984
986 return do_one(lk, 0ms);
987}
988
990 public:
// freely copyable and movable: the executor only holds a pointer to its
// io_context (io_ctx_).
991 executor_type(const executor_type &rhs) noexcept = default;
992 executor_type(executor_type &&rhs) noexcept = default;
993 executor_type &operator=(const executor_type &rhs) noexcept = default;
994 executor_type &operator=(executor_type &&rhs) noexcept = default;
995
996 ~executor_type() = default;
997
998 bool running_in_this_thread() const noexcept {
1000 }
// the io_context this executor belongs to.
1001 io_context &context() const noexcept { return *io_ctx_; }
1002
// work-count bookkeeping: a positive work_count_ keeps the io_context's
// has_outstanding_work() true and therefore keeps run() from returning.
1003 void on_work_started() const noexcept { ++io_ctx_->work_count_; }
1004 void on_work_finished() const noexcept { --io_ctx_->work_count_; }
1005
1006 /**
1007 * execute function.
1008 *
1009 * Effect:
1010 *
1011 * The executor
1012 *
1013 * - MAY block forward progress of the caller until f() finishes.
1014 */
1015 template <class Func, class ProtoAllocator>
1016 void dispatch(Func &&f, const ProtoAllocator &a) const {
1017 if (running_in_this_thread()) {
1018 // run it in this thread.
1019 std::decay_t<Func>(std::forward<Func>(f))();
1020 } else {
1021 // queue function call for later execution.
1022 post(std::forward<Func>(f), a);
1023 }
1024 }
1025
1026 /**
1027 * queue function for execution.
1028 *
1029 * Effects:
1030 *
1031 * The executor
1032 *
1033 * - SHALL NOT block forward progress of the caller pending completion of f().
1034 * - MAY begin f() progress before the call to post completes.
1035 */
// forwards to io_context::defer_work(), which queues the callable.
1036 template <class Func, class ProtoAllocator>
1037 void post(Func &&f, const ProtoAllocator &a) const {
1038 io_ctx_->defer_work(std::forward<Func>(f), a);
1039 }
1040
1041 /**
1042 * defer function call for later execution.
1043 *
1044 * Effect:
1045 *
1046 * The executor:
1047 *
1048 * - SHALL NOT block forward progress of the caller pending completion of f().
1049 * - SHOULD NOT begin f()'s progress before the call to defer()
1050 * completes.
1051 */
// this executor does not distinguish defer() from post().
1052 template <class Func, class ProtoAllocator>
1053 void defer(Func &&f, const ProtoAllocator &a) const {
1054 post(std::forward<Func>(f), a);
1055 }
1056
1057 private:
1059
1060 explicit executor_type(io_context &ctx) : io_ctx_{std::addressof(ctx)} {}
1061
1063};
1064
1066 const io_context::executor_type &b) noexcept {
1067 return std::addressof(a.context()) == std::addressof(b.context());
1068}
1070 const io_context::executor_type &b) noexcept {
1071 return !(a == b);
1072}
1073
1074// io_context::executor_type is an executor even though it doesn't have an
1075// default constructor
1076template <>
1077struct is_executor<io_context::executor_type> : std::true_type {};
1078
1080 return executor_type(*this);
1081}
1082
1083/**
1084 * cancel all async-ops of a file-descriptor.
1085 */
1087 native_handle_type fd) {
1088 bool need_notify{false};
1089 {
1090 // check all async-ops
1091 std::lock_guard<std::mutex> lk(mtx_);
1092
1093 while (auto op = active_ops_.extract_first(fd)) {
1094 op->cancel();
1095
1096 cancelled_ops_.push_back(std::move(op));
1097
1098 need_notify = true;
1099 }
1100 }
1101
1102 // wakeup the loop to deliver the cancelled fds
1103 if (true || need_notify) {
1104 io_service_->remove_fd(fd);
1105
1107 }
1108
1109 return {};
1110}
1111
1112template <class Clock, class Duration>
1114 std::unique_lock<std::mutex> &lk,
1115 const std::chrono::time_point<Clock, Duration> &abs_time) {
1116 using namespace std::chrono_literals;
1117
1118 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1119 auto rel_time_ms =
1120 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1121
1122 if (rel_time_ms < 0ms) {
1123 // expired already.
1124 rel_time_ms = 0ms;
1125 } else if (rel_time_ms < rel_time) {
1126 // std::chrono::ceil()
1127 rel_time_ms += 1ms;
1128 }
1129
1130 return do_one(lk, rel_time_ms);
1131}
1132
1134 if (impl::Callstack<io_context>::contains(this) == nullptr) {
1135 io_service_->notify();
1136 }
1137}
1138
1139// precond: lk MUST be locked
1141 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds timeout) {
1143
1144 timer_queue_base *timer_q{nullptr};
1145
1146 monitor mon(*this);
1147
1148 if (!has_outstanding_work()) {
1149 wake_one_runner_(lk);
1150 return 0;
1151 }
1152
1153 while (true) {
1154 // 1. deferred work.
1155 // 2. timer
1156 // 3. triggered events.
1157
1158 // timer (2nd round)
1159 if (timer_q) {
1160 if (timer_q->run_one()) {
1161 wake_one_runner_(lk);
1162 return 1;
1163 } else {
1164 timer_q = nullptr;
1165 }
1166 }
1167
1168 // deferred work
1169 if (deferred_work_.run_one()) {
1170 wake_one_runner_(lk);
1171 return 1;
1172 }
1173
1174 // timer
1175 std::chrono::milliseconds min_duration{0};
1176 {
1177 std::lock_guard<std::mutex> lk(mtx_);
1178 // check the smallest timestamp of all timer-queues
1179 for (auto q : timer_queues_) {
1180 const auto duration = q->next();
1181
1182 if (duration == duration.zero()) {
1183 timer_q = q;
1184 min_duration = duration;
1185 break;
1186 } else if ((duration != duration.max()) &&
1187 (timeout != timeout.zero()) &&
1188 (duration < min_duration || timer_q == nullptr)) {
1189 timer_q = q;
1190 min_duration = duration;
1191 }
1192 }
1193 }
1194
1195 // if we have a timer that has fired or was cancelled, run it right away
1196 if (timer_q && min_duration <= min_duration.zero()) continue;
1197
1198 if (auto op = [this]() -> std::unique_ptr<async_op> {
1199 // handle all the cancelled ops without polling first
1200 std::lock_guard<std::mutex> lk(mtx_);
1201
1202 // ops have all cancelled operators at the front
1203 if (!cancelled_ops_.empty() &&
1204 cancelled_ops_.front()->is_cancelled()) {
1205 auto op = std::move(cancelled_ops_.front());
1206
1207 cancelled_ops_.pop_front();
1208
1209 return op;
1210 }
1211
1212 return {};
1213 }()) {
1214 // before we unlock the concurrent io-context-thread-lock increment the
1215 // work-count to ensure the next waiting thread exiting in case:
1216 //
1217 // - no io-events registered
1218 // - no timers registered
1220 wake_one_runner_(lk);
1221 op->run(*this);
1223
1224 return 1;
1225 }
1226
1227 if (stopped() || !io_service_open_res_) {
1228 break;
1229 }
1230
1231 // adjust min-duration according to caller's timeout
1232 //
1233 // - if there is no timer queued, use the caller's timeout
1234 // - if there is a timer queued, reduce min_duration to callers timeout if
1235 // it is lower and non-negative.
1236 //
1237 // note: negative timeout == infinite.
1238 if (timer_q == nullptr ||
1239 (timeout > timeout.zero() && timeout < min_duration)) {
1240 min_duration = timeout;
1241 }
1242
1243 auto res = io_service_->poll_one(min_duration);
1244 if (!res) {
1245 if (res.error() == std::errc::interrupted) {
1246 // poll again as it got interrupted
1247 continue;
1248 }
1249 if (res.error() == std::errc::timed_out && min_duration != timeout &&
1250 timer_q != nullptr) {
1251 // poll_one() timed out, we have a timer-queue and the timer's expiry is
1252 // less than the global timeout or there is no timeout.
1253 continue;
1254 }
1255
1256 wake_one_runner_(lk);
1257
1258#if 0
1259 // if the poll returns another error, it is a ok that we don't further
1260 // check it as we exit cleanly. Still it would be nice to be aware of it
1261 // in debug builds.
1262 assert(res.error() == io_service_errc::no_fds ||
1263 res.error() == std::errc::timed_out);
1264#endif
1265
1266 // either poll() timed out or there where no file-descriptors that fired
1267 return 0;
1268 }
1269
1270 // std::cerr << __LINE__ << ": " << res.value().fd << " - "
1271 // << res.value().event << std::endl;
1272
1273 if (auto op = [this](native_handle_type fd,
1274 short events) -> std::unique_ptr<async_op> {
1275 std::lock_guard<std::mutex> lk(mtx_);
1276
1277 return active_ops_.extract_first(fd, events);
1278 }(res->fd, res->event)) {
1280 wake_one_runner_(lk);
1281 op->run(*this);
1283 return 1;
1284 }
1285 // we may not find an async-op for this event if it already has been
1286 // cancelled. Loop around let the "is-cancelled" check handle it.
1287 }
1288
1289 wake_one_runner_(lk);
1290 return 0;
1291}
1292
1293} // namespace net
1294
1295#endif
Definition: io_service_base.h:86
Definition: socket.h:1292
template-less base-class of basic_socket_impl.
Definition: socket.h:334
Definition: socket.h:470
Definition: socket.h:710
Definition: socket.h:1089
Definition: timer.h:56
Definition: executor.h:290
execution_context & context() noexcept
Definition: executor.h:296
Definition: executor.h:153
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:282
void destroy() noexcept
Definition: executor.h:175
Definition: callstack.h:79
callstack of a thread.
Definition: callstack.h:70
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:150
Definition: socket_service_base.h:47
Definition: io_context.h:378
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:470
std::mutex mtx_
Definition: io_context.h:473
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:403
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:440
void release_all()
Definition: io_context.h:413
element_type extract_first(native_handle_type fd)
Definition: io_context.h:409
bool has_outstanding_work() const
Definition: io_context.h:382
void push_back(element_type &&t)
Definition: io_context.h:388
std::unique_ptr< async_op > element_type
Definition: io_context.h:380
Definition: io_context.h:171
Callable(Func &&f)
Definition: io_context.h:173
void invoke() override
Definition: io_context.h:175
Func f_
Definition: io_context.h:178
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:157
std::list< op_type > work_
Definition: io_context.h:239
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:181
std::mutex work_mtx_
Definition: io_context.h:238
size_t run_one()
run a deferred work item.
Definition: io_context.h:189
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:232
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:220
async operation with callback.
Definition: io_context.h:361
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:363
Op op_
Definition: io_context.h:375
void run(io_context &) override
Definition: io_context.h:366
base class of async operation.
Definition: io_context.h:336
native_handle_type fd_
Definition: io_context.h:353
wait_type event() const
Definition: io_context.h:350
virtual ~async_op()=default
void cancel()
Definition: io_context.h:346
virtual void run(io_context &)=0
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:340
bool is_cancelled() const
Definition: io_context.h:347
native_handle_type native_handle() const
Definition: io_context.h:349
wait_type event_
Definition: io_context.h:354
Definition: io_context.h:989
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1016
io_context * io_ctx_
Definition: io_context.h:1062
void on_work_started() const noexcept
Definition: io_context.h:1003
executor_type(executor_type &&rhs) noexcept=default
executor_type(const executor_type &rhs) noexcept=default
friend io_context
Definition: io_context.h:1058
executor_type & operator=(const executor_type &rhs) noexcept=default
executor_type & operator=(executor_type &&rhs) noexcept=default
io_context & context() const noexcept
Definition: io_context.h:1001
executor_type(io_context &ctx)
Definition: io_context.h:1060
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1037
bool running_in_this_thread() const noexcept
Definition: io_context.h:998
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1053
void on_work_finished() const noexcept
Definition: io_context.h:1004
Definition: io_context.h:305
io_context & ctx_
Definition: io_context.h:325
monitor(const monitor &)=delete
monitor(io_context &ctx)
Definition: io_context.h:307
~monitor()
Definition: io_context.h:312
monitor(monitor &&)=delete
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:784
Op op_
Definition: io_context.h:796
void run() override
Definition: io_context.h:787
void cancel()
Definition: io_context.h:764
pending_timer(const Timer &timer)
Definition: io_context.h:758
time_point expiry() const noexcept
Definition: io_context.h:771
typename Timer::time_point time_point
Definition: io_context.h:755
timer_id id() const
Definition: io_context.h:772
typename Timer::Id * timer_id
Definition: io_context.h:756
bool is_cancelled() const
Definition: io_context.h:763
time_point expiry_
Definition: io_context.h:777
timer_id id_
Definition: io_context.h:778
Definition: io_context.h:511
timer_queue_base(execution_context &ctx)
Definition: io_context.h:513
virtual std::chrono::milliseconds next() const =0
std::mutex queue_mtx_
Definition: io_context.h:515
Definition: io_context.h:523
std::chrono::milliseconds next() const override
Definition: io_context.h:575
size_t cancel(const Timer &t)
Definition: io_context.h:689
io_context & context() noexcept
Definition: io_context.h:542
bool run_one() override
Definition: io_context.h:621
void shutdown() noexcept override
Definition: io_context.h:540
timer_queue(execution_context &ctx)
Definition: io_context.h:527
void push(const Timer &timer, Op &&op)
Definition: io_context.h:547
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:804
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:806
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:800
Definition: io_context.h:60
count_type poll()
Definition: io_context.h:967
bool is_running_
Definition: io_context.h:874
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:870
std::condition_variable do_one_cond_
Definition: io_context.h:873
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:934
~io_context()
Definition: io_context.h:80
std::atomic< count_type > work_count_
Definition: io_context.h:281
count_type run_one()
Definition: io_context.h:917
io_context()
Definition: io_context.h:67
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:887
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:881
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:284
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:285
DeferredWork deferred_work_
Definition: io_context.h:242
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:248
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1113
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:928
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1133
void async_wait(const Timer &timer, Op &&op)
async wait for a timer expire.
Definition: io_context.h:818
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:149
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:952
size_t cancel_one(const Timer &)
Definition: io_context.h:846
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:65
std::mutex do_one_mtx_
Definition: io_context.h:872
io_context(int)
Definition: io_context.h:78
IoServiceBase * io_service() const
Definition: io_context.h:138
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:286
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1140
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:479
io_context(const io_context &)=delete
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:71
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:134
bool is_running() const
Definition: io_context.h:894
AsyncOps active_ops_
Definition: io_context.h:476
bool stopped_
Definition: io_context.h:280
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-contexts.
Definition: io_context.h:863
count_type poll_one()
Definition: io_context.h:980
void stop()
Definition: io_context.h:115
count_type run()
Definition: io_context.h:901
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:876
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:482
executor_type get_executor() noexcept
Definition: io_context.h:1079
void restart()
Definition: io_context.h:129
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:831
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1086
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:958
void is_running(bool v)
Definition: io_context.h:893
size_t count_type
Definition: io_context.h:64
bool has_outstanding_work() const
Definition: io_context.h:294
bool stopped() const noexcept
Definition: io_context.h:124
io_context & operator=(const io_context &)=delete
Definition: linux_epoll_io_service.h:57
io_service based on the poll() system-call.
Definition: poll_io_service.h:51
Definition: expected.h:943
static int count
Definition: myisam_ftdump.cc:44
static QUEUE queue
Definition: myisampack.cc:209
static bool interrupted
Definition: mysqladmin.cc:70
Definition: authentication.cc:35
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:497
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:62
wait_type
Definition: socket_constants.h:85
int native_handle_type
Definition: socket_constants.h:50
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:51
Definition: buffer.h:44
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:558
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:554
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:102
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:63
Definition: varlen_sort.h:174
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:78
Definition: executor.h:352
int n
Definition: xcom_base.cc:508
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4085