MySQL 8.0.43
Source Code Documentation
io_context.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2020, 2025, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
28
29#include <atomic>
30#include <chrono>
31#include <iterator>
32#include <limits> // numeric_limits
33#include <list>
34#include <map>
35#include <memory> // unique_ptr
36#include <mutex>
37#include <system_error> // error_code
38#include <unordered_map>
39#include <utility>
40#include <vector>
41
42#include "my_config.h" // HAVE_EPOLL
52
53namespace net {
54
55#if defined(HAVE_EPOLL)
57#else
59#endif
60
62 public:
63 class executor_type;
64
65 using count_type = size_t;
67
69 : io_context{std::make_unique<net::impl::socket::SocketService>(),
71
73 std::unique_ptr<net::impl::socket::SocketServiceBase> &&socket_service,
74 std::unique_ptr<IoServiceBase> &&io_service)
78
79 explicit io_context(int /* concurrency_hint */) : io_context() {}
80
83 cancelled_ops_.clear();
84 // Make sure the services are destroyed before our internal fields. The
85 // services own the timers that can indirectly call our methods when
86 // destructed. See UT NetTS_io_context.pending_timer_on_destroy for an
87 // example.
88 destroy();
89 }
90
91 io_context(const io_context &) = delete;
92 io_context &operator=(const io_context &) = delete;
93
94 executor_type get_executor() noexcept;
95
97
98 template <class Rep, class Period>
99 count_type run_for(const std::chrono::duration<Rep, Period> &rel_time);
100
101 template <class Clock, class Duration>
103 const std::chrono::time_point<Clock, Duration> &abs_time);
104
106
107 template <class Rep, class Period>
108 count_type run_one_for(const std::chrono::duration<Rep, Period> &rel_time);
109
110 template <class Clock, class Duration>
112 const std::chrono::time_point<Clock, Duration> &abs_time);
113
116 void stop() {
117 {
118 std::lock_guard<std::mutex> lk(mtx_);
119 stopped_ = true;
120 }
121
123 }
124
125 bool stopped() const noexcept {
126 std::lock_guard<std::mutex> lk(mtx_);
127 return stopped_;
128 }
129
130 void restart() {
131 std::lock_guard<std::mutex> lk(mtx_);
132 stopped_ = false;
133 }
134
136 return socket_service_.get();
137 }
138
139 IoServiceBase *io_service() const { return io_service_.get(); }
140
141 /**
142 * get the status of the implicit open() call of the io-service.
143 *
144 * the io_service_.open() may fail due to out-of-file-descriptors.
145 *
146 * run() will fail silently if the io-service failed to open.
147 *
148 * @returns std::error_code on error
149 */
152 }
153
154 private:
155 /**
156 * queued work from io_context::executor_type::dispatch()/post()/defer().
157 */
159 public:
160 // simple, generic storage of callable.
161 //
162 // std::function<void()> is similar, but doesn't work for move-only
163 // callables like lambda's that capture a move-only type
165 public:
166 virtual ~BasicCallable() = default;
167
168 virtual void invoke() = 0;
169 };
170
171 template <class Func>
172 class Callable : public BasicCallable {
173 public:
174 Callable(Func &&f) : f_{std::forward<Func>(f)} {}
175
176 void invoke() override { f_(); }
177
178 private:
179 Func f_;
180 };
181
182 using op_type = std::unique_ptr<BasicCallable>;
183
184 /**
185 * run a deferred work item.
 *
 * takes the head of the work queue under work_mtx_, releases the lock,
 * and only then invokes the callable, so new work can be posted while
 * the current item runs.
186 *
187 * @returns number of work items run.
188 * @retval 0 work queue was empty, nothing was run.
189 */
190 size_t run_one() {
191 // tmp list to hold the current operation to run.
192 //
193 // makes it simple and fast to move the head element and shorten the time
194 // the lock is held.
195 decltype(work_) tmp;
196
197 // lock is only needed as long as we modify the work-queue.
198 {
199 std::lock_guard<std::mutex> lk(work_mtx_);
200
201 if (work_.empty()) return 0;
202
203 // move the head of the work queue out and release the lock.
204 //
205 // note: std::list.splice() moves pointers.
206 tmp.splice(tmp.begin(), work_, work_.begin());
207 }
208
209 // run the deferred work.
210 tmp.front()->invoke();
211
212 // and destruct the list at the end.
213
214 return 1;
215 }
216
217 /**
218 * queue work for later execution.
 *
 * @param f callable to run later; moved into the work queue.
 * @param a proto-allocator (currently unused).
219 */
220 template <class Func, class ProtoAllocator>
221 void post(Func &&f, const ProtoAllocator & /* a */) {
222 std::lock_guard<std::mutex> lk(work_mtx_);
223
224 work_.emplace_back(
225 std::make_unique<Callable<Func>>(std::forward<Func>(f)));
226 }
227
228 /**
229 * check if work is queued for later execution.
 *
 * thread-safe: guarded by work_mtx_.
230 *
231 * @retval true if some work is queued.
 * @retval false if the work queue is empty.
232 */
233 bool has_outstanding_work() const {
234 std::lock_guard<std::mutex> lk(work_mtx_);
235 return !work_.empty();
236 }
237
238 private:
239 mutable std::mutex work_mtx_;
240 std::list<op_type> work_;
241 };
242
244
245 /**
246 * defer work for later execution.
247 */
248 template <class Func, class ProtoAllocator>
249 void defer_work(Func &&f, const ProtoAllocator &a) {
250 deferred_work_.post(std::forward<Func>(f), a);
251
252 // wakeup the possibly blocked io-thread.
254 }
255
256 template <class Clock, class Duration>
258 std::unique_lock<std::mutex> &lk,
259 const std::chrono::time_point<Clock, Duration> &abs_time);
260
261 count_type do_one(std::unique_lock<std::mutex> &lk,
262 std::chrono::milliseconds timeout);
263
264 template <typename _Clock, typename _WaitTraits>
266
268
269 template <class Protocol>
270 friend class basic_socket_impl;
271
272 template <class Protocol>
273 friend class basic_socket;
274
275 template <class Protocol>
277
278 template <class Protocol>
280
281 bool stopped_{false};
282 std::atomic<count_type> work_count_{};
283
284 // must be first member-var to ensure it is destroyed last
285 std::unique_ptr<impl::socket::SocketServiceBase> socket_service_;
286 std::unique_ptr<IoServiceBase> io_service_;
288
289 // has outstanding work
290 //
291 // work is outstanding when ANY of the following holds:
292 //
293 // - work-count from on_work_started()/on_work_finished() is more than 0, or
294 // - any active, cancelled or deferred operations are still ongoing
295 bool has_outstanding_work() const {
296 if (!cancelled_ops_.empty()) return true;
297 if (active_ops_.has_outstanding_work()) return true;
298 if (deferred_work_.has_outstanding_work()) return true;
299
300 if (work_count_ > 0) return true;
301
302 return false;
303 }
304
305 // monitors whether all .run()s of the io-context need to be stopped.
306 class monitor {
307 public:
308 monitor(io_context &ctx) : ctx_{ctx} {}
309
310 monitor(const monitor &) = delete;
311 monitor(monitor &&) = delete;
312
314 std::lock_guard<std::mutex> lk(ctx_.mtx_);
315
316 // ctx_.call_stack_.pop_back();
317
318 if (!ctx_.has_outstanding_work()) {
319 // like stop(), just that we already have the mutex
320 ctx_.stopped_ = true;
321 ctx_.io_service_->notify();
322 }
323 }
324
325 private:
327 };
328
330
331 /**
332 * base class of async operation.
333 *
334 * - file-descriptor
335 * - wait-event
336 */
337 class async_op {
338 public:
340
342
343 virtual ~async_op() = default;
344
345 virtual void run(io_context &) = 0;
346
349
351 wait_type event() const { return event_; }
352
353 private:
356 };
357
358 /**
359 * async operation with callback.
 *
 * wraps a completion handler Op together with the fd/wait-type of the
 * base async_op.
360 */
361 template <class Op>
362 class async_op_impl : public async_op {
363 public:
365 : async_op{fd, wt}, op_{std::forward<Op>(op)} {}
366
 // invoke the completion handler.
 //
 // a cancelled op reports std::errc::operation_canceled, otherwise
 // success (an empty std::error_code).
367 void run(io_context & /* io_ctx */) override {
368 if (is_cancelled()) {
369 op_(make_error_code(std::errc::operation_canceled));
370 } else {
371 op_(std::error_code{});
372 }
373 }
374
375 private:
 // the wrapped completion handler.
376 Op op_;
377 };
378
379 class AsyncOps {
380 public:
381 using element_type = std::unique_ptr<async_op>;
382
383 bool has_outstanding_work() const {
384 std::lock_guard<std::mutex> lk(mtx_);
385
386 return !ops_.empty();
387 }
388
390 const auto handle = t->native_handle();
391
392 std::lock_guard<std::mutex> lk(mtx_);
393
394 auto it = ops_.find(handle);
395 if (it != ops_.end()) {
396 it->second.push_back(std::move(t));
397 } else {
398 std::vector<element_type> v;
399 v.push_back(std::move(t));
400 ops_.emplace(handle, std::move(v));
401 }
402 }
403
405 return extract_first(fd, [events](auto const &el) {
406 return static_cast<short>(el->event()) & events;
407 });
408 }
409
411 return extract_first(fd, [](auto const &) { return true; });
412 }
413
414 void release_all() {
415 // We expect that this method is called before AsyncOps destructor, to
416 // make sure that ops_ map is empty when the destructor executes. If the
417 // ops_ is not empty when destructed, the destructor of its element can
418 // trigger a method that will try to access that map (that is destructed).
419 // Example: we have an AsyncOp that captures some Socket object with a
420 // smart pointer. When the destructor of this AsyncOp is called, it can
421 // also call the destructor of that Socket, which in turn will call
422 // socket.close(), causing the Socket to unregister its operations in
423 // its respective io_context object which is us (calls extract_first()).
424 std::list<element_type> ops_to_delete;
425 {
426 std::lock_guard<std::mutex> lk(mtx_);
427 for (auto &fd_ops : ops_) {
428 for (auto &fd_op : fd_ops.second) {
429 ops_to_delete.push_back(std::move(fd_op));
430 }
431 }
432 ops_.clear();
433 // It is important that we release the mtx_ here before the
434 // ops_to_delete go out of scope and are deleted. AsyncOp destructor can
435 // indirectly call extract_first which would lead to a deadlock.
436 }
 // ops_to_delete is destroyed here, after mtx_ was unlocked above.
437 }
438
439 private:
440 template <class Pred>
442 std::lock_guard<std::mutex> lk(mtx_);
443
444 const auto it = ops_.find(fd);
445 if (it != ops_.end()) {
446 auto &async_ops = it->second;
447
448 const auto end = async_ops.end();
449 for (auto cur = async_ops.begin(); cur != end; ++cur) {
450 auto &el = *cur;
451
452 if (el->native_handle() == fd && pred(el)) {
453 auto op = std::move(el);
454
455 if (async_ops.size() == 1) {
456 // remove the current container and with it its only element
457 ops_.erase(it);
458 } else {
459 // remove the current entry
460 async_ops.erase(cur);
461 }
462
463 return op;
464 }
465 }
466 }
467
468 return {};
469 }
470
471 std::unordered_map<native_handle_type, std::vector<element_type>> ops_{
472 16 * 1024};
473
474 mutable std::mutex mtx_;
475 };
476
478
479 // cancelled async operations.
480 std::list<std::unique_ptr<async_op>> cancelled_ops_;
481
482 template <class Op>
484 // add the socket-wait op to the queue
486 std::make_unique<async_op_impl<Op>>(std::forward<Op>(op), fd, wt));
487
488 {
489 auto res = io_service_->add_fd_interest(fd, wt);
490 if (!res) {
491#if 0
492 // fd may be -1 or so
493 std::cerr << "!! add_fd_interest(" << fd << ", ..."
494 << ") " << res.error() << " " << res.error().message()
495 << std::endl;
496#endif
497 // adding failed. Cancel it again.
498 //
499 // code should be similar to ::cancel(fd)
500 std::lock_guard<std::mutex> lk(mtx_);
501
502 if (auto op = active_ops_.extract_first(fd, static_cast<short>(wt))) {
503 op->cancel();
504 cancelled_ops_.push_back(std::move(op));
505 }
506 }
507 }
508
510 }
511
513 protected:
515
516 mutable std::mutex queue_mtx_;
517
518 public:
519 virtual bool run_one() = 0;
520 virtual std::chrono::milliseconds next() const = 0;
521 };
522
523 template <class Timer>
525 public:
527
529 // add timer_queue to io_context
530
531 auto &io_ctx = static_cast<io_context &>(ctx);
532
533 // @note: don't move this lock+push into the timer_queue_base constructor
534 //
535 // @see
536 // https://github.com/google/sanitizers/wiki/ThreadSanitizerPopularDataRaces#data-race-on-vptr-during-construction
537 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
538 io_ctx.timer_queues_.push_back(this);
539 }
540
541 ~timer_queue() override {
542 // remove every pending-timer from the pending-timers before destroying to
543 // avoid that the owned Timer destructs the pending-timer a 2nd time.
544 //
545 // A pending_timer_.erase() does NOT work here as it first destructs the
546 // node and then updates the iterators.
547 for (auto cur = pending_timers_.begin(); cur != pending_timers_.end();
548 cur = pending_timers_.begin()) {
549 // extract and then destroy it.
 //
 // the extracted node-handle goes out of scope at the end of this
 // iteration, which destroys the node outside of the map.
550 pending_timers_.extract(cur);
551 }
552 }
553
554 void shutdown() noexcept override {}
555
556 io_context &context() noexcept {
557 return static_cast<io_context &>(service::context());
558 }
559
560 template <class Op>
561 void push(const Timer &timer, Op &&op) {
563
564 std::lock_guard<std::mutex> lk(queue_mtx_);
565
566#if 0
567 pending_timers_.insert(
568 std::upper_bound(
569 pending_timers_.begin(), pending_timers_.end(), timer.expiry(),
570 [](const auto &a, const auto &b) { return a < b->expiry(); }),
571 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op)));
572#else
573 if (timer.id() == nullptr) abort();
574
575 // add timer
576 pending_timers_.emplace(std::make_pair(
577 timer.id(),
578 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op))));
579
580 if (timer.id() == nullptr) abort();
581 if (timer.expiry() == Timer::time_point::min()) abort();
582
583 // sorted timer ids by expiry
585 std::make_pair(timer.expiry(), timer.id()));
586#endif
587 }
588
 // wait-duration until the next timer fires.
 //
 // @returns milliseconds::max() if no timer is pending,
 //          milliseconds::min() if cancelled timers are waiting to be run,
 //          otherwise the rounded-up duration until the earliest expiry.
