MySQL  8.0.22
Source Code Documentation
xcom_transport.h
Go to the documentation of this file.
1 /* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef XCOM_TRANSPORT_H
24 #define XCOM_TRANSPORT_H
25 
26 #include "xcom/xcom_common.h"
27 
28 #define XDR_INT_SIZE 4
29 #define MSG_HDR_SIZE (3 * XDR_INT_SIZE)
30 
31 /* Definition of message with fixed size header and variable size payload */
32 
33 /* This is version 1_0 of the header. It is OK to change the header in
34  other versions as long as the version field is the first field.
35 */
36 /* version[4] length[4] type[1] tag[2] UNUSED[1] message-body[length] */
37 /*
38  The version is used both for protocol negotiations and to discard messages
39  with wrong version.
40  The length field is the length of the message, not including the header
41  itself.
42  The type field is 0 if a normal message, otherwise it is a protocol control
43  message.
44  The tag field will be used during protocol negotiation to uniquely identify
45  the request. The reply will
46  contain the same tag.
47  The message-body contains xdr-serialized data.
48  */
49 
50 #define SERIALIZED_BUFLEN(x) ((x) + MSG_HDR_SIZE)
51 
52 #define VERS_PTR(buf) (buf)
53 #define LENGTH_PTR(buf) &((buf)[XDR_INT_SIZE])
54 #define X_TYPE (2 * XDR_INT_SIZE)
55 #define X_TAG (X_TYPE + 1)
56 #define X_TAG_PTR(buf) &((buf)[X_TAG])
57 #ifdef NOTDEF
58 #define CHECK_PTR(buf) &((buf)[3 * XDR_INT_SIZE])
59 #endif
60 #define MSG_PTR(buf) &((buf)[MSG_HDR_SIZE])
61 
62 extern xcom_proto const my_min_xcom_version; /* The minimum protocol version I
63  am able to understand */
64 extern xcom_proto const
65  my_xcom_version; /* The maximum protocol version I am able to understand */
66 
67 /* Transport level message types */
68 enum x_msg_type {
69  x_normal = 0, /* Normal message */
70  x_version_req = 1, /* Negotiate protocol version */
71  x_version_reply = 2 /* Protocol version reply */
72 };
73 typedef enum x_msg_type x_msg_type;
74 
75 struct envelope {
76  char *srv;
78  pax_msg *p;
80 };
81 
82 typedef struct envelope envelope;
83 
84 int check_protoversion(xcom_proto x_proto, xcom_proto negotiated);
85 int flush_srv_buf(server *s, int64_t *ret);
86 
87 /**
88  Reads message from connection rfd with buffering reads.
89 
90  @param[in] rfd Pointer to open connection.
91  @param[in,out] buf Used for buffering reads.
92  @param[out] p Output buffer.
93  @param[out] s Pointer to server. Server timestamp updated if not 0.
94  @param[out] ret Number of bytes read, or -1 if failure.
95 
96  @retval 0 if task should terminate.
97  @retval 1 if it should continue.
98 */
99 
101  server *s, int64_t *ret);
102 
103 /**
104  Reads message from connection rfd without buffering reads.
105 
106  @param[in] rfd Pointer to open connection.
107  @param[out] p Output buffer.
108  @param[in,out] s Pointer to server. Server timestamp updated if not 0.
109  @param[in,out] ret Number of bytes read, or -1 if failure.
110 
111  @retval 0 if task should terminate.
112  @retval 1 if it should continue.
113 */
114 int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret);
115 
116 int send_to_acceptors(pax_msg *p, const char *dbg);
117 int send_to_all(pax_msg *p, const char *dbg);
118 int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg);
119 int send_to_others(site_def const *s, pax_msg *p, const char *dbg);
120 int send_to_someone(site_def const *s, pax_msg *p, const char *dbg);
121 int send_to_self_site(site_def const *s, pax_msg *p);
122 int send_to_all_except_self(site_def const *s, pax_msg *p, const char *dbg);
123 
124 void wakeup_sender();
125 int sender_task(task_arg arg);
126 int local_sender_task(task_arg arg);
127 int shutdown_servers();
128 int srv_ref(server *s);
129 int srv_unref(server *s);
130 int tcp_reaper_task(task_arg arg);
131 int tcp_server(task_arg arg);
132 uint32_t crc32c_hash(char *buf, char *end);
133 int apply_xdr(void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata,
134  enum xdr_op op);
135 void init_crc32c();
136 void init_xcom_transport(xcom_port listen_port);
137 void reset_srv_buf(srv_buf *sb);
138 char *xcom_get_name(char *a);
139 xcom_port xcom_get_port(char *a);
140 int send_server_msg(site_def const *s, node_no i, pax_msg *p);
141 double server_active(site_def const *s, node_no i);
142 void update_servers(site_def *s, cargo_type operation);
144 int send_msg(server *s, node_no from, node_no to, uint32_t group_id,
145  pax_msg *p);
146 /**
147  Updates timestamp of server.
148 
149  @param[in] s Pointer to server.
150 */
151 void server_detected(server *s);
152 
153 void invalidate_servers(const site_def *old_site_def,
154  const site_def *new_site_def);
155 
159 
160 #ifndef XCOM_WITHOUT_OPENSSL
163 #endif
164 
165 char const *xcom_proto_name(xcom_proto proto_vers);
166 xcom_proto negotiate_protocol(xcom_proto proto_vers);
167 void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize,
168  x_msg_type *x_type, unsigned int *tag);
169 void put_header_1_0(unsigned char header_buf[], uint32_t msgsize,
170  x_msg_type x_type, unsigned int tag);
171 
172 int send_proto(connection_descriptor *con, xcom_proto x_proto,
173  x_msg_type x_type, unsigned int tag, int64_t *ret);
174 int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto,
175  x_msg_type *x_type, unsigned int *tag, int64_t *ret);
176 
177 void write_protoversion(unsigned char *buf, xcom_proto proto_vers);
178 xcom_proto read_protoversion(unsigned char *p);
179 
180 int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf);
181 int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen);
182 xcom_proto common_xcom_version(site_def const *site);
183 xcom_proto get_latest_common_proto();
184 xcom_proto set_latest_common_proto(xcom_proto x_proto);
185 extern linkage connect_wait;
186 
187 /**
188  * @brief Returns the version from which nodes are able to speak IPv6
189  *
190  * @return xcom_proto the version from which nodes are able to speak IPv6
191  */
192 xcom_proto minimum_ipv6_version();
193 
194 #define IP_MAX_SIZE 512
195 
196 /**
197  * @brief Get the ip and port object from a given address in the authorized
198  * input format. For IP v4 is IP (or) NAME:PORT and for IPv6 is [IP (or)
199  * NAME]:PORT
200  *
201  * @param address input address to parse
202  * @param ip the resulting IP or Name
203  * @param port the resulting port
204  * @return int true (1) in case of parse error
205  */
206 int get_ip_and_port(char *address, char ip[IP_MAX_SIZE], xcom_port *port);
207 
208 /**
209  * @brief Checks if an incoming node is eligible to enter the group
210  *
211  * This function checks if a new node entering the group is able to be part of
212  * it.
213  * This is needed duw to downgrade procedures to server versions that do not
214  * speak IPv6. One wil check if:
215  * - Our server is being contacted by a server that has a lower version than the
216  * IPv6 baseline
217  * - Check if the current configuration is all reachable by an IPv4 node
218  *
219  * If all of the above hold true we are able to proceed and add the node. Else,
220  * we must fail.
221  *
222  * @retval 1 in case of success.
223  */
224 int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto,
225  const site_def *current_site_def);
226 
227 #define INITIAL_CONNECT_WAIT 0.1
228 #define MAX_CONNECT_WAIT 1.0
229 #define CONNECT_WAIT_INCREASE 1.1
230 
231 #endif
x_msg_type
Definition: xcom_transport.h:68
Definition: site_struct.h:29
int local_sender_task(task_arg arg)
Definition: xcom_transport.cc:1572
void wakeup_sender()
Definition: xcom_transport.cc:1419
int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen)
Definition: xcom_transport.cc:511
Definition: task_arg.h:41
int srv_ref(server *s)
Definition: xcom_transport.cc:704
Definition: xcom_transport.h:70
uint32_t crc32c_hash(char *buf, char *end)
void put_header_1_0(unsigned char header_buf[], uint32_t msgsize, x_msg_type x_type, unsigned int tag)
Definition: xcom_transport.cc:1143
void server_detected(server *s)
Updates timestamp of server.
Definition: xcom_transport.cc:808
xcom_proto negotiate_protocol(xcom_proto proto_vers)
Definition: xcom_transport.cc:1811
xcom_proto set_latest_common_proto(xcom_proto x_proto)
Definition: xcom_transport.cc:1803
#define IP_MAX_SIZE
Definition: xcom_transport.h:194
void reset_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1782
int srv_unref(server *s)
Definition: xcom_transport.cc:710
int send_to_all_except_self(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:919
void init_xcom_transport(xcom_port listen_port)
Definition: xcom_transport.cc:107
int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd without buffering reads.
Definition: xcom_transport.cc:1152
xcom_proto minimum_ipv6_version()
Returns the version from which nodes are able to speak IPv6.
Definition: xcom_transport.cc:1823
double server_active(site_def const *s, node_no i)
Definition: xcom_transport.cc:685
xcom_proto get_latest_common_proto()
Definition: xcom_transport.cc:1807
int tcp_reaper_task(task_arg arg)
Definition: xcom_transport.cc:1724
Definition: buf0block_hint.cc:29
int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto, const site_def *current_site_def)
Checks if an incoming node is eligible to enter the group.
Definition: xcom_transport.cc:202
int get_ip_and_port(char *address, char ip[IP_MAX_SIZE], xcom_port *port)
Get the ip and port object from a given address in the authorized input format.
Definition: xcom_transport.cc:1950
void invalidate_servers(const site_def *old_site_def, const site_def *new_site_def)
Definition: xcom_transport.cc:1697
void reset_srv_buf(srv_buf *sb)
Definition: xcom_transport.cc:114
unsigned short xcom_port
Definition: xcom_common.h:45
pax_msg * p
Definition: xcom_transport.h:78
char * xcom_get_name(char *a)
void ssl_shutdown_con(connection_descriptor *con)
Definition: xcom_transport.cc:1759
char const * xcom_proto_name(xcom_proto proto_vers)
linkage connect_wait
Definition: xcom_transport.cc:1415
xcom_proto const my_min_xcom_version
Definition: xcom_transport.cc:78
void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize, x_msg_type *x_type, unsigned int *tag)
Definition: xcom_transport.cc:1135
Definition: simset.h:35
int sender_task(task_arg arg)
Definition: xcom_transport.cc:1426
int tcp_server(task_arg arg)
Definition: xcom_transport.cc:721
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:188
void close_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1767
Definition: node_connection.h:44
int check_protoversion(xcom_proto x_proto, xcom_proto negotiated)
Definition: xcom_transport.cc:327
xcom_port port
Definition: xcom_transport.h:77
int crash_on_error
Definition: xcom_transport.h:79
Definition: server_struct.h:38
Definition: xcom_transport.h:69
int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag, int64_t *ret)
Definition: xcom_transport.cc:1375
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:142
int send_server_msg(site_def const *s, node_no i, pax_msg *p)
Definition: xcom_transport.cc:872
site_def * new_site_def()
Definition: site_def.cc:280
int send_to_someone(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:965
int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd with buffering reads.
Definition: xcom_transport.cc:1263
void shutdown_connection(connection_descriptor *con)
Definition: xcom_transport.cc:1772
xcom_proto common_xcom_version(site_def const *site)
Definition: xcom_transport.cc:1792
void garbage_collect_servers()
Definition: xcom_transport.cc:671
char * srv
Definition: xcom_transport.h:76
Definition: server_struct.h:30
xcom_proto const my_xcom_version
Definition: xcom_transport.cc:80
int send_to_acceptors(pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:1005
xcom_proto read_protoversion(unsigned char *p)
Definition: xcom_transport.cc:325
int flush_srv_buf(server *s, int64_t *ret)
Definition: xcom_transport.cc:161
int send_msg(server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
Definition: xcom_transport.cc:844
int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:911
int send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag, int64_t *ret)
Definition: xcom_transport.cc:338
xdr_op
Definition: xdr.h:78
void write_protoversion(unsigned char *buf, xcom_proto proto_vers)
Definition: xcom_transport.cc:321
int apply_xdr(void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op)
Definition: xcom_transport.cc:379
int shutdown_servers()
int send_to_others(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:958
void ssl_free_con(connection_descriptor *con)
Definition: xcom_transport.cc:1754
int send_to_self_site(site_def const *s, pax_msg *p)
Definition: xcom_transport.cc:927
xcom_port xcom_get_port(char *a)
Definition: xcom_transport.h:75
int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
Definition: xcom_transport.cc:502
void update_servers(site_def *s, cargo_type operation)
Definition: xcom_transport.cc:1621
int send_to_all(pax_msg *p, const char *dbg)
Definition: xcom_transport.cc:932
Definition: xcom_transport.h:71
void init_crc32c()
Definition: xcom_transport.cc:528