MySQL 8.4.2
Source Code Documentation
xcom_base.cc File Reference

xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision. More...

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <poll.h>
#include "xcom/xcom_profile.h"
#include "my_compiler.h"
#include "xcom/x_platform.h"
#include <arpa/inet.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/sockio.h>
#include <memory>
#include "xcom/app_data.h"
#include "xcom/get_synode_app_data.h"
#include "xcom/node_no.h"
#include "xcom/server_struct.h"
#include "xcom/simset.h"
#include "xcom/site_struct.h"
#include "xcom/task.h"
#include "xcom/task_net.h"
#include "xcom/task_os.h"
#include "xcom/xcom_base.h"
#include "xcom/xcom_common.h"
#include "xcom/xcom_detector.h"
#include "xcom/xcom_transport.h"
#include "xcom/xdr_utils.h"
#include "xdr_gen/xcom_vp.h"
#include "xcom/bitset.h"
#include "xcom/leader_info_data.h"
#include "xcom/node_list.h"
#include "xcom/node_set.h"
#include "xcom/pax_msg.h"
#include "xcom/site_def.h"
#include "xcom/sock_probe.h"
#include "xcom/synode_no.h"
#include "xcom/task_debug.h"
#include "xcom/xcom_cache.h"
#include "xcom/xcom_cfg.h"
#include "xcom/xcom_interface.h"
#include "xcom/xcom_memory.h"
#include "xcom/xcom_msg_queue.h"
#include "xcom/xcom_recover.h"
#include "xcom/xcom_statistics.h"
#include "xcom/xcom_vp_str.h"
#include "xcom/network/xcom_network_provider.h"
#include <openssl/ssl.h>
#include <chrono>
#include <future>
#include <queue>
#include <tuple>
#include "xcom/retry.h"
#include <sys/utsname.h>
#include <netinet/in.h>

Classes

struct  synode_pool
 
struct  execute_context
 
struct  fp_name
 
struct  xcom_fsm_state
 

Macros

#define __STDC_FORMAT_MACROS
 
#define SYS_STRERROR_SIZE   512
 
#define XCOM_SEND_APP_WAIT_TIMEOUT   20
 
#define PROTOVERSION_WARNING_TIMEOUT   600.0 /** Every 10 minutes */
 
#define IS_CONS_ALL(p)    ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)
 
#define PROP_ITER
 
#define FNVSTART   0x811c9dc5
 
#define GOTO(x)
 
#define FIFO_SIZE   1000
 
#define NAME(f)    { f, #f }
 
#define reply_msg(m)
 
#define CREATE_REPLY(x)
 
#define SEND_REPLY
 
#define PING_GATHERING_TIME_WINDOW   5.0
 
#define PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN   3
 
#define BAL_FMT   "ballot {cnt %d node %d}"
 
#define BAL_MEM(x)   (x).cnt, (x).node
 
#define SERIALIZE_REPLY(msg)
 
#define WRITE_REPLY
 
#define X(b)   #b
 
#define X_FSM_STATE(s)    { s, #s }
 
#define SET_X_FSM_STATE(s)
 
#define CONNECT_FAIL
 
#define TICK_PERIOD   0.01
 
#define X(b)   #b
 

Typedefs

typedef bool(* unsafe_node_check) (node_address const *node)
 
typedef enum allow_event_horizon_result allow_event_horizon_result
 
typedef struct execute_context execute_context
 
typedef void(* exec_fp) (execute_context *xc)
 
typedef struct xcom_fsm_state xcom_fsm_state
 
