MySQL  8.0.18
Source Code Documentation
xcom_base.c File Reference

plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision. More...

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <poll.h>
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_profile.h"
#include "my_compiler.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/x_platform.h"
#include <arpa/inet.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/sockio.h>
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/app_data.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/get_synode_app_data.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/node_no.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/server_struct.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/simset.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/site_struct.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/task.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/task_os.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_base.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_common.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_detector.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_transport.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xdr_utils.h"
#include "plugin/group_replication/libmysqlgcs/xdr_gen/xcom_vp.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/bitset.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/node_list.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/node_set.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/pax_msg.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/site_def.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/sock_probe.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/synode_no.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/task_debug.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/task_net.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_cache.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_cfg.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_interface.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_memory.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_msg_queue.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_recover.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_statistics.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_vp_str.h"
#include "plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/retry.h"

Macros

#define SYS_STRERROR_SIZE   512
 
#define IS_CONS_ALL(p)   ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)
 
#define PROP_ITER
 
#define FNVSTART   0x811c9dc5
 
#define GOTO(x)
 
#define LOSER(x, site)   0
 
#define DBGFIX2(x)
 
#define NEXTSTATE(x)   ep->state = (x)
 
#define FIFO_SIZE   1000
 
#define reply_msg(m)
 
#define CREATE_REPLY(x)
 
#define SEND_REPLY
 
#define BAL_FMT   "ballot {cnt %d node %d}"
 
#define BAL_MEM(x)   (x).cnt, (x).node
 
#define SERIALIZE_REPLY(msg)
 
#define WRITE_REPLY
 
#define CO_BEGIN
 
#define CO_END   }
 
#define CO_RETURN(x)
 
#define HALT(x)
 
#define X(b)   #b,
 
#define CONNECT_FAIL
 

Typedefs

typedef enum allow_event_horizon_result allow_event_horizon_result
 
typedef enum exec_state exec_state
 
typedef enum xcom_send_app_wait_result xcom_send_app_wait_result
 

Enumerations

enum  allow_event_horizon_result { EVENT_HORIZON_ALLOWED, EVENT_HORIZON_INVALID, EVENT_HORIZON_UNCHANGEABLE }
 Check if we can reconfigure the event horizon. More...
 
enum  exec_state { FETCH = 0, EXECUTE = 1 }
 
enum  { TAG_START = 313 }
 
enum  xcom_send_app_wait_result {
  SEND_REQUEST_FAILED = 0, RECEIVE_REQUEST_FAILED, REQUEST_BOTCHED, RETRIES_EXCEEDED,
  REQUEST_OK_RECEIVED, REQUEST_FAIL_RECEIVED
}
 

Functions

void handle_need_boot (server *srv, site_def const *s, node_no node)
 
long xcom_unique_long (void)
 
void get_host_name (char *a, char *name)
 
static double wakeup_delay (double old)
 
static int proposer_task (task_arg arg)
 
static int executor_task (task_arg arg)
 
static int sweeper_task (task_arg arg)
 
int alive_task (task_arg arg)
 
int cache_manager_task (task_arg arg)
 
int detector_task (task_arg arg)
 
static int finished (pax_machine *p)
 
static int accepted (pax_machine *p)
 
static int started (pax_machine *p)
 
static synode_no first_free_synode (synode_no msgno)
 
static void free_forced_config_site_def ()
 
static void force_pax_machine (pax_machine *p, int enforcer)
 
void bit_set_or (bit_set *x, bit_set const *y)
 
static void init_proposers ()
 
void initialize_lsn (uint64_t n)
 
void init_base_vars ()
 
synode_no get_current_message ()
 
channel * get_prop_input_queue ()
 
synode_no get_max_synode ()
 
static bool_t is_latest_config (site_def const *const config)
 
static site_def const * first_event_horizon_reconfig ()
 Get the first pending configuration that reconfigures the event horizon. More...
 
static site_def const * latest_event_horizon_reconfig ()
 Get the latest pending configuration that reconfigures the event horizon. More...
 
static synode_no add_event_horizon (synode_no s)
 Add the event horizon to the given base synod s. More...
 
void set_max_synode_from_unified_boot (synode_no unified_boot_synode)
 
void set_group (uint32_t id)
 Set node group. More...
 
static void bury_site (uint32_t id)
 
static bool_t is_dead_site (uint32_t id)
 
node_set * init_node_set (node_set *set, u_int n)
 
node_set * alloc_node_set (node_set *set, u_int n)
 
static synode_no incr_msgno (synode_no msgno)
 
synode_no incr_synode (synode_no synode)
 
synode_no decr_synode (synode_no synode)
 
static void skip_value (pax_msg *p)
 
static void pexitall (int i)
 
static int ignoresig (int signum)
 
static int recently_active (pax_machine *p)
 
int pm_finished (pax_machine *p)
 
static int accepted_noop (pax_machine *p)
 
static int noop_match (pax_machine *p, pax_msg *pm)
 
void set_last_received_config (synode_no received_config_change)
 
static node_no max_check (site_def const *site)
 
static int is_forcing_node (pax_machine const *p)
 
static int majority (bit_set const *nodeset, site_def const *s, int all, int delay, int force)
 
static int prep_majority (site_def const *site, pax_machine *p)
 
static int prop_majority (site_def const *site, pax_machine *p)
 
void * xcom_thread_main (void *cp)
 
site_def const * get_executor_site ()
 
site_def const * get_proposer_site ()
 
void init_xcom_base ()
 
static void init_tasks ()
 
void xcom_thread_init ()
 
static void empty_prop_input_queue ()
 
void xcom_thread_deinit ()
 
bool_t must_force_recover ()
 
void set_force_recover (bool_t const x)
 
static void create_proposers ()
 
static void terminate_proposers ()
 
static void set_proposer_startpoint ()
 
static int yes (xcom_port port)
 
int taskmain (xcom_port listen_port)
 
void start_run_tasks ()
 
int xcom_taskmain (xcom_port listen_port)
 
void set_xcom_run_cb (xcom_state_change_cb x)
 
void set_xcom_comms_cb (xcom_state_change_cb x)
 
void set_xcom_terminate_cb (xcom_state_change_cb x)
 
void set_xcom_exit_cb (xcom_state_change_cb x)
 
void set_xcom_expel_cb (xcom_state_change_cb x)
 
void set_xcom_input_try_pop_cb (xcom_input_try_pop_cb pop)
 
bool xcom_input_new_signal_connection (char const *address, xcom_port port)
 
static int64_t socket_write (connection_descriptor *wfd, void *_buf, uint32_t n)
 
bool xcom_input_signal ()
 
void xcom_input_free_signal_connection ()
 
int local_server (task_arg arg)
 
static bool local_server_is_setup ()
 
int xcom_taskmain2 (xcom_port listen_port)
 
static void prepare (pax_msg *p, pax_op op)
 
void init_prepare_msg (pax_msg *p)
 Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static int prepare_msg (pax_msg *p)
 
pax_msg * create_noop (pax_msg *p)
 Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static pax_msg * create_read (site_def const *site, pax_msg *p)
 
static int skip_msg (pax_msg *p)
 
static void brand_app_data (pax_msg *p)
 
static synode_no my_unique_id (synode_no synode)
 
static void set_unique_id (pax_msg *msg, synode_no synode)
 
void init_propose_msg (pax_msg *p)
 Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol. More...
 
static int send_propose_msg (pax_msg *p)
 
static int propose_msg (pax_msg *p)
 
static void set_learn_type (pax_msg *p)
 
static void init_learn_msg (pax_msg *p)
 
static int send_learn_msg (site_def const *site, pax_msg *p)
 
static pax_msg * create_tiny_learn_msg (pax_machine *pm, pax_msg *p)
 
static int send_tiny_learn_msg (site_def const *site, pax_msg *p)
 
void prepare_push_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
 Initializes the message msg to go through a 3-phase, regular Paxos. More...
 
void prepare_push_2p (site_def const *site, pax_machine *p)
 Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_). More...
 
static void push_msg_2p (site_def const *site, pax_machine *p)
 
static void push_msg_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
 
static void brand_client_msg (pax_msg *msg, synode_no msgno)
 
void xcom_send (app_data_ptr a, pax_msg *msg)
 
static uint32_t fnv_hash (unsigned char *buf, size_t length, uint32_t sum)
 
uint32_t new_id ()
 Create a new (hopefully unique) ID. More...
 
static synode_no getstart (app_data_ptr a)
 
void site_install_action (site_def *site, cargo_type operation)
 
static site_def * create_site_def_with_start (app_data_ptr a, synode_no start)
 
static site_def * install_ng_with_start (app_data_ptr a, synode_no start)
 
site_def * install_node_group (app_data_ptr a)
 
int is_real_recover (app_data_ptr a)
 
void set_max_synode (synode_no synode)
 
static int is_busy (synode_no s)
 
bool_t match_my_msg (pax_msg *learned, pax_msg *mine)
 
static uint64_t assign_lsn ()
 Assign the next log sequence number (lsn) for a message. More...
 
static void propose_noop (synode_no find, pax_machine *p)
 
static uint64_t too_far_threshold (xcom_event_horizon active_event_horizon)
 Checks if the given synod s is outside the event horizon. More...
 
static uint64_t too_far_threshold_new_event_horizon_pending (site_def const *new_config)
 
static int too_far (synode_no s)
 
static int is_view (cargo_type x)
 
static int is_config (cargo_type x)
 
static int wait_for_cache (pax_machine **pm, synode_no synode, double timeout)
 
static void terminate_and_exit ()
 
