MySQL 8.4.3
Source Code Documentation
xcom_transport.cc File Reference
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <math.h>
#include <rpc/rpc.h>
#include <stdlib.h>
#include <string.h>
#include "xcom/xcom_profile.h"
#include "my_compiler.h"
#include "xcom/node_connection.h"
#include "xcom/node_list.h"
#include "xcom/node_no.h"
#include "xcom/retry.h"
#include "xcom/server_struct.h"
#include "xcom/simset.h"
#include "xcom/site_def.h"
#include "xcom/site_struct.h"
#include "xcom/sock_probe.h"
#include "xcom/synode_no.h"
#include "xcom/task.h"
#include "xcom/task_debug.h"
#include "xcom/task_net.h"
#include "xcom/task_os.h"
#include "xcom/x_platform.h"
#include "xcom/xcom_base.h"
#include "xcom/xcom_common.h"
#include "xcom/xcom_detector.h"
#include "xcom/xcom_memory.h"
#include "xcom/xcom_msg_queue.h"
#include "xcom/xcom_scope_guard.h"
#include "xcom/xcom_statistics.h"
#include "xcom/xcom_transport.h"
#include "xcom/xcom_vp_str.h"
#include "xcom/xdr_utils.h"
#include "xdr_gen/xcom_vp.h"
#include "xcom/network/network_provider_manager.h"
#include <openssl/err.h>
#include <openssl/ssl.h>

Classes

struct  parse_buf
 

Macros

#define MY_XCOM_PROTO   x_1_9
 
#define SERVER_MAX   (2 * NSERVERS)
 
#define XDRFUNC   xdrfunc
 
#define CRC32CSTART   0xFFFFFFFF
 
#define TAG_START   313
 
#define TERMINATE_CLIENT(ep)
 
#define EMIT    if (!emit(p)) return 0
 

Typedefs

typedef int(* node_set_selector) (site_def const *s, node_no node)
 
typedef struct parse_buf parse_buf
 

Functions

static void shut_srv (server *s)
 
static int pm (xcom_port port)
 
int close_open_connection (connection_descriptor *conn)
 
connection_descriptor * open_new_connection (const char *server, xcom_port port, int connection_timeout, network_provider_dynamic_log_level log_level)
 
connection_descriptor * open_new_local_connection (const char *server, xcom_port port)
 
bool is_able_to_connect_to_node (const char *server, const xcom_port port)
 Tests connectivity to another node under the current configuration setup. More...
 
result set_nodelay (int fd)
 
void init_xcom_transport (xcom_port listen_port)
 
void reset_srv_buf (srv_buf *sb)
 
static void alive (server *s)
 
static u_int srv_buf_capacity (srv_buf *sb)
 
static u_int srv_buf_free_space (srv_buf *sb)
 
static u_int srv_buf_buffered (srv_buf *sb)
 
static char * srv_buf_extract_ptr (srv_buf *sb)
 
static char * srv_buf_insert_ptr (srv_buf *sb)
 
static void advance_extract_ptr (srv_buf *sb, u_int len)
 
static u_int get_srv_buf (srv_buf *sb, char *data, u_int len)
 
static void advance_insert_ptr (srv_buf *sb, u_int len)
 
static u_int put_srv_buf (srv_buf *sb, char *data, u_int len)
 
int flush_srv_buf (server *s, int64_t *ret)
 
int is_new_node_eligible_for_ipv6 (xcom_proto incoming_proto, const site_def *current_site_def)
 Checks if an incoming node is eligible to enter the group. More...
 
static int _send_msg (server *s, pax_msg *p, node_no to, int64_t *ret)
 
void write_protoversion (unsigned char *buf, xcom_proto proto_vers)
 
xcom_proto read_protoversion (unsigned char *p)
 
int check_protoversion (xcom_proto x_proto, xcom_proto negotiated)
 
int send_proto (connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag, int64_t *ret)
 
int apply_xdr (void *buff, uint32_t bufflen, xdrproc_t xdrfunc, void *xdrdata, enum xdr_op op)
 
void dbg_app_data (app_data_ptr a)
 
