MySQL  8.0.22
Source Code Documentation
xcom_base.cc File Reference

xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision. More...

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <poll.h>
#include "xcom/xcom_profile.h"
#include "my_compiler.h"
#include "xcom/x_platform.h"
#include <arpa/inet.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/sockio.h>
#include "xcom/app_data.h"
#include "xcom/get_synode_app_data.h"
#include "xcom/node_no.h"
#include "xcom/server_struct.h"
#include "xcom/simset.h"
#include "xcom/site_struct.h"
#include "xcom/task.h"
#include "xcom/task_net.h"
#include "xcom/task_os.h"
#include "xcom/xcom_base.h"
#include "xcom/xcom_common.h"
#include "xcom/xcom_detector.h"
#include "xcom/xcom_transport.h"
#include "xcom/xdr_utils.h"
#include "xdr_gen/xcom_vp.h"
#include "xcom/xcom_ssl_transport.h"
#include "xcom/bitset.h"
#include "xcom/node_list.h"
#include "xcom/node_set.h"
#include "xcom/pax_msg.h"
#include "xcom/site_def.h"
#include "xcom/sock_probe.h"
#include "xcom/synode_no.h"
#include "xcom/task_debug.h"
#include "xcom/xcom_cache.h"
#include "xcom/xcom_cfg.h"
#include "xcom/xcom_interface.h"
#include "xcom/xcom_memory.h"
#include "xcom/xcom_msg_queue.h"
#include "xcom/xcom_recover.h"
#include "xcom/xcom_statistics.h"
#include "xcom/xcom_vp_str.h"
#include <openssl/ssl.h>
#include "xcom/retry.h"
#include <netinet/in.h>

Classes

struct  execute_context
 
struct  fp_name
 
struct  xcom_fsm_state
 

Macros

#define __STDC_FORMAT_MACROS
 
#define xcom_buf   void
 
#define SYS_STRERROR_SIZE   512
 
#define PROTOVERSION_WARNING_TIMEOUT   600.0 /** Every 10 minutes */
 
#define IS_CONS_ALL(p)   ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)
 
#define PROP_ITER
 
#define FNVSTART   0x811c9dc5
 
#define GOTO(x)
 
#define LOSER(x, site)   0
 
#define FIFO_SIZE   1000
 
#define NAME(f)   { f, #f }
 
#define reply_msg(m)
 
#define CREATE_REPLY(x)
 
#define SEND_REPLY
 
#define PING_GATHERING_TIME_WINDOW   5.0
 
#define PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN   3
 
#define BAL_FMT   "ballot {cnt %d node %d}"
 
#define BAL_MEM(x)   (x).cnt, (x).node
 
#define SERIALIZE_REPLY(msg)
 
#define WRITE_REPLY
 
#define X(b)   #b
 
#define X_FSM_STATE(s)   { s, #s }
 
#define SET_X_FSM_STATE(s)
 
#define CONNECT_FAIL
 

Typedefs

typedef enum allow_event_horizon_result allow_event_horizon_result
 
typedef struct execute_context execute_context
 
typedef void(* exec_fp) (execute_context *xc)
 
typedef struct xcom_fsm_state xcom_fsm_state
 
typedef int(* xcom_fsm_fp) (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
typedef enum xcom_send_app_wait_result xcom_send_app_wait_result
 

Enumerations

enum  allow_event_horizon_result { EVENT_HORIZON_ALLOWED, EVENT_HORIZON_INVALID, EVENT_HORIZON_UNCHANGEABLE }
 Check if we can reconfigure the event horizon. More...
 
enum  { TAG_START = 313 }
 
enum  xcom_send_app_wait_result {
  SEND_REQUEST_FAILED = 0, RECEIVE_REQUEST_FAILED, REQUEST_BOTCHED, RETRIES_EXCEEDED,
  REQUEST_OK_RECEIVED, REQUEST_FAIL_RECEIVED
}
 

Functions

long xcom_unique_long (void)
 
static double wakeup_delay (double old)
 
static void note_snapshot (node_no node)
 
static int proposer_task (task_arg arg)
 
static int executor_task (task_arg arg)
 
static int sweeper_task (task_arg arg)
 
int alive_task (task_arg arg)
 
int cache_manager_task (task_arg arg)
 
int detector_task (task_arg arg)
 
static int finished (pax_machine *p)
 
static int accepted (pax_machine *p)
 
static int started (pax_machine *p)
 
static synode_no first_free_synode (synode_no msgno)
 
static void free_forced_config_site_def ()
 
static void activate_sweeper ()
 
static void force_pax_machine (pax_machine *p, int enforcer)
 
static void handle_need_snapshot (linkage *reply_queue, pax_msg *pm)
 
static void handle_skip (site_def const *site, pax_machine *p, pax_msg *m)
 
void bit_set_or (bit_set *x, bit_set const *y)
 
static void init_proposers ()
 
void initialize_lsn (uint64_t n)
 
void init_base_vars ()
 
uint32_t get_my_xcom_id ()
 
synode_no get_current_message ()
 
synode_no get_max_synode ()
 
static bool_t is_latest_config (site_def const *const config)
 
static site_def const * first_event_horizon_reconfig ()
 Get the first pending configuration that reconfigures the event horizon. More...
 
static site_def const * latest_event_horizon_reconfig ()
 Get the latest pending configuration that reconfigures the event horizon. More...
 
static synode_no add_default_event_horizon (synode_no s)
 Add the event horizon to the given base synod s. More...
 
static synode_no add_event_horizon (synode_no s)
 
void set_group (uint32_t id)
 Set node group. More...
 
static void bury_site (uint32_t id)
 
static bool_t is_dead_site (uint32_t id)
 
node_set * init_node_set (node_set *set, u_int n)
 
node_set * alloc_node_set (node_set *set, u_int n)
 
static synode_no incr_msgno (synode_no msgno)
 
synode_no incr_synode (synode_no synode)
 
synode_no decr_synode (synode_no synode)
 
static void skip_value (pax_msg *p)
 
static int ignoresig (int signum)
 
static int recently_active (pax_machine *p)
 
int pm_finished (pax_machine *p)
 
static int accepted_noop (pax_machine *p)
 
static int noop_match (pax_machine *p, pax_msg *pm)
 
void set_last_received_config (synode_no received_config_change)
 
static node_no max_check (site_def const *site)
 
static int is_forcing_node (pax_machine const *p)
 
static int majority (bit_set const *nodeset, site_def const *s, int all, int delay, int force)
 
static int prep_majority (site_def const *site, pax_machine *p)
 
static int prop_majority (site_def const *site, pax_machine *p)
 
site_def const * get_executor_site ()
 
site_def * get_executor_site_rw ()
 
site_def const * get_proposer_site ()
 
synode_no get_delivered_msg ()
 
synode_no get_last_delivered_msg ()
 
void init_xcom_base ()
 
static void init_tasks ()
 
void xcom_thread_init ()
 
static void empty_prop_input_queue ()
 
void xcom_thread_deinit ()
 
static void create_proposers ()
 
static void terminate_proposers ()
 
static void set_proposer_startpoint ()
 
void set_xcom_run_cb (xcom_state_change_cb x)
 
void set_xcom_comms_cb (xcom_state_change_cb x)
 
void set_xcom_terminate_cb (xcom_state_change_cb x)
 
void set_xcom_exit_cb (xcom_state_change_cb x)
 
void set_xcom_recovery_begin_cb (xcom_recovery_cb x)
 
void set_xcom_recovery_restart_cb (xcom_recovery_cb x)
 
void set_xcom_recovery_init_cb (xcom_recovery_cb x)
 
void set_xcom_recovery_end_cb (xcom_recovery_cb x)
 
void set_xcom_expel_cb (xcom_state_change_cb x)
 
void set_xcom_input_try_pop_cb (xcom_input_try_pop_cb pop)
 
static bool_t xcom_input_signal_connection_shutdown_ssl_wait_for_peer ()
 
static bool_t xcom_input_signal_connection_shutdown_ssl ()
 
bool_t xcom_input_new_signal_connection (char const *address, xcom_port port)
 
static int64_t socket_write (connection_descriptor *wfd, void *_buf, uint32_t n)
 
bool_t xcom_input_signal ()
 
void xcom_input_free_signal_connection ()
 
static int local_server_shutdown_ssl (connection_descriptor *con, void *buf, int n, int *ret)
 
int local_server (task_arg arg)
 
static bool_t local_server_is_setup ()
 
int xcom_taskmain2 (xcom_port listen_port)
 
static void prepare (pax_msg *p, pax_op op)
 
void init_prepare_msg (pax_msg *p)
 Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static int prepare_msg (pax_msg *p)
 
pax_msg * create_noop (pax_msg *p)
 Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static pax_msg * create_read (site_def const *site, pax_msg *p)
 
static int skip_msg (pax_msg *p)
 
static void brand_app_data (pax_msg *p)
 
static synode_no my_unique_id (synode_no synode)
 
static void set_unique_id (pax_msg *msg, synode_no synode)
 
void init_propose_msg (pax_msg *p)
 Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol. More...
 
static int send_propose_msg (pax_msg *p)
 
static int propose_msg (pax_msg *p)
 
static void set_learn_type (pax_msg *p)
 
static void init_learn_msg (pax_msg *p)
 
static int send_learn_msg (site_def const *site, pax_msg *p)
 
static pax_msg * create_tiny_learn_msg (pax_machine *pm, pax_msg *p)
 
static int send_tiny_learn_msg (site_def const *site, pax_msg *p)
 
void prepare_push_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
 Initializes the message msg to go through a 3-phase, regular Paxos. More...
 
void prepare_push_2p (site_def const *site, pax_machine *p)
 Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_). More...
 
static void push_msg_2p (site_def const *site, pax_machine *p)
 
static void push_msg_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
 
static void brand_client_msg (pax_msg *msg, synode_no msgno)
 
void xcom_send (app_data_ptr a, pax_msg *msg)
 
static uint32_t fnv_hash (unsigned char *buf, size_t length, uint32_t sum)
 
uint32_t new_id ()
 Create a new (hopefully unique) ID. More...
 
static synode_no getstart (app_data_ptr a)
 
synode_no get_default_start (app_data_ptr a)
 
static void dump_xcom_node_names (site_def const *site)
 
void site_install_action (site_def *site, cargo_type operation)
 
static site_def * create_site_def_with_start (app_data_ptr a, synode_no start)
 
static site_def * install_ng_with_start (app_data_ptr a, synode_no start)
 
site_def * install_node_group (app_data_ptr a)
 
void set_max_synode (synode_no synode)
 
static int is_busy (synode_no s)
 
bool_t match_my_msg (pax_msg *learned, pax_msg *mine)
 
