26#ifndef MYSQL_HARNESS_NET_TS_EXECUTOR_H_ 
   27#define MYSQL_HARNESS_NET_TS_EXECUTOR_H_ 
   30#include <condition_variable> 
   40#include <unordered_map> 
   51template <
class CompletionToken, 
class Signature>
 
   54template <
class CompletionToken, 
class Signature>
 
   68template <
class CompletionToken, 
class Signature>
 
   69class async_completion;
 
   71template <
class CompletionToken, 
class Signature>
 
   80      std::is_same<CompletionToken, completion_handler_type>::value,
 
   99template <class T, class ProtoAllocator = 
std::allocator<
void>>
 
  102template <class T, class ProtoAllocator = 
std::allocator<
void>>
 
  108template <class T, class ProtoAllocator, typename = 
std::void_t<>>
 
  112  static type __get(
const T & , 
const ProtoAllocator &a) 
noexcept {
 
  117template <
class T, 
class ProtoAllocator>
 
  119                                 std::void_t<typename T::allocator_type>> {
 
  120  using type = 
typename T::allocator_type;
 
  122  static type __get(
const T &t, 
const ProtoAllocator & ) 
noexcept {
 
  123    return t.get_allocator();
 
  127template <
class T, 
class ProtoAllocator>
 
  129  static auto get(
const T &t,
 
  130                  const ProtoAllocator &a = ProtoAllocator()) noexcept {
 
  132    return Impl::__get(t, a);
 
  142template <
class T, 
class ProtoAllocator>
 
  144    const T &t, 
const ProtoAllocator &a) 
noexcept {
 
  151  using std::logic_error::logic_error;
 
  174                    [e](
auto &svc) { svc.ptr_->notify_fork(e); });
 
  177                    [e](
auto &svc) { svc.ptr_->notify_fork(e); });
 
  187        svc.ptr_->shutdown();
 
  195    while (!services_.empty()) services_.pop_back();
 
  202  template <
class Service>
 
  204    delete static_cast<Service *
>(svc);
 
  208    template <
class Service>
 
  209    ServicePtr(Service *svc) : ptr_(svc, &service_deleter<Service>) {}
 
  224    return std::type_index(
typeid(
Key));
 
  232  std::unordered_map<service_key_type, service *> 
keys_;
 
  234  template <
typename Service, 
class... Args>
 
  237        ServicePtr{
new Service{*
this, std::forward<Args>(args)...}});
 
  239    return services_.back().ptr_.get();
 
  242  template <
class Service>
 
  245  template <
class Service>
 
  248  template <
class Service, 
class... Args>
 
  253template <
class Service>
 
  257  static_assert(std::is_base_of<execution_context::service, Key>::value,
 
  258                "Key must derive from execution_context::service");
 
  259  static_assert(std::is_base_of<Key, Service>::value,
 
  260                "Service must derive from Key");
 
  262  auto key = execution_context::service_key<Key>();
 
  267  if (svc == 
nullptr) {
 
  272  return static_cast<Key &
>(*svc);
 
  275template <
class Service, 
class... Args>
 
  279  static_assert(std::is_base_of<execution_context::service, Key>::value,
 
  280                "Key must derive from execution_context::service");
 
  281  static_assert(std::is_base_of<Key, Service>::value,
 
  282                "Service must derive from Key");
 
  284  auto key = execution_context::service_key<Key>();
 
  288  if (svc == 
nullptr) {
 
  290    svc = ctx.
add_service<Service>(std::forward(args)...);
 
  293        "can't make_service(), Service already exists");
 
  296  return static_cast<Service &
>(*svc);
 
  299template <
class Service>
 
  303  std::lock_guard<std::mutex> lk(ctx.services_mtx_);
 
  304  return ctx.keys_.count(execution_context::service_key<Key>()) > 0;
 
  328template <
class T, 
class = std::
void_t<>>
 
  346template <
class T, 
typename U = std::remove_const_t<T>>
 
  348                           void (*f)() = 
nullptr,
 
  349                           const std::allocator<int> &a = {})
 
  352            std::is_copy_constructible<T>,
 
  354            std::is_same<
decltype(*__const_x == *__const_x), 
bool>,
 
  355            std::is_same<
decltype(*__const_x != *__const_x), 
bool>,
 
  356            std::is_void<
decltype(__x->on_work_started())>,
 
  357            std::is_void<
decltype(__x->on_work_finished())>,
 
  358            std::is_void<
decltype(__x->dispatch(std::move(f), a))>,
 
  359            std::is_void<
decltype(__x->post(std::move(f), a))>,
 
  360            std::is_void<
decltype(__x->defer(std::move(f), a))>>::value,
 
  363        std::void_t<
decltype(__x->context()), 
void()>>;
 
  384template <
class T, 
class Executor, 
typename = std::
void_t<>>
 
  387template <
class T, 
class Executor>
 
  389    : std::is_convertible<Executor, typename T::executor_type> {};
 
  393template <
class T, 
class Executor>
 
  396template <
class T, 
class Executor>
 
  400template <
class T, 
class Executor, 
typename = std::
void_t<>>
 
  404  static type __get(
const T & , 
const Executor &ex) 
noexcept {
 
  409template <
class T, 
class Executor>
 
  411                                std::void_t<typename T::executor_type>> {
 
  412  using type = 
typename T::executor_type;
 
  414  static type __get(
const T &t, 
const Executor & ) 
noexcept {
 
  415    return t.get_executor();
 
  419template <
class T, 
class Executor = system_executor>
 
  420struct associated_executor;
 
  422template <
class T, 
class Executor = system_executor>
 
  425template <
class T, 
class Executor>
 
  427  static auto get(
const T &t, 
const Executor &ex = Executor()) noexcept {
 
  429    return Impl::__get(t, ex);
 
  437template <
class T, 
class Executor>
 
  439    const T &t, 
const Executor &ex) 
noexcept;
 
  441template <
class T, 
class ExecutorContext>
 
  442associated_executor_t<T, typename ExecutorContext::executor_type>
 
  450template <
class T, 
class Executor>
 
  452    const T &t, 
const Executor &ex) 
noexcept {
 
  456template <
class T, 
class ExecutorContext>
 
  457associated_executor_t<T, typename ExecutorContext::executor_type>
 
  468template <
class Executor>
 
  474      : ex_{ex}, owns_{
true} {
 
  475    ex_.on_work_started();
 
  478      : ex_{other.ex_}, owns_{other.owns_} {
 
  480      ex_.on_work_started();
 
  484      : ex_{std::move(other.ex_)}, owns_{std::exchange(other.owns_, 
false)} {}
 
  490      ex_.on_work_finished();
 
  500      ex_.on_work_finished();
 
  522template <
class Executor>
 
  528template <
class ExecutionContext>
 
  530    std::is_convertible<ExecutionContext &, execution_context &>::value,
 
  531    executor_work_guard<typename ExecutionContext::executor_type>>
 
  537std::enable_if_t<!is_executor<T>::value &&
 
  538                     !std::is_convertible<T &, execution_context &>::value,
 
  539                 executor_work_guard<associated_executor_t<T>>>
 
  544template <
class T, 
class U>
 
  547                                                        std::forward<U>(u)))) {
 
  560  void on_work_started() const noexcept {}
 
  563  template <
class Func, 
class ProtoAllocator>
 
  564  void dispatch(Func &&f, 
const ProtoAllocator &a) 
const;
 
  565  template <
class Func, 
class ProtoAllocator>
 
  566  void post(Func &&f, 
const ProtoAllocator &a) 
const;
 
  567  template <
class Func, 
class ProtoAllocator>
 
  568  void defer(Func &&f, 
const ProtoAllocator &a) 
const;
 
  599    std::lock_guard<std::mutex> lk(mtx_);
 
  605    std::lock_guard<std::mutex> lk(mtx_);
 
  609    if (thread_.joinable()) thread_.join();
 
  620      std::function<void()> f;
 
  622        std::unique_lock<std::mutex> lk(mtx_);
 
  624        cv_.wait(lk, [
this] { 
return stopped_ || !tasks_.empty(); });
 
  626        if (stopped_) 
return;
 
  628        f = std::move(tasks_.front());
 
  635  void post_(std::function<
void()> f) {
 
  636    std::lock_guard<std::mutex> lk(mtx_);
 
  637    if (stopped_) 
return;
 
  639    if (!thread_.joinable()) {
 
  640      thread_ = std::thread(&system_context::run_, 
this);
 
  643    tasks_.push(std::move(f));
 
  655  std::condition_variable 
cv_;
 
  657  bool stopped_{
false};
 
  663  return system_context::get_();
 
  666template <
class Func, 
class ProtoAllocator>
 
  668  system_context::get_().post_(std::forward<Func>(f));
 
  671template <
class Func, 
class ProtoAllocator>
 
  673  std::decay_t<Func>{std::forward<Func>(f)}();
 
  675template <
class Func, 
class ProtoAllocator>
 
  677  post(std::forward<Func>(f), a);
 
  689template <
class CompletionHandler>
 
  699    work_guard_.get_executor().dispatch(std::move(handler_), alloc);
 
  709template <
class CompletionHandler>
 
  717template <
class CompletionToken>
 
  731template <
class Executor, 
class CompletionToken>
 
  733    is_executor<Executor>::value,
 
  734    typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
 
  735dispatch(
const Executor &ex, CompletionToken &&token) {
 
  748template <
class ExecutionContext, 
class CompletionToken>
 
  750    std::is_convertible<ExecutionContext &, execution_context &>::value,
 
  751    typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
 
  754                       std::forward<CompletionToken>(token));
 
  762template <
class CompletionToken>
 
  763auto post(CompletionToken &&token) {
 
  776template <
class Executor, 
class CompletionToken>
 
  778    is_executor<Executor>::value,
 
  779    typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
 
  780post(
const Executor &ex, CompletionToken &&token) {
 
  793template <
class ExecutionContext, 
class CompletionToken>
 
  795    std::is_convertible<ExecutionContext &, execution_context &>::value,
 
  796    typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
 
  798  return net::post(ctx.get_executor(), std::forward<CompletionToken>(token));
 
  803template <
class CompletionToken>
 
  804auto defer(CompletionToken &&token) {
 
  817template <
class Executor, 
class CompletionToken>
 
  819    is_executor<Executor>::value,
 
  820    typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
 
  821defer(
const Executor &ex, CompletionToken &&token) {
 
  834template <
class ExecutionContext, 
class CompletionToken>
 
  836    std::is_convertible<ExecutionContext &, execution_context &>::value,
 
  837    typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
 
  839  return net::defer(ctx.get_executor(), std::forward<CompletionToken>(token));
 
  844template <
class Executor>
 
  851  explicit strand(Executor ex) : inner_ex_{ex} {}
 
  853  template <
class ProtoAllocator>
 
  854  strand(std::allocator_arg_t, 
const ProtoAllocator & , Executor ex)
 
  858  strand(
strand &&other) noexcept : inner_ex_{std::move(other.inner_ex_)} {}
 
  860  template <
class OtherExecutor>
 
  862      : inner_ex_{other.inner_ex_} {}
 
  863  template <
class OtherExecutor>
 
  865      : inner_ex_{std::move(other.inner_ex_)} {}
 
  874    inner_ex_ = std::move(other.inner_ex_);
 
  879  template <
class OtherExecutor>
 
  886  template <
class OtherExecutor>
 
  888    inner_ex_ = std::move(other.inner_ex_);
 
  908  template <
class Func, 
class ProtoAllocator>
 
  909  void dispatch(Func &&f, 
const ProtoAllocator & )
 const {
 
  910    if (running_in_this_thread()) {
 
  911      std::forward<Func>(f)();
 
  914  template <
class Func, 
class ProtoAllocator>
 
  915  void post(Func &&f, 
const ProtoAllocator &a) 
const;
 
  916  template <
class Func, 
class ProtoAllocator>
 
  917  void defer(Func &&f, 
const ProtoAllocator &a) 
const;
 
  922  bool running_{
false};
 
  926template <
class Executor>
 
  929template <
class Executor>
 
execution context for SQL.
Definition: sql_exec_context.h:43
The handler class is the interface for dynamically loadable storage engines.
Definition: handler.h:4412
Definition: executor.h:72
std::conditional_t< std::is_same< CompletionToken, completion_handler_type >::value, completion_handler_type &, completion_handler_type > handler_type
Definition: executor.h:81
async_completion & operator=(const async_completion &)=delete
handler_type completion_handler
Definition: executor.h:90
async_completion(CompletionToken &t)
Definition: executor.h:84
async_completion(const async_completion &)=delete
result_type result
Definition: executor.h:91
typename result_type::completion_handler_type completion_handler_type
Definition: executor.h:76
Definition: executor.h:55
void return_type
Definition: executor.h:58
CompletionToken completion_handler_type
Definition: executor.h:57
return_type get()
Definition: executor.h:64
async_result(const async_result &)=delete
async_result(completion_handler_type &)
Definition: executor.h:60
async_result & operator=(const async_result &)=delete
Definition: executor.h:308
virtual ~service()=default
virtual void shutdown() noexcept=0
service(const service &)=delete
service & operator=(const service &)=delete
service(execution_context &owner)
Definition: executor.h:310
execution_context & context_
Definition: executor.h:321
execution_context & context() noexcept
Definition: executor.h:314
Definition: executor.h:154
service * add_service(Args &&...args)
Definition: executor.h:235
std::list< ServicePtr > services_
Definition: executor.h:231
execution_context(const execution_context &)=delete
void shutdown() noexcept
Definition: executor.h:183
void destroy() noexcept
Definition: executor.h:193
static void service_deleter(service *svc)
Definition: executor.h:203
void notify_fork(fork_event e)
Definition: executor.h:170
execution_context()=default
execution_context & operator=(const execution_context &)=delete
virtual ~execution_context()
Definition: executor.h:164
static service_key_type service_key()
maps selected type to unique identifier.
Definition: executor.h:223
std::mutex services_mtx_
Definition: executor.h:228
std::type_index service_key_type
Definition: executor.h:217
std::unordered_map< service_key_type, service * > keys_
Definition: executor.h:232
Definition: executor.h:469
Executor executor_type
Definition: executor.h:471
executor_work_guard(const executor_work_guard &other) noexcept
Definition: executor.h:477
bool owns_
Definition: executor.h:507
executor_type get_executor() const noexcept
Definition: executor.h:494
executor_work_guard(executor_work_guard &&other) noexcept
Definition: executor.h:483
void reset() noexcept
Definition: executor.h:498
Executor ex_
Definition: executor.h:506
executor_work_guard(const executor_type &ex) noexcept
Definition: executor.h:473
executor_work_guard & operator=(const executor_work_guard &other)=delete
~executor_work_guard()
Definition: executor.h:488
bool owns_work() const noexcept
Definition: executor.h:496
function object for net::dispatch(), net::post(), net::defer().
Definition: executor.h:690
Dispatcher(CompletionHandler &handler)
Definition: executor.h:692
CompletionHandler handler_
Definition: executor.h:705
void operator()()
Definition: executor.h:696
Definition: executor.h:149
Definition: executor.h:845
void post(Func &&f, const ProtoAllocator &a) const
strand(Executor ex)
Definition: executor.h:851
strand operator=(const strand< OtherExecutor > &other) noexcept
Definition: executor.h:880
strand(strand< OtherExecutor > &&other) noexcept
Definition: executor.h:864
strand(const strand< OtherExecutor > &other) noexcept
Definition: executor.h:861
Executor inner_ex_
Definition: executor.h:920
bool running_in_this_thread() const noexcept
Definition: executor.h:899
void defer(Func &&f, const ProtoAllocator &a) const
strand operator=(strand< OtherExecutor > &&other) noexcept
Definition: executor.h:887
strand operator=(strand &&other) noexcept
Definition: executor.h:873
void on_work_started() const noexcept
Definition: executor.h:905
void on_work_finished() const noexcept
Definition: executor.h:906
void dispatch(Func &&f, const ProtoAllocator &) const
Definition: executor.h:909
execution_context & context() const noexcept
Definition: executor.h:903
Executor inner_executor_type
Definition: executor.h:847
strand(strand &&other) noexcept
Definition: executor.h:858
strand operator=(const strand &other) noexcept
Definition: executor.h:867
std::queue< std::function< void()> > jobs_
Definition: executor.h:923
strand(std::allocator_arg_t, const ProtoAllocator &, Executor ex)
Definition: executor.h:854
strand(const strand &other) noexcept
Definition: executor.h:857
inner_executor_type get_inner_executor() const noexcept
Definition: executor.h:897
Definition: executor.h:584
static system_context & get_() noexcept
Definition: executor.h:648
void join()
Definition: executor.h:608
std::condition_variable cv_
Definition: executor.h:655
std::queue< std::function< void()> > tasks_
Definition: executor.h:656
void run_()
Definition: executor.h:618
std::thread thread_
Definition: executor.h:653
system_context(const system_context &)=delete
bool stopped() const noexcept
Definition: executor.h:604
~system_context() override
Definition: executor.h:591
void post_(std::function< void()> f)
Definition: executor.h:635
std::mutex mtx_
Definition: executor.h:654
executor_type get_executor() noexcept
Definition: executor.h:596
system_context & operator=(const system_context &)=delete
void stop()
Definition: executor.h:598
system_context(__tag)
Definition: executor.h:614
Definition: executor.h:554
system_executor()=default
void on_work_finished() const noexcept
Definition: executor.h:561
static bool contains(const std::vector< std::string > &container, const std::string &file)
Definition: config_files.cc:41
Header for compiler-dependent features.
#define MY_COMPILER_DIAGNOSTIC_PUSH()
save the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:296
#define MY_COMPILER_DIAGNOSTIC_POP()
restore the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:297
static QUEUE queue
Definition: myisampack.cc:207
void for_each(const Shards< COUNT > &shards, Function &&f) noexcept
Iterate over the shards.
Definition: ut0counter.h:323
int key_type
Definition: http_request.h:51
std::string_view Key
The key type for the hash structure in HashJoinRowBuffer.
Definition: hash_join_buffer.h:102
Definition: authentication.cc:36
std::string join(Container cont, const std::string &delim)
join elements of an container into a string separated by a delimiter.
Definition: string.h:151
auto executor_requirements(U *__x=nullptr, const U *__const_x=nullptr, void(*f)()=nullptr, const std::allocator< int > &a={}) -> std::enable_if_t< std::conjunction< std::is_copy_constructible< T >, std::is_same< decltype(*__const_x== *__const_x), bool >, std::is_same< decltype(*__const_x != *__const_x), bool >, std::is_void< decltype(__x->on_work_started())>, std::is_void< decltype(__x->on_work_finished())>, std::is_void< decltype(__x->dispatch(std::move(f), a))>, std::is_void< decltype(__x->post(std::move(f), a))>, std::is_void< decltype(__x->defer(std::move(f), a))> >::value, std::void_t< decltype(__x->context()), void()> >
Dispatcher< CompletionHandler > make_dispatcher(CompletionHandler &handler)
Definition: executor.h:710
constexpr bool uses_executor_v
Definition: executor.h:397
bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:300
auto defer(CompletionToken &&token)
Definition: executor.h:804
std::enable_if_t< is_executor< Executor >::value, executor_work_guard< Executor > > make_work_guard(const Executor &ex)
Definition: executor.h:524
typename associated_executor< T, Executor >::type associated_executor_t
Definition: executor.h:423
Service & make_service(execution_context &ctx, Args &&...args)
Definition: executor.h:276
associated_allocator_t< T > get_associated_allocator(const T &t) noexcept
Definition: executor.h:138
auto dispatch(CompletionToken &&token)
Definition: executor.h:718
auto post(CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:763
constexpr executor_arg_t executor_arg
Definition: executor.h:378
Service::key_type & use_service(execution_context &ctx)
Definition: executor.h:254
typename associated_allocator< T, ProtoAllocator >::type associated_allocator_t
Definition: executor.h:104
constexpr bool is_executor_v
Definition: executor.h:373
bool operator!=(const strand< Executor > &a, const strand< Executor > &b)
Definition: executor.h:930
std::enable_if_t< std::is_convertible< ExecutionContext &, execution_context & >::value, typename async_result< std::decay_t< CompletionToken >, void()>::return_type > post(ExecutionContext &ctx, CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:797
bool operator==(const strand< Executor > &a, const strand< Executor > &b)
fork_event
Definition: executor.h:48
std::enable_if_t< std::is_convertible< ExecutionContext &, execution_context & >::value, typename async_result< std::decay_t< CompletionToken >, void()>::return_type > defer(ExecutionContext &ctx, CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:838
associated_executor_t< T, typename ExecutorContext::executor_type > get_associated_executor(const T &t, const ExecutorContext &ctx) noexcept
Definition: executor.h:458
std::enable_if_t< std::is_convertible< ExecutionContext &, execution_context & >::value, typename async_result< std::decay_t< CompletionToken >, void()>::return_type > dispatch(ExecutionContext &ctx, CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:752
auto make_work_guard(const T &t, U &&u) -> decltype(make_work_guard(get_associated_executor(t, std::forward< U >(u))))
Definition: executor.h:545
associated_allocator_t< T > get_associated_allocator(const T &t, const ProtoAllocator &a) noexcept
Definition: executor.h:143
void get(PSI_field *, PSI_longlong *) noexcept
Definition: pfs_plugin_column_bigint_v1_all_empty.cc:32
Definition: gcs_xcom_synode.h:64
std::conditional_t< !std::is_array< T >::value, std::unique_ptr< T, detail::Deleter< T > >, std::conditional_t< detail::is_unbounded_array_v< T >, std::unique_ptr< T, detail::Array_deleter< std::remove_extent_t< T > > >, void > > unique_ptr
The following is a common type that is returned by all the ut::make_unique (non-aligned) specializati...
Definition: ut0new.h:2439
MY_COMPILER_CLANG_DIAGNOSTIC_IGNORE("-Winconsistent-missing-destructor-override") extern "C"
Definition: protobuf_plugin.cc:32
required string key
Definition: replication_asynchronous_connection_failover.proto:60
required string type
Definition: replication_group_member_actions.proto:34
static type __get(const T &t, const ProtoAllocator &) noexcept
Definition: executor.h:122
typename T::allocator_type type
Definition: executor.h:120
Definition: executor.h:109
static type __get(const T &, const ProtoAllocator &a) noexcept
Definition: executor.h:112
ProtoAllocator type
Definition: executor.h:110
Definition: executor.h:128
static auto get(const T &t, const ProtoAllocator &a=ProtoAllocator()) noexcept
Definition: executor.h:129
static type __get(const T &t, const Executor &) noexcept
Definition: executor.h:414
typename T::executor_type type
Definition: executor.h:412
Definition: executor.h:401
static type __get(const T &, const Executor &ex) noexcept
Definition: executor.h:404
Executor type
Definition: executor.h:402
Definition: executor.h:426
static auto get(const T &t, const Executor &ex=Executor()) noexcept
Definition: executor.h:427
Definition: executor.h:207
ServicePtr(Service *svc)
Definition: executor.h:209
Definition: executor.h:376
Definition: executor.h:329
Definition: executor.h:385
Definition: executor.h:370
Definition: executor.h:613
Definition: executor.h:394