static node_no leader (site_def const *s)
 
int iamthegreatest (site_def const *s)
 
void execute_msg (site_def const *site, pax_machine *pma, pax_msg *p)
 
static void read_missing_values (int n)
 
static void propose_missing_values (int n)
 
static void find_value (site_def const *site, unsigned int *wait, int n)
 
int get_xcom_message (pax_machine **p, synode_no msgno, int n)
 
synode_no set_executed_msg (synode_no msgno)
 
synode_no set_current_message (synode_no msgno)
 
static void update_max_synode (pax_msg *p)
 
static void debug_loser (synode_no x)
 
static void send_value (site_def const *site, node_no to, synode_no synode)
 
static synode_no compute_delay (synode_no start, xcom_event_horizon event_horizon)
 Returns the message number where it is safe for nodes in previous configuration to exit. More...
 
static void inform_removed (int index, int all)
 
static bool_t backwards_compatible (xcom_event_horizon event_horizon)
 
static bool_t reconfigurable_event_horizon (xcom_proto protocol_version)
 
static bool_t add_node_unsafe_against_ipv4_old_nodes (app_data_ptr a)
 
static bool_t unsafe_against_event_horizon (node_address const *node)
 Check if a node is compatible with the group's event horizon. More...
 
static bool_t add_node_unsafe_against_event_horizon (app_data_ptr a)
 
site_def * handle_add_node (app_data_ptr a)
 Reconfigure the group membership: add new member(s). More...
 
static void log_event_horizon_reconfiguration_failure (allow_event_horizon_result error_code, xcom_event_horizon attempted_event_horizon)
 
static allow_event_horizon_result allow_event_horizon (xcom_event_horizon event_horizon)
 
static bool_t unsafe_event_horizon_reconfiguration (app_data_ptr a)
 
static bool_t are_there_dead_nodes_in_new_config (app_data_ptr a)
 
bool_t handle_event_horizon (app_data_ptr a)
 Reconfigure the event horizon. More...
 
int terminator_task (task_arg arg)
 
static void delayed_terminate_and_exit (double t)
 
static int is_empty_site (site_def const *s)
 
site_def * handle_remove_node (app_data_ptr a)
 
bool_t handle_config (app_data_ptr a)
 
synode_no get_delivered_msg ()
 
static int is_member (site_def const *site)
 
static int addone (int i)
 
static int fifo_empty ()
 
static int fifo_full ()
 
static void fifo_insert (synode_no s)
 
static synode_no fifo_extract ()
 
static synode_no fifo_front ()
 
static synode_no get_sweep_start ()
 
static void send_read (synode_no find)
 
static int ok_to_propose (pax_machine *p)
 
void request_values (synode_no find, synode_no end)
 
bool_t safe_app_data_copy (pax_msg **target, app_data_ptr source)
 Copies app data source into target and checks if the copy succeeded. More...
 
static pax_msg * create_learn_msg_for_ignorant_node (pax_machine *p, pax_msg *pm, synode_no synode)
 