static uint64_t assign_lsn ()
 Assign the next log sequence number (lsn) for a message. More...
 
static int check_lsn (app_data_ptr a)
 
static void propose_noop (synode_no find, pax_machine *p)
 
static uint64_t too_far_threshold (xcom_event_horizon active_event_horizon)
 Checks if the given synod s is outside the event horizon. More...
 
static uint64_t too_far_threshold_new_event_horizon_pending (site_def const *new_config)
 
static int too_far (synode_no s)
 
static int is_view (cargo_type x)
 
static int is_config (cargo_type x)
 
static int wait_for_cache (pax_machine **pm, synode_no synode, double timeout)
 
static bool constexpr should_ignore_forced_config_or_view (xcom_proto protocol_version)
 
static node_no leader (site_def const *s)
 
int iamthegreatest (site_def const *s)
 
void execute_msg (site_def *site, pax_machine *pma, pax_msg *p)
 
static void read_missing_values (int n)
 
static void propose_missing_values (int n)
 
static void find_value (site_def const *site, unsigned int *wait, int n)
 
static void dump_debug_exec_state ()
 
int get_xcom_message (pax_machine **p, synode_no msgno, int n)
 
synode_no set_executed_msg (synode_no msgno)
 
synode_no set_current_message (synode_no msgno)
 
static void update_max_synode (pax_msg *p)
 
static void debug_loser (synode_no x)
 
static void send_value (site_def const *site, node_no to, synode_no synode)
 
static synode_no compute_delay (synode_no start, xcom_event_horizon event_horizon)
 Returns the message number where it is safe for nodes in previous configuration to exit. More...
 
static void inform_removed (int index, int all)
 
static bool_t backwards_compatible (xcom_event_horizon event_horizon)
 
static bool_t reconfigurable_event_horizon (xcom_proto protocol_version)
 
static bool_t add_node_unsafe_against_ipv4_old_nodes (app_data_ptr a)
 
static bool_t unsafe_against_event_horizon (node_address const *node)
 Check if a node is compatible with the group's event horizon. More...
 
static bool_t add_node_unsafe_against_event_horizon (app_data_ptr a)
 
site_def * handle_add_node (app_data_ptr a)
 Reconfigure the group membership: add new member(s). More...
 
static void log_event_horizon_reconfiguration_failure (allow_event_horizon_result error_code, xcom_event_horizon attempted_event_horizon)
 
static allow_event_horizon_result allow_event_horizon (xcom_event_horizon event_horizon)
 
static bool_t unsafe_event_horizon_reconfiguration (app_data_ptr a)
 
static bool_t are_there_dead_nodes_in_new_config (app_data_ptr a)
 
bool_t handle_event_horizon (app_data_ptr a)
 Reconfigure the event horizon. More...
 
void terminate_and_exit ()
 
static int is_empty_site (site_def const *s)
 
site_def * handle_remove_node (app_data_ptr a)
 
static void log_ignored_forced_config (app_data_ptr a, char const *const caller_name)
 
bool_t handle_config (app_data_ptr a, bool const forced)
 
static int is_member (site_def const *site)
 
static int addone (int i)
 
static int fifo_empty ()
 
static int fifo_full ()
 
static void fifo_insert (synode_no s)
 
static synode_no fifo_extract ()
 
static synode_no fifo_front ()
 
static void dump_exec_state (execute_context *xc, long dbg)
 
static int x_check_exit (execute_context *xc)
 
static int x_check_execute_inform (execute_context *xc)
 
static void x_fetch (execute_context *xc)
 
static void x_execute (execute_context *xc)
 
static void x_check_increment_fetch (execute_context *xc)
 
static void x_check_increment_execute (execute_context *xc)
 
static void x_terminate (execute_context *xc)
 
char const * get_fp_name (exec_fp fp)
 
static void setup_exit_handling (execute_context *xc, site_def *site)
 
static synode_no get_sweep_start ()
 
static void send_read (synode_no find)
 
static int ok_to_propose (pax_machine *p)
 
void request_values (synode_no find, synode_no end)
 
bool_t safe_app_data_copy (pax_msg **target, app_data_ptr source)
 Copies app data source into target and checks if the copy succeeded. More...
 
static pax_msg * create_learn_msg_for_ignorant_node (pax_machine *p, pax_msg *pm, synode_no synode)
 