static int serialize (void *p, xcom_proto x_proto, uint32_t *out_len, xdrproc_t xdrfunc, char **out_buf)
 
static int old_proto_knows (xcom_proto x_proto, pax_op op)
 
int serialize_msg (pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
 
int deserialize_msg (pax_msg *p, xcom_proto x_proto, char *buf, uint32_t buflen)
 
void init_crc32c ()
 
static server * mksrv (char *srv, xcom_port port)
 
static server * addsrv (char *srv, xcom_port port)
 
static void rmsrv (int i)
 
static void init_collect ()
 
void get_all_site_defs (site_def ***s, uint32_t *n)
 
static void mark_site_servers (site_def *site)
 
static void mark ()
 
static void sweep ()
 
void garbage_collect_servers ()
 
static void freesrv (server *s)
 
double server_active (site_def const *s, node_no i)
 
int srv_ref (server *s)
 
int srv_unref (server *s)
 
int incoming_connection_task (task_arg arg)
 
void server_detected (server *s)
 Updates timestamp of server. More...
 
static int dial (server *s, network_provider_dynamic_log_level dial_call_log_level)
 
int send_msg (server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
 
static int _send_server_msg (site_def const *s, node_no to, pax_msg *p)
 
int send_server_msg (site_def const *s, node_no to, pax_msg *p)
 
static int all (site_def const *s, node_no node)
 
static int not_self (site_def const *s, node_no node)
 
static int send_to_node_set (site_def const *s, node_no max, pax_msg *p, node_set_selector test_func, const char *dbg)
 
int send_to_all_site (site_def const *s, pax_msg *p, const char *dbg)
 
int send_to_all_except_self (site_def const *s, pax_msg *p, const char *dbg)
 
int send_to_all (pax_msg *p, const char *dbg)
 
static int send_other_loop (site_def const *s, pax_msg *p, const char *dbg)
 
int send_to_others (site_def const *s, pax_msg *p, const char *dbg)
 
int send_to_someone (site_def const *s, pax_msg *p, const char *dbg)
 
int send_to_acceptors (pax_msg *p, const char *dbg)
 
static int read_bytes (connection_descriptor const *rfd, char *p, uint32_t n, server *s, int64_t *ret)
 Reads n bytes from connection rfd without buffering reads. More...
 
static int buffered_read_bytes (connection_descriptor const *rfd, srv_buf *buf, char *p, uint32_t n, server *s, int64_t *ret)
 Reads n bytes from connection rfd with buffering reads. More...
 
void get_header_1_0 (unsigned char header_buf[], uint32_t *msgsize, x_msg_type *x_type, unsigned int *tag)
 
void put_header_1_0 (unsigned char header_buf[], uint32_t msgsize, x_msg_type x_type, unsigned int tag)
 
int read_msg (connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
 Reads message from connection rfd without buffering reads. More...
 
int buffered_read_msg (connection_descriptor *rfd, srv_buf *buf, pax_msg *p, server *s, int64_t *ret)
 Reads message from connection rfd with buffering reads. More...
 
static unsigned int incr_tag (unsigned int tag)
 
static void start_protocol_negotiation (channel *outgoing)
 
void wakeup_sender ()
 
static server * find_server (server *table[], int n, char *name, xcom_port port)
 
bool is_server_in_current_view (server *sp)
 Checks if a given server is currently in use in the current view. More...
 
int sender_task (task_arg arg)
 
int local_sender_task (task_arg arg)
 
void update_servers (site_def *s, cargo_type operation)
 
void invalidate_servers (const site_def *old_site_def, const site_def *new_site_def)
 
int tcp_reaper_task (task_arg arg)
 
void ssl_free_con (connection_descriptor *con)
 
void close_connection (connection_descriptor *con)
 
void shutdown_connection (connection_descriptor *con)
 
void reset_connection (connection_descriptor *con)
 
xcom_proto common_xcom_version (site_def const *site)
 
xcom_proto set_latest_common_proto (xcom_proto x_proto)
 
xcom_proto get_latest_common_proto ()
 
xcom_proto negotiate_protocol (xcom_proto proto_vers)
 
xcom_proto minimum_ipv6_version ()
 Returns the version from which nodes are able to speak IPv6. More...
 
static int emit (parse_buf *p)
 
static int match_port (parse_buf *p, xcom_port *port)
 
static int match_ipv6 (parse_buf *p)
 
static int match_ipv4_or_name (parse_buf *p)
 
static int match_address (parse_buf *p)
 
static int match_ip_and_port (char const *address, char ip[IP_MAX_SIZE], xcom_port *port)
 
int get_ip_and_port (char const *address, char ip[IP_MAX_SIZE], xcom_port *port)
 Get the ip and port object from a given address in the authorized input format. More...
 

Variables

xcom_proto const my_min_xcom_version
 
xcom_proto const my_xcom_version
 
static int const NAGLE = 0
 
int xcom_shutdown
 
static xcom_port xcom_listen_port = 0
 
static xdrproc_t pax_msg_func []
 
static uint32_t crc_table [256]
 
static server * all_servers [SERVER_MAX]
 
static int maxservers = 0
 
linkage connect_wait
 
static xcom_proto latest_common_proto = MY_XCOM_PROTO
 

Macro Definition Documentation

◆ CRC32CSTART

#define CRC32CSTART   0xFFFFFFFF

◆ EMIT

#define EMIT    if (!emit(p)) return 0

◆ MY_XCOM_PROTO

#define MY_XCOM_PROTO   x_1_9

◆ SERVER_MAX

#define SERVER_MAX   (2 * NSERVERS)

◆ TAG_START

#define TAG_START   313

◆ TERMINATE_CLIENT

#define TERMINATE_CLIENT (   ep)
Value:
{ \
if (ep->s->crash_on_error) abort(); \
}
#define TERMINATE
Definition: task.h:303

◆ XDRFUNC

#define XDRFUNC   xdrfunc

Typedef Documentation

◆ node_set_selector

typedef int(* node_set_selector) (site_def const *s, node_no node)

◆ parse_buf

typedef struct parse_buf parse_buf

Function Documentation

◆ _send_msg()

static int _send_msg ( server * s,
pax_msg * p,
node_no  to,
int64_t *  ret 
)
static

◆ _send_server_msg()

static int _send_server_msg ( site_def const *  s,
node_no  to,
pax_msg * p
)
inline static

◆ addsrv()

static server * addsrv ( char *  srv,
xcom_port  port 
)
static

◆ advance_extract_ptr()

static void advance_extract_ptr ( srv_buf * sb,
u_int  len
)
inline static

◆ advance_insert_ptr()

static void advance_insert_ptr ( srv_buf * sb,
u_int  len
)
inline static

◆ alive()

static void alive ( server * s)
static

◆ all()

static int all ( site_def const *  s,
node_no  node 
)
static

◆ apply_xdr()

int apply_xdr ( void *  buff,
uint32_t  bufflen,
xdrproc_t  xdrfunc,
void *  xdrdata,
enum xdr_op  op 
)

◆ buffered_read_bytes()

static int buffered_read_bytes ( connection_descriptor const *  rfd,
srv_buf * buf,
char *  p,
uint32_t  n,
server * s,
int64_t *  ret 
)
static

Reads n bytes from connection rfd with buffering reads.

Parameters
[in] rfd: Pointer to open connection.
[in,out] buf: Used for buffering reads. Originally initialized by caller, maintained by buffered_read_bytes.
[out] p: Output buffer.
[in] n: Number of bytes to read.
[out] s: Pointer to server.
[out] ret: Number of bytes read, or -1 if failure.
Return values
0: if task should terminate.
1: if it should continue.

◆ buffered_read_msg()

int buffered_read_msg ( connection_descriptor * rfd,
srv_buf * buf,
pax_msg * p,
server * s,
int64_t *  ret 
)

Reads message from connection rfd with buffering reads.

Parameters
[in] rfd: Pointer to open connection.
[in,out] buf: Used for buffering reads.
[out] p: Output buffer.
[out] s: Pointer to server. Server timestamp updated if not 0.
[out] ret: Number of bytes read, or -1 if failure.
Return values
0: if task should terminate.
1: if it should continue.

◆ check_protoversion()

int check_protoversion ( xcom_proto  x_proto,
xcom_proto  negotiated 
)

◆ close_connection()

void close_connection ( connection_descriptor * con)

◆ close_open_connection()

int close_open_connection ( connection_descriptor * conn)

◆ common_xcom_version()

xcom_proto common_xcom_version ( site_def const *  site)

◆ dbg_app_data()

void dbg_app_data ( app_data_ptr  a)

◆ deserialize_msg()

int deserialize_msg ( pax_msg * p,
xcom_proto  x_proto,
char *  buf,
uint32_t  buflen 
)

◆ dial()

static int dial ( server * s,
network_provider_dynamic_log_level  dial_call_log_level 
)
static

◆ emit()

static int emit ( parse_buf * p)
static

◆ find_server()

static server * find_server ( server * table[],
int  n,
char *  name,
xcom_port  port 
)
static

◆ flush_srv_buf()

int flush_srv_buf ( server * s,
int64_t *  ret 
)

◆ freesrv()

static void freesrv ( server * s)
static

◆ garbage_collect_servers()

void garbage_collect_servers ( )

◆ get_all_site_defs()

void get_all_site_defs ( site_def ***  s,
uint32_t *  n 
)

◆ get_header_1_0()

void get_header_1_0 ( unsigned char  header_buf[],
uint32_t *  msgsize,
x_msg_type * x_type,
unsigned int *  tag 
)

◆ get_ip_and_port()

int get_ip_and_port ( char const *  address,
char  ip[IP_MAX_SIZE],
xcom_port * port
)

Get the ip and port object from a given address in the authorized input format.

For IPv4 the format is IP (or) NAME:PORT, and for IPv6 it is [IP (or) NAME]:PORT.

Parameters
address: input address to parse
ip: the resulting IP or Name
port: the resulting port
Returns
int true (1) in case of parse error

◆ get_latest_common_proto()

xcom_proto get_latest_common_proto ( )

◆ get_srv_buf()

static u_int get_srv_buf ( srv_buf * sb,
char *  data,
u_int  len 
)
static

◆ incoming_connection_task()

int incoming_connection_task ( task_arg  arg)

◆ incr_tag()

static unsigned int incr_tag ( unsigned int  tag)
inline static

◆ init_collect()

static void init_collect ( )
static

◆ init_crc32c()

void init_crc32c ( )

◆ init_xcom_transport()

void init_xcom_transport ( xcom_port  listen_port)

◆ invalidate_servers()

void invalidate_servers ( const site_def * old_site_def,
const site_def * new_site_def
)

◆ is_able_to_connect_to_node()

bool is_able_to_connect_to_node ( const char *  server,
const xcom_port  port 
)

Tests connectivity to another node under the current configuration setup.

This function verifies that this node is able to successfully reach the other node. It is used as a sanity check when a new member is about to be added to the list of participants, and to check local node connectivity in order to validate configurations.

Returns
false In case of error.
true If it succeeds to connect to a node.

◆ is_new_node_eligible_for_ipv6()

int is_new_node_eligible_for_ipv6 ( xcom_proto  incoming_proto,
const site_def * current_site_def
)

Checks if an incoming node is eligible to enter the group.

This function checks if a new node entering the group is able to be part of it. This is needed due to downgrade procedures to server versions that do not speak IPv6. One will check if:

  • Our server is being contacted by a server that has a lower version than the IPv6 baseline
  • Check if the current configuration is all reachable by an IPv4 node

If all of the above hold true we are able to proceed and add the node. Else, we must fail.

Return values
1: in case of success.

◆ is_server_in_current_view()

bool is_server_in_current_view ( server * sp)

Checks if a given server is currently in use in the current view.

Parameters
sp: the server to check
Returns
true if it is in the current view, false otherwise.

◆ local_sender_task()

int local_sender_task ( task_arg  arg)

◆ mark()

static void mark ( )
static

◆ mark_site_servers()

static void mark_site_servers ( site_def * site)
static

◆ match_address()

static int match_address ( parse_buf * p)
static

◆ match_ip_and_port()

static int match_ip_and_port ( char const *  address,
char  ip[IP_MAX_SIZE],
xcom_port * port
)
static

◆ match_ipv4_or_name()

static int match_ipv4_or_name ( parse_buf * p)
static

◆ match_ipv6()

static int match_ipv6 ( parse_buf * p)
static

◆ match_port()

static int match_port ( parse_buf * p,
xcom_port * port
)
static

◆ minimum_ipv6_version()

xcom_proto minimum_ipv6_version ( )

Returns the version from which nodes are able to speak IPv6.

Returns
xcom_proto the version from which nodes are able to speak IPv6

◆ mksrv()

static server * mksrv ( char *  srv,
xcom_port  port 
)
static

◆ negotiate_protocol()

xcom_proto negotiate_protocol ( xcom_proto  proto_vers)

◆ not_self()

static int not_self ( site_def const *  s,
node_no  node 
)
static

◆ old_proto_knows()

static int old_proto_knows ( xcom_proto  x_proto,
pax_op  op 
)
inline static

◆ open_new_connection()

connection_descriptor * open_new_connection ( const char *  server,
xcom_port  port,
int  connection_timeout,
network_provider_dynamic_log_level  log_level 
)

◆ open_new_local_connection()

connection_descriptor * open_new_local_connection ( const char *  server,
xcom_port  port 
)

◆ pm()

static int pm ( xcom_port  port)
static

◆ put_header_1_0()

void put_header_1_0 ( unsigned char  header_buf[],
uint32_t  msgsize,
x_msg_type  x_type,
unsigned int  tag 
)

◆ put_srv_buf()

static u_int put_srv_buf ( srv_buf * sb,
char *  data,
u_int  len 
)
static

◆ read_bytes()

static int read_bytes ( connection_descriptor const *  rfd,
char *  p,
uint32_t  n,
server * s,
int64_t *  ret 
)
static

Reads n bytes from connection rfd without buffering reads.

Parameters
[in] rfd: Pointer to open connection.
[out] p: Output buffer.
[in] n: Number of bytes to read.
[out] s: Pointer to server.
[out] ret: Number of bytes read, or -1 if failure.
Return values
0: if task should terminate.
1: if it should continue.

◆ read_msg()

int read_msg ( connection_descriptor * rfd,
pax_msg * p,
server * s,
int64_t *  ret 
)

Reads message from connection rfd without buffering reads.

Parameters
[in] rfd: Pointer to open connection.
[out] p: Output buffer.
[in,out] s: Pointer to server. Server timestamp updated if not 0.
[in,out] ret: Number of bytes read, or -1 if failure.
Return values
0: if task should terminate.
1: if it should continue.

◆ read_protoversion()

xcom_proto read_protoversion ( unsigned char *  p)

◆ reset_connection()

void reset_connection ( connection_descriptor * con)

◆ reset_srv_buf()

void reset_srv_buf ( srv_buf * sb)

◆ rmsrv()

static void rmsrv ( int  i)
static

◆ send_msg()

int send_msg ( server * s,
node_no  from,
node_no  to,
uint32_t  group_id,
pax_msg * p
)

◆ send_other_loop()

static int send_other_loop ( site_def const *  s,
pax_msg * p,
const char *  dbg
)
inline static

◆ send_proto()

int send_proto ( connection_descriptor * con,
xcom_proto  x_proto,
x_msg_type  x_type,
unsigned int  tag,
int64_t *  ret 
)

◆ send_server_msg()

int send_server_msg ( site_def const *  s,
node_no  to,
pax_msg * p
)

◆ send_to_acceptors()

int send_to_acceptors ( pax_msg * p,
const char *  dbg 
)

◆ send_to_all()

int send_to_all ( pax_msg * p,
const char *  dbg 
)

◆ send_to_all_except_self()

int send_to_all_except_self ( site_def const *  s,
pax_msg * p,
const char *  dbg 
)

◆ send_to_all_site()

int send_to_all_site ( site_def const *  s,
pax_msg * p,
const char *  dbg 
)

◆ send_to_node_set()

static int send_to_node_set ( site_def const *  s,
node_no  max,
pax_msg * p,
node_set_selector  test_func,
const char *  dbg
)
inline static

◆ send_to_others()

int send_to_others ( site_def const *  s,
pax_msg * p,
const char *  dbg 
)

◆ send_to_someone()

int send_to_someone ( site_def const *  s,
pax_msg * p,
const char *  dbg 
)

◆ sender_task()

int sender_task ( task_arg  arg)

◆ serialize()

static int serialize ( void *  p,
xcom_proto  x_proto,
uint32_t *  out_len,
xdrproc_t  xdrfunc,
char **  out_buf 
)
static

◆ serialize_msg()

int serialize_msg ( pax_msg p,
xcom_proto  x_proto,
uint32_t *  buflen,
char **  buf 
)

◆ server_active()

double server_active ( site_def const *  s,
node_no  i 
)

◆ server_detected()

void server_detected ( server * s)

Updates timestamp of server.

Parameters
[in] s: Pointer to server.

◆ set_latest_common_proto()

xcom_proto set_latest_common_proto ( xcom_proto  x_proto)

◆ set_nodelay()

result set_nodelay ( int  fd)

◆ shut_srv()

static void shut_srv ( server * s)
static

◆ shutdown_connection()

void shutdown_connection ( connection_descriptor * con)

◆ srv_buf_buffered()

static u_int srv_buf_buffered ( srv_buf * sb)
static

◆ srv_buf_capacity()

static u_int srv_buf_capacity ( srv_buf * sb)
static

◆ srv_buf_extract_ptr()

static char * srv_buf_extract_ptr ( srv_buf * sb)
static

◆ srv_buf_free_space()

static u_int srv_buf_free_space ( srv_buf * sb)
static

◆ srv_buf_insert_ptr()

static char * srv_buf_insert_ptr ( srv_buf * sb)
static

◆ srv_ref()

int srv_ref ( server * s)

◆ srv_unref()

int srv_unref ( server * s)

◆ ssl_free_con()

void ssl_free_con ( connection_descriptor * con)

◆ start_protocol_negotiation()

static void start_protocol_negotiation ( channel * outgoing)
static

◆ sweep()

static void sweep ( )
static

◆ tcp_reaper_task()

int tcp_reaper_task ( task_arg  arg)

◆ update_servers()

void update_servers ( site_def * s,
cargo_type  operation 
)

◆ wakeup_sender()

void wakeup_sender ( )

◆ write_protoversion()

void write_protoversion ( unsigned char *  buf,
xcom_proto  proto_vers 
)

Variable Documentation

◆ all_servers

server* all_servers[SERVER_MAX]
static

◆ connect_wait

linkage connect_wait
Initial value:
= {
linkage connect_wait
Definition: xcom_transport.cc:1424

◆ crc_table

uint32_t crc_table[256]
static

◆ latest_common_proto

xcom_proto latest_common_proto = MY_XCOM_PROTO
static

◆ maxservers

int maxservers = 0
static

◆ my_min_xcom_version

xcom_proto const my_min_xcom_version
Initial value:
=
x_1_0

◆ my_xcom_version

xcom_proto const my_xcom_version
Initial value:
=
MY_XCOM_PROTO
(MY_XCOM_PROTO is defined at xcom_transport.cc:79)

◆ NAGLE

int const NAGLE = 0
static

◆ pax_msg_func

xdrproc_t pax_msg_func[]
static
Initial value:
= {
nullptr,
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_0),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_1),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_2),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_3),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_4),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_5),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_6),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_7),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_8),
reinterpret_cast<xdrproc_t>(xdr_pax_msg_1_9)}
bool_t(* xdrproc_t)(XDR *, void *,...)
Definition: xdr.h:143

◆ xcom_listen_port

xcom_port xcom_listen_port = 0
static

◆ xcom_shutdown

int xcom_shutdown
extern