25#ifndef MYSQL_HARNESS_NET_TS_EXECUTOR_H_
26#define MYSQL_HARNESS_NET_TS_EXECUTOR_H_
29#include <condition_variable>
39#include <unordered_map>
50template <
class CompletionToken,
class Signature>
53template <
class CompletionToken,
class Signature>
67template <
class CompletionToken,
class Signature>
68class async_completion;
70template <
class CompletionToken,
class Signature>
79 std::is_same<CompletionToken, completion_handler_type>::value,
98template <class T, class ProtoAllocator =
std::allocator<
void>>
101template <class T, class ProtoAllocator =
std::allocator<
void>>
107template <class T, class ProtoAllocator, typename =
std::void_t<>>
111 static type __get(
const T & ,
const ProtoAllocator &a)
noexcept {
116template <
class T,
class ProtoAllocator>
118 std::void_t<typename T::allocator_type>> {
119 using type =
typename T::allocator_type;
121 static type __get(
const T &t,
const ProtoAllocator & )
noexcept {
122 return t.get_allocator();
126template <
class T,
class ProtoAllocator>
128 static auto get(
const T &t,
129 const ProtoAllocator &a = ProtoAllocator()) noexcept {
131 return Impl::__get(t, a);
141template <
class T,
class ProtoAllocator>
143 const T &t,
const ProtoAllocator &a)
noexcept {
150 using std::logic_error::logic_error;
173 [e](
auto &svc) { svc.ptr_->notify_fork(e); });
176 [e](
auto &svc) { svc.ptr_->notify_fork(e); });
186 svc.ptr_->shutdown();
194 while (!services_.empty()) services_.pop_back();
201 template <
class Service>
203 delete static_cast<Service *
>(svc);
207 template <
class Service>
208 ServicePtr(Service *svc) : ptr_(svc, &service_deleter<Service>) {}
223 return std::type_index(
typeid(
Key));
231 std::unordered_map<service_key_type, service *>
keys_;
233 template <
typename Service,
class... Args>
236 ServicePtr{
new Service{*
this, std::forward<Args>(args)...}});
238 return services_.back().ptr_.get();
241 template <
class Service>
244 template <
class Service>
247 template <
class Service,
class... Args>
252template <
class Service>
256 static_assert(std::is_base_of<execution_context::service, Key>::value,
257 "Key must derive from execution_context::service");
258 static_assert(std::is_base_of<Key, Service>::value,
259 "Service must derive from Key");
261 auto key = execution_context::service_key<Key>();
266 if (svc ==
nullptr) {
271 return static_cast<Key &
>(*svc);
274template <
class Service,
class... Args>
278 static_assert(std::is_base_of<execution_context::service, Key>::value,
279 "Key must derive from execution_context::service");
280 static_assert(std::is_base_of<Key, Service>::value,
281 "Service must derive from Key");
283 auto key = execution_context::service_key<Key>();
287 if (svc ==
nullptr) {
289 svc = ctx.
add_service<Service>(std::forward<Args>(args)...);
292 "can't make_service(), Service already exists");
295 return static_cast<Service &
>(*svc);
298template <
class Service>
302 std::lock_guard<std::mutex> lk(ctx.services_mtx_);
303 return ctx.keys_.count(execution_context::service_key<Key>()) > 0;
327template <
class T,
class = std::
void_t<>>
345template <
class T,
typename U = std::remove_const_t<T>>
347 void (*f)() =
nullptr,
348 const std::allocator<int> &a = {})
351 std::is_copy_constructible<T>,
353 std::is_same<
decltype(*__const_x == *__const_x),
bool>,
354 std::is_same<
decltype(*__const_x != *__const_x),
bool>,
355 std::is_void<
decltype(__x->on_work_started())>,
356 std::is_void<
decltype(__x->on_work_finished())>,
357 std::is_void<
decltype(__x->dispatch(std::move(f), a))>,
358 std::is_void<
decltype(__x->post(std::move(f), a))>,
359 std::is_void<
decltype(__x->defer(std::move(f), a))>>::value,
362 std::void_t<
decltype(__x->context()),
void()>>;
383template <
class T,
class Executor,
typename = std::
void_t<>>
386template <
class T,
class Executor>
388 : std::is_convertible<Executor, typename T::executor_type> {};
392template <
class T,
class Executor>
395template <
class T,
class Executor>
399template <
class T,
class Executor,
typename = std::
void_t<>>
403 static type __get(
const T & ,
const Executor &ex)
noexcept {
408template <
class T,
class Executor>
410 std::void_t<typename T::executor_type>> {
411 using type =
typename T::executor_type;
413 static type __get(
const T &t,
const Executor & )
noexcept {
414 return t.get_executor();
418template <
class T,
class Executor = system_executor>
419struct associated_executor;
421template <
class T,
class Executor = system_executor>
424template <
class T,
class Executor>
426 static auto get(
const T &t,
const Executor &ex = Executor()) noexcept {
428 return Impl::__get(t, ex);
436template <
class T,
class Executor>
438 const T &t,
const Executor &ex)
noexcept;
440template <
class T,
class ExecutorContext>
441associated_executor_t<T, typename ExecutorContext::executor_type>
449template <
class T,
class Executor>
451 const T &t,
const Executor &ex)
noexcept {
455template <
class T,
class ExecutorContext>
456associated_executor_t<T, typename ExecutorContext::executor_type>
467template <
class Executor>
473 : ex_{ex}, owns_{
true} {
474 ex_.on_work_started();
477 : ex_{other.ex_}, owns_{other.owns_} {
479 ex_.on_work_started();
483 : ex_{std::move(other.ex_)}, owns_{std::exchange(other.owns_,
false)} {}
489 ex_.on_work_finished();
499 ex_.on_work_finished();
521template <
class Executor>
527template <
class ExecutionContext>
529 std::is_convertible<ExecutionContext &, execution_context &>::value,
530 executor_work_guard<typename ExecutionContext::executor_type>>
536std::enable_if_t<!is_executor<T>::value &&
537 !std::is_convertible<T &, execution_context &>::value,
538 executor_work_guard<associated_executor_t<T>>>
543template <
class T,
class U>
546 std::forward<U>(u)))) {
559 void on_work_started() const noexcept {}
562 template <
class Func,
class ProtoAllocator>
563 void dispatch(Func &&f,
const ProtoAllocator &a)
const;
564 template <
class Func,
class ProtoAllocator>
565 void post(Func &&f,
const ProtoAllocator &a)
const;
566 template <
class Func,
class ProtoAllocator>
567 void defer(Func &&f,
const ProtoAllocator &a)
const;
598 std::lock_guard<std::mutex> lk(mtx_);
604 std::lock_guard<std::mutex> lk(mtx_);
608 if (thread_.joinable()) thread_.join();
619 std::function<void()> f;
621 std::unique_lock<std::mutex> lk(mtx_);
623 cv_.wait(lk, [
this] {
return stopped_ || !tasks_.empty(); });
625 if (stopped_)
return;
627 f = std::move(tasks_.front());
634 void post_(std::function<
void()> f) {
635 std::lock_guard<std::mutex> lk(mtx_);
636 if (stopped_)
return;
638 if (!thread_.joinable()) {
639 thread_ = std::thread(&system_context::run_,
this);
642 tasks_.push(std::move(f));
654 std::condition_variable
cv_;
656 bool stopped_{
false};
662 return system_context::get_();
665template <
class Func,
class ProtoAllocator>
667 system_context::get_().post_(std::forward<Func>(f));
670template <
class Func,
class ProtoAllocator>
672 std::decay_t<Func>{std::forward<Func>(f)}();
674template <
class Func,
class ProtoAllocator>
676 post(std::forward<Func>(f), a);
688template <
class CompletionHandler>
698 work_guard_.get_executor().dispatch(std::move(handler_), alloc);
708template <
class CompletionHandler>
716template <
class CompletionToken>
730template <
class Executor,
class CompletionToken>
732 is_executor<Executor>::value,
733 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
734dispatch(
const Executor &ex, CompletionToken &&token) {
747template <
class ExecutionContext,
class CompletionToken>
749 std::is_convertible<ExecutionContext &, execution_context &>::value,
750 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
753 std::forward<CompletionToken>(token));
761template <
class CompletionToken>
762auto post(CompletionToken &&token) {
775template <
class Executor,
class CompletionToken>
777 is_executor<Executor>::value,
778 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
779post(
const Executor &ex, CompletionToken &&token) {
792template <
class ExecutionContext,
class CompletionToken>
794 std::is_convertible<ExecutionContext &, execution_context &>::value,
795 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
797 return net::post(ctx.get_executor(), std::forward<CompletionToken>(token));
802template <
class CompletionToken>
803auto defer(CompletionToken &&token) {
816template <
class Executor,
class CompletionToken>
818 is_executor<Executor>::value,
819 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
820defer(
const Executor &ex, CompletionToken &&token) {
833template <
class ExecutionContext,
class CompletionToken>
835 std::is_convertible<ExecutionContext &, execution_context &>::value,
836 typename async_result<std::decay_t<CompletionToken>, void()>::return_type>
838 return net::defer(ctx.get_executor(), std::forward<CompletionToken>(token));
843template <
class Executor>
850 explicit strand(Executor ex) : inner_ex_{ex} {}
852 template <
class ProtoAllocator>
853 strand(std::allocator_arg_t,
const ProtoAllocator & , Executor ex)
857 strand(
strand &&other) noexcept : inner_ex_{std::move(other.inner_ex_)} {}
859 template <
class OtherExecutor>
861 : inner_ex_{other.inner_ex_} {}
862 template <
class OtherExecutor>
864 : inner_ex_{std::move(other.inner_ex_)} {}
873 inner_ex_ = std::move(other.inner_ex_);
878 template <
class OtherExecutor>
885 template <
class OtherExecutor>
887 inner_ex_ = std::move(other.inner_ex_);
907 template <
class Func,
class ProtoAllocator>
908 void dispatch(Func &&f,
const ProtoAllocator & )
const {
909 if (running_in_this_thread()) {
910 std::forward<Func>(f)();
913 template <
class Func,
class ProtoAllocator>
914 void post(Func &&f,
const ProtoAllocator &a)
const;
915 template <
class Func,
class ProtoAllocator>
916 void defer(Func &&f,
const ProtoAllocator &a)
const;
921 bool running_{
false};
925template <
class Executor>
928template <
class Executor>
execution context for SQL.
Definition: sql_exec_context.h:42
The handler class is the interface for dynamically loadable storage engines.
Definition: handler.h:4365
Definition: executor.h:71
std::conditional_t< std::is_same< CompletionToken, completion_handler_type >::value, completion_handler_type &, completion_handler_type > handler_type
Definition: executor.h:80
async_completion & operator=(const async_completion &)=delete
handler_type completion_handler
Definition: executor.h:89
async_completion(CompletionToken &t)
Definition: executor.h:83
async_completion(const async_completion &)=delete
result_type result
Definition: executor.h:90
typename result_type::completion_handler_type completion_handler_type
Definition: executor.h:75
Definition: executor.h:54
void return_type
Definition: executor.h:57
CompletionToken completion_handler_type
Definition: executor.h:56
return_type get()
Definition: executor.h:63
async_result(const async_result &)=delete
async_result(completion_handler_type &)
Definition: executor.h:59
async_result & operator=(const async_result &)=delete
Definition: executor.h:307
virtual ~service()=default
virtual void shutdown() noexcept=0
service(const service &)=delete
service & operator=(const service &)=delete
service(execution_context &owner)
Definition: executor.h:309
execution_context & context_
Definition: executor.h:320
execution_context & context() noexcept
Definition: executor.h:313
Definition: executor.h:153
std::list< ServicePtr > services_
Definition: executor.h:230
execution_context(const execution_context &)=delete
void shutdown() noexcept
Definition: executor.h:182
void destroy() noexcept
Definition: executor.h:192
static void service_deleter(service *svc)
Definition: executor.h:202
void notify_fork(fork_event e)
Definition: executor.h:169
service * add_service(Args &&... args)
Definition: executor.h:234
execution_context()=default
execution_context & operator=(const execution_context &)=delete
virtual ~execution_context()
Definition: executor.h:163
static service_key_type service_key()
maps selected type to unique identifier.
Definition: executor.h:222
std::mutex services_mtx_
Definition: executor.h:227
std::type_index service_key_type
Definition: executor.h:216
std::unordered_map< service_key_type, service * > keys_
Definition: executor.h:231
Definition: executor.h:468
Executor executor_type
Definition: executor.h:470
executor_work_guard(const executor_work_guard &other) noexcept
Definition: executor.h:476
bool owns_
Definition: executor.h:506
executor_type get_executor() const noexcept
Definition: executor.h:493
executor_work_guard(executor_work_guard &&other) noexcept
Definition: executor.h:482
void reset() noexcept
Definition: executor.h:497
Executor ex_
Definition: executor.h:505
executor_work_guard(const executor_type &ex) noexcept
Definition: executor.h:472
executor_work_guard & operator=(const executor_work_guard &other)=delete
~executor_work_guard()
Definition: executor.h:487
bool owns_work() const noexcept
Definition: executor.h:495
function object for net::dispatch(), net::post(), net::defer().
Definition: executor.h:689
Dispatcher(CompletionHandler &handler)
Definition: executor.h:691
CompletionHandler handler_
Definition: executor.h:704
void operator()()
Definition: executor.h:695
Definition: executor.h:148
Definition: executor.h:844
void post(Func &&f, const ProtoAllocator &a) const
strand(Executor ex)
Definition: executor.h:850
strand operator=(const strand< OtherExecutor > &other) noexcept
Definition: executor.h:879
strand(strand< OtherExecutor > &&other) noexcept
Definition: executor.h:863
strand(const strand< OtherExecutor > &other) noexcept
Definition: executor.h:860
Executor inner_ex_
Definition: executor.h:919
bool running_in_this_thread() const noexcept
Definition: executor.h:898
void defer(Func &&f, const ProtoAllocator &a) const
strand operator=(strand< OtherExecutor > &&other) noexcept
Definition: executor.h:886
strand operator=(strand &&other) noexcept
Definition: executor.h:872
void on_work_started() const noexcept
Definition: executor.h:904
void on_work_finished() const noexcept
Definition: executor.h:905
void dispatch(Func &&f, const ProtoAllocator &) const
Definition: executor.h:908
execution_context & context() const noexcept
Definition: executor.h:902
Executor inner_executor_type
Definition: executor.h:846
strand(strand &&other) noexcept
Definition: executor.h:857
strand operator=(const strand &other) noexcept
Definition: executor.h:866
std::queue< std::function< void()> > jobs_
Definition: executor.h:922
strand(std::allocator_arg_t, const ProtoAllocator &, Executor ex)
Definition: executor.h:853
strand(const strand &other) noexcept
Definition: executor.h:856
inner_executor_type get_inner_executor() const noexcept
Definition: executor.h:896
Definition: executor.h:583
static system_context & get_() noexcept
Definition: executor.h:647
void join()
Definition: executor.h:607
std::condition_variable cv_
Definition: executor.h:654
std::queue< std::function< void()> > tasks_
Definition: executor.h:655
void run_()
Definition: executor.h:617
std::thread thread_
Definition: executor.h:652
system_context(const system_context &)=delete
bool stopped() const noexcept
Definition: executor.h:603
~system_context() override
Definition: executor.h:590
void post_(std::function< void()> f)
Definition: executor.h:634
std::mutex mtx_
Definition: executor.h:653
executor_type get_executor() noexcept
Definition: executor.h:595
system_context & operator=(const system_context &)=delete
void stop()
Definition: executor.h:597
system_context(__tag)
Definition: executor.h:613
Definition: executor.h:553
system_executor()=default
void on_work_finished() const noexcept
Definition: executor.h:560
static bool contains(const std::vector< std::string > &container, const std::string &file)
Definition: config_files.cc:40
Header for compiler-dependent features.
#define MY_COMPILER_DIAGNOSTIC_PUSH()
save the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:295
#define MY_COMPILER_DIAGNOSTIC_POP()
restore the compiler's diagnostic (enabled warnings, errors, ...) state
Definition: my_compiler.h:296
static QUEUE queue
Definition: myisampack.cc:206
void for_each(const Shards< COUNT > &shards, Function &&f) noexcept
Iterate over the shards.
Definition: ut0counter.h:322
int key_type
Definition: http_request.h:49
std::string_view Key
The key type for the hash structure in HashJoinRowBuffer.
Definition: hash_join_buffer.h:107
Definition: authentication.cc:35
std::string join(Container cont, const std::string &delim)
join elements of a container into a string separated by a delimiter.
Definition: string.h:150
auto executor_requirements(U *__x=nullptr, const U *__const_x=nullptr, void(*f)()=nullptr, const std::allocator< int > &a={}) -> std::enable_if_t< std::conjunction< std::is_copy_constructible< T >, std::is_same< decltype(*__const_x== *__const_x), bool >, std::is_same< decltype(*__const_x != *__const_x), bool >, std::is_void< decltype(__x->on_work_started())>, std::is_void< decltype(__x->on_work_finished())>, std::is_void< decltype(__x->dispatch(std::move(f), a))>, std::is_void< decltype(__x->post(std::move(f), a))>, std::is_void< decltype(__x->defer(std::move(f), a))> >::value, std::void_t< decltype(__x->context()), void()> >
Dispatcher< CompletionHandler > make_dispatcher(CompletionHandler &handler)
Definition: executor.h:709
constexpr bool uses_executor_v
Definition: executor.h:396
bool has_service(const execution_context &ctx) noexcept
Definition: executor.h:299
Service & make_service(execution_context &ctx, Args &&... args)
Definition: executor.h:275
auto defer(CompletionToken &&token)
Definition: executor.h:803
std::enable_if_t< is_executor< Executor >::value, executor_work_guard< Executor > > make_work_guard(const Executor &ex)
Definition: executor.h:523
typename associated_executor< T, Executor >::type associated_executor_t
Definition: executor.h:422
associated_allocator_t< T > get_associated_allocator(const T &t) noexcept
Definition: executor.h:137
auto dispatch(CompletionToken &&token)
Definition: executor.h:717
auto post(CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:762
constexpr executor_arg_t executor_arg
Definition: executor.h:377
Service::key_type & use_service(execution_context &ctx)
Definition: executor.h:253
typename associated_allocator< T, ProtoAllocator >::type associated_allocator_t
Definition: executor.h:103
constexpr bool is_executor_v
Definition: executor.h:372
bool operator!=(const strand< Executor > &a, const strand< Executor > &b)
Definition: executor.h:929
std::enable_if_t< std::is_convertible< ExecutionContext &, execution_context & >::value, typename async_result< std::decay_t< CompletionToken >, void()>::return_type > post(ExecutionContext &ctx, CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:796
bool operator==(const strand< Executor > &a, const strand< Executor > &b)
fork_event
Definition: executor.h:47
std::enable_if_t< std::is_convertible< ExecutionContext &, execution_context & >::value, typename async_result< std::decay_t< CompletionToken >, void()>::return_type > defer(ExecutionContext &ctx, CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:837
associated_executor_t< T, typename ExecutorContext::executor_type > get_associated_executor(const T &t, const ExecutorContext &ctx) noexcept
Definition: executor.h:457
std::enable_if_t< std::is_convertible< ExecutionContext &, execution_context & >::value, typename async_result< std::decay_t< CompletionToken >, void()>::return_type > dispatch(ExecutionContext &ctx, CompletionToken &&token)
queue a function call for later execution.
Definition: executor.h:751
auto make_work_guard(const T &t, U &&u) -> decltype(make_work_guard(get_associated_executor(t, std::forward< U >(u))))
Definition: executor.h:544
associated_allocator_t< T > get_associated_allocator(const T &t, const ProtoAllocator &a) noexcept
Definition: executor.h:142
static mysql_service_status_t get(reference_caching_cache cache, unsigned service_name_index, const my_h_service **refs) noexcept
Definition: component.cc:113
Definition: varlen_sort.h:183
std::conditional_t< !std::is_array< T >::value, std::unique_ptr< T, detail::Deleter< T > >, std::conditional_t< detail::is_unbounded_array_v< T >, std::unique_ptr< T, detail::Array_deleter< std::remove_extent_t< T > > >, void > > unique_ptr
The following is a common type that is returned by all the ut::make_unique (non-aligned) specializations.
Definition: ut0new.h:2436
MY_COMPILER_CLANG_DIAGNOSTIC_IGNORE("-Winconsistent-missing-destructor-override") extern "C"
Definition: protobuf_plugin.cc:31
required string key
Definition: replication_asynchronous_connection_failover.proto:59
required string type
Definition: replication_group_member_actions.proto:33
static type __get(const T &t, const ProtoAllocator &) noexcept
Definition: executor.h:121
typename T::allocator_type type
Definition: executor.h:119
Definition: executor.h:108
static type __get(const T &, const ProtoAllocator &a) noexcept
Definition: executor.h:111
ProtoAllocator type
Definition: executor.h:109
Definition: executor.h:127
static auto get(const T &t, const ProtoAllocator &a=ProtoAllocator()) noexcept
Definition: executor.h:128
static type __get(const T &t, const Executor &) noexcept
Definition: executor.h:413
typename T::executor_type type
Definition: executor.h:411
Definition: executor.h:400
static type __get(const T &, const Executor &ex) noexcept
Definition: executor.h:403
Executor type
Definition: executor.h:401
Definition: executor.h:425
static auto get(const T &t, const Executor &ex=Executor()) noexcept
Definition: executor.h:426
Definition: executor.h:206
ServicePtr(Service *svc)
Definition: executor.h:208
Definition: executor.h:375
Definition: executor.h:328
Definition: executor.h:384
Definition: executor.h:369
Definition: executor.h:612
Definition: executor.h:393