static void teach_ignorant_node (site_def const *site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
 
static void handle_read (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
 
static pax_msg * create_ack_prepare_msg (pax_machine *p, pax_msg *pm, synode_no synode)
 
pax_msg * handle_simple_prepare (pax_machine *p, pax_msg *pm, synode_no synode)
 Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static void handle_prepare (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
 
bool_t check_propose (site_def const *site, pax_machine *p)
 
static pax_msg * check_learn (site_def const *site, pax_machine *p)
 
static void do_learn (site_def const *site, pax_machine *p, pax_msg *m)
 
bool_t handle_simple_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol. More...
 
static void handle_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m)
 
static pax_msg * create_ack_accept_msg (pax_msg *m, synode_no synode)
 
pax_msg * handle_simple_accept (pax_machine *p, pax_msg *m, synode_no synode)
 Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol. More...
 
static void handle_accept (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *m)
 
pax_msg * handle_simple_ack_accept (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol. More...
 
static void handle_ack_accept (site_def const *site, pax_machine *p, pax_msg *m)
 
static void activate_sweeper ()
 
void handle_tiny_learn (site_def const *site, pax_machine *pm, pax_msg *p)
 Process the incoming tiny, i.e. More...
 
static void force_interval (synode_no start, synode_no end, int enforcer)
 
static void start_force_config (site_def *s, int enforcer)
 
void handle_learn (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming Learn message. More...
 
static void handle_skip (site_def const *site, pax_machine *p, pax_msg *m)
 
static void handle_client_msg (pax_msg *p)
 
static void handle_boot (site_def const *site, pax_msg *p)
 
bool should_handle_boot (site_def const *site, pax_msg *p)
 
void init_need_boot_op (pax_msg *p, node_address *identity)
 Initializes the message p as a need_boot_op message. More...
 
static void handle_alive (site_def const *site, linkage *reply_queue, pax_msg *pm)
 
void add_to_cache (app_data_ptr a, synode_no synode)
 
xcom_event_horizon xcom_get_minimum_event_horizon ()
 
xcom_event_horizon xcom_get_maximum_event_horizon ()
 
static client_reply_code xcom_get_event_horizon (xcom_event_horizon *event_horizon)
 Retrieves the latest event horizon. More...
 
static u_int allow_add_node (app_data_ptr a)
 
static u_int allow_remove_node (app_data_ptr a)
 
static void log_cfgchange_wrong_group (app_data_ptr a, const char *const message_fmt)
 Logs the fact that an add/remove node request is aimed at another group. More...
 
static client_reply_code can_execute_cfgchange (pax_msg *p)
 Validates if a configuration command can be executed. More...
 
void dispatch_get_event_horizon (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void log_get_synode_app_data_failure (xcom_get_synode_app_data_result error_code)
 
void dispatch_get_synode_app_data (site_def const *site, pax_msg *p, linkage *reply_queue)
 
pax_msg * dispatch_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static bool_t should_poll_cache (pax_op op)
 
int acceptor_learner_task (task_arg arg)
 
static void server_handle_need_snapshot (server *srv, site_def const *s, node_no node)
 
int reply_handler_task (task_arg arg)
 
static void xcom_sleep (unsigned int seconds)
 
void send_app_data (app_data_ptr a)
 
void xcom_send_data (uint32_t size, char *data)
 
app_data_ptr create_config (node_list *nl, cargo_type type)
 
app_data_ptr init_config_with_group (app_data *a, node_list *nl, cargo_type type, uint32_t group_id)
 
app_data_ptr init_set_event_horizon_msg (app_data *a, uint32_t group_id, xcom_event_horizon event_horizon)
 
app_data_ptr init_get_event_horizon_msg (app_data *a, uint32_t group_id)
 
app_data_ptr init_app_msg (app_data *a, char *payload, u_int payload_size)
 
app_data_ptr init_terminate_command (app_data *a)
 
static app_data_ptr init_get_synode_app_data_msg (app_data *a, uint32_t group_id, synode_no_array *const synodes)
 
app_data_ptr init_set_cache_size_msg (app_data *a, uint64_t cache_limit)
 
app_data_ptr init_convert_into_local_server_msg (app_data *a)
 
app_data_ptr create_config_with_group (node_list *nl, cargo_type type, uint32_t group_id)
 
void send_boot (node_list *nl)
 
void send_add_node (node_list *nl)
 
void send_remove_node (node_list *nl)
 
void send_config (node_list *nl)
 
void send_client_app_data (char *srv, xcom_port port, app_data_ptr a)
 
void send_client_boot (char *srv, xcom_port port, node_list *nl)
 
void send_client_add_node (char *srv, xcom_port port, node_list *nl)
 
void send_client_remove_node (char *srv, xcom_port port, node_list *nl)
 
void send_client_config (char *srv, xcom_port port, node_list *nl)
 
static void server_send_snapshot (server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node)
 
static void send_snapshot (site_def const *s, gcs_snapshot *gcs_snap, node_no node)
 
static void server_push_log (server *srv, synode_no push, node_no node)
 
static void push_log (synode_no push, node_no node)
 
static void handle_need_snapshot (site_def const *s, node_no node)
 
static int xcom_timer (task_arg arg)
 
static void stop_x_timer ()
 
static void start_x_timer (double t)
 
static void note_snapshot (task_arg fsmargs)
 
static void reset_snapshot_mask ()
 
static int got_all_snapshots ()
 
static int better_snapshot (task_arg fsmargs)
 
static void handle_x_snapshot (task_arg fsmargs)
 
static void update_best_snapshot (task_arg fsmargs)
 
static void send_need_boot ()
 
xcom_state xcom_fsm (xcom_actions action, task_arg fsmargs)
 
void xcom_add_node (char *addr, xcom_port port, node_list *nl)
 
void xcom_fsm_add_node (char *addr, node_list *nl)
 
void set_app_snap_handler (app_snap_handler x)
 
void set_app_snap_getter (app_snap_getter x)
 
static result checked_create_socket (int domain, int type, int protocol)
 
static result socket_read (connection_descriptor *rfd, void *buf, int n)
 
static int64_t socket_read_bytes (connection_descriptor *rfd, char *p, uint32_t n)
 
static result xcom_close_socket (int *sock)
 
static result xcom_shut_close_socket (int *sock)
 
struct addrinfo * does_node_have_v4_address (struct addrinfo *retrieved)
 Retrieves a node IPv4 address, if it exists. More...
 
static int timed_connect (int fd, struct sockaddr *sock_addr, socklen_t sock_size)
 
static connection_descriptor * connect_xcom (const char *server, xcom_port port)
 
connection_descriptor * xcom_open_client_connection (const char *server, xcom_port port)
 
static int xcom_send_proto (connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag)
 
static int xcom_recv_proto (connection_descriptor *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag)
 
static int is_cargo_type (app_data_ptr a, cargo_type t)
 Checks if a given app_data is from a given cargo_type. More...
 
static char * get_add_node_address (app_data_ptr a, unsigned int *member)
 Retrieves the address that was used in the add_node request. More...
 
int is_node_v4_reachable_with_info (struct addrinfo *retrieved_addr_info)
 
int is_node_v4_reachable (char *node_address)
 
int are_we_allowed_to_upgrade_to_v6 (app_data_ptr a)
 
int64_t xcom_send_client_app_data (connection_descriptor *fd, app_data_ptr a, int force)
 
int64_t xcom_client_send_die (connection_descriptor *fd)
 
int64_t xcom_client_send_data (uint32_t size, char *data, connection_descriptor *fd)
 
static pax_msg * socket_read_msg (connection_descriptor *rfd, pax_msg *p)
 
int xcom_close_client_connection (connection_descriptor *connection)
 
int xcom_client_boot (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
xcom_send_app_wait_result xcom_send_app_wait_and_get (connection_descriptor *fd, app_data *a, int force, pax_msg *p)
 Send a message and wait for response. More...
 
int xcom_send_app_wait (connection_descriptor *fd, app_data *a, int force)
 
int xcom_send_cfg_wait (connection_descriptor *fd, node_list *nl, uint32_t group_id, cargo_type ct, int force)
 
int xcom_client_add_node (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
int xcom_client_remove_node (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
int xcom_client_get_event_horizon (connection_descriptor *fd, uint32_t group_id, xcom_event_horizon *event_horizon)
 
int xcom_client_set_event_horizon (connection_descriptor *fd, uint32_t group_id, xcom_event_horizon event_horizon)
 
int xcom_client_get_synode_app_data (connection_descriptor *const fd, uint32_t group_id, synode_no_array *const synodes, synode_app_data_array *const reply)
 
int xcom_client_force_config (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
int xcom_client_enable_arbitrator (connection_descriptor *fd)
 
int xcom_client_disable_arbitrator (connection_descriptor *fd)
 
int xcom_client_terminate_and_exit (connection_descriptor *fd)
 
int xcom_client_set_cache_limit (connection_descriptor *fd, uint64_t cache_limit)
 
int xcom_client_convert_into_local_server (connection_descriptor *fd)
 

Variables

int const threephase = 0
 
int ARBITRATOR_HACK = 0
 
static int const no_duplicate_payload = 1
 
static int use_buffered_read = 1
 
static unsigned short oom_abort = 0
 
int xcom_shutdown = 0
 
synode_no executed_msg
 
synode_no max_synode
 
task_env * boot = NULL
 
task_env * detector = NULL
 
task_env * killer = NULL
 
task_env * net_boot = NULL
 
task_env * net_recover = NULL
 
void * xcom_thread_input = 0
 
static task_env * executor = NULL
 
static task_env * sweeper = NULL
 
static task_env * retry = NULL
 
static task_env * proposer [PROPOSERS]
 
static task_env * alive_t = NULL
 
static task_env * cache_task = NULL
 
static uint32_t my_id = 0
 
static synode_no current_message
 
static synode_no last_config_modification_id
 
static uint64_t lsn = 0
 
static channel prop_input_queue
 
int client_boot_done
 
int netboot_ok
 
int booting
 
start_t start_type
 
static linkage exec_wait
 
struct {
   int   n
 
   unsigned long   id [MAX_DEAD]
 
} dead_sites
 
static site_def * forced_config = 0
 
static int wait_forced_config = 0
 
static site_def const * executor_site = 0
 
static site_def * proposer_site = 0
 
static bool_t force_recover = FALSE
 
static xcom_state_change_cb xcom_run_cb = 0
 
static xcom_state_change_cb xcom_terminate_cb = 0
 
static xcom_state_change_cb xcom_comms_cb = 0
 
static xcom_state_change_cb xcom_exit_cb = 0
 
static xcom_state_change_cb xcom_expel_cb = 0
 
static xcom_input_try_pop_cb xcom_try_pop_from_input_cb = NULL
 
static connection_descriptor * input_signal_connection = NULL
 
static xcom_proto const first_event_horizon_aware_protocol = x_1_4
 
static synode_no delivered_msg
 
struct {
   int   n
 
   int   front
 
   int   rear
 
   synode_no   q [FIFO_SIZE]
 
} delay_fifo
 
static double sent_alive = 0.0
 
static int clicnt = 0
 
static app_snap_getter get_app_snap
 
static app_snap_handler handle_app_snap
 
static task_env * x_timer = NULL
 
const char * xcom_state_name [] = {x_state_list}
 
const char * xcom_actions_name [] = {x_actions}
 
static int snapshots [NSERVERS]
 

Detailed Description

plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision.

The interface to xcom is largely intact; one notable change is that xcom will consider the message delivered as soon as it has got a majority. Consequently, the VP set will not necessarily show all nodes which will actually receive the message.

OHKFIX Add wait for complete last known node set to mimic the old semantics.

IMPORTANT: What xcom does and what it does not do:

xcom messages are received in the same order on all nodes.

xcom guarantees that if a message is delivered to one node, it will eventually be seen on all other nodes as well.

xcom messages are available to a crashed node when it comes up again if at least one node which knows the value of the message has not crashed. The size of the message cache is configurable.

OHKFIX Add logging to disk to make messages durable across system crash and to increase the number of messages which may be cached.

There is no guarantee whatsoever about the order of messages from different nodes, not even the order of multiple messages from the same node. It is up to the client to impose such an order by waiting on a message before it sends the next.

xcom can notify the client that a message has timed out, and in that case will try to cancel the message, but it cannot guarantee that a message which has timed out will not be delivered.

xcom attaches a node set to each message as it is delivered to the client. This node set reflects the current node set that xcom believes is active, it does not mean that the message has been delivered yet to all nodes in the set. Neither does it mean that the message has not been delivered to the nodes not in the set.

A cache of Paxos state machines is central to the new design. The purpose of the cache is both to store a window of messages, and to decouple the different parts of xcom, like message proposal, message delivery and execution, and recovery. The old cache was limited to caching messages, and a single state machine ran the combined VP and Paxos algorithm. This constrained xcom to deliver only a single message at a time.

Each instance of the Paxos state machine implements the basic Paxos protocol. Unlike the cache in the old system, it is not cleared when a site is deleted. This removes some problems related to message delivery during site deletion. The cache is a classic fixed size LRU with a hash index.

Some extensions to the basic Paxos algorithm have been implemented:

A node has ownership of all synodes with its own node number. Only a node with node number N can propose a value for synode {X N}, where X is the sequence number, and N is the node number. Other nodes can only propose the special value no_op for synode {X N}. The reason for this is to retain the leaderless Paxos algorithm, but to avoid collisions between nodes which are competing for the same synode number. With this scheme, each node has its own unique number series during normal operation. The scheme has the following implications:

  1. If a node N has not already proposed a value for the synode {X N}, it may at any time send a LEARN message to the other nodes with the reserved value no_op, without going through phase 1 and 2 of Paxos. This is because the other nodes are constrained to propose no_op for this synode, so the final outcome will always be no_op. To avoid unnecessary message transmission, a node will try to broadcast the no_op LEARN messages by piggybacking the information on the messages of the basic Paxos protocol.
  2. Other nodes which want to find the value of synode {X N} may do so by trying to get the value no_op accepted by following the basic Paxos algorithm. The result will be the actual value proposed by node N if it has done so, otherwise no_op. This will typically only be necessary when a node is down, and the other nodes need to find the values from the missing node in order to be able to continue execution.

Messages are delivered in order to the client, and the order is determined by the sequence number and the node number, with the sequence number as the most significant part.

The xcom network interface has been redesigned and is now implemented directly on top of TCP, and has so far been completely trouble free. We use poll() or select() to implement non-blocking send and receive, but libev could equally well have been used.

Multicast is implemented on top of unicast as before, but the implementation is prepared to use real multicast with relatively minor changes.

The roles of proposer, acceptor/learner, and executor are now directly mapped to unique task types which interact with the Paxos state machines, whereas the previous implementation folded all the roles into a single event driven state machine.

The following terminology will be used:

A node is an instance of the xcom thread. There is only one instance of the xcom thread in the agent. A client is the application which is using xcom to send messages. A thread is a real OS thread. A task is a logical process. It is implemented by coroutines and an explicit stack.

The implementation of tasks and non-blocking socket operations is isolated in task.h and task.c.

A node will open a TCP connection to each of the other nodes. This connection is used for all communication initiated by the node, and replies to messages will arrive on the connection on which they were sent.

static int tcp_server(task_arg);

The tcp_server listens on the xcom port and starts an acceptor_learner_task whenever a new connection is detected.

static int tcp_reaper_task(task_arg);

Closes TCP connections which have been unused for too long.

static int sender_task(task_arg);

The sender_task waits for TCP messages on its input queue and sends them on the TCP socket. If the socket is closed for any reason, the sender_task will reconnect the socket. There is one sender_task for each socket. The sender task exists mainly to simplify the logic in the other tasks, but it could have been replaced with a coroutine which handles the connection logic after having reserved the socket for its client task.

static int generator_task(task_arg);

The generator_task reads messages from the client queue and moves them into the input queue of the proposer_task.

OHKFIX Use a tcp socket instead of the client queue. We can then remove the generator_task and let the acceptor_learner_task do the dispatching.

static int proposer_task(task_arg);

Assign a message number to an outgoing message and try to get it accepted. There may be several proposer tasks on each node working in parallel. If there are multiple proposer tasks, xcom cannot guarantee that the messages will be sent in the same order as received from the client.

static int acceptor_learner_task(task_arg);

This is the server part of the xcom thread. There is one acceptor_learner_task for each node in the system. The acceptor_learner_task reads messages from the socket, finds the correct Paxos state machine, and dispatches to the correct message handler with the state machine and message as arguments.

static int reply_handler_task(task_arg);

The reply_handler_task does the same job as the acceptor_learner_task, but listens on the socket which the node uses to send messages, so it will handle only replies on that socket.

static int executor_task(task_arg);

The executor_task waits for a Paxos message to be accepted. When the message is accepted, it is delivered to the client, unless it is a no-op. In either case, the executor_task steps to the next message and repeats the wait. If it times out waiting for a message, it will try to get a no-op accepted.

static int alive_task(task_arg);

Sends i-am-alive to other nodes if there has been no normal traffic for a while. It also pings nodes which seem to be inactive.

static int detector_task(task_arg);

The detector_task periodically scans the set of connections from other nodes and sees if there has been any activity. If there has been no activity for some time, it will assume that the node is dead, and send a view message to the client.

Reconfiguration:

The xcom reconfiguration process is essentially the one described in "Reconfiguring a State Machine" by Lamport et al. as the R-alpha algorithm. We execute the reconfiguration command immediately, but the config is only valid after a delay of alpha messages. The parameter alpha is the same as EVENT_HORIZON in this implementation. All tcp messages from beyond the event horizon will be ignored (see the static function too_far).

Macro Definition Documentation

◆ BAL_FMT

#define BAL_FMT   "ballot {cnt %d node %d}"

◆ BAL_MEM

#define BAL_MEM (   x)    (x).cnt, (x).node

◆ CO_BEGIN

#define CO_BEGIN
Value:
switch (state) { \
default: \
assert(state == 0); \
case 0:

◆ CO_END

#define CO_END   }

◆ CO_RETURN

#define CO_RETURN (   x)
Value:
{ \
state = __LINE__; \
return x; \
case __LINE__:; \
}

◆ CONNECT_FAIL

#define CONNECT_FAIL
Value:
ret_fd = -1; \
goto end
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:188

◆ CREATE_REPLY

#define CREATE_REPLY (   x)
Value:
pax_msg *reply = NULL; \
CLONE_PAX_MSG(reply, x)
#define NULL
Definition: types.h:55

◆ DBGFIX2

#define DBGFIX2 (   x)

◆ FIFO_SIZE

#define FIFO_SIZE   1000

◆ FNVSTART

#define FNVSTART   0x811c9dc5

◆ GOTO

#define GOTO (   x)
Value:
{ \
DBGOUT(STRLIT("goto "); STRLIT(#x)); \
goto x; \
}
#define STRLIT(x)
Definition: gcs_debug.h:223

◆ HALT

#define HALT (   x)
Value:
while (1) { \
CO_RETURN(x); \
}

◆ IS_CONS_ALL

#define IS_CONS_ALL (   p)    ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)

◆ LOSER

#define LOSER (   x,
  site 
)    0

◆ NEXTSTATE

#define NEXTSTATE (   x)    ep->state = (x)

◆ PROP_ITER

#define PROP_ITER
Value:
int i; \
for (i = 0; i < PROPOSERS; i++)
Definition: xcom_profile.h:84

◆ reply_msg

#define reply_msg (   m)
Value:
{ \
if (is_local_node((m)->from, site)) { \
dispatch_op(site, m, NULL); \
} else { \
if (node_no_exists((m)->from, site) && \
(m)->group_id == get_group_id(site) && \
get_server(site, (m)->from)) { \
send_server_msg(site, (m)->from, m); \
} else { \
link_into(&(msg_link_new((m), (m)->from)->l), reply_queue); \
} \
} \
}
static server * get_server(site_def const *s, node_no i)
Finds pointer to server given site and node number.
Definition: site_def.h:88
static bool_t node_no_exists(node_no n, site_def const *site)
Definition: site_def.h:71
uint32_t get_group_id(site_def const *site)
Definition: site_def.c:393
msg_link * msg_link_new(pax_msg *p, node_no to)
Definition: xcom_msg_queue.c:69
#define NULL
Definition: types.h:55
static bool_t is_local_node(node_no n, site_def const *site)
Definition: site_def.h:75

◆ SEND_REPLY

#define SEND_REPLY
Value:
reply_msg(reply); \
replace_pax_msg(&reply, NULL)
#define reply_msg(m)
Definition: xcom_base.c:3449
#define NULL
Definition: types.h:55

◆ SERIALIZE_REPLY

#define SERIALIZE_REPLY (   msg)
Value:
msg->to = ep->p->from; \
msg->from = ep->p->to; \
msg->delivered_msg = get_delivered_msg(); \
msg->max_synode = get_max_synode(); \
serialize_msg(msg, ep->rfd.x_proto, &ep->buflen, &ep->buf);
synode_no get_delivered_msg()
Definition: xcom_base.c:2850
synode_no get_max_synode()
Definition: xcom_base.c:429
char msg[1024]
Definition: test_sql_9_sessions.cc:281

◆ SYS_STRERROR_SIZE

#define SYS_STRERROR_SIZE   512

◆ WRITE_REPLY

#define WRITE_REPLY
Value:
if (ep->buflen) { \
int64_t sent; \
TASK_CALL(task_write(&ep->rfd, ep->buf, ep->buflen, &sent)); \
send_count[ep->p->op]++; \
send_bytes[ep->p->op] += ep->buflen; \
X_FREE(ep->buf); \
} \
ep->buf = NULL;
int task_write(connection_descriptor const *con, void *_buf, uint32_t n, int64_t *ret)
Definition: task.c:1006
#define NULL
Definition: types.h:55

◆ X

#define X (   b)    #b,

Typedef Documentation

◆ allow_event_horizon_result

◆ exec_state

typedef enum exec_state exec_state

◆ xcom_send_app_wait_result

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
TAG_START 

◆ allow_event_horizon_result

Check if we can reconfigure the event horizon.

We can reconfigure the event horizon if all group members support reconfiguring the event horizon, and the new event horizon is in the domain [EVENT_HORIZON_MIN, EVENT_HORIZON_MAX].

We use the group's latest common XCom protocol as a proxy to decide if all members support reconfiguring the event horizon.

If the common protocol is at least version 5 (x_1_4) then all members run compatible server instances.

Otherwise there are older instances, and it follows that the event horizon must be the default and cannot be reconfigured.

Enumerator
EVENT_HORIZON_ALLOWED 
EVENT_HORIZON_INVALID 
EVENT_HORIZON_UNCHANGEABLE 

◆ exec_state

enum exec_state
Enumerator
FETCH 
EXECUTE 

◆ xcom_send_app_wait_result

Enumerator
SEND_REQUEST_FAILED 
RECEIVE_REQUEST_FAILED 
REQUEST_BOTCHED 
RETRIES_EXCEEDED 
REQUEST_OK_RECEIVED 
REQUEST_FAIL_RECEIVED 

Function Documentation

◆ accepted()

static int accepted ( pax_machine p)
inlinestatic

◆ accepted_noop()

static int accepted_noop ( pax_machine p)
inlinestatic

◆ acceptor_learner_task()

int acceptor_learner_task ( task_arg  arg)

◆ activate_sweeper()

static void activate_sweeper ( )
static

◆ add_event_horizon()

static synode_no add_event_horizon ( synode_no  s)
static

Add the event horizon to the given base synod s.

We are assuming right now that this function is used solely in the context of "we have received a reconfiguration command at synod s, when should it be scheduled to take effect?" The result of this function is when it should take effect.

Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case result is:

s + event_horizon(active_config) + 1

If an event horizon reconfiguration R is pending, it means that the command C proposed for synod s is concurrent with R, i.e., s falls in the interval ]proposed(R), start(R)[.

In this situation we apply the command C proposed for synod s after taking into account R's event horizon.

This means that the result is:

start(R) + event_horizon(R) + 1

◆ add_node_unsafe_against_event_horizon()

static bool_t add_node_unsafe_against_event_horizon ( app_data_ptr  a)
static

◆ add_node_unsafe_against_ipv4_old_nodes()

static bool_t add_node_unsafe_against_ipv4_old_nodes ( app_data_ptr  a)
static

◆ add_to_cache()

void add_to_cache ( app_data_ptr  a,
synode_no  synode 
)

◆ addone()

static int addone ( int  i)
inlinestatic

◆ alive_task()

int alive_task ( task_arg  arg)

◆ alloc_node_set()

node_set* alloc_node_set ( node_set *  set,
u_int  n 
)

◆ allow_add_node()

static u_int allow_add_node ( app_data_ptr  a)
static

◆ allow_event_horizon()

static allow_event_horizon_result allow_event_horizon ( xcom_event_horizon  event_horizon)
static

◆ allow_remove_node()

static u_int allow_remove_node ( app_data_ptr  a)
static

◆ are_there_dead_nodes_in_new_config()

static bool_t are_there_dead_nodes_in_new_config ( app_data_ptr  a)
static

◆ are_we_allowed_to_upgrade_to_v6()

int are_we_allowed_to_upgrade_to_v6 ( app_data_ptr  a)

◆ assign_lsn()

static uint64_t assign_lsn ( )
static

Assign the next log sequence number (lsn) for a message.

Initial propose sets lsn to msgno of current_message as safe starting point, otherwise lsn shall be ever increasing. lsn ensures sender order known on receiver side, as messages may arrive "out of order" due to retransmission.

◆ backwards_compatible()

static bool_t backwards_compatible ( xcom_event_horizon  event_horizon)
static

◆ better_snapshot()

static int better_snapshot ( task_arg  fsmargs)
static

◆ bit_set_or()

void bit_set_or ( bit_set *  x,
bit_set const *  y 
)

◆ brand_app_data()

static void brand_app_data ( pax_msg *  p)
static

◆ brand_client_msg()

static void brand_client_msg ( pax_msg *  msg,
synode_no  msgno 
)
static

◆ bury_site()

static void bury_site ( uint32_t  id)
static

◆ cache_manager_task()

int cache_manager_task ( task_arg  arg)

◆ can_execute_cfgchange()

static client_reply_code can_execute_cfgchange ( pax_msg *  p)
static

Validates if a configuration command can be executed.

Checks whether the configuration command is aimed at the correct group. Checks whether the configuration command pertains to a node reincarnation.

Parameters
p: a pointer to the pax_msg of the configuration command
Return values
REQUEST_OK: if the reconfiguration command can be executed
REQUEST_RETRY: if XCom is still booting
REQUEST_FAIL: if the configuration command cannot be executed

◆ check_learn()

static pax_msg* check_learn ( site_def const *  site,
pax_machine p 
)
static

◆ check_propose()

bool_t check_propose ( site_def const *  site,
pax_machine p 
)

◆ checked_create_socket()

static result checked_create_socket ( int  domain,
int  type,
int  protocol 
)
static

◆ compute_delay()

static synode_no compute_delay ( synode_no  start,
xcom_event_horizon  event_horizon 
)
static

Returns the message number where it is safe for nodes in the previous configuration to exit.

Parameters
start: start synod of the next configuration
event_horizon: event horizon of the next configuration

◆ connect_xcom()

static connection_descriptor* connect_xcom ( const char *  server,
xcom_port  port 
)
static

◆ create_ack_accept_msg()

static pax_msg* create_ack_accept_msg ( pax_msg *  m,
synode_no  synode 
)
static

◆ create_ack_prepare_msg()

static pax_msg* create_ack_prepare_msg ( pax_machine p,
pax_msg *  pm,
synode_no  synode 
)
static

◆ create_config()

app_data_ptr create_config ( node_list *  nl,
cargo_type  type 
)

◆ create_config_with_group()

app_data_ptr create_config_with_group ( node_list *  nl,
cargo_type  type,
uint32_t  group_id 
)

◆ create_learn_msg_for_ignorant_node()

static pax_msg* create_learn_msg_for_ignorant_node ( pax_machine p,
pax_msg *  pm,
synode_no  synode 
)
static

◆ create_noop()

pax_msg* create_noop ( pax_msg *  p)

Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
p: The no-op message to send
Return values
created: paxos message of type no_op

◆ create_proposers()

static void create_proposers ( )
static

◆ create_read()

static pax_msg* create_read ( site_def const *  site,
pax_msg *  p 
)
static

◆ create_site_def_with_start()

static site_def* create_site_def_with_start ( app_data_ptr  a,
synode_no  start 
)
static

◆ create_tiny_learn_msg()

static pax_msg* create_tiny_learn_msg ( pax_machine pm,
pax_msg *  p 
)
static

◆ debug_loser()

static void debug_loser ( synode_no  x)
static

◆ decr_synode()

synode_no decr_synode ( synode_no  synode)

◆ delayed_terminate_and_exit()

static void delayed_terminate_and_exit ( double  t)
static

◆ detector_task()

int detector_task ( task_arg  arg)

◆ dispatch_get_event_horizon()

void dispatch_get_event_horizon ( site_def const *  site,
pax_msg *  p,
linkage reply_queue 
)

◆ dispatch_get_synode_app_data()

void dispatch_get_synode_app_data ( site_def const *  site,
pax_msg *  p,
linkage reply_queue 
)

◆ dispatch_op()

pax_msg* dispatch_op ( site_def const *  site,
pax_msg *  p,
linkage reply_queue 
)

◆ do_learn()

static void do_learn ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ does_node_have_v4_address()

struct addrinfo* does_node_have_v4_address ( struct addrinfo *  retrieved)

Retrieves a node IPv4 address, if it exists.

If a node is v4 reachable, it means one of two things:

  • The raw address is V4
  • A name was resolved to a V4/V6 address

If the latter is the case, we are going to prefer the first v4 address in the list, since it is the common language between old and new versions. If you want exclusive V6, please configure your DNS server to serve only V6 names.

Parameters
retrieved: a previously retrieved struct addrinfo
Returns
struct addrinfo* An addrinfo of the first IPv4 address. Otherwise, it returns the entry parameter.

◆ empty_prop_input_queue()

static void empty_prop_input_queue ( )
static

◆ execute_msg()

void execute_msg ( site_def const *  site,
pax_machine pma,
pax_msg *  p 
)

◆ executor_task()

static int executor_task ( task_arg  arg)
static

◆ fifo_empty()

static int fifo_empty ( )
inlinestatic

◆ fifo_extract()

static synode_no fifo_extract ( )
inlinestatic

◆ fifo_front()

static synode_no fifo_front ( )
inlinestatic

◆ fifo_full()

static int fifo_full ( )
inlinestatic

◆ fifo_insert()

static void fifo_insert ( synode_no  s)
inlinestatic

◆ find_value()

static void find_value ( site_def const *  site,
unsigned int *  wait,
int  n 
)
static

◆ finished()

static int finished ( pax_machine p)
inlinestatic

◆ first_event_horizon_reconfig()

static site_def const* first_event_horizon_reconfig ( )
static

Get the first pending configuration that reconfigures the event horizon.

Retrieve the first pending site_def, i.e. with the smallest start synod that is greater than executed_msg, that reconfigures the event horizon.

◆ first_free_synode()

static synode_no first_free_synode ( synode_no  msgno)
static

◆ fnv_hash()

static uint32_t fnv_hash ( unsigned char *  buf,
size_t  length,
uint32_t  sum 
)
static

◆ force_interval()

static void force_interval ( synode_no  start,
synode_no  end,
int  enforcer 
)
static

◆ force_pax_machine()

static void force_pax_machine ( pax_machine p,
int  enforcer 
)
static

◆ free_forced_config_site_def()

static void free_forced_config_site_def ( )
static

◆ get_add_node_address()

static char* get_add_node_address ( app_data_ptr  a,
unsigned int *  member 
)
static

Retrieves the address that was used in the add_node request.

Parameters
a: app data containing the node to add
member: address we used to present ourselves to other nodes
Returns
char* a pointer to the address being added.

◆ get_current_message()

synode_no get_current_message ( )

◆ get_delivered_msg()

synode_no get_delivered_msg ( )

◆ get_executor_site()

site_def const* get_executor_site ( )

◆ get_host_name()

void get_host_name ( char *  a,
char *  name 
)

◆ get_max_synode()

synode_no get_max_synode ( )

◆ get_prop_input_queue()

channel* get_prop_input_queue ( )

◆ get_proposer_site()

site_def const* get_proposer_site ( )

◆ get_sweep_start()

static synode_no get_sweep_start ( )
static

◆ get_xcom_message()

int get_xcom_message ( pax_machine **  p,
synode_no  msgno,
int  n 
)

◆ getstart()

static synode_no getstart ( app_data_ptr  a)
static

◆ got_all_snapshots()

static int got_all_snapshots ( )
static

◆ handle_accept()

static void handle_accept ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg *  m 
)
static

◆ handle_ack_accept()

static void handle_ack_accept ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ handle_ack_prepare()

static void handle_ack_prepare ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ handle_add_node()

site_def* handle_add_node ( app_data_ptr  a)

Reconfigure the group membership: add new member(s).

It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.

Particularly, it is possible that any of the concurrent reconfigurations modified the event horizon and that the new member(s) do not support event horizon reconfigurations.

We account for these situations by validating if adding the new members is still possible under the current state.

If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.

◆ handle_alive()

static void handle_alive ( site_def const *  site,
linkage reply_queue,
pax_msg *  pm 
)
inlinestatic

◆ handle_boot()

static void handle_boot ( site_def const *  site,
pax_msg *  p 
)
inlinestatic

◆ handle_client_msg()

static void handle_client_msg ( pax_msg *  p)
static

◆ handle_config()

bool_t handle_config ( app_data_ptr  a)

◆ handle_event_horizon()

bool_t handle_event_horizon ( app_data_ptr  a)

Reconfigure the event horizon.

It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.

Particularly, it is possible that any of the concurrent reconfigurations added a new member which does not support reconfiguring the event horizon.

We account for these situations by validating if the event horizon reconfiguration is still possible under the current state.

If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.

◆ handle_learn()

void handle_learn ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)

Process the incoming Learn message.

Executed by Learners.

Parameters
site: XCom configuration
p: Paxos instance
m: Incoming message

◆ handle_need_boot()

void handle_need_boot ( server srv,
site_def const *  s,
node_no  node 
)

◆ handle_need_snapshot()

static void handle_need_snapshot ( site_def const *  s,
node_no  node 
)
static

◆ handle_prepare()

static void handle_prepare ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg *  pm 
)
static

◆ handle_read()

static void handle_read ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg *  pm 
)
static

◆ handle_remove_node()

site_def* handle_remove_node ( app_data_ptr  a)

◆ handle_simple_accept()

pax_msg* handle_simple_accept ( pax_machine p,
pax_msg *  m,
synode_no  synode 
)

Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol.

Executed by Acceptors.

Parameters
p: Paxos instance
m: Incoming Accept message
synode: Synode of the Paxos instance/Accept message
Return values
pax_msg*: the reply to send to the Proposer (as in the Phase 2 (b) message of the Paxos protocol) if the Acceptor accepts the Accept
NULL: otherwise

◆ handle_simple_ack_accept()

pax_msg* handle_simple_ack_accept ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)

Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol.

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance
m: Incoming message
Return values
pax_msg*: the Learn message to send to Learners if a majority of Acceptors replied to the Proposer's Accept
NULL: otherwise

◆ handle_simple_ack_prepare()

bool_t handle_simple_ack_prepare ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)

Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol.

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance
m: Incoming message
Return values
TRUE: if a majority of Acceptors replied to the Proposer's Prepare
FALSE: otherwise

◆ handle_simple_prepare()

pax_msg* handle_simple_prepare ( pax_machine p,
pax_msg *  pm,
synode_no  synode 
)

Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Acceptors.

Parameters
p: Paxos instance
pm: Incoming Prepare message
synode: Synode of the Paxos instance/Accept message
Return values
pax_msg*: the reply to send to the Proposer (as in the Phase 1 (b) message of the Paxos protocol) if the Acceptor accepts the Prepare
NULL: otherwise

◆ handle_skip()

static void handle_skip ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ handle_tiny_learn()

void handle_tiny_learn ( site_def const *  site,
pax_machine pm,
pax_msg *  p 
)

Process the incoming tiny Learn message, i.e. a Learn message without the learned value.

Executed by Learners.

Parameters
site: XCom configuration
pm: Paxos instance
p: Incoming message

◆ handle_x_snapshot()

static void handle_x_snapshot ( task_arg  fsmargs)
static

◆ iamthegreatest()

int iamthegreatest ( site_def const *  s)

◆ ignoresig()

static int ignoresig ( int  signum)
static

◆ incr_msgno()

static synode_no incr_msgno ( synode_no  msgno)
static

◆ incr_synode()

synode_no incr_synode ( synode_no  synode)

◆ inform_removed()

static void inform_removed ( int  index,
int  all 
)
static

◆ init_app_msg()

app_data_ptr init_app_msg ( app_data *  a,
char *  payload,
u_int  payload_size 
)

◆ init_base_vars()

void init_base_vars ( )

◆ init_config_with_group()

app_data_ptr init_config_with_group ( app_data *  a,
node_list *  nl,
cargo_type  type,
uint32_t  group_id 
)

◆ init_convert_into_local_server_msg()

app_data_ptr init_convert_into_local_server_msg ( app_data *  a)

◆ init_get_event_horizon_msg()

app_data_ptr init_get_event_horizon_msg ( app_data *  a,
uint32_t  group_id 
)

◆ init_get_synode_app_data_msg()

static app_data_ptr init_get_synode_app_data_msg ( app_data *  a,
uint32_t  group_id,
synode_no_array *const  synodes 
)
static

◆ init_learn_msg()

static void init_learn_msg ( pax_msg *  p)
static

◆ init_need_boot_op()

void init_need_boot_op ( pax_msg *  p,
node_address *  identity 
)

Initializes the message p as a need_boot_op message.

Parameters
p: The message to send
identity: The unique incarnation identifier of this XCom instance

◆ init_node_set()

node_set* init_node_set ( node_set *  set,
u_int  n 
)

◆ init_prepare_msg()

void init_prepare_msg ( pax_msg *  p)

Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
p: The message to send

◆ init_propose_msg()

void init_propose_msg ( pax_msg *  p)

Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
p: The message to send

◆ init_proposers()

static void init_proposers ( )
static

◆ init_set_cache_size_msg()

app_data_ptr init_set_cache_size_msg ( app_data *  a,
uint64_t  cache_limit 
)

◆ init_set_event_horizon_msg()

app_data_ptr init_set_event_horizon_msg ( app_data *  a,
uint32_t  group_id,
xcom_event_horizon  event_horizon 
)

◆ init_tasks()

static void init_tasks ( )
static

◆ init_terminate_command()

app_data_ptr init_terminate_command ( app_data *  a)

◆ init_xcom_base()

void init_xcom_base ( )

Reset lsn

◆ initialize_lsn()

void initialize_lsn ( uint64_t  n)

◆ install_ng_with_start()

static site_def* install_ng_with_start ( app_data_ptr  a,
synode_no  start 
)
static

◆ install_node_group()

site_def* install_node_group ( app_data_ptr  a)

◆ is_busy()

static int is_busy ( synode_no  s)
static

◆ is_cargo_type()

static int is_cargo_type ( app_data_ptr  a,
cargo_type  t 
)
inlinestatic

Checks if a given app_data is from a given cargo_type.

Parameters
a: the app_data
t: the cargo type
Returns
int true (1) if app_data a is from cargo_type t

◆ is_config()

static int is_config ( cargo_type  x)
inlinestatic

◆ is_dead_site()

static bool_t is_dead_site ( uint32_t  id)
static

◆ is_empty_site()

static int is_empty_site ( site_def const *  s)
inlinestatic

◆ is_forcing_node()

static int is_forcing_node ( pax_machine const *  p)
static

◆ is_latest_config()

static bool_t is_latest_config ( site_def const *const  config)
static

◆ is_member()

static int is_member ( site_def const *  site)
inlinestatic

◆ is_node_v4_reachable()

int is_node_v4_reachable ( char *  node_address)

◆ is_node_v4_reachable_with_info()

int is_node_v4_reachable_with_info ( struct addrinfo *  retrieved_addr_info)

◆ is_real_recover()

int is_real_recover ( app_data_ptr  a)

◆ is_view()

static int is_view ( cargo_type  x)
inlinestatic

◆ latest_event_horizon_reconfig()

static site_def const* latest_event_horizon_reconfig ( )
static

Get the latest pending configuration that reconfigures the event horizon.

Retrieve the last pending site_def, i.e. with the greatest start synod that is greater than executed_msg, that reconfigures the event horizon.

◆ leader()

static node_no leader ( site_def const *  s)
static

◆ local_server()

int local_server ( task_arg  arg)

◆ local_server_is_setup()

static bool local_server_is_setup ( )
static

◆ log_cfgchange_wrong_group()

static void log_cfgchange_wrong_group ( app_data_ptr  a,
const char *const  message_fmt 
)
static

Logs the fact that an add/remove node request is aimed at another group.

Parameters
a: a pointer to the app_data of the configuration command
message_fmt: a formatted message to log, containing a single %s that will be replaced by the node's address

◆ log_event_horizon_reconfiguration_failure()

static void log_event_horizon_reconfiguration_failure ( allow_event_horizon_result  error_code,
xcom_event_horizon  attempted_event_horizon 
)
static

◆ log_get_synode_app_data_failure()

static void log_get_synode_app_data_failure ( xcom_get_synode_app_data_result  error_code)
static

◆ majority()

static int majority ( bit_set const *  nodeset,
site_def const *  s,
int  all,
int  delay,
int  force 
)
inlinestatic

◆ match_my_msg()

bool_t match_my_msg ( pax_msg *  learned,
pax_msg *  mine 
)

◆ max_check()

static node_no max_check ( site_def const *  site)
inlinestatic

◆ must_force_recover()

bool_t must_force_recover ( )

◆ my_unique_id()

static synode_no my_unique_id ( synode_no  synode)
static

◆ new_id()

uint32_t new_id ( )

Create a new (hopefully unique) ID.

The basic idea is to create a hash from the host ID and a timestamp.

◆ noop_match()

static int noop_match ( pax_machine p,
pax_msg *  pm 
)
inlinestatic

◆ note_snapshot()

static void note_snapshot ( task_arg  fsmargs)
static

◆ ok_to_propose()

static int ok_to_propose ( pax_machine p)
static

◆ pexitall()

static void pexitall ( int  i)
static

◆ pm_finished()

int pm_finished ( pax_machine p)
Return values
1: if the value for the Paxos instance *p has been learned
0: otherwise

◆ prep_majority()

static int prep_majority ( site_def const *  site,
pax_machine p 
)
static

◆ prepare()

static void prepare ( pax_msg *  p,
pax_op  op 
)
static

◆ prepare_msg()

static int prepare_msg ( pax_msg *  p)
static

◆ prepare_push_2p()

void prepare_push_2p ( site_def const *  site,
pax_machine p 
)

Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_).

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance

◆ prepare_push_3p()

void prepare_push_3p ( site_def const *  site,
pax_machine p,
pax_msg *  msg,
synode_no  msgno,
pax_msg_type  msg_type 
)

Initializes the message msg to go through a 3-phase, regular Paxos.

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance
msg: Message to send
msgno: Synode where msg will be proposed
msg_type: The type of the message, e.g. normal or no_op

◆ prop_majority()

static int prop_majority ( site_def const *  site,
pax_machine p 
)
static

◆ propose_missing_values()

static void propose_missing_values ( int  n)
static

◆ propose_msg()

static int propose_msg ( pax_msg *  p)
static

◆ propose_noop()

static void propose_noop ( synode_no  find,
pax_machine p 
)
static

◆ proposer_task()

static int proposer_task ( task_arg  arg)
static

◆ push_log()

static void push_log ( synode_no  push,
node_no  node 
)
static

◆ push_msg_2p()

static void push_msg_2p ( site_def const *  site,
pax_machine p 
)
static

◆ push_msg_3p()

static void push_msg_3p ( site_def const *  site,
pax_machine p,
pax_msg *  msg,
synode_no  msgno,
pax_msg_type  msg_type 
)
static

◆ read_missing_values()

static void read_missing_values ( int  n)
static

◆ recently_active()

static int recently_active ( pax_machine p)
static

◆ reconfigurable_event_horizon()

static bool_t reconfigurable_event_horizon ( xcom_proto  protocol_version)
static

◆ reply_handler_task()

int reply_handler_task ( task_arg  arg)

◆ request_values()

void request_values ( synode_no  find,
synode_no  end 
)

◆ reset_snapshot_mask()

static void reset_snapshot_mask ( )
static

◆ safe_app_data_copy()

bool_t safe_app_data_copy ( pax_msg **  target,
app_data_ptr  source 
)

Copies app data source into target and checks if the copy succeeded.

Sets *target to NULL if the copy fails.

Parameters
[in,out] target: The pax_msg to which the app_data will be copied.
source: The app data that will be copied.
Return values
TRUE: if the copy was successful.
FALSE: if the copy failed, in which case *target is set to NULL; a failed copy means that there was an error allocating memory for the copy.

◆ send_add_node()

void send_add_node ( node_list *  nl)

◆ send_app_data()

void send_app_data ( app_data_ptr  a)

◆ send_boot()

void send_boot ( node_list *  nl)

◆ send_client_add_node()

void send_client_add_node ( char *  srv,
xcom_port  port,
node_list *  nl 
)

◆ send_client_app_data()

void send_client_app_data ( char *  srv,
xcom_port  port,
app_data_ptr  a 
)

◆ send_client_boot()

void send_client_boot ( char *  srv,
xcom_port  port,
node_list *  nl 
)

◆ send_client_config()

void send_client_config ( char *  srv,
xcom_port  port,
node_list *  nl 
)

◆ send_client_remove_node()

void send_client_remove_node ( char *  srv,
xcom_port  port,
node_list *  nl 
)

◆ send_config()

void send_config ( node_list *  nl)

◆ send_learn_msg()

static int send_learn_msg ( site_def const *  site,
pax_msg *  p 
)
static

◆ send_need_boot()

static void send_need_boot ( )
static

◆ send_propose_msg()

static int send_propose_msg ( pax_msg *  p)
static

◆ send_read()

static void send_read ( synode_no  find)
static

◆ send_remove_node()

void send_remove_node ( node_list *  nl)

◆ send_snapshot()

static void send_snapshot ( site_def const *  s,
gcs_snapshot *  gcs_snap,
node_no  node 
)
static

◆ send_tiny_learn_msg()

static int send_tiny_learn_msg ( site_def const *  site,
pax_msg *  p 
)
static

◆ send_value()

static void send_value ( site_def const *  site,
node_no  to,
synode_no  synode 
)
static

◆ server_handle_need_snapshot()

static void server_handle_need_snapshot ( server srv,
site_def const *  s,
node_no  node 
)
static

◆ server_push_log()

static void server_push_log ( server srv,
synode_no  push,
node_no  node 
)
static

◆ server_send_snapshot()

static void server_send_snapshot ( server srv,
site_def const *  s,
gcs_snapshot *  gcs_snap,
node_no  node 
)
static

◆ set_app_snap_getter()

void set_app_snap_getter ( app_snap_getter  x)

◆ set_app_snap_handler()

void set_app_snap_handler ( app_snap_handler  x)

◆ set_current_message()

synode_no set_current_message ( synode_no  msgno)

◆ set_executed_msg()

synode_no set_executed_msg ( synode_no  msgno)

◆ set_force_recover()

void set_force_recover ( bool_t const  x)

◆ set_group()

void set_group ( uint32_t  id)

Set node group.

◆ set_last_received_config()

void set_last_received_config ( synode_no  received_config_change)

◆ set_learn_type()

static void set_learn_type ( pax_msg *  p)
static

◆ set_max_synode()

void set_max_synode ( synode_no  synode)

◆ set_max_synode_from_unified_boot()

void set_max_synode_from_unified_boot ( synode_no  unified_boot_synode)

◆ set_proposer_startpoint()

static void set_proposer_startpoint ( )
static

◆ set_unique_id()

static void set_unique_id ( pax_msg *  msg,
synode_no  synode 
)
static

◆ set_xcom_comms_cb()

void set_xcom_comms_cb ( xcom_state_change_cb  x)

◆ set_xcom_exit_cb()

void set_xcom_exit_cb ( xcom_state_change_cb  x)

◆ set_xcom_expel_cb()

void set_xcom_expel_cb ( xcom_state_change_cb  x)

◆ set_xcom_input_try_pop_cb()

void set_xcom_input_try_pop_cb ( xcom_input_try_pop_cb  pop)

◆ set_xcom_run_cb()

void set_xcom_run_cb ( xcom_state_change_cb  x)

◆ set_xcom_terminate_cb()

void set_xcom_terminate_cb ( xcom_state_change_cb  x)

◆ should_handle_boot()

bool should_handle_boot ( site_def const *  site,
pax_msg *  p 
)
Returns
true if we should process the incoming need_boot_op message passed in parameter p.

◆ should_poll_cache()

static bool_t should_poll_cache ( pax_op  op)
static

◆ site_install_action()

void site_install_action ( site_def site,
cargo_type  operation 
)

◆ skip_msg()

static int skip_msg ( pax_msg *  p)
static

◆ skip_value()

static void skip_value ( pax_msg *  p)
static

◆ socket_read()

static result socket_read ( connection_descriptor rfd,
void *  buf,
int  n 
)
static

◆ socket_read_bytes()

static int64_t socket_read_bytes ( connection_descriptor rfd,
char *  p,
uint32_t  n 
)
static

◆ socket_read_msg()

static pax_msg* socket_read_msg ( connection_descriptor rfd,
pax_msg *  p 
)
static

◆ socket_write()

static int64_t socket_write ( connection_descriptor wfd,
void *  _buf,
uint32_t  n 
)
static

◆ start_force_config()

static void start_force_config ( site_def s,
int  enforcer 
)
static

◆ start_run_tasks()

void start_run_tasks ( )

◆ start_x_timer()

static void start_x_timer ( double  t)
static

◆ started()

static int started ( pax_machine p)
inline static

◆ stop_x_timer()

static void stop_x_timer ( )
static

◆ sweeper_task()

static int sweeper_task ( task_arg  arg)
static

◆ taskmain()

int taskmain ( xcom_port  listen_port)

◆ teach_ignorant_node()

static void teach_ignorant_node ( site_def const *  site,
pax_machine p,
pax_msg *  pm,
synode_no  synode,
linkage reply_queue 
)
static

◆ terminate_and_exit()

static void terminate_and_exit ( )
static

◆ terminate_proposers()

static void terminate_proposers ( )
static

◆ terminator_task()

int terminator_task ( task_arg  arg)

◆ timed_connect()

static int timed_connect ( int  fd,
struct sockaddr sock_addr,
socklen_t  sock_size 
)
static

◆ too_far()

static int too_far ( synode_no  s)
inline static

◆ too_far_threshold()

static uint64_t too_far_threshold ( xcom_event_horizon  active_event_horizon)
static

Checks if the given synod s is outside the event horizon.

Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case threshold is:

last_executed_synod + event_horizon(active_config)

If an event horizon reconfiguration R is pending, it is possible that it reduces the event horizon. In that case, it is possible that the threshold above falls outside the new event horizon.

For example, consider last_executed_synod = 42 and event_horizon(active_config) = 10. At this point this member participates in synods up to 52. Now consider an event horizon reconfiguration that takes effect at synod 45, which modifies the event horizon to 2. This means that when last_executed_synod = 45, event_horizon(active_config) = 2. At this point this member should only be able to participate in synods up to 47. The member may have previously started processing messages directed to synods between 47 and 52, but will now ignore messages directed to those same synods.

We do not want to start processing messages that will eventually fall out of the event horizon. More importantly, the threshold above may not be safe due to the exit logic of executor_task.

When a node removes itself from the group on configuration C starting at synod start(C), the exit logic relies on knowing when a majority has executed synod start(C) - 1, i.e. the last message of the last configuration to contain the leaving node.

With a constant event horizon, we know that when synod start(C) + event_horizon is learnt, it is because a majority already executed or is ready to execute (and thus learned) synod start(C). This implies that a majority already executed start(C) - 1.

With a dynamic event horizon, we cannot be sure that when synod start(C) + event_horizon(C) is learnt, a majority already executed or is ready to execute synod start(C). This is because it is possible for a new, smaller, event horizon to take effect between start(C) and start(C) + event_horizon(C). If that happens, the threshold above allows nodes to participate in synods which are possibly beyond start(C) + event_horizon(C), which can lead to the value of synod start(C) + event_horizon(C) being learnt without a majority already having executed or being ready to execute synod start(C).

In order to maintain the assumption made by the executor_task's exit logic, when an event horizon reconfiguration R is pending we set the threshold to the minimum between:

last_executed_synod + event_horizon(active_config)

and:

start(R) - 1 + event_horizon(R)

◆ too_far_threshold_new_event_horizon_pending()

static uint64_t too_far_threshold_new_event_horizon_pending ( site_def const *  new_config)
static

◆ unsafe_against_event_horizon()

static bool_t unsafe_against_event_horizon ( node_address const *  node)
static

Check if a node is compatible with the group's event horizon.

A node is compatible with the group's configuration if:

a) the node supports event horizon reconfigurations, or
b) the group's event horizon is, or is scheduled to be, the default event horizon.

◆ unsafe_event_horizon_reconfiguration()

static bool_t unsafe_event_horizon_reconfiguration ( app_data_ptr  a)
static

◆ update_best_snapshot()

static void update_best_snapshot ( task_arg  fsmargs)
static

◆ update_max_synode()

static void update_max_synode ( pax_msg *  p)
static

◆ wait_for_cache()

static int wait_for_cache ( pax_machine **  pm,
synode_no  synode,
double  timeout 
)
static

◆ wakeup_delay()

static double wakeup_delay ( double  old)
static

◆ xcom_add_node()

void xcom_add_node ( char *  addr,
xcom_port  port,
node_list *  nl 
)

◆ xcom_client_add_node()

int xcom_client_add_node ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_boot()

int xcom_client_boot ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_convert_into_local_server()

int xcom_client_convert_into_local_server ( connection_descriptor fd)

◆ xcom_client_disable_arbitrator()

int xcom_client_disable_arbitrator ( connection_descriptor fd)

◆ xcom_client_enable_arbitrator()

int xcom_client_enable_arbitrator ( connection_descriptor fd)

◆ xcom_client_force_config()

int xcom_client_force_config ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_get_event_horizon()

int xcom_client_get_event_horizon ( connection_descriptor fd,
uint32_t  group_id,
xcom_event_horizon *  event_horizon 
)

◆ xcom_client_get_synode_app_data()

int xcom_client_get_synode_app_data ( connection_descriptor *const  fd,
uint32_t  group_id,
synode_no_array *const  synodes,
synode_app_data_array *const  reply 
)

◆ xcom_client_remove_node()

int xcom_client_remove_node ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_send_data()

int64_t xcom_client_send_data ( uint32_t  size,
char *  data,
connection_descriptor fd 
)

◆ xcom_client_send_die()

int64_t xcom_client_send_die ( connection_descriptor fd)

◆ xcom_client_set_cache_limit()

int xcom_client_set_cache_limit ( connection_descriptor fd,
uint64_t  cache_limit 
)

◆ xcom_client_set_event_horizon()

int xcom_client_set_event_horizon ( connection_descriptor fd,
uint32_t  group_id,
xcom_event_horizon  event_horizon 
)

◆ xcom_client_terminate_and_exit()

int xcom_client_terminate_and_exit ( connection_descriptor fd)

◆ xcom_close_client_connection()

int xcom_close_client_connection ( connection_descriptor connection)

◆ xcom_close_socket()

static result xcom_close_socket ( int *  sock)
inline static

◆ xcom_fsm()

xcom_state xcom_fsm ( xcom_actions  action,
task_arg  fsmargs 
)

◆ xcom_fsm_add_node()

void xcom_fsm_add_node ( char *  addr,
node_list *  nl 
)

◆ xcom_get_event_horizon()

static client_reply_code xcom_get_event_horizon ( xcom_event_horizon *  event_horizon)
static

Retrieves the latest event horizon.

There is no specific reason for this method to return the latest event horizon instead of the current one. Both would be acceptable results of this function, but we had to make a decision of one over the other.

Parameters
[out] event_horizon — the latest event horizon
Return values
REQUEST_FAIL — XCom is not initialized yet
REQUEST_OK — the function was successful and event_horizon contains the latest event horizon

◆ xcom_get_maximum_event_horizon()

xcom_event_horizon xcom_get_maximum_event_horizon ( )

◆ xcom_get_minimum_event_horizon()

xcom_event_horizon xcom_get_minimum_event_horizon ( )

◆ xcom_input_free_signal_connection()

void xcom_input_free_signal_connection ( void  )

◆ xcom_input_new_signal_connection()

bool xcom_input_new_signal_connection ( char const *  address,
xcom_port  port 
)

◆ xcom_input_signal()

bool xcom_input_signal ( void  )

◆ xcom_open_client_connection()

connection_descriptor* xcom_open_client_connection ( const char *  server,
xcom_port  port 
)

◆ xcom_recv_proto()

static int xcom_recv_proto ( connection_descriptor rfd,
xcom_proto x_proto,
x_msg_type x_type,
unsigned int *  tag 
)
static

◆ xcom_send()

void xcom_send ( app_data_ptr  a,
pax_msg *  msg 
)

◆ xcom_send_app_wait()

int xcom_send_app_wait ( connection_descriptor fd,
app_data *  a,
int  force 
)

◆ xcom_send_app_wait_and_get()

xcom_send_app_wait_result xcom_send_app_wait_and_get ( connection_descriptor fd,
app_data *  a,
int  force,
pax_msg *  p 
)

Send a message and wait for response.

The caller is responsible for freeing p after calling this function, i.e. my_xdr_free((xdrproc_t)xdr_pax_msg, (char *)p)

◆ xcom_send_cfg_wait()

int xcom_send_cfg_wait ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id,
cargo_type  ct,
int  force 
)

◆ xcom_send_client_app_data()

int64_t xcom_send_client_app_data ( connection_descriptor fd,
app_data_ptr  a,
int  force 
)

◆ xcom_send_data()

void xcom_send_data ( uint32_t  size,
char *  data 
)

◆ xcom_send_proto()

static int xcom_send_proto ( connection_descriptor con,
xcom_proto  x_proto,
x_msg_type  x_type,
unsigned int  tag 
)
static

◆ xcom_shut_close_socket()

static result xcom_shut_close_socket ( int *  sock)
inline static

◆ xcom_sleep()

static void xcom_sleep ( unsigned int  seconds)
inline static

◆ xcom_taskmain()

int xcom_taskmain ( xcom_port  listen_port)

◆ xcom_taskmain2()

int xcom_taskmain2 ( xcom_port  listen_port)

◆ xcom_thread_deinit()

void xcom_thread_deinit ( )

◆ xcom_thread_init()

void xcom_thread_init ( )

◆ xcom_thread_main()

void* xcom_thread_main ( void *  cp)

◆ xcom_timer()

static int xcom_timer ( task_arg  arg)
static

◆ xcom_unique_long()

long xcom_unique_long ( void  )

◆ yes()

static int yes ( xcom_port  port)
static

Variable Documentation

◆ alive_t

task_env* alive_t = NULL
static

◆ ARBITRATOR_HACK

int ARBITRATOR_HACK = 0

◆ boot

task_env* boot = NULL

◆ booting

int booting

◆ cache_task

task_env* cache_task = NULL
static

◆ clicnt

int clicnt = 0
static

◆ client_boot_done

int client_boot_done

◆ current_message

synode_no current_message
static

◆ dead_sites

struct { ... } dead_sites

◆ delay_fifo

struct { ... } delay_fifo

◆ delivered_msg

synode_no delivered_msg
static

◆ detector

task_env* detector = NULL

◆ exec_wait

linkage exec_wait
static
Initial value:
= {
static linkage exec_wait
Definition: xcom_base.c:421

◆ executed_msg

synode_no executed_msg

◆ executor

task_env* executor = NULL
static

◆ executor_site

site_def const* executor_site = 0
static

◆ first_event_horizon_aware_protocol

xcom_proto const first_event_horizon_aware_protocol = x_1_4
static

◆ force_recover

bool_t force_recover = FALSE
static

◆ forced_config

site_def* forced_config = 0
static

◆ front

int front

◆ get_app_snap

app_snap_getter get_app_snap
static

◆ handle_app_snap

app_snap_handler handle_app_snap
static

◆ id

unsigned long id[MAX_DEAD]

◆ input_signal_connection

connection_descriptor* input_signal_connection = NULL
static

◆ killer

task_env* killer = NULL

◆ last_config_modification_id

synode_no last_config_modification_id
static

◆ lsn

uint64_t lsn = 0
static

◆ max_synode

synode_no max_synode

◆ my_id

uint32_t my_id = 0
static

◆ n

int n

◆ net_boot

task_env* net_boot = NULL

◆ net_recover

task_env* net_recover = NULL

◆ netboot_ok

int netboot_ok

◆ no_duplicate_payload

int const no_duplicate_payload = 1
static

◆ oom_abort

unsigned short oom_abort = 0
static

◆ prop_input_queue

channel prop_input_queue
static

◆ proposer

task_env* proposer[PROPOSERS]
static

◆ proposer_site

site_def* proposer_site = 0
static

◆ q

synode_no q[FIFO_SIZE]

◆ rear

int rear

◆ retry

task_env* retry = NULL
static

◆ sent_alive

double sent_alive = 0.0
static

◆ snapshots

int snapshots[NSERVERS]
static

◆ start_type

start_t start_type

◆ sweeper

task_env* sweeper = NULL
static

◆ threephase

int const threephase = 0

◆ use_buffered_read

int use_buffered_read = 1
static

◆ wait_forced_config

int wait_forced_config = 0
static

◆ x_timer

task_env* x_timer = NULL
static

◆ xcom_actions_name

const char* xcom_actions_name[] = {x_actions}

◆ xcom_comms_cb

xcom_state_change_cb xcom_comms_cb = 0
static

◆ xcom_exit_cb

xcom_state_change_cb xcom_exit_cb = 0
static

◆ xcom_expel_cb

xcom_state_change_cb xcom_expel_cb = 0
static

◆ xcom_run_cb

xcom_state_change_cb xcom_run_cb = 0
static

◆ xcom_shutdown

int xcom_shutdown = 0

◆ xcom_state_name

const char* xcom_state_name[] = {x_state_list}

◆ xcom_terminate_cb

xcom_state_change_cb xcom_terminate_cb = 0
static

◆ xcom_thread_input

void* xcom_thread_input = 0

◆ xcom_try_pop_from_input_cb

xcom_input_try_pop_cb xcom_try_pop_from_input_cb = NULL
static