typedef int(* xcom_fsm_fp) (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
typedef enum xcom_send_app_wait_result xcom_send_app_wait_result
 
typedef void(* paxos_state_action) (pax_machine *paxos, site_def const *site, pax_msg *mess)
 

Enumerations

enum class  synode_allocation_type { todo = 0 , local , remote , global }
 
enum class  synode_reservation_status : int { number_ok , no_nodes , delivery_timeout }
 
enum  allow_event_horizon_result { EVENT_HORIZON_ALLOWED , EVENT_HORIZON_INVALID , EVENT_HORIZON_UNCHANGEABLE }
 Check if we can reconfigure the event horizon. More...
 
enum  { TAG_START = 313 }
 
enum  xcom_send_app_wait_result {
  SEND_REQUEST_FAILED = 0 , RECEIVE_REQUEST_FAILED , REQUEST_BOTCHED , RETRIES_EXCEEDED ,
  REQUEST_OK_RECEIVED , REQUEST_FAIL_RECEIVED , REQUEST_OK_REDIRECT
}
 
enum  { paxos_timer_range = 1000 }
 

Functions

long xcom_unique_long (void)
 
static int64_t socket_write (connection_descriptor *wfd, void *_buf, uint32_t n, connnection_write_method write_function=con_write)
 
static double wakeup_delay (double old)
 
static void note_snapshot (node_no node)
 
static int proposer_task (task_arg arg)
 
static int executor_task (task_arg arg)
 
static int sweeper_task (task_arg arg)
 
int alive_task (task_arg arg)
 
int cache_manager_task (task_arg arg)
 
int detector_task (task_arg arg)
 
static int finished (pax_machine *p)
 
static int accepted (pax_machine *p)
 
static int started (pax_machine *p)
 
static synode_no first_free_synode_local (synode_no msgno)
 
static void free_forced_config_site_def ()
 
static void activate_sweeper ()
 
static void force_pax_machine (pax_machine *p, int enforcer)
 
static void propose_noop_2p (synode_no find, pax_machine *p)
 
static void handle_need_snapshot (linkage *reply_queue, pax_msg *pm)
 
static void handle_skip (site_def const *site, pax_machine *p, pax_msg *m)
 
static void paxos_fsm (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static bool is_leader (site_def *site)
 
int is_active_leader (node_no x, site_def *site)
 
static msg_handlerprimary_dispatch_table ()
 
static msg_handlersecondary_dispatch_table ()
 
static void recompute_node_sets (site_def const *old_site, site_def *new_site)
 
void recompute_timestamps (detector_state const old_timestamp, node_list const *old_nodes, detector_state new_timestamp, node_list const *new_nodes)
 
void analyze_leaders (site_def *site)
 
static int ignore_message (synode_no x, site_def *site, char const *dbg)
 
static void init_proposers ()
 
void initialize_lsn (uint64_t n)
 
void init_base_vars ()
 
uint32_t get_my_xcom_id ()
 
synode_no get_current_message ()
 
synode_no get_max_synode ()
 
static bool_t is_latest_config (site_def const *const config)
 
static site_def const * first_event_horizon_reconfig ()
 Get the first pending configuration that reconfigures the event horizon. More...
 
static site_def const * latest_event_horizon_reconfig ()
 Get the latest pending configuration that reconfigures the event horizon. More...
 
static synode_no add_event_horizon (synode_no s)
 Add the event horizon to the given base synod s. More...
 
void set_group (uint32_t id)
 Set node group. More...
 
static void bury_site (uint32_t id)
 
static bool_t is_dead_site (uint32_t id)
 
node_set * init_node_set (node_set *set, u_int n)
 
node_set * alloc_node_set (node_set *set, u_int n)
 
static synode_no incr_msgno (synode_no msgno)
 
synode_no incr_synode (synode_no synode)
 
static void skip_value (pax_msg *p)
 
static int ignoresig (int signum)
 
static int recently_active (pax_machine *p)
 
int pm_finished (pax_machine *p)
 
static int accepted_noop (pax_machine *p)
 
static int noop_match (pax_machine *p, pax_msg *pm)
 
void set_last_received_config (synode_no received_config_change)
 
static node_no max_check (site_def const *site)
 
static int is_forcing_node (pax_machine const *p)
 
static int majority (bit_set const *nodeset, site_def const *s, int all, int delay, int force)
 
static int prep_majority (site_def const *site, pax_machine const *p)
 
static int prop_majority (site_def const *site, pax_machine const *p)
 
site_def const * get_executor_site ()
 
site_defget_executor_site_rw ()
 
site_def const * get_proposer_site ()
 
synode_no get_delivered_msg ()
 
synode_no get_last_delivered_msg ()
 
void init_xcom_base ()
 
static void init_tasks ()
 
void xcom_thread_init ()
 
static void empty_prop_input_queue ()
 
static void empty_synode_number_pool ()
 
void xcom_thread_deinit ()
 
static void create_proposers ()
 
static void add_proposer_synode (int i, synode_no *syn_ptr)
 
static void remove_proposer_synode (int i)
 
static synode_no get_proposer_synode (int i)
 
static synode_no min_proposer_synode ()
 
static void terminate_proposers ()
 
static void set_proposer_startpoint ()
 
void set_xcom_run_cb (xcom_state_change_cb x)
 
void set_xcom_exit_cb (xcom_state_change_cb x)
 
void set_xcom_comms_cb (xcom_state_change_cb x)
 
void set_xcom_expel_cb (xcom_state_change_cb x)
 
void set_xcom_input_try_pop_cb (xcom_input_try_pop_cb pop)
 
static bool_t xcom_input_signal_connection_shutdown_ssl_wait_for_peer ()
 
static bool_t xcom_input_signal_connection_shutdown_ssl ()
 
bool_t xcom_input_new_signal_connection (char const *address, xcom_port port)
 
bool_t xcom_input_signal ()
 
void xcom_input_free_signal_connection ()
 
static int local_server_shutdown_ssl (connection_descriptor *con, void *buf, int n, int *ret)
 
int local_server (task_arg arg)
 
static bool_t local_server_is_setup ()
 
static void init_time_queue ()
 
static int paxos_timer_task (task_arg arg)
 
int xcom_taskmain2 (xcom_port listen_port)
 
static void prepare (pax_msg *p, pax_op op)
 
void init_prepare_msg (pax_msg *p)
 Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static int prepare_msg (pax_msg *p)
 
pax_msgcreate_noop (pax_msg *p)
 Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static pax_msgcreate_read (site_def const *site, pax_msg *p)
 
static int skip_msg (pax_msg *p)
 
static void brand_app_data (pax_msg *p)
 
static synode_no my_unique_id (synode_no synode)
 
static void set_unique_id (pax_msg *msg, synode_no synode)
 
void init_propose_msg (pax_msg *p)
 Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol. More...
 
static int send_propose_msg (pax_msg *p)
 
static int propose_msg (pax_msg *p)
 
static void set_learn_type (pax_msg *p)
 
static void init_learn_msg (pax_msg *p)
 
static int send_learn_msg (site_def const *site, pax_msg *p)
 
static pax_msgcreate_tiny_learn_msg (pax_machine *pm, pax_msg *p)
 
static int send_tiny_learn_msg (site_def const *site, pax_msg *p)
 
void prepare_push_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
 Initializes the message msg to go through a 3-phase, regular Paxos. More...
 
void prepare_push_2p (site_def const *site, pax_machine *p)
 Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_). More...
 
static void push_msg_2p (site_def const *site, pax_machine *p)
 
static void push_msg_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
 
static void brand_client_msg (pax_msg *msg, synode_no msgno)
 
void xcom_send (app_data_ptr a, pax_msg *msg)
 
static uint32_t fnv_hash (unsigned char *buf, size_t length, uint32_t sum)
 
uint32_t new_id ()
 Create a new (hopefully unique) ID. More...
 
static synode_no getstart (app_data_ptr a)
 
void site_install_action (site_def *site, cargo_type operation)
 
static void active_leaders (site_def *site, leader_array *leaders)
 
void synthesize_leaders (leader_array *leaders)
 
static bool leaders_set_by_client (site_def const *site)
 
static site_defcreate_site_def_with_start (app_data_ptr a, synode_no start)
 
static site_definstall_ng_with_start (app_data_ptr a, synode_no start)
 
site_definstall_node_group (app_data_ptr a)
 
void set_max_synode (synode_no synode)
 
static int is_busy (synode_no s)
 
bool_t match_my_msg (pax_msg *learned, pax_msg *mine)
 
static uint64_t assign_lsn ()
 Assign the next log sequence number (lsn) for a message. More...
 
static void propose_noop (synode_no find, pax_machine *p)
 
static uint64_t too_far_threshold (xcom_event_horizon active_event_horizon)
 Checks if the given synod s is outside the event horizon. More...
 
static uint64_t too_far_threshold_new_event_horizon_pending (site_def const *new_config)
 
static int too_far (synode_no s)
 
static int is_view (cargo_type x)
 
static int is_config (cargo_type x)
 
static int wait_for_cache (pax_machine **pm, synode_no synode, double timeout)
 
static synode_no local_synode_allocator (synode_no synode)
 
static synode_no global_synode_allocator (site_def *site, synode_no synode)
 
static node_no remote_synode_allocator (site_def *site, app_data const &a)
 
static int reserve_synode_number (synode_allocation_type *synode_allocation, site_def **site, synode_no *msgno, int *remote_retry, app_data *a, synode_reservation_status *ret)
 
static constexpr bool should_ignore_forced_config_or_view (xcom_proto protocol_version)
 
static node_no get_leader (site_def const *s)
 
int iamthegreatest (site_def const *s)
 
static site_defupdate_site (site_def *site, node_set const *ns, synode_no boot_key, synode_no start)
 
void execute_msg (site_def *site, pax_machine *pma, pax_msg *p)
 
static void read_missing_values (int n)
 
static void propose_missing_values (int n)
 
static void find_value (site_def const *site, unsigned int *wait, int n)
 
static void dump_debug_exec_state ()
 
int get_xcom_message (pax_machine **p, synode_no msgno, int n)
 
synode_no set_executed_msg (synode_no msgno)
 
synode_no set_current_message (synode_no msgno)
 
static void update_max_synode (pax_msg *p)
 
static int match_leader (char const *addr, leader_array const leaders)
 
static bool alive_node (site_def const *site, u_int i)
 
node_no found_active_leaders (site_def *site)
 
static void debug_loser (synode_no x)
 
static void send_value (site_def const *site, node_no to, synode_no synode)
 
static synode_no compute_delay (synode_no start, xcom_event_horizon event_horizon)
 Returns the message number where it is safe for nodes in previous configuration to exit. More...
 
static void inform_removed (int index, int all)
 
static bool_t backwards_compatible (xcom_event_horizon event_horizon)
 
static bool_t reconfigurable_event_horizon (xcom_proto protocol_version)
 
static bool_t add_node_unsafe_against_ipv4_old_nodes (app_data_ptr a)
 
static bool_t add_node_adding_own_address (app_data_ptr a)
 This will test if we are receiving a boot request that contains ourselves. More...
 
static bool unsafe_against_event_horizon (node_address const *node)
 Check if a node is compatible with the group's event horizon. More...
 
static bool check_if_add_node_is_unsafe (app_data_ptr a, unsafe_node_check unsafe)
 
static bool check_if_add_node_is_unsafe_against_event_horizon (app_data_ptr a)
 
void recompute_node_set (node_set const *old_set, node_list const *old_nodes, node_set *new_set, node_list const *new_nodes)
 
static bool incompatible_proto_and_max_leaders (xcom_proto x_proto, node_no max_leaders)
 
static bool incompatible_proto_and_leaders (xcom_proto x_proto)
 
static bool incompatible_proto_and_max_leaders (node_address const *node)
 
static bool incompatible_proto_and_leaders (node_address const *node)
 
bool unsafe_leaders (app_data *a)
 
static void set_start_and_boot (site_def *new_config, app_data_ptr a)
 
site_defhandle_add_node (app_data_ptr a)
 Reconfigure the group membership: add new member(s). More...
 
static void log_event_horizon_reconfiguration_failure (allow_event_horizon_result error_code, xcom_event_horizon attempted_event_horizon)
 
static allow_event_horizon_result allow_event_horizon (xcom_event_horizon event_horizon)
 
static bool_t is_unsafe_event_horizon_reconfiguration (app_data_ptr a)
 
static bool_t is_unsafe_max_leaders_reconfiguration (app_data_ptr a)
 
static bool_t is_unsafe_set_leaders_reconfiguration (app_data_ptr a)
 
static bool_t is_unsafe_leaders_reconfiguration (app_data_ptr a)
 
static bool_t are_there_dead_nodes_in_new_config (app_data_ptr a)
 
bool_t handle_event_horizon (app_data_ptr a)
 Reconfigure the event horizon. More...
 
static bool_t handle_max_leaders (site_def *new_config, app_data_ptr a)
 
bool_t handle_max_leaders (app_data_ptr a)
 
static void zero_leader_array (leader_array *l)
 
static void move_leader_array (leader_array *target, leader_array *source)
 
static bool_t handle_set_leaders (site_def *new_config, app_data_ptr a)
 
bool_t handle_set_leaders (app_data_ptr a)
 
bool_t handle_leaders (app_data_ptr a)
 
void terminate_and_exit ()
 
static int is_empty_site (site_def const *s)
 
site_defhandle_remove_node (app_data_ptr a)
 
static void log_ignored_forced_config (app_data_ptr a, char const *const caller_name)
 
bool_t handle_config (app_data_ptr a, bool const forced)
 
static int is_member (site_def const *site)
 
static int addone (int i)
 
static int fifo_empty ()
 
static int fifo_full ()
 
static void fifo_insert (synode_no s)
 
static synode_no fifo_extract ()
 
static synode_no fifo_front ()
 
static void dump_exec_state (execute_context *xc, long dbg)
 
static int x_check_exit (execute_context *xc)
 
static int x_check_execute_inform (execute_context *xc)
 
static void x_fetch (execute_context *xc)
 
static void x_execute (execute_context *xc)
 
static void x_check_increment_fetch (execute_context *xc)
 
static void x_check_increment_execute (execute_context *xc)
 
static void x_terminate (execute_context *xc)
 
static void setup_exit_handling (execute_context *xc, site_def *site)
 
static synode_no get_sweep_start ()
 
static bool allow_channel_takeover (site_def const *site)
 
static void broadcast_noop (synode_no find, pax_machine *p)
 
static site_def const * init_noop (synode_no find, pax_machine *p)
 
static void send_read (synode_no find)
 
static int ok_to_propose (pax_machine *p)
 
bool_t safe_app_data_copy (pax_msg **target, app_data_ptr source)
 Copies app data source into target and checks if the copy succeeded. More...
 
static pax_msgcreate_learn_msg_for_ignorant_node (pax_machine *p, pax_msg *pm, synode_no synode)
 
static void teach_ignorant_node (site_def const *site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
 
static void handle_read (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
 
static pax_msgcreate_ack_prepare_msg (pax_machine *p, pax_msg *pm, synode_no synode)
 
pax_msghandle_simple_prepare (pax_machine *p, pax_msg *pm, synode_no synode)
 Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol. More...
 
static void handle_prepare (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
 
bool_t check_propose (site_def const *site, pax_machine *p)
 
static bool learn_ok (site_def const *site, pax_machine const *p)
 
static pax_msgcheck_learn (site_def const *site, pax_machine *p)
 
static void do_learn (site_def const *site, pax_machine *p, pax_msg *m)
 
bool_t handle_simple_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol. More...
 
static void handle_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m)
 
static pax_msgcreate_ack_accept_msg (pax_msg *m, synode_no synode)
 
pax_msghandle_simple_accept (pax_machine *p, pax_msg *m, synode_no synode)
 Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol. More...
 
static void handle_accept (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *m)
 
pax_msghandle_simple_ack_accept (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol. More...
 
static void handle_ack_accept (site_def const *site, pax_machine *p, pax_msg *m)
 
void handle_tiny_learn (site_def const *site, pax_machine *pm, pax_msg *p)
 Process the incoming tiny, i.e. More...
 
static void force_interval (synode_no start, synode_no end, int enforcer)
 
static void start_force_config (site_def *s, int enforcer)
 
void handle_learn (site_def const *site, pax_machine *p, pax_msg *m)
 Process the incoming Learn message. More...
 
static void handle_client_msg (pax_msg *p)
 
static void handle_boot (site_def const *site, linkage *reply_queue, pax_msg *p)
 
bool_t should_handle_need_boot (site_def const *site, pax_msg *p)
 
void init_need_boot_op (pax_msg *p, node_address *identity)
 Initializes the message p as a need_boot_op message. More...
 
int pre_process_incoming_ping (site_def const *site, pax_msg const *pm, int has_client_already_booted, double current_time)
 Process incoming are_you_alive (i.e. More...
 
static void handle_alive (site_def const *site, linkage *reply_queue, pax_msg *pm)
 
xcom_event_horizon xcom_get_minimum_event_horizon ()
 
xcom_event_horizon xcom_get_maximum_event_horizon ()
 
static client_reply_code xcom_get_event_horizon (xcom_event_horizon *event_horizon)
 Retrieves the latest event horizon. More...
 
static bool add_node_test_connectivity_to_added_nodes (node_address *nodes_to_change, u_int number_of_nodes_to_change)
 
static u_int allow_add_node (app_data_ptr a)
 
static u_int allow_remove_node (app_data_ptr a)
 
static void log_cfgchange_wrong_group (app_data_ptr a, const char *const message_fmt)
 Logs the fact that an add/remove node request is aimed at another group. More...
 
static client_reply_code can_execute_cfgchange (pax_msg *p)
 Validates if a configuration command can be executed. More...
 
void dispatch_get_event_horizon (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static reply_data * new_leader_info (site_def *site)
 
void dispatch_get_leaders (site_def *site, pax_msg *p, linkage *reply_queue)
 
static void log_get_synode_app_data_failure (xcom_get_synode_app_data_result error_code)
 
void dispatch_get_synode_app_data (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static int can_send_snapshot ()
 
static void process_client_msg (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_prepare_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static int abort_processing (pax_msg *p)
 
static void process_ack_prepare_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_accept_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_ack_accept_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_recover_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_skip_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_i_am_alive_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_are_you_alive_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_need_boot_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_die_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_read_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_gcs_snapshot_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_tiny_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_synode_request (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void process_synode_allocated (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static msg_handlerclone_dispatch_table (msg_handler const *proto)
 
pax_msgdispatch_op (site_def const *site, pax_msg *p, linkage *reply_queue)
 
static void update_srv (server **target, server *srv)
 
static int harmless (pax_msg const *p)
 
static bool_t should_poll_cache (pax_op op)
 
int acceptor_learner_task (task_arg arg)
 
static void server_handle_need_snapshot (server *srv, site_def const *s, node_no node)
 
int reply_handler_task (task_arg arg)
 
void xcom_sleep (unsigned int seconds)
 
app_data_ptr init_config_with_group (app_data *a, node_list *nl, cargo_type type, uint32_t group_id)
 
app_data_ptr init_set_event_horizon_msg (app_data *a, uint32_t group_id, xcom_event_horizon event_horizon)
 
app_data_ptr init_get_msg (app_data *a, uint32_t group_id, cargo_type const t)
 
app_data_ptr init_get_leaders_msg (app_data *a, uint32_t group_id)
 
app_data_ptr init_get_event_horizon_msg (app_data *a, uint32_t group_id)
 
app_data_ptr init_app_msg (app_data *a, char *payload, u_int payload_size)
 
static app_data_ptr init_get_synode_app_data_msg (app_data *a, uint32_t group_id, synode_no_array *const synodes)
 
app_data_ptr init_set_cache_size_msg (app_data *a, uint64_t cache_limit)
 
app_data_ptr init_convert_into_local_server_msg (app_data *a)
 
static void server_send_snapshot (server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node)
 
static void server_push_log (server *srv, synode_no push, node_no node)
 
static void reply_push_log (synode_no push, linkage *reply_queue)
 
static gcs_snapshot * create_snapshot ()
 
static int xcom_timer (task_arg arg)
 
static void stop_x_timer ()
 
static void start_x_timer (double t)
 
static int x_fsm_completion_task (task_arg arg)
 
void send_x_fsm_complete ()
 
static void reset_snapshot_mask ()
 
static int got_all_snapshots ()
 
static int better_snapshot (gcs_snapshot *gcs)
 
static void handle_x_snapshot (gcs_snapshot *gcs)
 
static void update_best_snapshot (gcs_snapshot *gcs)
 
static void send_need_boot ()
 
void set_log_end (gcs_snapshot *gcs)
 
static int xcom_fsm_init (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_start_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_start (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_snapshot_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_snapshot_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_recover_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_recover_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_run_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int xcom_fsm_run (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_fsm_net_boot (task_arg fsmargs, xcom_fsm_state *ctxt, int cont)
 
static int handle_fsm_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_fsm_snapshot_wait (xcom_fsm_state *ctxt)
 
static void handle_fsm_exit ()
 
static int handle_local_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static int handle_fsm_terminate (task_arg fsmargs, xcom_fsm_state *ctxt)
 
static void handle_fsm_force_config (task_arg fsmargs)
 
xcom_fsm_statexcom_fsm_impl (xcom_actions action, task_arg fsmargs)
 
char const * xcom_fsm (xcom_actions action, task_arg fsmargs)
 
void set_app_snap_handler (app_snap_handler x)
 
void set_app_snap_getter (app_snap_getter x)
 
static result socket_read (connection_descriptor *rfd, void *buf, int n)
 
static int64_t socket_read_bytes (connection_descriptor *rfd, char *p, uint32_t n)
 
connection_descriptorxcom_open_client_connection (char const *server, xcom_port port)
 
static int xcom_send_proto (connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag)
 
static int xcom_recv_proto (connection_descriptor *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag)
 
static int is_cargo_type (app_data_ptr a, cargo_type t)
 Checks if a given app_data is from a given cargo_type. More...
 
static char * get_add_node_address (app_data_ptr a, unsigned int *member)
 Retrieves the address that was used in the add_node request. More...
 
int is_node_v4_reachable_with_info (struct addrinfo *retrieved_addr_info)
 
int is_node_v4_reachable (char *node_address)
 
int are_we_allowed_to_upgrade_to_v6 (app_data_ptr a)
 
int64_t xcom_send_client_app_data (connection_descriptor *fd, app_data_ptr a, int force)
 
int64_t xcom_client_send_die (connection_descriptor *fd)
 
void warn_protoversion_mismatch (connection_descriptor *rfd)
 
static pax_msgsocket_read_msg (connection_descriptor *rfd, pax_msg *p)
 
static xcom_send_app_wait_result xcom_send_app_wait_and_get (connection_descriptor *fd, app_data *a, int force, pax_msg *p, leader_info_data *leaders)
 Send a message and wait for response. More...
 
static int xcom_send_app_wait (connection_descriptor *fd, app_data *a, int force, leader_info_data *leaders)
 
int xcom_send_cfg_wait (connection_descriptor *fd, node_list *nl, uint32_t group_id, cargo_type ct, int force)
 
int xcom_client_add_node (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
int xcom_client_remove_node (connection_descriptor *fd, node_list *nl, uint32_t group_id)
 
static int xcom_check_reply (int const res)
 
int xcom_client_get_synode_app_data (connection_descriptor *const fd, uint32_t group_id, synode_no_array *const synodes, synode_app_data_array *const reply)
 
int xcom_client_enable_arbitrator (connection_descriptor *fd)
 
int xcom_client_disable_arbitrator (connection_descriptor *fd)
 
int xcom_client_set_cache_limit (connection_descriptor *fd, uint64_t cache_limit)
 
int xcom_client_convert_into_local_server (connection_descriptor *const fd)
 
void init_set_max_leaders (uint32_t group_id, app_data *a, node_no max_leaders)
 
int xcom_client_set_max_leaders (connection_descriptor *fd, node_no max_leaders, uint32_t group_id)
 
leader_array new_leader_array (u_int n, char const *names[])
 
void init_set_leaders (uint32_t group_id, app_data *a, leader_array const leaders)
 
void init_set_leaders (uint32_t group_id, app_data *a, u_int n, char const *names[])
 
void init_set_leaders (uint32_t group_id, app_data *leader_app, leader_array const leaders, app_data *max_app, node_no max_leaders)
 
void init_set_leaders (uint32_t group_id, app_data *leader_app, u_int n, char const *names[], app_data *max_app, node_no max_leaders)
 
int xcom_client_set_leaders (connection_descriptor *fd, u_int n, char const *names[], uint32_t group_id)
 
std::unique_ptr< Network_provider_management_interfaceget_network_management_interface ()
 
std::unique_ptr< Network_provider_operations_interfaceget_network_operations_interface ()
 
int xcom_client_set_leaders (connection_descriptor *fd, u_int n, char const *names[], node_no max_leaders, uint32_t group_id)
 
int xcom_client_get_leaders (connection_descriptor *fd, uint32_t group_id, leader_info_data *leaders)
 
static void paxos_twait (pax_machine *p, unsigned int t)
 
static void paxos_twait_cancel (pax_machine *p)
 
static void paxos_wakeup (unsigned int t)
 
static void paxos_timer_advance ()
 
static int paxos_fsm_p1_master_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p1_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p2_master_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p2_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p2_slave_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p2_slave_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p3_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p3_slave_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_p3_slave_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int paxos_fsm_finished (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
static int accept_new_prepare (pax_machine *paxos, pax_msg *mess)
 
static int accept_new_accept (pax_machine *paxos, pax_msg *mess)
 
static int own_message (pax_msg *mess, site_def const *site)
 
static void action_paxos_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_paxos_accept (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_paxos_learn (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_paxos_start (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_new_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_ack_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_new_accept (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_ack_accept (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void action_ignorant (pax_machine *paxos, site_def const *site, pax_msg *mess)
 
static void dispatch_p_event (paxos_state_action *vtbl, pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 
int paxos_fsm_idle (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess)
 

Variables

static double protoversion_warning_time
 
int const threephase = 0
 Timestamp of previous protoversion warning. More...
 
int ARBITRATOR_HACK = 0
 
static int const no_duplicate_payload = 1
 
static int use_buffered_read = 1
 
int oom_abort = 0
 
int xcom_shutdown = 0
 
synode_no executed_msg
 
synode_no max_synode
 
task_envboot = nullptr
 
task_envdetector = nullptr
 
task_envkiller = nullptr
 
task_envnet_boot = nullptr
 
task_envnet_recover = nullptr
 
void * xcom_thread_input = nullptr
 
long xcom_debug_mask
 
long xcom_dbg_stack [DBG_STACK_SIZE]
 
int xcom_dbg_stack_top = 0
 
static task_envexecutor = nullptr
 
static task_envsweeper = nullptr
 
static task_envretry = nullptr
 
static task_envproposer [PROPOSERS]
 
static task_envalive_t = nullptr
 
static task_envcache_task = nullptr
 
static uint32_t my_id = 0
 
static synode_no current_message
 
static synode_no last_config_modification_id
 
static uint64_t lsn = 0
 
static channel prop_input_queue
 
synode_pool synode_number_pool
 
int client_boot_done
 
int netboot_ok
 
static linkage exec_wait
 
linkage detector_wait
 
struct {
   int   n
 
   unsigned long   id [MAX_DEAD]
 
dead_sites
 
static site_defforced_config = nullptr
 
static int wait_forced_config = 0
 
static site_defexecutor_site = nullptr
 
static site_defproposer_site = nullptr
 
static synode_no delivered_msg = NULL_SYNODE
 
static synode_no last_delivered_msg = NULL_SYNODE
 
static synode_no * proposer_synodes [PROPOSERS]
 
static xcom_state_change_cb xcom_run_cb = nullptr
 
static xcom_state_change_cb xcom_terminate_cb = nullptr
 
static xcom_state_change_cb xcom_comms_cb = nullptr
 
static xcom_state_change_cb xcom_exit_cb = nullptr
 
static xcom_state_change_cb xcom_expel_cb = nullptr
 
static xcom_input_try_pop_cb xcom_try_pop_from_input_cb = nullptr
 
static xcom_recovery_cb recovery_begin_cb = nullptr
 
static xcom_recovery_cb recovery_restart_cb = nullptr
 
static xcom_recovery_cb recovery_init_cb = nullptr
 
static xcom_recovery_cb recovery_end_cb = nullptr
 
static connection_descriptorinput_signal_connection {nullptr}
 These fields are used to signal XCom's request queue. More...
 
connection_descriptorinput_signal_connection_pipe {nullptr}
 
int pipe_signal_connections [2] = {-1, -1}
 
static constexpr xcom_proto single_writer_support = x_1_9
 
static int prop_started = 0
 
static int prop_finished = 0
 
static constexpr xcom_proto first_protocol_that_ignores_intermediate_forced_configs_or_views
 
static xcom_proto const first_event_horizon_aware_protocol = x_1_4
 
struct {
   int   n
 
   int   front
 
   int   rear
 
   synode_no   q [FIFO_SIZE]
 
delay_fifo
 
static struct fp_name oblist []
 
static execute_contextdebug_xc
 
static double sent_alive = 0.0
 
static int clicnt = 0
 
static synode_no start_config = NULL_SYNODE
 
static msg_handler dispatch_table [LAST_OP]
 
static app_snap_getter get_app_snap_cb
 
static app_snap_handler handle_app_snap_cb
 
static task_envx_timer = nullptr
 
const char * xcom_actions_name [] = {x_actions}
 
static int snapshots [NSERVERS]
 
static synode_no log_start_max
 
static synode_no log_end_max
 
static unsigned int current_tick = 0
 
static linkage time_queue [paxos_timer_range]
 
unsigned constexpr int const paxos_default_timeout = 100
 
paxos_state_action p1_idle_vtbl [last_p_event]
 
paxos_state_action p1_master_enter_vtbl [last_p_event]
 
paxos_state_action p1_master_wait_vtbl [last_p_event]
 
paxos_state_action p2_master_enter_vtbl [last_p_event]
 
paxos_state_action p2_master_wait_vtbl [last_p_event]
 
paxos_state_action p2_slave_wait_vtbl [last_p_event]
 
paxos_state_action p3_master_wait_vtbl [last_p_event]
 
paxos_state_action p3_slave_wait_vtbl [last_p_event]
 
paxos_state_action p_finished_vtbl [last_p_event]
 
const char * paxos_event_name [] = {p_events}
 

Detailed Description

xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision.

The interface to xcom is largely intact, one notable change is that xcom will consider the message delivered as soon as it has got a majority. Consequently, the VP set will not necessarily show all nodes which will actually receive the message.

OHKFIX Add wait for complete last known node set to mimic the old semantics.

IMPORTANT: What xcom does and what it does not do:

xcom messages are received in the same order on all nodes.

xcom guarantees that if a message is delivered to one node, it will eventually be seen on all other nodes as well.

xcom messages are available to a crashed node when it comes up again if at least one node which knows the value of the message has not crashed. The size of the message cache is configurable.

OHKFIX Add logging to disk to make messages durable across system crash and to increase the number of messages which may be cached.

There is no guarantee whatsoever about the order of messages from different nodes, not even the order of multiple messages from the same node. It is up to the client to impose such an order by waiting on a message before it sends the next.

xcom can notify the client that a message has timed out, and in that case will try to cancel the message, but it cannot guarantee that a message which has timed out will not be delivered.

xcom attaches a node set to each message as it is delivered to the client. This node set reflects the current node set that xcom believes is active, it does not mean that the message has been delivered yet to all nodes in the set. Neither does it mean that the message has not been delivered to the nodes not in the set.

A cache of Paxos state machines is central to the new design. The purpose of the cache is both to store a window of messages, and to decouple the different parts of xcom, like message proposal, message delivery and execution, and recovery. The old cache was limited to caching messages, and a single state machine ran the combined VP and Paxos algorithm. This constrained xcom to deliver only a single message at a time.

Each instance of the Paxos state machine implements the basic Paxos protocol. Unlike the cache in the old system, it is not cleared when a site is deleted. This removes some problems related to message delivery during site deletion. The cache is a classic fixed size LRU with a hash index.

Some extensions to the basic Paxos algorithm has been implemented:

A node has ownership to all synodes with its own node number. Only a node with node number N can propose a value for synode {X N}, where X is the sequence number, and N is the node number. Other nodes can only propose the special value no_op for synode {X N}. The reason for this is to retain the leaderless Paxos algorithm, but to avoid collisions between nodes which are competing for the same synode number. With this scheme, each node has its own unique number series during normal operation. The scheme has the following implications:

  1. If a node N has not already proposed a value for the synode {X N}, it may at any time send a LEARN message to the other nodes with the reserved value no_op, without going through phase 1 and 2 of Paxos. This is because the other nodes are constrained to propose no_op for this synode, so the final outcome will always be no_op. To avoid unnecessary message transmission, a node will try to broadcast the no_op LEARN messages by piggybacking the information on the messages of the basic Paxos protocol.
  2. Other nodes which want to find the value of synode {X N} may do so by trying to get the value no_op accepted by following the basic Paxos algorithm. The result will be the actual value proposed by node N if it has done so, otherwise no_op. This will typically only be necessary when a node is down, and the other nodes need to find the values from the missing node in order to be able to continue execution.

Messages are delivered in order to the client, and the order is determined by the sequence number and the node number, with the sequence number as the most significant part.

The xcom network interface has been redesigned and is now implemented directly on top of TCP, and has so far been completely trouble free. We use poll() or select() to implement non-blocking send and receive, but libev could equally well have been used.

Multicast is implemented on top of unicast as before, but the implementation is prepared to use real multicast with relatively minor changes.

The roles of proposer, acceptor/learner, and executor are now directly mapped to unique task types which interact with the Paxos state machines, whereas the previous implementation folded all the roles into a single event driven state machine.

The following terminology will be used:

A node is an instance of the xcom thread. There is only one instance of the xcom thread in the agent. A client is the application which is using xcom to send messages. A thread is a real OS thread. A task is a logical process. It is implemented by coroutines and an explicit stack.

The implementation of tasks and non-blocking socket operations is isolated in task.h and task.c.

A node will open a tcp connection to each of the other nodes. This connection is used for all communication initiated by the node, and replies to messages will arrive on the connection on which it was sent.

static int tcp_server(task_arg);

The tcp_server listens on the xcom port and starts an acceptor_learner_task whenever a new connection is detected.

static int tcp_reaper_task(task_arg);

Closes tcp connection which have been unused for too long.

static int sender_task(task_arg);

The sender_task waits for tcp messages on its input queue and sends it on the tcp socket. If the socket is closed for any reason, the sender_task will reconnect the socket. There is one sender_task for each socket. The sender task exists mainly to simplify the logic in the other tasks, but it could have been replaced with a coroutine which handles the connection logic after having reserved the socket for its client task.

static int generator_task(task_arg);

The generator_task reads messages from the client queue and moves them into the input queue of the proposer_task.

OHKFIX Use a tcp socket instead of the client queue. We can then remove the generator_task and let the acceptor_learner_task do the dispatching.

static int proposer_task(task_arg);

Assign a message number to an outgoing message and try to get it accepted. There may be several proposer tasks on each node working in parallel. If there are multiple proposer tasks, xcom can not guarantee that the messages will be sent in the same order as received from the client.

static int acceptor_learner_task(task_arg);

This is the server part of the xcom thread. There is one acceptor_learner_task for each node in the system. The acceptor learner_task reads messages from the socket, finds the correct Paxos state machine, and dispatches to the correct message handler with the state machine and message as arguments.

static int reply_handler_task(task_arg);

The reply_handler_task does the same job as the acceptor_learner_task, but listens on the socket which the node uses to send messages, so it will handle only replies on that socket.

static int executor_task(task_arg);

The ececutor_task waits for a Paxos message to be accpeted. When the message is accepted, it is delivered to the client, unless it is a no-op. In either case, the executor_task steps to the next message and repeats the wait. If it times out waiting for a message, it will try to get a no-op accepted.

static int alive_task(task_arg);

Sends i-am-alive to other nodes if there has been no normal traffic for a while. It also pings nodes which seem to be inactive.

static int detector_task(task_arg);

The detector_task periodically scans the set of connections from other nodes and sees if there has been any activity. If there has been no activity for some time, it will assume that the node is dead, and send a view message to the client.

Reconfiguration:

The xcom reconfiguration process is essentially the one described in "Reconfiguring a State Machine" by Lamport et al. as the R-alpha algorithm. We execute the reconfiguration command immediately, but the config is only valid after a delay of alpha messages. The parameter alpha is the same as EVENT_HORIZON in this implementation. :/static.*too_far All tcp messages from beyond the event horizon will be ignored.

Macro Definition Documentation

◆ __STDC_FORMAT_MACROS

#define __STDC_FORMAT_MACROS

◆ BAL_FMT

#define BAL_FMT   "ballot {cnt %d node %d}"

◆ BAL_MEM

#define BAL_MEM (   x)    (x).cnt, (x).node

◆ CONNECT_FAIL

#define CONNECT_FAIL
Value:
ret_fd = -1; \
goto end
Cursor end()
A past-the-end Cursor.
Definition: rules_table_service.cc:192

◆ CREATE_REPLY

#define CREATE_REPLY (   x)
Value:
pax_msg *reply = NULL; \
CLONE_PAX_MSG(reply, x)
struct pax_msg pax_msg
Definition: site_struct.h:37
#define NULL
Definition: types.h:55

◆ FIFO_SIZE

#define FIFO_SIZE   1000

◆ FNVSTART

#define FNVSTART   0x811c9dc5

◆ GOTO

#define GOTO (   x)
Value:
{ \
IFDBG(D_NONE, STRLIT("goto "); STRLIT(#x)); \
goto x; \
}
@ D_NONE
Definition: gcs_debug.h:174
#define STRLIT(x)
Definition: gcs_debug.h:316

◆ IS_CONS_ALL

#define IS_CONS_ALL (   p)     ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)

◆ NAME

#define NAME (   f)     { f, #f }

◆ PING_GATHERING_TIME_WINDOW

#define PING_GATHERING_TIME_WINDOW   5.0

◆ PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN

#define PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN   3

◆ PROP_ITER

#define PROP_ITER
Value:
int i; \
for (i = 0; i < PROPOSERS; i++)
@ PROPOSERS
Definition: xcom_profile.h:74

◆ PROTOVERSION_WARNING_TIMEOUT

#define PROTOVERSION_WARNING_TIMEOUT   600.0 /** Every 10 minutes */

◆ reply_msg

#define reply_msg (   m)
Value:
{ \
if (is_local_node((m)->from, site)) { \
dispatch_op(site, m, NULL); \
} else { \
link_into(&(msg_link_new((m), (m)->from)->l), reply_queue); \
} \
}
static bool_t is_local_node(node_no n, site_def const *site)
Definition: site_def.h:72
msg_link * msg_link_new(pax_msg *p, node_no to)
Definition: xcom_msg_queue.cc:74

◆ SEND_REPLY

#define SEND_REPLY
Value:
reply_msg(reply); \
replace_pax_msg(&reply, NULL)
#define reply_msg(m)
Definition: xcom_base.cc:4793

◆ SERIALIZE_REPLY

#define SERIALIZE_REPLY (   msg)
Value:
msg->to = ep->p->from; \
msg->from = ep->p->to; \
msg->delivered_msg = get_delivered_msg(); \
msg->max_synode = get_max_synode(); \
serialize_msg(msg, ep->rfd->x_proto, &ep->buflen, &ep->buf);
synode_no get_delivered_msg()
Definition: xcom_base.cc:886
synode_no get_max_synode()
Definition: xcom_base.cc:513

◆ SET_X_FSM_STATE

#define SET_X_FSM_STATE (   s)
Value:
do { \
ctxt->state_fp = s; \
ctxt->state_name = #s; \
} while (0)

◆ SYS_STRERROR_SIZE

#define SYS_STRERROR_SIZE   512

◆ TICK_PERIOD

#define TICK_PERIOD   0.01

◆ WRITE_REPLY

#define WRITE_REPLY
Value:
if (ep->buflen) { \
int64_t sent; \
IFDBG(D_TRANSPORT, FN; STRLIT("task_write "); NDBG(ep->rfd.fd, d); \
NDBG(ep->buflen, u)); \
TASK_CALL(task_write(ep->rfd, ep->buf, ep->buflen, &sent)); \
send_count[ep->p->op]++; \
send_bytes[ep->p->op] += ep->buflen; \
X_FREE(ep->buf); \
} \
ep->buf = NULL;
#define FN
Definition: gcs_debug.h:308
@ D_TRANSPORT
Definition: gcs_debug.h:178
#define NDBG(x, f)
Definition: gcs_debug.h:318
int task_write(connection_descriptor const *con, void *_buf, uint32_t n, int64_t *ret)
Definition: task.cc:1041

◆ X [1/2]

#define X (   b)    #b

◆ X [2/2]

#define X (   b)    #b

◆ X_FSM_STATE

#define X_FSM_STATE (   s)     { s, #s }

◆ XCOM_SEND_APP_WAIT_TIMEOUT

#define XCOM_SEND_APP_WAIT_TIMEOUT   20

Typedef Documentation

◆ allow_event_horizon_result

◆ exec_fp

typedef void(* exec_fp) (execute_context *xc)

◆ execute_context

◆ paxos_state_action

typedef void(* paxos_state_action) (pax_machine *paxos, site_def const *site, pax_msg *mess)

◆ unsafe_node_check

typedef bool(* unsafe_node_check) (node_address const *node)

◆ xcom_fsm_fp

typedef int(* xcom_fsm_fp) (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt)

◆ xcom_fsm_state

◆ xcom_send_app_wait_result

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
TAG_START 

◆ anonymous enum

anonymous enum
Enumerator
paxos_timer_range 

◆ allow_event_horizon_result

Check if we can reconfigure the event horizon.

We can reconfigure the event horizon if all group members support reconfiguring the event horizon, and the new event horizon in the domain [EVENT_HORIZON_MIN, EVENT_HORIZON_MAX].

We use the group's latest common XCom protocol as a proxy to decide if all members support reconfiguring the event horizon.

If the common protocol is at least version 5 (x_1_4) then all members run compatible server instances.

Otherwise there are older instances, and it follows that the event horizon must be the default and cannot be reconfigured.

Enumerator
EVENT_HORIZON_ALLOWED 
EVENT_HORIZON_INVALID 
EVENT_HORIZON_UNCHANGEABLE 

◆ synode_allocation_type

enum class synode_allocation_type
strong
Enumerator
todo 
local 
remote 
global 

◆ synode_reservation_status

enum class synode_reservation_status : int
strong
Enumerator
number_ok 
no_nodes 
delivery_timeout 

◆ xcom_send_app_wait_result

Enumerator
SEND_REQUEST_FAILED 
RECEIVE_REQUEST_FAILED 
REQUEST_BOTCHED 
RETRIES_EXCEEDED 
REQUEST_OK_RECEIVED 
REQUEST_FAIL_RECEIVED 
REQUEST_OK_REDIRECT 

Function Documentation

◆ abort_processing()

static int abort_processing ( pax_msg p)
inlinestatic

◆ accept_new_accept()

static int accept_new_accept ( pax_machine paxos,
pax_msg mess 
)
static

◆ accept_new_prepare()

static int accept_new_prepare ( pax_machine paxos,
pax_msg mess 
)
static

◆ accepted()

static int accepted ( pax_machine p)
inlinestatic

◆ accepted_noop()

static int accepted_noop ( pax_machine p)
inlinestatic

◆ acceptor_learner_task()

int acceptor_learner_task ( task_arg  arg)

◆ action_ack_accept()

static void action_ack_accept ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_ack_prepare()

static void action_ack_prepare ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_ignorant()

static void action_ignorant ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_new_accept()

static void action_new_accept ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_new_prepare()

static void action_new_prepare ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_paxos_accept()

static void action_paxos_accept ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_paxos_learn()

static void action_paxos_learn ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_paxos_prepare()

static void action_paxos_prepare ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ action_paxos_start()

static void action_paxos_start ( pax_machine paxos,
site_def const *  site,
pax_msg mess 
)
static

◆ activate_sweeper()

static void activate_sweeper ( )
static

◆ active_leaders()

static void active_leaders ( site_def site,
leader_array *  leaders 
)
static

◆ add_event_horizon()

static synode_no add_event_horizon ( synode_no  s)
static

Add the event horizon to the given base synod s.

We are assuming right now that this function is used solely in the context of "we have received a reconfiguration command at synod s, when should it be scheduled to take effect?" The result of this function is when it should take effect.

Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case result is:

s + event_horizon(active_config) + 1

If an event horizon reconfiguration R is pending, it means that the command C proposed for synod s is concurrent with R, i.e., s falls in the interval ]proposed(R), start(R)[.

In this situation we apply the command C proposed for synod s after taking into account R's event horizon.

This means that the result is:

start(R) + event_horizon(R) + 1

◆ add_node_adding_own_address()

static bool_t add_node_adding_own_address ( app_data_ptr  a)
static

This will test if we are receiving a boot request that contains ourselves.

This could happed in case of a misconfiguration of a local_address, that causes an add_node request to be erroneous delivered.

Parameters
aapp_data with an add node request
Returns
bool_t TRUE in case of error, meaning that my address is in the add_node list

◆ add_node_test_connectivity_to_added_nodes()

static bool add_node_test_connectivity_to_added_nodes ( node_address *  nodes_to_change,
u_int  number_of_nodes_to_change 
)
static

◆ add_node_unsafe_against_ipv4_old_nodes()

static bool_t add_node_unsafe_against_ipv4_old_nodes ( app_data_ptr  a)
static

◆ add_proposer_synode()

static void add_proposer_synode ( int  i,
synode_no *  syn_ptr 
)
static

◆ addone()

static int addone ( int  i)
inlinestatic

◆ alive_node()

static bool alive_node ( site_def const *  site,
u_int  i 
)
inlinestatic

◆ alive_task()

int alive_task ( task_arg  arg)

◆ alloc_node_set()

node_set * alloc_node_set ( node_set *  set,
u_int  n 
)

◆ allow_add_node()

static u_int allow_add_node ( app_data_ptr  a)
static

◆ allow_channel_takeover()

static bool allow_channel_takeover ( site_def const *  site)
static

◆ allow_event_horizon()

static allow_event_horizon_result allow_event_horizon ( xcom_event_horizon  event_horizon)
static

◆ allow_remove_node()

static u_int allow_remove_node ( app_data_ptr  a)
static

◆ analyze_leaders()

void analyze_leaders ( site_def site)

◆ are_there_dead_nodes_in_new_config()

static bool_t are_there_dead_nodes_in_new_config ( app_data_ptr  a)
static

◆ are_we_allowed_to_upgrade_to_v6()

int are_we_allowed_to_upgrade_to_v6 ( app_data_ptr  a)

◆ assign_lsn()

static uint64_t assign_lsn ( )
static

Assign the next log sequence number (lsn) for a message.

Initial propose sets lsn to msgno of the max message number as safe starting point, otherwise lsn shall be ever increasing. lsn ensures sender order known on receiver side, as messages may arrive "out of order" due to retransmission. We use max_synode instead of current_message to avoid any conflict with lsn allocated by a previous instance of the node.

◆ backwards_compatible()

static bool_t backwards_compatible ( xcom_event_horizon  event_horizon)
static

◆ better_snapshot()

static int better_snapshot ( gcs_snapshot *  gcs)
static

◆ brand_app_data()

static void brand_app_data ( pax_msg p)
static

◆ brand_client_msg()

static void brand_client_msg ( pax_msg msg,
synode_no  msgno 
)
static

◆ broadcast_noop()

static void broadcast_noop ( synode_no  find,
pax_machine p 
)
static

◆ bury_site()

static void bury_site ( uint32_t  id)
static

◆ cache_manager_task()

int cache_manager_task ( task_arg  arg)

◆ can_execute_cfgchange()

static client_reply_code can_execute_cfgchange ( pax_msg p)
static

Validates if a configuration command can be executed.

Checks whether the configuration command is aimed at the correct group. Checks whether the configuration command pertains to a node reincarnation.

Parameters
pa pointer to the pax_msg of the configuration command
Return values
REQUEST_OKif the reconfiguration command can be executed
REQUEST_RETRYif XCom is still booting
REQUEST_FAILif the configuration command cannot be executed

◆ can_send_snapshot()

static int can_send_snapshot ( )
static

◆ check_if_add_node_is_unsafe()

static bool check_if_add_node_is_unsafe ( app_data_ptr  a,
unsafe_node_check  unsafe 
)
static

◆ check_if_add_node_is_unsafe_against_event_horizon()

static bool check_if_add_node_is_unsafe_against_event_horizon ( app_data_ptr  a)
static

◆ check_learn()

static pax_msg * check_learn ( site_def const *  site,
pax_machine p 
)
static

◆ check_propose()

bool_t check_propose ( site_def const *  site,
pax_machine p 
)

◆ clone_dispatch_table()

static msg_handler * clone_dispatch_table ( msg_handler const *  proto)
static

◆ compute_delay()

static synode_no compute_delay ( synode_no  start,
xcom_event_horizon  event_horizon 
)
static

Returns the message number where it is safe for nodes in previous configuration to exit.

Parameters
startstart synod of the next configuration
event_horizonevent horizon of the next configuration

◆ create_ack_accept_msg()

static pax_msg * create_ack_accept_msg ( pax_msg m,
synode_no  synode 
)
static

◆ create_ack_prepare_msg()

static pax_msg * create_ack_prepare_msg ( pax_machine p,
pax_msg pm,
synode_no  synode 
)
static

◆ create_learn_msg_for_ignorant_node()

static pax_msg * create_learn_msg_for_ignorant_node ( pax_machine p,
pax_msg pm,
synode_no  synode 
)
static

◆ create_noop()

pax_msg * create_noop ( pax_msg p)

Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
pThe no-op message to send
Return values
createdpaxos message of type no_op

◆ create_proposers()

static void create_proposers ( )
static

◆ create_read()

static pax_msg * create_read ( site_def const *  site,
pax_msg p 
)
static

◆ create_site_def_with_start()

static site_def * create_site_def_with_start ( app_data_ptr  a,
synode_no  start 
)
static

◆ create_snapshot()

static gcs_snapshot * create_snapshot ( )
static

◆ create_tiny_learn_msg()

static pax_msg * create_tiny_learn_msg ( pax_machine pm,
pax_msg p 
)
static

◆ debug_loser()

static void debug_loser ( synode_no  x)
static

◆ detector_task()

int detector_task ( task_arg  arg)

◆ dispatch_get_event_horizon()

void dispatch_get_event_horizon ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)

◆ dispatch_get_leaders()

void dispatch_get_leaders ( site_def site,
pax_msg p,
linkage reply_queue 
)

◆ dispatch_get_synode_app_data()

void dispatch_get_synode_app_data ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)

◆ dispatch_op()

pax_msg * dispatch_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)

◆ dispatch_p_event()

static void dispatch_p_event ( paxos_state_action vtbl,
pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
inlinestatic

◆ do_learn()

static void do_learn ( site_def const *  site,
pax_machine p,
pax_msg m 
)
static

◆ dump_debug_exec_state()

static void dump_debug_exec_state ( )
static

◆ dump_exec_state()

static void dump_exec_state ( execute_context xc,
long  dbg 
)
static

◆ empty_prop_input_queue()

static void empty_prop_input_queue ( )
static

◆ empty_synode_number_pool()

static void empty_synode_number_pool ( )
static

◆ execute_msg()

void execute_msg ( site_def site,
pax_machine pma,
pax_msg p 
)

◆ executor_task()

static int executor_task ( task_arg  arg)
static

◆ fifo_empty()

static int fifo_empty ( )
inlinestatic

◆ fifo_extract()

static synode_no fifo_extract ( )
inlinestatic

◆ fifo_front()

static synode_no fifo_front ( )
inlinestatic

◆ fifo_full()

static int fifo_full ( )
inlinestatic

◆ fifo_insert()

static void fifo_insert ( synode_no  s)
inlinestatic

◆ find_value()

static void find_value ( site_def const *  site,
unsigned int *  wait,
int  n 
)
static

◆ finished()

static int finished ( pax_machine p)
inlinestatic

◆ first_event_horizon_reconfig()

static site_def const * first_event_horizon_reconfig ( )
static

Get the first pending configuration that reconfigures the event horizon.

Retrieve the first pending site_def, i.e. with the smallest start synod that is greater than executed_msg, that reconfigures the event horizon.

◆ first_free_synode_local()

static synode_no first_free_synode_local ( synode_no  msgno)
static

◆ fnv_hash()

static uint32_t fnv_hash ( unsigned char *  buf,
size_t  length,
uint32_t  sum 
)
static

◆ force_interval()

static void force_interval ( synode_no  start,
synode_no  end,
int  enforcer 
)
static

◆ force_pax_machine()

static void force_pax_machine ( pax_machine p,
int  enforcer 
)
static

◆ found_active_leaders()

node_no found_active_leaders ( site_def site)

◆ free_forced_config_site_def()

static void free_forced_config_site_def ( )
static

◆ get_add_node_address()

static char * get_add_node_address ( app_data_ptr  a,
unsigned int *  member 
)
static

Retrieves the address that was used in the add_node request.

Parameters
aapp data containing the node to add
memberaddress we used to present ourselves to other nodes
Returns
char* a pointer to the address being added.

◆ get_current_message()

synode_no get_current_message ( )

◆ get_delivered_msg()

synode_no get_delivered_msg ( )

◆ get_executor_site()

site_def const * get_executor_site ( )

◆ get_executor_site_rw()

site_def * get_executor_site_rw ( )

◆ get_last_delivered_msg()

synode_no get_last_delivered_msg ( )

◆ get_leader()

static node_no get_leader ( site_def const *  s)
static

◆ get_max_synode()

synode_no get_max_synode ( )

◆ get_my_xcom_id()

uint32_t get_my_xcom_id ( )

◆ get_network_management_interface()

std::unique_ptr< Network_provider_management_interface > get_network_management_interface ( )

◆ get_network_operations_interface()

std::unique_ptr< Network_provider_operations_interface > get_network_operations_interface ( )

◆ get_proposer_site()

site_def const * get_proposer_site ( )

◆ get_proposer_synode()

static synode_no get_proposer_synode ( int  i)
static

◆ get_sweep_start()

static synode_no get_sweep_start ( )
static

◆ get_xcom_message()

int get_xcom_message ( pax_machine **  p,
synode_no  msgno,
int  n 
)

◆ getstart()

static synode_no getstart ( app_data_ptr  a)
static

◆ global_synode_allocator()

static synode_no global_synode_allocator ( site_def site,
synode_no  synode 
)
static

◆ got_all_snapshots()

static int got_all_snapshots ( )
static

◆ handle_accept()

static void handle_accept ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg m 
)
static

◆ handle_ack_accept()

static void handle_ack_accept ( site_def const *  site,
pax_machine p,
pax_msg m 
)
static

◆ handle_ack_prepare()

static void handle_ack_prepare ( site_def const *  site,
pax_machine p,
pax_msg m 
)
static

◆ handle_add_node()

site_def * handle_add_node ( app_data_ptr  a)

Reconfigure the group membership: add new member(s).

It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.

Particularly, it is possible that any of the concurrent reconfigurations modified the event horizon and that the new member(s) do not support event horizon reconfigurations.

We account for these situations by validating if adding the new members is still possible under the current state.

If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.

◆ handle_alive()

static void handle_alive ( site_def const *  site,
linkage reply_queue,
pax_msg pm 
)
inlinestatic

◆ handle_boot()

static void handle_boot ( site_def const *  site,
linkage reply_queue,
pax_msg p 
)
inlinestatic

◆ handle_client_msg()

static void handle_client_msg ( pax_msg p)
static

◆ handle_config()

bool_t handle_config ( app_data_ptr  a,
bool const  forced 
)

◆ handle_event_horizon()

bool_t handle_event_horizon ( app_data_ptr  a)

Reconfigure the event horizon.

It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.

Particularly, it is possible that any of the concurrent reconfigurations added a new member which does not support reconfiguring the event horizon.

We account for these situations by validating if the event horizon reconfiguration is still possible under the current state.

If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.

◆ handle_fsm_exit()

static void handle_fsm_exit ( )
static

◆ handle_fsm_force_config()

static void handle_fsm_force_config ( task_arg  fsmargs)
static

◆ handle_fsm_net_boot()

static int handle_fsm_net_boot ( task_arg  fsmargs,
xcom_fsm_state ctxt,
int  cont 
)
static

◆ handle_fsm_snapshot()

static int handle_fsm_snapshot ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_fsm_snapshot_wait()

static int handle_fsm_snapshot_wait ( xcom_fsm_state ctxt)
static

◆ handle_fsm_terminate()

static int handle_fsm_terminate ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_leaders()

bool_t handle_leaders ( app_data_ptr  a)

◆ handle_learn()

void handle_learn ( site_def const *  site,
pax_machine p,
pax_msg m 
)

Process the incoming Learn message.

Executed by Learners.

Parameters
siteXCom configuration
pPaxos instance
mIncoming message

◆ handle_local_snapshot()

static int handle_local_snapshot ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_max_leaders() [1/2]

bool_t handle_max_leaders ( app_data_ptr  a)

◆ handle_max_leaders() [2/2]

static bool_t handle_max_leaders ( site_def new_config,
app_data_ptr  a 
)
static

◆ handle_need_snapshot()

static void handle_need_snapshot ( linkage reply_queue,
pax_msg pm 
)
static

◆ handle_prepare()

static void handle_prepare ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg pm 
)
static

◆ handle_read()

static void handle_read ( site_def const *  site,
pax_machine p,
linkage reply_queue,
pax_msg pm 
)
static

◆ handle_remove_node()

site_def * handle_remove_node ( app_data_ptr  a)

◆ handle_set_leaders() [1/2]

bool_t handle_set_leaders ( app_data_ptr  a)

◆ handle_set_leaders() [2/2]

static bool_t handle_set_leaders ( site_def new_config,
app_data_ptr  a 
)
static

◆ handle_simple_accept()

pax_msg * handle_simple_accept ( pax_machine p,
pax_msg m,
synode_no  synode 
)

Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol.

Executed by Acceptors.

Parameters
pPaxos instance
mIncoming Accept message
synodeSynode of the Paxos instance/Accept message
Return values
pax_msg*the reply to send to the Proposer (as in the Phase 2 (b) message of the Paxos protocol) if the Acceptor accepts the Accept
NULLotherwise

◆ handle_simple_ack_accept()

pax_msg * handle_simple_ack_accept ( site_def const *  site,
pax_machine p,
pax_msg m 
)

Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol.

Executed by Proposers.

Parameters
siteXCom configuration
pPaxos instance
mIncoming message
Return values
pax_msg*the Learn message to send to Leaners if a majority of Acceptors replied to the Proposer's Accept
NULLotherwise

◆ handle_simple_ack_prepare()

bool_t handle_simple_ack_prepare ( site_def const *  site,
pax_machine p,
pax_msg m 
)

Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol.

Executed by Proposers.

Parameters
siteXCom configuration
pPaxos instance
mIncoming message
Return values
TRUEif a majority of Acceptors replied to the Proposer's Prepare
FALSEotherwise

◆ handle_simple_prepare()

pax_msg * handle_simple_prepare ( pax_machine p,
pax_msg pm,
synode_no  synode 
)

Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Acceptors.

Parameters
pPaxos instance
pmIncoming Prepare message
synodeSynode of the Paxos instance/Accept message
Return values
pax_msg*the reply to send to the Proposer (as in the Phase 1 (b) message of the Paxos protocol) if the Acceptor accepts the Prepare
NULLotherwise

◆ handle_skip()

static void handle_skip ( site_def const *  site,
pax_machine p,
pax_msg m 
)
static

◆ handle_snapshot()

static int handle_snapshot ( task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ handle_tiny_learn()

void handle_tiny_learn ( site_def const *  site,
pax_machine pm,
pax_msg p 
)

Process the incoming tiny, i.e.

without the learned value, Learn message. Executed by Learners.

Parameters
siteXCom configuration
pmPaxos instance
pIncoming message

◆ handle_x_snapshot()

static void handle_x_snapshot ( gcs_snapshot *  gcs)
static

◆ harmless()

static int harmless ( pax_msg const *  p)
static

◆ iamthegreatest()

int iamthegreatest ( site_def const *  s)

◆ ignore_message()

static int ignore_message ( synode_no  x,
site_def site,
char const *  dbg 
)
inlinestatic

◆ ignoresig()

static int ignoresig ( int  signum)
static

◆ incompatible_proto_and_leaders() [1/2]

static bool incompatible_proto_and_leaders ( node_address const *  node)
static

◆ incompatible_proto_and_leaders() [2/2]

static bool incompatible_proto_and_leaders ( xcom_proto  x_proto)
static

◆ incompatible_proto_and_max_leaders() [1/2]

static bool incompatible_proto_and_max_leaders ( node_address const *  node)
static

◆ incompatible_proto_and_max_leaders() [2/2]

static bool incompatible_proto_and_max_leaders ( xcom_proto  x_proto,
node_no  max_leaders 
)
static

◆ incr_msgno()

static synode_no incr_msgno ( synode_no  msgno)
static

◆ incr_synode()

synode_no incr_synode ( synode_no  synode)

◆ inform_removed()

static void inform_removed ( int  index,
int  all 
)
static

◆ init_app_msg()

app_data_ptr init_app_msg ( app_data *  a,
char *  payload,
u_int  payload_size 
)

◆ init_base_vars()

void init_base_vars ( )

◆ init_config_with_group()

app_data_ptr init_config_with_group ( app_data *  a,
node_list *  nl,
cargo_type  type,
uint32_t  group_id 
)

◆ init_convert_into_local_server_msg()

app_data_ptr init_convert_into_local_server_msg ( app_data *  a)

◆ init_get_event_horizon_msg()

app_data_ptr init_get_event_horizon_msg ( app_data *  a,
uint32_t  group_id 
)

◆ init_get_leaders_msg()

app_data_ptr init_get_leaders_msg ( app_data *  a,
uint32_t  group_id 
)

◆ init_get_msg()

app_data_ptr init_get_msg ( app_data *  a,
uint32_t  group_id,
cargo_type const  t 
)

◆ init_get_synode_app_data_msg()

static app_data_ptr init_get_synode_app_data_msg ( app_data *  a,
uint32_t  group_id,
synode_no_array *const  synodes 
)
static

◆ init_learn_msg()

static void init_learn_msg ( pax_msg p)
static

◆ init_need_boot_op()

void init_need_boot_op ( pax_msg p,
node_address *  identity 
)

Initializes the message p as a need_boot_op message.

Parameters
pThe message to send
identityThe unique incarnation identifier of this XCom instance

◆ init_node_set()

node_set * init_node_set ( node_set *  set,
u_int  n 
)

◆ init_noop()

static site_def const * init_noop ( synode_no  find,
pax_machine p 
)
static

◆ init_prepare_msg()

void init_prepare_msg ( pax_msg p)

Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
pThe message to send

◆ init_propose_msg()

void init_propose_msg ( pax_msg p)

Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol.

Executed by Proposers.

Parameters
pThe message to send

◆ init_proposers()

static void init_proposers ( )
static

◆ init_set_cache_size_msg()

app_data_ptr init_set_cache_size_msg ( app_data *  a,
uint64_t  cache_limit 
)

◆ init_set_event_horizon_msg()

app_data_ptr init_set_event_horizon_msg ( app_data *  a,
uint32_t  group_id,
xcom_event_horizon  event_horizon 
)

◆ init_set_leaders() [1/4]

void init_set_leaders ( uint32_t  group_id,
app_data *  a,
leader_array const  leaders 
)

◆ init_set_leaders() [2/4]

void init_set_leaders ( uint32_t  group_id,
app_data *  a,
u_int  n,
char const *  names[] 
)

◆ init_set_leaders() [3/4]

void init_set_leaders ( uint32_t  group_id,
app_data *  leader_app,
leader_array const  leaders,
app_data *  max_app,
node_no  max_leaders 
)

◆ init_set_leaders() [4/4]

void init_set_leaders ( uint32_t  group_id,
app_data *  leader_app,
u_int  n,
char const *  names[],
app_data *  max_app,
node_no  max_leaders 
)

◆ init_set_max_leaders()

void init_set_max_leaders ( uint32_t  group_id,
app_data *  a,
node_no  max_leaders 
)

◆ init_tasks()

static void init_tasks ( )
static

◆ init_time_queue()

static void init_time_queue ( )
static

◆ init_xcom_base()

void init_xcom_base ( )

Reset lsn

◆ initialize_lsn()

void initialize_lsn ( uint64_t  n)

◆ install_ng_with_start()

static site_def * install_ng_with_start ( app_data_ptr  a,
synode_no  start 
)
static

◆ install_node_group()

site_def * install_node_group ( app_data_ptr  a)

◆ is_active_leader()

int is_active_leader ( node_no  x,
site_def site 
)

◆ is_busy()

static int is_busy ( synode_no  s)
static

◆ is_cargo_type()

static int is_cargo_type ( app_data_ptr  a,
cargo_type  t 
)
inlinestatic

Checks if a given app_data is from a given cargo_type.

Parameters
athe app_data
tthe cargo type
Returns
int TRUE (1) if app_data a is from cargo_type t

◆ is_config()

static int is_config ( cargo_type  x)
inlinestatic

◆ is_dead_site()

static bool_t is_dead_site ( uint32_t  id)
static

◆ is_empty_site()

static int is_empty_site ( site_def const *  s)
inlinestatic

◆ is_forcing_node()

static int is_forcing_node ( pax_machine const *  p)
static

◆ is_latest_config()

static bool_t is_latest_config ( site_def const *const  config)
static

◆ is_leader()

static bool is_leader ( site_def site)
inlinestatic

◆ is_member()

static int is_member ( site_def const *  site)
inlinestatic

◆ is_node_v4_reachable()

int is_node_v4_reachable ( char *  node_address)

◆ is_node_v4_reachable_with_info()

int is_node_v4_reachable_with_info ( struct addrinfo *  retrieved_addr_info)

◆ is_unsafe_event_horizon_reconfiguration()

static bool_t is_unsafe_event_horizon_reconfiguration ( app_data_ptr  a)
static

◆ is_unsafe_leaders_reconfiguration()

static bool_t is_unsafe_leaders_reconfiguration ( app_data_ptr  a)
static

◆ is_unsafe_max_leaders_reconfiguration()

static bool_t is_unsafe_max_leaders_reconfiguration ( app_data_ptr  a)
static

◆ is_unsafe_set_leaders_reconfiguration()

static bool_t is_unsafe_set_leaders_reconfiguration ( app_data_ptr  a)
static

◆ is_view()

static int is_view ( cargo_type  x)
inlinestatic

◆ latest_event_horizon_reconfig()

static site_def const * latest_event_horizon_reconfig ( )
static

Get the latest pending configuration that reconfigures the event horizon.

Retrieve the last pending site_def, i.e. with the greatest start synod that is greater than executed_msg, that reconfigures the event horizon.

◆ leaders_set_by_client()

static bool leaders_set_by_client ( site_def const *  site)
static

◆ learn_ok()

static bool learn_ok ( site_def const *  site,
pax_machine const *  p 
)
static

◆ local_server()

int local_server ( task_arg  arg)

If an error occurs or if the client connection for the local server is forcefully shutdown, we continue processing the queue until the end resorting to time-based waits.

◆ local_server_is_setup()

static bool_t local_server_is_setup ( )
static

◆ local_server_shutdown_ssl()

static int local_server_shutdown_ssl ( connection_descriptor con,
void *  buf,
int  n,
int *  ret 
)
static

◆ local_synode_allocator()

static synode_no local_synode_allocator ( synode_no  synode)
static

◆ log_cfgchange_wrong_group()

static void log_cfgchange_wrong_group ( app_data_ptr  a,
const char *const  message_fmt 
)
static

Logs the fact that an add/remove node request is aimed at another group.

Parameters
aa pointer to the app_data of the configuration command
message_fmta formatted message to log, containing a single s that will be replaced by the node's address

◆ log_event_horizon_reconfiguration_failure()

static void log_event_horizon_reconfiguration_failure ( allow_event_horizon_result  error_code,
xcom_event_horizon  attempted_event_horizon 
)
static

◆ log_get_synode_app_data_failure()

static void log_get_synode_app_data_failure ( xcom_get_synode_app_data_result  error_code)
static

◆ log_ignored_forced_config()

static void log_ignored_forced_config ( app_data_ptr  a,
char const *const  caller_name 
)
static

◆ majority()

static int majority ( bit_set const *  nodeset,
site_def const *  s,
int  all,
int  delay,
int  force 
)
inlinestatic

◆ match_leader()

static int match_leader ( char const *  addr,
leader_array const  leaders 
)
inlinestatic

◆ match_my_msg()

bool_t match_my_msg ( pax_msg learned,
pax_msg mine 
)

◆ max_check()

static node_no max_check ( site_def const *  site)
inlinestatic

◆ min_proposer_synode()

static synode_no min_proposer_synode ( )
static

◆ move_leader_array()

static void move_leader_array ( leader_array *  target,
leader_array *  source 
)
static

◆ my_unique_id()

static synode_no my_unique_id ( synode_no  synode)
static

◆ new_id()

uint32_t new_id ( )

Create a new (hopefully unique) ID.

The basic idea is to create a hash from the host ID and a timestamp.

◆ new_leader_array()

leader_array new_leader_array ( u_int  n,
char const *  names[] 
)

◆ new_leader_info()

static reply_data * new_leader_info ( site_def site)
static

◆ noop_match()

static int noop_match ( pax_machine p,
pax_msg pm 
)
inlinestatic

◆ note_snapshot()

static void note_snapshot ( node_no  node)
static

◆ ok_to_propose()

static int ok_to_propose ( pax_machine p)
static

◆ own_message()

static int own_message ( pax_msg mess,
site_def const *  site 
)
static

◆ paxos_fsm()

static void paxos_fsm ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_finished()

static int paxos_fsm_finished ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_idle()

int paxos_fsm_idle ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)

◆ paxos_fsm_p1_master_enter()

static int paxos_fsm_p1_master_enter ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p1_master_wait()

static int paxos_fsm_p1_master_wait ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p2_master_enter()

static int paxos_fsm_p2_master_enter ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p2_master_wait()

static int paxos_fsm_p2_master_wait ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p2_slave_enter()

static int paxos_fsm_p2_slave_enter ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p2_slave_wait()

static int paxos_fsm_p2_slave_wait ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p3_master_wait()

static int paxos_fsm_p3_master_wait ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p3_slave_enter()

static int paxos_fsm_p3_slave_enter ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_fsm_p3_slave_wait()

static int paxos_fsm_p3_slave_wait ( pax_machine paxos,
site_def const *  site,
paxos_event  event,
pax_msg mess 
)
static

◆ paxos_timer_advance()

static void paxos_timer_advance ( )
static

◆ paxos_timer_task()

static int paxos_timer_task ( task_arg  arg)
static

◆ paxos_twait()

static void paxos_twait ( pax_machine p,
unsigned int  t 
)
static

◆ paxos_twait_cancel()

static void paxos_twait_cancel ( pax_machine p)
static

◆ paxos_wakeup()

static void paxos_wakeup ( unsigned int  t)
static

◆ pm_finished()

int pm_finished ( pax_machine p)
Return values
1if the value for the Paxos instance *p has been learned
0otherwise

◆ pre_process_incoming_ping()

int pre_process_incoming_ping ( site_def const *  site,
pax_msg const *  pm,
int  has_client_already_booted,
double  current_time 
)

Process incoming are_you_alive (i.e.

: ping) messages and act accordingly

GCS/XCom has a full mesh of connections between all nodes. A connects to B and B connects back to A.

If we cut out B with, for instance, a firewall, we have the A->B connection silently dead, but we have the B->A connection alive. Since we only do monitoring on one half of the connection (the incoming messages), we will consider that B is alive, although we can't contact it. In the same way, B will consider that A is dead, since it does not receive any message from it.

We must be able to break the outgoing connection if we detect that something is wrong, in order to make the bi-directional connection state consistent and report that node as unreachable. That can be done if we start receiving pings from a node that we consider that it is alive. After some pings, we just kill the outgoing connection, thus creating a consistent state.

Breaking this connection should only occur if the node has already booted, meaning that the whole joining process is complete and the node is up and running. This is due to the fact that we receive pings as part of the process of joining a group.

Parameters
sitecurrent site definitions
pma possible ping message:
has_client_already_bootedcheck if this node has already booted
current_timecurrent XCom time
Returns
int 1 if the node connection is closed. 0, otherwise.

◆ prep_majority()

static int prep_majority ( site_def const *  site,
pax_machine const *  p 
)
static

◆ prepare()

static void prepare ( pax_msg p,
pax_op  op 
)
static

◆ prepare_msg()

static int prepare_msg ( pax_msg p)
static

◆ prepare_push_2p()

void prepare_push_2p ( site_def const *  site,
pax_machine p 
)

Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_).

Executed by Proposers.

Parameters
siteXCom configuration
pPaxos instance

◆ prepare_push_3p()

void prepare_push_3p ( site_def const *  site,
pax_machine p,
pax_msg msg,
synode_no  msgno,
pax_msg_type  msg_type 
)

Initializes the message msg to go through a 3-phase, regular Paxos.

Executed by Proposers.

Parameters
siteXCom configuration
pPaxos instance
msgMessage to send
msgnoSynode where msg will be proposed
msg_typeThe type of the message, e.g. normal or no_op

◆ primary_dispatch_table()

static msg_handler * primary_dispatch_table ( )
static

◆ process_accept_op()

static void process_accept_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_ack_accept_op()

static void process_ack_accept_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_ack_prepare_op()

static void process_ack_prepare_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_are_you_alive_op()

static void process_are_you_alive_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_client_msg()

static void process_client_msg ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_die_op()

static void process_die_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_gcs_snapshot_op()

static void process_gcs_snapshot_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_i_am_alive_op()

static void process_i_am_alive_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_learn_op()

static void process_learn_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_need_boot_op()

static void process_need_boot_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_prepare_op()

static void process_prepare_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_read_op()

static void process_read_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_recover_learn_op()

static void process_recover_learn_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_skip_op()

static void process_skip_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_synode_allocated()

static void process_synode_allocated ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_synode_request()

static void process_synode_request ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ process_tiny_learn_op()

static void process_tiny_learn_op ( site_def const *  site,
pax_msg p,
linkage reply_queue 
)
static

◆ prop_majority()

static int prop_majority ( site_def const *  site,
pax_machine const *  p 
)
static

◆ propose_missing_values()

static void propose_missing_values ( int  n)
static

◆ propose_msg()

static int propose_msg ( pax_msg p)
static

◆ propose_noop()

static void propose_noop ( synode_no  find,
pax_machine p 
)
static

◆ propose_noop_2p()

static void propose_noop_2p ( synode_no  find,
pax_machine p 
)
static

◆ proposer_task()

static int proposer_task ( task_arg  arg)
static

◆ push_msg_2p()

static void push_msg_2p ( site_def const *  site,
pax_machine p 
)
static

◆ push_msg_3p()

static void push_msg_3p ( site_def const *  site,
pax_machine p,
pax_msg msg,
synode_no  msgno,
pax_msg_type  msg_type 
)
static

◆ read_missing_values()

static void read_missing_values ( int  n)
static

◆ recently_active()

static int recently_active ( pax_machine p)
static

◆ recompute_node_set()

void recompute_node_set ( node_set const *  old_set,
node_list const *  old_nodes,
node_set *  new_set,
node_list const *  new_nodes 
)

◆ recompute_node_sets()

static void recompute_node_sets ( site_def const *  old_site,
site_def new_site 
)
static

◆ recompute_timestamps()

void recompute_timestamps ( detector_state const  old_timestamp,
node_list const *  old_nodes,
detector_state  new_timestamp,
node_list const *  new_nodes 
)

◆ reconfigurable_event_horizon()

static bool_t reconfigurable_event_horizon ( xcom_proto  protocol_version)
static

◆ remote_synode_allocator()

static node_no remote_synode_allocator ( site_def site,
app_data const &  a 
)
static

◆ remove_proposer_synode()

static void remove_proposer_synode ( int  i)
static

◆ reply_handler_task()

int reply_handler_task ( task_arg  arg)

◆ reply_push_log()

static void reply_push_log ( synode_no  push,
linkage reply_queue 
)
static

◆ reserve_synode_number()

static int reserve_synode_number ( synode_allocation_type synode_allocation,
site_def **  site,
synode_no *  msgno,
int *  remote_retry,
app_data *  a,
synode_reservation_status ret 
)
static

◆ reset_snapshot_mask()

static void reset_snapshot_mask ( )
static

◆ safe_app_data_copy()

bool_t safe_app_data_copy ( pax_msg **  target,
app_data_ptr  source 
)

Copies app data source into target and checks if the copy succeeded.

Sets *target to NULL if the copy fails.

Parameters
[in,out]targetThe pax_msg to which the app_data will be copied.
sourceThe app data that will be copied.
Return values
TRUEif the copy was successful.
FALSEif the copy failed, in which case *target is set to NULL; a failed copy means that there was an error allocating memory for the copy.

◆ secondary_dispatch_table()

static msg_handler * secondary_dispatch_table ( )
static

◆ send_learn_msg()

static int send_learn_msg ( site_def const *  site,
pax_msg p 
)
static

◆ send_need_boot()

static void send_need_boot ( )
static

◆ send_propose_msg()

static int send_propose_msg ( pax_msg p)
static

◆ send_read()

static void send_read ( synode_no  find)
static

◆ send_tiny_learn_msg()

static int send_tiny_learn_msg ( site_def const *  site,
pax_msg p 
)
static

◆ send_value()

static void send_value ( site_def const *  site,
node_no  to,
synode_no  synode 
)
static

◆ send_x_fsm_complete()

void send_x_fsm_complete ( )

◆ server_handle_need_snapshot()

static void server_handle_need_snapshot ( server srv,
site_def const *  s,
node_no  node 
)
static

◆ server_push_log()

static void server_push_log ( server srv,
synode_no  push,
node_no  node 
)
static

◆ server_send_snapshot()

static void server_send_snapshot ( server srv,
site_def const *  s,
gcs_snapshot *  gcs_snap,
node_no  node 
)
static

◆ set_app_snap_getter()

void set_app_snap_getter ( app_snap_getter  x)

◆ set_app_snap_handler()

void set_app_snap_handler ( app_snap_handler  x)

◆ set_current_message()

synode_no set_current_message ( synode_no  msgno)

◆ set_executed_msg()

synode_no set_executed_msg ( synode_no  msgno)

◆ set_group()

void set_group ( uint32_t  id)

Set node group.

◆ set_last_received_config()

void set_last_received_config ( synode_no  received_config_change)

◆ set_learn_type()

static void set_learn_type ( pax_msg p)
static

◆ set_log_end()

void set_log_end ( gcs_snapshot *  gcs)

◆ set_max_synode()

void set_max_synode ( synode_no  synode)

◆ set_proposer_startpoint()

static void set_proposer_startpoint ( )
static

◆ set_start_and_boot()

static void set_start_and_boot ( site_def new_config,
app_data_ptr  a 
)
static

◆ set_unique_id()

static void set_unique_id ( pax_msg msg,
synode_no  synode 
)
static

◆ set_xcom_comms_cb()

void set_xcom_comms_cb ( xcom_state_change_cb  x)

◆ set_xcom_exit_cb()

void set_xcom_exit_cb ( xcom_state_change_cb  x)

◆ set_xcom_expel_cb()

void set_xcom_expel_cb ( xcom_state_change_cb  x)

◆ set_xcom_input_try_pop_cb()

void set_xcom_input_try_pop_cb ( xcom_input_try_pop_cb  pop)

◆ set_xcom_run_cb()

void set_xcom_run_cb ( xcom_state_change_cb  x)

◆ setup_exit_handling()

static void setup_exit_handling ( execute_context xc,
site_def site 
)
static

◆ should_handle_need_boot()

bool_t should_handle_need_boot ( site_def const *  site,
pax_msg p 
)
Returns
true if we should process the incoming need_boot_op message passed in parameter p.

◆ should_ignore_forced_config_or_view()

static constexpr bool should_ignore_forced_config_or_view ( xcom_proto  protocol_version)
staticconstexpr

◆ should_poll_cache()

static bool_t should_poll_cache ( pax_op  op)
static

◆ site_install_action()

void site_install_action ( site_def site,
cargo_type  operation 
)

◆ skip_msg()

static int skip_msg ( pax_msg p)
static

◆ skip_value()

static void skip_value ( pax_msg p)
static

◆ socket_read()

static result socket_read ( connection_descriptor rfd,
void *  buf,
int  n 
)
static

◆ socket_read_bytes()

static int64_t socket_read_bytes ( connection_descriptor rfd,
char *  p,
uint32_t  n 
)
static

◆ socket_read_msg()

static pax_msg * socket_read_msg ( connection_descriptor rfd,
pax_msg p 
)
static

◆ socket_write()

static int64_t socket_write ( connection_descriptor wfd,
void *  _buf,
uint32_t  n,
connnection_write_method  write_function = con_write 
)
static

◆ start_force_config()

static void start_force_config ( site_def s,
int  enforcer 
)
static

◆ start_x_timer()

static void start_x_timer ( double  t)
static

◆ started()

static int started ( pax_machine p)
inlinestatic

◆ stop_x_timer()

static void stop_x_timer ( )
static

◆ sweeper_task()

static int sweeper_task ( task_arg  arg)
static

◆ synthesize_leaders()

void synthesize_leaders ( leader_array *  leaders)

◆ teach_ignorant_node()

static void teach_ignorant_node ( site_def const *  site,
pax_machine p,
pax_msg pm,
synode_no  synode,
linkage reply_queue 
)
static

◆ terminate_and_exit()

void terminate_and_exit ( )

◆ terminate_proposers()

static void terminate_proposers ( )
static

◆ too_far()

static int too_far ( synode_no  s)
inlinestatic

◆ too_far_threshold()

static uint64_t too_far_threshold ( xcom_event_horizon  active_event_horizon)
static

Checks if the given synod s is outside the event horizon.

Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case threshold is:

last_executed_synod + event_horizon(active_config)

If an event horizon reconfiguration R is pending, it is possible that it reduces the event horizon. In that case, it is possible that the threshold above falls outside the new event horizon.

For example, consider last_executed_synod = 42 and event_horizon(active_config) = 10. At this point this member participates in synods up to 52. Now consider an event horizon reconfiguration that takes effect at synod 45, which modifies the event horizon to 2. This means that when last_executed_synod = 45, event_horizon(active_config) = 2. At this point this member should only be able to participate in synods up to 47. The member may have previously started processing messages directed to synods between 47 and 52, but will now ignore messages directed to those same synods.

We do not want to start processing messages that will eventually fall out of the event horizon. More importantly, the threshold above may not be safe due to the exit logic of executor_task.

When a node removes itself from the group on configuration C starting at synod start(C), the exit logic relies on knowing when a majority has executed synod start(C) - 1, i.e. the last message of the last configuration to contain the leaving node.

With a constant event horizon, we know that when synod start(C) + event_horizon is learnt, it is because a majority already executed or is ready to execute (and thus learned) synod start(C). This implies that a majority already executed start(C) - 1.

With a dynamic event horizon, we cannot be sure that when synod start(C) + event_horizon(C) is learnt, a majority already executed or is ready to execute synod start(C). This is because it is possible for a new, smaller, event horizon to take effect between start(C) and start(C) + event_horizon(C). If that happens, the threshold above allows nodes to participate in synods which are possibly beyond start(C) + event_horizon(C), which can lead to the value of synod start(C) + event_horizon(C) being learnt without a majority already having executed or being ready to execute synod start(C).

In order to maintain the assumption made by the executor_task's exit logic, when an event horizon reconfiguration R is pending we set the threshold to the minimum between:

last_executed_synod + event_horizon(active_config)

and:

start(R) - 1 + event_horizon(R)

◆ too_far_threshold_new_event_horizon_pending()

static uint64_t too_far_threshold_new_event_horizon_pending ( site_def const *  new_config)
static

◆ unsafe_against_event_horizon()

static bool unsafe_against_event_horizon ( node_address const *  node)
static

Check if a node is compatible with the group's event horizon.

A node is compatible with the group's configuration if:

a) The node supports event horizon reconfigurations, or b) The group's event horizon is, or is scheduled to be, the default event horizon.

◆ unsafe_leaders()

bool unsafe_leaders ( app_data *  a)

◆ update_best_snapshot()

static void update_best_snapshot ( gcs_snapshot *  gcs)
static

◆ update_max_synode()

static void update_max_synode ( pax_msg p)
static

◆ update_site()

static site_def * update_site ( site_def site,
node_set const *  ns,
synode_no  boot_key,
synode_no  start 
)
static

◆ update_srv()

static void update_srv ( server **  target,
server srv 
)
inlinestatic

◆ wait_for_cache()

static int wait_for_cache ( pax_machine **  pm,
synode_no  synode,
double  timeout 
)
static

◆ wakeup_delay()

static double wakeup_delay ( double  old)
static

◆ warn_protoversion_mismatch()

void warn_protoversion_mismatch ( connection_descriptor rfd)

◆ x_check_execute_inform()

static int x_check_execute_inform ( execute_context xc)
static

◆ x_check_exit()

static int x_check_exit ( execute_context xc)
static

◆ x_check_increment_execute()

static void x_check_increment_execute ( execute_context xc)
static

◆ x_check_increment_fetch()

static void x_check_increment_fetch ( execute_context xc)
static

◆ x_execute()

static void x_execute ( execute_context xc)
static

◆ x_fetch()

static void x_fetch ( execute_context xc)
static

◆ x_fsm_completion_task()

static int x_fsm_completion_task ( task_arg  arg)
static

◆ x_terminate()

static void x_terminate ( execute_context xc)
static

◆ xcom_check_reply()

static int xcom_check_reply ( int const  res)
static

◆ xcom_client_add_node()

int xcom_client_add_node ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_convert_into_local_server()

int xcom_client_convert_into_local_server ( connection_descriptor *const  fd)

◆ xcom_client_disable_arbitrator()

int xcom_client_disable_arbitrator ( connection_descriptor fd)

◆ xcom_client_enable_arbitrator()

int xcom_client_enable_arbitrator ( connection_descriptor fd)

◆ xcom_client_get_leaders()

int xcom_client_get_leaders ( connection_descriptor fd,
uint32_t  group_id,
leader_info_data *  leaders 
)

◆ xcom_client_get_synode_app_data()

int xcom_client_get_synode_app_data ( connection_descriptor *const  fd,
uint32_t  group_id,
synode_no_array *const  synodes,
synode_app_data_array *const  reply 
)

◆ xcom_client_remove_node()

int xcom_client_remove_node ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id 
)

◆ xcom_client_send_die()

int64_t xcom_client_send_die ( connection_descriptor fd)

◆ xcom_client_set_cache_limit()

int xcom_client_set_cache_limit ( connection_descriptor fd,
uint64_t  cache_limit 
)

◆ xcom_client_set_leaders() [1/2]

int xcom_client_set_leaders ( connection_descriptor fd,
u_int  n,
char const *  names[],
node_no  max_leaders,
uint32_t  group_id 
)

◆ xcom_client_set_leaders() [2/2]

int xcom_client_set_leaders ( connection_descriptor fd,
u_int  n,
char const *  names[],
uint32_t  group_id 
)

◆ xcom_client_set_max_leaders()

int xcom_client_set_max_leaders ( connection_descriptor fd,
node_no  max_leaders,
uint32_t  group_id 
)

◆ xcom_fsm()

char const * xcom_fsm ( xcom_actions  action,
task_arg  fsmargs 
)

◆ xcom_fsm_impl()

xcom_fsm_state * xcom_fsm_impl ( xcom_actions  action,
task_arg  fsmargs 
)

◆ xcom_fsm_init()

static int xcom_fsm_init ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_recover_wait()

static int xcom_fsm_recover_wait ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_recover_wait_enter()

static int xcom_fsm_recover_wait_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_run()

static int xcom_fsm_run ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_run_enter()

static int xcom_fsm_run_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_snapshot_wait()

static int xcom_fsm_snapshot_wait ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_snapshot_wait_enter()

static int xcom_fsm_snapshot_wait_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_start()

static int xcom_fsm_start ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_fsm_start_enter()

static int xcom_fsm_start_enter ( xcom_actions  action,
task_arg  fsmargs,
xcom_fsm_state ctxt 
)
static

◆ xcom_get_event_horizon()

static client_reply_code xcom_get_event_horizon ( xcom_event_horizon *  event_horizon)
static

Retrieves the latest event horizon.

There is no specific reason for this method to return the latest event horizon instead of the current one. Both would be acceptable results of this function, but we had to make a decision of one over the other.

Parameters
[out]event_horizonthe latest event horizon
Return values
REQUEST_FAILXCom is not initialized yet
REQUEST_OKfunction was successful and event_horizon contains the latest event horizon

◆ xcom_get_maximum_event_horizon()

xcom_event_horizon xcom_get_maximum_event_horizon ( )

◆ xcom_get_minimum_event_horizon()

xcom_event_horizon xcom_get_minimum_event_horizon ( )

◆ xcom_input_free_signal_connection()

void xcom_input_free_signal_connection ( void  )

◆ xcom_input_new_signal_connection()

bool_t xcom_input_new_signal_connection ( char const *  address,
xcom_port  port 
)

◆ xcom_input_signal()

bool_t xcom_input_signal ( void  )

◆ xcom_input_signal_connection_shutdown_ssl()

static bool_t xcom_input_signal_connection_shutdown_ssl ( )
static

◆ xcom_input_signal_connection_shutdown_ssl_wait_for_peer()

static bool_t xcom_input_signal_connection_shutdown_ssl_wait_for_peer ( )
static

◆ xcom_open_client_connection()

connection_descriptor * xcom_open_client_connection ( char const *  server,
xcom_port  port 
)

◆ xcom_recv_proto()

static int xcom_recv_proto ( connection_descriptor rfd,
xcom_proto *  x_proto,
x_msg_type x_type,
unsigned int *  tag 
)
static

◆ xcom_send()

void xcom_send ( app_data_ptr  a,
pax_msg msg 
)

◆ xcom_send_app_wait()

static int xcom_send_app_wait ( connection_descriptor fd,
app_data *  a,
int  force,
leader_info_data *  leaders 
)
static

◆ xcom_send_app_wait_and_get()

static xcom_send_app_wait_result xcom_send_app_wait_and_get ( connection_descriptor fd,
app_data *  a,
int  force,
pax_msg p,
leader_info_data *  leaders 
)
static

Send a message and wait for response.

The caller is responsible for freeing p after calling this function, i.e. xdr_free((xdrproc_t)xdr_pax_msg, (char *)p)

◆ xcom_send_cfg_wait()

int xcom_send_cfg_wait ( connection_descriptor fd,
node_list *  nl,
uint32_t  group_id,
cargo_type  ct,
int  force 
)

◆ xcom_send_client_app_data()

int64_t xcom_send_client_app_data ( connection_descriptor fd,
app_data_ptr  a,
int  force 
)

◆ xcom_send_proto()

static int xcom_send_proto ( connection_descriptor con,
xcom_proto  x_proto,
x_msg_type  x_type,
unsigned int  tag 
)
static

◆ xcom_sleep()

void xcom_sleep ( unsigned int  seconds)

◆ xcom_taskmain2()

int xcom_taskmain2 ( xcom_port  listen_port)

◆ xcom_thread_deinit()

void xcom_thread_deinit ( )

◆ xcom_thread_init()

void xcom_thread_init ( )

◆ xcom_timer()

static int xcom_timer ( task_arg  arg)
static

◆ xcom_unique_long()

long xcom_unique_long ( void  )

◆ zero_leader_array()

static void zero_leader_array ( leader_array *  l)
static

Variable Documentation

◆ alive_t

task_env* alive_t = nullptr
static

◆ ARBITRATOR_HACK

int ARBITRATOR_HACK = 0

◆ boot

task_env* boot = nullptr

◆ cache_task

task_env* cache_task = nullptr
static

◆ clicnt

int clicnt = 0
static

◆ client_boot_done

int client_boot_done
extern

◆ current_message

synode_no current_message
static

◆ current_tick

unsigned int current_tick = 0
static

◆ 

struct { ... } dead_sites

◆ debug_xc

execute_context* debug_xc
static

◆ 

struct { ... } delay_fifo

◆ delivered_msg

synode_no delivered_msg = NULL_SYNODE
static

◆ detector

task_env* detector = nullptr

◆ detector_wait

linkage detector_wait
Initial value:
linkage detector_wait
Definition: xcom_base.cc:505

◆ dispatch_table

msg_handler dispatch_table[LAST_OP]
static
Initial value:
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
static void process_client_msg(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:5914
static void process_learn_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6172
static void process_ack_prepare_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6110
static void process_skip_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6199
static void process_are_you_alive_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6221
static void process_die_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6240
static void process_i_am_alive_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6209
static void process_prepare_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6075
static void process_recover_learn_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6183
static void process_read_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6283
static void process_accept_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6125
static void process_tiny_learn_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6307
static void process_synode_request(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6321
static void process_gcs_snapshot_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6291
static void process_synode_allocated(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6365
static void process_ack_accept_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6157
static void process_need_boot_op(site_def const *site, pax_msg *p, linkage *reply_queue)
Definition: xcom_base.cc:6226

◆ exec_wait

linkage exec_wait
static
Initial value:
= {
static linkage exec_wait
Definition: xcom_base.cc:502

◆ executed_msg

synode_no executed_msg

◆ executor

task_env* executor = nullptr
static

◆ executor_site

site_def* executor_site = nullptr
static

◆ first_event_horizon_aware_protocol

xcom_proto const first_event_horizon_aware_protocol = x_1_4
static

◆ first_protocol_that_ignores_intermediate_forced_configs_or_views

constexpr xcom_proto first_protocol_that_ignores_intermediate_forced_configs_or_views
staticconstexpr
Initial value:
=
x_1_8

◆ forced_config

site_def* forced_config = nullptr
static

◆ front

int front

◆ get_app_snap_cb

app_snap_getter get_app_snap_cb
static

◆ handle_app_snap_cb

app_snap_handler handle_app_snap_cb
static

◆ id

unsigned long id[MAX_DEAD]

◆ input_signal_connection

connection_descriptor* input_signal_connection {nullptr}
static

These fields are used to signal XCom's request queue.

After a request is added, one will write 1 byte to warn local_server_task that it has work to do.

We use two types of signalling connection:

  • An anonymous pipe, when possible, in POSIX compatible systems
  • A regular socket connection, in Windows

input_signal_connection is the connection_descriptor returned when one opens a local signalling connection. It will contain either:

  • The write side of a connection, in case of using a pipe OR;
  • A bidirectional connection, when using a regular socket connection;

input_signal_connection_pipe is the connection_descriptor that holds the read side of a pipe connection. It is only allocated when we are able to have a pipe connection.

◆ input_signal_connection_pipe

connection_descriptor* input_signal_connection_pipe {nullptr}

◆ killer

task_env* killer = nullptr

◆ last_config_modification_id

synode_no last_config_modification_id
static

◆ last_delivered_msg

synode_no last_delivered_msg = NULL_SYNODE
static

◆ log_end_max

synode_no log_end_max
static

◆ log_start_max

synode_no log_start_max
static

◆ lsn

uint64_t lsn = 0
static

◆ max_synode

synode_no max_synode

◆ my_id

uint32_t my_id = 0
static

◆ n

int n

◆ net_boot

task_env* net_boot = nullptr

◆ net_recover

task_env* net_recover = nullptr

◆ netboot_ok

int netboot_ok
extern

◆ no_duplicate_payload

int const no_duplicate_payload = 1
static

◆ oblist

struct fp_name oblist[]
static
Initial value:
= {
NAME(x_fetch), NAME(x_execute), NAME(x_terminate), {nullptr, nullptr}}
static void x_terminate(execute_context *xc)
Definition: xcom_base.cc:4396
static void x_execute(execute_context *xc)
Definition: xcom_base.cc:4341
#define NAME(f)
Definition: xcom_base.cc:4159
static void x_fetch(execute_context *xc)
Definition: xcom_base.cc:4240

◆ oom_abort

int oom_abort = 0

◆ p1_idle_vtbl

paxos_state_action p1_idle_vtbl[last_p_event]
Initial value:
nullptr,
nullptr,
nullptr}
static void action_paxos_accept(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8902
static void action_paxos_learn(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8914
static void action_paxos_prepare(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8890
static void action_paxos_start(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8923

◆ p1_master_enter_vtbl

paxos_state_action p1_master_enter_vtbl[last_p_event]
Initial value:
nullptr,
nullptr,
nullptr}
static void action_new_accept(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8957
static void action_new_prepare(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8932
static void action_ack_prepare(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8948

◆ p1_master_wait_vtbl

paxos_state_action p1_master_wait_vtbl[last_p_event]
Initial value:

◆ p2_master_enter_vtbl

paxos_state_action p2_master_enter_vtbl[last_p_event]
Initial value:
nullptr,
nullptr,
nullptr}
static void action_ack_accept(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8972

◆ p2_master_wait_vtbl

paxos_state_action p2_master_wait_vtbl[last_p_event]
Initial value:

◆ p2_slave_wait_vtbl

paxos_state_action p2_slave_wait_vtbl[last_p_event]
Initial value:
nullptr,
nullptr,
nullptr,
nullptr}

◆ p3_master_wait_vtbl

paxos_state_action p3_master_wait_vtbl[last_p_event]
Initial value:
nullptr,
nullptr,
nullptr,
nullptr}

◆ p3_slave_wait_vtbl

paxos_state_action p3_slave_wait_vtbl[last_p_event]
Initial value:
nullptr,
nullptr,
nullptr,
nullptr}

◆ p_finished_vtbl

paxos_state_action p_finished_vtbl[last_p_event]
Initial value:
= {
action_ignorant, nullptr, action_ignorant, nullptr,
nullptr, nullptr, nullptr}
static void action_ignorant(pax_machine *paxos, site_def const *site, pax_msg *mess)
Definition: xcom_base.cc:8981

◆ paxos_default_timeout

unsigned constexpr int const paxos_default_timeout = 100
constexpr

◆ paxos_event_name

const char* paxos_event_name[] = {p_events}

◆ pipe_signal_connections

int pipe_signal_connections[2] = {-1, -1}

◆ prop_finished

int prop_finished = 0
static

◆ prop_input_queue

channel prop_input_queue
static

◆ prop_started

int prop_started = 0
static

◆ proposer

task_env* proposer[PROPOSERS]
static

◆ proposer_site

site_def* proposer_site = nullptr
static

◆ proposer_synodes

synode_no* proposer_synodes[PROPOSERS]
static

◆ protoversion_warning_time

double protoversion_warning_time
static
Initial value:
=
0.0

◆ q

synode_no q[FIFO_SIZE]

◆ rear

int rear

◆ recovery_begin_cb

xcom_recovery_cb recovery_begin_cb = nullptr
static

◆ recovery_end_cb

xcom_recovery_cb recovery_end_cb = nullptr
static

◆ recovery_init_cb

xcom_recovery_cb recovery_init_cb = nullptr
static

◆ recovery_restart_cb

xcom_recovery_cb recovery_restart_cb = nullptr
static

◆ retry

task_env* retry = nullptr
static

◆ sent_alive

double sent_alive = 0.0
static

◆ single_writer_support

constexpr xcom_proto single_writer_support = x_1_9
staticconstexpr

◆ snapshots

int snapshots[NSERVERS]
static

◆ start_config

synode_no start_config = NULL_SYNODE
static

◆ sweeper

task_env* sweeper = nullptr
static

◆ synode_number_pool

synode_pool synode_number_pool

◆ threephase

int const threephase = 0

Timestamp of previous protoversion warning.

◆ time_queue

linkage time_queue[paxos_timer_range]
static

◆ use_buffered_read

int use_buffered_read = 1
static

◆ wait_forced_config

int wait_forced_config = 0
static

◆ x_timer

task_env* x_timer = nullptr
static

◆ xcom_actions_name

const char* xcom_actions_name[] = {x_actions}

◆ xcom_comms_cb

xcom_state_change_cb xcom_comms_cb = nullptr
static

◆ xcom_dbg_stack

long xcom_dbg_stack[DBG_STACK_SIZE]

◆ xcom_dbg_stack_top

int xcom_dbg_stack_top = 0

◆ xcom_debug_mask

long xcom_debug_mask
Initial value:
=
@ D_FSM
Definition: gcs_debug.h:177

◆ xcom_exit_cb

xcom_state_change_cb xcom_exit_cb = nullptr
static

◆ xcom_expel_cb

xcom_state_change_cb xcom_expel_cb = nullptr
static

◆ xcom_run_cb

xcom_state_change_cb xcom_run_cb = nullptr
static

◆ xcom_shutdown

int xcom_shutdown = 0

◆ xcom_terminate_cb

xcom_state_change_cb xcom_terminate_cb = nullptr
static

◆ xcom_thread_input

void* xcom_thread_input = nullptr

◆ xcom_try_pop_from_input_cb

xcom_input_try_pop_cb xcom_try_pop_from_input_cb = nullptr
static