MySQL 8.4.2
Source Code Documentation
xcom_transport.h
Go to the documentation of this file.
1/* Copyright (c) 2015, 2024, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is designed to work with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have either included with
13 the program or referenced in the documentation.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24#ifndef XCOM_TRANSPORT_H
25#define XCOM_TRANSPORT_H
26
27#include "xcom/server_struct.h"
28#include "xcom/site_struct.h"
29#include "xcom/xcom_common.h"
30#include "xdr_gen/xcom_vp.h"
31
32#define XDR_INT_SIZE 4
33#define MSG_HDR_SIZE (3 * XDR_INT_SIZE)
34
35/* Definition of message with fixed size header and variable size payload */
36
37/* This is version 1_0 of the header. It is OK to change the header in
38 other versions as long as the version field is the first field.
39*/
40/* version[4] length[4] type[1] tag[2] UNUSED[1] message-body[length] */
41/*
42 The version is used both for protocol negotiations and to discard messages
43 with wrong version.
44 The length field is the length of the message, not including the header
45 itself.
46 The type field is 0 if a normal message, otherwise it is a protocol control
47 message.
48 The tag field will be used during protocol negotiation to uniquely identify
49 the request. The reply will
50 contain the same tag.
51 The message-body contains xdr-serialized data.
52 */
53
54#define SERIALIZED_BUFLEN(x) ((x) + MSG_HDR_SIZE)
55
56#define VERS_PTR(buf) (buf)
57#define LENGTH_PTR(buf) &((buf)[XDR_INT_SIZE])
58#define X_TYPE (2 * XDR_INT_SIZE)
59#define X_TAG (X_TYPE + 1)
60#define X_TAG_PTR(buf) &((buf)[X_TAG])
61#ifdef NOTDEF
62#define CHECK_PTR(buf) &((buf)[3 * XDR_INT_SIZE])
63#endif
64#define MSG_PTR(buf) &((buf)[MSG_HDR_SIZE])
65
66extern xcom_proto const my_min_xcom_version; /* The minimum protocol version I
67 am able to understand */
68extern xcom_proto const
69 my_xcom_version; /* The maximum protocol version I am able to understand */
70
71/* Transport level message types */
73 x_normal = 0, /* Normal message */
74 x_version_req = 1, /* Negotiate protocol version */
75 x_version_reply = 2 /* Protocol version reply */
76};
78
79struct envelope {
80 char *srv;
84};
85
86typedef struct envelope envelope;
87
88int check_protoversion(xcom_proto x_proto, xcom_proto negotiated);
89int flush_srv_buf(server *s, int64_t *ret);
90
91/**
92 Reads message from connection rfd with buffering reads.
93
94 @param[in] rfd Pointer to open connection.
95 @param[in,out] buf Used for buffering reads.
96 @param[out] p Output buffer.
97 @param[out] s Pointer to server. Server timestamp updated if not 0.
98 @param[out] ret Number of bytes read, or -1 if failure.
99
100 @retval 0 if task should terminate.
101 @retval 1 if it should continue.
102*/
103
105 server *s, int64_t *ret);
106
107/**
108 Reads message from connection rfd without buffering reads.
109
110 @param[in] rfd Pointer to open connection.
111 @param[out] p Output buffer.
112 @param[in,out] s Pointer to server. Server timestamp updated if not 0.
113 @param[in,out] ret Number of bytes read, or -1 if failure.
114
115 @retval 0 if task should terminate.
116 @retval 1 if it should continue.
117*/
118int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret);
119
120int send_to_acceptors(pax_msg *p, const char *dbg);
121int send_to_all(pax_msg *p, const char *dbg);
122int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg);
123int send_to_others(site_def const *s, pax_msg *p, const char *dbg);
124int send_to_someone(site_def const *s, pax_msg *p, const char *dbg);
126int send_to_all_except_self(site_def const *s, pax_msg *p, const char *dbg);
127
128void wakeup_sender();
129int sender_task(task_arg arg);
132int srv_ref(server *s);
133int srv_unref(server *s);
137uint32_t crc32c_hash(char *buf, char *end);
138int apply_xdr(void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata,
139 enum xdr_op op);
140void init_crc32c();
141void init_xcom_transport(xcom_port listen_port);
142void reset_srv_buf(srv_buf *sb);
143char *xcom_get_name(char *a);
145int send_server_msg(site_def const *s, node_no i, pax_msg *p);
146double server_active(site_def const *s, node_no i);
147void update_servers(site_def *s, cargo_type operation);
149int send_msg(server *s, node_no from, node_no to, uint32_t group_id,
150 pax_msg *p);
151/**
152 Updates timestamp of server.
153
154 @param[in] s Pointer to server.
155*/
156void server_detected(server *s);
157
158void invalidate_servers(const site_def *old_site_def,
159 const site_def *new_site_def);
160
164
167 const char *server, xcom_port port,
168 int connection_timeout = Network_provider::default_connection_timeout(),
173
174/**
175 * @brief Tests connectivity to another node under the current configuration
176 * setup.
177 *
178 * This function verifies that this node is able to successfully reach to the
179 * other node. It is used to do a sanity check at when a new member is about
180 * to be added to the list of participants and to check local node
181 * connectivity, in order to validate configurations.
182 *
183 * @return false In case of error.
184 * @return true If it succeeds to connect to a node.
185 */
186bool is_able_to_connect_to_node(const char *server, const xcom_port port);
187
188#ifndef XCOM_WITHOUT_OPENSSL
190#endif
191
192char const *xcom_proto_name(xcom_proto proto_vers);
193xcom_proto negotiate_protocol(xcom_proto proto_vers);
194void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize,
195 x_msg_type *x_type, unsigned int *tag);
196void put_header_1_0(unsigned char header_buf[], uint32_t msgsize,
197 x_msg_type x_type, unsigned int tag);
198
199int send_proto(connection_descriptor *con, xcom_proto x_proto,
200 x_msg_type x_type, unsigned int tag, int64_t *ret);
201int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto,
202 x_msg_type *x_type, unsigned int *tag, int64_t *ret);
203
204void write_protoversion(unsigned char *buf, xcom_proto proto_vers);
205xcom_proto read_protoversion(unsigned char *p);
206
207int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf);
208int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen);
209xcom_proto common_xcom_version(site_def const *site);
210xcom_proto get_latest_common_proto();
211xcom_proto set_latest_common_proto(xcom_proto x_proto);
212extern linkage connect_wait;
213extern int connect_tcp(char *server, xcom_port port, int *ret);
214
215/**
216 * @brief Returns the version from which nodes are able to speak IPv6
217 *
218 * @return xcom_proto the version from which nodes are able to speak IPv6
219 */
220xcom_proto minimum_ipv6_version();
221
222#define IP_MAX_SIZE 512
223
224/**
225 * @brief Get the ip and port object from a given address in the authorized
226 * input format. For IP v4 is IP (or) NAME:PORT and for IPv6 is [IP (or)
227 * NAME]:PORT
228 *
229 * @param address input address to parse
230 * @param ip the resulting IP or Name
231 * @param port the resulting port
232 * @return int true (1) in case of parse error
233 */
234int get_ip_and_port(char const *address, char ip[IP_MAX_SIZE], xcom_port *port);
235
236/**
237 * @brief Checks if an incoming node is eligible to enter the group
238 *
239 * This function checks if a new node entering the group is able to be part of
240 * it.
241 * This is needed duw to downgrade procedures to server versions that do not
242 * speak IPv6. One will check if:
243 * - Our server is being contacted by a server that has a lower version than the
244 * IPv6 baseline
245 * - Check if the current configuration is all reachable by an IPv4 node
246 *
247 * If all of the above hold true we are able to proceed and add the node. Else,
248 * we must fail.
249 *
250 * @retval 1 in case of success.
251 */
252int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto,
253 const site_def *current_site_def);
254
255#define INITIAL_CONNECT_WAIT 0.1
256#define MAX_CONNECT_WAIT 10.0
257#define CONNECT_WAIT_INCREASE 1.0
258
259#endif
static constexpr int default_connection_timeout()
Definition: network_provider.h:457
const char * p
Definition: ctype-mb.cc:1235
Definition: buf0block_hint.cc:30
static loglevel log_level(const Sql_condition *condition)
Definition: histogram.cc:1643
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192
network_provider_dynamic_log_level
Dynamic log level enum values.
Definition: network_provider.h:103
required uint64 port
Definition: replication_asynchronous_connection_failover.proto:33
site_def * new_site_def()
Definition: site_def.cc:296
struct pax_msg pax_msg
Definition: site_struct.h:37
Definition: node_connection.h:47
Definition: xcom_transport.h:79
int crash_on_error
Definition: xcom_transport.h:83
xcom_port port
Definition: xcom_transport.h:81
pax_msg * p
Definition: xcom_transport.h:82
char * srv
Definition: xcom_transport.h:80
Definition: simset.h:36
Definition: server_struct.h:39
Definition: site_struct.h:43
Definition: server_struct.h:31
Definition: task_arg.h:42
unsigned short xcom_port
Definition: xcom_common.h:46
int send_to_others(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:958
int send_msg(server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
Definition: xcom_transport.cc:849
int apply_xdr(void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op)
Definition: xcom_transport.cc:435
int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen)
Definition: xcom_transport.cc:557
int incoming_connection_task(task_arg arg)
Definition: xcom_transport.cc:772
int send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag, int64_t *ret)
Definition: xcom_transport.cc:396
int close_open_connection(connection_descriptor *conn)
Definition: xcom_transport.cc:103
int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto, const site_def *current_site_def)
Checks if an incoming node is eligible to enter the group.
Definition: xcom_transport.cc:258
xcom_proto set_latest_common_proto(xcom_proto x_proto)
Definition: xcom_transport.cc:1915
int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
Definition: xcom_transport.cc:548
char const * xcom_proto_name(xcom_proto proto_vers)
int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag, int64_t *ret)
int connect_tcp(char *server, xcom_port port, int *ret)
int shutdown_servers()
int tcp_reaper_task(task_arg arg)
Definition: xcom_transport.cc:1838
void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize, x_msg_type *x_type, unsigned int *tag)
Definition: xcom_transport.cc:1136
xcom_proto minimum_ipv6_version()
Returns the version from which nodes are able to speak IPv6.
Definition: xcom_transport.cc:1935
xcom_proto get_latest_common_proto()
Definition: xcom_transport.cc:1919
int srv_ref(server *s)
Definition: xcom_transport.cc:755
void garbage_collect_servers()
Definition: xcom_transport.cc:721
int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd without buffering reads.
Definition: xcom_transport.cc:1153
void init_crc32c()
Definition: xcom_transport.cc:574
int send_to_all_except_self(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:924
int flush_srv_buf(server *s, int64_t *ret)
Definition: xcom_transport.cc:215
void server_detected(server *s)
Updates timestamp of server.
Definition: xcom_transport.cc:803
int send_server_msg(site_def const *s, node_no i, pax_msg *p)
Definition: xcom_transport.cc:877
x_msg_type
Definition: xcom_transport.h:72
@ x_normal
Definition: xcom_transport.h:73
@ x_version_reply
Definition: xcom_transport.h:75
@ x_version_req
Definition: xcom_transport.h:74
uint32_t crc32c_hash(char *buf, char *end)
void write_protoversion(unsigned char *buf, xcom_proto proto_vers)
Definition: xcom_transport.cc:379
void reset_srv_buf(srv_buf *sb)
Definition: xcom_transport.cc:168
#define IP_MAX_SIZE
Definition: xcom_transport.h:222
int tcp_reconnection_task(task_arg arg)
int check_protoversion(xcom_proto x_proto, xcom_proto negotiated)
Definition: xcom_transport.cc:385
int sender_task(task_arg arg)
Definition: xcom_transport.cc:1462
xcom_proto const my_xcom_version
Definition: xcom_transport.cc:83
void init_xcom_transport(xcom_port listen_port)
Definition: xcom_transport.cc:161
void put_header_1_0(unsigned char header_buf[], uint32_t msgsize, x_msg_type x_type, unsigned int tag)
Definition: xcom_transport.cc:1144
int send_to_someone(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:965
int get_ip_and_port(char const *address, char ip[IP_MAX_SIZE], xcom_port *port)
Get the ip and port object from a given address in the authorized input format.
Definition: xcom_transport.cc:2074
int srv_unref(server *s)
Definition: xcom_transport.cc:761
int send_to_all(pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:932
void update_servers(site_def *s, cargo_type operation)
Definition: xcom_transport.cc:1732
int local_sender_task(task_arg arg)
Definition: xcom_transport.cc:1692
xcom_port xcom_get_port(char *a)
linkage connect_wait
Definition: xcom_transport.cc:1423
xcom_proto const my_min_xcom_version
Definition: xcom_transport.cc:81
char * xcom_get_name(char *a)
int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd with buffering reads.
Definition: xcom_transport.cc:1267
connection_descriptor * open_new_connection(const char *server, xcom_port port, int connection_timeout=Network_provider::default_connection_timeout(), network_provider_dynamic_log_level log_level=network_provider_dynamic_log_level::PROVIDED)
Definition: xcom_transport.cc:107
int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:916
double server_active(site_def const *s, node_no i)
Definition: xcom_transport.cc:736
int send_to_self_site(site_def const *s, pax_msg *p)
void shutdown_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1882
xcom_proto read_protoversion(unsigned char *p)
Definition: xcom_transport.cc:383
bool is_able_to_connect_to_node(const char *server, const xcom_port port)
Tests connectivity to another node under the current configuration setup.
Definition: xcom_transport.cc:135
void close_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1877
connection_descriptor * open_new_local_connection(const char *server, xcom_port port)
Definition: xcom_transport.cc:116
xcom_proto negotiate_protocol(xcom_proto proto_vers)
Definition: xcom_transport.cc:1923
void wakeup_sender()
Definition: xcom_transport.cc:1427
xcom_proto common_xcom_version(site_def const *site)
Definition: xcom_transport.cc:1904
void invalidate_servers(const site_def *old_site_def, const site_def *new_site_def)
Definition: xcom_transport.cc:1811
int send_to_acceptors(pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:1005
void reset_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1892
void ssl_free_con(connection_descriptor *con)
Definition: xcom_transport.cc:1870
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:143
xdr_op
Definition: xdr.h:79