MySQL 9.1.0
Source Code Documentation
connection_pool.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef MYSQLROUTER_CONNECTION_POOL_INCLUDED
27#define MYSQLROUTER_CONNECTION_POOL_INCLUDED
28
30
31#include <chrono>
32#include <cstdint> // uint32_t
33
34#include <algorithm> // find_if
35#include <concepts>
36#include <list>
37#include <optional>
38
42#include "mysql/harness/tls_types.h" // Ssl
47
48// default max idle server connections set on bootstrap
49static constexpr uint32_t kDefaultMaxIdleServerConnectionsBootstrap{64};
50
51/**
52 * pooled connection.
53 */
55 public:
56 /**
57 * set a remove-callback.
58 *
59 * used when the pooled connection wants to remove itself from the
60 * connection-pool.
61 */
62 void remover(std::function<void()> remover) { remover_ = std::move(remover); }
63
64 /**
65 * calls remove-callback.
66 */
67 void remove_me();
68
69 void reset();
70
71 private:
72 std::function<void()> remover_;
73};
74
75/**
76 * pooled connection.
77 */
78template <class T>
80 public:
82 using connection_type = T;
83
85 : conn_{std::move(conn)}, idle_timer_(conn_.connection()->io_ctx()) {}
86
87 /**
88 * access to conn_.
89 *
90 * allows others to move the connection structs out.
91 */
93
94 const connection_type &connection() const { return conn_; }
95
96 /**
97 * prepares for reusing the connection.
98 */
99 void reset() {
101
102 (void)idle_timer_.cancel();
103 (void)conn_.cancel();
104 }
105
106 friend class ConnectionPool;
107
108 private:
109 /**
110 * wait for idle timeout.
111 */
112 void async_idle(std::chrono::milliseconds idle_timeout) {
113 auto &tmr = idle_timer_;
114
116
117 // if the idle_timer fires, close the connection and remove it from the
118 // pool.
119 tmr.async_wait([this](std::error_code ec) {
120 if (ec) return; // cancelled ...
121
122 // timed out.
123 //
124 // cancel the async_recv() and remove the connection.
125 (void)conn_.cancel();
126
127 this->remove_me();
128 });
129
131 }
132
133 /**
134 * wait for server message and shutdown.
135 */
137 // for classic we may receive a ERROR for shutdown. Ignore
138 // it and close the connection. for xprotocol we may
139 // receive a NOTICE for shutdown. Ignore it and close the
140 // connection.
141
142 conn_.async_recv([this](std::error_code ec, size_t /* recved */) {
143 if (ec) {
145 // cancel the timer and let that close the connection.
147
148 (void)conn_.close();
149
150 this->remove_me();
151 }
152 return;
153 }
154
155 // discard what has been received.
156 conn_.channel().recv_buffer().clear();
157
158 // wait for the next bytes or connection-close.
160 });
161 }
162
164
166};
167
168/**
169 * connection pool of mysql connections.
170 *
171 * It can contain connections:
172 *
173 * - classic protocol
174 * - to any tcp endpoint.
175 *
176 * It has:
177 *
178 * - a pool, which contains server-side connections without a client-connection
179 * - a stash, which contains server-side connections with a client-connection
180 *
181 */
183 public:
186
188
189 using ConnectionIdentifier = void *;
190
192 public:
194 : conn_(std::move(conn)) {}
195
196 void async_close();
197
198 void async_send_quit();
199
200 void await_quit_response(std::error_code ec, size_t transferred);
201
203
205 std::function<void(const ConnectionPool::ServerSideConnection &)> cb) {
206 before_close_ = std::move(cb);
207 }
208
209 private:
211
212 std::function<void(const ConnectionPool::ServerSideConnection &)>
214 };
215
216 ConnectionPool(uint32_t max_pooled_connections,
217 std::chrono::milliseconds idle_timeout)
218 : max_pooled_connections_(max_pooled_connections),
219 idle_timeout_(idle_timeout) {}
220
221 // disable copy
224
225 // disable move
228
229 ~ConnectionPool() = default;
230
231 /**
232 * add a connection to the pool.
233 *
234 * if the pool is full, the connection will be close.
235 */
236 void add(ServerSideConnection conn);
237
238 /**
239 * add connection to the pool if the poll isn't full.
240 */
241 std::optional<ServerSideConnection> add_if_not_full(
243
244 /**
245 * get a connection from the pool that matches a predicate.
246 *
247 * @returns a connection if one exists.
248 */
249 std::optional<ServerSideConnection> pop_if(
250 const std::string &ep,
251 std::predicate<const ServerSideConnection &> auto pred) {
252 return pool_(
253 [this, ep, pred](auto &pool) -> std::optional<ServerSideConnection> {
254 auto key_range = pool.equal_range(ep);
255 if (key_range.first == key_range.second) return std::nullopt;
256
257 auto kv_it = std::find_if(
258 key_range.first, key_range.second,
259 [pred](const auto &kv) { return pred(kv.second.connection()); });
260 if (kv_it == key_range.second) return std::nullopt;
261
262 // found.
263
264 auto pooled_conn = std::move(kv_it->second);
265
266 pool.erase(kv_it);
267
268 pooled_conn.reset();
269
270 ++reused_;
271
272 return std::move(pooled_conn.connection());
273 });
274 }
275
276 void async_close_connection(ConnectionPool::ServerSideConnection conn);
277
278 /**
279 * number of currently pooled connections.
280 */
281 [[nodiscard]] uint32_t current_pooled_connections() const;
282
283 [[nodiscard]] uint32_t max_pooled_connections() const {
284 return max_pooled_connections_;
285 }
286
287 [[nodiscard]] std::chrono::milliseconds idle_timeout() const {
288 return idle_timeout_;
289 }
290
291 /**
292 * add a server-side connection to the stash.
293 *
294 * @param conn server-side connection to be stashed.
295 * @param from opaque connection-identifier
296 * @param delay allow sharing with other connection after 'delay'
297 * milliseconds.
298 */
299 void stash(ServerSideConnection conn, ConnectionIdentifier from,
300 std::chrono::milliseconds delay);
301
302 // discard all stashed connection and move them to the pool.
303 void discard_all_stashed(ConnectionIdentifier from);
304
305 /**
306 * connection on the stash.
307 */
308 struct Stashed {
309 // constructor for the container's .emplace()
311 std::chrono::steady_clock::time_point tp)
312 : pooled_conn(std::move(pc)), conn_id(ci), after(tp) {}
313
315 ConnectionIdentifier conn_id; //!< opaque connection identifier
316 std::chrono::steady_clock::time_point after; //!< stealable after ...
317 };
318
319 std::optional<ServerSideConnection> unstash_if(
320 const std::string &ep,
321 std::function<bool(const ServerSideConnection &)> pred,
322 bool ignore_sharing_delay = false);
323
324 std::optional<ServerSideConnection> unstash_mine(
325 const std::string &ep, ConnectionIdentifier conn_id);
326
327 /**
328 * number of server-side connections on the stash.
329 */
330 [[nodiscard]] size_t current_stashed_connections() const;
331
332 /**
333 * total number of reused connections.
334 */
335 [[nodiscard]] uint64_t reused_connections() const { return reused_; }
336
337 protected:
338 using pool_type =
339 std::unordered_multimap<std::string,
341 using stash_type = std::unordered_multimap<std::string, Stashed>;
342
343 void erase(pool_type::iterator it);
344
346 const std::chrono::milliseconds idle_timeout_;
347
349
351
352 // a stash of sharable connections.
353 //
354 // they are associated to a connection.
356
357 void erase_from_stash(stash_type::iterator it);
358
359 uint64_t reused_{};
360};
361
362#endif
Definition: connection_pool.h:191
ConnectionPool::ServerSideConnection & connection()
Definition: connection_pool.h:202
ConnectionPool::ServerSideConnection conn_
Definition: connection_pool.h:210
std::function< void(const ConnectionPool::ServerSideConnection &)> before_close_
Definition: connection_pool.h:213
ConnectionCloser(ConnectionPool::ServerSideConnection conn)
Definition: connection_pool.h:193
void before_close(std::function< void(const ConnectionPool::ServerSideConnection &)> cb)
Definition: connection_pool.h:204
connection pool of mysql connections.
Definition: connection_pool.h:182
ConnectionPool & operator=(const ConnectionPool &)=delete
~ConnectionPool()=default
uint32_t max_pooled_connections() const
Definition: connection_pool.h:283
std::chrono::milliseconds idle_timeout() const
Definition: connection_pool.h:287
const uint32_t max_pooled_connections_
Definition: connection_pool.h:345
void * ConnectionIdentifier
Definition: connection_pool.h:189
std::unordered_multimap< std::string, PooledConnection< ServerSideConnection > > pool_type
Definition: connection_pool.h:340
ConnectionPool(uint32_t max_pooled_connections, std::chrono::milliseconds idle_timeout)
Definition: connection_pool.h:216
uint64_t reused_connections() const
total number of reused connections.
Definition: connection_pool.h:335
const std::chrono::milliseconds idle_timeout_
Definition: connection_pool.h:346
std::optional< ServerSideConnection > pop_if(const std::string &ep, std::predicate< const ServerSideConnection & > auto pred)
get a connection from the pool that matches a predicate.
Definition: connection_pool.h:249
ConnectionPool(ConnectionPool &&)=delete
ConnectionPool & operator=(ConnectionPool &&)=delete
std::unordered_multimap< std::string, Stashed > stash_type
Definition: connection_pool.h:341
ConnectionPool(const ConnectionPool &)=delete
Monitor pattern.
Definition: monitor.h:39
pooled connection.
Definition: connection_pool.h:54
void remover(std::function< void()> remover)
set a remove-callback.
Definition: connection_pool.h:62
void remove_me()
calls remove-callback.
Definition: connection_pool.cc:115
std::function< void()> remover_
Definition: connection_pool.h:72
void reset()
Definition: connection_pool.cc:120
pooled connection.
Definition: connection_pool.h:79
connection_type & connection()
access to conn_.
Definition: connection_pool.h:92
T connection_type
Definition: connection_pool.h:82
net::steady_timer idle_timer_
Definition: connection_pool.h:165
PooledConnection(connection_type conn)
Definition: connection_pool.h:84
void async_recv_message()
wait for server message and shutdown.
Definition: connection_pool.h:136
mysql_harness::Ssl Ssl
Definition: connection_pool.h:81
void reset()
prepares for reusing the connection.
Definition: connection_pool.h:99
void async_idle(std::chrono::milliseconds idle_timeout)
wait for idle timeout.
Definition: connection_pool.h:112
const connection_type & connection() const
Definition: connection_pool.h:94
connection_type conn_
Definition: connection_pool.h:163
Definition: timer.h:57
size_t cancel()
Definition: timer.h:101
size_t expires_after(const duration &d)
Definition: timer.h:114
static constexpr uint32_t kDefaultMaxIdleServerConnectionsBootstrap
Definition: connection_pool.h:49
#define CONNECTION_POOL_EXPORT
Definition: connection_pool_export.h:15
if(!(yy_init))
Definition: lexyy.cc:1144
std::string HARNESS_EXPORT reset()
get 'reset attributes' ESC sequence.
Definition: vt100.cc:37
Container::const_iterator find_if(const Container &c, Find_if &&find_if)
Definition: generic.h:54
std::unique_ptr< SSL, mysql_harness::impl::Deleter_SSL > Ssl
Definition: tls_types.h:48
std::error_condition make_error_condition(net::stream_errc e) noexcept
Definition: buffer.h:107
static mysql_service_status_t add(reference_caching_channel channel, const char *implementation_name) noexcept
Definition: component.cc:127
Definition: gcs_xcom_synode.h:64
connection on the stash.
Definition: connection_pool.h:308
ConnectionIdentifier conn_id
opaque connection identifier
Definition: connection_pool.h:315
Stashed(PooledConnection< ServerSideConnection > pc, ConnectionIdentifier ci, std::chrono::steady_clock::time_point tp)
Definition: connection_pool.h:310
std::chrono::steady_clock::time_point after
stealable after ...
Definition: connection_pool.h:316
PooledConnection< ServerSideConnection > pooled_conn
pooled connection.
Definition: connection_pool.h:314
Definition: my_base.h:1125