26#ifndef MYSQL_HARNESS_NET_TS_EXECUTOR_H_
27#define MYSQL_HARNESS_NET_TS_EXECUTOR_H_
30#include <condition_variable>
40#include <unordered_map>
51template <
class CompletionToken,
class Signature>
54template <
class CompletionToken,
class Signature>
68template <
class CompletionToken,
class Signature>
69class async_completion;
71template <
class CompletionToken,
class Signature>
80 std::is_same<CompletionToken, completion_handler_type>::value,
99template <class T, class ProtoAllocator =
std::allocator<
void>>
102template <class T, class ProtoAllocator =
std::allocator<
void>>
108template <class T, class ProtoAllocator, typename =
std::void_t<>>
112 static type __get(
const T & ,
const ProtoAllocator &a)
noexcept {
117template <
class T,
class ProtoAllocator>
119 std::void_t<typename T::allocator_type>> {
120 using type =
typename T::allocator_type;
122 static type __get(
const T &t,
const ProtoAllocator & )
noexcept {
123 return t.get_allocator();
127template <
class T,
class ProtoAllocator>
129 static auto get(
const T &t,
130 const ProtoAllocator &a = ProtoAllocator()) noexcept {
132 return Impl::__get(t, a);
142template <
class T,
class ProtoAllocator>
144 const T &t,
const ProtoAllocator &a)
noexcept {
151 using std::logic_error::logic_error;
185 template <
class Service>
187 delete static_cast<Service *
>(svc);
191 template <
class Service>
207 return std::type_index(
typeid(
Key));
215 std::unordered_map<service_key_type, service *>
keys_;
217 template <
typename Service,
class... Args>
220 ServicePtr{
new Service{*
this, std::forward<Args>(args)...}});
225 template <
class Service>
228 template <
class Service>
231 template <
class Service,
class... Args>
236template <
class Service>
240 static_assert(std::is_base_of<execution_context::service, Key>::value,
241 "Key must derive from execution_context::service");
242 static_assert(std::is_base_of<Key, Service>::value,
243 "Service must derive from Key");
245 auto key = execution_context::service_key<Key>();
250 if (svc ==
nullptr) {
255 return static_cast<Key &
>(*svc);
258template <
class Service,
class... Args>
262 static_assert(std::is_base_of<execution_context::service, Key>::value,
263 "Key must derive from execution_context::service");
264 static_assert(std::is_base_of<Key, Service>::value,
265 "Service must derive from Key");
267 auto key = execution_context::service_key<Key>();
271 if (svc ==
nullptr) {
273 svc = ctx.
add_service<Service>(std::forward(args)...);
276 "can't make_service(), Service already exists");
279 return static_cast<Service &
>(*svc);
282template <
class Service>
286 std::lock_guard<std::mutex> lk(ctx.services_mtx_);
287 return ctx.keys_.count(execution_context::service_key<Key>()) > 0;
311template <
class T,
class = std::
void_t<>>
329template <
class T,
typename U = std::remove_const_t<T>>
331 void (*f)() =
nullptr,
332 const std::allocator<int> &a = {})
335 std::is_copy_constructible<T>,
337 std::is_same<
decltype(*__const_x == *__const_x),
bool>,
338 std::is_same<
decltype(*__const_x != *__const_x),
bool>,
339 std::is_void<
decltype(__x->on_work_started())>,
340 std::is_void<
decltype(__x->on_work_finished())>,
341 std::is_void<
decltype(__x->dispatch(std::move(f), a))>,
342 std::is_void<
decltype(__x->post(std::move(f), a))>,
343 std::is_void<
decltype(__x->defer(std::move(f), a))>>::value,
346 std::void_t<
decltype(__x->context()),
void()>>;
367template <
class T,
class Executor,
typename = std::
void_t<>>
370template <
class T,
class Executor>
372 : std::is_convertible<Executor, typename T::executor_type> {};
376template <
class T,
class Executor>
379template <
class T,
class Executor>
383template <
class T,
class Executor,
typename = std::
void_t<>>
387 static type __get(
const T & ,
const Executor &ex)
noexcept {
392template <
class T,
class Executor>
394 std::void_t<typename T::executor_type>> {
395 using type =
typename T::executor_type;
397 static type __get(
const T &t,
const Executor & )
noexcept {
398 return t.get_executor();
402template <
class T,
class Executor = system_executor>
403struct associated_executor;
405template <
class T,
class Executor = system_executor>
408template <
class T,
class Executor>
410 static auto get(
const T &t,
const Executor &ex = Executor()) noexcept {
412 return Impl::__get(t, ex);
420template <
class T,
class Executor>
422 const T &t,
const Executor &ex)
noexcept;
424template <
class T,
class ExecutorContext>
425associated_executor_t<T, typename ExecutorContext::executor_type>
433template <
class T,
class Executor>
435 const T &t,
const Executor &ex)
noexcept {
439template <
class T,
class ExecutorContext>
440associated_executor_t<T, typename ExecutorContext::executor_type>
451template <
class Executor>
458 ex_.on_work_started();
461 :
ex_{other.ex_},
owns_{other.owns_} {
463 ex_.on_work_started();
467 :
ex_{std::move(other.ex_)},
owns_{std::exchange(other.owns_,
false)} {}
473 ex_.on_work_finished();
483 ex_.on_work_finished();
505template <
class Executor>
511template <
class ExecutionContext>
513 std::is_convertible<ExecutionContext &, execution_context &>::value,
514 executor_work_guard<typename ExecutionContext::executor_type>>
520std::enable_if_t<!is_executor<T>::value &&
521 !std::is_convertible<T &, execution_context &>::value,
522 executor_work_guard<associated_executor_t<T>>>
527template <
class T,
class U>
530 std::forward<U>(u)))) {
546 template <
class Func,
class ProtoAllocator>
547 void dispatch(Func &&f,
const ProtoAllocator &a)
const;
548 template <
class Func,
class ProtoAllocator>
549 void post(Func &&f,
const ProtoAllocator &a)
const;
550 template <
class Func,
class ProtoAllocator>
551 void defer(Func &&f,
const ProtoAllocator &a)
const;
582 std::lock_guard<std::mutex> lk(
mtx_);
588 std::lock_guard<std::mutex> lk(
mtx_);
603 std::function<void()> f;
605 std::unique_lock<std::mutex> lk(
mtx_);
611 f = std::move(
tasks_.front());
618 void post_(std::function<
void()> f) {
619 std::lock_guard<std::mutex> lk(
mtx_);
626 tasks_.push(std::move(f));
638 std::condition_variable
cv_;
649template <
class Func,
class ProtoAllocator>
654template <
class Func,
class ProtoAllocator>
656 std::decay_t<Func>{std::forward<Func>(f)}();
658template <
class Func,
class ProtoAllocator>
660 post(std::forward<Func>(f), a);
672template <
class CompletionHandler>
692template <
class CompletionHandler>
700template <
class CompletionToken>
714template <
class Executor,
class CompletionToken>
716 is_executor<Executor>::value,
717 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
718dispatch(
const Executor &ex, CompletionToken &&token) {
731template <
class ExecutionContext,
class CompletionToken>
733 std::is_convertible<ExecutionContext &, execution_context &>::value,
734 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
737 std::forward<CompletionToken>(token));
745template <
class CompletionToken>
746auto post(CompletionToken &&token) {
759template <
class Executor,
class CompletionToken>
761 is_executor<Executor>::value,
762 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
763post(
const Executor &ex, CompletionToken &&token) {
776template <
class ExecutionContext,
class CompletionToken>
778 std::is_convertible<ExecutionContext &, execution_context &>::value,
779 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
781 return net::post(ctx.get_executor(), std::forward<CompletionToken>(token));
786template <
class CompletionToken>
787auto defer(CompletionToken &&token) {
800template <
class Executor,
class CompletionToken>
802 is_executor<Executor>::value,
803 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
804defer(
const Executor &ex, CompletionToken &&token) {
817template <
class ExecutionContext,
class CompletionToken>
819 std::is_convertible<ExecutionContext &, execution_context &>::value,
820 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
822 return net::defer(ctx.get_executor(), std::forward<CompletionToken>(token));
827template <
class Executor>
836 template <
class ProtoAllocator>
837 strand(std::allocator_arg_t,
const ProtoAllocator & , Executor ex)
843 template <
class OtherExecutor>
846 template <
class OtherExecutor>
848 :
inner_ex_{std::move(other.inner_ex_)} {}
862 template <
class OtherExecutor>
869 template <
class OtherExecutor>
891 template <
class Func,
class ProtoAllocator>
892 void dispatch(Func &&f,
const ProtoAllocator & )
const {
894 std::forward<Func>(f)();
897 template <
class Func,
class ProtoAllocator>
898 void post(Func &&f,
const ProtoAllocator &a)
const;
899 template <
class Func,
class ProtoAllocator>
900 void defer(Func &&f,
const ProtoAllocator &a)
const;
909template <
class Executor>
912template <
class Executor>
926 [e](
auto &svc) { svc.ptr_->notify_fork(e); });
929 [e](
auto &svc) { svc.ptr_->notify_fork(e); });
938 svc.ptr_->shutdown();
execution context for SQL.
Definition: sql_exec_context.h:39
The handler class is the interface for dynamically loadable storage engines.
Definition: handler.h:4573
Definition: executor.h:72
std::conditional_t< std::is_same< CompletionToken, completion_handler_type >::value, completion_handler_type &, completion_handler_type > handler_type
Definition: executor.h:81
async_completion & operator=(const async_completion &)=delete
handler_type completion_handler
Definition: executor.h:90
async_completion(CompletionToken &t)
Definition: executor.h:84
async_completion(const async_completion &)=delete
result_type result
Definition: executor.h:91
typename result_type::completion_handler_type completion_handler_type
Definition: executor.h:76
Definition: executor.h:55
void return_type
Definition: executor.h:58
CompletionToken completion_handler_type
Definition: executor.h:57
return_type get()
Definition: executor.h:64
async_result(const async_result &)=delete
async_result(completion_handler_type &)
Definition: executor.h:60
async_result & operator=(const async_result &)=delete
Definition: executor.h:291
virtual ~service()=default
virtual void shutdown() noexcept=0
service(const service &)=delete
service & operator=(const service &)=delete
service(execution_context &owner)
Definition: executor.h:293
execution_context & context_
Definition: executor.h:304
virtual void notify_fork(fork_event) noexcept
Definition: executor.h:301
execution_context & context() noexcept
Definition: executor.h:297
Definition: executor.h:154
friend bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:283
std::list< ServicePtr > services_
Definition: executor.h:214
execution_context(const execution_context &)=delete
void shutdown() noexcept
Definition: executor.h:934
void destroy() noexcept
Definition: executor.h:176
static void service_deleter(service *svc)
Definition: executor.h:186
friend Service & make_service(execution_context &ctx, Args &&... args)
Definition: executor.h:259
void notify_fork(fork_event e)
Definition: executor.h:922
service * add_service(Args &&... args)
Definition: executor.h:218
execution_context()=default
execution_context & operator=(const execution_context &)=delete
virtual ~execution_context()
Definition: executor.h:164
static service_key_type service_key()
maps selected type to unique identifier.
Definition: executor.h:206
std::mutex services_mtx_
Definition: executor.h:211
friend Service::key_type & use_service(execution_context &ctx)
Definition: executor.h:237
std::type_index service_key_type
Definition: executor.h:200
std::unordered_map< service_key_type, service * > keys_
Definition: executor.h:215
Definition: executor.h:452
Executor executor_type
Definition: executor.h:454
executor_work_guard(const executor_work_guard &other) noexcept
Definition: executor.h:460
bool owns_
Definition: executor.h:490
executor_type get_executor() const noexcept
Definition: executor.h:477
executor_work_guard(executor_work_guard &&other) noexcept
Definition: executor.h:466
void reset() noexcept
Definition: executor.h:481
Executor ex_
Definition: executor.h:489
executor_work_guard(const executor_type &ex) noexcept
Definition: executor.h:456
executor_work_guard & operator=(const executor_work_guard &other)=delete
~executor_work_guard()
Definition: executor.h:471
bool owns_work() const noexcept
Definition: executor.h:479
static constexpr Value * contains(const Key *k)
check if a callstack contains a pointer already.
Definition: callstack.h:151
function object for net::dispatch(), net::post(), net::defer().
Definition: executor.h:673
Dispatcher(CompletionHandler &handler)
Definition: executor.h:675
decltype(net::make_work_guard(handler_)) work_guard_
Definition: executor.h:689
CompletionHandler handler_
Definition: executor.h:688
void operator()()
Definition: executor.h:679
Definition: executor.h:149
Definition: executor.h:828
void post(Func &&f, const ProtoAllocator &a) const
strand(Executor ex)
Definition: executor.h:834
strand operator=(const strand< OtherExecutor > &other) noexcept
Definition: executor.h:863
strand(strand< OtherExecutor > &&other) noexcept
Definition: executor.h:847
strand(const strand< OtherExecutor > &other) noexcept
Definition: executor.h:844
Executor inner_ex_
Definition: executor.h:903
bool running_in_this_thread() const noexcept
Definition: executor.h:882
void defer(Func &&f, const ProtoAllocator &a) const
strand operator=(strand< OtherExecutor > &&other) noexcept
Definition: executor.h:870
strand operator=(strand &&other) noexcept
Definition: executor.h:856
void on_work_started() const noexcept
Definition: executor.h:888
void on_work_finished() const noexcept
Definition: executor.h:889
void dispatch(Func &&f, const ProtoAllocator &) const
Definition: executor.h:892
execution_context & context() const noexcept
Definition: executor.h:886
Executor inner_executor_type
Definition: executor.h:830
strand(strand &&other) noexcept
Definition: executor.h:841
strand operator=(const strand &other) noexcept
Definition: executor.h:850
std::queue< std::function< void()> > jobs_
Definition: executor.h:906
strand(std::allocator_arg_t, const ProtoAllocator &, Executor ex)
Definition: executor.h:837
strand(const strand &other) noexcept
Definition: executor.h:840
inner_executor_type get_inner_executor() const noexcept
Definition: executor.h:880
bool running_
Definition: executor.h:905
Definition: executor.h:567
static system_context & get_() noexcept
Definition: executor.h:631
bool stopped_
Definition: executor.h:640
void join()
Definition: executor.h:591
std::condition_variable cv_
Definition: executor.h:638
std::queue< std::function< void()> > tasks_
Definition: executor.h:639
void run_()
Definition: executor.h:601
std::thread thread_
Definition: executor.h:636
system_context(const system_context &)=delete
bool stopped() const noexcept
Definition: executor.h:587
~system_context() override
Definition: executor.h:574
void post_(std::function< void()> f)
Definition: executor.h:618
std::mutex mtx_
Definition: executor.h:637
friend class system_executor
Definition: executor.h:599
executor_type get_executor() noexcept
Definition: executor.h:579
system_context & operator=(const system_context &)=delete
void stop()
Definition: executor.h:581
system_context(__tag)
Definition: executor.h:597
Definition: executor.h:537
system_executor()=default
void dispatch(Func &&f, const ProtoAllocator &a) const
Definition: executor.h:655
void defer(Func &&f, const ProtoAllocator &a) const
Definition: executor.h:659
void on_work_finished() const noexcept
Definition: executor.h:544
system_context & context() const noexcept
Definition: executor.h:645
void on_work_started() const noexcept
Definition: executor.h:543
void post(Func &&f, const ProtoAllocator &a) const
Definition: executor.h:650
Header for compiler-dependent features.
#define MY_COMPILER_DIAGNOSTIC_PUSH()
save the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:285
#define MY_COMPILER_DIAGNOSTIC_POP()
restore the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:286
static QUEUE queue
Definition: myisampack.cc:210
void for_each(const Shards< COUNT > &shards, Function &&f) noexcept
Iterate over the shards.
Definition: ut0counter.h:323
std::string_view Key
The key type for the hash structure in HashJoinRowBuffer.
Definition: hash_join_buffer.h:102
int key_type
Definition: method.h:38
Definition: http_server_component.cc:34
auto executor_requirements(U *__x=nullptr, const U *__const_x=nullptr, void(*f)()=nullptr, const std::allocator< int > &a={}) -> std::enable_if_t< std::conjunction< std::is_copy_constructible< T >, std::is_same< decltype(*__const_x== *__const_x), bool >, std::is_same< decltype(*__const_x != *__const_x), bool >, std::is_void< decltype(__x->on_work_started())>, std::is_void< decltype(__x->on_work_finished())>, std::is_void< decltype(__x->dispatch(std::move(f), a))>, std::is_void< decltype(__x->post(std::move(f), a))>, std::is_void< decltype(__x->defer(std::move(f), a))> >::value, std::void_t< decltype(__x->context()), void()> >
Dispatcher< CompletionHandler > make_dispatcher(CompletionHandler &handler)
Definition: executor.h:693
constexpr bool uses_executor_v
Definition: executor.h:380
bool operator!=(const system_executor &, const system_executor &)
Definition: executor.h:559
bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:283
Service & make_service(execution_context &ctx, Args &&... args)
Definition: executor.h:259
auto defer(CompletionToken &&token)
Definition: executor.h:787
std::enable_if_t< is_executor< Executor >::value, executor_work_guard< Executor > > make_work_guard(const Executor &ex)
Definition: executor.h:507
typename associated_executor< T, Executor >::type associated_executor_t
Definition: executor.h:406
associated_allocator_t< T > get_associated_allocator(const T &t) noexcept
Definition: executor.h:138
auto dispatch(CompletionToken &&token)
Definition: executor.h:701
auto post(CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:746
constexpr executor_arg_t executor_arg
Definition: executor.h:361
Service::key_type & use_service(execution_context &ctx)
Definition: executor.h:237
typename associated_allocator< T, ProtoAllocator >::type associated_allocator_t
Definition: executor.h:104
constexpr bool is_executor_v
Definition: executor.h:356
associated_executor_t< T > get_associated_executor(const T &t) noexcept
Definition: executor.h:429
bool operator==(const system_executor &, const system_executor &)
Definition: executor.h:555
fork_event
Definition: executor.h:48
Definition: gcs_xcom_synode.h:64
std::conditional_t< !std::is_array< T >::value, std::unique_ptr< T, detail::Deleter< T > >, std::conditional_t< detail::is_unbounded_array_v< T >, std::unique_ptr< T, detail::Array_deleter< std::remove_extent_t< T > > >, void > > unique_ptr
The following is a common type that is returned by all the ut::make_unique (non-aligned) specializati...
Definition: ut0new.h:2438
MY_COMPILER_CLANG_DIAGNOSTIC_IGNORE("-Winconsistent-missing-destructor-override") static Scope_guard static_guard([]()
Definition: protobuf_plugin.cc:33
required string key
Definition: replication_asynchronous_connection_failover.proto:60
required string type
Definition: replication_group_member_actions.proto:34
static type __get(const T &t, const ProtoAllocator &) noexcept
Definition: executor.h:122
typename T::allocator_type type
Definition: executor.h:120
Definition: executor.h:109
static type __get(const T &, const ProtoAllocator &a) noexcept
Definition: executor.h:112
ProtoAllocator type
Definition: executor.h:110
Definition: executor.h:128
static auto get(const T &t, const ProtoAllocator &a=ProtoAllocator()) noexcept
Definition: executor.h:129
static type __get(const T &t, const Executor &) noexcept
Definition: executor.h:397
typename T::executor_type type
Definition: executor.h:395
Definition: executor.h:384
static type __get(const T &, const Executor &ex) noexcept
Definition: executor.h:387
Executor type
Definition: executor.h:385
Definition: executor.h:409
static auto get(const T &t, const Executor &ex=Executor()) noexcept
Definition: executor.h:410
Definition: executor.h:190
ServicePtr(Service *svc)
Definition: executor.h:192
std::unique_ptr< service, void(*)(service *)> ptr_
Definition: executor.h:197
bool active_
Definition: executor.h:195
Definition: executor.h:359
Definition: executor.h:312
Definition: executor.h:368
Definition: executor.h:353
Definition: executor.h:596
Definition: executor.h:377