589 std::chrono::milliseconds next() const override {
590 typename Timer::time_point expiry;
591 {
592 std::lock_guard<std::mutex> lk(queue_mtx_);
593
594 // no pending timers, return the max-timeout
595 if (cancelled_timers_.empty()) {
596 if (pending_timer_expiries_.empty())
597 return std::chrono::milliseconds::max();
598
599#if 0
600 expiry = pending_timers_.front()->expiry();
601#else
 // multimap is sorted by expiry: begin() is the earliest one.
602 expiry = pending_timer_expiries_.begin()->first;
603#endif
604 } else {
605 // cancelled timers should be executed directly
606 return std::chrono::milliseconds::min();
607 }
608
609 // the lock isn't needed anymore.
610 }
611
612 auto duration = Timer::traits_type::to_wait_duration(expiry);
613 if (duration < duration.zero()) {
614 duration = duration.zero();
615 }
616
617 // round up the next wait-duration to wait /at least/ the expected time.
618 //
619 // In case the expiry is 990us, wait 1ms
620 // If it is 0ns, leave it at 0ms;
621
622 auto duration_ms =
623 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
624
625 using namespace std::chrono_literals;
626
627 // round up to the next millisecond.
628 if ((duration - duration_ms).count() != 0) {
629 duration_ms += 1ms;
630 }
631
632 return duration_ms;
633 }
634
635 bool run_one() override {
636 std::unique_ptr<pending_timer> pt;
637
638 {
639 std::lock_guard<std::mutex> lk(queue_mtx_);
640
641 // if the pending-timers queue is empty, leave
642 // if the top is cancelled or expired, run it
643 if (cancelled_timers_.empty()) {
644 if (pending_timers_.empty()) return false;
645
646#if 0
647 // list
648 if (pending_timers_.front()->expiry() > Timer::clock_type::now()) {
649 return false;
650 }
651 pt = std::move(pending_timers_.front());
652 pending_timers_.pop_front();
653#else
654 if (pending_timers_.size() != pending_timer_expiries_.size()) abort();
655
656 auto min = Timer::time_point::min();
657 for (const auto &cur : pending_timer_expiries_) {
658 if (cur.first < min) abort();
659
660 min = cur.first;
661 }
662
663 const auto now = Timer::clock_type::now();
664
665 // multimap
666 auto pending_expiry_it = pending_timer_expiries_.begin();
667 auto timepoint = pending_expiry_it->first;
668
669 if (timepoint > now) {
670 // not expired yet. leave
671 return false;
672 }
673 typename Timer::Id *timer_id = pending_expiry_it->second;
674
675 auto pending_it = pending_timers_.find(timer_id);
676 if (pending_it == pending_timers_.end()) {
677 abort();
678 }
679 if (pending_it->second->id() != timer_id) {
680 abort();
681 }
682 if (pending_it->second->expiry() != pending_expiry_it->first) {
683 abort();
684 }
685
686 pt = std::move(pending_it->second);
687 pending_timer_expiries_.erase(pending_expiry_it);
688 pending_timers_.erase(pending_it);
689#endif
690 } else {
691 pt = std::move(cancelled_timers_.front());
692 cancelled_timers_.pop_front();
693 }
694 }
695
696 pt->run();
697
699
700 return true;
701 }
702
 // cancel all queued operations of a timer.
 //
 // removes the timer's entries from pending_timers_ and
 // pending_timer_expiries_ and moves them to cancelled_timers_ so they
 // are run (with operation_canceled) on the next run_one().
 //
 // @param t timer whose pending operations shall be cancelled.
 // @returns number of operations that were cancelled.
