MySQL 9.4.0
Source Code Documentation
connection_pool.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2025, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQLROUTER_CONNECTION_POOL_INCLUDED
27#define MYSQLROUTER_CONNECTION_POOL_INCLUDED
28
30
31#include <chrono>
32#include <cstdint> // uint32_t
33
34#include <algorithm> // find_if
35#include <concepts>
36#include <list>
37#include <memory>
38#include <mutex>
39#include <optional>
40
43#include "mysql/harness/tls_types.h" // Ssl
46
// default max idle server connections set on bootstrap
//
// `inline` gives this header-defined constant one definition shared by all
// translation units instead of an internal-linkage copy per includer.
inline constexpr uint32_t kDefaultMaxIdleServerConnectionsBootstrap{64};
49
50// A pooled connection:
51//
52// - is owned by the ConnectionPool's "pool" or "stash"
53// as a shared_ptr<PooledConnection>
54// - owns Connection
55// - async-waits for recv() or timeout() on the Connection's socket.
56//
57// When a connection is taken from the stash/pool,
58//
59// - the async-waits are canceled
60// - the connection is released from the PooledConnection and
61// - the PooledConnection is erased from the ConnectionPool
62
63/**
64 * pooled connection.
65 */
66template <class T>
68 public:
71
73 : conn_(std::move(conn)), idle_timer_(conn_.connection()->io_ctx()) {}
74
75 /**
76 * access to conn_.
77 *
78 * allows others to move the connection structs out.
79 */
81
82 const connection_type &connection() const { return conn_; }
83
85 (void)idle_timer_.cancel();
86 (void)conn_.cancel();
87
88 std::lock_guard lk(mtx());
89
90 pool_remover_ = nullptr;
91
92 return std::move(connection());
93 }
94
95 /**
96 * prepares for reusing the connection.
97 */
98 void reset() {
99 {
100 std::lock_guard lk(mtx_);
101 pool_remover_ = nullptr;
102 }
103
104 (void)idle_timer_.cancel();
105 (void)conn_.cancel();
106 }
107
108 friend class ConnectionPool;
109
110 /**
111 * set a remove-callback.
112 *
113 * used when the pooled connection wants to remove itself from the
114 * connection-pool.
115 */
117 std::function<void(std::shared_ptr<PooledConnection<T>>)> remover) {
118 std::lock_guard lk(mtx_);
119
120 pool_remover_ = std::move(remover);
121 }
122
123 /**
124 * calls remove-callback.
125 */
126 static void remove_from_pool(
127 std::shared_ptr<PooledConnection<T>> shared_this) {
128 decltype(pool_remover_) pool_remover;
129
130 // ensure that the remover is called at-most-once.
131 {
132 std::lock_guard lk(shared_this->mtx_);
133
134 if (shared_this->pool_remover_) {
135 pool_remover = std::exchange(shared_this->pool_remover_, nullptr);
136 }
137 }
138
139 if (pool_remover) {
140 pool_remover(shared_this);
141 }
142 }
143
144 protected:
145 std::mutex &mtx() { return mtx_; }
146
147 std::function<void(std::shared_ptr<PooledConnection<T>>)> pool_remover_;
148
149 private:
150 std::mutex mtx_;
151
152 /**
153 * wait for idle timeout.
154 */
155 static void async_idle(std::shared_ptr<PooledConnection<T>> shared_this,
157 auto &tmr = shared_this->idle_timer_;
158
159 tmr.expires_after(idle_timeout);
160
161 // if the idle_timer fires, close the connection and remove it from the
162 // pool.
163 tmr.async_wait([shared_this](std::error_code ec) {
164 if (ec) {
165 return; // cancelled ...
166 }
167
168 {
169 std::lock_guard lk(shared_this->mtx());
170
171 // timed out.
172 //
173 // cancel the async_recv() and remove the connection.
174 (void)shared_this->conn_.cancel();
175 }
176
177 remove_from_pool(shared_this);
178 });
179
180 async_recv_message(shared_this);
181 }
182
183 /**
184 * wait for server message and shutdown.
185 */
187 std::shared_ptr<PooledConnection<T>> shared_this) {
188 // for classic we may receive a ERROR for shutdown. Ignore
189 // it and close the connection. for xprotocol we may
190 // receive a NOTICE for shutdown. Ignore it and close the
191 // connection.
192
193 shared_this->conn_.async_recv(
194 [shared_this](std::error_code ec, size_t /* recved */) {
195 if (ec) {
197 {
198 // cancel the timer and let that close the connection.
199 shared_this->idle_timer_.cancel();
200
201 std::lock_guard lk(shared_this->mtx());
202
203 (void)shared_this->conn_.close();
204 }
205
206 remove_from_pool(shared_this);
207 }
208 return;
209 }
210
211 // discard what has been received.
212 shared_this->conn_.channel().recv_buffer().clear();
213
214 // wait for the next bytes or connection-close.
215 async_recv_message(shared_this);
216 });
217 }
218
220
222};
223
224/**
225 * connection pool of mysql connections.
226 *
227 * It can contain connections:
228 *
229 * - classic protocol
230 * - to any tcp endpoint.
231 *
232 * It has:
233 *
234 * - a pool, which contains server-side connections without a client-connection
235 * - a stash, which contains server-side connections with a client-connection
236 *
237 */
239 public:
242
244
245 using ConnectionIdentifier = void *;
246
248 public:
250 : conn_(std::move(conn)) {}
251
252 void async_close();
253
254 void async_send_quit();
255
256 void await_quit_response(std::error_code ec, size_t transferred);
257
259
261 std::function<void(const ConnectionPool::ServerSideConnection &)> cb) {
262 before_close_ = std::move(cb);
263 }
264
265 private:
267
268 std::function<void(const ConnectionPool::ServerSideConnection &)>
270 };
271
272 ConnectionPool(uint32_t max_pooled_connections,
273 std::chrono::milliseconds idle_timeout)
274 : max_pooled_connections_(max_pooled_connections),
275 idle_timeout_(idle_timeout) {}
276
277 // disable copy
280
281 // disable move
284
285 ~ConnectionPool() = default;
286
287 /**
288 * add a connection to the pool.
289 *
290 * if the pool is full, the connection will be closed.
291 */
292 void add(ServerSideConnection conn);
293
294 /**
295 * add connection to the pool if the pool isn't full.
296 */
297 std::optional<ServerSideConnection> add_if_not_full(
299
300 /**
301 * get a connection from the pool that matches a predicate.
302 *
303 * @returns a connection if one exists.
304 */
305 std::optional<ServerSideConnection> pop_if(
306 const std::string &ep,
307 std::predicate<const ServerSideConnection &> auto pred) {
308 return pool_(
309 [this, ep, pred](auto &pool) -> std::optional<ServerSideConnection> {
310 auto key_range = pool.equal_range(ep);
311 if (key_range.first == key_range.second) return std::nullopt;
312
313 auto kv_it = std::find_if(
314 key_range.first, key_range.second,
315 [pred](const auto &kv) { return pred(kv.second->connection()); });
316 if (kv_it == key_range.second) return std::nullopt;
317
318 // found.
319
320 auto server_conn = kv_it->second->release();
321
322 pool.erase(kv_it);
323
324 ++reused_;
325
326 return server_conn;
327 });
328 }
329
330 void async_close_connection(ConnectionPool::ServerSideConnection conn);
331
332 /**
333 * number of currently pooled connections.
334 */
335 [[nodiscard]] uint32_t current_pooled_connections() const;
336
337 [[nodiscard]] uint32_t max_pooled_connections() const {
338 return max_pooled_connections_;
339 }
340
342 return idle_timeout_;
343 }
344
345 /**
346 * add a server-side connection to the stash.
347 *
348 * @param conn server-side connection to be stashed.
349 * @param from opaque connection-identifier
350 * @param delay allow sharing with other connection after 'delay'
351 * milliseconds.
352 */
353 void stash(ServerSideConnection conn, ConnectionIdentifier from,
355
356 // discard all stashed connections and move them to the pool.
357 void discard_all_stashed(ConnectionIdentifier from);
358
359 /**
360 * connection on the stash.
361 */
362 struct Stashed {
363 // constructor for the container's .emplace()
365 ConnectionIdentifier ci, std::chrono::steady_clock::time_point tp)
366 : pooled_conn(std::move(pc)), conn_id(ci), after(tp) {}
367
368 std::shared_ptr<PooledConnection<ServerSideConnection>>
369 pooled_conn; //!< pooled connection.
370 ConnectionIdentifier conn_id; //!< opaque connection identifier
371 std::chrono::steady_clock::time_point after; //!< stealable after ...
372 };
373
374 std::optional<ServerSideConnection> unstash_if(
375 const std::string &ep,
376 std::function<bool(const ServerSideConnection &)> pred,
377 bool ignore_sharing_delay = false);
378
379 std::optional<ServerSideConnection> unstash_mine(
380 const std::string &ep, ConnectionIdentifier conn_id);
381
382 /**
383 * number of server-side connections on the stash.
384 */
385 [[nodiscard]] size_t current_stashed_connections() const;
386
387 /**
388 * total number of reused connections.
389 */
390 [[nodiscard]] uint64_t reused_connections() const { return reused_; }
391
392 protected:
393 using pool_type = std::unordered_multimap<
394 std::string, std::shared_ptr<PooledConnection<ServerSideConnection>>>;
395 using stash_type = std::unordered_multimap<std::string, Stashed>;
396
397 void erase(pool_type::iterator it);
398 void remove_pooled_connection(
399 std::string ep, std::shared_ptr<PooledConnection<ServerSideConnection>>);
400
401 void remove_stashed_connection(
402 std::string ep, std::shared_ptr<PooledConnection<ServerSideConnection>>);
403
406
408
410
411 // a stash of sharable connections.
412 //
413 // they are associated to a connection.
415
416 void erase_from_stash(stash_type::iterator it);
417
418 uint64_t reused_{};
419};
420
421#endif
Definition: connection_pool.h:247
ConnectionPool::ServerSideConnection & connection()
Definition: connection_pool.h:258
ConnectionPool::ServerSideConnection conn_
Definition: connection_pool.h:266
std::function< void(const ConnectionPool::ServerSideConnection &)> before_close_
Definition: connection_pool.h:269
ConnectionCloser(ConnectionPool::ServerSideConnection conn)
Definition: connection_pool.h:249
void before_close(std::function< void(const ConnectionPool::ServerSideConnection &)> cb)
Definition: connection_pool.h:260
connection pool of mysql connections.
Definition: connection_pool.h:238
ConnectionPool & operator=(const ConnectionPool &)=delete
~ConnectionPool()=default
uint32_t max_pooled_connections() const
Definition: connection_pool.h:337
std::chrono::milliseconds idle_timeout() const
Definition: connection_pool.h:341
const uint32_t max_pooled_connections_
Definition: connection_pool.h:404
void * ConnectionIdentifier
Definition: connection_pool.h:245
ConnectionPool(uint32_t max_pooled_connections, std::chrono::milliseconds idle_timeout)
Definition: connection_pool.h:272
uint64_t reused_connections() const
total number of reused connections.
Definition: connection_pool.h:390
const std::chrono::milliseconds idle_timeout_
Definition: connection_pool.h:405
std::unordered_multimap< std::string, std::shared_ptr< PooledConnection< ServerSideConnection > > > pool_type
Definition: connection_pool.h:394
std::optional< ServerSideConnection > pop_if(const std::string &ep, std::predicate< const ServerSideConnection & > auto pred)
get a connection from the pool that matches a predicate.
Definition: connection_pool.h:305
ConnectionPool(ConnectionPool &&)=delete
ConnectionPool & operator=(ConnectionPool &&)=delete
std::unordered_multimap< std::string, Stashed > stash_type
Definition: connection_pool.h:395
ConnectionPool(const ConnectionPool &)=delete
Monitor pattern.
Definition: monitor.h:39
pooled connection.
Definition: connection_pool.h:67
connection_type & connection()
access to conn_.
Definition: connection_pool.h:80
T connection_type
Definition: connection_pool.h:70
connection_type release()
Definition: connection_pool.h:84
net::steady_timer idle_timer_
Definition: connection_pool.h:221
PooledConnection(connection_type conn)
Definition: connection_pool.h:72
std::function< void(std::shared_ptr< PooledConnection< T > >)> pool_remover_
Definition: connection_pool.h:147
static void remove_from_pool(std::shared_ptr< PooledConnection< T > > shared_this)
calls remove-callback.
Definition: connection_pool.h:126
std::mutex mtx_
Definition: connection_pool.h:150
mysql_harness::Ssl Ssl
Definition: connection_pool.h:69
void reset()
prepares for reusing the connection.
Definition: connection_pool.h:98
void pool_remover(std::function< void(std::shared_ptr< PooledConnection< T > >)> remover)
set a remove-callback.
Definition: connection_pool.h:116
std::mutex & mtx()
Definition: connection_pool.h:145
static void async_idle(std::shared_ptr< PooledConnection< T > > shared_this, std::chrono::milliseconds idle_timeout)
wait for idle timeout.
Definition: connection_pool.h:155
static void async_recv_message(std::shared_ptr< PooledConnection< T > > shared_this)
wait for server message and shutdown.
Definition: connection_pool.h:186
const connection_type & connection() const
Definition: connection_pool.h:82
connection_type conn_
Definition: connection_pool.h:219
Definition: timer.h:57
size_t cancel()
Definition: timer.h:101
static constexpr uint32_t kDefaultMaxIdleServerConnectionsBootstrap
Definition: connection_pool.h:48
#define CONNECTION_POOL_EXPORT
Definition: connection_pool_export.h:15
#define T
Definition: jit_executor_value.cc:373
if(!(yy_init))
Definition: lexyy.cc:1144
static bool cb(unsigned long long new_value)
Definition: option_usage.cc:45
Container::const_iterator find_if(const Container &c, Find_if &&find_if)
Definition: generic.h:54
std::chrono::milliseconds milliseconds
Definition: authorize_manager.cc:67
std::unique_ptr< SSL, mysql_harness::impl::Deleter_SSL > Ssl
Definition: tls_types.h:48
std::error_condition make_error_condition(net::stream_errc e) noexcept
Definition: buffer.h:107
static mysql_service_status_t add(reference_caching_channel channel, const char *implementation_name) noexcept
Definition: component.cc:127
Definition: gcs_xcom_synode.h:64
connection on the stash.
Definition: connection_pool.h:362
ConnectionIdentifier conn_id
opaque connection identifier
Definition: connection_pool.h:370
std::shared_ptr< PooledConnection< ServerSideConnection > > pooled_conn
pooled connection.
Definition: connection_pool.h:369
std::chrono::steady_clock::time_point after
stealable after ...
Definition: connection_pool.h:371
Stashed(std::shared_ptr< PooledConnection< ServerSideConnection > > pc, ConnectionIdentifier ci, std::chrono::steady_clock::time_point tp)
Definition: connection_pool.h:364
Definition: my_base.h:1201