MySQL  8.0.27
Source Code Documentation
xcom_transport.h
Go to the documentation of this file.
1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef XCOM_TRANSPORT_H
24 #define XCOM_TRANSPORT_H
25 
26 #include "xcom/server_struct.h"
27 #include "xcom/site_struct.h"
28 #include "xcom/xcom_common.h"
29 #include "xdr_gen/xcom_vp.h"
30 
31 #define XDR_INT_SIZE 4
32 #define MSG_HDR_SIZE (3 * XDR_INT_SIZE)
33 
34 /* Definition of message with fixed size header and variable size payload */
35 
36 /* This is version 1_0 of the header. It is OK to change the header in
37  other versions as long as the version field is the first field.
38 */
39 /* version[4] length[4] type[1] tag[2] UNUSED[1] message-body[length] */
40 /*
41  The version is used both for protocol negotiations and to discard messages
42  with wrong version.
43  The length field is the length of the message, not including the header
44  itself.
45  The type field is 0 for a normal message; otherwise it is a protocol control
46  message.
47  The tag field will be used during protocol negotiation to uniquely identify
48  the request. The reply will contain
49  the same tag.
50  The message-body contains xdr-serialized data.
51  */
52 
53 #define SERIALIZED_BUFLEN(x) ((x) + MSG_HDR_SIZE) /* payload length x plus the fixed-size header */
54 
55 #define VERS_PTR(buf) (buf) /* version field: first field, at the start of the buffer */
56 #define LENGTH_PTR(buf) &((buf)[XDR_INT_SIZE]) /* length field: XDR_INT_SIZE bytes in, after the version */
57 #define X_TYPE (2 * XDR_INT_SIZE) /* byte offset of the one-byte type field */
58 #define X_TAG (X_TYPE + 1) /* byte offset of the tag field, directly after the type byte */
59 #define X_TAG_PTR(buf) &((buf)[X_TAG]) /* address of the tag field */
60 #ifdef NOTDEF /* CHECK_PTR exists only when NOTDEF is defined */
61 #define CHECK_PTR(buf) &((buf)[3 * XDR_INT_SIZE]) /* same offset as MSG_HDR_SIZE */
62 #endif
63 #define MSG_PTR(buf) &((buf)[MSG_HDR_SIZE]) /* first byte after the header, i.e. the message body */
64 
65 extern xcom_proto const my_min_xcom_version; /* The minimum protocol version I
66  am able to understand */
67 extern xcom_proto const
68  my_xcom_version; /* The maximum protocol version I am able to understand */
69 
70 /* Transport level message types */
71 enum x_msg_type {
72  x_normal = 0, /* Normal message */
73  x_version_req = 1, /* Negotiate protocol version */
74  x_version_reply = 2 /* Protocol version reply */
75 };
76 typedef enum x_msg_type x_msg_type;
77 
78 struct envelope { /* NOTE(review): member meanings below are inferred from names only -- confirm in xcom_transport.cc */
79  char *srv; /* presumably the server (host) name */
80  xcom_port port; /* presumably the port that goes with srv */
81  pax_msg *p; /* message carried by this envelope */
82  int crash_on_error; /* presumably a flag selecting crash-on-failure behaviour -- confirm */
83 };
84 
85 typedef struct envelope envelope;
86 
87 int check_protoversion(xcom_proto x_proto, xcom_proto negotiated);
88 int flush_srv_buf(server *s, int64_t *ret);
89 
90 /**
91  Reads message from connection rfd with buffering reads.
92 
93  @param[in] rfd Pointer to open connection.
94  @param[in,out] buf Used for buffering reads.
95  @param[out] p Output buffer.
96  @param[out] s Pointer to server. Server timestamp updated if not 0.
97  @param[out] ret Number of bytes read, or -1 if failure.
98 
99  @retval 0 if task should terminate.
100  @retval 1 if it should continue.
101 */
102 
103 int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p,
104  server *s, int64_t *ret);
105 
106 /**
107  Reads message from connection rfd without buffering reads.
108 
109  @param[in] rfd Pointer to open connection.
110  @param[out] p Output buffer.
111  @param[in,out] s Pointer to server. Server timestamp updated if not 0.
112  @param[in,out] ret Number of bytes read, or -1 if failure.
113 
114  @retval 0 if task should terminate.
115  @retval 1 if it should continue.
116 */
117 int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret);
118 
119 int send_to_acceptors(pax_msg *p, const char *dbg);
120 int send_to_all(pax_msg *p, const char *dbg);
121 int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg);
122 int send_to_others(site_def const *s, pax_msg *p, const char *dbg);
123 int send_to_someone(site_def const *s, pax_msg *p, const char *dbg);
125 int send_to_all_except_self(site_def const *s, pax_msg *p, const char *dbg);
126 
127 void wakeup_sender();
128 int sender_task(task_arg arg);
129 int local_sender_task(task_arg arg);
131 int srv_ref(server *s);
132 int srv_unref(server *s);
136 uint32_t crc32c_hash(char *buf, char *end);
137 int apply_xdr(void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata,
138  enum xdr_op op);
139 void init_crc32c();
140 void init_xcom_transport(xcom_port listen_port);
141 void reset_srv_buf(srv_buf *sb);
142 char *xcom_get_name(char *a);
144 int send_server_msg(site_def const *s, node_no i, pax_msg *p);
145 double server_active(site_def const *s, node_no i);
146 void update_servers(site_def *s, cargo_type operation);
148 int send_msg(server *s, node_no from, node_no to, uint32_t group_id,
149  pax_msg *p);
150 /**
151  Updates timestamp of server.
152 
153  @param[in] s Pointer to server.
154 */
155 void server_detected(server *s);
156 
157 void invalidate_servers(const site_def *old_site_def,
158  const site_def *new_site_def);
159 
163 
165 connection_descriptor *open_new_connection(
166  const char *server, xcom_port port,
167  int connection_timeout = Network_provider::default_connection_timeout()); /* timeout defaults to the Network_provider value */
168 connection_descriptor *open_new_local_connection(const char *server,
169  xcom_port port);
170 
171 #ifndef XCOM_WITHOUT_OPENSSL
172 void ssl_free_con(connection_descriptor *con); /* declared only when OpenSSL support is compiled in */
173 #endif
174 
175 char const *xcom_proto_name(xcom_proto proto_vers);
176 xcom_proto negotiate_protocol(xcom_proto proto_vers);
177 void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize,
178  x_msg_type *x_type, unsigned int *tag);
179 void put_header_1_0(unsigned char header_buf[], uint32_t msgsize,
180  x_msg_type x_type, unsigned int tag);
181 
182 int send_proto(connection_descriptor *con, xcom_proto x_proto,
183  x_msg_type x_type, unsigned int tag, int64_t *ret);
184 int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto,
185  x_msg_type *x_type, unsigned int *tag, int64_t *ret);
186 
187 void write_protoversion(unsigned char *buf, xcom_proto proto_vers);
188 xcom_proto read_protoversion(unsigned char *p);
189 
190 int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf);
191 int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen);
192 xcom_proto common_xcom_version(site_def const *site);
193 xcom_proto get_latest_common_proto();
194 xcom_proto set_latest_common_proto(xcom_proto x_proto);
195 extern linkage connect_wait;
196 extern int connect_tcp(char *server, xcom_port port, int *ret);
197 
198 /**
199  * @brief Returns the version from which nodes are able to speak IPv6
200  *
201  * @return xcom_proto the version from which nodes are able to speak IPv6
202  */
203 xcom_proto minimum_ipv6_version();
204 
205 #define IP_MAX_SIZE 512
206 
207 /**
208  * @brief Get the ip and port object from a given address in the authorized
209  * input format. For IP v4 is IP (or) NAME:PORT and for IPv6 is [IP (or)
210  * NAME]:PORT
211  *
212  * @param address input address to parse
213  * @param ip the resulting IP or Name
214  * @param port the resulting port
215  * @return int true (1) in case of parse error
216  */
217 int get_ip_and_port(char const *address, char ip[IP_MAX_SIZE], xcom_port *port);
218 
219 /**
220  * @brief Checks if an incoming node is eligible to enter the group
221  *
222  * This function checks if a new node entering the group is able to be part of
223  * it.
224  * This is needed due to downgrade procedures to server versions that do not
225  * speak IPv6. One will check if:
226  * - Our server is being contacted by a server that has a lower version than the
227  * IPv6 baseline
228  * - Check if the current configuration is all reachable by an IPv4 node
229  *
230  * If all of the above hold true we are able to proceed and add the node. Else,
231  * we must fail.
232  *
233  * @retval 1 in case of success.
234  */
235 int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto,
236  const site_def *current_site_def);
237 
238 #define INITIAL_CONNECT_WAIT 0.1
239 #define MAX_CONNECT_WAIT 10.0
240 #define CONNECT_WAIT_INCREASE 1.0
241 
242 #endif
static constexpr int default_connection_timeout()
Definition: network_provider.h:420
const char * p
Definition: ctype-mb.cc:1236
Definition: buf0block_hint.cc:29
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:191
required uint64 port
Definition: replication_asynchronous_connection_failover.proto:32
site_def * new_site_def()
Definition: site_def.cc:295
struct pax_msg pax_msg
Definition: site_struct.h:36
Definition: node_connection.h:46
Definition: xcom_transport.h:78
int crash_on_error
Definition: xcom_transport.h:82
xcom_port port
Definition: xcom_transport.h:80
pax_msg * p
Definition: xcom_transport.h:81
char * srv
Definition: xcom_transport.h:79
Definition: simset.h:35
Definition: server_struct.h:38
Definition: site_struct.h:42
Definition: server_struct.h:30
Definition: task_arg.h:41
unsigned short xcom_port
Definition: xcom_common.h:45
int send_to_others(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:932
int send_msg(server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
Definition: xcom_transport.cc:823
int apply_xdr(void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op)
Definition: xcom_transport.cc:414
int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen)
Definition: xcom_transport.cc:536
int incoming_connection_task(task_arg arg)
int send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag, int64_t *ret)
Definition: xcom_transport.cc:377
int close_open_connection(connection_descriptor *conn)
Definition: xcom_transport.cc:101
int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto, const site_def *current_site_def)
Checks if an incoming node is eligible to enter the group.
Definition: xcom_transport.cc:241
xcom_proto set_latest_common_proto(xcom_proto x_proto)
Definition: xcom_transport.cc:1832
int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
Definition: xcom_transport.cc:527
int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag, int64_t *ret)
int connect_tcp(char *server, xcom_port port, int *ret)
int shutdown_servers()
int tcp_reaper_task(task_arg arg)
void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize, x_msg_type *x_type, unsigned int *tag)
Definition: xcom_transport.cc:1106
xcom_proto minimum_ipv6_version()
Returns the version from which nodes are able to speak IPv6.
Definition: xcom_transport.cc:1852
xcom_proto get_latest_common_proto()
Definition: xcom_transport.cc:1836
int srv_ref(server *s)
Definition: xcom_transport.cc:734
void garbage_collect_servers()
Definition: xcom_transport.cc:700
int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd without buffering reads.
Definition: xcom_transport.cc:1123
void init_crc32c()
Definition: xcom_transport.cc:553
int send_to_all_except_self(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:898
int flush_srv_buf(server *s, int64_t *ret)
Definition: xcom_transport.cc:200
void server_detected(server *s)
Updates timestamp of server.
Definition: xcom_transport.cc:780
int send_server_msg(site_def const *s, node_no i, pax_msg *p)
Definition: xcom_transport.cc:851
x_msg_type
Definition: xcom_transport.h:71
@ x_normal
Definition: xcom_transport.h:72
@ x_version_reply
Definition: xcom_transport.h:74
@ x_version_req
Definition: xcom_transport.h:73
char * xcom_get_name(char *a)
uint32_t crc32c_hash(char *buf, char *end)
void write_protoversion(unsigned char *buf, xcom_proto proto_vers)
Definition: xcom_transport.cc:360
void reset_srv_buf(srv_buf *sb)
Definition: xcom_transport.cc:153
char const * xcom_proto_name(xcom_proto proto_vers)
#define IP_MAX_SIZE
Definition: xcom_transport.h:205
int tcp_reconnection_task(task_arg arg)
int check_protoversion(xcom_proto x_proto, xcom_proto negotiated)
Definition: xcom_transport.cc:366
int sender_task(task_arg arg)
Definition: xcom_transport.cc:1400
xcom_proto const my_xcom_version
Definition: xcom_transport.cc:81
void init_xcom_transport(xcom_port listen_port)
Definition: xcom_transport.cc:146
void put_header_1_0(unsigned char header_buf[], uint32_t msgsize, x_msg_type x_type, unsigned int tag)
Definition: xcom_transport.cc:1114
int send_to_someone(site_def const *s, pax_msg *p, const char *dbg)
int get_ip_and_port(char const *address, char ip[IP_MAX_SIZE], xcom_port *port)
Get the ip and port object from a given address in the authorized input format.
Definition: xcom_transport.cc:1991
int srv_unref(server *s)
Definition: xcom_transport.cc:740
int send_to_all(pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:906
void update_servers(site_def *s, cargo_type operation)
Definition: xcom_transport.cc:1653
int local_sender_task(task_arg arg)
Definition: xcom_transport.cc:1604
xcom_port xcom_get_port(char *a)
linkage connect_wait
Definition: xcom_transport.cc:1389
xcom_proto const my_min_xcom_version
Definition: xcom_transport.cc:79
int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd with buffering reads.
Definition: xcom_transport.cc:1235
int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:890
double server_active(site_def const *s, node_no i)
Definition: xcom_transport.cc:715
int send_to_self_site(site_def const *s, pax_msg *p)
void shutdown_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1799
xcom_proto read_protoversion(unsigned char *p)
Definition: xcom_transport.cc:364
void close_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1794
xcom_proto negotiate_protocol(xcom_proto proto_vers)
Definition: xcom_transport.cc:1840
void wakeup_sender()
Definition: xcom_transport.cc:1393
xcom_proto common_xcom_version(site_def const *site)
Definition: xcom_transport.cc:1821
void invalidate_servers(const site_def *old_site_def, const site_def *new_site_def)
Definition: xcom_transport.cc:1730
int send_to_acceptors(pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:979
void reset_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1809
connection_descriptor * open_new_local_connection(const char *server, xcom_port port)
Definition: xcom_transport.cc:113
connection_descriptor * open_new_connection(const char *server, xcom_port port, int connection_timeout=Network_provider::default_connection_timeout())
Definition: xcom_transport.cc:105
void ssl_free_con(connection_descriptor *con)
Definition: xcom_transport.cc:1787
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:142
xdr_op
Definition: xdr.h:78