MySQL 9.4.0
Source Code Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
connection_base.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2021, 2025, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTER_CONNECTION_POOL_CONNECTION_BASE_INCLUDED
27#define ROUTER_CONNECTION_POOL_CONNECTION_BASE_INCLUDED
28
29#include <cstdint> // size_t
30#include <functional>
31#include <sstream>
32#include <system_error> // error_code
33#include <vector>
34
35#ifdef _WIN32
36// include winsock2.h before openssl/ssl.h
37#include <windows.h>
38#include <winsock2.h>
39#include <ws2tcpip.h>
40#endif
41
42#include <openssl/ssl.h> // SSL_CTX
43
44#include "harness_assert.h"
49#include "mysqlrouter/channel.h"
51
52/**
53 * virtual base-class of BasicConnection.
54 */
56 public:
57 virtual ~ConnectionBase() = default;
58
60 std::vector<uint8_t, default_init_allocator<uint8_t>>;
61
62 virtual net::io_context &io_ctx() = 0;
63
64 virtual void async_recv(
66 std::function<void(std::error_code ec, size_t transferred)>) = 0;
67
68 virtual void async_send(
70 std::function<void(std::error_code ec, size_t transferred)>) = 0;
71
72 virtual void async_wait_send(std::function<void(std::error_code ec)>) = 0;
73 virtual void async_wait_recv(std::function<void(std::error_code ec)>) = 0;
74 virtual void async_wait_error(std::function<void(std::error_code ec)>) = 0;
75
76 [[nodiscard]] virtual bool is_open() const = 0;
77
79 const = 0;
80
81 [[nodiscard]] virtual stdx::expected<void, std::error_code> close() = 0;
84
85 [[nodiscard]] virtual std::string endpoint() const = 0;
86
87 [[nodiscard]] virtual stdx::expected<void, std::error_code> cancel() = 0;
88
89 [[nodiscard]] virtual bool is_secure_transport() const = 0;
90
92 net::io_context &new_ctx) = 0;
93};
94
/**
 * trait: is a net-ts Protocol a secure transport on its own?
 *
 * the primary template answers 'false'; protocols that are secure
 * provide a specialization that answers 'true'.
 *
 * @tparam Protocol a protocol like net::ip::tcp or local::stream_protocol
 */
template <typename Protocol>
struct IsTransportSecure : std::bool_constant<false> {};

#ifdef NET_TS_HAS_UNIX_SOCKET
/**
 * unix-sockets are treated as secure transport.
 */
