MySQL 9.1.0
Source Code Documentation
connection_base.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTER_CONNECTION_POOL_CONNECTION_BASE_INCLUDED
27#define ROUTER_CONNECTION_POOL_CONNECTION_BASE_INCLUDED
28
29#include <cstdint> // size_t
30#include <functional>
31#include <system_error> // error_code
32#include <vector>
33
34#ifdef _WIN32
35// include winsock2.h before openssl/ssl.h
36#include <windows.h>
37#include <winsock2.h>
38#include <ws2tcpip.h>
39#endif
40
41#include <openssl/ssl.h> // SSL_CTX
42
43#include "harness_assert.h"
48#include "mysqlrouter/channel.h"
50
51/**
52 * virtual base-class of BasicConnection.
53 */
55 public:
56 virtual ~ConnectionBase() = default;
57
59 std::vector<uint8_t, default_init_allocator<uint8_t>>;
60
61 virtual net::io_context &io_ctx() = 0;
62
63 virtual void async_recv(
65 std::function<void(std::error_code ec, size_t transferred)>) = 0;
66
67 virtual void async_send(
69 std::function<void(std::error_code ec, size_t transferred)>) = 0;
70
71 virtual void async_wait_send(std::function<void(std::error_code ec)>) = 0;
72 virtual void async_wait_recv(std::function<void(std::error_code ec)>) = 0;
73 virtual void async_wait_error(std::function<void(std::error_code ec)>) = 0;
74
75 [[nodiscard]] virtual bool is_open() const = 0;
76
78 const = 0;
79
80 [[nodiscard]] virtual stdx::expected<void, std::error_code> close() = 0;
83
84 [[nodiscard]] virtual std::string endpoint() const = 0;
85
86 [[nodiscard]] virtual stdx::expected<void, std::error_code> cancel() = 0;
87
88 [[nodiscard]] virtual bool is_secure_transport() const = 0;
89
91 net::io_context &new_ctx) = 0;
92};
93
94template <class Protocol>
95struct IsTransportSecure : std::false_type {};
96
97#ifdef NET_TS_HAS_UNIX_SOCKET
98template <>
99struct IsTransportSecure<local::stream_protocol> : std::true_type {};
100#endif
101
102/**
103 * basic connection which wraps a net-ts Protocol.
104 *
105 * knows about mysql-protocol specifics like:
106 *
107 * - session attributes
108 * - connection error-tracking.
109 *
110 * @tparam Protocol a protocol like net::ip::tcp or local::stream_protocol
111 */
112template <class Protocol>
114 public:
117 using endpoint_type = typename protocol_type::endpoint;
118
120
122 : sock_{std::move(sock)}, ep_{std::move(ep)} {}
123
124 net::io_context &io_ctx() override { return sock_.get_executor().context(); }
125
127 net::io_context &new_ctx) override {
128 // nothing to do.
129 if (sock_.get_executor() == new_ctx.get_executor()) return {};
130
131 return sock_.release().and_then(
132 [this, &new_ctx](
134 socket_type new_sock(new_ctx);
135
136 auto assign_res = new_sock.assign(ep_.protocol(), native_handle);
137 if (!assign_res) return assign_res;
138
139 std::swap(sock_, new_sock);
140
141 return {};
142 });
143 }
144
146 std::function<void(std::error_code ec, size_t transferred)>
147 completion) override {
149 std::move(completion));
150 }
151
153 std::function<void(std::error_code ec, size_t transferred)>
154 completion) override {
155 if (sock_.native_non_blocking()) {
156 // if the socket is non-blocking try to send directly as the send-buffer
157 // is usually empty
158 auto write_res = net::write(sock_, net::dynamic_buffer(buf),
160 if (write_res) {
161 net::defer(sock_.get_executor(), [completion = std::move(completion),
162 transferred = *write_res]() {
163 completion({}, transferred);
164 });
165 return;
166 }
167
168 const auto ec = write_res.error();
169
170 if (ec != make_error_condition(std::errc::operation_would_block) &&
171 ec !=
172 make_error_condition(std::errc::resource_unavailable_try_again)) {
173 net::defer(sock_.get_executor(), [completion = std::move(completion),
174 ec]() { completion(ec, 0); });
175 return;
176 }
177
178 // if it would-block, use the normal async-write.
179 }
180
182 std::move(completion));
183 }
184
186 std::function<void(std::error_code ec)> completion) override {
187 sock_.async_wait(net::socket_base::wait_write, std::move(completion));
188 }
189
191 std::function<void(std::error_code ec)> completion) override {
192 sock_.async_wait(net::socket_base::wait_read, std::move(completion));
193 }
194
196 std::function<void(std::error_code ec)> completion) override {
197 sock_.async_wait(net::socket_base::wait_error, std::move(completion));
198 }
199
200 [[nodiscard]] bool is_open() const override { return sock_.is_open(); }
201
203 const override {
204 return sock_.native_handle();
205 }
206
208 return sock_.close();
209 }
210
212 return sock_.cancel();
213 }
214
217 return sock_.shutdown(st);
218 }
219
220 [[nodiscard]] std::string endpoint() const override {
222
223 oss << ep_;
224
225 return oss.str();
226 }
227
228 template <class GettableSocketOption>
230 GettableSocketOption &opt) const {
231 return sock_.get_option(opt);
232 }
233
234 /**
235 * check if the underlying transport is secure.
236 *
237 * - unix-socket, shared-memory, ... are secure.
238 */
239 [[nodiscard]] bool is_secure_transport() const override {
241 }
242
243 protected:
246};
247
249#ifdef NET_TS_HAS_UNIX_SOCKET
250using UnixDomainConnection = BasicConnection<local::stream_protocol>;
251#endif
252
253/**
254 * a Connection that can be switched to TLS.
255 *
256 * wraps
257 *
258 * - a low-level connection (conn)
259 * - a routing connection (endpoints, destinations, ...)
260 * - a tls switchable (a SSL_CTX * wrapper)
261 * - protocol state (classic, xproto)
262 */
263template <class T>
265 public:
266 // 16kb per buffer
267 // 2 buffers per channel (send/recv)
268 // 2 channels per connection
269 // 10000 connections
270 // = 640MByte
271 static constexpr size_t kRecvBufferSize{16UL * 1024};
273
274 TlsSwitchableConnection(std::unique_ptr<ConnectionBase> conn,
276 : conn_{std::move(conn)},
278 channel_{},
279 protocol_{std::move(state)} {
281 }
282
283 TlsSwitchableConnection(std::unique_ptr<ConnectionBase> conn,
286 : conn_{std::move(conn)},
288 channel_{std::move(channel)},
289 protocol_{std::move(state)} {
291 }
292
293 /**
294 * assign a low-level connection.
295 */
296 void assign_connection(std::unique_ptr<ConnectionBase> conn) {
297 conn_ = std::move(conn);
298 }
299
301 if (auto *ssl = channel_.ssl()) {
302 SSL_set_info_callback(ssl, nullptr);
303 SSL_set_msg_callback_arg(ssl, nullptr);
304 }
305
306 // reset the recv and send buffers, as the pool shouldn't care about the
307 // content of those buffers.
308 channel_.clear();
309 }
310
311 /**
312 * async receive data from connection into the channel's receive buffer.
313 *
314 * calls func when async operation is completed.
315 */
316 template <class Func>
317 void async_recv(Func &&func) {
318 harness_assert(conn_ != nullptr);
319
320 // discard everything that has been marked as 'consumed'
322
323 conn_->async_recv(channel_.recv_buffer(),
324 [this, func = std::forward<Func>(func)](
325 std::error_code ec, size_t transferred) {
326 if (ec == std::error_code()) {
327 channel_.view_sync_raw();
328 }
329
330 func(ec, transferred);
331 });
332 }
333
334 /**
335 * async send data from the channel's send buffer to the connection.
336 *
337 * calls func when async operation is completed.
338 */
339 template <class Func>
340 void async_send(Func &&func) {
341 conn_->async_send(channel_.send_buffer(), std::forward<Func>(func));
342 }
343
344 /**
345 * async wait until connection allows to send data.
346 *
347 * calls func when async operation is completed.
348 */
349 template <class Func>
350 void async_wait_send(Func &&func) {
351 conn_->async_wait_send(std::forward<Func>(func));
352 }
353
354 template <class Func>
355 void async_wait_error(Func &&func) {
356 conn_->async_wait_error(std::forward<Func>(func));
357 }
358
359 [[nodiscard]] Channel &channel() { return channel_; }
360
361 [[nodiscard]] const Channel &channel() const { return channel_; }
362
363 [[nodiscard]] SslMode ssl_mode() const { return ssl_mode_; }
364
365 [[nodiscard]] bool is_open() const { return conn_ && conn_->is_open(); }
366
368 return conn_->native_handle();
369 }
370
372 if (!conn_) {
373 return stdx::unexpected(make_error_code(std::errc::not_connected));
374 }
375 return conn_->close();
376 }
377
380 if (!conn_) {
381 return stdx::unexpected(make_error_code(std::errc::not_connected));
382 }
383 return conn_->shutdown(st);
384 }
385
386 [[nodiscard]] std::string endpoint() const {
387 if (!is_open()) return "";
388
389 return conn_->endpoint();
390 }
391
393 if (!conn_) return {};
394
395 return conn_->cancel();
396 }
397
398 [[nodiscard]] protocol_state_type &protocol() { return protocol_; }
399
400 [[nodiscard]] const protocol_state_type &protocol() const {
401 return protocol_;
402 }
403
404 std::unique_ptr<ConnectionBase> &connection() { return conn_; }
405
406 const std::unique_ptr<ConnectionBase> &connection() const { return conn_; }
407
408 /**
409 * check if the channel is secure.
410 *
411 * - if TLS is enabled, the transport is secure
412 * - if transport is secure, the channel is secure
413 */
414 [[nodiscard]] bool is_secure_transport() const {
415 return conn_->is_secure_transport() || (channel_.ssl() != nullptr);
416 }
417
418 private:
419 // tcp/unix-socket
420 std::unique_ptr<ConnectionBase> conn_;
421
423
424 // socket buffers
426
427 // higher-level protocol
429};
430
431#endif
basic connection which wraps a net-ts Protocol.
Definition: connection_base.h:113
stdx::expected< void, std::error_code > set_io_context(net::io_context &new_ctx) override
Definition: connection_base.h:126
stdx::expected< void, std::error_code > get_option(GettableSocketOption &opt) const
Definition: connection_base.h:229
stdx::expected< void, std::error_code > close() override
Definition: connection_base.h:207
void async_wait_error(std::function< void(std::error_code ec)> completion) override
Definition: connection_base.h:195
std::string endpoint() const override
Definition: connection_base.h:220
void async_wait_recv(std::function< void(std::error_code ec)> completion) override
Definition: connection_base.h:190
net::impl::socket::native_handle_type native_handle() const override
Definition: connection_base.h:202
void async_recv(recv_buffer_type &buf, std::function< void(std::error_code ec, size_t transferred)> completion) override
Definition: connection_base.h:145
stdx::expected< void, std::error_code > cancel() override
Definition: connection_base.h:211
bool is_open() const override
Definition: connection_base.h:200
socket_type sock_
Definition: connection_base.h:244
typename protocol_type::socket socket_type
Definition: connection_base.h:116
BasicConnection(socket_type sock, endpoint_type ep)
Definition: connection_base.h:121
bool is_secure_transport() const override
check if the underlying transport is secure.
Definition: connection_base.h:239
net::io_context & io_ctx() override
Definition: connection_base.h:124
endpoint_type ep_
Definition: connection_base.h:245
stdx::expected< void, std::error_code > shutdown(net::socket_base::shutdown_type st) override
Definition: connection_base.h:215
void async_send(recv_buffer_type &buf, std::function< void(std::error_code ec, size_t transferred)> completion) override
Definition: connection_base.h:152
void async_wait_send(std::function< void(std::error_code ec)> completion) override
Definition: connection_base.h:185
typename protocol_type::endpoint endpoint_type
Definition: connection_base.h:117
SSL aware socket buffers.
Definition: channel.h:65
void view_discard_raw()
Definition: channel.cc:302
void clear()
clears all buffers.
Definition: channel.h:79
SSL * ssl() const
get access to the raw SSL handle.
Definition: channel.h:324
recv_buffer_type & recv_buffer()
buffer of data that was received from the socket.
Definition: channel.h:240
virtual base-class of BasicConnection.
Definition: connection_base.h:54
virtual void async_send(recv_buffer_type &, std::function< void(std::error_code ec, size_t transferred)>)=0
std::vector< uint8_t, default_init_allocator< uint8_t > > recv_buffer_type
Definition: connection_base.h:59
virtual void async_wait_recv(std::function< void(std::error_code ec)>)=0
virtual stdx::expected< void, std::error_code > close()=0
virtual void async_recv(recv_buffer_type &, std::function< void(std::error_code ec, size_t transferred)>)=0
virtual void async_wait_send(std::function< void(std::error_code ec)>)=0
virtual bool is_secure_transport() const =0
virtual net::impl::socket::native_handle_type native_handle() const =0
virtual net::io_context & io_ctx()=0
virtual std::string endpoint() const =0
virtual bool is_open() const =0
virtual stdx::expected< void, std::error_code > shutdown(net::socket_base::shutdown_type st)=0
virtual stdx::expected< void, std::error_code > cancel()=0
virtual stdx::expected< void, std::error_code > set_io_context(net::io_context &new_ctx)=0
virtual void async_wait_error(std::function< void(std::error_code ec)>)=0
virtual ~ConnectionBase()=default
Definition: protocol.h:33
a Connection that can be switched to TLS.
Definition: connection_base.h:264
void prepare_for_pool()
Definition: connection_base.h:300
SslMode ssl_mode_
Definition: connection_base.h:422
Channel & channel()
Definition: connection_base.h:359
std::unique_ptr< ConnectionBase > conn_
Definition: connection_base.h:420
T protocol_state_type
Definition: connection_base.h:272
bool is_secure_transport() const
check if the channel is secure.
Definition: connection_base.h:414
void async_wait_send(Func &&func)
async wait until connection allows to send data.
Definition: connection_base.h:350
std::string endpoint() const
Definition: connection_base.h:386
protocol_state_type protocol_
Definition: connection_base.h:428
protocol_state_type & protocol()
Definition: connection_base.h:398
const protocol_state_type & protocol() const
Definition: connection_base.h:400
const std::unique_ptr< ConnectionBase > & connection() const
Definition: connection_base.h:406
bool is_open() const
Definition: connection_base.h:365
void async_send(Func &&func)
async send data from the channel's send buffer to the connection.
Definition: connection_base.h:340
std::unique_ptr< ConnectionBase > & connection()
Definition: connection_base.h:404
TlsSwitchableConnection(std::unique_ptr< ConnectionBase > conn, SslMode ssl_mode, protocol_state_type state)
Definition: connection_base.h:274
void async_recv(Func &&func)
async receive data from connection into the channel's receive buffer.
Definition: connection_base.h:317
net::impl::socket::native_handle_type native_handle() const
Definition: connection_base.h:367
stdx::expected< void, std::error_code > cancel()
Definition: connection_base.h:392
void assign_connection(std::unique_ptr< ConnectionBase > conn)
assign a low-level connection.
Definition: connection_base.h:296
void async_wait_error(Func &&func)
Definition: connection_base.h:355
Channel channel_
Definition: connection_base.h:425
stdx::expected< void, std::error_code > shutdown(net::socket_base::shutdown_type st) const
Definition: connection_base.h:378
stdx::expected< void, std::error_code > close() const
Definition: connection_base.h:371
const Channel & channel() const
Definition: connection_base.h:361
static constexpr size_t kRecvBufferSize
Definition: connection_base.h:271
SslMode ssl_mode() const
Definition: connection_base.h:363
TlsSwitchableConnection(std::unique_ptr< ConnectionBase > conn, SslMode ssl_mode, Channel channel, protocol_state_type state)
Definition: connection_base.h:283
Definition: io_context.h:61
executor_type get_executor() noexcept
Definition: io_context.h:1081
static constexpr wait_type wait_read
Definition: socket.h:168
shutdown_type
Definition: socket.h:172
static constexpr wait_type wait_write
Definition: socket.h:169
static constexpr wait_type wait_error
Definition: socket.h:170
Definition: buffer.h:696
Definition: expected.h:286
#define harness_assert(COND)
Improved assert()
Definition: harness_assert.h:37
static MYSQL * sock
Definition: mysqlcheck.cc:57
Definition: buf0block_hint.cc:30
constexpr value_type ssl
Definition: classic_protocol_constants.h:49
Definition: local.h:60
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
int native_handle_type
Definition: socket_constants.h:51
void async_read(AsyncReadStream &stream, DynamicBuffer &&b, CompletionCondition completion_condition, CompletionToken &&token)
Definition: buffer.h:933
stdx::expected< size_t, std::error_code > write(SyncWriteStream &stream, const ConstBufferSequence &buffers)
Definition: buffer.h:977
dynamic_vector_buffer< T, Allocator > dynamic_buffer(std::vector< T, Allocator > &vec) noexcept
Definition: buffer.h:661
auto defer(CompletionToken &&token)
Definition: executor.h:779
void async_write(AsyncWriteStream &stream, const ConstBufferSequence &buffers, CompletionCondition cond, CompletionToken &&token)
Definition: buffer.h:1066
std::error_condition make_error_condition(net::stream_errc e) noexcept
Definition: buffer.h:107
Definition: gcs_xcom_synode.h:64
unexpected(E) -> unexpected< E >
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2872
static void swap(String &a, String &b) noexcept
Definition: sql_string.h:663
SslMode
Definition: ssl_mode.h:29
Definition: connection_base.h:95
Definition: task.h:427