703 size_t cancel(const Timer &t) {
704 size_t count{};
705
706 {
707 std::lock_guard<std::mutex> lk(queue_mtx_);
708
709#if 0
710 const auto end = pending_timers_.end();
711
712 // the same timer may be pushed multiple times to the queue
713 // therefore, check all entries
714 for (auto cur = pending_timers_.begin(); cur != end;) {
715 auto &cur_timer = cur->second;
716 if (cur_timer->id() == t.id()) {
717 cur_timer->cancel();
718 ++count;
719
720 auto nxt = std::next(cur);
721 // move the timer over to the cancelled timers
723 cur);
724 cur = nxt;
725 } else {
726 ++cur;
727 }
728 }
729#else
730 auto eq_range = pending_timers_.equal_range(t.id());
731
732 for (auto cur = eq_range.first; cur != eq_range.second;) {
 // erase exactly one matching entry from the expiry-sorted map
 // that mirrors this pending timer.
733 auto expiry_eq_range =
734 pending_timer_expiries_.equal_range(cur->second->expiry());
735
736 size_t erase_count{};
737
738 for (auto expiry_cur = expiry_eq_range.first;
739 expiry_cur != expiry_eq_range.second;) {
740 if (expiry_cur->first == cur->second->expiry() &&
741 expiry_cur->second == cur->second->id() && erase_count == 0) {
742 expiry_cur = pending_timer_expiries_.erase(expiry_cur);
743 ++erase_count;
744 } else {
745 ++expiry_cur;
746 }
747 }
748
749 // nothing found ... boom
750 if (erase_count == 0) abort();
751
752 cur->second->cancel();
753
754 // move timer to cancelled timers
755 cancelled_timers_.emplace_back(std::move(cur->second));
756
757 ++count;
758
759 cur = pending_timers_.erase(cur);
760 }
761#endif
762 }
763
764 return count;
765 }
766
768 public:
769 using time_point = typename Timer::time_point;
770 using timer_id = typename Timer::Id *;
771
772 pending_timer(const Timer &timer)
773 : expiry_{timer.expiry()}, id_{timer.id()} {}
774
775 virtual ~pending_timer() = default;
776
777 bool is_cancelled() const { return id_ == nullptr; }
778 void cancel() {
779 id_ = nullptr;
780
781 // ensure that it bubbles up to the top
782 expiry_ = expiry_.min();
783 }
784
785 time_point expiry() const noexcept { return expiry_; }
786 timer_id id() const { return id_; }
787
788 virtual void run() = 0;
789
790 private:
793 };
794
795 template <class Op>
797 public:
798 pending_timer_op(const Timer &timer, Op &&op)
799 : pending_timer(timer), op_{std::move(op)} {}
800
801 void run() override {
802 if (this->is_cancelled()) {
803 op_(make_error_code(std::errc::operation_canceled));
804 } else {
805 op_(std::error_code{});
806 }
807 }
808
809 private:
810 Op op_;
811 };
812
813 // cancelled timers, earliest cancelled timer first
814 std::list<std::unique_ptr<pending_timer>> cancelled_timers_;
815
816 // active timers, smallest time-point first
817 std::multimap<typename Timer::time_point, typename Timer::Id *>
819 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
821 };
822
823 /**
824 * async wait for a timer expire.
825 *
826 * adds the op and timer to the timer_queue
827 *
828 * @param timer timer
829 * @param op completion handler to call when timer is triggered
830 */
831 template <class Timer, class Op>
832 void async_wait(const Timer &timer, Op &&op) {
833 auto &queue = use_service<timer_queue<Timer>>(*this);
834
835 queue.push(timer, std::forward<Op>(op));
836
837 // wakeup the blocked poll_one() to handle possible timer events.
839 }
840
841 /**
842 * cancel all async-ops of a timer.
843 */
844 template <class Timer>
845 size_t cancel(const Timer &timer) {
846 if (!has_service<timer_queue<Timer>>(*this)) {
847 return 0;
848 }
849
850 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
851 if (count) {
852 // if a timer was canceled, interrupt the io-service
854 }
855 return count;
856 }
857
858 // cancel oldest
859 template <class Timer>
860 size_t cancel_one(const Timer & /* timer */) {
861 // TODO: implement if async_wait is implemented
862 return 0;
863 }
864
865 /** pointers to the timer-queues of this io-contexts.
866 *
867 * timer-queues are one per timer-type (std::chrono::steady_clock,
868 * std::chrono::system_clock, ...)
869 *
870 * timer_queue_base is the base class of the timer-queues
871 *
872 * the timer-queues themselves are owned by the io_context's executor via
873 * execution_context::add_service()
874 *
875 * protected via 'mtx_'
876 */
877 std::vector<timer_queue_base *> timer_queues_;
878
879 /**
880 * mutex that protects the core parts of the io-context.
881 *
882 * - timer_queues_
883 */
884 mutable std::mutex mtx_{};
885
886 mutable std::mutex do_one_mtx_{};
887 mutable std::condition_variable do_one_cond_{};
888 bool is_running_{false};
889
890 void wait_no_runner_(std::unique_lock<std::mutex> &lk) {
891 lk.lock();
893 }
894
895 void wait_no_runner_unlocked_(std::unique_lock<std::mutex> &lk) {
896 do_one_cond_.wait(lk, [this]() { return is_running_ == false; });
897
898 is_running(true);
899 }
900
901 void wake_one_runner_(std::unique_lock<std::mutex> &lk) {
902 is_running(false);
903 lk.unlock();
904 do_one_cond_.notify_one();
905 }
906
907 void is_running(bool v) { is_running_ = v; }
908 bool is_running() const { return is_running_; }
909
911};
912} // namespace net
913
914namespace net {
916 count_type n = 0;
917
918 std::unique_lock<std::mutex> lk(do_one_mtx_);
919
920 using namespace std::chrono_literals;
921
922 // in the first round, we already have the lock; in all other rounds we
923 // need to take the lock first
924 for (wait_no_runner_unlocked_(lk); do_one(lk, -1ms) != 0;
925 wait_no_runner_(lk)) {
 // saturate instead of overflowing the counter.
926 if (n != std::numeric_limits<count_type>::max()) ++n;
927 }
928 return n;
929}
930
932 using namespace std::chrono_literals;
933
934 std::unique_lock<std::mutex> lk(do_one_mtx_);
935
937
938 return do_one(lk, -1ms);
939}
940
941template <class Rep, class Period>
943 const std::chrono::duration<Rep, Period> &rel_time) {
944 return run_until(std::chrono::steady_clock::now() + rel_time);
945}
946
947template <class Clock, class Duration>
949 const std::chrono::time_point<Clock, Duration> &abs_time) {
950 count_type n = 0;
951
952 std::unique_lock<std::mutex> lk(do_one_mtx_);
953
954 using namespace std::chrono_literals;
955
956 // in the first round, we already have the lock; in all other rounds we
957 // need to take the lock first
958 for (wait_no_runner_unlocked_(lk); do_one_until(lk, abs_time) != 0;
959 wait_no_runner_(lk)) {
 // saturate instead of overflowing the counter.
960 if (n != std::numeric_limits<count_type>::max()) ++n;
961 }
962 return n;
963}
964
965template <class Rep, class Period>
967 const std::chrono::duration<Rep, Period> &rel_time) {
968 return run_one_until(std::chrono::steady_clock::now() + rel_time);
969}
970
971template <class Clock, class Duration>
973 const std::chrono::time_point<Clock, Duration> &abs_time) {
974 std::unique_lock<std::mutex> lk(do_one_mtx_);
975
977
978 return do_one_until(lk, abs_time);
979}
980
982 count_type n = 0;
983 std::unique_lock<std::mutex> lk(do_one_mtx_);
984
985 using namespace std::chrono_literals;
986
987 for (wait_no_runner_unlocked_(lk); do_one(lk, 0ms) != 0;
988 wait_no_runner_(lk)) {
989 if (n != std::numeric_limits<count_type>::max()) ++n;
990 }
991 return n;
992}
993
995 std::unique_lock<std::mutex> lk(do_one_mtx_);
996
997 using namespace std::chrono_literals;
998
1000 return do_one(lk, 0ms);
1001}
1002
1004 public:
1005 executor_type(const executor_type &rhs) noexcept = default;
1006 executor_type(executor_type &&rhs) noexcept = default;
1007 executor_type &operator=(const executor_type &rhs) noexcept = default;
1008 executor_type &operator=(executor_type &&rhs) noexcept = default;
1009
1010 ~executor_type() = default;
1011
1012 bool running_in_this_thread() const noexcept {
1014 }
1015 io_context &context() const noexcept { return *io_ctx_; }
1016
1017 void on_work_started() const noexcept { ++io_ctx_->work_count_; }
1018 void on_work_finished() const noexcept { --io_ctx_->work_count_; }
1019
1020 /**
1021 * execute function.
1022 *
1023 * Effect:
1024 *
1025 * The executor
1026 *
1027 * - MAY block forward progress of the caller until f() finishes.
1028 */
1029 template <class Func, class ProtoAllocator>
1030 void dispatch(Func &&f, const ProtoAllocator &a) const {
1031 if (running_in_this_thread()) {
1032 // run it in this thread.
1033 std::decay_t<Func>(std::forward<Func>(f))();
1034 } else {
1035 // queue function call for later execution.
1036 post(std::forward<Func>(f), a);
1037 }
1038 }
1039
1040 /**
1041 * queue function for execution.
1042 *
1043 * Effects:
1044 *
1045 * The executor
1046 *
1047 * - SHALL NOT block forward progress of the caller pending completion of f().
1048 * - MAY begin f() progress before the call to post completes.
1049 */
1050 template <class Func, class ProtoAllocator>
1051 void post(Func &&f, const ProtoAllocator &a) const {
1052 io_ctx_->defer_work(std::forward<Func>(f), a);
1053 }
1054
1055 /**
1056 * defer function call for later execution.
1057 *
1058 * Effect:
1059 *
1060 * The executor:
1061 *
1062 * - SHALL NOT block forward progress of the caller pending completion of f().
1063 * - SHOULD NOT begin f()'s progress before the call to defer()
1064 * completes.
1065 */
1066 template <class Func, class ProtoAllocator>
1067 void defer(Func &&f, const ProtoAllocator &a) const {
1068 post(std::forward<Func>(f), a);
1069 }
1070
1071 private:
1073
1074 explicit executor_type(io_context &ctx) : io_ctx_{std::addressof(ctx)} {}
1075
1077};
1078
1080 const io_context::executor_type &b) noexcept {
1081 return std::addressof(a.context()) == std::addressof(b.context());
1082}
1084 const io_context::executor_type &b) noexcept {
1085 return !(a == b);
1086}
1087
1088// io_context::executor_type is an executor even though it doesn't have a
1089// default constructor
1090template <>
1091struct is_executor<io_context::executor_type> : std::true_type {};
1092
1094 return executor_type(*this);
1095}
1096
1097/**
1098 * cancel all async-ops of a file-descriptor.
1099 */
1101 native_handle_type fd) {
1102 bool need_notify{false};
1103 {
1104 // check all async-ops
1105 std::lock_guard<std::mutex> lk(mtx_);
1106
1107 while (auto op = active_ops_.extract_first(fd)) {
1108 op->cancel();
1109
1110 cancelled_ops_.push_back(std::move(op));
1111
1112 need_notify = true;
1113 }
1114 }
1115
1116 // wakeup the loop to deliver the cancelled fds
1117 if (true || need_notify) {
1118 io_service_->remove_fd(fd);
1119
1121 }
1122
1123 return {};
1124}
1125
1126template <class Clock, class Duration>
1128 std::unique_lock<std::mutex> &lk,
1129 const std::chrono::time_point<Clock, Duration> &abs_time) {
1130 using namespace std::chrono_literals;
1131
1132 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1133 auto rel_time_ms =
1134 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1135
1136 if (rel_time_ms < 0ms) {
1137 // expired already.
1138 rel_time_ms = 0ms;
1139 } else if (rel_time_ms < rel_time) {
1140 // std::chrono::ceil()
1141 rel_time_ms += 1ms;
1142 }
1143
1144 return do_one(lk, rel_time_ms);
1145}
1146
1148 if (impl::Callstack<io_context>::contains(this) == nullptr) {
1149 io_service_->notify();
1150 }
1151}
1152
1153// precond: lk MUST be locked
1155 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds timeout) {
1157
1158 timer_queue_base *timer_q{nullptr};
1159
1160 monitor mon(*this);
1161
1162 if (!has_outstanding_work()) {
1163 wake_one_runner_(lk);
1164 return 0;
1165 }
1166
1167 while (true) {
1168 // 1. deferred work.
1169 // 2. timer
1170 // 3. triggered events.
1171
1172 // timer (2nd round)
1173 if (timer_q) {
1174 if (timer_q->run_one()) {
1175 wake_one_runner_(lk);
1176 return 1;
1177 } else {
1178 timer_q = nullptr;
1179 }
1180 }
1181
1182 // deferred work
1183 if (deferred_work_.run_one()) {
1184 wake_one_runner_(lk);
1185 return 1;
1186 }
1187
1188 // timer
1189 std::chrono::milliseconds min_duration{0};
1190 {
1191 std::lock_guard<std::mutex> lk(mtx_);
1192 // check the smallest timestamp of all timer-queues
1193 for (auto q : timer_queues_) {
1194 const auto duration = q->next();
1195
1196 if (duration == duration.zero()) {
1197 timer_q = q;
1198 min_duration = duration;
1199 break;
1200 } else if ((duration != duration.max()) &&
1201 (timeout != timeout.zero()) &&
1202 (duration < min_duration || timer_q == nullptr)) {
1203 timer_q = q;
1204 min_duration = duration;
1205 }
1206 }
1207 }
1208
1209 // if we have a timer that has fired or was cancelled, run it right away
1210 if (timer_q && min_duration <= min_duration.zero()) continue;
1211
1212 if (auto op = [this]() -> std::unique_ptr<async_op> {
1213 // handle all the cancelled ops without polling first
1214 std::lock_guard<std::mutex> lk(mtx_);
1215
1216 // ops have all cancelled operations at the front
1217 if (!cancelled_ops_.empty() &&
1218 cancelled_ops_.front()->is_cancelled()) {
1219 auto op = std::move(cancelled_ops_.front());
1220
1221 cancelled_ops_.pop_front();
1222
1223 return op;
1224 }
1225
1226 return {};
1227 }()) {
1228 // before we unlock the concurrent io-context-thread-lock increment the
1229 // work-count to ensure the next waiting thread exits in case:
1230 //
1231 // - no io-events registered
1232 // - no timers registered
1234 wake_one_runner_(lk);
1235 op->run(*this);
1237
1238 return 1;
1239 }
1240
1241 if (stopped() || !io_service_open_res_) {
1242 break;
1243 }
1244
1245 // adjust min-duration according to caller's timeout
1246 //
1247 // - if there is no timer queued, use the caller's timeout
1248 // - if there is a timer queued, reduce min_duration to callers timeout if
1249 // it is lower and non-negative.
1250 //
1251 // note: negative timeout == infinite.
1252 if (timer_q == nullptr ||
1253 (timeout > timeout.zero() && timeout < min_duration)) {
1254 min_duration = timeout;
1255 }
1256
1257 auto res = io_service_->poll_one(min_duration);
1258 if (!res) {
1259 if (res.error() == std::errc::interrupted) {
1260 // poll again as it got interrupted
1261 continue;
1262 }
1263 if (res.error() == std::errc::timed_out && min_duration != timeout &&
1264 timer_q != nullptr) {
1265 // poll_one() timed out, we have a timer-queue and the timer's expiry is
1266 // less than the global timeout or there is no timeout.
1267 continue;
1268 }
1269
1270 wake_one_runner_(lk);
1271
1272#if 0
1273 // if the poll returns another error, it is a ok that we don't further
1274 // check it as we exit cleanly. Still it would be nice to be aware of it
1275 // in debug builds.
1276 assert(res.error() == io_service_errc::no_fds ||
1277 res.error() == std::errc::timed_out);
1278#endif
1279
1280 // either poll() timed out or there were no file-descriptors that fired
1281 return 0;
1282 }
1283
1284 // std::cerr << __LINE__ << ": " << res.value().fd << " - "
1285 // << res.value().event << std::endl;
1286
1287 if (auto op = [this](native_handle_type fd,
1288 short events) -> std::unique_ptr<async_op> {
1289 std::lock_guard<std::mutex> lk(mtx_);
1290
1291 return active_ops_.extract_first(fd, events);
1292 }(res->fd, res->event)) {
1294 wake_one_runner_(lk);
1295 op->run(*this);
1297 return 1;
1298 }
1299 // we may not find an async-op for this event if it already has been
1300 // cancelled. Loop around let the "is-cancelled" check handle it.
1301 }
1302
1303 wake_one_runner_(lk);
1304 return 0;
1305}
1306
1307} // namespace net
1308
1309#endif
Definition: io_service_base.h:87
Definition: socket.h:1412
template-less base-class of basic_socket_impl.
Definition: socket.h:335
Definition: socket.h:471
Definition: socket.h:711
Definition: socket.h:1144
Definition: timer.h:57
Definition: executor.h:308
execution_context & context() noexcept
Definition: executor.h:314
Definition: executor.h:154
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:300
void destroy() noexcept
Definition: executor.h:193
Definition: callstack.h:80
callstack of a thread.
Definition: callstack.h:71
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
Definition: socket_service_base.h:48
Definition: io_context.h:379
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:471
std::mutex mtx_
Definition: io_context.h:474
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:404
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:441
void release_all()
Definition: io_context.h:414
element_type extract_first(native_handle_type fd)
Definition: io_context.h:410
bool has_outstanding_work() const
Definition: io_context.h:383
void push_back(element_type &&t)
Definition: io_context.h:389
std::unique_ptr< async_op > element_type
Definition: io_context.h:381
Definition: io_context.h:172
Callable(Func &&f)
Definition: io_context.h:174
void invoke() override
Definition: io_context.h:176
Func f_
Definition: io_context.h:179
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:158
std::list< op_type > work_
Definition: io_context.h:240
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:182
std::mutex work_mtx_
Definition: io_context.h:239
size_t run_one()
run a deferred work item.
Definition: io_context.h:190
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:233
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:221
async operation with callback.
Definition: io_context.h:362
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:364
Op op_
Definition: io_context.h:376
void run(io_context &) override
Definition: io_context.h:367
base class of async operation.
Definition: io_context.h:337
native_handle_type fd_
Definition: io_context.h:354
wait_type event() const
Definition: io_context.h:351
virtual ~async_op()=default
void cancel()
Definition: io_context.h:347
virtual void run(io_context &)=0
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:341
bool is_cancelled() const
Definition: io_context.h:348
native_handle_type native_handle() const
Definition: io_context.h:350
wait_type event_
Definition: io_context.h:355
Definition: io_context.h:1003
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1030
io_context * io_ctx_
Definition: io_context.h:1076
void on_work_started() const noexcept
Definition: io_context.h:1017
executor_type(executor_type &&rhs) noexcept=default
executor_type(const executor_type &rhs) noexcept=default
friend io_context
Definition: io_context.h:1072
executor_type & operator=(const executor_type &rhs) noexcept=default
executor_type & operator=(executor_type &&rhs) noexcept=default
io_context & context() const noexcept
Definition: io_context.h:1015
executor_type(io_context &ctx)
Definition: io_context.h:1074
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1051
bool running_in_this_thread() const noexcept
Definition: io_context.h:1012
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1067
void on_work_finished() const noexcept
Definition: io_context.h:1018
Definition: io_context.h:306
io_context & ctx_
Definition: io_context.h:326
monitor(const monitor &)=delete
monitor(io_context &ctx)
Definition: io_context.h:308
~monitor()
Definition: io_context.h:313
monitor(monitor &&)=delete
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:798
Op op_
Definition: io_context.h:810
void run() override
Definition: io_context.h:801
void cancel()
Definition: io_context.h:778
pending_timer(const Timer &timer)
Definition: io_context.h:772
time_point expiry() const noexcept
Definition: io_context.h:785
typename Timer::time_point time_point
Definition: io_context.h:769
timer_id id() const
Definition: io_context.h:786
typename Timer::Id * timer_id
Definition: io_context.h:770
bool is_cancelled() const
Definition: io_context.h:777
time_point expiry_
Definition: io_context.h:791
timer_id id_
Definition: io_context.h:792
Definition: io_context.h:512
timer_queue_base(execution_context &ctx)
Definition: io_context.h:514
virtual std::chrono::milliseconds next() const =0
std::mutex queue_mtx_
Definition: io_context.h:516
Definition: io_context.h:524
std::chrono::milliseconds next() const override
Definition: io_context.h:589
size_t cancel(const Timer &t)
Definition: io_context.h:703
io_context & context() noexcept
Definition: io_context.h:556
bool run_one() override
Definition: io_context.h:635
void shutdown() noexcept override
Definition: io_context.h:554
timer_queue(execution_context &ctx)
Definition: io_context.h:528
~timer_queue() override
Definition: io_context.h:541
void push(const Timer &timer, Op &&op)
Definition: io_context.h:561
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:818
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:820
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:814
Definition: io_context.h:61
count_type poll()
Definition: io_context.h:981
bool is_running_
Definition: io_context.h:888
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:884
std::condition_variable do_one_cond_
Definition: io_context.h:887
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:948
~io_context()
Definition: io_context.h:81
std::atomic< count_type > work_count_
Definition: io_context.h:282
count_type run_one()
Definition: io_context.h:931
io_context()
Definition: io_context.h:68
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:901
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:895
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:285
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:286
DeferredWork deferred_work_
Definition: io_context.h:243
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:249
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1127
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:942
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1147
void async_wait(const Timer &timer, Op &&op)
async wait for a timer expire.
Definition: io_context.h:832
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:150
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:966
size_t cancel_one(const Timer &)
Definition: io_context.h:860
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:66
std::mutex do_one_mtx_
Definition: io_context.h:886
io_context(int)
Definition: io_context.h:79
IoServiceBase * io_service() const
Definition: io_context.h:139
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:287
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1154
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:480
io_context(const io_context &)=delete
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:72
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:135
bool is_running() const
Definition: io_context.h:908
AsyncOps active_ops_
Definition: io_context.h:477
bool stopped_
Definition: io_context.h:281
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-contexts.
Definition: io_context.h:877
count_type poll_one()
Definition: io_context.h:994
void stop()
Definition: io_context.h:116
count_type run()
Definition: io_context.h:915
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:890
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:483
executor_type get_executor() noexcept
Definition: io_context.h:1093
void restart()
Definition: io_context.h:130
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:845
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1100
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:972
void is_running(bool v)
Definition: io_context.h:907
size_t count_type
Definition: io_context.h:65
bool has_outstanding_work() const
Definition: io_context.h:295
bool stopped() const noexcept
Definition: io_context.h:125
io_context & operator=(const io_context &)=delete
Definition: linux_epoll_io_service.h:57
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
Definition: expected.h:944
static int count
Definition: myisam_ftdump.cc:43
static QUEUE queue
Definition: myisampack.cc:207
static bool interrupted
Definition: mysqladmin.cc:66
Definition: authentication.cc:36
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
wait_type
Definition: socket_constants.h:86
int native_handle_type
Definition: socket_constants.h:51
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
Definition: buffer.h:45
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:576
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:572
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
Definition: gcs_xcom_synode.h:64
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
Definition: executor.h:370
int n
Definition: xcom_base.cc:509
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4065