MySQL 8.4.0
Source Code Documentation
connection_container.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2018, 2024, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is designed to work with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have either included with
14 the program or referenced in the documentation.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24*/
25
26#ifndef ROUTING_CONNECTION_CONTAINER_INCLUDED
27#define ROUTING_CONNECTION_CONTAINER_INCLUDED
28
29#include <algorithm>
30#include <functional>
31#include <map>
32#include <memory>
33#include <mutex>
34#include <vector>
35
36#include "connection.h"
37#include "destination.h"
41#include "tcp_address.h"
42
44
45/**
46 * @brief Basic Concurrent Map
47 *
48 * The concurrent_map is a hash-map, with fixed number of buckets.
49 * The number of buckets can be specified in constructor parameter
50 * (num_buckets); by default it is set to 127 (kDefaultNumberOfBucket).
51 */
52template <typename Key, typename Value, typename Hash = std::hash<Key>>
54 public:
55 using key_type = Key;
57 using hash_type = Hash;
59
/**
 * Construct the map with a fixed number of buckets.
 *
 * @param num_buckets number of buckets to create; must be non-zero because
 *                    get_bucket() computes hasher_(key) % buckets_.size()
 * @param hasher      hash functor used to select the bucket for a key
 */
60 concurrent_map(unsigned num_buckets = kDefaultNumberOfBucket,
61 const Hash &hasher = Hash())
62 : buckets_(num_buckets), hasher_(hasher) {}
63
64 concurrent_map(const concurrent_map &other) = delete;
65 concurrent_map &operator=(const concurrent_map &other) = delete;
66
67 template <typename Predicate>
68 void for_one(const Key &key, Predicate &p) {
70 }
71
72 template <typename Predicate>
74 for (auto &each_bucket : buckets_) {
75 each_bucket.for_each(p);
76 }
77 }
78
/**
 * Store value under key in the bucket selected by the key's hash.
 *
 * The value is moved into the bucket. Bucket::put() inserts with emplace(),
 * so an entry that already exists for key is left unchanged.
 *
 * @param key   key to insert
 * @param value value to move into the map
 */
79 void put(const Key &key, Value &&value) {
80 get_bucket(key).put(key, std::move(value));
81 }
82
/**
 * Remove the entry stored under key from the bucket selected by the key's
 * hash.
 *
 * @param key key of the entry to remove
 */
83 void erase(const Key &key) { get_bucket(key).erase(key); }
84
/**
 * Number of entries in the map, summed over all buckets.
 *
 * Each bucket takes its own mutex only while its size is read, so the total
 * is not a single consistent snapshot if other threads modify the map
 * during the call.
 *
 * @returns sum of the sizes of all buckets
 */
85 std::size_t size() const {
86 std::size_t result{0};
87 for (auto &each_bucket : buckets_) {
88 result += each_bucket.size();
89 }
90 return result;
91 }
92
/**
 * Check whether the map holds no entries.
 *
 * Buckets are inspected one after another; the scan stops at the first
 * non-empty bucket.
 *
 * @returns true if every bucket is empty, false otherwise
 */
93 bool empty() const {
94 for (const auto &each_bucket : buckets_) {
95 if (!each_bucket.empty()) return false;
96 }
97 return true;
98 }
99
100 private:
101 static const unsigned kDefaultNumberOfBucket = 127;
102
103 class Bucket {
104 public:
/**
 * Insert key/value into this bucket while holding the bucket mutex.
 *
 * Uses emplace(), which does not replace an entry that already exists for
 * key.
 */
105 void put(const Key &key, Value &&value) {
106 std::lock_guard<std::mutex> lock(data_mutex_);
107 data_.emplace(key, std::move(value));
108 }
109
/**
 * Remove the entry for key from this bucket while holding the bucket mutex.
 */
110 void erase(const Key &key) {
111 std::lock_guard<std::mutex> lock(data_mutex_);
112 data_.erase(key);
113 }
114
/**
 * Invoke p on the value stored under key, if there is one.
 *
 * The bucket mutex is held for the lookup and for the call to p. The
 * lookup goes through a const_iterator and p is passed found->second, the
 * mapped value, not the key/value pair. p is not called when key is absent.
 *
 * @param key key to look up
 * @param p   callable invoked with the mapped value
 */
115 template <typename Predicate>
116 void for_one(const Key &key, Predicate &p) {
117 std::lock_guard<std::mutex> lock(data_mutex_);
118 const ConstBucketIterator found = data_.find(key);
119 if (found != data_.end()) p(found->second);
120 }
121
122 template <typename Predicate>
124 std::lock_guard<std::mutex> lock(data_mutex_);
125 std::for_each(data_.begin(), data_.end(), p);
126 }
127
/**
 * Number of entries in this bucket, read while holding the bucket mutex
 * (data_mutex_ is mutable so that it can be locked from const methods).
 */
128 std::size_t size() const {
129 std::lock_guard<std::mutex> lock(data_mutex_);
130 return data_.size();
131 }
132
/**
 * Whether this bucket holds no entries, read while holding the bucket mutex.
 */
133 bool empty() const {
134 std::lock_guard<std::mutex> lock(data_mutex_);
135 return data_.empty();
136 }
137
138 private:
139 using BucketData = std::map<Key, Value>;
140 using BucketIterator = typename BucketData::iterator;
141 using ConstBucketIterator = typename BucketData::const_iterator;
142
144 mutable std::mutex data_mutex_;
145 };
146
147 std::vector<Bucket> buckets_;
149
151 const std::size_t bucket_index = hasher_(key) % buckets_.size();
152 return buckets_[bucket_index];
153 }
154
/**
 * Select the bucket responsible for key (const overload).
 *
 * The bucket index is the key's hash modulo the number of buckets, so
 * buckets_ must not be empty.
 *
 * @param key key to map to a bucket
 * @returns reference to the bucket for key
 */
155 const Bucket &get_bucket(const Key &key) const {
156 const std::size_t bucket_index = hasher_(key) % buckets_.size();
157 return buckets_[bucket_index];
158 }
159};
160
161/**
162 * @brief container for connections to MySQL Server.
163 *
164 * When the thread of execution for a connection to MySQL Server completes, it
165 * should call remove_connection to remove itself from the connection container.
166 */
169 std::shared_ptr<MySQLRoutingConnectionBase>>
171
172 public:
174
/**
 * @brief Collects a ConnData entry for every connection in the container.
 *
 * Visits each entry of connections_ via for_each() and records the
 * connection's client address, server address and the fields of its
 * get_stats() result.
 *
 * @returns vector with one ConnData per visited connection
 */
175 std::vector<ConnData> get_all_connections_info() {
176 std::vector<ConnData> connection_datas;
177
// Visitor for a single map entry; conn.second is the stored connection
// pointer. The result vector is captured by reference so that every call
// appends to the same vector.
178 auto l =
179 [&connection_datas](const decltype(connections_)::value_type &conn) {
180 const auto stats = conn.second->get_stats();
181
182 connection_datas.push_back({
183 conn.second->get_client_address(),
184 conn.second->get_server_address(),
185 stats.bytes_up,
186 stats.bytes_down,
187 stats.started,
188 stats.connected_to_server,
189 stats.last_sent_to_server,
190 stats.last_received_from_server,
191 });
192 };
193 connections_.for_each(l);
194 return connection_datas;
195 }
196 /**
197 * @brief Adds new connection to container.
198 *
199 * @param connection The connection to MySQL server
200 */
201 void add_connection(std::shared_ptr<MySQLRoutingConnectionBase> connection);
202
203 /**
204 * @brief Disconnects all connections to servers that are not allowed any
205 * longer.
206 *
207 * @param nodes Allowed servers. Connections to servers that are not in nodes
208 * are closed.
209 * @returns number of connections marked to be disconnected
210 */
211 unsigned disconnect(const AllowedNodes &nodes);
212
213 /**
214 * @brief Disconnects all connections in the ConnectionContainer.
215 */
216 void disconnect_all();
217
218 /**
219 * @brief Retrieve the connection object for the given client endpoint
220 *
221 * @param client_endpoint The endpoint string
222 * @returns the connection object, or nullptr
223 */
225 const std::string &client_endpoint);
226
227 /**
228 * @brief removes connection from container
229 *
230 * This function should be called by the connection's thread of execution
231 * when that thread completes. Do NOT call this function before the
232 * connection's thread of execution completes.
233 *
234 * @param connection The connection to remove from container
235 */
237
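238 /** condition variable; judging by its name, presumably notified when a connection is removed (TODO confirm in connection_container.cc). */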
239 std::condition_variable connection_removed_cond_;
241
242 /**
243 * check if container is empty.
244 *
245 * as the map is concurrent, empty() only gives a reasonable result if it is
246 * ensured no other thread is currently adding connections.
247 */
// Delegates to concurrent_map::empty(), which checks the buckets one by one.
248 bool empty() const { return connections_.empty(); }
249};
250
251#endif /* ROUTING_CONNECTION_CONTAINER_INCLUDED */
container for connections to MySQL Server.
Definition: connection_container.h:167
void disconnect_all()
Disconnects all connections in the ConnectionContainer.
Definition: connection_container.cc:106
std::vector< ConnData > get_all_connections_info()
Definition: connection_container.h:175
bool empty() const
check if container is empty.
Definition: connection_container.h:248
unsigned disconnect(const AllowedNodes &nodes)
Disconnects all connections to servers that are not allowed any longer.
Definition: connection_container.cc:36
std::mutex connection_removed_cond_m_
Definition: connection_container.h:240
std::condition_variable connection_removed_cond_
number of active client threads.
Definition: connection_container.h:239
MySQLRoutingConnectionBase * get_connection(const std::string &client_endpoint)
Retrieve the connection object for the given client endpoint.
Definition: connection_container.cc:89
void add_connection(std::shared_ptr< MySQLRoutingConnectionBase > connection)
Adds new connection to container.
Definition: connection_container.cc:31
void remove_connection(MySQLRoutingConnectionBase *connection)
removes connection from container
Definition: connection_container.cc:111
concurrent_map< MySQLRoutingConnectionBase *, std::shared_ptr< MySQLRoutingConnectionBase > > connections_
Definition: connection_container.h:170
Definition: connection.h:44
a nullable SQL value.
Definition: sql_value.h:40
Definition: connection_container.h:103
std::mutex data_mutex_
Definition: connection_container.h:144
std::size_t size() const
Definition: connection_container.h:128
void for_each(Predicate &p)
Definition: connection_container.h:123
BucketData data_
Definition: connection_container.h:143
typename BucketData::iterator BucketIterator
Definition: connection_container.h:140
void put(const Key &key, Value &&value)
Definition: connection_container.h:105
std::map< Key, Value > BucketData
Definition: connection_container.h:139
typename BucketData::const_iterator ConstBucketIterator
Definition: connection_container.h:141
void for_one(const Key &key, Predicate &p)
Definition: connection_container.h:116
void erase(const Key &key)
Definition: connection_container.h:110
bool empty() const
Definition: connection_container.h:133
Basic Concurrent Map.
Definition: connection_container.h:53
concurrent_map(const concurrent_map &other)=delete
concurrent_map(unsigned num_buckets=kDefaultNumberOfBucket, const Hash &hasher=Hash())
Definition: connection_container.h:60
void erase(const Key &key)
Definition: connection_container.h:83
std::size_t size() const
Definition: connection_container.h:85
Bucket & get_bucket(const Key &key)
Definition: connection_container.h:150
static const unsigned kDefaultNumberOfBucket
Definition: connection_container.h:101
void for_each(Predicate p)
Definition: connection_container.h:73
void for_one(const Key &key, Predicate &p)
Definition: connection_container.h:68
concurrent_map & operator=(const concurrent_map &other)=delete
Hash hasher_
Definition: connection_container.h:148
void put(const Key &key, Value &&value)
Definition: connection_container.h:79
const Bucket & get_bucket(const Key &key) const
Definition: connection_container.h:155
bool empty() const
Definition: connection_container.h:93
Hash hash_type
Definition: connection_container.h:57
typename std::map< Key, Value >::value_type value_type
Definition: connection_container.h:58
std::vector< Bucket > buckets_
Definition: connection_container.h:147
Key key_type
Definition: connection_container.h:55
const char * p
Definition: ctype-mb.cc:1235
std::vector< AvailableDestination > AllowedNodes
Definition: destination_status_types.h:59
void for_each(const Shards< COUNT > &shards, Function &&f) noexcept
Iterate over the shards.
Definition: ut0counter.h:323
uint16_t value_type
Definition: vt100.h:184
std::unordered_map< Key, CHARSET_INFO * > Hash
Definition: collations_internal.cc:548
std::string_view Key
The key type for the hash structure in HashJoinRowBuffer.
Definition: hash_join_buffer.h:102
static std::mutex lock
Definition: net_ns.cc:56
struct result result
Definition: result.h:34
required string key
Definition: replication_asynchronous_connection_failover.proto:60
Definition: routing_component.h:73
A filter of some sort that is not a join condition (those are stored in JoinPredicate objects).
Definition: access_path.h:132
Definition: result.h:30
Definition: mysqlslap.cc:240