MySQL 8.0.29
Source Code Documentation
io_context.h
1/*
2 Copyright (c) 2020, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23*/
24
25#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
26#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27
28#include <atomic>
29#include <chrono>
30#include <iterator>
31#include <limits> // numeric_limits
32#include <list>
33#include <map>
34#include <memory> // unique_ptr
35#include <mutex>
36#include <system_error> // error_code
37#include <unordered_map>
38#include <utility>
39#include <vector>
40
41#include "my_config.h" // HAVE_EPOLL
50
51namespace net {
52
53#if defined(HAVE_EPOLL)
54using io_service_impl_default = linux_epoll_io_service;
55#else
56using io_service_impl_default = poll_io_service;
57#endif
58
59class io_context : public execution_context {
60 public:
61 class executor_type;
62
63 using count_type = size_t;
64 using native_handle_type = impl::socket::native_handle_type;
65
66 io_context()
67 : io_context{std::make_unique<net::impl::socket::SocketService>(),
68 std::make_unique<io_service_impl_default>()} {}
69
70 io_context(
71 std::unique_ptr<net::impl::socket::SocketServiceBase> &&socket_service,
72 std::unique_ptr<IoServiceBase> &&io_service)
73 : socket_service_{std::move(socket_service)},
74 io_service_{std::move(io_service)},
75 io_service_open_res_{io_service_->open()} {}
76
77 explicit io_context(int /* concurrency_hint */) : io_context() {}
78 io_context(const io_context &) = delete;
79 io_context &operator=(const io_context &) = delete;
80
81 executor_type get_executor() noexcept;
82
83 count_type run();
84
85 template <class Rep, class Period>
86 count_type run_for(const std::chrono::duration<Rep, Period> &rel_time);
87
88 template <class Clock, class Duration>
89 count_type run_until(
90 const std::chrono::time_point<Clock, Duration> &abs_time);
91
92 count_type run_one();
93
94 template <class Rep, class Period>
95 count_type run_one_for(const std::chrono::duration<Rep, Period> &rel_time);
96
97 template <class Clock, class Duration>
98 count_type run_one_until(
99 const std::chrono::time_point<Clock, Duration> &abs_time);
100
101 count_type poll();
102 count_type poll_one();
103 void stop() {
104 {
105 std::lock_guard<std::mutex> lk(mtx_);
106 stopped_ = true;
107 }
108
109 io_service_->notify();
110 }
111
112 bool stopped() const noexcept {
113 std::lock_guard<std::mutex> lk(mtx_);
114 return stopped_;
115 }
116
117 void restart() {
118 std::lock_guard<std::mutex> lk(mtx_);
119 stopped_ = false;
120 }
121
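A minimal usage sketch of the stop()/restart() pair above (the helper name and the extra thread are illustrative, not part of this header): stop() makes a concurrent run() return, and restart() has to clear the stopped flag before the context can be run again.

#include <thread>

void example_stop_restart(net::io_context &io_ctx) {
  std::thread worker([&io_ctx]() { io_ctx.run(); });

  io_ctx.stop();     // set the stopped flag and wake the blocked io-service
  worker.join();     // run() returns once it observes the stopped flag

  io_ctx.restart();  // required before the io_context can be run again
}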
122 impl::socket::SocketServiceBase *socket_service() const {
123 return socket_service_.get();
124 }
125
126 IoServiceBase *io_service() const { return io_service_.get(); }
127
128 /**
129 * get the status of the implicit open() call of the io-service.
130 *
131 * the io_service_.open() may fail due to out-of-file-descriptors.
132 *
133 * run() will fail silently if the io-service failed to open.
134 *
135 * @returns std::error_code on error
136 */
137 stdx::expected<void, std::error_code> open_res() const noexcept {
138 return io_service_open_res_;
139 }
140
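A hedged sketch of how open_res() can be used (the helper is illustrative, not part of this header): since run() fails silently when the io-service could not be opened, checking open_res() once after construction surfaces the error.

#include <iostream>

void run_checked(net::io_context &io_ctx) {
  if (!io_ctx.open_res()) {
    // the implicit open() failed, e.g. due to out-of-file-descriptors
    std::cerr << "io-service open failed: "
              << io_ctx.open_res().error().message() << "\n";
    return;
  }

  io_ctx.run();  // safe: the io-service is usable
}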
141 private:
142 /**
143 * queued work from io_context::executor_type::dispatch()/post()/defer().
144 */
145 class DeferredWork {
146 public:
147 // simple, generic storage of callable.
148 //
149 // std::function<void()> is similar, but doesn't work for move-only
150 // callables like lambdas that capture a move-only type
151 class BasicCallable {
152 public:
153 virtual ~BasicCallable() = default;
154
155 virtual void invoke() = 0;
156 };
157
158 template <class Func>
159 class Callable : public BasicCallable {
160 public:
161 Callable(Func &&f) : f_{std::forward<Func>(f)} {}
162
163 void invoke() override { f_(); }
164
165 private:
166 Func f_;
167 };
168
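A small standalone illustration of the limitation mentioned above (names are illustrative): a lambda that captures a std::unique_ptr is move-only, so it cannot be stored in a std::function, while a move-aware wrapper such as Callable<Func> can hold it.

#include <functional>
#include <memory>
#include <utility>

void example_move_only_callable() {
  auto p = std::make_unique<int>(42);

  auto f = [q = std::move(p)]() { return *q; };  // move-only closure

  // std::function<int()> broken{std::move(f)};  // does not compile: f is not copyable

  auto g = std::move(f);  // moving the callable itself is fine
  g();
}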
169 using op_type = std::unique_ptr<BasicCallable>;
170
171 /**
172 * run a deferred work item.
173 *
174 * @returns number of work items run.
175 * @retval 0 work queue was empty, nothing was run.
176 */
177 size_t run_one() {
178 // tmp list to hold the current operation to run.
179 //
180 // makes it simple and fast to move the head element and shorten the time
181 // the lock is held.
182 decltype(work_) tmp;
183
184 // lock is only needed as long as we modify the work-queue.
185 {
186 std::lock_guard<std::mutex> lk(work_mtx_);
187
188 if (work_.empty()) return 0;
189
190 // move the head of the work queue out and release the lock.
191 //
192 // note: std::list.splice() moves pointers.
193 tmp.splice(tmp.begin(), work_, work_.begin());
194 }
195
196 // run the deferred work.
197 tmp.front()->invoke();
198
199 // and destruct the list at the end.
200
201 return 1;
202 }
203
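The same trick shown in isolation (names are illustrative, and std::function stands in for the move-aware Callable used above): splice() moves one node out of the queue in O(1) while the lock is held, and the work item is then invoked without the lock.

#include <functional>
#include <list>
#include <mutex>

std::mutex example_work_mtx;
std::list<std::function<void()>> example_work;

bool example_run_one() {
  std::list<std::function<void()>> tmp;
  {
    std::lock_guard<std::mutex> lk(example_work_mtx);
    if (example_work.empty()) return false;

    tmp.splice(tmp.begin(), example_work, example_work.begin());  // O(1), only pointers move
  }
  tmp.front()();  // run the work item outside the lock
  return true;
}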
204 /**
205 * queue work for later execution.
206 */
207 template <class Func, class ProtoAllocator>
208 void post(Func &&f, const ProtoAllocator & /* a */) {
209 std::lock_guard<std::mutex> lk(work_mtx_);
210
211 work_.emplace_back(
212 std::make_unique<Callable<Func>>(std::forward<Func>(f)));
213 }
214
215 /**
216 * check if work is queued for later execution.
217 *
218 * @retval true if some work is queued.
219 */
220 bool has_outstanding_work() const {
221 std::lock_guard<std::mutex> lk(work_mtx_);
222 return !work_.empty();
223 }
224
225 private:
226 mutable std::mutex work_mtx_;
227 std::list<op_type> work_;
228 };
229
230 DeferredWork deferred_work_;
231
232 /**
233 * defer work for later execution.
234 */
235 template <class Func, class ProtoAllocator>
236 void defer_work(Func &&f, const ProtoAllocator &a) {
237 deferred_work_.post(std::forward<Func>(f), a);
238
239 // wakeup the possibly blocked io-thread.
240 io_service()->notify();
241 }
242
243 template <class Clock, class Duration>
244 count_type do_one_until(
245 std::unique_lock<std::mutex> &lk,
246 const std::chrono::time_point<Clock, Duration> &abs_time);
247
248 count_type do_one(std::unique_lock<std::mutex> &lk,
249 std::chrono::milliseconds timeout);
250
251 template <typename _Clock, typename _WaitTraits>
252 friend class basic_waitable_timer;
253
254 friend class basic_socket_impl_base;
255
256 template <class Protocol>
257 friend class basic_socket_impl;
258
259 template <class Protocol>
260 friend class basic_socket;
261
262 template <class Protocol>
263 friend class basic_stream_socket;
264
265 template <class Protocol>
266 friend class basic_socket_acceptor;
267
268 bool stopped_{false};
269 std::atomic<count_type> work_count_{};
270
271 // must be first member-var to ensure it is destroyed last
272 std::unique_ptr<impl::socket::SocketServiceBase> socket_service_;
273 std::unique_ptr<IoServiceBase> io_service_;
274 stdx::expected<void, std::error_code> io_service_open_res_;
275
276 // has outstanding work
277 //
278 // work is outstanding when
279 //
280 // - the work-count from on_work_started()/on_work_finished() is more than 0, or
281 // - any active or cancelled operations are still ongoing
282 bool has_outstanding_work() const {
283 if (!cancelled_ops_.empty()) return true;
284 if (active_ops_.has_outstanding_work()) return true;
285 if (deferred_work_.has_outstanding_work()) return true;
286
287 if (work_count_ > 0) return true;
288
289 return false;
290 }
291
292 // monitors a .run() and checks, at its end, whether the io-context needs to be stopped.
293 class monitor {
294 public:
295 monitor(io_context &ctx) : ctx_{ctx} {}
296
297 monitor(const monitor &) = delete;
298 monitor(monitor &&) = delete;
299
300 ~monitor() {
301 std::lock_guard<std::mutex> lk(ctx_.mtx_);
302
303 // ctx_.call_stack_.pop_back();
304
305 if (!ctx_.has_outstanding_work()) {
306 // like stop(), just that we already have the mutex
307 ctx_.stopped_ = true;
308 ctx_.io_service_->notify();
309 }
310 }
311
312 private:
313 io_context &ctx_;
314 };
315
316 stdx::expected<void, std::error_code> cancel(native_handle_type fd);
317
318 /**
319 * base class of async operation.
320 *
321 * - file-descriptor
322 * - wait-event
323 */
324 class async_op {
325 public:
326 using wait_type = impl::socket::wait_type;
327
328 async_op(native_handle_type fd, wait_type ev) : fd_{fd}, event_{ev} {}
329
330 virtual ~async_op() = default;
331
332 virtual void run(io_context &) = 0;
333
334 void cancel() { fd_ = impl::socket::kInvalidSocket; }
335 bool is_cancelled() const { return fd_ == impl::socket::kInvalidSocket; }
336
337 native_handle_type native_handle() const { return fd_; }
338 wait_type event() const { return event_; }
339
340 private:
341 native_handle_type fd_;
342 wait_type event_;
343 };
344
345 /**
346 * async operation with callback.
347 */
348 template <class Op>
349 class async_op_impl : public async_op {
350 public:
351 async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
352 : async_op{fd, wt}, op_{std::forward<Op>(op)} {}
353
354 void run(io_context & /* io_ctx */) override {
355 if (is_cancelled()) {
356 op_(make_error_code(std::errc::operation_canceled));
357 } else {
358 op_(std::error_code{});
359 }
360 }
361
362 private:
363 Op op_;
364 };
365
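A sketch of the completion-handler shape async_op_impl expects (illustrative only): a callable taking a single std::error_code, where operation_canceled signals that the operation was cancelled before the event fired.

#include <system_error>

void example_completion_handler() {
  auto on_ready = [](std::error_code ec) {
    if (ec == std::errc::operation_canceled) {
      return;  // cancelled before the file-descriptor became ready
    }
    // ... the fd is ready, do the non-blocking I/O here ...
  };

  on_ready(std::error_code{});  // success path, as async_op_impl::run() invokes it
}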
366 class AsyncOps {
367 public:
368 using element_type = std::unique_ptr<async_op>;
369
370 bool has_outstanding_work() const {
371 std::lock_guard<std::mutex> lk(mtx_);
372
373 return !ops_.empty();
374 }
375
376 void push_back(element_type &&t) {
377 const auto handle = t->native_handle();
378
379 std::lock_guard<std::mutex> lk(mtx_);
380
381 auto it = ops_.find(handle);
382 if (it != ops_.end()) {
383 it->second.push_back(std::move(t));
384 } else {
385 std::vector<element_type> v;
386 v.push_back(std::move(t));
387 ops_.emplace(handle, std::move(v));
388 }
389 }
390
391 element_type extract_first(native_handle_type fd, short events) {
392 return extract_first(fd, [events](auto const &el) {
393 return static_cast<short>(el->event()) & events;
394 });
395 }
396
397 element_type extract_first(native_handle_type fd) {
398 return extract_first(fd, [](auto const &) { return true; });
399 }
400
401 private:
402 template <class Pred>
403 element_type extract_first(native_handle_type fd, Pred &&pred) {
404 std::lock_guard<std::mutex> lk(mtx_);
405
406 const auto it = ops_.find(fd);
407 if (it != ops_.end()) {
408 auto &async_ops = it->second;
409
410 const auto end = async_ops.end();
411 for (auto cur = async_ops.begin(); cur != end; ++cur) {
412 auto &el = *cur;
413
414 if (el->native_handle() == fd && pred(el)) {
415 auto op = std::move(el);
416
417 if (async_ops.size() == 1) {
418 // remove the current container and with it its only element
419 ops_.erase(it);
420 } else {
421 // remove the current entry
422 async_ops.erase(cur);
423 }
424
425 return op;
426 }
427 }
428 }
429
430 return {};
431 }
432
433 std::unordered_map<native_handle_type, std::vector<element_type>> ops_{
434 16 * 1024};
435
436 mutable std::mutex mtx_;
437 };
438
439 AsyncOps active_ops_;
440
441 // cancelled async operators.
442 std::list<std::unique_ptr<async_op>> cancelled_ops_;
443
444 template <class Op>
445 void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op) {
446 // add the socket-wait op to the queue
447 active_ops_.push_back(
448 std::make_unique<async_op_impl<Op>>(std::forward<Op>(op), fd, wt));
449
450 {
451 auto res = io_service_->add_fd_interest(fd, wt);
452 if (!res) {
453#if !defined(NDEBUG)
454 // fd may be -1 or so
455 std::cerr << "!! add_fd_interest(" << fd << ", ..."
456 << ") " << res.error() << " " << res.error().message()
457 << std::endl;
458#endif
459 // adding failed. Cancel it again.
460 //
461 // code should be similar to ::cancel(fd)
462 std::lock_guard<std::mutex> lk(mtx_);
463
464 if (auto op = active_ops_.extract_first(fd, static_cast<short>(wt))) {
465 op->cancel();
466 cancelled_ops_.push_back(std::move(op));
467 }
468 }
469 }
470
471 io_service_->notify();
472 }
473
474 class timer_queue_base : public execution_context::service {
475 protected:
476 timer_queue_base(execution_context &ctx) : service{ctx} {}
477
478 mutable std::mutex queue_mtx_;
479
480 public:
481 virtual bool run_one() = 0;
482 virtual std::chrono::milliseconds next() const = 0;
483 };
484
485 template <class Timer>
486 class timer_queue : public timer_queue_base {
487 public:
489
490 timer_queue(execution_context &ctx) : timer_queue_base{ctx} {
491 // add timer_queue to io_context
492
493 auto &io_ctx = static_cast<io_context &>(ctx);
494
495 // @note: don't move this lock+push into the timer_queue_base constructor
496 //
497 // @see
498 // https://github.com/google/sanitizers/wiki/ThreadSanitizerPopularDataRaces#data-race-on-vptr-during-construction
499 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
500 io_ctx.timer_queues_.push_back(this);
501 }
502
503 void shutdown() noexcept override {}
504
505 io_context &context() noexcept {
506 return static_cast<io_context &>(service::context());
507 }
508
509 template <class Op>
510 void push(const Timer &timer, Op &&op) {
512
513 std::lock_guard<std::mutex> lk(queue_mtx_);
514
515#if 0
516 pending_timers_.insert(
517 std::upper_bound(
518 pending_timers_.begin(), pending_timers_.end(), timer.expiry(),
519 [](const auto &a, const auto &b) { return a < b->expiry(); }),
520 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op)));
521#else
522 if (timer.id() == nullptr) abort();
523
524 // add timer
525 pending_timers_.emplace(std::make_pair(
526 timer.id(),
527 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op))));
528
529 if (timer.id() == nullptr) abort();
530 if (timer.expiry() == Timer::time_point::min()) abort();
531
532 // sorted timer ids by expiry
533 pending_timer_expiries_.emplace(
534 std::make_pair(timer.expiry(), timer.id()));
535#endif
536 }
537
538 std::chrono::milliseconds next() const override {
539 typename Timer::time_point expiry;
540 {
541 std::lock_guard<std::mutex> lk(queue_mtx_);
542
543 // no pending timers, return the max-timeout
544 if (cancelled_timers_.empty()) {
545 if (pending_timer_expiries_.empty())
546 return std::chrono::milliseconds::max();
547
548#if 0
549 expiry = pending_timers_.front()->expiry();
550#else
551 expiry = pending_timer_expiries_.begin()->first;
552#endif
553 } else {
554 // cancelled timers should be executed directly
555 return std::chrono::milliseconds::min();
556 }
557
558 // the lock isn't needed anymore.
559 }
560
561 auto duration = Timer::traits_type::to_wait_duration(expiry);
562 if (duration < duration.zero()) {
563 duration = duration.zero();
564 }
565
566 // round up the next wait-duration to wait /at least/ the expected time.
567 //
568 // In case the expiry is 990us, wait 1ms
569 // If it is 0ns, leave it at 0ms;
570
571 auto duration_ms =
572 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
573
574 using namespace std::chrono_literals;
575
576 // round up to the next millisecond.
577 if ((duration - duration_ms).count() != 0) {
578 duration_ms += 1ms;
579 }
580
581 return duration_ms;
582 }
583
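The rounding above is equivalent to std::chrono::ceil; a sketch, assuming a nanosecond-resolution input duration (C++17):

#include <chrono>

inline std::chrono::milliseconds example_round_up(std::chrono::nanoseconds d) {
  if (d < d.zero()) d = d.zero();  // already expired: don't wait

  return std::chrono::ceil<std::chrono::milliseconds>(d);  // 990us -> 1ms, 0ns -> 0ms
}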
584 bool run_one() override {
585 std::unique_ptr<pending_timer> pt;
586
587 {
588 std::lock_guard<std::mutex> lk(queue_mtx_);
589
590 // if the pending-timers queue is empty, leave
591 // if the top is cancelled or expired, run it
592 if (cancelled_timers_.empty()) {
593 if (pending_timers_.empty()) return false;
594
595#if 0
596 // list
597 if (pending_timers_.front()->expiry() > Timer::clock_type::now()) {
598 return false;
599 }
600 pt = std::move(pending_timers_.front());
601 pending_timers_.pop_front();
602#else
603 if (pending_timers_.size() != pending_timer_expiries_.size()) abort();
604
605 auto min = Timer::time_point::min();
606 for (const auto &cur : pending_timer_expiries_) {
607 if (cur.first < min) abort();
608
609 min = cur.first;
610 }
611
612 const auto now = Timer::clock_type::now();
613
614 // multimap
615 auto pending_expiry_it = pending_timer_expiries_.begin();
616 auto timepoint = pending_expiry_it->first;
617
618 if (timepoint > now) {
619 // not expired yet. leave
620 return false;
621 }
622 typename Timer::Id *timer_id = pending_expiry_it->second;
623
624 auto pending_it = pending_timers_.find(timer_id);
625 if (pending_it == pending_timers_.end()) {
626 abort();
627 }
628 if (pending_it->second->id() != timer_id) {
629 abort();
630 }
631 if (pending_it->second->expiry() != pending_expiry_it->first) {
632 abort();
633 }
634
635 pt = std::move(pending_it->second);
636 pending_timer_expiries_.erase(pending_expiry_it);
637 pending_timers_.erase(pending_it);
638#endif
639 } else {
640 pt = std::move(cancelled_timers_.front());
641 cancelled_timers_.pop_front();
642 }
643 }
644
645 pt->run();
646
648
649 return true;
650 }
651
652 size_t cancel(const Timer &t) {
653 size_t count{};
654
655 {
656 std::lock_guard<std::mutex> lk(queue_mtx_);
657
658#if 0
659 const auto end = pending_timers_.end();
660
661 // the same timer may be pushed multiple times to the queue
662 // therefore, check all entries
663 for (auto cur = pending_timers_.begin(); cur != end;) {
664 auto &cur_timer = cur->second;
665 if (cur_timer->id() == t.id()) {
666 cur_timer->cancel();
667 ++count;
668
669 auto nxt = std::next(cur);
670 // move the timer over to the cancelled timers
672 cur);
673 cur = nxt;
674 } else {
675 ++cur;
676 }
677 }
678#else
679 auto eq_range = pending_timers_.equal_range(t.id());
680
681 for (auto cur = eq_range.first; cur != eq_range.second;) {
682 auto expiry_eq_range =
683 pending_timer_expiries_.equal_range(cur->second->expiry());
684
685 size_t erase_count{};
686
687 for (auto expiry_cur = expiry_eq_range.first;
688 expiry_cur != expiry_eq_range.second;) {
689 if (expiry_cur->first == cur->second->expiry() &&
690 expiry_cur->second == cur->second->id() && erase_count == 0) {
691 expiry_cur = pending_timer_expiries_.erase(expiry_cur);
692 ++erase_count;
693 } else {
694 ++expiry_cur;
695 }
696 }
697
698 // nothing found ... boom
699 if (erase_count == 0) abort();
700
701 cur->second->cancel();
702
703 // move timer to cancelled timers
704 cancelled_timers_.emplace_back(std::move(cur->second));
705
706 ++count;
707
708 cur = pending_timers_.erase(cur);
709 }
710#endif
711 }
712
713 return count;
714 }
715
716 class pending_timer {
717 public:
718 using time_point = typename Timer::time_point;
719 using timer_id = typename Timer::Id *;
720
721 pending_timer(const Timer &timer)
722 : expiry_{timer.expiry()}, id_{timer.id()} {}
723
724 virtual ~pending_timer() = default;
725
726 bool is_cancelled() const { return id_ == nullptr; }
727 void cancel() {
728 id_ = nullptr;
729
730 // ensure that it bubbles up to the top
731 expiry_ = expiry_.min();
732 }
733
734 time_point expiry() const noexcept { return expiry_; }
735 timer_id id() const { return id_; }
736
737 virtual void run() = 0;
738
739 private:
740 time_point expiry_;
741 timer_id id_;
742 };
743
744 template <class Op>
745 class pending_timer_op : public pending_timer {
746 public:
747 pending_timer_op(const Timer &timer, Op &&op)
748 : pending_timer(timer), op_{std::move(op)} {}
749
750 void run() override {
751 if (this->is_cancelled()) {
752 op_(make_error_code(std::errc::operation_canceled));
753 } else {
754 op_(std::error_code{});
755 }
756 }
757
758 private:
759 Op op_;
760 };
761
762 // cancelled timers, earliest cancelled timer first
763 std::list<std::unique_ptr<pending_timer>> cancelled_timers_;
764
765 // active timers, smallest time-point first
766 std::multimap<typename Timer::time_point, typename Timer::Id *>
767 pending_timer_expiries_;
768 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
769 pending_timers_;
770 };
771
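The double bookkeeping above (ids keyed by expiry, timers keyed by id) relies on std::multimap keeping its keys sorted; a stripped-down sketch of the expiry side (names are illustrative):

#include <chrono>
#include <map>

using example_clock = std::chrono::steady_clock;

inline std::multimap<example_clock::time_point, int /* timer id */> example_expiries;

inline bool example_front_expired() {
  if (example_expiries.empty()) return false;

  // begin() is always the earliest expiry, so "next timeout" is a cheap lookup
  return example_expiries.begin()->first <= example_clock::now();
}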
772 /**
773 * async wait for a timer expire.
774 *
775 * adds the op and timer to the timer_queue
776 *
777 * @param timer timer
778 * @param op completion handler to call when timer is triggered
779 */
780 template <class Timer, class Op>
781 void async_wait(const Timer &timer, Op &&op) {
782 auto &queue = use_service<timer_queue<Timer>>(*this);
783
784 queue.push(timer, std::forward<Op>(op));
785
786 // wakeup the blocked poll_one() to handle possible timer events.
787 io_service_->notify();
788 }
789
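A hedged usage sketch: application code normally does not call this method directly but goes through the timer type from timer.h, here assumed to be a Networking-TS style net::basic_waitable_timer with expires_after()/async_wait(); the exact timer API is an assumption, not something this header defines.

#include <chrono>
#include <system_error>

void example_timer_wait(net::io_context &io_ctx) {
  net::basic_waitable_timer<std::chrono::steady_clock> timer(io_ctx);  // assumed timer type

  timer.expires_after(std::chrono::milliseconds(100));
  timer.async_wait([](std::error_code ec) {
    if (ec == std::errc::operation_canceled) return;  // timer was cancelled
    // timer expired
  });

  io_ctx.run();  // delivers the completion handler
}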
790 /**
791 * cancel all async-ops of a timer.
792 */
793 template <class Timer>
794 size_t cancel(const Timer &timer) {
795 if (!has_service<timer_queue<Timer>>(*this)) {
796 return 0;
797 }
798
799 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
800 if (count) {
801 // if a timer was canceled, interrupt the io-service
802 io_service_->notify();
803 }
804 return count;
805 }
806
807 // cancel oldest
808 template <class Timer>
809 size_t cancel_one(const Timer & /* timer */) {
810 // TODO: implement if async_wait is implemented
811 return 0;
812 }
813
814 /** pointers to the timer-queues of this io-context.
815 *
816 * timer-queues are one per timer-type (std::chrono::steady_clock,
817 * std::chrono::system_clock, ...)
818 *
819 * timer_queue_base is the base class of the timer-queues
820 *
821 * the timer-queues themselves are owned by the io_context's executor via
822 * execution_context::add_service()
823 *
824 * protected via 'mtx_'
825 */
826 std::vector<timer_queue_base *> timer_queues_;
827
828 /**
829 * mutex that protects the core parts of the io-context.
830 *
831 * - timer_queues_
832 */
833 mutable std::mutex mtx_{};
834
835 mutable std::mutex do_one_mtx_{};
836 mutable std::condition_variable do_one_cond_{};
837 bool is_running_{false};
838
839 void wait_no_runner_(std::unique_lock<std::mutex> &lk) {
840 lk.lock();
841 wait_no_runner_unlocked_(lk);
842 }
843
844 void wait_no_runner_unlocked_(std::unique_lock<std::mutex> &lk) {
845 do_one_cond_.wait(lk, [this]() { return is_running_ == false; });
846
847 is_running(true);
848 }
849
850 void wake_one_runner_(std::unique_lock<std::mutex> &lk) {
851 is_running(false);
852 lk.unlock();
853 do_one_cond_.notify_one();
854 }
855
856 void is_running(bool v) { is_running_ = v; }
857 bool is_running() const { return is_running_; }
858};
859} // namespace net
860
861namespace net {
862inline io_context::count_type io_context::run() {
863 count_type n = 0;
864
865 std::unique_lock<std::mutex> lk(do_one_mtx_);
866
867 using namespace std::chrono_literals;
868
869 // in the first round, we already have the lock; in all other rounds we
870 // need to take the lock first
871 for (wait_no_runner_unlocked_(lk); do_one(lk, -1ms) != 0;
872 wait_no_runner_(lk)) {
873 if (n != std::numeric_limits<count_type>::max()) ++n;
874 }
875 return n;
876}
877
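A compact sketch of the blocking behaviour of the four entry points (the helper is illustrative, not part of this header):

void example_run_modes(net::io_context &io_ctx) {
  io_ctx.poll();      // run everything that is ready right now, never block
  io_ctx.poll_one();  // run at most one ready handler, never block
  io_ctx.run_one();   // block until one handler ran or no work is left
  io_ctx.run();       // block until no outstanding work is left or stop() is called
}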
878inline io_context::count_type io_context::run_one() {
879 using namespace std::chrono_literals;
880
881 std::unique_lock<std::mutex> lk(do_one_mtx_);
882
883 wait_no_runner_unlocked_(lk);
884
885 return do_one(lk, -1ms);
886}
887
888template <class Rep, class Period>
889io_context::count_type io_context::run_for(
890 const std::chrono::duration<Rep, Period> &rel_time) {
891 return run_until(std::chrono::steady_clock::now() + rel_time);
892}
893
894template <class Clock, class Duration>
895io_context::count_type io_context::run_until(
896 const std::chrono::time_point<Clock, Duration> &abs_time) {
897 count_type n = 0;
898
899 std::unique_lock<std::mutex> lk(do_one_mtx_);
900
901 using namespace std::chrono_literals;
902
903 // in the first round, we already have the lock; in all other rounds we
904 // need to take the lock first
905 for (wait_no_runner_unlocked_(lk); do_one_until(lk, abs_time) != 0;
906 wait_no_runner_(lk)) {
907 if (n != std::numeric_limits<count_type>::max()) ++n;
908 }
909 return n;
910}
911
912template <class Rep, class Period>
913io_context::count_type io_context::run_one_for(
914 const std::chrono::duration<Rep, Period> &rel_time) {
915 return run_one_until(std::chrono::steady_clock::now() + rel_time);
916}
917
918template <class Clock, class Duration>
919io_context::count_type io_context::run_one_until(
920 const std::chrono::time_point<Clock, Duration> &abs_time) {
921 std::unique_lock<std::mutex> lk(do_one_mtx_);
922
923 wait_no_runner_unlocked_(lk);
924
925 return do_one_until(lk, abs_time);
926}
927
928inline io_context::count_type io_context::poll() {
929 count_type n = 0;
930 std::unique_lock<std::mutex> lk(do_one_mtx_);
931
932 using namespace std::chrono_literals;
933
934 for (wait_no_runner_unlocked_(lk); do_one(lk, 0ms) != 0;
935 wait_no_runner_(lk)) {
936 if (n != std::numeric_limits<count_type>::max()) ++n;
937 }
938 return n;
939}
940
941inline io_context::count_type io_context::poll_one() {
942 std::unique_lock<std::mutex> lk(do_one_mtx_);
943
944 using namespace std::chrono_literals;
945
946 wait_no_runner_unlocked_(lk);
947 return do_one(lk, 0ms);
948}
949
950/**
951 * cancel all async-ops of a file-descriptor.
952 */
953inline stdx::expected<void, std::error_code> io_context::cancel(
954 native_handle_type fd) {
955 bool need_notify{false};
956 {
957 // check all async-ops
958 std::lock_guard<std::mutex> lk(mtx_);
959
960 while (auto op = active_ops_.extract_first(fd)) {
961 op->cancel();
962
963 cancelled_ops_.push_back(std::move(op));
964
965 need_notify = true;
966 }
967 }
968
969 // wakeup the loop to deliver the cancelled fds
970 if (true || need_notify) {
971 io_service_->remove_fd(fd);
972 io_service_->notify();
973 }
974
975 return {};
976}
977
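What a completion handler observes after such a cancel (illustrative only): the handler is not dropped, it is delivered on the next run()/poll() with operation_canceled.

#include <system_error>

void example_on_io_complete(std::error_code ec) {
  if (ec == std::errc::operation_canceled) {
    // the fd was cancelled (e.g. the socket was closed) before the event fired
    return;
  }
  // ... handle the ready file-descriptor ...
}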
978class io_context::executor_type {
979 public:
980 executor_type(const executor_type &rhs) noexcept = default;
981 executor_type(executor_type &&rhs) noexcept = default;
982 executor_type &operator=(const executor_type &rhs) noexcept = default;
983 executor_type &operator=(executor_type &&rhs) noexcept = default;
984
985 ~executor_type() = default;
986
987 bool running_in_this_thread() const noexcept {
988 // TODO: check if this task is running in this thread. Currently, it is
989 // "yes", as we don't allow post()ing to other threads
990
991 // track call-chain
992 return true;
993 }
994 io_context &context() const noexcept { return *io_ctx_; }
995
996 void on_work_started() const noexcept { ++io_ctx_->work_count_; }
997 void on_work_finished() const noexcept { --io_ctx_->work_count_; }
998
999 /**
1000 * execute function.
1001 *
1002 * Effect:
1003 *
1004 * The executor
1005 *
1006 * - MAY block forward progress of the caller until f() finishes.
1007 */
1008 template <class Func, class ProtoAllocator>
1009 void dispatch(Func &&f, const ProtoAllocator &a) const {
1010 if (running_in_this_thread()) {
1011 // run it in this thread.
1012 std::decay_t<Func>(std::forward<Func>(f))();
1013 } else {
1014 // queue function call for later execution.
1015 post(std::forward<Func>(f), a);
1016 }
1017 }
1018
1019 /**
1020 * queue function for execution.
1021 *
1022 * Effects:
1023 *
1024 * The executor
1025 *
1026 * - SHALL NOT block forward progress of the caller pending completion of f().
1027 * - MAY begin f() progress before the call to post completes.
1028 */
1029 template <class Func, class ProtoAllocator>
1030 void post(Func &&f, const ProtoAllocator &a) const {
1031 io_ctx_->defer_work(std::forward<Func>(f), a);
1032 }
1033
1034 /**
1035 * defer function call for later execution.
1036 *
1037 * Effect:
1038 *
1039 * The executor:
1040 *
1041 * - SHALL NOT block forward progress of the caller pending completion of f().
1042 * - SHOULD NOT begin f()'s progress before the call to defer()
1043 * completes.
1044 */
1045 template <class Func, class ProtoAllocator>
1046 void defer(Func &&f, const ProtoAllocator &a) const {
1047 post(std::forward<Func>(f), a);
1048 }
1049
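A hedged sketch of the three submission functions above (helper name is illustrative; std::allocator<void> stands in for any proto-allocator): dispatch() may run the callable immediately in the calling thread, while post() and defer() always queue it for a later run()/poll().

#include <memory>

void example_executor_ops(net::io_context &io_ctx) {
  auto ex = io_ctx.get_executor();

  ex.dispatch([]() { /* may run before dispatch() returns */ },
              std::allocator<void>{});

  ex.post([]() { /* runs later, from run()/poll_one() */ },
          std::allocator<void>{});

  ex.defer([]() { /* queued like post() in this implementation */ },
           std::allocator<void>{});

  io_ctx.poll();  // drains what post()/defer() queued
}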
1050 private:
1051 friend io_context;
1052
1053 explicit executor_type(io_context &ctx) : io_ctx_{std::addressof(ctx)} {}
1054
1055 io_context *io_ctx_;
1056};
1057
1058inline bool operator==(const io_context::executor_type &a,
1059 const io_context::executor_type &b) noexcept {
1060 return std::addressof(a.context()) == std::addressof(b.context());
1061}
1062inline bool operator!=(const io_context::executor_type &a,
1063 const io_context::executor_type &b) noexcept {
1064 return !(a == b);
1065}
1066
1067// io_context::executor_type is an executor even though it doesn't have a
1068// default constructor.
1069template <>
1070struct is_executor<io_context::executor_type> : std::true_type {};
1071
1072inline io_context::executor_type io_context::get_executor() noexcept {
1073 return executor_type(*this);
1074}
1075
1076template <class Clock, class Duration>
1077io_context::count_type io_context::do_one_until(
1078 std::unique_lock<std::mutex> &lk,
1079 const std::chrono::time_point<Clock, Duration> &abs_time) {
1080 using namespace std::chrono_literals;
1081
1082 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1083 auto rel_time_ms =
1084 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1085
1086 if (rel_time_ms < 0ms) {
1087 // expired already.
1088 rel_time_ms = 0ms;
1089 } else if (rel_time_ms < rel_time) {
1090 // std::chrono::ceil()
1091 rel_time_ms += 1ms;
1092 }
1093
1094 return do_one(lk, rel_time_ms);
1095}
1096
1097// precond: lk MUST be locked
1098inline io_context::count_type io_context::do_one(
1099 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds timeout) {
1100 timer_queue_base *timer_q{nullptr};
1101
1102 monitor mon(*this);
1103
1104 if (!has_outstanding_work()) {
1105 wake_one_runner_(lk);
1106 return 0;
1107 }
1108
1109 while (true) {
1110 // 1. deferred work.
1111 // 2. timer
1112 // 3. triggered events.
1113
1114 // timer (2nd round)
1115 if (timer_q) {
1116 if (timer_q->run_one()) {
1117 wake_one_runner_(lk);
1118 return 1;
1119 } else {
1120 timer_q = nullptr;
1121 }
1122 }
1123
1124 // deferred work
1125 if (deferred_work_.run_one()) {
1126 wake_one_runner_(lk);
1127 return 1;
1128 }
1129
1130 // timer
1131 std::chrono::milliseconds min_duration{0};
1132 {
1133 std::lock_guard<std::mutex> lk(mtx_);
1134 // check the smallest timestamp of all timer-queues
1135 for (auto q : timer_queues_) {
1136 const auto duration = q->next();
1137
1138 if (duration == duration.zero()) {
1139 timer_q = q;
1140 min_duration = duration;
1141 break;
1142 } else if ((duration != duration.max()) &&
1143 (timeout != timeout.zero()) &&
1144 (duration < min_duration || timer_q == nullptr)) {
1145 timer_q = q;
1146 min_duration = duration;
1147 }
1148 }
1149 }
1150
1151 // if we have a timer that has fired, run it right away
1152 if (timer_q && min_duration == min_duration.zero()) continue;
1153
1154 if (auto op = [this]() -> std::unique_ptr<async_op> {
1155 // handle all the cancelled ops without polling first
1156 std::lock_guard<std::mutex> lk(mtx_);
1157
1158 // ops have all cancelled operators at the front
1159 if (!cancelled_ops_.empty() &&
1160 cancelled_ops_.front()->is_cancelled()) {
1161 auto op = std::move(cancelled_ops_.front());
1162
1163 cancelled_ops_.pop_front();
1164
1165 return op;
1166 }
1167
1168 return {};
1169 }()) {
1170 // before we unlock the concurrent io-context-thread-lock increment the
1171 // work-count to ensure the next waiting thread exits in case:
1172 //
1173 // - no io-events registered
1174 // - no timers registered
1176 wake_one_runner_(lk);
1177 op->run(*this);
1179
1180 return 1;
1181 }
1182
1183 if (stopped() || !io_service_open_res_) {
1184 break;
1185 }
1186
1187 // adjust min-duration according to caller's timeout
1188 //
1189 // - if there is no timer queued, use the caller's timeout
1190 // - if there is a timer queued, reduce min_duration to callers timeout if
1191 // it is lower and non-negative.
1192 //
1193 // note: negative timeout == infinite.
1194 if (timer_q == nullptr ||
1195 (timeout > timeout.zero() && timeout < min_duration)) {
1196 min_duration = timeout;
1197 }
1198
1199 auto res = io_service_->poll_one(min_duration);
1200 if (!res) {
1201 if (res.error() == std::errc::interrupted) {
1202 // poll again as it got interrupted
1203 continue;
1204 }
1205 if (res.error() == std::errc::timed_out && min_duration != timeout &&
1206 timer_q != nullptr) {
1207 // poll_one() timed out, we have a timer-queue and the timer's expiry is
1208 // less than the global timeout or there is no timeout.
1209 continue;
1210 }
1211
1212 wake_one_runner_(lk);
1213
1214#if 0
1215 // if the poll returns another error, it is a ok that we don't further
1216 // check it as we exit cleanly. Still it would be nice to be aware of it
1217 // in debug builds.
1218 assert(res.error() == io_service_errc::no_fds ||
1219 res.error() == std::errc::timed_out);
1220#endif
1221
1222 // either poll() timed out or there were no file-descriptors that fired
1223 return 0;
1224 }
1225
1226 // std::cerr << __LINE__ << ": " << res.value().fd << " - "
1227 // << res.value().event << std::endl;
1228
1229 if (auto op = [this](native_handle_type fd,
1230 short events) -> std::unique_ptr<async_op> {
1231 std::lock_guard<std::mutex> lk(mtx_);
1232
1233 return active_ops_.extract_first(fd, events);
1234 }(res->fd, res->event)) {
1236 wake_one_runner_(lk);
1237 op->run(*this);
1239 return 1;
1240 }
1241 // we may not find an async-op for this event if it already has been
1242 // cancelled. Loop around and let the "is-cancelled" check handle it.
1243 }
1244
1245 wake_one_runner_(lk);
1246 return 0;
1247}
1248
1249} // namespace net
1250
1251#endif