static void teach_ignorant_node (site_def const *site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
 
static void handle_read (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
 
static pax_msg * create_ack_prepare_msg (pax_machine *p, pax_msg *pm, synode_no synode)
 
pax_msg * handle_simple_prepare (pax_machine *p, pax_msg *pm, synode_no synode)
 Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static void handle_prepare (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
 
bool_t check_propose (site_def const *site, pax_machine *p)
 
static pax_msg * check_learn (site_def const *site, pax_machine *p)
 
static void do_learn (site_def const *site, pax_machine *p, pax_msg *m)
 
bool_t handle_simple_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol. More...
 
static void handle_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m)
 
static pax_msg * create_ack_accept_msg (pax_msg *m, synode_no synode)
 
pax_msg * handle_simple_accept (pax_machine *p, pax_msg *m, synode_no synode)
 Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol. More...
 
static void handle_accept (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *m)
 
pax_msg * handle_simple_ack_accept (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol. More...
 
static void handle_ack_accept (site_def const *site, pax_machine *p, pax_msg *m)
 
void handle_tiny_learn (site_def const *site, pax_machine *pm, pax_msg *p)
 Process the incoming tiny, i.e. More...
 
static void force_interval (synode_no start, synode_no end, int enforcer)
 
static void start_force_config (site_def *s, int enforcer)
 
void handle_learn (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming Learn message. More...
 
static void handle_client_msg (pax_msg *p)
 
static void handle_boot (site_def const *site, linkage *reply_queue, pax_msg *p)
 
bool_t should_handle_need_boot (site_def const *site, pax_msg *p)
 
void init_need_boot_op (pax_msg *p, node_address *identity)
 Initializes the message p as a need_boot_op message. More...
 
int pre_process_incoming_ping (site_def const *site, pax_msg const *pm, int has_client_already_booted, double current_time)
 Process incoming are_you_alive (i.e. More...
 
static void handle_alive (site_def const *site, linkage *reply_queue, pax_msg *pm)
 
xcom_event_horizon xcom_get_minimum_event_horizon ()
 
xcom_event_horizon xcom_get_maximum_event_horizon ()
 
static client_reply_code xcom_get_event_horizon (xcom_event_horizon *event_horizon)
 Retrieves the latest event horizon. More...
 
static u_int allow_add_node (app_data_ptr a)
 
static u_int allow_remove_node (app_data_ptr a)
 
static void log_cfgchange_wrong_group (app_data_ptr a, const char *const message_fmt)
 Logs the fact that an add/remove node request is aimed at another group. More...
 
static client_reply_code can_execute_cfgchange (pax_msg *p)
 Validates if a configuration command can be executed. More...
 
void dispatch_get_event_horizon (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void log_get_synode_app_data_failure (xcom_get_synode_app_data_result error_code)
 
void dispatch_get_synode_app_data (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static int can_send_snapshot ()
 
pax_msg * dispatch_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void update_srv (server **target, server *srv)
 
static int harmless (pax_msg const *p)
 
static bool_t should_poll_cache (pax_op op)
 
int acceptor_learner_task (task_arg arg)
 
static void server_handle_need_snapshot (server *srv, site_def const *s, node_no node)
 
int reply_handler_task (task_arg arg)
 
static void xcom_sleep (unsigned int seconds)
 
app_data_ptr init_config_with_group (app_data *a, node_list *nl, cargo_type type, uint32_t group_id)
 
app_data_ptr init_set_event_horizon_msg (app_data *a, uint32_t group_id, xcom_event_horizon event_horizon)
 
app_data_ptr init_get_event_horizon_msg (app_data *a, uint32_t group_id)
 
app_data_ptr init_app_msg (app_data *a, char *payload, u_int payload_size)
 
app_data_ptr init_terminate_command (app_data *a)
 
static app_data_ptr init_get_synode_app_data_msg (app_data *a, uint32_t group_id, synode_no_array *const synodes)
 
app_data_ptr init_set_cache_size_msg (app_data *a, uint64_t cache_limit)
 
app_data_ptr init_convert_into_local_server_msg (app_data *a)
 
static void server_send_snapshot (server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node)
 
static void server_push_log (server *srv, synode_no push, node_no node)
 
static void reply_push_log (synode_no push, linkage *reply_queue)
 
static gcs_snapshot * create_snapshot ()
 
static int xcom_timer (task_arg arg)
 
static void stop_x_timer ()
 
static void start_x_timer (double t)
 
static int x_fsm_completion_task (task_arg arg)
 
void send_x_fsm_complete ()
 
static void reset_snapshot_mask ()
 
static int got_all_snapshots ()
 
static int better_snapshot (gcs_snapshot *gcs)
 
static void handle_x_snapshot (gcs_snapshot *gcs)
 
static void update_best_snapshot (gcs_snapshot *gcs)
 
static void send_need_boot ()
 
void set_log_end (gcs_snapshot *gcs)
 
static int xcom_fsm_init (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_start_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_start (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_snapshot_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_snapshot_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_recover_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_recover_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_run_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_run (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_fsm_net_boot (task_arg fsmargs, xcom_fsm_state *ctxt, int cont)
 
static int handle_fsm_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_fsm_snapshot_wait (xcom_fsm_state *ctxt)
 
static void handle_fsm_exit ()
 
static int handle_local_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_fsm_terminate (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static void handle_fsm_force_config (task_arg fsmargs)
 
xcom_fsm_state * xcom_fsm_impl (xcom_actions action, task_arg fsmargs)
 
char const * xcom_fsm (xcom_actions action, task_arg fsmargs)
 
void set_app_snap_handler (app_snap_handler x)
 
void set_app_snap_getter (app_snap_getter x)
 
static result checked_create_socket (int domain, int type, int protocol)
 
static result socket_read (connection_descriptor *rfd, void *buf, int n)
 
static int64_t socket_read_bytes (connection_descriptor *rfd, char *p, uint32_t n)
 
static result xcom_close_socket (int *sock)
 
static result xcom_shut_close_socket (int *sock)
 
struct addrinfo * does_node_have_v4_address (struct addrinfo *retrieved)
 Retrieves a node IPv4 address, if it exists. More...
 
static int timed_connect_msec (int fd, struct sockaddr *sock_addr, socklen_t sock_size, int timeout)
 
static int timed_connect (int fd, struct sockaddr *sock_addr, socklen_t sock_size)
 
int timed_connect_sec (int fd, struct sockaddr *sock_addr, socklen_t sock_size, int timeout)
 
static connection_descriptor * connect_xcom (char const *server, xcom_port port, int use_ssl)
 
connection_descriptor * xcom_open_client_connection (char const *server, xcom_port port)
 
static int xcom_send_proto (connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag)
 
static int xcom_recv_proto (connection_descriptor *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag)
 
static int is_cargo_type (app_data_ptr a, cargo_type t)
 Checks if a given app_data is from a given cargo_type. More...
 
static char * get_add_node_address (app_data_ptr a, unsigned int *member)
 Retrieves the address that was used in the add_node request. More...
 
int is_node_v4_reachable_with_info (struct addrinfo *retrieved_addr_info)
 
int is_node_v4_reachable (char *node_address)
 
int are_we_allowed_to_upgrade_to_v6 (app_data_ptr a)
 
int64_t xcom_send_client_app_data (connection_descriptor *fd, app_data_ptr a, int force)
 
int64_t xcom_client_send_die (connection_descriptor *fd)
 
int64_t xcom_client_send_data (uint32_t size, char *data, connection_descriptor *fd)
 
void warn_protoversion_mismatch (connection_descriptor *rfd)
 
static pax_msg * socket_read_msg (connection_descriptor *rfd, pax_msg *p)
 
int xcom_close_client_connection (connection_descriptor *connection)
 
int xcom_client_boot (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
static xcom_send_app_wait_result xcom_send_app_wait_and_get (connection_descriptor *fd, app_data *a, int force, pax_msg *p)
 Send a message and wait for response. More...
 
int xcom_send_app_wait (connection_descriptor *fd, app_data *a, int force)
 
int xcom_send_cfg_wait (connection_descriptor *fd, node_list *nl, uint32_t group_id, cargo_type ct, int force)
 
int xcom_client_add_node (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
int xcom_client_remove_node (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
int xcom_client_get_event_horizon (connection_descriptor *fd, uint32_t group_id, xcom_event_horizon *event_horizon)
 
int xcom_client_set_event_horizon (connection_descriptor *fd, uint32_t group_id, xcom_event_horizon event_horizon)
 
int xcom_client_get_synode_app_data (connection_descriptor *const fd, uint32_t group_id, synode_no_array *const synodes, synode_app_data_array *const reply)
 
int xcom_client_force_config (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
int xcom_client_enable_arbitrator (connection_descriptor *fd)
 
int xcom_client_disable_arbitrator (connection_descriptor *fd)
 
int xcom_client_terminate_and_exit (connection_descriptor *fd)
 
int xcom_client_set_cache_limit (connection_descriptor *fd, uint64_t cache_limit)
 
int xcom_client_convert_into_local_server (connection_descriptor *const fd)
 

Variables

static double protoversion_warning_time
 Timestamp of previous protoversion warning. More...
 
int const threephase = 0
 
int ARBITRATOR_HACK = 0
 
static int const no_duplicate_payload = 1
 
static int use_buffered_read = 1
 
static unsigned short oom_abort = 0
 
int xcom_shutdown = 0
 
synode_no executed_msg
 
synode_no max_synode
 
task_env * boot = NULL
 
task_env * detector = NULL
 
task_env * killer = NULL
 
task_env * net_boot = NULL
 
task_env * net_recover = NULL
 
void * xcom_thread_input = 0
 
long xcom_debug_mask
 
long xcom_dbg_stack [DBG_STACK_SIZE]
 
int xcom_dbg_stack_top = 0
 
static task_env * executor = NULL
 
static task_env * sweeper = NULL
 
static task_env * retry = NULL
 
static task_env * proposer [PROPOSERS]
 
static task_env * alive_t = NULL
 
static task_env * cache_task = NULL
 
static uint32_t my_id = 0
 
static synode_no current_message
 
static synode_no last_config_modification_id
 
static uint64_t lsn = 0
 
static channel prop_input_queue
 
int client_boot_done
 
int netboot_ok
 
static linkage exec_wait
 
linkage detector_wait
 
struct {
   int   n
 
   unsigned long   id [MAX_DEAD]
 
} dead_sites
 
static site_def * forced_config = 0
 
static int wait_forced_config = 0
 
static site_def * executor_site = 0
 
static site_def * proposer_site = 0
 
static synode_no delivered_msg = NULL_SYNODE
 
static synode_no last_delivered_msg = NULL_SYNODE
 
static xcom_state_change_cb xcom_run_cb = 0
 
static xcom_state_change_cb xcom_terminate_cb = 0
 
static xcom_state_change_cb xcom_comms_cb = 0
 
static xcom_state_change_cb xcom_exit_cb = 0
 
static xcom_state_change_cb xcom_expel_cb = 0
 
static xcom_input_try_pop_cb xcom_try_pop_from_input_cb = NULL
 
static xcom_recovery_cb recovery_begin_cb = NULL
 
static xcom_recovery_cb recovery_restart_cb = NULL
 
static xcom_recovery_cb recovery_init_cb = NULL
 
static xcom_recovery_cb recovery_end_cb = NULL
 
static connection_descriptor * input_signal_connection = NULL
 
static int prop_started = 0
 
static int prop_finished = 0
 
static xcom_proto constexpr first_protocol_that_ignores_intermediate_forced_configs_or_views
 
static xcom_proto const first_event_horizon_aware_protocol = x_1_4
 
struct {
   int   n
 
   int   front
 
   int   rear
 
   synode_no   q [FIFO_SIZE]
 
} delay_fifo
 
static struct fp_name oblist []
 
static execute_context * debug_xc
 
static double sent_alive = 0.0
 
static int clicnt = 0
 
static synode_no start_config = NULL_SYNODE
 
static app_snap_getter get_app_snap_cb
 
static app_snap_handler handle_app_snap_cb
 
static task_env * x_timer = NULL
 
const char * xcom_actions_name [] = {x_actions}
 
static int snapshots [NSERVERS]
 
static synode_no log_start_max
 
static synode_no log_end_max
 

Detailed Description

xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision.

The interface to xcom is largely intact, one notable change is that xcom will consider the message delivered as soon as it has got a majority. Consequently, the VP set will not necessarily show all nodes which will actually receive the message.

OHKFIX Add wait for complete last known node set to mimic the old semantics.

IMPORTANT: What xcom does and what it does not do:

xcom messages are received in the same order on all nodes.

xcom guarantees that if a message is delivered to one node, it will eventually be seen on all other nodes as well.

xcom messages are available to a crashed node when it comes up again if at least one node which knows the value of the message has not crashed. The size of the message cache is configurable.

OHKFIX Add logging to disk to make messages durable across system crash and to increase the number of messages which may be cached.

There is no guarantee whatsoever about the order of messages from different nodes, not even the order of multiple messages from the same node. It is up to the client to impose such an order by waiting on a message before it sends the next.

xcom can notify the client that a message has timed out, and in that case will try to cancel the message, but it cannot guarantee that a message which has timed out will not be delivered.

xcom attaches a node set to each message as it is delivered to the client. This node set reflects the current node set that xcom believes is active, it does not mean that the message has been delivered yet to all nodes in the set. Neither does it mean that the message has not been delivered to the nodes not in the set.

A cache of Paxos state machines is central to the new design. The purpose of the cache is both to store a window of messages, and to decouple the different parts of xcom, like message proposal, message delivery and execution, and recovery. The old cache was limited to caching messages, and a single state machine ran the combined VP and Paxos algorithm. This constrained xcom to deliver only a single message at a time.

Each instance of the Paxos state machine implements the basic Paxos protocol. Unlike the cache in the old system, it is not cleared when a site is deleted. This removes some problems related to message delivery during site deletion. The cache is a classic fixed-size LRU with a hash index.

Some extensions to the basic Paxos algorithm have been implemented:

A node has ownership of all synodes with its own node number. Only a node with node number N can propose a value for synode {X N}, where X is the sequence number, and N is the node number. Other nodes can only propose the special value no_op for synode {X N}. The reason for this is to retain the leaderless Paxos algorithm, but to avoid collisions between nodes which are competing for the same synode number. With this scheme, each node has its own unique number series during normal operation. The scheme has the following implications:

  1. If a node N has not already proposed a value for the synode {X N}, it may at any time send a LEARN message to the other nodes with the reserved value no_op, without going through phases 1 and 2 of Paxos. This is because the other nodes are constrained to propose no_op for this synode, so the final outcome will always be no_op. To avoid unnecessary message transmission, a node will try to broadcast the no_op LEARN messages by piggybacking the information on the messages of the basic Paxos protocol.
  2. Other nodes which want to find the value of synode {X N} may do so by trying to get the value no_op accepted by following the basic Paxos algorithm. The result will be the actual value proposed by node N if it has done so, otherwise no_op. This will typically only be necessary when a node is down, and the other nodes need to find the values from the missing node in order to be able to continue execution.

Messages are delivered in order to the client, and the order is determined by the sequence number and the node number, with the sequence number as the most significant part.

The xcom network interface has been redesigned and is now implemented directly on top of TCP, and has so far been completely trouble-free. We use poll() or select() to implement non-blocking send and receive, but libev could equally well have been used.

Multicast is implemented on top of unicast as before, but the implementation is prepared to use real multicast with relatively minor changes.

The roles of proposer, acceptor/learner, and executor are now directly mapped to unique task types which interact with the Paxos state machines, whereas the previous implementation folded all the roles into a single event driven state machine.

The following terminology will be used:

A node is an instance of the xcom thread. There is only one instance of the xcom thread in the agent. A client is the application which is using xcom to send messages. A thread is a real OS thread. A task is a logical process. It is implemented by coroutines and an explicit stack.

The implementation of tasks and non-blocking socket operations is isolated in task.h and task.c.

A node will open a TCP connection to each of the other nodes. This connection is used for all communication initiated by the node, and replies to messages will arrive on the connection on which they were sent.

static int tcp_server(task_arg);

The tcp_server listens on the xcom port and starts an acceptor_learner_task whenever a new connection is detected.

static int tcp_reaper_task(task_arg);

Closes tcp connections which have been unused for too long.

static int sender_task(task_arg);

The sender_task waits for tcp messages on its input queue and sends them on the tcp socket. If the socket is closed for any reason, the sender_task will reconnect the socket. There is one sender_task for each socket. The sender task exists mainly to simplify the logic in the other tasks, but it could have been replaced with a coroutine which handles the connection logic after having reserved the socket for its client task.

static int generator_task(task_arg);

The generator_task reads messages from the client queue and moves them into the input queue of the proposer_task.

OHKFIX Use a tcp socket instead of the client queue. We can then remove the generator_task and let the acceptor_learner_task do the dispatching.

static int proposer_task(task_arg);

Assign a message number to an outgoing message and try to get it accepted. There may be several proposer tasks on each node working in parallel. If there are multiple proposer tasks, xcom cannot guarantee that the messages will be sent in the same order as received from the client.

static int acceptor_learner_task(task_arg);

This is the server part of the xcom thread. There is one acceptor_learner_task for each node in the system. The acceptor_learner_task reads messages from the socket, finds the correct Paxos state machine, and dispatches to the correct message handler with the state machine and message as arguments.

static int reply_handler_task(task_arg);

The reply_handler_task does the same job as the acceptor_learner_task, but listens on the socket which the node uses to send messages, so it will handle only replies on that socket.

static int executor_task(task_arg);

The executor_task waits for a Paxos message to be accepted. When the message is accepted, it is delivered to the client, unless it is a no-op. In either case, the executor_task steps to the next message and repeats the wait. If it times out waiting for a message, it will try to get a no-op accepted.

static int alive_task(task_arg);

Sends i-am-alive to other nodes if there has been no normal traffic for a while. It also pings nodes which seem to be inactive.

static int detector_task(task_arg);

The detector_task periodically scans the set of connections from other nodes and sees if there has been any activity. If there has been no activity for some time, it will assume that the node is dead, and send a view message to the client.

Reconfiguration:

The xcom reconfiguration process is essentially the one described in "Reconfiguring a State Machine" by Lamport et al. as the R-alpha algorithm. We execute the reconfiguration command immediately, but the config is only valid after a delay of alpha messages. The parameter alpha is the same as EVENT_HORIZON in this implementation. All tcp messages from beyond the event horizon will be ignored (see too_far).

Macro Definition Documentation

◆ __STDC_FORMAT_MACROS

#define __STDC_FORMAT_MACROS

◆ BAL_FMT

#define BAL_FMT   "ballot {cnt %d node %d}"

◆ BAL_MEM

#define BAL_MEM (   x)    (x).cnt, (x).node

◆ CONNECT_FAIL

#define CONNECT_FAIL
Value:
ret_fd = -1; \
goto end
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:188

◆ CREATE_REPLY

#define CREATE_REPLY (   x)
Value:
pax_msg *reply = NULL; \
CLONE_PAX_MSG(reply, x)
#define NULL
Definition: types.h:54

◆ FIFO_SIZE

#define FIFO_SIZE   1000

◆ FNVSTART

#define FNVSTART   0x811c9dc5

◆ GOTO

#define GOTO (   x)
Value:
{ \
IFDBG(D_NONE, STRLIT("goto "); STRLIT(#x)); \
goto x; \
}
#define STRLIT(x)
Definition: gcs_debug.h:315
Definition: gcs_debug.h:173

◆ IS_CONS_ALL

#define IS_CONS_ALL (   p)    ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)

◆ LOSER

#define LOSER (   x,
  site 
)    0

◆ NAME

#define NAME (   f)    { f, #f }

◆ PING_GATHERING_TIME_WINDOW

#define PING_GATHERING_TIME_WINDOW   5.0

◆ PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN

#define PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN   3

◆ PROP_ITER

#define PROP_ITER
Value:
int i; \
for (i = 0; i < PROPOSERS; i++)
Definition: xcom_profile.h:66

◆ PROTOVERSION_WARNING_TIMEOUT

#define PROTOVERSION_WARNING_TIMEOUT   600.0 /** Every 10 minutes */

◆ reply_msg

#define reply_msg (   m)
Value:
{ \
if (is_local_node((m)->from, site)) { \
dispatch_op(site, m, NULL); \
} else { \
link_into(&(msg_link_new((m), (m)->from)->l), reply_queue); \
} \
}
msg_link * msg_link_new(pax_msg *p, node_no to)
Definition: xcom_msg_queue.cc:69
#define NULL
Definition: types.h:54
static bool_t is_local_node(node_no n, site_def const *site)
Definition: site_def.h:68

◆ SEND_REPLY

#define SEND_REPLY
Value:
reply_msg(reply); \
replace_pax_msg(&reply, NULL)
#define NULL
Definition: types.h:54
#define reply_msg(m)
Definition: xcom_base.cc:3668

◆ SERIALIZE_REPLY

#define SERIALIZE_REPLY (   msg)
Value:
msg->to = ep->p->from; \
msg->from = ep->p->to; \
msg->delivered_msg = get_delivered_msg(); \
msg->max_synode = get_max_synode(); \
serialize_msg(msg, ep->rfd.x_proto, &ep->buflen, &ep->buf);
synode_no get_delivered_msg()
Definition: xcom_base.cc:814
char msg[1024]
Definition: test_sql_9_sessions.cc:281
synode_no get_max_synode()
Definition: xcom_base.cc:446

◆ SET_X_FSM_STATE

#define SET_X_FSM_STATE (   s)
Value:
do { \
ctxt->state_fp = s; \
ctxt->state_name = #s; \
} while (0)

◆ SYS_STRERROR_SIZE

#define SYS_STRERROR_SIZE   512

◆ WRITE_REPLY

#define WRITE_REPLY
Value:
if (ep->buflen) { \
int64_t sent; \
IFDBG(D_TRANSPORT, FN; STRLIT("task_write "); NDBG(ep->rfd.fd, d); \
NDBG(ep->buflen, u)); \
TASK_CALL(task_write(&ep->rfd, ep->buf, ep->buflen, &sent)); \
send_count[ep->p->op]++; \
send_bytes[ep->p->op] += ep->buflen; \
X_FREE(ep->buf); \
} \
ep->buf = NULL;
int task_write(connection_descriptor const *con, void *_buf, uint32_t n, int64_t *ret)
Definition: task.cc:998
#define FN
Definition: gcs_debug.h:307
Definition: gcs_debug.h:177
#define NULL
Definition: types.h:54
#define STRLIT(x)
Definition: gcs_debug.h:315
#define NDBG(x, f)
Definition: gcs_debug.h:317

◆ X

#define X (   b)    #b

◆ X_FSM_STATE

#define X_FSM_STATE (   s)    { s, #s }

◆ xcom_buf

#define xcom_buf   void

Typedef Documentation

◆ allow_event_horizon_result

◆ exec_fp

typedef void(* exec_fp) (execute_context *xc)

◆ execute_context

◆ xcom_fsm_fp

typedef int(* xcom_fsm_fp) (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)

◆ xcom_fsm_state

◆ xcom_send_app_wait_result

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
TAG_START 

◆ allow_event_horizon_result

Check if we can reconfigure the event horizon.

We can reconfigure the event horizon if all group members support reconfiguring the event horizon, and the new event horizon is in the domain [EVENT_HORIZON_MIN, EVENT_HORIZON_MAX].

We use the group's latest common XCom protocol as a proxy to decide if all members support reconfiguring the event horizon.

If the common protocol is at least version 5 (x_1_4) then all members run compatible server instances.

Otherwise there are older instances, and it follows that the event horizon must be the default and cannot be reconfigured.

Enumerator
EVENT_HORIZON_ALLOWED 
EVENT_HORIZON_INVALID 
EVENT_HORIZON_UNCHANGEABLE 

◆ xcom_send_app_wait_result

Enumerator
SEND_REQUEST_FAILED 
RECEIVE_REQUEST_FAILED 
REQUEST_BOTCHED 
RETRIES_EXCEEDED 
REQUEST_OK_RECEIVED 
REQUEST_FAIL_RECEIVED 

Function Documentation

◆ accepted()

static int accepted ( pax_machine p)
inlinestatic

◆ accepted_noop()

static int accepted_noop ( pax_machine p)
inlinestatic

◆ acceptor_learner_task()

int acceptor_learner_task ( task_arg  arg)

◆ activate_sweeper()

static void activate_sweeper ( )
static

◆ add_default_event_horizon()

static synode_no add_default_event_horizon ( synode_no  s)
static

Add the event horizon to the given base synod s.

We are assuming right now that this function is used solely in the context of "we have received a reconfiguration command at synod s, when should it be scheduled to take effect?" The result of this function is when it should take effect.

Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case result is:

s + event_horizon(active_config) + 1

If an event horizon reconfiguration R is pending, it means that the command C proposed for synod s is concurrent with R, i.e., s falls in the interval ]proposed(R), start(R)[.

In this situation we apply the command C proposed for synod s after taking into account R's event horizon.

This means that the result is:

start(R) + event_horizon(R) + 1

◆ add_event_horizon()

static synode_no add_event_horizon ( synode_no  s)
static

◆ add_node_unsafe_against_event_horizon()

static bool_t add_node_unsafe_against_event_horizon ( app_data_ptr  a)
static

◆ add_node_unsafe_against_ipv4_old_nodes()

static bool_t add_node_unsafe_against_ipv4_old_nodes ( app_data_ptr  a)
static

◆ addone()

static int addone ( int  i)
inlinestatic

◆ alive_task()

int alive_task ( task_arg  arg)

◆ alloc_node_set()

node_set* alloc_node_set ( node_set *  set,
u_int  n 
)

◆ allow_add_node()

static u_int allow_add_node ( app_data_ptr  a)
static

◆ allow_event_horizon()

static allow_event_horizon_result allow_event_horizon ( xcom_event_horizon  event_horizon)
static

◆ allow_remove_node()

static u_int allow_remove_node ( app_data_ptr  a)
static

◆ are_there_dead_nodes_in_new_config()

static bool_t are_there_dead_nodes_in_new_config ( app_data_ptr  a)
static

◆ are_we_allowed_to_upgrade_to_v6()

int are_we_allowed_to_upgrade_to_v6 ( app_data_ptr  a)

◆ assign_lsn()

static uint64_t assign_lsn ( )
static

Assign the next log sequence number (lsn) for a message.

Initial propose sets lsn to msgno of the max message number as a safe starting point; otherwise lsn shall be ever increasing. lsn ensures that sender order is known on the receiver side, as messages may arrive "out of order" due to retransmission. We use max_synode instead of current_message to avoid any conflict with lsn allocated by a previous instance of the node.

◆ backwards_compatible()

static bool_t backwards_compatible ( xcom_event_horizon  event_horizon)
static

◆ better_snapshot()

static int better_snapshot ( gcs_snapshot *  gcs)
static

◆ bit_set_or()

void bit_set_or ( bit_set *  x,
bit_set const *  y 
)

◆ brand_app_data()

static void brand_app_data ( pax_msg *  p)
static

◆ brand_client_msg()

static void brand_client_msg ( pax_msg *  msg,
synode_no  msgno 
)
static

◆ bury_site()

static void bury_site ( uint32_t  id)
static

◆ cache_manager_task()

int cache_manager_task ( task_arg  arg)

◆ can_execute_cfgchange()

static client_reply_code can_execute_cfgchange ( pax_msg *  p)
static

Validates if a configuration command can be executed.

Checks whether the configuration command is aimed at the correct group. Checks whether the configuration command pertains to a node reincarnation.

Parameters
p: a pointer to the pax_msg of the configuration command
Return values
REQUEST_OK: if the reconfiguration command can be executed
REQUEST_RETRY: if XCom is still booting
REQUEST_FAIL: if the configuration command cannot be executed

◆ can_send_snapshot()

static int can_send_snapshot ( )
static

◆ check_learn()

static pax_msg* check_learn ( site_def const *  site,
pax_machine p 
)
static

◆ check_lsn()

static int check_lsn ( app_data_ptr  a)
static

◆ check_propose()

bool_t check_propose ( site_def const *  site,
pax_machine p 
)

◆ checked_create_socket()

static result checked_create_socket ( int  domain,
int  type,
int  protocol 
)
static

◆ compute_delay()

static synode_no compute_delay ( synode_no  start,
xcom_event_horizon  event_horizon 
)
static

Returns the message number where it is safe for nodes in previous configuration to exit.

Parameters
start: start synod of the next configuration
event_horizon: event horizon of the next configuration

◆ connect_xcom()

static connection_descriptor* connect_xcom ( char const *  server,
xcom_port  port,
int  use_ssl 
)
static

◆ create_ack_accept_msg()

static pax_msg* create_ack_accept_msg ( pax_msg *  m,
synode_no  synode 
)
static

◆ create_ack_prepare_msg()

static pax_msg* create_ack_prepare_msg ( pax_machine p,
pax_msg *  pm,
synode_no  synode 
)
static

◆ create_learn_msg_for_ignorant_node()

static pax_msg* create_learn_msg_for_ignorant_node ( pax_machine p,
pax_msg *  pm,
synode_no  synode 
)
static

◆ create_noop()

pax_msg* create_noop ( pax_msg *  p)

Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
p: The no-op message to send
Return values
created: paxos message of type no_op

◆ create_proposers()

static void create_proposers ( )
static

◆ create_read()

static pax_msg* create_read ( site_def const *  site,
pax_msg *  p 
)
static

◆ create_site_def_with_start()

static site_def* create_site_def_with_start ( app_data_ptr  a,
synode_no  start 
)
static

◆ create_snapshot()

static gcs_snapshot* create_snapshot ( )
static

◆ create_tiny_learn_msg()

static pax_msg* create_tiny_learn_msg ( pax_machine pm,
pax_msg *  p 
)
static

◆ debug_loser()

static void debug_loser ( synode_no  x)
static

◆ decr_synode()

synode_no decr_synode ( synode_no  synode)

◆ detector_task()

int detector_task ( task_arg  arg)

◆ dispatch_get_event_horizon()

void dispatch_get_event_horizon ( site_def const *  site,
pax_msg *  p,
linkage reply_queue 
)

◆ dispatch_get_synode_app_data()

void dispatch_get_synode_app_data ( site_def const *  site,
pax_msg *  p,
linkage reply_queue 
)

◆ dispatch_op()

pax_msg* dispatch_op ( site_def const *  site,
pax_msg *  p,
linkage reply_queue 
)

◆ do_learn()

static void do_learn ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ does_node_have_v4_address()

struct addrinfo* does_node_have_v4_address ( struct addrinfo *  retrieved)

Retrieves a node IPv4 address, if it exists.

If a node is v4 reachable, it means one of two things:

  • The raw address is V4
  • a name was resolved to a V4/V6 address

If the latter is the case, we are going to prefer the first v4 address in the list, since it is the common language between old and new versions. If you want exclusive V6, please configure your DNS server to serve V6 names.

Parameters
retrieveda previously retrieved struct addrinfo
Returns
struct addrinfo* An addrinfo of the first IPv4 address. Else it will return the entry parameter.

◆ dump_debug_exec_state()

static void dump_debug_exec_state ( )
static

◆ dump_exec_state()

static void dump_exec_state ( execute_context xc,
long  dbg 
)
static

◆ dump_xcom_node_names()

static void dump_xcom_node_names ( site_def const *  site)
static

◆ empty_prop_input_queue()

static void empty_prop_input_queue ( )
static

◆ execute_msg()

void execute_msg ( site_def site,
pax_machine pma,
pax_msg *  p 
)

◆ executor_task()

static int executor_task ( task_arg  arg)
static

◆ fifo_empty()

static int fifo_empty ( )
inlinestatic

◆ fifo_extract()

static synode_no fifo_extract ( )
inlinestatic

◆ fifo_front()

static synode_no fifo_front ( )
inlinestatic

◆ fifo_full()

static int fifo_full ( )
inlinestatic

◆ fifo_insert()

static void fifo_insert ( synode_no  s)
inlinestatic

◆ find_value()

static void find_value ( site_def const *  site,
unsigned int *  wait,
int  n 
)
static

◆ finished()

static int finished ( pax_machine p)
inlinestatic

◆ first_event_horizon_reconfig()

static site_def const* first_event_horizon_reconfig ( )
static

Get the first pending configuration that reconfigures the event horizon.

Retrieve the first pending site_def, i.e. with the smallest start synod that is greater than executed_msg, that reconfigures the event horizon.

◆ first_free_synode()

static synode_no first_free_synode ( synode_no  msgno)
static

◆ fnv_hash()

static uint32_t fnv_hash ( unsigned char *  buf,
size_t  length,
uint32_t  sum 
)
static

◆ force_interval()

static void force_interval ( synode_no  start,
synode_no  end,
int  enforcer 
)
static

◆ force_pax_machine()

static void force_pax_machine ( pax_machine p,
int  enforcer 
)
static

◆ free_forced_config_site_def()

static void free_forced_config_site_def ( )
static

◆ get_add_node_address()

static char* get_add_node_address ( app_data_ptr  a,
unsigned int *  member 
)
static

Retrieves the address that was used in the add_node request.

Parameters
a: app data containing the node to add
member: address we used to present ourselves to other nodes
Returns
char* a pointer to the address being added.

◆ get_current_message()

synode_no get_current_message ( )

◆ get_default_start()

synode_no get_default_start ( app_data_ptr  a)

◆ get_delivered_msg()

synode_no get_delivered_msg ( )

◆ get_executor_site()

site_def const* get_executor_site ( )

◆ get_executor_site_rw()

site_def* get_executor_site_rw ( )

◆ get_fp_name()

char const* get_fp_name ( exec_fp  fp)

◆ get_last_delivered_msg()

synode_no get_last_delivered_msg ( )

◆ get_max_synode()

synode_no get_max_synode ( )

◆ get_my_xcom_id()

uint32_t get_my_xcom_id ( )

◆ get_proposer_site()

site_def const* get_proposer_site ( )

◆ get_sweep_start()

static synode_no get_sweep_start ( )
static

◆ get_xcom_message()

int get_xcom_message ( pax_machine **  p,
synode_no  msgno,
int  n 
)

◆ getstart()

static synode_no getstart ( app_data_ptr  a)
static

◆ got_all_snapshots()

static int got_all_snapshots ( )
static

◆ handle_accept()

static void handle_accept ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg *  m 
)
static

◆ handle_ack_accept()

static void handle_ack_accept ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ handle_ack_prepare()

static void handle_ack_prepare ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ handle_add_node()

site_def* handle_add_node ( app_data_ptr  a)

Reconfigure the group membership: add new member(s).

It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.

Particularly, it is possible that any of the concurrent reconfigurations modified the event horizon and that the new member(s) do not support event horizon reconfigurations.

We account for these situations by validating if adding the new members is still possible under the current state.

If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.

◆ handle_alive()

static void handle_alive ( site_def const *  site,
linkage reply_queue,
pax_msg *  pm 
)
inlinestatic

◆ handle_boot()

static void handle_boot ( site_def const *  site,
linkage reply_queue,
pax_msg *  p 
)
inlinestatic

◆ handle_client_msg()

static void handle_client_msg ( pax_msg *  p)
static

◆ handle_config()

bool_t handle_config ( app_data_ptr  a,
bool const  forced 
)

◆ handle_event_horizon()

bool_t handle_event_horizon ( app_data_ptr  a)

Reconfigure the event horizon.

It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.

Particularly, it is possible that any of the concurrent reconfigurations added a new member which does not support reconfiguring the event horizon.

We account for these situations by validating if the event horizon reconfiguration is still possible under the current state.

If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.

◆ handle_fsm_exit()

static void handle_fsm_exit ( )
static

◆ handle_fsm_force_config()

static void handle_fsm_force_config ( task_arg  fsmargs)
static

◆ handle_fsm_net_boot()

static int handle_fsm_net_boot ( task_arg  fsmargs,
xcom_fsm_state ctxt,
int  cont 
)
static

◆ handle_fsm_snapshot()

static int handle_fsm_snapshot ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_fsm_snapshot_wait()

static int handle_fsm_snapshot_wait ( xcom_fsm_state ctxt)
static

◆ handle_fsm_terminate()

static int handle_fsm_terminate ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_learn()

void handle_learn ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)

Process the incoming Learn message.

Executed by Learners.

Parameters
site: XCom configuration
p: Paxos instance
m: Incoming message

◆ handle_local_snapshot()

static int handle_local_snapshot ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_need_snapshot()

static void handle_need_snapshot ( linkage reply_queue,
pax_msg *  pm 
)
static

◆ handle_prepare()

static void handle_prepare ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg *  pm 
)
static

◆ handle_read()

static void handle_read ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg *  pm 
)
static

◆ handle_remove_node()

site_def* handle_remove_node ( app_data_ptr  a)

◆ handle_simple_accept()

pax_msg* handle_simple_accept ( pax_machine p,
pax_msg *  m,
synode_no  synode 
)

Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol.

Executed by Acceptors.

Parameters
p: Paxos instance
m: Incoming Accept message
synode: Synode of the Paxos instance/Accept message
Return values
pax_msg*: the reply to send to the Proposer (as in the Phase 2 (b) message of the Paxos protocol) if the Acceptor accepts the Accept
NULL: otherwise

◆ handle_simple_ack_accept()

pax_msg* handle_simple_ack_accept ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)

Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol.

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance
m: Incoming message
Return values
pax_msg*: the Learn message to send to Learners if a majority of Acceptors replied to the Proposer's Accept
NULL: otherwise

◆ handle_simple_ack_prepare()

bool_t handle_simple_ack_prepare ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)

Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol.

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance
m: Incoming message
Return values
TRUE: if a majority of Acceptors replied to the Proposer's Prepare
FALSE: otherwise

◆ handle_simple_prepare()

pax_msg* handle_simple_prepare ( pax_machine p,
pax_msg *  pm,
synode_no  synode 
)

Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Acceptors.

Parameters
p: Paxos instance
pm: Incoming Prepare message
synode: Synode of the Paxos instance/Accept message
Return values
pax_msg*: the reply to send to the Proposer (as in the Phase 1 (b) message of the Paxos protocol) if the Acceptor accepts the Prepare
NULL: otherwise

◆ handle_skip()

static void handle_skip ( site_def const *  site,
pax_machine p,
pax_msg *  m 
)
static

◆ handle_snapshot()

static int handle_snapshot ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_tiny_learn()

void handle_tiny_learn ( site_def const *  site,
pax_machine pm,
pax_msg *  p 
)

Process the incoming tiny (i.e. without the learned value) Learn message.

Executed by Learners.

Parameters
site: XCom configuration
pm: Paxos instance
p: Incoming message

◆ handle_x_snapshot()

static void handle_x_snapshot ( gcs_snapshot *  gcs)
static

◆ harmless()

static int harmless ( pax_msg const *  p)
static

◆ iamthegreatest()

int iamthegreatest ( site_def const *  s)

◆ ignoresig()

static int ignoresig ( int  signum)
static

◆ incr_msgno()

static synode_no incr_msgno ( synode_no  msgno)
static

◆ incr_synode()

synode_no incr_synode ( synode_no  synode)

◆ inform_removed()

static void inform_removed ( int  index,
int  all 
)
static

◆ init_app_msg()

app_data_ptr init_app_msg ( app_data *  a,
char *  payload,
u_int  payload_size 
)

◆ init_base_vars()

void init_base_vars ( )

◆ init_config_with_group()

app_data_ptr init_config_with_group ( app_data *  a,
node_list *  nl,
cargo_type  type,
uint32_t  group_id 
)

◆ init_convert_into_local_server_msg()

app_data_ptr init_convert_into_local_server_msg ( app_data *  a)

◆ init_get_event_horizon_msg()

app_data_ptr init_get_event_horizon_msg ( app_data *  a,
uint32_t  group_id 
)

◆ init_get_synode_app_data_msg()

static app_data_ptr init_get_synode_app_data_msg ( app_data *  a,
uint32_t  group_id,
synode_no_array *const  synodes 
)
static

◆ init_learn_msg()

static void init_learn_msg ( pax_msg *  p)
static

◆ init_need_boot_op()

void init_need_boot_op ( pax_msg *  p,
node_address *  identity 
)

Initializes the message p as a need_boot_op message.

Parameters
p: The message to send
identity: The unique incarnation identifier of this XCom instance

◆ init_node_set()

node_set* init_node_set ( node_set *  set,
u_int  n 
)

◆ init_prepare_msg()

void init_prepare_msg ( pax_msg *  p)

Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
p: The message to send

◆ init_propose_msg()

void init_propose_msg ( pax_msg *  p)

Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
p: The message to send

◆ init_proposers()

static void init_proposers ( )
static

◆ init_set_cache_size_msg()

app_data_ptr init_set_cache_size_msg ( app_data *  a,
uint64_t  cache_limit 
)

◆ init_set_event_horizon_msg()

app_data_ptr init_set_event_horizon_msg ( app_data *  a,
uint32_t  group_id,
xcom_event_horizon  event_horizon 
)

◆ init_tasks()

static void init_tasks ( )
static

◆ init_terminate_command()

app_data_ptr init_terminate_command ( app_data *  a)

◆ init_xcom_base()

void init_xcom_base ( )

Reset lsn

◆ initialize_lsn()

void initialize_lsn ( uint64_t  n)

◆ install_ng_with_start()

static site_def* install_ng_with_start ( app_data_ptr  a,
synode_no  start 
)
static

◆ install_node_group()

site_def* install_node_group ( app_data_ptr  a)

◆ is_busy()

static int is_busy ( synode_no  s)
static

◆ is_cargo_type()

static int is_cargo_type ( app_data_ptr  a,
cargo_type  t 
)
inlinestatic

Checks if a given app_data is from a given cargo_type.

Parameters
a: the app_data
t: the cargo type
Returns
int TRUE (1) if app_data a is from cargo_type t

◆ is_config()

static int is_config ( cargo_type  x)
inlinestatic

◆ is_dead_site()

static bool_t is_dead_site ( uint32_t  id)
static

◆ is_empty_site()

static int is_empty_site ( site_def const *  s)
inlinestatic

◆ is_forcing_node()

static int is_forcing_node ( pax_machine const *  p)
static

◆ is_latest_config()

static bool_t is_latest_config ( site_def const *const  config)
static

◆ is_member()

static int is_member ( site_def const *  site)
inlinestatic

◆ is_node_v4_reachable()

int is_node_v4_reachable ( char *  node_address)

◆ is_node_v4_reachable_with_info()

int is_node_v4_reachable_with_info ( struct addrinfo *  retrieved_addr_info)

◆ is_view()

static int is_view ( cargo_type  x)
inlinestatic

◆ latest_event_horizon_reconfig()

static site_def const* latest_event_horizon_reconfig ( )
static

Get the latest pending configuration that reconfigures the event horizon.

Retrieve the last pending site_def, i.e. with the greatest start synod that is greater than executed_msg, that reconfigures the event horizon.

◆ leader()

static node_no leader ( site_def const *  s)
static

◆ local_server()

int local_server ( task_arg  arg)

◆ local_server_is_setup()

static bool_t local_server_is_setup ( )
static

◆ local_server_shutdown_ssl()

static int local_server_shutdown_ssl ( connection_descriptor con,
void *  buf,
int  n,
int *  ret 
)
static

◆ log_cfgchange_wrong_group()

static void log_cfgchange_wrong_group ( app_data_ptr  a,
const char *const  message_fmt 
)
static

Logs the fact that an add/remove node request is aimed at another group.

Parameters
a: a pointer to the app_data of the configuration command
message_fmt: a formatted message to log, containing a single %s that will be replaced by the node's address

◆ log_event_horizon_reconfiguration_failure()

static void log_event_horizon_reconfiguration_failure ( allow_event_horizon_result  error_code,
xcom_event_horizon  attempted_event_horizon 
)
static

◆ log_get_synode_app_data_failure()

static void log_get_synode_app_data_failure ( xcom_get_synode_app_data_result  error_code)
static

◆ log_ignored_forced_config()

static void log_ignored_forced_config ( app_data_ptr  a,
char const *const  caller_name 
)
static

◆ majority()

static int majority ( bit_set const *  nodeset,
site_def const *  s,
int  all,
int  delay,
int  force 
)
inlinestatic

◆ match_my_msg()

bool_t match_my_msg ( pax_msg *  learned,
pax_msg *  mine 
)

◆ max_check()

static node_no max_check ( site_def const *  site)
inlinestatic

◆ my_unique_id()

static synode_no my_unique_id ( synode_no  synode)
static

◆ new_id()

uint32_t new_id ( )

Create a new (hopefully unique) ID.

The basic idea is to create a hash from the host ID and a timestamp.

◆ noop_match()

static int noop_match ( pax_machine p,
pax_msg *  pm 
)
inlinestatic

◆ note_snapshot()

static void note_snapshot ( node_no  node)
static

◆ ok_to_propose()

static int ok_to_propose ( pax_machine p)
static

◆ pm_finished()

int pm_finished ( pax_machine p)
Return values
1: if the value for the Paxos instance *p has been learned
0: otherwise

◆ pre_process_incoming_ping()

int pre_process_incoming_ping ( site_def const *  site,
pax_msg const *  pm,
int  has_client_already_booted,
double  current_time 
)

Process incoming are_you_alive (i.e. ping) messages and act accordingly.

GCS/XCom has a full mesh of connections between all nodes. A connects to B and B connects back to A.

If we cut out B with, for instance, a firewall, we have the A->B connection silently dead, but we have the B->A connection alive. Since we only do monitoring on one half of the connection (the incoming messages), we will consider that B is alive, although we can't contact it. In the same way, B will consider that A is dead, since it does not receive any message from it.

We must be able to break the outgoing connection if we detect that something is wrong, in order to make the bi-directional connection state consistent and report that node as unreachable. That can be done if we start receiving pings from a node that we consider to be alive. After some pings, we just kill the outgoing connection, thus creating a consistent state.

Breaking this connection should only occur if the node has already booted, meaning that the whole joining process is complete and the node is up and running. This is due to the fact that we receive pings as part of the process of joining a group.

Parameters
site: current site definitions
pm: a possible ping message
has_client_already_booted: check if this node has already booted
current_time: current XCom time
Returns
int 1 if the node connection is closed. 0, otherwise.

◆ prep_majority()

static int prep_majority ( site_def const *  site,
pax_machine p 
)
static

◆ prepare()

static void prepare ( pax_msg *  p,
pax_op  op 
)
static

◆ prepare_msg()

static int prepare_msg ( pax_msg *  p)
static

◆ prepare_push_2p()

void prepare_push_2p ( site_def const *  site,
pax_machine p 
)

Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_).

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance

◆ prepare_push_3p()

void prepare_push_3p ( site_def const *  site,
pax_machine p,
pax_msg *  msg,
synode_no  msgno,
pax_msg_type  msg_type 
)

Initializes the message msg to go through a 3-phase, regular Paxos.

Executed by Proposers.

Parameters
site: XCom configuration
p: Paxos instance
msg: Message to send
msgno: Synode where msg will be proposed
msg_type: The type of the message, e.g. normal or no_op

◆ prop_majority()

static int prop_majority ( site_def const *  site,
pax_machine p 
)
static

◆ propose_missing_values()

static void propose_missing_values ( int  n)
static

◆ propose_msg()

static int propose_msg ( pax_msg *  p)
static

◆ propose_noop()

static void propose_noop ( synode_no  find,
pax_machine p 
)
static

◆ proposer_task()

static int proposer_task ( task_arg  arg)
static

◆ push_msg_2p()

static void push_msg_2p ( site_def const *  site,
pax_machine p 
)
static

◆ push_msg_3p()

static void push_msg_3p ( site_def const *  site,
pax_machine p,
pax_msg *  msg,
synode_no  msgno,
pax_msg_type  msg_type 
)
static

◆ read_missing_values()

static void read_missing_values ( int  n)
static

◆ recently_active()

static int recently_active ( pax_machine p)
static

◆ reconfigurable_event_horizon()

static bool_t reconfigurable_event_horizon ( xcom_proto  protocol_version)
static

◆ reply_handler_task()

int reply_handler_task ( task_arg  arg)

◆ reply_push_log()

static void reply_push_log ( synode_no  push,
linkage reply_queue 
)
static

◆ request_values()

void request_values ( synode_no  find,
synode_no  end 
)

◆ reset_snapshot_mask()

static void reset_snapshot_mask ( )
static

◆ safe_app_data_copy()

bool_t safe_app_data_copy ( pax_msg **  target,
app_data_ptr  source 
)

Copies app data source into target and checks if the copy succeeded.

Sets *target to NULL if the copy fails.

Parameters
[in,out] target: The pax_msg to which the app_data will be copied.
source: The app data that will be copied.
Return values
TRUE: if the copy was successful.
FALSE: if the copy failed, in which case *target is set to NULL; a failed copy means that there was an error allocating memory for the copy.

◆ send_learn_msg()

static int send_learn_msg ( site_def const *  site,
pax_msg *  p 
)
static

◆ send_need_boot()

static void send_need_boot ( )
static

◆ send_propose_msg()

static int send_propose_msg ( pax_msg *  p)
static

◆ send_read()

static void send_read ( synode_no  find)
static

◆ send_tiny_learn_msg()

static int send_tiny_learn_msg ( site_def const *  site,
pax_msg *  p 
)
static

◆ send_value()

static void send_value ( site_def const *  site,
node_no  to,
synode_no  synode 
)
static

◆ send_x_fsm_complete()

void send_x_fsm_complete ( )

◆ server_handle_need_snapshot()

static void server_handle_need_snapshot ( server srv,
site_def const *  s,
node_no  node 
)
static

◆ server_push_log()

static void server_push_log ( server srv,
synode_no  push,
node_no  node 
)
static

◆ server_send_snapshot()

static void server_send_snapshot ( server srv,
site_def const *  s,
gcs_snapshot *  gcs_snap,
node_no  node 
)
static

◆ set_app_snap_getter()

void set_app_snap_getter ( app_snap_getter  x)

◆ set_app_snap_handler()

void set_app_snap_handler ( app_snap_handler  x)

◆ set_current_message()

synode_no set_current_message ( synode_no  msgno)

◆ set_executed_msg()

synode_no set_executed_msg ( synode_no  msgno)

◆ set_group()

void set_group ( uint32_t  id)

Set node group.

◆ set_last_received_config()

void set_last_received_config ( synode_no  received_config_change)

◆ set_learn_type()

static void set_learn_type ( pax_msg *  p)
static

◆ set_log_end()

void set_log_end ( gcs_snapshot *  gcs)

◆ set_max_synode()

void set_max_synode ( synode_no  synode)

◆ set_proposer_startpoint()

static void set_proposer_startpoint ( )
static

◆ set_unique_id()

static void set_unique_id ( pax_msg *  msg,
synode_no  synode 
)
static

◆ set_xcom_comms_cb()

void set_xcom_comms_cb ( xcom_state_change_cb  x)

◆ set_xcom_exit_cb()

void set_xcom_exit_cb ( xcom_state_change_cb  x)

◆ set_xcom_expel_cb()

void set_xcom_expel_cb ( xcom_state_change_cb  x)

◆ set_xcom_input_try_pop_cb()

void set_xcom_input_try_pop_cb ( xcom_input_try_pop_cb  pop)

◆ set_xcom_recovery_begin_cb()

void set_xcom_recovery_begin_cb ( xcom_recovery_cb  x)

◆ set_xcom_recovery_end_cb()

void set_xcom_recovery_end_cb ( xcom_recovery_cb  x)

◆ set_xcom_recovery_init_cb()

void set_xcom_recovery_init_cb ( xcom_recovery_cb  x)

◆ set_xcom_recovery_restart_cb()

void set_xcom_recovery_restart_cb ( xcom_recovery_cb  x)

◆ set_xcom_run_cb()

void set_xcom_run_cb ( xcom_state_change_cb  x)

◆ set_xcom_terminate_cb()

void set_xcom_terminate_cb ( xcom_state_change_cb  x)

◆ setup_exit_handling()

static void setup_exit_handling ( execute_context xc,
site_def site 
)
static

◆ should_handle_need_boot()

bool_t should_handle_need_boot ( site_def const *  site,
pax_msg *  p 
)
Returns
true if we should process the incoming need_boot_op message passed in parameter p.

◆ should_ignore_forced_config_or_view()

static bool constexpr should_ignore_forced_config_or_view ( xcom_proto  protocol_version)
static

◆ should_poll_cache()

static bool_t should_poll_cache ( pax_op  op)
static

◆ site_install_action()

void site_install_action ( site_def site,
cargo_type  operation 
)

◆ skip_msg()

static int skip_msg ( pax_msg *  p)
static

◆ skip_value()

static void skip_value ( pax_msg *  p)
static

◆ socket_read()

static result socket_read ( connection_descriptor rfd,
void *  buf,
int  n 
)
static

◆ socket_read_bytes()

static int64_t socket_read_bytes ( connection_descriptor rfd,
char *  p,
uint32_t  n 
)
static

◆ socket_read_msg()

static pax_msg* socket_read_msg ( connection_descriptor rfd,
pax_msg *  p 
)
static

◆ socket_write()

static int64_t socket_write ( connection_descriptor wfd,
void *  _buf,
uint32_t  n 
)
static

◆ start_force_config()

static void start_force_config ( site_def s,
int  enforcer 
)
static

◆ start_x_timer()

static void start_x_timer ( double  t)
static

◆ started()

static int started ( pax_machine p)
inlinestatic

◆ stop_x_timer()

static void stop_x_timer ( )
static

◆ sweeper_task()

static int sweeper_task ( task_arg  arg)
static

◆ teach_ignorant_node()

static void teach_ignorant_node ( site_def const *  site,
pax_machine p,
pax_msg *  pm,
synode_no  synode,
linkage reply_queue 
)
static

◆ terminate_and_exit()

void terminate_and_exit ( )

◆ terminate_proposers()

static void terminate_proposers ( )
static

◆ timed_connect()

static int timed_connect ( int  fd,
struct sockaddr sock_addr,
socklen_t  sock_size 
)
static

◆ timed_connect_msec()

static int timed_connect_msec ( int  fd,
struct sockaddr sock_addr,
socklen_t  sock_size,
int  timeout 
)
static

◆ timed_connect_sec()

int timed_connect_sec ( int  fd,
struct sockaddr sock_addr,
socklen_t  sock_size,
int  timeout 
)

◆ too_far()

static int too_far ( synode_no  s)
inlinestatic

◆ too_far_threshold()

static uint64_t too_far_threshold ( xcom_event_horizon  active_event_horizon)
static

Checks if the given synod s is outside the event horizon.

Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case threshold is:

last_executed_synod + event_horizon(active_config)

If an event horizon reconfiguration R is pending, it is possible that it reduces the event horizon. In that case, it is possible that the threshold above falls outside the new event horizon.

For example, consider last_executed_synod = 42 and event_horizon(active_config) = 10. At this point this member participates in synods up to 52. Now consider an event horizon reconfiguration that takes effect at synod 45, which modifies the event horizon to 2. This means that when last_executed_synod = 45, event_horizon(active_config) = 2. At this point this member should only be able to participate in synods up to 47. The member may have previously started processing messages directed to synods between 47 and 52, but will now ignore messages directed to those same synods.

We do not want to start processing messages that will eventually fall out of the event horizon. More importantly, the threshold above may not be safe due to the exit logic of executor_task.

When a node removes itself from the group on configuration C starting at synod start(C), the exit logic relies on knowing when a majority has executed synod start(C) - 1, i.e. the last message of the last configuration to contain the leaving node.

With a constant event horizon, we know that when synod start(C) + event_horizon is learnt, it is because a majority already executed or is ready to execute (and thus learned) synod start(C). This implies that a majority already executed start(C) - 1.

With a dynamic event horizon, we cannot be sure that when synod start(C) + event_horizon(C) is learnt, a majority already executed or is ready to execute synod start(C). This is because it is possible for a new, smaller, event horizon to take effect between start(C) and start(C) + event_horizon(C). If that happens, the threshold above allows nodes to participate in synods which are possibly beyond start(C) + event_horizon(C), which can lead to the value of synod start(C) + event_horizon(C) being learnt without a majority already having executed or being ready to execute synod start(C).

In order to maintain the assumption made by the executor_task's exit logic, when an event horizon reconfiguration R is pending we set the threshold to the minimum between:

last_executed_synod + event_horizon(active_config)

and:

start(R) - 1 + event_horizon(R)

◆ too_far_threshold_new_event_horizon_pending()

static uint64_t too_far_threshold_new_event_horizon_pending ( site_def const *  new_config)
static

◆ unsafe_against_event_horizon()

static bool_t unsafe_against_event_horizon ( node_address const *  node)
static

Check if a node is compatible with the group's event horizon.

A node is compatible with the group's configuration if:

a) The node supports event horizon reconfigurations, or
b) The group's event horizon is, or is scheduled to be, the default event horizon.

◆ unsafe_event_horizon_reconfiguration()

static bool_t unsafe_event_horizon_reconfiguration ( app_data_ptr  a)
static

◆ update_best_snapshot()

static void update_best_snapshot ( gcs_snapshot *  gcs)
static

◆ update_max_synode()

static void update_max_synode ( pax_msg *  p)
static

◆ update_srv()

static void update_srv ( server **  target,
server srv 
)
inlinestatic

◆ wait_for_cache()

static int wait_for_cache ( pax_machine **  pm,
synode_no  synode,
double  timeout 
)
static

◆ wakeup_delay()

static double wakeup_delay ( double  old)
static

◆ warn_protoversion_mismatch()

void warn_protoversion_mismatch ( connection_descriptor rfd)

◆ x_check_execute_inform()

static int x_check_execute_inform ( execute_context xc)
static

◆ x_check_exit()

static int x_check_exit ( execute_context xc)
static

◆ x_check_increment_execute()

static void x_check_increment_execute ( execute_context xc)
static

◆ x_check_increment_fetch()

static void x_check_increment_fetch ( execute_context xc)
static

◆ x_execute()

static void x_execute ( execute_context xc)
static

◆ x_fetch()

static void x_fetch ( execute_context xc)
static

◆ x_fsm_completion_task()

static int x_fsm_completion_task ( task_arg  arg)
static

◆ x_terminate()

static void x_terminate ( execute_context xc)
static

◆ xcom_client_add_node()

int xcom_client_add_node ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_boot()

int xcom_client_boot ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_convert_into_local_server()

int xcom_client_convert_into_local_server ( connection_descriptor *const  fd)

◆ xcom_client_disable_arbitrator()

int xcom_client_disable_arbitrator ( connection_descriptor fd)

◆ xcom_client_enable_arbitrator()

int xcom_client_enable_arbitrator ( connection_descriptor fd)

◆ xcom_client_force_config()

int xcom_client_force_config ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_get_event_horizon()

int xcom_client_get_event_horizon ( connection_descriptor fd,
uint32_t  group_id,
xcom_event_horizon *  event_horizon 
)

◆ xcom_client_get_synode_app_data()

int xcom_client_get_synode_app_data ( connection_descriptor *const  fd,
uint32_t  group_id,
synode_no_array *const  synodes,
synode_app_data_array *const  reply 
)

◆ xcom_client_remove_node()

int xcom_client_remove_node ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_send_data()

int64_t xcom_client_send_data ( uint32_t  size,
char *  data,
connection_descriptor fd 
)

◆ xcom_client_send_die()

int64_t xcom_client_send_die ( connection_descriptor fd)

◆ xcom_client_set_cache_limit()

int xcom_client_set_cache_limit ( connection_descriptor fd,
uint64_t  cache_limit 
)

◆ xcom_client_set_event_horizon()

int xcom_client_set_event_horizon ( connection_descriptor fd,
uint32_t  group_id,
xcom_event_horizon  event_horizon 
)

◆ xcom_client_terminate_and_exit()

int xcom_client_terminate_and_exit ( connection_descriptor fd)

◆ xcom_close_client_connection()

int xcom_close_client_connection ( connection_descriptor connection)

◆ xcom_close_socket()

static result xcom_close_socket ( int *  sock)
inlinestatic

◆ xcom_fsm()

char const* xcom_fsm ( xcom_actions  action,
task_arg  fsmargs 
)

◆ xcom_fsm_impl()

xcom_fsm_state* xcom_fsm_impl ( xcom_actions  action,
task_arg  fsmargs 
)

◆ xcom_fsm_init()

static int xcom_fsm_init ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_recover_wait()

static int xcom_fsm_recover_wait ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_recover_wait_enter()

static int xcom_fsm_recover_wait_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_run()

static int xcom_fsm_run ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_run_enter()

static int xcom_fsm_run_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_snapshot_wait()

static int xcom_fsm_snapshot_wait ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_snapshot_wait_enter()

static int xcom_fsm_snapshot_wait_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_start()

static int xcom_fsm_start ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_start_enter()

static int xcom_fsm_start_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_get_event_horizon()

static client_reply_code xcom_get_event_horizon ( xcom_event_horizon *  event_horizon)
static

Retrieves the latest event horizon.

There is no specific reason for this method to return the latest event horizon instead of the current one. Both would be acceptable results of this function, but we had to make a decision of one over the other.

Parameters
[out] event_horizon: the latest event horizon
Return values
REQUEST_FAIL: XCom is not initialized yet
REQUEST_OK: the function was successful and event_horizon contains the latest event horizon

◆ xcom_get_maximum_event_horizon()

xcom_event_horizon xcom_get_maximum_event_horizon ( )

◆ xcom_get_minimum_event_horizon()

xcom_event_horizon xcom_get_minimum_event_horizon ( )

◆ xcom_input_free_signal_connection()

void xcom_input_free_signal_connection ( void  )

◆ xcom_input_new_signal_connection()

bool_t xcom_input_new_signal_connection ( char const *  address,
xcom_port  port 
)

◆ xcom_input_signal()

bool_t xcom_input_signal ( void  )

◆ xcom_input_signal_connection_shutdown_ssl()

static bool_t xcom_input_signal_connection_shutdown_ssl ( )
static

◆ xcom_input_signal_connection_shutdown_ssl_wait_for_peer()

static bool_t xcom_input_signal_connection_shutdown_ssl_wait_for_peer ( )
static

◆ xcom_open_client_connection()

connection_descriptor* xcom_open_client_connection ( char const *  server,
xcom_port  port 
)

◆ xcom_recv_proto()

static int xcom_recv_proto ( connection_descriptor rfd,
xcom_proto *  x_proto,
x_msg_type x_type,
unsigned int *  tag 
)
static

◆ xcom_send()

void xcom_send ( app_data_ptr  a,
pax_msg *  msg 
)

◆ xcom_send_app_wait()

int xcom_send_app_wait ( connection_descriptor fd,
app_data *  a,
int  force 
)

◆ xcom_send_app_wait_and_get()

static xcom_send_app_wait_result xcom_send_app_wait_and_get ( connection_descriptor fd,
app_data *  a,
int  force,
pax_msg *  p 
)
static

Send a message and wait for response.

The caller is responsible for freeing p after calling this function, i.e. xdr_free((xdrproc_t)xdr_pax_msg, (char *)p)

◆ xcom_send_cfg_wait()

int xcom_send_cfg_wait ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id,
cargo_type  ct,
int  force 
)

◆ xcom_send_client_app_data()

int64_t xcom_send_client_app_data ( connection_descriptor fd,
app_data_ptr  a,
int  force 
)

◆ xcom_send_proto()

static int xcom_send_proto ( connection_descriptor con,
xcom_proto  x_proto,
x_msg_type  x_type,
unsigned int  tag 
)
static

◆ xcom_shut_close_socket()

static result xcom_shut_close_socket ( int *  sock)
inlinestatic

◆ xcom_sleep()

static void xcom_sleep ( unsigned int  seconds)
inlinestatic

◆ xcom_taskmain2()

int xcom_taskmain2 ( xcom_port  listen_port)

◆ xcom_thread_deinit()

void xcom_thread_deinit ( )

◆ xcom_thread_init()

void xcom_thread_init ( )

◆ xcom_timer()

static int xcom_timer ( task_arg  arg)
static

◆ xcom_unique_long()

long xcom_unique_long ( void  )

Variable Documentation

◆ alive_t

task_env* alive_t = NULL
static

◆ ARBITRATOR_HACK

int ARBITRATOR_HACK = 0

◆ boot

task_env* boot = NULL

◆ cache_task

task_env* cache_task = NULL
static

◆ clicnt

int clicnt = 0
static

◆ client_boot_done

int client_boot_done

◆ current_message

synode_no current_message
static

◆ dead_sites

struct { ... } dead_sites

◆ debug_xc

execute_context* debug_xc
static

◆ delay_fifo

struct { ... } delay_fifo

◆ delivered_msg

synode_no delivered_msg = NULL_SYNODE
static

◆ detector

task_env* detector = NULL

◆ detector_wait

linkage detector_wait
Initial value:
linkage detector_wait
Definition: xcom_base.cc:438

◆ exec_wait

linkage exec_wait
static
Initial value:
= {
static linkage exec_wait
Definition: xcom_base.cc:435

◆ executed_msg

synode_no executed_msg

◆ executor

task_env* executor = NULL
static

◆ executor_site

site_def* executor_site = 0
static

◆ first_event_horizon_aware_protocol

xcom_proto const first_event_horizon_aware_protocol = x_1_4
static

◆ first_protocol_that_ignores_intermediate_forced_configs_or_views

xcom_proto constexpr first_protocol_that_ignores_intermediate_forced_configs_or_views
static
Initial value:
=
x_1_8

◆ forced_config

site_def* forced_config = 0
static

◆ front

int front

◆ get_app_snap_cb

app_snap_getter get_app_snap_cb
static

◆ handle_app_snap_cb

app_snap_handler handle_app_snap_cb
static

◆ id

unsigned long id[MAX_DEAD]

◆ input_signal_connection

connection_descriptor* input_signal_connection = NULL
static

◆ killer

task_env* killer = NULL

◆ last_config_modification_id

synode_no last_config_modification_id
static

◆ last_delivered_msg

synode_no last_delivered_msg = NULL_SYNODE
static

◆ log_end_max

synode_no log_end_max
static

◆ log_start_max

synode_no log_start_max
static

◆ lsn

uint64_t lsn = 0
static

◆ max_synode

synode_no max_synode

◆ my_id

uint32_t my_id = 0
static

◆ n

int n

◆ net_boot

task_env* net_boot = NULL

◆ net_recover

task_env* net_recover = NULL

◆ netboot_ok

int netboot_ok

◆ no_duplicate_payload

int const no_duplicate_payload = 1
static

◆ oblist

struct fp_name oblist[]
static
Initial value:
= {
static void x_execute(execute_context *xc)
Definition: xcom_base.cc:3262
static void x_terminate(execute_context *xc)
Definition: xcom_base.cc:3320
#define NAME(f)
Definition: xcom_base.cc:3087
static void x_fetch(execute_context *xc)
Definition: xcom_base.cc:3165

◆ oom_abort

unsigned short oom_abort = 0
static

◆ prop_finished

int prop_finished = 0
static

◆ prop_input_queue

channel prop_input_queue
static

◆ prop_started

int prop_started = 0
static

◆ proposer

task_env* proposer[PROPOSERS]
static

◆ proposer_site

site_def* proposer_site = 0
static

◆ protoversion_warning_time

double protoversion_warning_time
static
Initial value:
=
0.0

◆ q

synode_no q[FIFO_SIZE]

◆ rear

int rear

◆ recovery_begin_cb

xcom_recovery_cb recovery_begin_cb = NULL
static

◆ recovery_end_cb

xcom_recovery_cb recovery_end_cb = NULL
static

◆ recovery_init_cb

xcom_recovery_cb recovery_init_cb = NULL
static

◆ recovery_restart_cb

xcom_recovery_cb recovery_restart_cb = NULL
static

◆ retry

task_env* retry = NULL
static

◆ sent_alive

double sent_alive = 0.0
static

◆ snapshots

int snapshots[NSERVERS]
static

◆ start_config

synode_no start_config = NULL_SYNODE
static

◆ sweeper

task_env* sweeper = NULL
static

◆ threephase

int const threephase = 0

Timestamp of previous protoversion warning.

◆ use_buffered_read

int use_buffered_read = 1
static

◆ wait_forced_config

int wait_forced_config = 0
static

◆ x_timer

task_env* x_timer = NULL
static

◆ xcom_actions_name

const char* xcom_actions_name[] = {x_actions}

◆ xcom_comms_cb

xcom_state_change_cb xcom_comms_cb = 0
static

◆ xcom_dbg_stack

long xcom_dbg_stack[DBG_STACK_SIZE]

◆ xcom_dbg_stack_top

int xcom_dbg_stack_top = 0

◆ xcom_debug_mask

long xcom_debug_mask
Initial value:
=
Definition: gcs_debug.h:176
Definition: gcs_debug.h:177

◆ xcom_exit_cb

xcom_state_change_cb xcom_exit_cb = 0
static

◆ xcom_expel_cb

xcom_state_change_cb xcom_expel_cb = 0
static

◆ xcom_run_cb

xcom_state_change_cb xcom_run_cb = 0
static

◆ xcom_shutdown

int xcom_shutdown = 0

◆ xcom_terminate_cb

xcom_state_change_cb xcom_terminate_cb = 0
static

◆ xcom_thread_input

void* xcom_thread_input = 0

◆ xcom_try_pop_from_input_cb

xcom_input_try_pop_cb xcom_try_pop_from_input_cb = NULL
static