MySQL  8.0.19
Source Code Documentation
xcom_transport.h
Go to the documentation of this file.
1 /* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License, version 2.0,
5  as published by the Free Software Foundation.
6 
7  This program is also distributed with certain software (including
8  but not limited to OpenSSL) that is licensed under separate terms,
9  as designated in a particular file or component or in included license
10  documentation. The authors of MySQL hereby grant you an additional
11  permission to link the program and your derivative works with the
12  separately licensed software that they have included with MySQL.
13 
14  This program is distributed in the hope that it will be useful,
15  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  GNU General Public License, version 2.0, for more details.
18 
19  You should have received a copy of the GNU General Public License
20  along with this program; if not, write to the Free Software
21  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22 
23 #ifndef XCOM_TRANSPORT_H
24 #define XCOM_TRANSPORT_H
25 
27 
28 #ifdef __cplusplus
29 extern "C" {
30 #endif
31 
32 #define XDR_INT_SIZE 4
33 #define MSG_HDR_SIZE (3 * XDR_INT_SIZE)
34 
35 /* Definition of message with fixed size header and variable size payload */
36 
37 /* This is version 1_0 of the header. It is OK to change the header in
38  other versions as long as the version field is the first field.
39 */
40 /* version[4] length[4] type[1] tag[2] UNUSED[1] message-body[length] */
41 /*
42  The version is used both for protocol negotiations and to discard messages
43  with wrong version.
44  The length field is the length of the message, not including the header
45  itself.
46  The type field is 0 if a normal message, otherwise it is a protocol control
47  message.
48  The tag field will be used during protocol negotiation to uniquely identify
49  the request. The reply will
50  contain the same tag.
51  The message-body contains xdr-serialized data.
52  */
53 
54 #define SERIALIZED_BUFLEN(x) ((x) + MSG_HDR_SIZE)
55 
56 #define VERS_PTR(buf) (buf)
57 #define LENGTH_PTR(buf) &((buf)[XDR_INT_SIZE])
58 #define X_TYPE (2 * XDR_INT_SIZE)
59 #define X_TAG (X_TYPE + 1)
60 #define X_TAG_PTR(buf) &((buf)[X_TAG])
61 #ifdef NOTDEF
62 #define CHECK_PTR(buf) &((buf)[3 * XDR_INT_SIZE])
63 #endif
64 #define MSG_PTR(buf) &((buf)[MSG_HDR_SIZE])
65 
66 /* Transport level message types */
67 enum x_msg_type {
68  x_normal = 0, /* Normal message */
69  x_version_req = 1, /* Negotiate protocol version */
70  x_version_reply = 2 /* Protocol version reply */
71 };
72 typedef enum x_msg_type x_msg_type;
73 
74 struct envelope {
75  char *srv;
77  pax_msg *p;
79 };
80 
81 typedef struct envelope envelope;
82 
83 int check_protoversion(xcom_proto x_proto, xcom_proto negotiated);
84 int flush_srv_buf(server *s, int64_t *ret);
85 
86 /**
87  Reads message from connection rfd with buffering reads.
88 
89  @param[in] rfd Pointer to open connection.
90  @param[in,out] buf Used for buffering reads.
91  @param[out] p Output buffer.
92  @param[out] s Pointer to server. Server timestamp updated if not 0.
93  @param[out] ret Number of bytes read, or -1 if failure.
94 
95  @return
96  @retval 0 if task should terminate.
97  @retval 1 if it should continue.
98 */
99 
100 int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p,
101  server *s, int64_t *ret);
102 
103 /**
104  Reads message from connection rfd without buffering reads.
105 
106  @param[in] rfd Pointer to open connection.
107  @param[out] p Output buffer.
108  @param[in,out] s Pointer to server. Server timestamp updated if not 0.
109  @param[in,out] ret Number of bytes read, or -1 if failure.
110 
111  @return
112  @retval 0 if task should terminate.
113  @retval 1 if it should continue.
114 */
115 int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret);
116 
117 int send_to_acceptors(pax_msg *p, const char *dbg);
118 int send_to_all(pax_msg *p, const char *dbg);
119 int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg);
120 int send_to_others(site_def const *s, pax_msg *p, const char *dbg);
121 int send_to_someone(site_def const *s, pax_msg *p, const char *dbg);
122 int send_to_self_site(site_def const *s, pax_msg *p);
123 
124 int sender_task(task_arg arg);
125 int local_sender_task(task_arg arg);
126 int shutdown_servers();
127 int srv_ref(server *s);
128 int srv_unref(server *s);
129 int tcp_reaper_task(task_arg arg);
130 int tcp_server(task_arg arg);
131 uint32_t crc32c_hash(char *buf, char *end);
132 int apply_xdr(xcom_proto x_proto, void *buff, uint32_t bufflen,
133  xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op);
134 void init_crc32c();
135 void init_xcom_transport(xcom_port listen_port);
136 void reset_srv_buf(srv_buf *sb);
137 char *xcom_get_name(char *a);
138 xcom_port xcom_get_port(char *a);
139 int send_server_msg(site_def const *s, node_no i, pax_msg *p);
140 double server_active(site_def const *s, node_no i);
141 void update_servers(site_def *s, cargo_type operation);
143 int client_task(task_arg arg);
144 int send_msg(server *s, node_no from, node_no to, uint32_t group_id,
145  pax_msg *p);
146 /**
147  Updates timestamp of server.
148 
149  @param[in] s Pointer to server.
150 */
151 void server_detected(server *s);
152 
153 void invalidate_servers(const site_def *old_site_def,
154  const site_def *new_site_def);
155 
159 
160 #ifdef XCOM_HAVE_OPENSSL
161 void ssl_free_con(connection_descriptor *con);
162 void ssl_shutdown_con(connection_descriptor *con);
163 #endif
164 
165 char const *xcom_proto_name(xcom_proto proto_vers);
167 void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize,
168  x_msg_type *x_type, unsigned int *tag);
169 void put_header_1_0(unsigned char header_buf[], uint32_t msgsize,
170  x_msg_type x_type, unsigned int tag);
171 
173  x_msg_type x_type, unsigned int tag, int64_t *ret);
174 int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto,
175  x_msg_type *x_type, unsigned int *tag, int64_t *ret);
176 
177 void write_protoversion(unsigned char *buf, xcom_proto proto_vers);
178 xcom_proto read_protoversion(unsigned char *p);
179 
180 int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf);
181 int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen);
185 
186 /**
187  * @brief Returns the version from which nodes are able to speak IPv6
188  *
189  * @return xcom_proto the version from which nodes are able to speak IPv6
190  */
192 
193 #define IP_MAX_SIZE 512
194 
195 /**
196  * @brief Get the ip and port object from a given address in the authorized
197  * input format. For IP v4 is IP (or) NAME:PORT and for IPv6 is [IP (or)
198  * NAME]:PORT
199  *
200  * @param address input address to parse
201  * @param ip the resulting IP or Name
202  * @param port the resulting port
203  * @return int true (1) in case of parse error
204  */
205 int get_ip_and_port(char *address, char ip[IP_MAX_SIZE], xcom_port *port);
206 
207 /**
208  * @brief Checks if an incoming node is eligible to enter the group
209  *
210  * This function checks if a new node entering the group is able to be part of
211  * it.
212  * This is needed duw to downgrade procedures to server versions that do not
213  * speak IPv6. One wil check if:
214  * - Our server is being contacted by a server that has a lower version than the
215  * IPv6 baseline
216  * - Check if the current configuration is all reachable by an IPv4 node
217  *
218  * If all of the above hold true we are able to proceed and add the node. Else,
219  * we must fail.
220  *
221  * @return 1 in case of success.
222  */
223 int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto,
224  const site_def *current_site_def);
225 
226 #define INITIAL_CONNECT_WAIT 0.1
227 #define MAX_CONNECT_WAIT 1.0
228 #define CONNECT_WAIT_INCREASE 1.1
229 
230 #ifdef __cplusplus
231 }
232 #endif
233 
234 #endif
common_xcom_version
xcom_proto common_xcom_version(site_def const *site)
Definition: xcom_transport.c:2006
shutdown_connection
void shutdown_connection(connection_descriptor *con)
Definition: xcom_transport.c:1987
envelope
Definition: xcom_transport.h:74
xcom_get_port
xcom_port xcom_get_port(char *a)
negotiate_protocol
xcom_proto negotiate_protocol(xcom_proto proto_vers)
Definition: xcom_transport.c:2025
new_site_def
site_def * new_site_def()
Definition: site_def.c:297
recv_proto
int recv_proto(connection_descriptor const *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag, int64_t *ret)
Definition: xcom_transport.c:1490
srv_unref
int srv_unref(server *s)
Definition: xcom_transport.c:871
server_detected
void server_detected(server *s)
Updates timestamp of server.
Definition: xcom_transport.c:969
serialize_msg
int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
Definition: xcom_transport.c:661
send_to_all
int send_to_all(pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1066
send_to_all_site
int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1054
IP_MAX_SIZE
#define IP_MAX_SIZE
Definition: xcom_transport.h:193
x_msg_type
x_msg_type
Definition: xcom_transport.h:67
deserialize_msg
int deserialize_msg(pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen)
Definition: xcom_transport.c:670
invalidate_servers
void invalidate_servers(const site_def *old_site_def, const site_def *new_site_def)
Definition: xcom_transport.c:1783
close_connection
void close_connection(connection_descriptor *con)
Definition: xcom_transport.c:1981
xdrproc_t
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:150
envelope::port
xcom_port port
Definition: xcom_transport.h:76
read_protoversion
xcom_proto read_protoversion(unsigned char *p)
Definition: xcom_transport.c:319
update_servers
void update_servers(site_def *s, cargo_type operation)
Definition: xcom_transport.c:1722
xcom_port
unsigned short xcom_port
Definition: xcom_common.h:47
xcom_proto
xcom_proto
Definition: xcom_proto_enum.h:25
client_task
int client_task(task_arg arg)
Definition: xcom_transport.c:1877
srv_ref
int srv_ref(server *s)
Definition: xcom_transport.c:865
minimum_ipv6_version
xcom_proto minimum_ipv6_version()
Returns the version from which nodes are able to speak IPv6.
Definition: xcom_transport.c:2163
envelope::srv
char * srv
Definition: xcom_transport.h:75
get_ip_and_port
int get_ip_and_port(char *address, char ip[IP_MAX_SIZE], xcom_port *port)
Get the ip and port object from a given address in the authorized input format.
Definition: xcom_transport.c:2165
crc32c_hash
uint32_t crc32c_hash(char *buf, char *end)
Definition: xcom_transport.c:698
send_to_self_site
int send_to_self_site(site_def const *s, pax_msg *p)
Definition: xcom_transport.c:1061
send_to_acceptors
int send_to_acceptors(pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1139
flush_srv_buf
int flush_srv_buf(server *s, int64_t *ret)
Definition: xcom_transport.c:159
reset_srv_buf
void reset_srv_buf(srv_buf *sb)
Definition: xcom_transport.c:112
envelope::p
pax_msg * p
Definition: xcom_transport.h:77
reset_connection
void reset_connection(connection_descriptor *con)
Definition: xcom_transport.c:1996
local_sender_task
int local_sender_task(task_arg arg)
Definition: xcom_transport.c:1672
send_to_someone
int send_to_someone(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1099
send_proto
int send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag, int64_t *ret)
Definition: xcom_transport.c:332
x_version_reply
@ x_version_reply
Definition: xcom_transport.h:70
tcp_server
int tcp_server(task_arg arg)
Definition: xcom_transport.c:884
port
static in_port_t port
Definition: testapp.c:33
xdr_op
xdr_op
Definition: xdr.h:79
get_latest_common_proto
xcom_proto get_latest_common_proto()
Definition: xcom_transport.c:2021
tcp_reaper_task
int tcp_reaper_task(task_arg arg)
Definition: xcom_transport.c:1808
check_protoversion
int check_protoversion(xcom_proto x_proto, xcom_proto negotiated)
Definition: xcom_transport.c:321
rules_table_service::end
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:188
sender_task
int sender_task(task_arg arg)
Definition: xcom_transport.c:1540
envelope::crash_on_error
int crash_on_error
Definition: xcom_transport.h:78
put_header_1_0
void put_header_1_0(unsigned char header_buf[], uint32_t msgsize, x_msg_type x_type, unsigned int tag)
Definition: xcom_transport.c:1275
send_msg
int send_msg(server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
Definition: xcom_transport.c:1006
xcom_get_name
char * xcom_get_name(char *a)
srv_buf
Definition: server_struct.h:34
apply_xdr
int apply_xdr(xcom_proto x_proto, void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op)
Definition: xcom_transport.c:364
site_def
Definition: site_struct.h:33
xcom_common.h
init_xcom_transport
void init_xcom_transport(xcom_port listen_port)
Definition: xcom_transport.c:105
send_server_msg
int send_server_msg(site_def const *s, node_no i, pax_msg *p)
Definition: xcom_transport.c:1034
server_active
double server_active(site_def const *s, node_no i)
Definition: xcom_transport.c:843
task_arg
Definition: task_arg.h:45
get_header_1_0
void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize, x_msg_type *x_type, unsigned int *tag)
Definition: xcom_transport.c:1268
x_version_req
@ x_version_req
Definition: xcom_transport.h:69
shutdown_servers
int shutdown_servers()
garbage_collect_servers
void garbage_collect_servers()
Definition: xcom_transport.c:830
server
Definition: server_struct.h:42
is_new_node_eligible_for_ipv6
int is_new_node_eligible_for_ipv6(xcom_proto incoming_proto, const site_def *current_site_def)
Checks if an incoming node is eligible to enter the group.
Definition: xcom_transport.c:200
send_to_others
int send_to_others(site_def const *s, pax_msg *p, const char *dbg)
Definition: xcom_transport.c:1092
set_latest_common_proto
xcom_proto set_latest_common_proto(xcom_proto x_proto)
Definition: xcom_transport.c:2017
xcom_proto_name
const char * xcom_proto_name(xcom_proto proto_vers)
init_crc32c
void init_crc32c()
Definition: xcom_transport.c:683
connection_descriptor
Definition: node_connection.h:48
p
const char * p
Definition: ctype-mb.cc:1233
x_normal
@ x_normal
Definition: xcom_transport.h:68
write_protoversion
void write_protoversion(unsigned char *buf, xcom_proto proto_vers)
Definition: xcom_transport.c:315
buffered_read_msg
int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd with buffering reads.
Definition: xcom_transport.c:1386
read_msg
int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
Reads message from connection rfd without buffering reads.
Definition: xcom_transport.c:1283