MySQL  8.0.20
Source Code Documentation
xcom_transport.h
Go to the documentation of this file.
1 /* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef XCOM_TRANSPORT_H
24 #define XCOM_TRANSPORT_H
25 
27 
28 #ifdef __cplusplus
29 extern "C" {
30 #endif
31 
32 #define XDR_INT_SIZE 4 /* Size in bytes of one integer slot of the header; the version and length fields each occupy one slot */
33 #define MSG_HDR_SIZE (3 * XDR_INT_SIZE) /* Fixed header size: version[4] + length[4] + type[1] + tag[2] + UNUSED[1] = 12 bytes */
34 
35 /* Definition of message with fixed size header and variable size payload */
36 
37 /* This is version 1_0 of the header. It is OK to change the header in
38  other versions as long as the version field is the first field.
39 */
40 /* version[4] length[4] type[1] tag[2] UNUSED[1] message-body[length] */
41 /*
42  The version is used both for protocol negotiations and to discard messages
43  with wrong version.
44  The length field is the length of the message, not including the header
45  itself.
46  The type field is 0 if a normal message, otherwise it is a protocol control
47  message.
48  The tag field will be used during protocol negotiation to uniquely identify
49  the request. The reply will
50  contain the same tag.
51  The message-body contains xdr-serialized data.
52  */
53 
54 #define SERIALIZED_BUFLEN(x) ((x) + MSG_HDR_SIZE) /* Buffer size for a payload of x bytes plus the fixed header */
55 
56 #define VERS_PTR(buf) (buf) /* Version field: first field, offset 0 */
57 #define LENGTH_PTR(buf) (&((buf)[XDR_INT_SIZE])) /* Length field: directly after the 4-byte version */
58 #define X_TYPE (2 * XDR_INT_SIZE) /* Byte offset of the 1-byte type field */
59 #define X_TAG (X_TYPE + 1) /* Byte offset of the 2-byte tag field */
60 #define X_TAG_PTR(buf) (&((buf)[X_TAG])) /* Address of the tag field */
61 #ifdef NOTDEF /* Disabled code: CHECK_PTR is only defined when NOTDEF is set */
62 #define CHECK_PTR(buf) (&((buf)[3 * XDR_INT_SIZE]))
63 #endif
64 #define MSG_PTR(buf) (&((buf)[MSG_HDR_SIZE])) /* Start of message-body, directly after the header; outer parentheses keep MSG_PTR(b)[i] from binding as &(b[..][i]) */
65 
66 /* Transport level message types */
67 enum x_msg_type { /* Values carried in the 1-byte type field of the message header */
68  x_normal = 0, /* Normal message: the type field is 0 */
69  x_version_req = 1, /* Protocol control message: negotiate protocol version */
70  x_version_reply = 2 /* Protocol control message: protocol version reply */
71 };
72 typedef enum x_msg_type x_msg_type; /* Lets callers write x_msg_type without the enum keyword */
73 
74 struct envelope { /* NOTE(review): member semantics are not documented in this header — confirm against xcom_transport.c */
75  char *srv; /* presumably the destination server name — TODO confirm */
76  xcom_port port; /* presumably the destination port — TODO confirm */
77  pax_msg *p; /* Message associated with this envelope */
78  int crash_on_error; /* NOTE(review): name suggests a fail-hard flag — confirm */
79 };
80 
81 typedef struct envelope envelope;
82 
83 int check_protoversion(xcom_proto x_proto, xcom_proto negotiated);
84 int flush_srv_buf(server *s, int64_t *ret);
85 
86 /**
87  Reads message from connection rfd with buffering reads.
88 
89  @param[in] rfd Pointer to open connection.
90  @param[in,out] buf Used for buffering reads.
91  @param[out] p Output buffer.
92  @param[out] s Pointer to server. Server timestamp updated if not 0.
93  @param[out] ret Number of bytes read, or -1 if failure.
94 
95  @retval 0 if task should terminate.
96  @retval 1 if it should continue.
97 */
98 
99 int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p,
100  server *s, int64_t *ret);
101 
102 /**
103  Reads message from connection rfd without buffering reads.
104 
105  @param[in] rfd Pointer to open connection.
106  @param[out] p Output buffer.
107  @param[in,out] s Pointer to server. Server timestamp updated if not 0.
108  @param[in,out] ret Number of bytes read, or -1 if failure.
109 
110  @retval 0 if task should terminate.
111  @retval 1 if it should continue.
112 */
113 int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret);
114 
115 int send_to_acceptors(pax_msg *p, const char *dbg);
116 int send_to_all(pax_msg *p, const char *dbg);
117 int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg);
118 int send_to_others(site_def const *s, pax_msg *p, const char *dbg);
119 int send_to_someone(site_def const *s, pax_msg *p, const char *dbg);
120 int send_to_self_site(site_def const *s, pax_msg *p);
121 
122 int sender_task(task_arg arg);
123 int local_sender_task(task_arg arg);
124 int shutdown_servers();
125 int srv_ref(server *s);
126 int srv_unref(server *s);
127 int tcp_reaper_task(task_arg arg);
128 int tcp_server(task_arg arg);
129 uint32_t crc32c_hash(char *buf, char *end);
130 int apply_xdr(xcom_proto x_proto, void *buff, uint32_t bufflen,
131  xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op);
132 void init_crc32c();
133 void init_xcom_transport(xcom_port listen_port);
134 void reset_srv_buf(srv_buf *sb);
135 char *xcom_get_name(char *a);
136 xcom_port xcom_get_port(char *a);
137 int send_server_msg(site_def const *s, node_no i, pax_msg *p);
138 double server_active(site_def const *s, node_no i);
139 void update_servers(site_def *s, cargo_type operation);
141 int client_task(task_arg arg);
142 int send_msg(server *s, node_no from, node_no to, uint32_t group_id,
143  pax_msg *p);
144 /**
145  Updates timestamp of server.
146 
147  @param[in] s Pointer to server.
148 */
149 void server_detected(server *s);
150 
151 void invalidate_servers(const site_def *old_site_def,
152  const site_def *new_site_def);
153 
157 
158 #ifndef XCOM_WITHOUT_OPENSSL
161 #endif
162 
163 char const *xcom_proto_name(xcom_proto proto_vers);
165 void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize,
166  x_msg_type *x_type, unsigned int *tag);
167 void put_header_1_0(unsigned char header_buf[], uint32_t msgsize,
168  x_msg_type x_type, unsigned int tag);
169 
170 int send_proto(connection_descriptor *con, xcom_proto x_proto,
171  x_msg_type x_type, unsigned int tag, int64_t *ret); /* NOTE(review): mirrors recv_proto below with the fields passed by value — presumably the sending counterpart; return and *ret semantics are not visible in this header */
172 int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto,
173  x_msg_type *x_type, unsigned int *tag, int64_t *ret);
174 
175 void write_protoversion(unsigned char *buf, xcom_proto proto_vers);
176 xcom_proto read_protoversion(unsigned char *p);
177 
178 int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf);
179 int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen);
183 
184 /**
185  * @brief Returns the version from which nodes are able to speak IPv6
186  *
187  * @return xcom_proto the version from which nodes are able to speak IPv6
188  */
189 xcom_proto minimum_ipv6_version();
190 
191 #define IP_MAX_SIZE 512
192 
193 /**
194  * @brief Get the IP and port from a given address in the authorized
195  * input format. For IPv4 the format is IP:PORT or NAME:PORT; for IPv6 it is
196  * [IP]:PORT or [NAME]:PORT.
197  *
198  * @param address input address to parse
199  * @param ip the resulting IP or Name
200  * @param port the resulting port
201  * @return int true (1) in case of parse error
202  */
203 int get_ip_and_port(char *address, char ip[IP_MAX_SIZE], xcom_port *port);
204 
205 /**
206  * @brief Checks if an incoming node is eligible to enter the group
207  *
208  * This function checks if a new node entering the group is able to be part of
209  * it.
210  * This is needed due to downgrade procedures to server versions that do not
211  * speak IPv6. One will check if:
212  * - Our server is being contacted by a server that has a lower version than the
213  * IPv6 baseline
214  * - Check if the current configuration is all reachable by an IPv4 node
215  *
216  * If all of the above hold true we are able to proceed and add the node. Else,
217  * we must fail.
218  *
219  * @retval 1 in case of success.
220  */
221 int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto,
222  const site_def *current_site_def);
223 
224 #define INITIAL_CONNECT_WAIT 0.1
225 #define MAX_CONNECT_WAIT 1.0
226 #define CONNECT_WAIT_INCREASE 1.1
227 
228 #ifdef __cplusplus
229 }
230 #endif
231 
232 #endif
x_msg_type
Definition: xcom_transport.h:67
Definition: site_struct.h:33
int local_sender_task(task_arg arg)
Definition: xcom_transport.c:1672
int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen)
Definition: xcom_transport.c:670
xcom_proto
Definition: xcom_proto_enum.h:25
int client_task(task_arg arg)
Definition: xcom_transport.c:1878
Definition: task_arg.h:45
int srv_ref(server *s)
Definition: xcom_transport.c:865
site_def * new_site_def()
Definition: site_def.c:297
Definition: xcom_transport.h:69
uint32_t crc32c_hash(char *buf, char *end)
Definition: xcom_transport.c:698
void put_header_1_0(unsigned char header_buf[], uint32_t msgsize, x_msg_type x_type, unsigned int tag)
Definition: xcom_transport.c:1275
void server_detected(server *s)
Updates timestamp of server.
Definition: xcom_transport.c:969
xcom_proto negotiate_protocol(xcom_proto proto_vers)
Definition: xcom_transport.c:2026
xcom_proto set_latest_common_proto(xcom_proto x_proto)
Definition: xcom_transport.c:2018
#define IP_MAX_SIZE
Definition: xcom_transport.h:191
void reset_connection(connection_descriptor *con)
Definition: xcom_transport.c:1997
int srv_unref(server *s)
Definition: xcom_transport.c:871
void init_xcom_transport(xcom_port listen_port)
Definition: xcom_transport.c:105
int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd without buffering reads.
Definition: xcom_transport.c:1283
xcom_proto minimum_ipv6_version()
Returns the version from which nodes are able to speak IPv6.
Definition: xcom_transport.c:2164
double server_active(site_def const *s, node_no i)
Definition: xcom_transport.c:843
xcom_proto get_latest_common_proto()
Definition: xcom_transport.c:2022
int tcp_reaper_task(task_arg arg)
Definition: xcom_transport.c:1809
int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto, const site_def *current_site_def)
Checks if an incoming node is eligible to enter the group.
Definition: xcom_transport.c:200
int get_ip_and_port(char *address, char ip[IP_MAX_SIZE], xcom_port *port)
Get the ip and port object from a given address in the authorized input format.
Definition: xcom_transport.c:2166
void invalidate_servers(const site_def *old_site_def, const site_def *new_site_def)
Definition: xcom_transport.c:1784
void reset_srv_buf(srv_buf *sb)
Definition: xcom_transport.c:112
unsigned short xcom_port
Definition: xcom_common.h:47
pax_msg * p
Definition: xcom_transport.h:77
char * xcom_get_name(char *a)
void ssl_shutdown_con(connection_descriptor *con)
Definition: xcom_transport.c:1974
int apply_xdr(xcom_proto x_proto, void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op)
Definition: xcom_transport.c:364
char const * xcom_proto_name(xcom_proto proto_vers)
void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize, x_msg_type *x_type, unsigned int *tag)
Definition: xcom_transport.c:1268
int sender_task(task_arg arg)
Definition: xcom_transport.c:1540
int tcp_server(task_arg arg)
Definition: xcom_transport.c:884
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:188
void close_connection(connection_descriptor *con)
Definition: xcom_transport.c:1982
Definition: node_connection.h:48
int check_protoversion(xcom_proto x_proto, xcom_proto negotiated)
Definition: xcom_transport.c:321
xcom_port port
Definition: xcom_transport.h:76
int crash_on_error
Definition: xcom_transport.h:78
Definition: server_struct.h:42
Definition: xcom_transport.h:68
int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag, int64_t *ret)
Definition: xcom_transport.c:1490
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:150
int send_server_msg(site_def const *s, node_no i, pax_msg *p)
Definition: xcom_transport.c:1034
int send_to_someone(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1099
int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd with buffering reads.
Definition: xcom_transport.c:1386
void shutdown_connection(connection_descriptor *con)
Definition: xcom_transport.c:1988
xcom_proto common_xcom_version(site_def const *site)
Definition: xcom_transport.c:2007
void garbage_collect_servers()
Definition: xcom_transport.c:830
char * srv
Definition: xcom_transport.h:75
Definition: server_struct.h:34
int send_to_acceptors(pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1139
xcom_proto read_protoversion(unsigned char *p)
Definition: xcom_transport.c:319
int flush_srv_buf(server *s, int64_t *ret)
Definition: xcom_transport.c:159
int send_msg(server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
Definition: xcom_transport.c:1006
int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1054
int send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag, int64_t *ret)
Definition: xcom_transport.c:332
xdr_op
Definition: xdr.h:79
void write_protoversion(unsigned char *buf, xcom_proto proto_vers)
Definition: xcom_transport.c:315
int shutdown_servers()
int send_to_others(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1092
void ssl_free_con(connection_descriptor *con)
Definition: xcom_transport.c:1969
int send_to_self_site(site_def const *s, pax_msg *p)
Definition: xcom_transport.c:1061
xcom_port xcom_get_port(char *a)
Definition: xcom_transport.h:74
int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
Definition: xcom_transport.c:661
void update_servers(site_def *s, cargo_type operation)
Definition: xcom_transport.c:1722
int send_to_all(pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1066
Definition: xcom_transport.h:70
void init_crc32c()
Definition: xcom_transport.c:683