MySQL 8.4.4
Source Code Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
connection.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2018, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTING_CONNECTION_INCLUDED
27#define ROUTING_CONNECTION_INCLUDED
28
29#include <chrono>
30#include <cstdint> // size_t
31#include <functional>
32#include <memory>
33#include <optional>
34
36#include "context.h"
37#include "destination.h" // RouteDestination
38#include "destination_error.h"
43
45 public:
48 std::function<void(MySQLRoutingConnectionBase *)> remove_callback)
49 : context_(context), remove_callback_(std::move(remove_callback)) {}
50
51 virtual ~MySQLRoutingConnectionBase() = default;
52
54 const MySQLRoutingContext &context() const { return context_; }
55
56 virtual std::string get_destination_id() const = 0;
57
58 virtual std::string read_only_destination_id() const {
59 return get_destination_id();
60 }
61
62 virtual std::string read_write_destination_id() const {
63 return get_destination_id();
64 }
65
66 virtual std::optional<net::ip::tcp::endpoint> destination_endpoint()
67 const = 0;
68
69 virtual std::optional<net::ip::tcp::endpoint> read_only_destination_endpoint()
70 const {
71 return destination_endpoint();
72 }
73
74 virtual std::optional<net::ip::tcp::endpoint>
76 return destination_endpoint();
77 }
78
80
81 /**
82 * @brief Returns address of server to which connection is established.
83 *
84 * @return address of server
85 */
86 std::string get_server_address() const {
87 return stats_([](const Stats &stats) { return stats.server_address; });
88 }
89
90 void server_address(const std::string &dest) {
91 return stats_([&dest](Stats &stats) { stats.server_address = dest; });
92 }
93
94 virtual void disconnect() = 0;
95
96 /**
97 * @brief Returns address of client which connected to router
98 *
99 * @return address of client
100 */
101 std::string get_client_address() const {
102 return stats_([](const Stats &stats) { return stats.client_address; });
103 }
104
105 void client_address(const std::string &dest) {
106 return stats_([&dest](Stats &stats) { stats.client_address = dest; });
107 }
108
109 std::size_t get_bytes_up() const {
110 return stats_([](const Stats &stats) { return stats.bytes_up; });
111 }
112
113 std::size_t get_bytes_down() const {
114 return stats_([](const Stats &stats) { return stats.bytes_down; });
115 }
116
117 using clock_type = std::chrono::system_clock;
118 using time_point_type = clock_type::time_point;
119
121 return stats_([](const Stats &stats) { return stats.started; });
122 }
123
125 return stats_([](const Stats &stats) { return stats.connected_to_server; });
126 }
127
129 return stats_([](const Stats &stats) { return stats.last_sent_to_server; });
130 }
131
133 return stats_(
134 [](const Stats &stats) { return stats.last_received_from_server; });
135 }
136
137 struct Stats {
138 Stats() = default;
139
140 Stats(std::string client_address, std::string server_address,
141 std::size_t bytes_up, std::size_t bytes_down, time_point_type started,
153
154 std::string client_address;
155 std::string server_address;
156
157 std::size_t bytes_up{0};
158 std::size_t bytes_down{0};
159
160 time_point_type started{clock_type::now()};
164 };
165
166 Stats get_stats() const {
167 return stats_([](const Stats &stats) { return stats; });
168 }
169
170 void transfered_to_server(size_t bytes) {
171 const auto now = clock_type::now();
172 stats_([bytes, now](Stats &stats) {
173 stats.last_sent_to_server = now;
174 stats.bytes_down += bytes;
175 });
176 }
177
178 void transfered_to_client(size_t bytes) {
179 const auto now = clock_type::now();
180 stats_([bytes, now](Stats &stats) {
181 stats.last_received_from_server = now;
182 stats.bytes_up += bytes;
183 });
184 }
185
187
188 void accepted();
189
190 virtual void connected();
191
192 template <class F>
193 auto disconnect_request(F &&f) {
194 return disconnect_(std::forward<F>(f));
195 }
196
197 bool disconnect_requested() const {
198 return disconnect_([](auto requested) { return requested; });
199 }
200
201 protected:
202 /** @brief wrapper for common data used by all routing threads */
204 /** @brief callback that is called when thread of execution completes */
206
208
210
212
213 private:
215};
216
218 public:
220
221 ConnectorBase(net::io_context &io_ctx, RouteDestination *route_destination,
222 Destinations &destinations)
223 : io_ctx_{io_ctx},
224 route_destination_{route_destination},
225 destinations_{destinations},
227
228 enum class Function {
231 };
232
235
237
239
240 bool connect_timed_out() const { return connect_timed_out_; }
241
242 void destination_id(std::string id) { destination_id_ = id; }
243 std::string destination_id() const { return destination_id_; }
244
246 std::function<void(std::string, uint16_t, std::error_code)> func) {
247 on_connect_failure_ = std::move(func);
248 }
249
250 void on_connect_success(std::function<void(std::string, uint16_t)> func) {
251 on_connect_success_ = std::move(func);
252 }
253
254 void on_is_destination_good(std::function<bool(std::string, uint16_t)> func) {
255 on_is_destination_good_ = std::move(func);
256 }
257
258 bool is_destination_good(const std::string &hostname, uint16_t port) const {
260
261 return true;
262 }
263
264 protected:
275
277
281
287
289
291
293
295 std::string destination_id_;
296
297 std::function<void(std::string, uint16_t, std::error_code)>
299 std::function<void(std::string, uint16_t)> on_connect_success_;
300 std::function<bool(std::string, uint16_t)> on_is_destination_good_;
301};
302
303template <class ConnectionType>
304class Connector : public ConnectorBase {
305 public:
307
309 switch (func_) {
311 auto init_res = init_destination();
312 if (!init_res) return stdx::unexpected(init_res.error());
313
314 } break;
316 auto connect_res = connect_finish();
317 if (!connect_res) return stdx::unexpected(connect_res.error());
318
319 } break;
320 }
321
322 if (destination_id().empty()) {
323 // stops at 'connect_init()
324 {
325 auto connect_res = try_connect();
326 if (!connect_res) return stdx::unexpected(connect_res.error());
327 }
328 }
330
331 return ret_type{std::in_place,
332 std::make_unique<TcpConnection>(std::move(socket()),
333 std::move(endpoint()))};
334 }
335};
336
337template <class ConnectionType>
339 public:
340 using pool_lookup_cb = std::function<std::optional<ConnectionType>(
342
344 Destinations &destinations, pool_lookup_cb pool_lookup)
345 : ConnectorBase{io_ctx, route_destination, destinations},
346 pool_lookup_{std::move(pool_lookup)} {}
347
349 switch (func_) {
351 auto init_res = init_destination();
352 if (!init_res) return stdx::unexpected(init_res.error());
353
354 } break;
356 auto connect_res = connect_finish();
357 if (!connect_res) return stdx::unexpected(connect_res.error());
358
359 } break;
360 }
361
362 if (destination_id().empty()) {
363 // stops at 'connect_init()
364 if (auto pool_res = probe_pool()) {
365 // return the pooled connection.
366 return std::move(pool_res.value());
367 }
368
369 {
370 auto connect_res = try_connect();
371 if (!connect_res) return stdx::unexpected(connect_res.error());
372 }
373 }
374
375 return {std::in_place, std::make_unique<TcpConnection>(
376 std::move(socket()), std::move(endpoint()))};
377 }
378
379 private:
380 std::optional<ConnectionType> probe_pool() {
382 }
383
385};
386
387#endif /* ROUTING_CONNECTION_INCLUDED */
Definition: connection.h:217
server_protocol_type::endpoint & endpoint()
Definition: connection.h:234
void connect_timed_out(bool v)
Definition: connection.h:238
std::function< void(std::string, uint16_t)> on_connect_success_
Definition: connection.h:299
std::function< void(std::string, uint16_t, std::error_code)> on_connect_failure_
Definition: connection.h:298
net::steady_timer & timer()
Definition: connection.h:236
stdx::expected< void, std::error_code > connect_init()
Definition: connection.cc:90
stdx::expected< void, std::error_code > init_destination()
Definition: connection.cc:38
Destinations & destinations_
Definition: connection.h:283
std::error_code last_ec_
Definition: connection.h:288
net::steady_timer connect_timer_
Definition: connection.h:292
stdx::expected< void, std::error_code > resolve()
Definition: connection.cc:54
server_protocol_type::endpoint server_endpoint_
Definition: connection.h:280
Function
Definition: connection.h:228
stdx::expected< void, std::error_code > init_endpoint()
Definition: connection.cc:84
std::string destination_id() const
Definition: connection.h:243
stdx::expected< void, std::error_code > connected()
Definition: connection.cc:218
stdx::expected< void, std::error_code > connect_failed(std::error_code ec)
ConnectorBase(net::io_context &io_ctx, RouteDestination *route_destination, Destinations &destinations)
Definition: connection.h:221
void destination_id(std::string id)
Definition: connection.h:242
bool connect_timed_out_
Definition: connection.h:294
void on_connect_success(std::function< void(std::string, uint16_t)> func)
Definition: connection.h:250
Destinations::iterator destinations_it_
Definition: connection.h:284
net::ip::tcp::resolver::results_type endpoints_
Definition: connection.h:285
bool connect_timed_out() const
Definition: connection.h:240
std::function< bool(std::string, uint16_t)> on_is_destination_good_
Definition: connection.h:300
Function func_
Definition: connection.h:290
server_protocol_type::socket & socket()
Definition: connection.h:233
net::ip::tcp::resolver resolver_
Definition: connection.h:278
bool is_destination_good(const std::string &hostname, uint16_t port) const
Definition: connection.h:258
void on_is_destination_good(std::function< bool(std::string, uint16_t)> func)
Definition: connection.h:254
stdx::expected< void, std::error_code > next_destination()
Definition: connection.cc:250
net::io_context & io_ctx_
Definition: connection.h:276
stdx::expected< void, std::error_code > try_connect()
Definition: connection.cc:103
std::string destination_id_
Definition: connection.h:295
net::ip::tcp::resolver::results_type::iterator endpoints_it_
Definition: connection.h:286
stdx::expected< void, std::error_code > connect_finish()
Definition: connection.cc:185
server_protocol_type::socket server_sock_
Definition: connection.h:279
stdx::expected< void, std::error_code > next_endpoint()
Definition: connection.cc:230
void on_connect_failure(std::function< void(std::string, uint16_t, std::error_code)> func)
Definition: connection.h:245
RouteDestination * route_destination_
Definition: connection.h:282
Definition: connection.h:304
stdx::expected< ConnectionType, std::error_code > connect()
Definition: connection.h:308
A forward iterable container of destinations.
Definition: destination.h:107
typename container_type::iterator iterator
Definition: destination.h:111
Monitor pattern.
Definition: monitor.h:39
Definition: connection.h:44
virtual std::string get_destination_id() const =0
void accepted()
Definition: connection.cc:277
MySQLRoutingConnectionBase(MySQLRoutingContext &context, std::function< void(MySQLRoutingConnectionBase *)> remove_callback)
Definition: connection.h:46
clock_type::time_point time_point_type
Definition: connection.h:118
virtual void connected()
Definition: connection.cc:284
const MySQLRoutingContext & context() const
Definition: connection.h:54
time_point_type get_started() const
Definition: connection.h:120
Stats get_stats() const
Definition: connection.h:166
std::size_t get_bytes_down() const
Definition: connection.h:113
std::chrono::system_clock clock_type
Definition: connection.h:117
void transfered_to_server(size_t bytes)
Definition: connection.h:170
virtual std::string read_only_destination_id() const
Definition: connection.h:58
virtual std::optional< net::ip::tcp::endpoint > read_write_destination_endpoint() const
Definition: connection.h:75
std::string get_client_address() const
Returns address of client which connected to router.
Definition: connection.h:101
std::size_t get_bytes_up() const
Definition: connection.h:109
Monitor< bool > disconnect_
Definition: connection.h:209
net::impl::socket::native_handle_type client_fd_
Definition: connection.h:214
virtual std::optional< net::ip::tcp::endpoint > read_only_destination_endpoint() const
Definition: connection.h:69
std::string get_server_address() const
Returns address of server to which connection is established.
Definition: connection.h:86
void disassociate()
Definition: connection.h:186
void log_connection_summary()
Definition: connection.cc:298
Monitor< Stats > stats_
Definition: connection.h:207
virtual void disconnect()=0
time_point_type get_connected_to_server() const
Definition: connection.h:124
std::function< void(MySQLRoutingConnectionBase *)> remove_callback_
callback that is called when thread of execution completes
Definition: connection.h:205
virtual ~MySQLRoutingConnectionBase()=default
MySQLRoutingContext & context()
Definition: connection.h:53
void client_address(const std::string &dest)
Definition: connection.h:105
void transfered_to_client(size_t bytes)
Definition: connection.h:178
time_point_type get_last_received_from_server() const
Definition: connection.h:132
virtual std::string read_write_destination_id() const
Definition: connection.h:62
time_point_type get_last_sent_to_server() const
Definition: connection.h:128
auto disconnect_request(F &&f)
Definition: connection.h:193
virtual net::impl::socket::native_handle_type get_client_fd() const =0
virtual std::optional< net::ip::tcp::endpoint > destination_endpoint() const =0
void server_address(const std::string &dest)
Definition: connection.h:90
bool disconnect_requested() const
Definition: connection.h:197
MySQLRoutingContext & context_
wrapper for common data used by all routing threads
Definition: connection.h:203
MySQLRoutingContext holds data used by MySQLRouting (1 per plugin instances) and MySQLRoutingConnecti...
Definition: context.h:59
Definition: connection.h:338
stdx::expected< ConnectionType, std::error_code > connect()
Definition: connection.h:348
pool_lookup_cb pool_lookup_
Definition: connection.h:384
std::optional< ConnectionType > probe_pool()
Definition: connection.h:380
PooledConnector(net::io_context &io_ctx, RouteDestination *route_destination, Destinations &destinations, pool_lookup_cb pool_lookup)
Definition: connection.h:343
std::function< std::optional< ConnectionType >(const server_protocol_type::endpoint &ep)> pool_lookup_cb
Definition: connection.h:341
Manage destinations for a Connection Routing.
Definition: destination.h:189
Definition: socket.h:1090
Definition: timer.h:57
Definition: io_context.h:61
Definition: internet.h:678
Definition: internet.h:542
const_iterator iterator
Definition: internet.h:550
Definition: internet.h:608
TCP protocol.
Definition: internet.h:1155
Definition: expected.h:286
struct stats stats
Definition: mysqlslap.cc:238
bool empty(const Histogram &histogram)
Return true if 'histogram' was built on an empty table.
Definition: histogram.h:693
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
const char * begin(const char *const c)
Definition: base64.h:44
int native_handle_type
Definition: socket_constants.h:51
Definition: gcs_xcom_synode.h:64
unexpected(E) -> unexpected< E >
required uint64 port
Definition: replication_asynchronous_connection_failover.proto:33
Definition: connection.h:137
time_point_type started
Definition: connection.h:160
time_point_type last_sent_to_server
Definition: connection.h:162
time_point_type connected_to_server
Definition: connection.h:161
std::size_t bytes_down
Definition: connection.h:158
Stats(std::string client_address, std::string server_address, std::size_t bytes_up, std::size_t bytes_down, time_point_type started, time_point_type connected_to_server, time_point_type last_sent_to_server, time_point_type last_received_from_server)
Definition: connection.h:140
std::string client_address
Definition: connection.h:154
std::string server_address
Definition: connection.h:155
time_point_type last_received_from_server
Definition: connection.h:163
std::size_t bytes_up
Definition: connection.h:157
Definition: mysqlslap.cc:240
unsigned long id[MAX_DEAD]
Definition: xcom_base.cc:510