MySQL 8.0.40
Source Code Documentation
io_context.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2020, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
28
29#include <atomic>
30#include <chrono>
31#include <iterator>
32#include <limits> // numeric_limits
33#include <list>
34#include <map>
35#include <memory> // unique_ptr
36#include <mutex>
37#include <system_error> // error_code
38#include <unordered_map>
39#include <utility>
40#include <vector>
41
42#include "my_config.h" // HAVE_EPOLL
52
53namespace net {
54
55#if defined(HAVE_EPOLL)
57#else
59#endif
60
62 public:
63 class executor_type;
64
65 using count_type = size_t;
67
69 : io_context{std::make_unique<net::impl::socket::SocketService>(),
71
73 std::unique_ptr<net::impl::socket::SocketServiceBase> &&socket_service,
74 std::unique_ptr<IoServiceBase> &&io_service)
78
79 explicit io_context(int /* concurrency_hint */) : io_context() {}  // the hint is ignored; delegates to the default constructor
80
83 cancelled_ops_.clear();
84 // Make sure the services are destroyed before our internal fields. The
85 // services own the timers that can indirectly call our methods when
86 // destructed. See UT NetTS_io_context.pending_timer_on_destroy for an
87 // example.
88 destroy();
89 }
90
91 io_context(const io_context &) = delete;
92 io_context &operator=(const io_context &) = delete;
93
94 executor_type get_executor() noexcept;
95
97
98 template <class Rep, class Period>
99 count_type run_for(const std::chrono::duration<Rep, Period> &rel_time);
100
101 template <class Clock, class Duration>
103 const std::chrono::time_point<Clock, Duration> &abs_time);
104
106
107 template <class Rep, class Period>
108 count_type run_one_for(const std::chrono::duration<Rep, Period> &rel_time);
109
110 template <class Clock, class Duration>
112 const std::chrono::time_point<Clock, Duration> &abs_time);
113
116 void stop() {
117 {
118 std::lock_guard<std::mutex> lk(mtx_);
119 stopped_ = true;
120 }
121
123 }
124
125 bool stopped() const noexcept {
126 const std::lock_guard<std::mutex> guard(mtx_);  // stop()/restart() write stopped_ under the same mutex
127 return stopped_;
128 }
129
130 void restart() {
131 const std::lock_guard<std::mutex> guard(mtx_);  // same mutex as stop()/stopped()
132 stopped_ = false;  // do_one() leaves its loop while stopped() reports true
133 }
134
136 return socket_service_.get();
137 }
138
139 IoServiceBase *io_service() const { return io_service_.get(); }  // non-owning pointer, io_service_ keeps the ownership
140
141 /**
142 * get the status of the implicit open() call of the io-service.
143 *
144 * the io_service_.open() may fail due to out-of-file-descriptors.
145 *
146 * run() will fail silently if the io-service failed to open.
147 *
148 * @returns std::error_code on error
149 */
152 }
153
154 private:
155 /**
156 * queued work from io_context::executor_type::dispatch()/post()/defer().
157 */
159 public:
160 // simple, generic storage of callable.
161 //
162 // std::function<void()> is similar, but doesn't work for move-only
163 // callables like lambdas that capture a move-only type
165 public:
166 virtual ~BasicCallable() = default;
167
168 virtual void invoke() = 0;
169 };
170
171 template <class Func>  // wrapper that stores a callable of type Func behind BasicCallable
172 class Callable : public BasicCallable {
173 public:
174 explicit Callable(Func &&f) : f_{std::forward<Func>(f)} {}  // explicit: no implicit Func -> Callable conversion
175
176 void invoke() override { f_(); }  // calls the stored callable without arguments
177
178 private:
179 Func f_;
180 };
181
182 using op_type = std::unique_ptr<BasicCallable>;
183
184 /**
185 * run the oldest deferred work item, if there is one.
186 *
187 * @returns number of work items run.
188 * @retval 0 work queue was empty, nothing was run.
189 */
190 size_t run_one() {
191 // scratch list that receives the head node of work_.
192 //
193 // splicing is cheap (std::list.splice() relinks nodes) and keeps the
194 // time the lock is held short.
195 decltype(work_) current;
196
197 {
198 // the lock is only needed while work_ itself is modified.
199 std::lock_guard<std::mutex> guard(work_mtx_);
200
201 if (work_.empty()) {
202 return 0;
203 }
204
205 current.splice(current.begin(), work_, work_.begin());
206 }
207
208 // work_mtx_ is released: run the deferred work.
209 auto &item = current.front();
210 item->invoke();
211
212 // 'current' (and with it the work item) is destructed on return.
213
214 return 1;
215 }
216
217 /**
218 * queue a decay-copy of f for later execution (never a reference to f).
219 */
220 template <class Func, class ProtoAllocator>
221 void post(Func &&f, const ProtoAllocator & /* a */) {
222 using stored_type = std::decay_t<Func>;  // Func may be deduced as 'T &'
223 std::lock_guard<std::mutex> lk(work_mtx_);
224 work_.emplace_back(std::make_unique<Callable<stored_type>>(
225 stored_type(std::forward<Func>(f))));
226 }
227
228 /**
229 * tell if the work queue currently holds at least one item.
230 *
231 * @retval true if some work is queued.
232 */
233 bool has_outstanding_work() const {
234 const std::lock_guard<std::mutex> guard(work_mtx_);
235 return work_.size() != 0;
236 }
237
238 private:
239 mutable std::mutex work_mtx_;
240 std::list<op_type> work_;
241 };
242
244
245 /**
246 * defer work for later execution.
247 */
248 template <class Func, class ProtoAllocator>
249 void defer_work(Func &&f, const ProtoAllocator &a) {
250 deferred_work_.post(std::forward<Func>(f), a);
251
252 // wakeup the possibly blocked io-thread.
254 }
255
256 template <class Clock, class Duration>
258 std::unique_lock<std::mutex> &lk,
259 const std::chrono::time_point<Clock, Duration> &abs_time);
260
261 count_type do_one(std::unique_lock<std::mutex> &lk,
262 std::chrono::milliseconds timeout);
263
264 template <typename _Clock, typename _WaitTraits>
266
268
269 template <class Protocol>
270 friend class basic_socket_impl;
271
272 template <class Protocol>
273 friend class basic_socket;
274
275 template <class Protocol>
277
278 template <class Protocol>
280
281 bool stopped_{false};
282 std::atomic<count_type> work_count_{};
283
284 // must be first member-var to ensure it is destroyed last
285 std::unique_ptr<impl::socket::SocketServiceBase> socket_service_;
286 std::unique_ptr<IoServiceBase> io_service_;
288
289 // has outstanding work
290 //
291 // work is outstanding when at least one of these holds:
292 //
293 // - a cancelled, active or deferred operation is still queued, or
294 // - work-count from on_work_started()/on_work_finished() is more than 0
295 bool has_outstanding_work() const {
296 // queued operations: cancelled, still active or deferred.
297 const bool ops_pending = !cancelled_ops_.empty() ||
298 active_ops_.has_outstanding_work() ||
299 deferred_work_.has_outstanding_work();
300
301 // work announced via the executor's on_work_started().
302 return ops_pending || work_count_ > 0;
303 }
304
305 // monitor if all .run()s in the io-context need to be stopped.
306 class monitor {
307 public:
308 monitor(io_context &ctx) : ctx_{ctx} {}
309
310 monitor(const monitor &) = delete;
311 monitor(monitor &&) = delete;
312
314 std::lock_guard<std::mutex> lk(ctx_.mtx_);
315
316 // ctx_.call_stack_.pop_back();
317
318 if (!ctx_.has_outstanding_work()) {
319 // like stop(), just that we already have the mutex
320 ctx_.stopped_ = true;
321 ctx_.io_service_->notify();
322 }
323 }
324
325 private:
327 };
328
330
331 /**
332 * base class of async operation.
333 *
334 * - file-descriptor
335 * - wait-event
336 */
337 class async_op {
338 public:
340
342
343 virtual ~async_op() = default;
344
345 virtual void run(io_context &) = 0;
346
349
351 wait_type event() const { return event_; }
352
353 private:
356 };
357
358 /**
359 * async operation with callback.
360 */
361 template <class Op>
362 class async_op_impl : public async_op {
363 public:
365 : async_op{fd, wt}, op_{std::forward<Op>(op)} {}
366
367 void run(io_context & /* io_ctx */) override {
368 // the handler is called in both cases: a cancelled op is completed
369 // with operation_canceled, any other op with a default-constructed
370 // error_code.
371 op_(is_cancelled() ? make_error_code(std::errc::operation_canceled)
372 : std::error_code{});
373 }
374
375 private:
376 Op op_;
377 };
378
379 class AsyncOps {
380 public:
381 using element_type = std::unique_ptr<async_op>;
382
383 bool has_outstanding_work() const {
384 // like the other methods of this class, only look at ops_ with mtx_ held.
385 const std::lock_guard<std::mutex> guard(mtx_);
386 return ops_.size() != 0;
387 }
388
390 const auto handle = t->native_handle();
391
392 std::lock_guard<std::mutex> lk(mtx_);
393
394 auto it = ops_.find(handle);
395 if (it != ops_.end()) {
396 it->second.push_back(std::move(t));
397 } else {
398 std::vector<element_type> v;
399 v.push_back(std::move(t));
400 ops_.emplace(handle, std::move(v));
401 }
402 }
403
405 return extract_first(fd, [events](auto const &el) {
406 return static_cast<short>(el->event()) & events;
407 });
408 }
409
411 return extract_first(fd, [](auto const &) { return true; });
412 }
413
414 void release_all() {
415 // This is expected to run before ~AsyncOps() to ensure ops_ is already
416 // empty when it gets destructed. Otherwise, destructing an element of
417 // ops_ could call back into a method that accesses the (destructed) map.
418 //
419 // Example: an AsyncOp captures a Socket through a smart pointer. The
420 // AsyncOp's destructor may run the Socket's destructor, which calls
421 // socket.close(), which unregisters the Socket's operations from its
422 // io_context: that's us, via extract_first().
423 std::list<element_type> doomed;
424 {
425 std::lock_guard<std::mutex> guard(mtx_);
426
427 for (auto &entry : ops_) {
428 auto &ops_of_fd = entry.second;
429 doomed.insert(doomed.end(), std::make_move_iterator(ops_of_fd.begin()),
430 std::make_move_iterator(ops_of_fd.end()));
431 }
432 ops_.clear();
433 }
434 // mtx_ MUST be released before 'doomed' goes out of scope: the AsyncOp
435 // destructors can indirectly call extract_first(), which locks mtx_ too
436 // and would deadlock.
437 }
438
439 private:
440 template <class Pred>
442 std::lock_guard<std::mutex> lk(mtx_);
443
444 const auto it = ops_.find(fd);
445 if (it != ops_.end()) {
446 auto &async_ops = it->second;
447
448 const auto end = async_ops.end();
449 for (auto cur = async_ops.begin(); cur != end; ++cur) {
450 auto &el = *cur;
451
452 if (el->native_handle() == fd && pred(el)) {
453 auto op = std::move(el);
454
455 if (async_ops.size() == 1) {
456 // remove the current container and with it its only element
457 ops_.erase(it);
458 } else {
459 // remove the current entry
460 async_ops.erase(cur);
461 }
462
463 return op;
464 }
465 }
466 }
467
468 return {};
469 }
470
471 std::unordered_map<native_handle_type, std::vector<element_type>> ops_{
472 16 * 1024};
473
474 mutable std::mutex mtx_;
475 };
476
478
479 // cancelled async operations.
480 std::list<std::unique_ptr<async_op>> cancelled_ops_;
481
482 template <class Op>
484 // add the socket-wait op to the queue
486 std::make_unique<async_op_impl<Op>>(std::forward<Op>(op), fd, wt));
487
488 {
489 auto res = io_service_->add_fd_interest(fd, wt);
490 if (!res) {
491#if 0
492 // fd may be -1 or so
493 std::cerr << "!! add_fd_interest(" << fd << ", ..."
494 << ") " << res.error() << " " << res.error().message()
495 << std::endl;
496#endif
497 // adding failed. Cancel it again.
498 //
499 // code should be similar to ::cancel(fd)
500 std::lock_guard<std::mutex> lk(mtx_);
501
502 if (auto op = active_ops_.extract_first(fd, static_cast<short>(wt))) {
503 op->cancel();
504 cancelled_ops_.push_back(std::move(op));
505 }
506 }
507 }
508
510 }
511
513 protected:
515
516 mutable std::mutex queue_mtx_;
517
518 public:
519 virtual bool run_one() = 0;
520 virtual std::chrono::milliseconds next() const = 0;
521 };
522
523 template <class Timer>
525 public:
527
529 // add timer_queue to io_context
530
531 auto &io_ctx = static_cast<io_context &>(ctx);
532
533 // @note: don't move this lock+push into the timer_queue_base constructor
534 //
535 // @see
536 // https://github.com/google/sanitizers/wiki/ThreadSanitizerPopularDataRaces#data-race-on-vptr-during-construction
537 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
538 io_ctx.timer_queues_.push_back(this);
539 }
540
541 void shutdown() noexcept override {}  // intentionally empty: no action is taken on shutdown
542
543 io_context &context() noexcept {
544 return static_cast<io_context &>(service::context());  // downcast of the context returned by the service base
545 }
546
547 template <class Op>
548 void push(const Timer &timer, Op &&op) {
550
551 std::lock_guard<std::mutex> lk(queue_mtx_);
552
553#if 0
554 pending_timers_.insert(
555 std::upper_bound(
556 pending_timers_.begin(), pending_timers_.end(), timer.expiry(),
557 [](const auto &a, const auto &b) { return a < b->expiry(); }),
558 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op)));
559#else
560 if (timer.id() == nullptr) abort();
561
562 // add timer
563 pending_timers_.emplace(std::make_pair(
564 timer.id(),
565 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op))));
566
567 if (timer.id() == nullptr) abort();
568 if (timer.expiry() == Timer::time_point::min()) abort();
569
570 // sorted timer ids by expiry
572 std::make_pair(timer.expiry(), timer.id()));
573#endif
574 }
575
576 std::chrono::milliseconds next() const override {
577 typename Timer::time_point earliest;
578 {
579 std::lock_guard<std::mutex> guard(queue_mtx_);
580
581 // cancelled timers should be executed directly.
582 if (!cancelled_timers_.empty()) {
583 return std::chrono::milliseconds::min();
584 }
585
586 // no pending timers, return the max-timeout.
587 if (pending_timer_expiries_.empty()) {
588 return std::chrono::milliseconds::max();
589 }
590
591 // the expiries are ordered by time-point, smallest first: the first
592 // entry is the timer to wait for.
593 earliest = pending_timer_expiries_.begin()->first;
594
595 // queue_mtx_ isn't needed past this point.
596 }
597
598 // clamp a negative wait-duration to zero.
599 auto wait_for = Timer::traits_type::to_wait_duration(earliest);
600 if (wait_for < wait_for.zero()) {
601 wait_for = wait_for.zero();
602 }
603
604 // round up to full milliseconds to wait /at least/ the expected time:
605 //
606 // - 990us -> 1ms
607 // - 0ns -> 0ms
608
609 auto wait_for_ms =
610 std::chrono::duration_cast<std::chrono::milliseconds>(wait_for);
611
612 using namespace std::chrono_literals;
613
614 // duration_cast<> dropped a fraction of a millisecond: round up.
615 if ((wait_for - wait_for_ms).count() != 0) {
616 wait_for_ms += 1ms;
617 }
618
619 return wait_for_ms;
620 }
621
622 bool run_one() override {
623 std::unique_ptr<pending_timer> pt;
624
625 {
626 std::lock_guard<std::mutex> lk(queue_mtx_);
627
628 // if the pending-timers queue is empty, leave
629 // if the top is cancelled or expired, run it
630 if (cancelled_timers_.empty()) {
631 if (pending_timers_.empty()) return false;
632
633#if 0
634 // list
635 if (pending_timers_.front()->expiry() > Timer::clock_type::now()) {
636 return false;
637 }
638 pt = std::move(pending_timers_.front());
639 pending_timers_.pop_front();
640#else
641 if (pending_timers_.size() != pending_timer_expiries_.size()) abort();
642
643 auto min = Timer::time_point::min();
644 for (const auto &cur : pending_timer_expiries_) {
645 if (cur.first < min) abort();
646
647 min = cur.first;
648 }
649
650 const auto now = Timer::clock_type::now();
651
652 // multimap
653 auto pending_expiry_it = pending_timer_expiries_.begin();
654 auto timepoint = pending_expiry_it->first;
655
656 if (timepoint > now) {
657 // not expired yet. leave
658 return false;
659 }
660 typename Timer::Id *timer_id = pending_expiry_it->second;
661
662 auto pending_it = pending_timers_.find(timer_id);
663 if (pending_it == pending_timers_.end()) {
664 abort();
665 }
666 if (pending_it->second->id() != timer_id) {
667 abort();
668 }
669 if (pending_it->second->expiry() != pending_expiry_it->first) {
670 abort();
671 }
672
673 pt = std::move(pending_it->second);
674 pending_timer_expiries_.erase(pending_expiry_it);
675 pending_timers_.erase(pending_it);
676#endif
677 } else {
678 pt = std::move(cancelled_timers_.front());
679 cancelled_timers_.pop_front();
680 }
681 }
682
683 pt->run();
684
686
687 return true;
688 }
689
690 size_t cancel(const Timer &t) {
691 size_t count{};
692
693 {
694 std::lock_guard<std::mutex> lk(queue_mtx_);
695
696#if 0
697 const auto end = pending_timers_.end();
698
699 // the same timer may be pushed multiple times to the queue
700 // therefore, check all entries
701 for (auto cur = pending_timers_.begin(); cur != end;) {
702 auto &cur_timer = cur->second;
703 if (cur_timer->id() == t.id()) {
704 cur_timer->cancel();
705 ++count;
706
707 auto nxt = std::next(cur);
708 // move the timer over to the cancelled timers
710 cur);
711 cur = nxt;
712 } else {
713 ++cur;
714 }
715 }
716#else
717 auto eq_range = pending_timers_.equal_range(t.id());
718
719 for (auto cur = eq_range.first; cur != eq_range.second;) {
720 auto expiry_eq_range =
721 pending_timer_expiries_.equal_range(cur->second->expiry());
722
723 size_t erase_count{};
724
725 for (auto expiry_cur = expiry_eq_range.first;
726 expiry_cur != expiry_eq_range.second;) {
727 if (expiry_cur->first == cur->second->expiry() &&
728 expiry_cur->second == cur->second->id() && erase_count == 0) {
729 expiry_cur = pending_timer_expiries_.erase(expiry_cur);
730 ++erase_count;
731 } else {
732 ++expiry_cur;
733 }
734 }
735
736 // nothing found ... boom
737 if (erase_count == 0) abort();
738
739 cur->second->cancel();
740
741 // move timer to cancelled timers
742 cancelled_timers_.emplace_back(std::move(cur->second));
743
744 ++count;
745
746 cur = pending_timers_.erase(cur);
747 }
748#endif
749 }
750
751 return count;
752 }
753
755 public:
756 using time_point = typename Timer::time_point;
757 using timer_id = typename Timer::Id *;
758
759 pending_timer(const Timer &timer)
760 : expiry_{timer.expiry()}, id_{timer.id()} {}
761
762 virtual ~pending_timer() = default;
763
764 bool is_cancelled() const { return id_ == nullptr; }  // cancel() is what resets id_ to nullptr
765 void cancel() {
766 id_ = nullptr;  // a null id is what is_cancelled() tests for
767
768 // reset the expiry to its min() to ensure that it bubbles up to the top
769 expiry_ = expiry_.min();
770 }
771
772 time_point expiry() const noexcept { return expiry_; }
773 timer_id id() const { return id_; }
774
775 virtual void run() = 0;
776
777 private:
780 };
781
782 template <class Op>
784 public:
785 pending_timer_op(const Timer &timer, Op &&op)
786 : pending_timer(timer), op_{std::move(op)} {}
787
788 void run() override {
789 // the handler is called in both cases: a cancelled timer completes
790 // with operation_canceled, any other with a default error_code.
791 op_(this->is_cancelled()
792 ? make_error_code(std::errc::operation_canceled)
793 : std::error_code{});
794 }
795
796 private:
797 Op op_;
798 };
799
800 // cancelled timers, earliest cancelled timer first
801 std::list<std::unique_ptr<pending_timer>> cancelled_timers_;
802
803 // active timers, smallest time-point first
804 std::multimap<typename Timer::time_point, typename Timer::Id *>
806 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
808 };
809
810 /**
811 * asynchronously wait for a timer to expire.
812 *
813 * adds the op and timer to the timer_queue
814 *
815 * @param timer timer
816 * @param op completion handler to call when timer is triggered
817 */
818 template <class Timer, class Op>
819 void async_wait(const Timer &timer, Op &&op) {
820 auto &queue = use_service<timer_queue<Timer>>(*this);
821
822 queue.push(timer, std::forward<Op>(op));
823
824 // wakeup the blocked poll_one() to handle possible timer events.
826 }
827
828 /**
829 * cancel all async-ops of a timer.
830 */
831 template <class Timer>
832 size_t cancel(const Timer &timer) {
833 if (!has_service<timer_queue<Timer>>(*this)) {
834 return 0;
835 }
836
837 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
838 if (count) {
839 // if a timer was canceled, interrupt the io-service
841 }
842 return count;
843 }
844
845 // cancel the oldest async-op of a timer (stub: the timer is ignored, 0 is reported)
846 template <class Timer>
847 size_t cancel_one(const Timer & /* timer */) {
848 // TODO: implement if async_wait is implemented
849 return 0;
850 }
851
852 /** pointers to the timer-queues of this io-context.
853 *
854 * timer-queues are one per timer-type (std::chrono::steady_clock,
855 * std::chrono::system_clock, ...)
856 *
857 * timer_queue_base is the base class of the timer-queues
858 *
859 * the timer-queues themselves are owned by the io_context's executor via
860 * execution_context::add_service()
861 *
862 * protected via 'mtx_'
863 */
864 std::vector<timer_queue_base *> timer_queues_;
865
866 /**
867 * mutex that protects the core parts of the io-context.
868 *
869 * - timer_queues_
870 */
871 mutable std::mutex mtx_{};
872
873 mutable std::mutex do_one_mtx_{};
874 mutable std::condition_variable do_one_cond_{};
875 bool is_running_{false};
876
877 void wait_no_runner_(std::unique_lock<std::mutex> &lk) {
878 lk.lock();
880 }
881
882 void wait_no_runner_unlocked_(std::unique_lock<std::mutex> &lk) {
883 // lk has to be locked already: block until is_running_ is false ...
884 do_one_cond_.wait(lk, [this]() { return !is_running(); });
885 is_running_ = true;  // ... and mark this caller as the runner.
886 }
887
888 void wake_one_runner_(std::unique_lock<std::mutex> &lk) {
889 is_running_ = false;  // cleared while lk is still locked
890 lk.unlock();
891 do_one_cond_.notify_one();  // wakes one waiter of wait_no_runner_unlocked_()
892 }
893
894 void is_running(bool v) { is_running_ = v; }  // setter; takes no lock itself
895 bool is_running() const { return is_running_; }  // getter; takes no lock itself
896
898};
899} // namespace net
900
901namespace net {
903 count_type n = 0;
904
905 std::unique_lock<std::mutex> lk(do_one_mtx_);
906
907 using namespace std::chrono_literals;
908
909 // in the first round, we already have the lock, in all other rounds we
910 // need to take the lock first
911 for (wait_no_runner_unlocked_(lk); do_one(lk, -1ms) != 0;
912 wait_no_runner_(lk)) {
913 if (n != std::numeric_limits<count_type>::max()) ++n;
914 }
915 return n;
916}
917
919 using namespace std::chrono_literals;
920
921 std::unique_lock<std::mutex> lk(do_one_mtx_);
922
924
925 return do_one(lk, -1ms);
926}
927
928template <class Rep, class Period>
930 const std::chrono::duration<Rep, Period> &rel_time) {
931 return run_until(std::chrono::steady_clock::now() + rel_time);
932}
933
934template <class Clock, class Duration>
936 const std::chrono::time_point<Clock, Duration> &abs_time) {
937 count_type n = 0;
938
939 std::unique_lock<std::mutex> lk(do_one_mtx_);
940
941 using namespace std::chrono_literals;
942
943 // in the first round, we already have the lock, in all other rounds we
944 // need to take the lock first
945 for (wait_no_runner_unlocked_(lk); do_one_until(lk, abs_time) != 0;
946 wait_no_runner_(lk)) {
947 if (n != std::numeric_limits<count_type>::max()) ++n;
948 }
949 return n;
950}
951
952template <class Rep, class Period>
954 const std::chrono::duration<Rep, Period> &rel_time) {
955 return run_one_until(std::chrono::steady_clock::now() + rel_time);
956}
957
958template <class Clock, class Duration>
960 const std::chrono::time_point<Clock, Duration> &abs_time) {
961 std::unique_lock<std::mutex> lk(do_one_mtx_);
962
964
965 return do_one_until(lk, abs_time);
966}
967
969 count_type n = 0;
970 std::unique_lock<std::mutex> lk(do_one_mtx_);
971
972 using namespace std::chrono_literals;
973
974 for (wait_no_runner_unlocked_(lk); do_one(lk, 0ms) != 0;
975 wait_no_runner_(lk)) {
976 if (n != std::numeric_limits<count_type>::max()) ++n;
977 }
978 return n;
979}
980
982 std::unique_lock<std::mutex> lk(do_one_mtx_);
983
984 using namespace std::chrono_literals;
985
987 return do_one(lk, 0ms);
988}
989
991 public:
992 executor_type(const executor_type &rhs) noexcept = default;
993 executor_type(executor_type &&rhs) noexcept = default;
994 executor_type &operator=(const executor_type &rhs) noexcept = default;
995 executor_type &operator=(executor_type &&rhs) noexcept = default;
996
997 ~executor_type() = default;
998
999 bool running_in_this_thread() const noexcept {
1001 }
1002 io_context &context() const noexcept { return *io_ctx_; }
1003
1004 void on_work_started() const noexcept { ++io_ctx_->work_count_; }  // atomic increment of the io-context's work-count
1005 void on_work_finished() const noexcept { --io_ctx_->work_count_; }  // atomic decrement of the io-context's work-count
1006
1007 /**
1008 * execute function, in this thread if possible.
1009 *
1010 * Effect:
1011 *
1012 * The executor
1013 *
1014 * - MAY block forward progress of the caller until f() finishes.
1015 */
1016 template <class Func, class ProtoAllocator>
1017 void dispatch(Func &&f, const ProtoAllocator &a) const {
1018 if (!running_in_this_thread()) {
1019 // not running in this thread: queue the call for later execution.
1020 post(std::forward<Func>(f), a);
1021 return;
1022 }
1023 // running in this thread: invoke a decay-copy of f right away.
1024 std::decay_t<Func>(std::forward<Func>(f))();
1025 }
1026
1027 /**
1028 * queue function for execution.
1029 *
1030 * Effects:
1031 *
1032 * The executor
1033 *
1034 * - SHALL NOT block forward progress of the caller pending completion of f().
1035 * - MAY begin f() progress before the call to post completes.
1036 */
1037 template <class Func, class ProtoAllocator>
1038 void post(Func &&f, const ProtoAllocator &a) const {
1039 io_ctx_->defer_work(std::forward<Func>(f), a);  // f is handed to the io-context's deferred work, it isn't run from here
1040 }
1041
1042 /**
1043 * defer function call for later execution.
1044 *
1045 * Effect:
1046 *
1047 * The executor:
1048 *
1049 * - SHALL NOT block forward progress of the caller pending completion of f().
1050 * - SHOULD NOT begin f()'s progress before the call to defer()
1051 * completes.
1052 */
1053 template <class Func, class ProtoAllocator>
1054 void defer(Func &&f, const ProtoAllocator &a) const {
1055 post(std::forward<Func>(f), a);  // no separate defer path: handled exactly like post()
1056 }
1057
1058 private:
1060
1061 explicit executor_type(io_context &ctx) : io_ctx_{std::addressof(ctx)} {}
1062
1064};
1065
1067 const io_context::executor_type &b) noexcept {
1068 return std::addressof(a.context()) == std::addressof(b.context());
1069}
1071 const io_context::executor_type &b) noexcept {
1072 return !(a == b);
1073}
1074
1075// io_context::executor_type is an executor even though it doesn't have a
1076// default constructor
1077template <>
1078struct is_executor<io_context::executor_type> : std::true_type {};
1079
1081 return executor_type(*this);
1082}
1083
1084/**
1085 * cancel all async-ops of a file-descriptor.
1086 */
1088 native_handle_type fd) {
1089 bool need_notify{false};
1090 {
1091 // check all async-ops
1092 std::lock_guard<std::mutex> lk(mtx_);
1093
1094 while (auto op = active_ops_.extract_first(fd)) {
1095 op->cancel();
1096
1097 cancelled_ops_.push_back(std::move(op));
1098
1099 need_notify = true;
1100 }
1101 }
1102
1103 // wakeup the loop to deliver the cancelled fds
1104 if (true || need_notify) {
1105 io_service_->remove_fd(fd);
1106
1108 }
1109
1110 return {};
1111}
1112
1113template <class Clock, class Duration>
1115 std::unique_lock<std::mutex> &lk,
1116 const std::chrono::time_point<Clock, Duration> &abs_time) {
1117 using namespace std::chrono_literals;
1118
1119 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1120 auto rel_time_ms =
1121 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1122
1123 if (rel_time_ms < 0ms) {
1124 // expired already.
1125 rel_time_ms = 0ms;
1126 } else if (rel_time_ms < rel_time) {
1127 // std::chrono::ceil()
1128 rel_time_ms += 1ms;
1129 }
1130
1131 return do_one(lk, rel_time_ms);
1132}
1133
1135 if (impl::Callstack<io_context>::contains(this) == nullptr) {
1136 io_service_->notify();
1137 }
1138}
1139
1140// precond: lk MUST be locked
1142 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds timeout) {
1144
1145 timer_queue_base *timer_q{nullptr};
1146
1147 monitor mon(*this);
1148
1149 if (!has_outstanding_work()) {
1150 wake_one_runner_(lk);
1151 return 0;
1152 }
1153
1154 while (true) {
1155 // 1. deferred work.
1156 // 2. timer
1157 // 3. triggered events.
1158
1159 // timer (2nd round)
1160 if (timer_q) {
1161 if (timer_q->run_one()) {
1162 wake_one_runner_(lk);
1163 return 1;
1164 } else {
1165 timer_q = nullptr;
1166 }
1167 }
1168
1169 // deferred work
1170 if (deferred_work_.run_one()) {
1171 wake_one_runner_(lk);
1172 return 1;
1173 }
1174
1175 // timer
1176 std::chrono::milliseconds min_duration{0};
1177 {
1178 std::lock_guard<std::mutex> lk(mtx_);
1179 // check the smallest timestamp of all timer-queues
1180 for (auto q : timer_queues_) {
1181 const auto duration = q->next();
1182
1183 if (duration == duration.zero()) {
1184 timer_q = q;
1185 min_duration = duration;
1186 break;
1187 } else if ((duration != duration.max()) &&
1188 (timeout != timeout.zero()) &&
1189 (duration < min_duration || timer_q == nullptr)) {
1190 timer_q = q;
1191 min_duration = duration;
1192 }
1193 }
1194 }
1195
1196 // if we have a timer that has fired or was cancelled, run it right away
1197 if (timer_q && min_duration <= min_duration.zero()) continue;
1198
1199 if (auto op = [this]() -> std::unique_ptr<async_op> {
1200 // handle all the cancelled ops without polling first
1201 std::lock_guard<std::mutex> lk(mtx_);
1202
1203 // ops have all cancelled operators at the front
1204 if (!cancelled_ops_.empty() &&
1205 cancelled_ops_.front()->is_cancelled()) {
1206 auto op = std::move(cancelled_ops_.front());
1207
1208 cancelled_ops_.pop_front();
1209
1210 return op;
1211 }
1212
1213 return {};
1214 }()) {
1215 // before we unlock the concurrent io-context-thread-lock increment the
1216 // work-count to ensure the next waiting thread exiting in case:
1217 //
1218 // - no io-events registered
1219 // - no timers registered
1221 wake_one_runner_(lk);
1222 op->run(*this);
1224
1225 return 1;
1226 }
1227
1228 if (stopped() || !io_service_open_res_) {
1229 break;
1230 }
1231
1232 // adjust min-duration according to caller's timeout
1233 //
1234 // - if there is no timer queued, use the caller's timeout
1235 // - if there is a timer queued, reduce min_duration to callers timeout if
1236 // it is lower and non-negative.
1237 //
1238 // note: negative timeout == infinite.
1239 if (timer_q == nullptr ||
1240 (timeout > timeout.zero() && timeout < min_duration)) {
1241 min_duration = timeout;
1242 }
1243
1244 auto res = io_service_->poll_one(min_duration);
1245 if (!res) {
1246 if (res.error() == std::errc::interrupted) {
1247 // poll again as it got interrupted
1248 continue;
1249 }
1250 if (res.error() == std::errc::timed_out && min_duration != timeout &&
1251 timer_q != nullptr) {
1252 // poll_one() timed out, we have a timer-queue and the timer's expiry is
1253 // less than the global timeout or there is no timeout.
1254 continue;
1255 }
1256
1257 wake_one_runner_(lk);
1258
1259#if 0
1260 // if the poll returns another error, it is a ok that we don't further
1261 // check it as we exit cleanly. Still it would be nice to be aware of it
1262 // in debug builds.
1263 assert(res.error() == io_service_errc::no_fds ||
1264 res.error() == std::errc::timed_out);
1265#endif
1266
1267 // either poll() timed out or there were no file-descriptors that fired
1268 return 0;
1269 }
1270
1271 // std::cerr << __LINE__ << ": " << res.value().fd << " - "
1272 // << res.value().event << std::endl;
1273
1274 if (auto op = [this](native_handle_type fd,
1275 short events) -> std::unique_ptr<async_op> {
1276 std::lock_guard<std::mutex> lk(mtx_);
1277
1278 return active_ops_.extract_first(fd, events);
1279 }(res->fd, res->event)) {
1281 wake_one_runner_(lk);
1282 op->run(*this);
1284 return 1;
1285 }
1286 // we may not find an async-op for this event if it already has been
1287 // cancelled. Loop around let the "is-cancelled" check handle it.
1288 }
1289
1290 wake_one_runner_(lk);
1291 return 0;
1292}
1293
1294} // namespace net
1295
1296#endif
Definition: io_service_base.h:87
Definition: socket.h:1412
template-less base-class of basic_socket_impl.
Definition: socket.h:335
Definition: socket.h:471
Definition: socket.h:711
Definition: socket.h:1144
Definition: timer.h:57
Definition: executor.h:308
execution_context & context() noexcept
Definition: executor.h:314
Definition: executor.h:154
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:300
void destroy() noexcept
Definition: executor.h:193
Definition: callstack.h:80
callstack of a thread.
Definition: callstack.h:71
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
Definition: socket_service_base.h:48
Definition: io_context.h:379
std::unordered_map< native_handle_type, std::vector< element_type > > ops_
Definition: io_context.h:471
std::mutex mtx_
Definition: io_context.h:474
element_type extract_first(native_handle_type fd, short events)
Definition: io_context.h:404
element_type extract_first(native_handle_type fd, Pred &&pred)
Definition: io_context.h:441
void release_all()
Definition: io_context.h:414
element_type extract_first(native_handle_type fd)
Definition: io_context.h:410
bool has_outstanding_work() const
Definition: io_context.h:383
void push_back(element_type &&t)
Definition: io_context.h:389
std::unique_ptr< async_op > element_type
Definition: io_context.h:381
Definition: io_context.h:172
Callable(Func &&f)
Definition: io_context.h:174
void invoke() override
Definition: io_context.h:176
Func f_
Definition: io_context.h:179
queued work from io_context::executor_type::dispatch()/post()/defer().
Definition: io_context.h:158
std::list< op_type > work_
Definition: io_context.h:240
std::unique_ptr< BasicCallable > op_type
Definition: io_context.h:182
std::mutex work_mtx_
Definition: io_context.h:239
size_t run_one()
run a deferred work item.
Definition: io_context.h:190
bool has_outstanding_work() const
check if work is queued for later execution.
Definition: io_context.h:233
void post(Func &&f, const ProtoAllocator &)
queue work for later execution.
Definition: io_context.h:221
async operation with callback.
Definition: io_context.h:362
async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
Definition: io_context.h:364
Op op_
Definition: io_context.h:376
void run(io_context &) override
Definition: io_context.h:367
base class of async operation.
Definition: io_context.h:337
native_handle_type fd_
Definition: io_context.h:354
wait_type event() const
Definition: io_context.h:351
virtual ~async_op()=default
void cancel()
Definition: io_context.h:347
virtual void run(io_context &)=0
async_op(native_handle_type fd, wait_type ev)
Definition: io_context.h:341
bool is_cancelled() const
Definition: io_context.h:348
native_handle_type native_handle() const
Definition: io_context.h:350
wait_type event_
Definition: io_context.h:355
Definition: io_context.h:990
void dispatch(Func &&f, const ProtoAllocator &a) const
execute function.
Definition: io_context.h:1017
io_context * io_ctx_
Definition: io_context.h:1063
void on_work_started() const noexcept
Definition: io_context.h:1004
executor_type(executor_type &&rhs) noexcept=default
executor_type(const executor_type &rhs) noexcept=default
friend io_context
Definition: io_context.h:1059
executor_type & operator=(const executor_type &rhs) noexcept=default
executor_type & operator=(executor_type &&rhs) noexcept=default
io_context & context() const noexcept
Definition: io_context.h:1002
executor_type(io_context &ctx)
Definition: io_context.h:1061
void post(Func &&f, const ProtoAllocator &a) const
queue function for execution.
Definition: io_context.h:1038
bool running_in_this_thread() const noexcept
Definition: io_context.h:999
void defer(Func &&f, const ProtoAllocator &a) const
defer function call for later execution.
Definition: io_context.h:1054
void on_work_finished() const noexcept
Definition: io_context.h:1005
Definition: io_context.h:306
io_context & ctx_
Definition: io_context.h:326
monitor(const monitor &)=delete
monitor(io_context &ctx)
Definition: io_context.h:308
~monitor()
Definition: io_context.h:313
monitor(monitor &&)=delete
pending_timer_op(const Timer &timer, Op &&op)
Definition: io_context.h:785
Op op_
Definition: io_context.h:797
void run() override
Definition: io_context.h:788
void cancel()
Definition: io_context.h:765
pending_timer(const Timer &timer)
Definition: io_context.h:759
time_point expiry() const noexcept
Definition: io_context.h:772
typename Timer::time_point time_point
Definition: io_context.h:756
timer_id id() const
Definition: io_context.h:773
typename Timer::Id * timer_id
Definition: io_context.h:757
bool is_cancelled() const
Definition: io_context.h:764
time_point expiry_
Definition: io_context.h:778
timer_id id_
Definition: io_context.h:779
Definition: io_context.h:512
timer_queue_base(execution_context &ctx)
Definition: io_context.h:514
virtual std::chrono::milliseconds next() const =0
std::mutex queue_mtx_
Definition: io_context.h:516
Definition: io_context.h:524
std::chrono::milliseconds next() const override
Definition: io_context.h:576
size_t cancel(const Timer &t)
Definition: io_context.h:690
io_context & context() noexcept
Definition: io_context.h:543
bool run_one() override
Definition: io_context.h:622
void shutdown() noexcept override
Definition: io_context.h:541
timer_queue(execution_context &ctx)
Definition: io_context.h:528
void push(const Timer &timer, Op &&op)
Definition: io_context.h:548
std::multimap< typename Timer::time_point, typename Timer::Id * > pending_timer_expiries_
Definition: io_context.h:805
std::multimap< typename Timer::Id *, std::unique_ptr< pending_timer > > pending_timers_
Definition: io_context.h:807
std::list< std::unique_ptr< pending_timer > > cancelled_timers_
Definition: io_context.h:801
Definition: io_context.h:61
count_type poll()
Definition: io_context.h:968
bool is_running_
Definition: io_context.h:875
std::mutex mtx_
mutex that protects the core parts of the io-context.
Definition: io_context.h:871
std::condition_variable do_one_cond_
Definition: io_context.h:874
count_type run_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:935
~io_context()
Definition: io_context.h:81
std::atomic< count_type > work_count_
Definition: io_context.h:282
count_type run_one()
Definition: io_context.h:918
io_context()
Definition: io_context.h:68
void wake_one_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:888
void wait_no_runner_unlocked_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:882
std::unique_ptr< impl::socket::SocketServiceBase > socket_service_
Definition: io_context.h:285
std::unique_ptr< IoServiceBase > io_service_
Definition: io_context.h:286
DeferredWork deferred_work_
Definition: io_context.h:243
void defer_work(Func &&f, const ProtoAllocator &a)
defer work for later execution.
Definition: io_context.h:249
count_type do_one_until(std::unique_lock< std::mutex > &lk, const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:1114
count_type run_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:929
void notify_io_service_if_not_running_in_this_thread()
Definition: io_context.h:1134
void async_wait(const Timer &timer, Op &&op)
async wait for a timer to expire.
Definition: io_context.h:819
stdx::expected< void, std::error_code > open_res() const noexcept
get the status of the implicit open() call of the io-service.
Definition: io_context.h:150
count_type run_one_for(const std::chrono::duration< Rep, Period > &rel_time)
Definition: io_context.h:953
size_t cancel_one(const Timer &)
Definition: io_context.h:847
impl::socket::native_handle_type native_handle_type
Definition: io_context.h:66
std::mutex do_one_mtx_
Definition: io_context.h:873
io_context(int)
Definition: io_context.h:79
IoServiceBase * io_service() const
Definition: io_context.h:139
stdx::expected< void, std::error_code > io_service_open_res_
Definition: io_context.h:287
count_type do_one(std::unique_lock< std::mutex > &lk, std::chrono::milliseconds timeout)
Definition: io_context.h:1141
std::list< std::unique_ptr< async_op > > cancelled_ops_
Definition: io_context.h:480
io_context(const io_context &)=delete
io_context(std::unique_ptr< net::impl::socket::SocketServiceBase > &&socket_service, std::unique_ptr< IoServiceBase > &&io_service)
Definition: io_context.h:72
impl::socket::SocketServiceBase * socket_service() const
Definition: io_context.h:135
bool is_running() const
Definition: io_context.h:895
AsyncOps active_ops_
Definition: io_context.h:477
bool stopped_
Definition: io_context.h:281
std::vector< timer_queue_base * > timer_queues_
pointers to the timer-queues of this io-context.
Definition: io_context.h:864
count_type poll_one()
Definition: io_context.h:981
void stop()
Definition: io_context.h:116
count_type run()
Definition: io_context.h:902
void wait_no_runner_(std::unique_lock< std::mutex > &lk)
Definition: io_context.h:877
void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op)
Definition: io_context.h:483
executor_type get_executor() noexcept
Definition: io_context.h:1080
void restart()
Definition: io_context.h:130
size_t cancel(const Timer &timer)
cancel all async-ops of a timer.
Definition: io_context.h:832
stdx::expected< void, std::error_code > cancel(native_handle_type fd)
cancel all async-ops of a file-descriptor.
Definition: io_context.h:1087
count_type run_one_until(const std::chrono::time_point< Clock, Duration > &abs_time)
Definition: io_context.h:959
void is_running(bool v)
Definition: io_context.h:894
size_t count_type
Definition: io_context.h:65
bool has_outstanding_work() const
Definition: io_context.h:295
bool stopped() const noexcept
Definition: io_context.h:125
io_context & operator=(const io_context &)=delete
Definition: linux_epoll_io_service.h:57
io_service based on the poll() system-call.
Definition: poll_io_service.h:52
Definition: expected.h:944
static int count
Definition: myisam_ftdump.cc:43
static QUEUE queue
Definition: myisampack.cc:207
static bool interrupted
Definition: mysqladmin.cc:66
Definition: authentication.cc:36
static bool timeout(bool(*wait_condition)())
Timeout function.
Definition: log0meb.cc:496
Unique_ptr< T, std::nullptr_t > make_unique(size_t size)
In-place constructs a new unique pointer with no specific allocator and with array type T.
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
wait_type
Definition: socket_constants.h:86
int native_handle_type
Definition: socket_constants.h:51
constexpr const native_handle_type kInvalidSocket
Definition: socket_constants.h:52
Definition: buffer.h:45
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:576
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:572
std::error_code make_error_code(net::stream_errc e) noexcept
Definition: buffer.h:103
static int handle(int sql_errno, const char *sqlstate, const char *message, void *state)
Bridge function between the C++ API offered by this module and the C API of the parser service.
Definition: services.cc:64
Definition: gcs_xcom_synode.h:64
stdx::expected< int, std::error_code > open(const char *fname, int flags, mode_t mode) noexcept
Definition: file_handle.cc:79
Definition: executor.h:370
int n
Definition: xcom_base.cc:509
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4059