MySQL 9.4.0
Source Code Documentation
io_context.h
1/*
2 Copyright (c) 2020, 2025, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
27#define MYSQL_HARNESS_NET_TS_IO_CONTEXT_H_
28
29#include <atomic>
30#include <chrono>
31#include <iterator>
32#include <limits> // numeric_limits
33#include <list>
34#include <map>
35#include <memory> // unique_ptr
36#include <mutex>
37#include <system_error> // error_code
38#include <unordered_map>
39#include <utility>
40#include <vector>
41
42#include "my_config.h" // HAVE_EPOLL
52
53namespace net {
54
55#if defined(HAVE_EPOLL)
57#else
59#endif
60
61class io_context : public execution_context {
62 public:
63 class executor_type;
64
65 using count_type = size_t;
66 using native_handle_type = impl::socket::native_handle_type;
67
69 : io_context{std::make_unique<net::impl::socket::SocketService>(),
71
73 std::unique_ptr<net::impl::socket::SocketServiceBase> &&socket_service,
74 std::unique_ptr<IoServiceBase> &&io_service)
78
79 explicit io_context(int /* concurrency_hint */) : io_context() {}
80
81 ~io_context() {
82 active_ops_.release_all();
83 cancelled_ops_.clear();
84 // Make sure the services are destroyed before our internal fields. The
85 // services own the timers that can indirectly call our methods when
86 // destructed. See UT NetTS_io_context.pending_timer_on_destroy for an
87 // example.
88 destroy();
89 }
90
91 io_context(const io_context &) = delete;
92 io_context &operator=(const io_context &) = delete;
93
94 executor_type get_executor() noexcept;
95
96 count_type run();
97
98 template <class Rep, class Period>
99 count_type run_for(const std::chrono::duration<Rep, Period> &rel_time);
100
101 template <class Clock, class Duration>
102 count_type run_until(
103 const std::chrono::time_point<Clock, Duration> &abs_time);
104
105 count_type run_one();
106
107 template <class Rep, class Period>
108 count_type run_one_for(const std::chrono::duration<Rep, Period> &rel_time);
109
110 template <class Clock, class Duration>
111 count_type run_one_until(
112 const std::chrono::time_point<Clock, Duration> &abs_time);
113
116 void stop() {
117 {
118 std::lock_guard<std::mutex> lk(mtx_);
119 stopped_ = true;
120 }
121
122 io_service_->notify();
123 }
124
125 bool stopped() const noexcept {
126 std::lock_guard<std::mutex> lk(mtx_);
127 return stopped_;
128 }
129
130 void restart() {
131 std::lock_guard<std::mutex> lk(mtx_);
132 stopped_ = false;
133 }
134
135 impl::socket::SocketServiceBase *socket_service() const {
136 return socket_service_.get();
137 }
138
139 IoServiceBase *io_service() const { return io_service_.get(); }
140
141 /**
142 * get the status of the implicit open() call of the io-service.
143 *
144 * the io_service_.open() may fail due to out-of-file-descriptors.
145 *
146 * run() will fail silently if the io-service failed to open.
147 *
148 * @returns std::error_code on error
149 */
150 stdx::expected<void, std::error_code> open_res() const noexcept {
151 return io_service_open_res_;
152 }
153
154 private:
155 /**
156 * queued work from io_context::executor_type::dispatch()/post()/defer().
157 */
159 public:
160 // simple, generic storage of callable.
161 //
162 // std::function<void()> is similar, but doesn't work for move-only
163 // callables like lambdas that capture a move-only type
165 public:
166 virtual ~BasicCallable() = default;
167
168 virtual void invoke() = 0;
169 };
170
171 template <class Func>
172 class Callable : public BasicCallable {
173 public:
174 Callable(Func &&f) : f_{std::forward<Func>(f)} {}
175
176 void invoke() override { f_(); }
177
178 private:
179 Func f_;
180 };
181
182 using op_type = std::unique_ptr<BasicCallable>;
183
184 /**
185 * run a deferred work item.
186 *
187 * @returns number of work items run.
188 * @retval 0 work queue was empty, nothing was run.
189 */
190 size_t run_one() {
191 // tmp list to hold the current operation to run.
192 //
193 // makes it simple and fast to move the head element and shorten the time
194 // the lock is held.
195 decltype(work_) tmp;
196
197 // lock is only needed as long as we modify the work-queue.
198 {
199 std::lock_guard<std::mutex> lk(work_mtx_);
200
201 if (work_.empty()) return 0;
202
203 // move the head of the work queue out and release the lock.
204 //
205 // note: std::list.splice() moves pointers.
206 tmp.splice(tmp.begin(), work_, work_.begin());
207 }
208
209 // run the deferred work.
210 tmp.front()->invoke();
211
212 // and destruct the list at the end.
213
214 return 1;
215 }
216
217 /**
218 * queue work for later execution.
219 */
220 template <class Func, class ProtoAllocator>
221 void post(Func &&f, const ProtoAllocator & /* a */) {
222 std::lock_guard<std::mutex> lk(work_mtx_);
223
224 work_.emplace_back(
225 std::make_unique<Callable<Func>>(std::forward<Func>(f)));
226 }
227
228 /**
229 * check if work is queued for later execution.
230 *
231 * @retval true if some work is queued.
232 */
233 bool has_outstanding_work() const {
234 std::lock_guard<std::mutex> lk(work_mtx_);
235 return !work_.empty();
236 }
237
238 private:
239 mutable std::mutex work_mtx_;
240 std::list<op_type> work_;
241 };
242
243 DeferredWork deferred_work_;
244
245 /**
246 * defer work for later execution.
247 */
248 template <class Func, class ProtoAllocator>
249 void defer_work(Func &&f, const ProtoAllocator &a) {
250 deferred_work_.post(std::forward<Func>(f), a);
251
252 // wakeup the possibly blocked io-thread.
253 notify_io_service_if_not_running_in_this_thread();
254 }
255
256 template <class Clock, class Duration>
257 count_type do_one_until(
258 std::unique_lock<std::mutex> &lk,
259 const std::chrono::time_point<Clock, Duration> &abs_time);
260
261 count_type do_one(std::unique_lock<std::mutex> &lk,
262 std::chrono::milliseconds timeout);
263
264 template <typename _Clock, typename _WaitTraits>
265 friend class basic_waitable_timer;
266
268
269 template <class Protocol>
270 friend class basic_socket_impl;
271
272 template <class Protocol>
273 friend class basic_socket;
274
275 template <class Protocol>
277
278 template <class Protocol>
280
281 bool stopped_{false};
282 std::atomic<count_type> work_count_{};
283
284 // must be first member-var to ensure it is destroyed last
285 std::unique_ptr<impl::socket::SocketServiceBase> socket_service_;
286 std::unique_ptr<IoServiceBase> io_service_;
287 stdx::expected<void, std::error_code> io_service_open_res_;
288
289 // has outstanding work
290 //
291 // work is outstanding when
292 //
293 // - the work-count from on_work_started()/on_work_finished() is greater than 0, or
294 // - any active, cancelled or deferred operations are still pending
295 bool has_outstanding_work() const {
296 if (!cancelled_ops_.empty()) return true;
297 if (active_ops_.has_outstanding_work()) return true;
298 if (deferred_work_.has_outstanding_work()) return true;
299
300 if (work_count_ > 0) return true;
301
302 return false;
303 }
304
305 // monitors each .run() of the io-context and stops the io-context once no outstanding work is left.
306 class monitor {
307 public:
308 monitor(io_context &ctx) : ctx_{ctx} {}
309
310 monitor(const monitor &) = delete;
311 monitor(monitor &&) = delete;
312
313 ~monitor() {
314 std::lock_guard<std::mutex> lk(ctx_.mtx_);
315
316 // ctx_.call_stack_.pop_back();
317
318 if (!ctx_.has_outstanding_work()) {
319 // like stop(), just that we already have the mutex
320 ctx_.stopped_ = true;
321 ctx_.io_service_->notify();
322 }
323 }
324
325 private:
326 io_context &ctx_;
327 };
328
330
331 /**
332 * base class of async operation.
333 *
334 * - file-descriptor
335 * - wait-event
336 */
337 class async_op {
338 public:
339 using wait_type = impl::socket::wait_type;
340
341 async_op(native_handle_type fd, wait_type ev) : fd_{fd}, event_{ev} {}
342
343 virtual ~async_op() = default;
344
345 virtual void run(io_context &) = 0;
346
347 void cancel() { fd_ = impl::socket::kInvalidSocket; }
348 bool is_cancelled() const { return fd_ == impl::socket::kInvalidSocket; }
349
350 native_handle_type native_handle() const { return fd_; }
351 wait_type event() const { return event_; }
352
353 private:
354 native_handle_type fd_;
355 wait_type event_;
356 };
357
358 /**
359 * async operation with callback.
360 */
361 template <class Op>
362 class async_op_impl : public async_op {
363 public:
364 async_op_impl(Op &&op, native_handle_type fd, impl::socket::wait_type wt)
365 : async_op{fd, wt}, op_{std::forward<Op>(op)} {}
366
367 void run(io_context & /* io_ctx */) override {
368 if (is_cancelled()) {
369 op_(make_error_code(std::errc::operation_canceled));
370 } else {
371 op_(std::error_code{});
372 }
373 }
374
375 private:
376 Op op_;
377 };
378
379 class AsyncOps {
380 public:
381 using element_type = std::unique_ptr<async_op>;
382
383 bool has_outstanding_work() const {
384 std::lock_guard<std::mutex> lk(mtx_);
385
386 return !ops_.empty();
387 }
388
389 void push_back(element_type &&t) {
390 const auto handle = t->native_handle();
391
392 std::lock_guard<std::mutex> lk(mtx_);
393
394 auto it = ops_.find(handle);
395 if (it != ops_.end()) {
396 it->second.push_back(std::move(t));
397 } else {
398 std::vector<element_type> v;
399 v.push_back(std::move(t));
400 ops_.emplace(handle, std::move(v));
401 }
402 }
403
404 element_type extract_first(native_handle_type fd, short events) {
405 return extract_first(fd, [events](auto const &el) {
406 return static_cast<short>(el->event()) & events;
407 });
408 }
409
410 element_type extract_first(native_handle_type fd) {
411 return extract_first(fd, [](auto const &) { return true; });
412 }
413
414 void release_all() {
415 // We expect that this method is called before AsyncOps destructor, to
416 // make sure that ops_ map is empty when the destructor executes. If the
417 // ops_ is not empty when destructed, the destructor of its element can
418 // trigger a method that will try to access that map (that is destructed).
419 // Example: we have an AsyncOp that captures some Socket object with a
420 // smart pointer. When the destructor of this AsyncOp is called, it can
421 // also call the destructor of that Socket, which in turn will call
422 // socket.close(), causing the Socket to unregister its operations in
423 // its respective io_context object which is us (calls extract_first()).
424 std::list<element_type> ops_to_delete;
425 {
426 std::lock_guard<std::mutex> lk(mtx_);
427 for (auto &fd_ops : ops_) {
428 for (auto &fd_op : fd_ops.second) {
429 ops_to_delete.push_back(std::move(fd_op));
430 }
431 }
432 ops_.clear();
433 // It is important that we release the mtx_ here before the
434 // ops_to_delete go out of scope and are deleted. AsyncOp destructor can
435 // indirectly call extract_first which would lead to a deadlock.
436 }
437 }
438
439 private:
440 template <class Pred>
441 element_type extract_first(native_handle_type fd, Pred &&pred) {
442 std::lock_guard<std::mutex> lk(mtx_);
443
444 const auto it = ops_.find(fd);
445 if (it != ops_.end()) {
446 auto &async_ops = it->second;
447
448 const auto end = async_ops.end();
449 for (auto cur = async_ops.begin(); cur != end; ++cur) {
450 auto &el = *cur;
451
452 if (el->native_handle() == fd && pred(el)) {
453 auto op = std::move(el);
454
455 if (async_ops.size() == 1) {
456 // remove the current container and with it its only element
457 ops_.erase(it);
458 } else {
459 // remove the current entry
460 async_ops.erase(cur);
461 }
462
463 return op;
464 }
465 }
466 }
467
468 return {};
469 }
470
471 std::unordered_map<native_handle_type, std::vector<element_type>> ops_{
472 16 * 1024};
473
474 mutable std::mutex mtx_;
475 };
476
477 AsyncOps active_ops_;
478
479 // cancelled async operations.
480 std::list<std::unique_ptr<async_op>> cancelled_ops_;
481
482 template <class Op>
483 void async_wait(native_handle_type fd, impl::socket::wait_type wt, Op &&op) {
484 // add the socket-wait op to the queue
485 active_ops_.push_back(
486 std::make_unique<async_op_impl<Op>>(std::forward<Op>(op), fd, wt));
487
488 {
489 auto res = io_service_->add_fd_interest(fd, wt);
490 if (!res) {
491#if 0
492 // fd may be -1 or so
493 std::cerr << "!! add_fd_interest(" << fd << ", ..."
494 << ") " << res.error() << " " << res.error().message()
495 << std::endl;
496#endif
497 // adding failed. Cancel it again.
498 //
499 // code should be similar to ::cancel(fd)
500 std::lock_guard<std::mutex> lk(mtx_);
501
502 if (auto as_op =
503 active_ops_.extract_first(fd, static_cast<short>(wt))) {
504 as_op->cancel();
505 cancelled_ops_.push_back(std::move(as_op));
506 }
507 }
508 }
509
510 notify_io_service_if_not_running_in_this_thread();
511 }
512
513 class timer_queue_base : public execution_context::service {
514 protected:
515 timer_queue_base(execution_context &ctx) : execution_context::service{ctx} {}
516
517 mutable std::mutex queue_mtx_;
518
519 public:
520 virtual bool run_one() = 0;
521 virtual std::chrono::milliseconds next() const = 0;
522 };
523
524 template <class Timer>
525 class timer_queue : public timer_queue_base {
526 public:
528
529 timer_queue(execution_context &ctx) : timer_queue_base{ctx} {
530 // add timer_queue to io_context
531
532 auto &io_ctx = static_cast<io_context &>(ctx);
533
534 // @note: don't move this lock+push into the timer_queue_base constructor
535 //
536 // @see
537 // https://github.com/google/sanitizers/wiki/ThreadSanitizerPopularDataRaces#data-race-on-vptr-during-construction
538 std::lock_guard<std::mutex> lk(io_ctx.mtx_);
539 io_ctx.timer_queues_.push_back(this);
540 }
541
542 ~timer_queue() override {
543 // remove every pending-timer from the pending-timers before destroying to
544 // avoid the owned Timer destructing the pending-timer a 2nd time.
545 //
546 // A pending_timer_.erase() does NOT work here as it first destructs the
547 // node and then updates the iterators.
548 for (auto cur = pending_timers_.begin(); cur != pending_timers_.end();
549 cur = pending_timers_.begin()) {
550 // extract and then destroy it.
551 pending_timers_.extract(cur);
552 }
553 }
554
555 void shutdown() noexcept override {}
556
557 io_context &context() noexcept {
558 return static_cast<io_context &>(service::context());
559 }
560
561 template <class Op>
562 void push(const Timer &timer, Op &&op) {
563 context().get_executor().on_work_started();
564
565 std::lock_guard<std::mutex> lk(queue_mtx_);
566
567#if 0
568 pending_timers_.insert(
569 std::upper_bound(
570 pending_timers_.begin(), pending_timers_.end(), timer.expiry(),
571 [](const auto &a, const auto &b) { return a < b->expiry(); }),
572 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op)));
573#else
574 if (timer.id() == nullptr) abort();
575
576 // add timer
577 pending_timers_.emplace(std::make_pair(
578 timer.id(),
579 std::make_unique<pending_timer_op<Op>>(timer, std::forward<Op>(op))));
580
581 if (timer.id() == nullptr) abort();
582 if (timer.expiry() == Timer::time_point::min()) abort();
583
584 // sorted timer ids by expiry
585 pending_timer_expiries_.emplace(
586 std::make_pair(timer.expiry(), timer.id()));
587#endif
588 }
589
590 std::chrono::milliseconds next() const override {
591 typename Timer::time_point expiry;
592 {
593 std::lock_guard<std::mutex> lk(queue_mtx_);
594
595 // no pending timers, return the max-timeout
596 if (cancelled_timers_.empty()) {
597 if (pending_timer_expiries_.empty())
598 return std::chrono::milliseconds::max();
599
600#if 0
601 expiry = pending_timers_.front()->expiry();
602#else
603 expiry = pending_timer_expiries_.begin()->first;
604#endif
605 } else {
606 // cancelled timers should be executed directly
607 return std::chrono::milliseconds::min();
608 }
609
610 // the lock isn't needed anymore.
611 }
612
613 auto duration = Timer::traits_type::to_wait_duration(expiry);
614 if (duration < duration.zero()) {
615 duration = duration.zero();
616 }
617
618 // round up the next wait-duration to wait /at least/ the expected time.
619 //
620 // In case the expiry is 990us, wait 1ms
621 // If it is 0ns, leave it at 0ms;
622
623 auto duration_ms =
624 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
625
626 using namespace std::chrono_literals;
627
628 // round up to the next millisecond.
629 if ((duration - duration_ms).count() != 0) {
630 duration_ms += 1ms;
631 }
632
633 return duration_ms;
634 }
635
636 bool run_one() override {
637 std::unique_ptr<pending_timer> pt;
638
639 {
640 std::lock_guard<std::mutex> lk(queue_mtx_);
641
642 // if the pending-timers queue is empty, leave
643 // if the top is cancelled or expired, run it
644 if (cancelled_timers_.empty()) {
645 if (pending_timers_.empty()) return false;
646
647#if 0
648 // list
649 if (pending_timers_.front()->expiry() > Timer::clock_type::now()) {
650 return false;
651 }
652 pt = std::move(pending_timers_.front());
653 pending_timers_.pop_front();
654#else
655 if (pending_timers_.size() != pending_timer_expiries_.size()) abort();
656
657 auto min = Timer::time_point::min();
658 for (const auto &cur : pending_timer_expiries_) {
659 if (cur.first < min) abort();
660
661 min = cur.first;
662 }
663
664 const auto now = Timer::clock_type::now();
665
666 // multimap
667 auto pending_expiry_it = pending_timer_expiries_.begin();
668 auto timepoint = pending_expiry_it->first;
669
670 if (timepoint > now) {
671 // not expired yet. leave
672 return false;
673 }
674 typename Timer::Id *timer_id = pending_expiry_it->second;
675
676 auto pending_it = pending_timers_.find(timer_id);
677 if (pending_it == pending_timers_.end()) {
678 abort();
679 }
680 if (pending_it->second->id() != timer_id) {
681 abort();
682 }
683 if (pending_it->second->expiry() != pending_expiry_it->first) {
684 abort();
685 }
686
687 pt = std::move(pending_it->second);
688 pending_timer_expiries_.erase(pending_expiry_it);
689 pending_timers_.erase(pending_it);
690#endif
691 } else {
692 pt = std::move(cancelled_timers_.front());
693 cancelled_timers_.pop_front();
694 }
695 }
696
697 pt->run();
698
699 context().get_executor().on_work_finished();
700
701 return true;
702 }
703
704 size_t cancel(const Timer &t) {
705 size_t count{};
706
707 {
708 std::lock_guard<std::mutex> lk(queue_mtx_);
709
710#if 0
711 const auto end = pending_timers_.end();
712
713 // the same timer may be pushed multiple times to the queue
714 // therefore, check all entries
715 for (auto cur = pending_timers_.begin(); cur != end;) {
716 auto &cur_timer = cur->second;
717 if (cur_timer->id() == t.id()) {
718 cur_timer->cancel();
719 ++count;
720
721 auto nxt = std::next(cur);
722 // move the timer over to the cancelled timers
724 cur);
725 cur = nxt;
726 } else {
727 ++cur;
728 }
729 }
730#else
731 auto eq_range = pending_timers_.equal_range(t.id());
732
733 for (auto cur = eq_range.first; cur != eq_range.second;) {
734 auto expiry_eq_range =
735 pending_timer_expiries_.equal_range(cur->second->expiry());
736
737 size_t erase_count{};
738
739 for (auto expiry_cur = expiry_eq_range.first;
740 expiry_cur != expiry_eq_range.second;) {
741 if (expiry_cur->first == cur->second->expiry() &&
742 expiry_cur->second == cur->second->id() && erase_count == 0) {
743 expiry_cur = pending_timer_expiries_.erase(expiry_cur);
744 ++erase_count;
745 } else {
746 ++expiry_cur;
747 }
748 }
749
750 // nothing found ... boom
751 if (erase_count == 0) abort();
752
753 cur->second->cancel();
754
755 // move timer to cancelled timers
756 cancelled_timers_.emplace_back(std::move(cur->second));
757
758 ++count;
759
760 cur = pending_timers_.erase(cur);
761 }
762#endif
763 }
764
765 return count;
766 }
767
768 class pending_timer {
769 public:
770 using time_point = typename Timer::time_point;
771 using timer_id = typename Timer::Id *;
772
773 pending_timer(const Timer &timer)
774 : expiry_{timer.expiry()}, id_{timer.id()} {}
775
776 virtual ~pending_timer() = default;
777
778 bool is_cancelled() const { return id_ == nullptr; }
779 void cancel() {
780 id_ = nullptr;
781
782 // ensure that it bubbles up to the top
783 expiry_ = expiry_.min();
784 }
785
786 time_point expiry() const noexcept { return expiry_; }
787 timer_id id() const { return id_; }
788
789 virtual void run() = 0;
790
791 private:
792 time_point expiry_;
793 timer_id id_;
794 };
795
796 template <class Op>
797 class pending_timer_op : public pending_timer {
798 public:
799 pending_timer_op(const Timer &timer, Op &&op)
800 : pending_timer(timer), op_{std::move(op)} {}
801
802 void run() override {
803 if (this->is_cancelled()) {
804 op_(make_error_code(std::errc::operation_canceled));
805 } else {
806 op_(std::error_code{});
807 }
808 }
809
810 private:
811 Op op_;
812 };
813
814 // cancelled timers, earliest cancelled timer first
815 std::list<std::unique_ptr<pending_timer>> cancelled_timers_;
816
817 // active timers, smallest time-point first
818 std::multimap<typename Timer::time_point, typename Timer::Id *>
819 pending_timer_expiries_;
820 std::multimap<typename Timer::Id *, std::unique_ptr<pending_timer>>
821 pending_timers_;
822 };
823
824 /**
825 * async wait for a timer expire.
826 *
827 * adds the op and timer to the timer_queue
828 *
829 * @param timer timer
830 * @param op completion handler to call when timer is triggered
831 */
832 template <class Timer, class Op>
833 void async_wait(const Timer &timer, Op &&op) {
834 auto &queue = use_service<timer_queue<Timer>>(*this);
835
836 queue.push(timer, std::forward<Op>(op));
837
838 // wakeup the blocked poll_one() to handle possible timer events.
839 notify_io_service_if_not_running_in_this_thread();
840 }
841
842 /**
843 * cancel all async-ops of a timer.
844 */
845 template <class Timer>
846 size_t cancel(const Timer &timer) {
847 if (!has_service<timer_queue<Timer>>(*this)) {
848 return 0;
849 }
850
851 const auto count = use_service<timer_queue<Timer>>(*this).cancel(timer);
852 if (count) {
853 // if a timer was canceled, interrupt the io-service
854 notify_io_service_if_not_running_in_this_thread();
855 }
856 return count;
857 }
858
859 // cancel oldest
860 template <class Timer>
861 size_t cancel_one(const Timer & /* timer */) {
862 // TODO: implement if async_wait is implemented
863 return 0;
864 }
865
866 /** pointers to the timer-queues of this io-context.
867 *
868 * timer-queues are one per timer-type (std::chrono::steady_clock,
869 * std::chrono::system_clock, ...)
870 *
871 * timer_queue_base is the base class of the timer-queues
872 *
873 * the timer-queues themselves are owned by the io_context's executor via
874 * execution_context::add_service()
875 *
876 * protected via 'mtx_'
877 */
878 std::vector<timer_queue_base *> timer_queues_;
879
880 /**
881 * mutex that protects the core parts of the io-context.
882 *
883 * - timer_queues_
884 */
885 mutable std::mutex mtx_{};
886
887 mutable std::mutex do_one_mtx_{};
888 mutable std::condition_variable do_one_cond_{};
889 bool is_running_{false};
890
891 void wait_no_runner_(std::unique_lock<std::mutex> &lk) {
892 lk.lock();
893 wait_no_runner_unlocked_(lk);
894 }
895
896 void wait_no_runner_unlocked_(std::unique_lock<std::mutex> &lk) {
897 do_one_cond_.wait(lk, [this]() { return is_running_ == false; });
898
899 is_running(true);
900 }
901
902 void wake_one_runner_(std::unique_lock<std::mutex> &lk) {
903 is_running(false);
904 lk.unlock();
905 do_one_cond_.notify_one();
906 }
907
908 void is_running(bool v) { is_running_ = v; }
909 bool is_running() const { return is_running_; }
910
912};
913} // namespace net
914
915namespace net {
916inline io_context::count_type io_context::run() {
917 count_type n = 0;
918
919 std::unique_lock<std::mutex> lk(do_one_mtx_);
920
921 using namespace std::chrono_literals;
922
923 // in the first round we already have the lock; in all other rounds we
924 // need to take the lock first
925 for (wait_no_runner_unlocked_(lk); do_one(lk, -1ms) != 0;
926 wait_no_runner_(lk)) {
927 ++n;
928 }
929 return n;
930}
931
932inline io_context::count_type io_context::run_one() {
933 using namespace std::chrono_literals;
934
935 std::unique_lock<std::mutex> lk(do_one_mtx_);
936
937 wait_no_runner_unlocked_(lk);
938
939 return do_one(lk, -1ms);
940}
941
942template <class Rep, class Period>
943io_context::count_type io_context::run_for(
944 const std::chrono::duration<Rep, Period> &rel_time) {
945 return run_until(std::chrono::steady_clock::now() + rel_time);
946}
947
948template <class Clock, class Duration>
949io_context::count_type io_context::run_until(
950 const std::chrono::time_point<Clock, Duration> &abs_time) {
951 count_type n = 0;
952
953 std::unique_lock<std::mutex> lk(do_one_mtx_);
954
955 using namespace std::chrono_literals;
956
957 // in the first round we already have the lock; in all other rounds we
958 // need to take the lock first
959 for (wait_no_runner_unlocked_(lk); do_one_until(lk, abs_time) != 0;
960 wait_no_runner_(lk)) {
961 ++n;
962 }
963 return n;
964}
965
966template <class Rep, class Period>
967io_context::count_type io_context::run_one_for(
968 const std::chrono::duration<Rep, Period> &rel_time) {
969 return run_one_until(std::chrono::steady_clock::now() + rel_time);
970}
971
972template <class Clock, class Duration>
973io_context::count_type io_context::run_one_until(
974 const std::chrono::time_point<Clock, Duration> &abs_time) {
975 std::unique_lock<std::mutex> lk(do_one_mtx_);
976
977 wait_no_runner_unlocked_(lk);
978
979 return do_one_until(lk, abs_time);
980}
981
982inline io_context::count_type io_context::poll() {
983 count_type n = 0;
984 std::unique_lock<std::mutex> lk(do_one_mtx_);
985
986 using namespace std::chrono_literals;
987
988 for (wait_no_runner_unlocked_(lk); do_one(lk, 0ms) != 0;
989 wait_no_runner_(lk)) {
990 ++n;
991 }
992 return n;
993}
994
995inline io_context::count_type io_context::poll_one() {
996 std::unique_lock<std::mutex> lk(do_one_mtx_);
997
998 using namespace std::chrono_literals;
999
1000 wait_no_runner_unlocked_(lk);
1001 return do_one(lk, 0ms);
1002}
1003
1004class io_context::executor_type {
1005 public:
1006 executor_type(const executor_type &rhs) noexcept = default;
1007 executor_type(executor_type &&rhs) noexcept = default;
1008 executor_type &operator=(const executor_type &rhs) noexcept = default;
1009 executor_type &operator=(executor_type &&rhs) noexcept = default;
1010
1011 ~executor_type() = default;
1012
1013 bool running_in_this_thread() const noexcept {
1014 return impl::Callstack<io_context>::contains(io_ctx_) != nullptr;
1015 }
1016 io_context &context() const noexcept { return *io_ctx_; }
1017
1018 void on_work_started() const noexcept { ++io_ctx_->work_count_; }
1019 void on_work_finished() const noexcept { --io_ctx_->work_count_; }
1020
1021 /**
1022 * execute function.
1023 *
1024 * Effect:
1025 *
1026 * The executor
1027 *
1028 * - MAY block forward progress of the caller until f() finishes.
1029 */
1030 template <class Func, class ProtoAllocator>
1031 void dispatch(Func &&f, const ProtoAllocator &a) const {
1032 if (running_in_this_thread()) {
1033 // run it in this thread.
1034 std::decay_t<Func>(std::forward<Func>(f))();
1035 } else {
1036 // queue function call for later execution.
1037 post(std::forward<Func>(f), a);
1038 }
1039 }
1040
1041 /**
1042 * queue function for execution.
1043 *
1044 * Effects:
1045 *
1046 * The executor
1047 *
1048 * - SHALL NOT block forward progress of the caller pending completion of f().
1049 * - MAY begin f() progress before the call to post completes.
1050 */
1051 template <class Func, class ProtoAllocator>
1052 void post(Func &&f, const ProtoAllocator &a) const {
1053 io_ctx_->defer_work(std::forward<Func>(f), a);
1054 }
1055
1056 /**
1057 * defer function call for later execution.
1058 *
1059 * Effect:
1060 *
1061 * The executor:
1062 *
1063 * - SHALL NOT block forward progress of the caller pending completion of f().
1064 * - SHOULD NOT begin f()'s progress before the call to defer()
1065 * completes.
1066 */
1067 template <class Func, class ProtoAllocator>
1068 void defer(Func &&f, const ProtoAllocator &a) const {
1069 post(std::forward<Func>(f), a);
1070 }
1071
1072 private:
1073 friend io_context;
1074
1075 explicit executor_type(io_context &ctx) : io_ctx_{std::addressof(ctx)} {}
1076
1077 io_context *io_ctx_;
1078};
1079
1080inline bool operator==(const io_context::executor_type &a,
1081 const io_context::executor_type &b) noexcept {
1082 return std::addressof(a.context()) == std::addressof(b.context());
1083}
1084inline bool operator!=(const io_context::executor_type &a,
1085 const io_context::executor_type &b) noexcept {
1086 return !(a == b);
1087}
1088
1089// io_context::executor_type is an executor even though it doesn't have a
1090// default constructor
1091template <>
1092struct is_executor<io_context::executor_type> : std::true_type {};
1093
1094inline io_context::executor_type io_context::get_executor() noexcept {
1095 return executor_type(*this);
1096}
1097
1098/**
1099 * cancel all async-ops of a file-descriptor.
1100 */
1101inline stdx::expected<void, std::error_code> io_context::cancel(
1102 native_handle_type fd) {
1103 bool need_notify{false};
1104 {
1105 // check all async-ops
1106 std::lock_guard<std::mutex> lk(mtx_);
1107
1108 while (auto op = active_ops_.extract_first(fd)) {
1109 op->cancel();
1110
1111 cancelled_ops_.push_back(std::move(op));
1112
1113 need_notify = true;
1114 }
1115 }
1116
1117 // wakeup the loop to deliver the cancelled fds
1118 if (true || need_notify) {
1119 io_service_->remove_fd(fd);
1120
1121 notify_io_service_if_not_running_in_this_thread();
1122 }
1123
1124 return {};
1125}
1126
1127template <class Clock, class Duration>
1128io_context::count_type io_context::do_one_until(
1129 std::unique_lock<std::mutex> &lk,
1130 const std::chrono::time_point<Clock, Duration> &abs_time) {
1131 using namespace std::chrono_literals;
1132
1133 const auto rel_time = abs_time - std::chrono::steady_clock::now();
1134 auto rel_time_ms =
1135 std::chrono::duration_cast<std::chrono::milliseconds>(rel_time);
1136
1137 if (rel_time_ms < 0ms) {
1138 // expired already.
1139 rel_time_ms = 0ms;
1140 } else if (rel_time_ms < rel_time) {
1141 // std::chrono::ceil()
1142 rel_time_ms += 1ms;
1143 }
1144
1145 return do_one(lk, rel_time_ms);
1146}
1147
1148inline void io_context::notify_io_service_if_not_running_in_this_thread() {
1149 if (impl::Callstack<io_context>::contains(this) == nullptr) {
1150 io_service_->notify();
1151 }
1152}
1153
1154// precond: lk MUST be locked
1155inline io_context::count_type io_context::do_one(
1156 std::unique_lock<std::mutex> &lk, std::chrono::milliseconds timeout) {
1158
1159 timer_queue_base *timer_q{nullptr};
1160
1161 monitor mon(*this);
1162
1163 if (!has_outstanding_work()) {
1164 wake_one_runner_(lk);
1165 return 0;
1166 }
1167
1168 while (true) {
1169 // 1. deferred work.
1170 // 2. timer
1171 // 3. triggered events.
1172
1173 // timer (2nd round)
1174 if (timer_q) {
1175 if (timer_q->run_one()) {
1176 wake_one_runner_(lk);
1177 return 1;
1178 } else {
1179 timer_q = nullptr;
1180 }
1181 }
1182
1183 // deferred work
1184 if (deferred_work_.run_one()) {
1185 wake_one_runner_(lk);
1186 return 1;
1187 }
1188
1189 // timer
1190 std::chrono::milliseconds min_duration{0};
1191 {
1192 std::lock_guard<std::mutex> lock(mtx_);
1193 // check the smallest timestamp of all timer-queues
1194 for (auto q : timer_queues_) {
1195 const auto duration = q->next();
1196
1197 if (duration == duration.zero()) {
1198 timer_q = q;
1199 min_duration = duration;
1200 break;
1201 } else if ((duration != duration.max()) &&
1202 (timeout != timeout.zero()) &&
1203 (duration < min_duration || timer_q == nullptr)) {
1204 timer_q = q;
1205 min_duration = duration;
1206 }
1207 }
1208 }
1209
1210 // if we have a timer that has fired or was cancelled, run it right away
1211 if (timer_q && min_duration <= min_duration.zero()) continue;
1212
1213 if (auto op = [this]() -> std::unique_ptr<async_op> {
1214 // handle all the cancelled ops without polling first
1215 std::lock_guard<std::mutex> lock(mtx_);
1216
1217 // ops have all cancelled operations at the front
1218 if (!cancelled_ops_.empty() &&
1219 cancelled_ops_.front()->is_cancelled()) {
1220 auto cancelled_op = std::move(cancelled_ops_.front());
1221
1222 cancelled_ops_.pop_front();
1223
1224 return cancelled_op;
1225 }
1226
1227 return {};
1228 }()) {
1229 // before we unlock the concurrent io-context-thread-lock, increment the
1230 // work-count to ensure the next waiting thread doesn't exit early in case:
1231 //
1232 // - no io-events registered
1233 // - no timers registered
1234 ++work_count_;
1235 wake_one_runner_(lk);
1236 op->run(*this);
1237 --work_count_;
1238
1239 return 1;
1240 }
1241
1242 if (stopped() || !io_service_open_res_) {
1243 break;
1244 }
1245
1246 // adjust min-duration according to caller's timeout
1247 //
1248 // - if there is no timer queued, use the caller's timeout
1249 // - if there is a timer queued, reduce min_duration to the caller's timeout if
1250 // it is lower and non-negative.
1251 //
1252 // note: negative timeout == infinite.
1253 if (timer_q == nullptr ||
1254 (timeout > timeout.zero() && timeout < min_duration)) {
1255 min_duration = timeout;
1256 }
1257
1258 auto res = io_service_->poll_one(min_duration);
1259 if (!res) {
1260 if (res.error() == std::errc::interrupted) {
1261 // poll again as it got interrupted
1262 continue;
1263 }
1264 if (res.error() == std::errc::timed_out && min_duration != timeout &&
1265 timer_q != nullptr) {
1266 // poll_one() timed out, we have a timer-queue and the timer's expiry is
1267 // less than the global timeout or there is no timeout.
1268 continue;
1269 }
1270
1271 wake_one_runner_(lk);
1272
1273#if 0
1274 // if the poll returns another error, it is ok that we don't check it
1275 // further as we exit cleanly. Still, it would be nice to be aware of it
1276 // in debug builds.
1277 assert(res.error() == io_service_errc::no_fds ||
1278 res.error() == std::errc::timed_out);
1279#endif
1280
1281 // either poll() timed out or there were no file-descriptors that fired
1282 return 0;
1283 }
1284
1285 // std::cerr << __LINE__ << ": " << res.value().fd << " - "
1286 // << res.value().event << std::endl;
1287
1288 if (auto op = [this](native_handle_type fd,
1289 short events) -> std::unique_ptr<async_op> {
1290 std::lock_guard<std::mutex> lock(mtx_);
1291
1292 return active_ops_.extract_first(fd, events);
1293 }(res->fd, res->event)) {
1294 ++work_count_;
1295 wake_one_runner_(lk);
1296 op->run(*this);
1297 --work_count_;
1298 return 1;
1299 }
1300 // we may not find an async-op for this event if it already has been
1301 // cancelled. Loop around and let the "is-cancelled" check handle it.
1302 }
1303
1304 wake_one_runner_(lk);
1305 return 0;
1306}
1307
1308} // namespace net
1309
1310#endif
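A minimal usage sketch (not part of io_context.h): it relies only on the interfaces shown above -- io_context::run()/restart() and executor_type::post()/dispatch() with an explicit proto-allocator. The include path is an assumption. dispatch() called from inside a handler may run the callable inline, because the calling thread is then the one running this io_context (see running_in_this_thread()).

#include <iostream>
#include <memory>  // std::allocator

#include "mysql/harness/net_ts/io_context.h"  // assumed include path

int main() {
  net::io_context io_ctx;

  auto ex = io_ctx.get_executor();

  // post() queues the callable for the io-thread; it never runs it inline.
  ex.post(
      [ex]() {
        std::cout << "queued work runs from io_context::run()\n";

        // dispatch() from inside a handler: the caller is already running
        // this io_context, so the callable is expected to run inline.
        ex.dispatch([]() { std::cout << "dispatched work\n"; },
                    std::allocator<void>{});
      },
      std::allocator<void>{});

  // run() executes the queued work and returns once no outstanding work is
  // left (or stop() was called). Afterwards the context is stopped and needs
  // restart() before it can be run again.
  io_ctx.run();
  io_ctx.restart();

  return 0;
}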
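The timer path (async_wait()/cancel() above) is normally driven through a timer object rather than by calling io_context::async_wait() directly. The sketch below assumes the companion timer.h provides net::basic_waitable_timer<Clock> with expires_after() and async_wait(), in the spirit of the C++ Networking TS; everything outside this header (type name, member functions, include paths) is an assumption, not taken from this page.

#include <chrono>
#include <iostream>
#include <system_error>

#include "mysql/harness/net_ts/io_context.h"  // assumed include path
#include "mysql/harness/net_ts/timer.h"       // assumed include path

int main() {
  net::io_context io_ctx;

  // assumed timer type; it registers its async-ops with io_ctx's timer_queue.
  net::basic_waitable_timer<std::chrono::steady_clock> timer(io_ctx);
  timer.expires_after(std::chrono::milliseconds(100));

  timer.async_wait([](std::error_code ec) {
    if (ec == std::errc::operation_canceled) {
      // completion handlers of cancelled timers are invoked with
      // operation_canceled, as in pending_timer_op::run() above.
      std::cout << "timer cancelled\n";
    } else {
      std::cout << "timer fired\n";
    }
  });

  io_ctx.run();  // blocks until the timer fired or was cancelled

  return 0;
}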