26#ifndef MYSQLROUTER_CONNECTION_POOL_INCLUDED
27#define MYSQLROUTER_CONNECTION_POOL_INCLUDED
88 std::lock_guard lk(
mtx());
100 std::lock_guard lk(
mtx_);
105 (void)
conn_.cancel();
118 std::lock_guard lk(
mtx_);
132 std::lock_guard lk(shared_this->mtx_);
134 if (shared_this->pool_remover_) {
135 pool_remover = std::exchange(shared_this->pool_remover_,
nullptr);
157 auto &tmr = shared_this->idle_timer_;
163 tmr.async_wait([shared_this](std::error_code ec) {
169 std::lock_guard lk(shared_this->mtx());
174 (void)shared_this->conn_.cancel();
193 shared_this->conn_.async_recv(
194 [shared_this](std::error_code ec,
size_t ) {
199 shared_this->idle_timer_.cancel();
201 std::lock_guard lk(shared_this->mtx());
203 (void)shared_this->conn_.close();
212 shared_this->conn_.channel().recv_buffer().clear();
250 : conn_(
std::move(conn)) {}
254 void async_send_quit();
256 void await_quit_response(std::error_code ec,
size_t transferred);
262 before_close_ = std::move(
cb);
274 : max_pooled_connections_(max_pooled_connections),
275 idle_timeout_(idle_timeout) {}
297 std::optional<ServerSideConnection> add_if_not_full(
305 std::optional<ServerSideConnection>
pop_if(
306 const std::string &ep,
307 std::predicate<const ServerSideConnection &>
auto pred) {
309 [
this, ep, pred](
auto &pool) -> std::optional<ServerSideConnection> {
315 [pred](
const auto &kv) { return pred(kv.second->connection()); });
316 if (kv_it ==
key_range.second)
return std::nullopt;
320 auto server_conn = kv_it->second->release();
335 [[nodiscard]] uint32_t current_pooled_connections()
const;
338 return max_pooled_connections_;
342 return idle_timeout_;
353 void stash(ServerSideConnection conn, ConnectionIdentifier from,
357 void discard_all_stashed(ConnectionIdentifier from);
366 : pooled_conn(
std::move(pc)), conn_id(ci), after(tp) {}
368 std::shared_ptr<PooledConnection<ServerSideConnection>>
371 std::chrono::steady_clock::time_point
after;
374 std::optional<ServerSideConnection> unstash_if(
375 const std::string &ep,
377 bool ignore_sharing_delay =
false);
379 std::optional<ServerSideConnection> unstash_mine(
385 [[nodiscard]]
size_t current_stashed_connections()
const;
394 std::string, std::shared_ptr<PooledConnection<ServerSideConnection>>>;
395 using stash_type = std::unordered_multimap<std::string, Stashed>;
397 void erase(pool_type::iterator it);
398 void remove_pooled_connection(
401 void remove_stashed_connection(
416 void erase_from_stash(stash_type::iterator it);
Definition: connection_pool.h:247
ConnectionPool::ServerSideConnection & connection()
Definition: connection_pool.h:258
ConnectionPool::ServerSideConnection conn_
Definition: connection_pool.h:266
std::function< void(const ConnectionPool::ServerSideConnection &)> before_close_
Definition: connection_pool.h:269
ConnectionCloser(ConnectionPool::ServerSideConnection conn)
Definition: connection_pool.h:249
void before_close(std::function< void(const ConnectionPool::ServerSideConnection &)> cb)
Definition: connection_pool.h:260
connection pool of mysql connections.
Definition: connection_pool.h:238
ConnectionPool & operator=(const ConnectionPool &)=delete
~ConnectionPool()=default
uint32_t max_pooled_connections() const
Definition: connection_pool.h:337
std::chrono::milliseconds idle_timeout() const
Definition: connection_pool.h:341
const uint32_t max_pooled_connections_
Definition: connection_pool.h:404
void * ConnectionIdentifier
Definition: connection_pool.h:245
ConnectionPool(uint32_t max_pooled_connections, std::chrono::milliseconds idle_timeout)
Definition: connection_pool.h:272
uint64_t reused_connections() const
total number of reused connections.
Definition: connection_pool.h:390
const std::chrono::milliseconds idle_timeout_
Definition: connection_pool.h:405
std::unordered_multimap< std::string, std::shared_ptr< PooledConnection< ServerSideConnection > > > pool_type
Definition: connection_pool.h:394
std::optional< ServerSideConnection > pop_if(const std::string &ep, std::predicate< const ServerSideConnection & > auto pred)
get a connection from the pool that matches a predicate.
Definition: connection_pool.h:305
ConnectionPool(ConnectionPool &&)=delete
ConnectionPool & operator=(ConnectionPool &&)=delete
std::unordered_multimap< std::string, Stashed > stash_type
Definition: connection_pool.h:395
ConnectionPool(const ConnectionPool &)=delete
Monitor pattern.
Definition: monitor.h:39
pooled connection.
Definition: connection_pool.h:67
connection_type & connection()
access to conn_.
Definition: connection_pool.h:80
T connection_type
Definition: connection_pool.h:70
connection_type release()
Definition: connection_pool.h:84
net::steady_timer idle_timer_
Definition: connection_pool.h:221
PooledConnection(connection_type conn)
Definition: connection_pool.h:72
std::function< void(std::shared_ptr< PooledConnection< T > >)> pool_remover_
Definition: connection_pool.h:147
static void remove_from_pool(std::shared_ptr< PooledConnection< T > > shared_this)
calls remove-callback.
Definition: connection_pool.h:126
std::mutex mtx_
Definition: connection_pool.h:150
mysql_harness::Ssl Ssl
Definition: connection_pool.h:69
void reset()
prepares for reusing the connection.
Definition: connection_pool.h:98
void pool_remover(std::function< void(std::shared_ptr< PooledConnection< T > >)> remover)
set a remove-callback.
Definition: connection_pool.h:116
std::mutex & mtx()
Definition: connection_pool.h:145
static void async_idle(std::shared_ptr< PooledConnection< T > > shared_this, std::chrono::milliseconds idle_timeout)
wait for idle timeout.
Definition: connection_pool.h:155
static void async_recv_message(std::shared_ptr< PooledConnection< T > > shared_this)
wait for server message and shutdown.
Definition: connection_pool.h:186
const connection_type & connection() const
Definition: connection_pool.h:82
connection_type conn_
Definition: connection_pool.h:219
size_t cancel()
Definition: timer.h:101
static constexpr uint32_t kDefaultMaxIdleServerConnectionsBootstrap
Definition: connection_pool.h:48
#define CONNECTION_POOL_EXPORT
Definition: connection_pool_export.h:15
#define T
Definition: jit_executor_value.cc:373
if(!(yy_init))
Definition: lexyy.cc:1144
static bool cb(unsigned long long new_value)
Definition: option_usage.cc:45
Container::const_iterator find_if(const Container &c, Find_if &&find_if)
Definition: generic.h:54
std::chrono::milliseconds milliseconds
Definition: authorize_manager.cc:67
std::unique_ptr< SSL, mysql_harness::impl::Deleter_SSL > Ssl
Definition: tls_types.h:48
std::error_condition make_error_condition(net::stream_errc e) noexcept
Definition: buffer.h:107
static mysql_service_status_t add(reference_caching_channel channel, const char *implementation_name) noexcept
Definition: component.cc:127
Definition: gcs_xcom_synode.h:64
connection on the stash.
Definition: connection_pool.h:362
ConnectionIdentifier conn_id
opaque connection identifier
Definition: connection_pool.h:370
std::shared_ptr< PooledConnection< ServerSideConnection > > pooled_conn
pooled connection.
Definition: connection_pool.h:369
std::chrono::steady_clock::time_point after
stealable after ...
Definition: connection_pool.h:371
Stashed(std::shared_ptr< PooledConnection< ServerSideConnection > > pc, ConnectionIdentifier ci, std::chrono::steady_clock::time_point tp)
Definition: connection_pool.h:364
Definition: my_base.h:1201