template <>
struct IsTransportSecure<local::stream_protocol> : std::bool_constant<true> {};
#endif
102
103/**
104 * basic connection which wraps a net-ts Protocol.
105 *
106 * knows about mysql-protocol specifics like:
107 *
108 * - session attributes
109 * - connection error-tracking.
110 *
111 * @tparam Protocol a protocol like net::ip::tcp or local::stream_protocol
112 */
113template <class Protocol>
115 public:
118 using endpoint_type = typename protocol_type::endpoint;
119
121
123 : sock_{std::move(sock)}, ep_{std::move(ep)} {}
124
125 net::io_context &io_ctx() override { return sock_.get_executor().context(); }
126
128 net::io_context &new_ctx) override {
129 // nothing to do.
130 if (sock_.get_executor() == new_ctx.get_executor()) return {};
131
132 return sock_.release().and_then(
133 [this, &new_ctx](
135 socket_type new_sock(new_ctx);
136
137 auto assign_res = new_sock.assign(ep_.protocol(), native_handle);
138 if (!assign_res) return assign_res;
139
140 std::swap(sock_, new_sock);
141
142 return {};
143 });
144 }
145
147 std::function<void(std::error_code ec, size_t transferred)>
148 completion) override {
150 std::move(completion));
151 }
152
154 std::function<void(std::error_code ec, size_t transferred)>
155 completion) override {
156 if (sock_.native_non_blocking()) {
157 // if the socket is non-blocking try to send directly as the send-buffer
158 // is usually empty
159 auto write_res = net::write(sock_, net::dynamic_buffer(buf),
161 if (write_res) {
162 net::defer(sock_.get_executor(), [completion = std::move(completion),
163 transferred = *write_res]() {
164 completion({}, transferred);
165 });
166 return;
167 }
168
169 const auto ec = write_res.error();
170
171 if (ec != make_error_condition(std::errc::operation_would_block) &&
172 ec !=
173 make_error_condition(std::errc::resource_unavailable_try_again)) {
174 net::defer(sock_.get_executor(), [completion = std::move(completion),
175 ec]() { completion(ec, 0); });
176 return;
177 }
178
179 // if it would-block, use the normal async-write.
180 }
181
183 std::move(completion));
184 }
185
187 std::function<void(std::error_code ec)> completion) override {
188 sock_.async_wait(net::socket_base::wait_write, std::move(completion));
189 }
190
192 std::function<void(std::error_code ec)> completion) override {
193 sock_.async_wait(net::socket_base::wait_read, std::move(completion));
194 }
195
197 std::function<void(std::error_code ec)> completion) override {
198 sock_.async_wait(net::socket_base::wait_error, std::move(completion));
199 }
200
201 [[nodiscard]] bool is_open() const override { return sock_.is_open(); }
202
204 const override {
205 return sock_.native_handle();
206 }
207
209 return sock_.close();
210 }
211
213 return sock_.cancel();
214 }
215
218 return sock_.shutdown(st);
219 }
220
221 [[nodiscard]] std::string endpoint() const override {
223
224 oss << ep_;
225
226 return oss.str();
227 }
228
229 template <class GettableSocketOption>
231 GettableSocketOption &opt) const {
232 return sock_.get_option(opt);
233 }
234
235 /**
236 * check if the underlying transport is secure.
237 *
238 * - unix-socket, shared-memory, ... are secure.
239 */
240 [[nodiscard]] bool is_secure_transport() const override {
242 }
243
244 protected:
247};
248
250#ifdef NET_TS_HAS_UNIX_SOCKET
251using UnixDomainConnection = BasicConnection<local::stream_protocol>;
252#endif
253
254/**
255 * a Connection that can be switched to TLS.
256 *
257 * wraps
258 *
259 * - a low-level connection (conn)
260 * - a routing connection (endpoints, destinations, ...)
261 * - a tls switchable (a SSL_CTX * wrapper)
262 * - protocol state (classic, xproto)
263 */
264template <class T>
266 public:
267 // 16kb per buffer
268 // 2 buffers per channel (send/recv)
269 // 2 channels per connection
270 // 10000 connections
271 // = 640MByte
272 static constexpr size_t kRecvBufferSize{16UL * 1024};
274
275 TlsSwitchableConnection(std::unique_ptr<ConnectionBase> conn,
277 : conn_{std::move(conn)},
279 channel_{},
280 protocol_{std::move(state)} {
282 }
283
284 TlsSwitchableConnection(std::unique_ptr<ConnectionBase> conn,
287 : conn_{std::move(conn)},
289 channel_{std::move(channel)},
290 protocol_{std::move(state)} {
292 }
293
294 /**
295 * assign a low-level connection.
296 */
297 void assign_connection(std::unique_ptr<ConnectionBase> conn) {
298 conn_ = std::move(conn);
299 }
300
302 if (auto *ssl = channel_.ssl()) {
303 SSL_set_info_callback(ssl, nullptr);
304 SSL_set_msg_callback_arg(ssl, nullptr);
305 }
306
307 // reset the recv and send buffers, as the pool shouldn't care about the
308 // content of those buffers.
309 channel_.clear();
310 }
311
312 /**
313 * async receive data from connection into the channel's receive buffer.
314 *
315 * calls func when async operation is completed.
316 */
317 template <class Func>
318 void async_recv(Func &&func) {
319 harness_assert(conn_ != nullptr);
320
321 // discard everything that has been marked as 'consumed'
323
324 conn_->async_recv(channel_.recv_buffer(),
325 [this, func = std::forward<Func>(func)](
326 std::error_code ec, size_t transferred) {
327 if (ec == std::error_code()) {
328 channel_.view_sync_raw();
329 }
330
331 func(ec, transferred);
332 });
333 }
334
335 /**
336 * async send data from the channel's send buffer to the connection.
337 *
338 * calls func when async operation is completed.
339 */
340 template <class Func>
341 void async_send(Func &&func) {
342 conn_->async_send(channel_.send_buffer(), std::forward<Func>(func));
343 }
344
345 /**
346 * async wait until connection allows to send data.
347 *
348 * calls func when async operation is completed.
349 */
350 template <class Func>
351 void async_wait_send(Func &&func) {
352 conn_->async_wait_send(std::forward<Func>(func));
353 }
354
355 template <class Func>
356 void async_wait_error(Func &&func) {
357 conn_->async_wait_error(std::forward<Func>(func));
358 }
359
360 [[nodiscard]] Channel &channel() { return channel_; }
361
362 [[nodiscard]] const Channel &channel() const { return channel_; }
363
364 [[nodiscard]] SslMode ssl_mode() const { return ssl_mode_; }
365
366 [[nodiscard]] bool is_open() const { return conn_ && conn_->is_open(); }
367
369 return conn_->native_handle();
370 }
371
373 if (!conn_) {
374 return stdx::unexpected(make_error_code(std::errc::not_connected));
375 }
376 return conn_->close();
377 }
378
381 if (!conn_) {
382 return stdx::unexpected(make_error_code(std::errc::not_connected));
383 }
384 return conn_->shutdown(st);
385 }
386
387 [[nodiscard]] std::string endpoint() const {
388 if (!is_open()) return "";
389
390 return conn_->endpoint();
391 }
392
394 if (!conn_) return {};
395
396 return conn_->cancel();
397 }
398
399 [[nodiscard]] protocol_state_type &protocol() { return protocol_; }
400
401 [[nodiscard]] const protocol_state_type &protocol() const {
402 return protocol_;
403 }
404
405 std::unique_ptr<ConnectionBase> &connection() { return conn_; }
406
407 const std::unique_ptr<ConnectionBase> &connection() const { return conn_; }
408
409 /**
410 * check if the channel is secure.
411 *
412 * - if TLS is enabled, it the transport is secure
413 * - if transport is secure, the channel is secure
414 */
415 [[nodiscard]] bool is_secure_transport() const {
416 return conn_->is_secure_transport() || (channel_.ssl() != nullptr);
417 }
418
419 private:
420 // tcp/unix-socket
421 std::unique_ptr<ConnectionBase> conn_;
422
424
425 // socket buffers
427
428 // higher-level protocol
430};
431
432#endif
basic connection which wraps a net-ts Protocol.
Definition: connection_base.h:114
stdx::expected< void, std::error_code > set_io_context(net::io_context &new_ctx) override
Definition: connection_base.h:127
stdx::expected< void, std::error_code > get_option(GettableSocketOption &opt) const
Definition: connection_base.h:230
stdx::expected< void, std::error_code > close() override
Definition: connection_base.h:208
void async_wait_error(std::function< void(std::error_code ec)> completion) override
Definition: connection_base.h:196
std::string endpoint() const override
Definition: connection_base.h:221
void async_wait_recv(std::function< void(std::error_code ec)> completion) override
Definition: connection_base.h:191
net::impl::socket::native_handle_type native_handle() const override
Definition: connection_base.h:203
void async_recv(recv_buffer_type &buf, std::function< void(std::error_code ec, size_t transferred)> completion) override
Definition: connection_base.h:146
stdx::expected< void, std::error_code > cancel() override
Definition: connection_base.h:212
bool is_open() const override
Definition: connection_base.h:201
socket_type sock_
Definition: connection_base.h:245
typename protocol_type::socket socket_type
Definition: connection_base.h:117
BasicConnection(socket_type sock, endpoint_type ep)
Definition: connection_base.h:122
bool is_secure_transport() const override
check if the underlying transport is secure.
Definition: connection_base.h:240
net::io_context & io_ctx() override
Definition: connection_base.h:125
endpoint_type ep_
Definition: connection_base.h:246
stdx::expected< void, std::error_code > shutdown(net::socket_base::shutdown_type st) override
Definition: connection_base.h:216
void async_send(recv_buffer_type &buf, std::function< void(std::error_code ec, size_t transferred)> completion) override
Definition: connection_base.h:153
void async_wait_send(std::function< void(std::error_code ec)> completion) override
Definition: connection_base.h:186
typename protocol_type::endpoint endpoint_type
Definition: connection_base.h:118
SSL aware socket buffers.
Definition: channel.h:65
void view_discard_raw()
Definition: channel.cc:313
void clear()
clears all buffers.
Definition: channel.h:79
SSL * ssl() const
get access to the raw SSL handle.
Definition: channel.h:324
recv_buffer_type & recv_buffer()
buffer of data that was received from the socket.
Definition: channel.h:240
virtual base-class of BasicConnection.
Definition: connection_base.h:55
virtual void async_send(recv_buffer_type &, std::function< void(std::error_code ec, size_t transferred)>)=0
std::vector< uint8_t, default_init_allocator< uint8_t > > recv_buffer_type
Definition: connection_base.h:60
virtual void async_wait_recv(std::function< void(std::error_code ec)>)=0
virtual stdx::expected< void, std::error_code > close()=0
virtual void async_recv(recv_buffer_type &, std::function< void(std::error_code ec, size_t transferred)>)=0
virtual void async_wait_send(std::function< void(std::error_code ec)>)=0
virtual bool is_secure_transport() const =0
virtual net::impl::socket::native_handle_type native_handle() const =0
virtual net::io_context & io_ctx()=0
virtual std::string endpoint() const =0
virtual bool is_open() const =0
virtual stdx::expected< void, std::error_code > shutdown(net::socket_base::shutdown_type st)=0
virtual stdx::expected< void, std::error_code > cancel()=0
virtual stdx::expected< void, std::error_code > set_io_context(net::io_context &new_ctx)=0
virtual void async_wait_error(std::function< void(std::error_code ec)>)=0
virtual ~ConnectionBase()=default
Definition: protocol.h:33
a Connection that can be switched to TLS.
Definition: connection_base.h:265
void prepare_for_pool()
Definition: connection_base.h:301
SslMode ssl_mode_
Definition: connection_base.h:423
Channel & channel()
Definition: connection_base.h:360
std::unique_ptr< ConnectionBase > conn_
Definition: connection_base.h:421
T protocol_state_type
Definition: connection_base.h:273
bool is_secure_transport() const
check if the channel is secure.
Definition: connection_base.h:415
void async_wait_send(Func &&func)
async wait until connection allows to send data.
Definition: connection_base.h:351
std::string endpoint() const
Definition: connection_base.h:387
protocol_state_type protocol_
Definition: connection_base.h:429
protocol_state_type & protocol()
Definition: connection_base.h:399
const protocol_state_type & protocol() const
Definition: connection_base.h:401
const std::unique_ptr< ConnectionBase > & connection() const
Definition: connection_base.h:407
bool is_open() const
Definition: connection_base.h:366
void async_send(Func &&func)
async send data from the channel's send buffer to the connection.
Definition: connection_base.h:341
std::unique_ptr< ConnectionBase > & connection()
Definition: connection_base.h:405
TlsSwitchableConnection(std::unique_ptr< ConnectionBase > conn, SslMode ssl_mode, protocol_state_type state)
Definition: connection_base.h:275
void async_recv(Func &&func)
async receive data from connection into the channel's receive buffer.
Definition: connection_base.h:318
net::impl::socket::native_handle_type native_handle() const
Definition: connection_base.h:368
stdx::expected< void, std::error_code > cancel()
Definition: connection_base.h:393
void assign_connection(std::unique_ptr< ConnectionBase > conn)
assign a low-level connection.
Definition: connection_base.h:297
void async_wait_error(Func &&func)
Definition: connection_base.h:356
Channel channel_
Definition: connection_base.h:426
stdx::expected< void, std::error_code > shutdown(net::socket_base::shutdown_type st) const
Definition: connection_base.h:379
stdx::expected< void, std::error_code > close() const
Definition: connection_base.h:372
const Channel & channel() const
Definition: connection_base.h:362
static constexpr size_t kRecvBufferSize
Definition: connection_base.h:272
SslMode ssl_mode() const
Definition: connection_base.h:364
TlsSwitchableConnection(std::unique_ptr< ConnectionBase > conn, SslMode ssl_mode, Channel channel, protocol_state_type state)
Definition: connection_base.h:284
Definition: io_context.h:61
executor_type get_executor() noexcept
Definition: io_context.h:1094
static constexpr wait_type wait_read
Definition: socket.h:168
shutdown_type
Definition: socket.h:172
static constexpr wait_type wait_write
Definition: socket.h:169
static constexpr wait_type wait_error
Definition: socket.h:170
Definition: buffer.h:696
Definition: expected.h:286
#define harness_assert(COND)
Improved assert()
Definition: harness_assert.h:37
#define T
Definition: jit_executor_value.cc:373
static MYSQL * sock
Definition: mysqlcheck.cc:57
Definition: buf0block_hint.cc:30
constexpr value_type ssl
Definition: classic_protocol_constants.h:49
Definition: local.h:60
Protocol
Definition: protocol.h:35
std::error_code make_error_code(DynamicLoaderErrc ec)
make error_code from a DynamicLoaderErrc.
Definition: dynamic_loader.cc:79
stdx::expected< native_handle_type, error_type > socket(int family, int sock_type, int protocol)
Definition: socket.h:63
int native_handle_type
Definition: socket_constants.h:51
void async_read(AsyncReadStream &stream, DynamicBuffer &&b, CompletionCondition completion_condition, CompletionToken &&token)
Definition: buffer.h:933
stdx::expected< size_t, std::error_code > write(SyncWriteStream &stream, const ConstBufferSequence &buffers)
Definition: buffer.h:977
dynamic_vector_buffer< T, Allocator > dynamic_buffer(std::vector< T, Allocator > &vec) noexcept
Definition: buffer.h:661
auto defer(CompletionToken &&token)
Definition: executor.h:779
void async_write(AsyncWriteStream &stream, const ConstBufferSequence &buffers, CompletionCondition cond, CompletionToken &&token)
Definition: buffer.h:1066
std::error_condition make_error_condition(net::stream_errc e) noexcept
Definition: buffer.h:107
Definition: gcs_xcom_synode.h:64
unexpected(E) -> unexpected< E >
std::basic_ostringstream< char, std::char_traits< char >, ut::allocator< char > > ostringstream
Specialization of basic_ostringstream which uses ut::allocator.
Definition: ut0new.h:2872
static void swap(String &a, String &b) noexcept
Definition: sql_string.h:650
SslMode
Definition: ssl_mode.h:29
Definition: connection_base.h:96
Definition: task.h:427