xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision.
More...
|
long | xcom_unique_long (void) |
|
static int64_t | socket_write (connection_descriptor *wfd, void *_buf, uint32_t n, connnection_write_method write_function=con_write) |
|
static double | wakeup_delay (double old) |
|
static void | note_snapshot (node_no node) |
|
static int | proposer_task (task_arg arg) |
|
static int | executor_task (task_arg arg) |
|
static int | sweeper_task (task_arg arg) |
|
int | alive_task (task_arg arg) |
|
int | cache_manager_task (task_arg arg) |
|
int | detector_task (task_arg arg) |
|
static int | finished (pax_machine *p) |
|
static int | accepted (pax_machine *p) |
|
static int | started (pax_machine *p) |
|
static synode_no | first_free_synode_local (synode_no msgno) |
|
static void | free_forced_config_site_def () |
|
static void | activate_sweeper () |
|
static void | force_pax_machine (pax_machine *p, int enforcer) |
|
static void | propose_noop_2p (synode_no find, pax_machine *p) |
|
static void | handle_need_snapshot (linkage *reply_queue, pax_msg *pm) |
|
static void | handle_skip (site_def const *site, pax_machine *p, pax_msg *m) |
|
static void | paxos_fsm (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static bool | is_leader (site_def *site) |
|
int | is_active_leader (node_no x, site_def *site) |
|
static msg_handler * | primary_dispatch_table () |
|
static msg_handler * | secondary_dispatch_table () |
|
static void | recompute_node_sets (site_def const *old_site, site_def *new_site) |
|
void | recompute_timestamps (detector_state const old_timestamp, node_list const *old_nodes, detector_state new_timestamp, node_list const *new_nodes) |
|
void | analyze_leaders (site_def *site) |
|
static int | ignore_message (synode_no x, site_def *site, char const *dbg) |
|
static void | init_proposers () |
|
void | initialize_lsn (uint64_t n) |
|
void | init_base_vars () |
|
uint32_t | get_my_xcom_id () |
|
synode_no | get_current_message () |
|
synode_no | get_max_synode () |
|
static bool_t | is_latest_config (site_def const *const config) |
|
static site_def const * | first_event_horizon_reconfig () |
| Get the first pending configuration that reconfigures the event horizon. More...
|
|
static site_def const * | latest_event_horizon_reconfig () |
| Get the latest pending configuration that reconfigures the event horizon. More...
|
|
static synode_no | add_event_horizon (synode_no s) |
| Add the event horizon to the given base synod s. More...
|
|
void | set_group (uint32_t id) |
| Set node group. More...
|
|
static void | bury_site (uint32_t id) |
|
static bool_t | is_dead_site (uint32_t id) |
|
node_set * | init_node_set (node_set *set, u_int n) |
|
node_set * | alloc_node_set (node_set *set, u_int n) |
|
static synode_no | incr_msgno (synode_no msgno) |
|
synode_no | incr_synode (synode_no synode) |
|
static void | skip_value (pax_msg *p) |
|
static int | ignoresig (int signum) |
|
static int | recently_active (pax_machine *p) |
|
int | pm_finished (pax_machine *p) |
|
static int | accepted_noop (pax_machine *p) |
|
static int | noop_match (pax_machine *p, pax_msg *pm) |
|
void | set_last_received_config (synode_no received_config_change) |
|
static node_no | max_check (site_def const *site) |
|
static int | is_forcing_node (pax_machine const *p) |
|
static int | majority (bit_set const *nodeset, site_def const *s, int all, int delay, int force) |
|
static int | prep_majority (site_def const *site, pax_machine const *p) |
|
static int | prop_majority (site_def const *site, pax_machine const *p) |
|
site_def const * | get_executor_site () |
|
site_def * | get_executor_site_rw () |
|
site_def const * | get_proposer_site () |
|
synode_no | get_delivered_msg () |
|
synode_no | get_last_delivered_msg () |
|
void | init_xcom_base () |
|
static void | init_tasks () |
|
void | xcom_thread_init () |
|
static void | empty_prop_input_queue () |
|
static void | empty_synode_number_pool () |
|
void | xcom_thread_deinit () |
|
static void | create_proposers () |
|
static void | add_proposer_synode (int i, synode_no *syn_ptr) |
|
static void | remove_proposer_synode (int i) |
|
static synode_no | get_proposer_synode (int i) |
|
static synode_no | min_proposer_synode () |
|
static void | terminate_proposers () |
|
static void | set_proposer_startpoint () |
|
void | set_xcom_run_cb (xcom_state_change_cb x) |
|
void | set_xcom_exit_cb (xcom_state_change_cb x) |
|
void | set_xcom_comms_cb (xcom_state_change_cb x) |
|
void | set_xcom_expel_cb (xcom_state_change_cb x) |
|
void | set_xcom_input_try_pop_cb (xcom_input_try_pop_cb pop) |
|
static bool_t | xcom_input_signal_connection_shutdown_ssl_wait_for_peer () |
|
static bool_t | xcom_input_signal_connection_shutdown_ssl () |
|
bool_t | xcom_input_new_signal_connection (char const *address, xcom_port port) |
|
bool_t | xcom_input_signal () |
|
void | xcom_input_free_signal_connection () |
|
static int | local_server_shutdown_ssl (connection_descriptor *con, void *buf, int n, int *ret) |
|
int | local_server (task_arg arg) |
|
static bool_t | local_server_is_setup () |
|
static void | init_time_queue () |
|
static int | paxos_timer_task (task_arg arg) |
|
int | xcom_taskmain2 (xcom_port listen_port) |
|
static void | prepare (pax_msg *p, pax_op op) |
|
void | init_prepare_msg (pax_msg *p) |
| Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol. More...
|
|
static int | prepare_msg (pax_msg *p) |
|
pax_msg * | create_noop (pax_msg *p) |
| Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol. More...
|
|
static pax_msg * | create_read (site_def const *site, pax_msg *p) |
|
static int | skip_msg (pax_msg *p) |
|
static void | brand_app_data (pax_msg *p) |
|
static synode_no | my_unique_id (synode_no synode) |
|
static void | set_unique_id (pax_msg *msg, synode_no synode) |
|
void | init_propose_msg (pax_msg *p) |
| Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol. More...
|
|
static int | send_propose_msg (pax_msg *p) |
|
static int | propose_msg (pax_msg *p) |
|
static void | set_learn_type (pax_msg *p) |
|
static void | init_learn_msg (pax_msg *p) |
|
static int | send_learn_msg (site_def const *site, pax_msg *p) |
|
static pax_msg * | create_tiny_learn_msg (pax_machine *pm, pax_msg *p) |
|
static int | send_tiny_learn_msg (site_def const *site, pax_msg *p) |
|
void | prepare_push_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type) |
| Initializes the message msg to go through a 3-phase, regular Paxos. More...
|
|
void | prepare_push_2p (site_def const *site, pax_machine *p) |
| Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_). More...
|
|
static void | push_msg_2p (site_def const *site, pax_machine *p) |
|
static void | push_msg_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type) |
|
static void | brand_client_msg (pax_msg *msg, synode_no msgno) |
|
void | xcom_send (app_data_ptr a, pax_msg *msg) |
|
static uint32_t | fnv_hash (unsigned char *buf, size_t length, uint32_t sum) |
|
uint32_t | new_id () |
| Create a new (hopefully unique) ID. More...
|
|
static synode_no | getstart (app_data_ptr a) |
|
void | site_install_action (site_def *site, cargo_type operation) |
|
static void | active_leaders (site_def *site, leader_array *leaders) |
|
void | synthesize_leaders (leader_array *leaders) |
|
static bool | leaders_set_by_client (site_def const *site) |
|
static site_def * | create_site_def_with_start (app_data_ptr a, synode_no start) |
|
static site_def * | install_ng_with_start (app_data_ptr a, synode_no start) |
|
site_def * | install_node_group (app_data_ptr a) |
|
void | set_max_synode (synode_no synode) |
|
static int | is_busy (synode_no s) |
|
bool_t | match_my_msg (pax_msg *learned, pax_msg *mine) |
|
static uint64_t | assign_lsn () |
| Assign the next log sequence number (lsn) for a message. More...
|
|
static void | propose_noop (synode_no find, pax_machine *p) |
|
static uint64_t | too_far_threshold (xcom_event_horizon active_event_horizon) |
| Checks if the given synod s is outside the event horizon. More...
|
|
static uint64_t | too_far_threshold_new_event_horizon_pending (site_def const *new_config) |
|
static int | too_far (synode_no s) |
|
static int | is_view (cargo_type x) |
|
static int | is_config (cargo_type x) |
|
static int | wait_for_cache (pax_machine **pm, synode_no synode, double timeout) |
|
static synode_no | local_synode_allocator (synode_no synode) |
|
static synode_no | global_synode_allocator (site_def *site, synode_no synode) |
|
static node_no | remote_synode_allocator (site_def *site, app_data const &a) |
|
static int | reserve_synode_number (synode_allocation_type *synode_allocation, site_def **site, synode_no *msgno, int *remote_retry, app_data *a, synode_reservation_status *ret) |
|
static constexpr bool | should_ignore_forced_config_or_view (xcom_proto protocol_version) |
|
static node_no | get_leader (site_def const *s) |
|
int | iamthegreatest (site_def const *s) |
|
static site_def * | update_site (site_def *site, node_set const *ns, synode_no boot_key, synode_no start) |
|
void | execute_msg (site_def *site, pax_machine *pma, pax_msg *p) |
|
static void | read_missing_values (int n) |
|
static void | propose_missing_values (int n) |
|
static void | find_value (site_def const *site, unsigned int *wait, int n) |
|
static void | dump_debug_exec_state () |
|
int | get_xcom_message (pax_machine **p, synode_no msgno, int n) |
|
synode_no | set_executed_msg (synode_no msgno) |
|
synode_no | set_current_message (synode_no msgno) |
|
static void | update_max_synode (pax_msg *p) |
|
static int | match_leader (char const *addr, leader_array const leaders) |
|
static bool | alive_node (site_def const *site, u_int i) |
|
node_no | found_active_leaders (site_def *site) |
|
static void | debug_loser (synode_no x) |
|
static void | send_value (site_def const *site, node_no to, synode_no synode) |
|
static synode_no | compute_delay (synode_no start, xcom_event_horizon event_horizon) |
| Returns the message number at which it is safe for nodes in the previous configuration to exit. More...
|
|
static void | inform_removed (int index, int all) |
|
static bool_t | backwards_compatible (xcom_event_horizon event_horizon) |
|
static bool_t | reconfigurable_event_horizon (xcom_proto protocol_version) |
|
static bool_t | add_node_unsafe_against_ipv4_old_nodes (app_data_ptr a) |
|
static bool_t | add_node_adding_own_address (app_data_ptr a) |
| This will test if we are receiving a boot request that contains ourselves. More...
|
|
static bool | unsafe_against_event_horizon (node_address const *node) |
| Check if a node is compatible with the group's event horizon. More...
|
|
static bool | check_if_add_node_is_unsafe (app_data_ptr a, unsafe_node_check unsafe) |
|
static bool | check_if_add_node_is_unsafe_against_event_horizon (app_data_ptr a) |
|
void | recompute_node_set (node_set const *old_set, node_list const *old_nodes, node_set *new_set, node_list const *new_nodes) |
|
static bool | incompatible_proto_and_max_leaders (xcom_proto x_proto, node_no max_leaders) |
|
static bool | incompatible_proto_and_leaders (xcom_proto x_proto) |
|
static bool | incompatible_proto_and_max_leaders (node_address const *node) |
|
static bool | incompatible_proto_and_leaders (node_address const *node) |
|
bool | unsafe_leaders (app_data *a) |
|
static void | set_start_and_boot (site_def *new_config, app_data_ptr a) |
|
site_def * | handle_add_node (app_data_ptr a) |
| Reconfigure the group membership: add new member(s). More...
|
|
static void | log_event_horizon_reconfiguration_failure (allow_event_horizon_result error_code, xcom_event_horizon attempted_event_horizon) |
|
static allow_event_horizon_result | allow_event_horizon (xcom_event_horizon event_horizon) |
|
static bool_t | is_unsafe_event_horizon_reconfiguration (app_data_ptr a) |
|
static bool_t | is_unsafe_max_leaders_reconfiguration (app_data_ptr a) |
|
static bool_t | is_unsafe_set_leaders_reconfiguration (app_data_ptr a) |
|
static bool_t | is_unsafe_leaders_reconfiguration (app_data_ptr a) |
|
static bool_t | are_there_dead_nodes_in_new_config (app_data_ptr a) |
|
bool_t | handle_event_horizon (app_data_ptr a) |
| Reconfigure the event horizon. More...
|
|
static bool_t | handle_max_leaders (site_def *new_config, app_data_ptr a) |
|
bool_t | handle_max_leaders (app_data_ptr a) |
|
static void | zero_leader_array (leader_array *l) |
|
static void | move_leader_array (leader_array *target, leader_array *source) |
|
static bool_t | handle_set_leaders (site_def *new_config, app_data_ptr a) |
|
bool_t | handle_set_leaders (app_data_ptr a) |
|
bool_t | handle_leaders (app_data_ptr a) |
|
void | terminate_and_exit () |
|
static int | is_empty_site (site_def const *s) |
|
site_def * | handle_remove_node (app_data_ptr a) |
|
static void | log_ignored_forced_config (app_data_ptr a, char const *const caller_name) |
|
bool_t | handle_config (app_data_ptr a, bool const forced) |
|
static int | is_member (site_def const *site) |
|
static int | addone (int i) |
|
static int | fifo_empty () |
|
static int | fifo_full () |
|
static void | fifo_insert (synode_no s) |
|
static synode_no | fifo_extract () |
|
static synode_no | fifo_front () |
|
static void | dump_exec_state (execute_context *xc, long dbg) |
|
static int | x_check_exit (execute_context *xc) |
|
static int | x_check_execute_inform (execute_context *xc) |
|
static void | x_fetch (execute_context *xc) |
|
static void | x_execute (execute_context *xc) |
|
static void | x_check_increment_fetch (execute_context *xc) |
|
static void | x_check_increment_execute (execute_context *xc) |
|
static void | x_terminate (execute_context *xc) |
|
static void | setup_exit_handling (execute_context *xc, site_def *site) |
|
static synode_no | get_sweep_start () |
|
static bool | allow_channel_takeover (site_def const *site) |
|
static void | broadcast_noop (synode_no find, pax_machine *p) |
|
static site_def const * | init_noop (synode_no find, pax_machine *p) |
|
static void | send_read (synode_no find) |
|
static int | ok_to_propose (pax_machine *p) |
|
bool_t | safe_app_data_copy (pax_msg **target, app_data_ptr source) |
| Copies app data source into target and checks if the copy succeeded. More...
|
|
static pax_msg * | create_learn_msg_for_ignorant_node (pax_machine *p, pax_msg *pm, synode_no synode) |
|
static void | teach_ignorant_node (site_def const *site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue) |
|
static void | handle_read (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm) |
|
static pax_msg * | create_ack_prepare_msg (pax_machine *p, pax_msg *pm, synode_no synode) |
|
pax_msg * | handle_simple_prepare (pax_machine *p, pax_msg *pm, synode_no synode) |
| Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol. More...
|
|
static void | handle_prepare (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm) |
|
bool_t | check_propose (site_def const *site, pax_machine *p) |
|
static bool | learn_ok (site_def const *site, pax_machine const *p) |
|
static pax_msg * | check_learn (site_def const *site, pax_machine *p) |
|
static void | do_learn (site_def const *site, pax_machine *p, pax_msg *m) |
|
bool_t | handle_simple_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m) |
| Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol. More...
|
|
static void | handle_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m) |
|
static pax_msg * | create_ack_accept_msg (pax_msg *m, synode_no synode) |
|
pax_msg * | handle_simple_accept (pax_machine *p, pax_msg *m, synode_no synode) |
| Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol. More...
|
|
static void | handle_accept (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *m) |
|
pax_msg * | handle_simple_ack_accept (site_def const *site, pax_machine *p, pax_msg *m) |
| Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol. More...
|
|
static void | handle_ack_accept (site_def const *site, pax_machine *p, pax_msg *m) |
|
void | handle_tiny_learn (site_def const *site, pax_machine *pm, pax_msg *p) |
| Process the incoming tiny, i.e. data-less, learn message. More...
|
|
static void | force_interval (synode_no start, synode_no end, int enforcer) |
|
static void | start_force_config (site_def *s, int enforcer) |
|
void | handle_learn (site_def const *site, pax_machine *p, pax_msg *m) |
| Process the incoming Learn message. More...
|
|
static void | handle_client_msg (pax_msg *p) |
|
static void | handle_boot (site_def const *site, linkage *reply_queue, pax_msg *p) |
|
bool_t | should_handle_need_boot (site_def const *site, pax_msg *p) |
|
void | init_need_boot_op (pax_msg *p, node_address *identity) |
| Initializes the message p as a need_boot_op message. More...
|
|
int | pre_process_incoming_ping (site_def const *site, pax_msg const *pm, int has_client_already_booted, double current_time) |
| Process incoming are_you_alive (i.e. ping) messages. More...
|
|
static void | handle_alive (site_def const *site, linkage *reply_queue, pax_msg *pm) |
|
xcom_event_horizon | xcom_get_minimum_event_horizon () |
|
xcom_event_horizon | xcom_get_maximum_event_horizon () |
|
static client_reply_code | xcom_get_event_horizon (xcom_event_horizon *event_horizon) |
| Retrieves the latest event horizon. More...
|
|
static u_int | allow_add_node (app_data_ptr a) |
|
static u_int | allow_remove_node (app_data_ptr a) |
|
static void | log_cfgchange_wrong_group (app_data_ptr a, const char *const message_fmt) |
| Logs the fact that an add/remove node request is aimed at another group. More...
|
|
static client_reply_code | can_execute_cfgchange (pax_msg *p) |
| Validates if a configuration command can be executed. More...
|
|
void | dispatch_get_event_horizon (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static reply_data * | new_leader_info (site_def *site) |
|
void | dispatch_get_leaders (site_def *site, pax_msg *p, linkage *reply_queue) |
|
static void | log_get_synode_app_data_failure (xcom_get_synode_app_data_result error_code) |
|
void | dispatch_get_synode_app_data (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static int | can_send_snapshot () |
|
static void | process_client_msg (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_prepare_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static int | abort_processing (pax_msg *p) |
|
static void | process_ack_prepare_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_accept_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_ack_accept_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_recover_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_skip_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_i_am_alive_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_are_you_alive_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_need_boot_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_die_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_read_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_gcs_snapshot_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_tiny_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_synode_request (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | process_synode_allocated (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static msg_handler * | clone_dispatch_table (msg_handler const *proto) |
|
pax_msg * | dispatch_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
|
static void | update_srv (server **target, server *srv) |
|
static int | harmless (pax_msg const *p) |
|
static bool_t | should_poll_cache (pax_op op) |
|
int | acceptor_learner_task (task_arg arg) |
|
static void | server_handle_need_snapshot (server *srv, site_def const *s, node_no node) |
|
int | reply_handler_task (task_arg arg) |
|
void | xcom_sleep (unsigned int seconds) |
|
app_data_ptr | init_config_with_group (app_data *a, node_list *nl, cargo_type type, uint32_t group_id) |
|
app_data_ptr | init_set_event_horizon_msg (app_data *a, uint32_t group_id, xcom_event_horizon event_horizon) |
|
app_data_ptr | init_get_msg (app_data *a, uint32_t group_id, cargo_type const t) |
|
app_data_ptr | init_get_leaders_msg (app_data *a, uint32_t group_id) |
|
app_data_ptr | init_get_event_horizon_msg (app_data *a, uint32_t group_id) |
|
app_data_ptr | init_app_msg (app_data *a, char *payload, u_int payload_size) |
|
static app_data_ptr | init_get_synode_app_data_msg (app_data *a, uint32_t group_id, synode_no_array *const synodes) |
|
app_data_ptr | init_set_cache_size_msg (app_data *a, uint64_t cache_limit) |
|
app_data_ptr | init_convert_into_local_server_msg (app_data *a) |
|
static void | server_send_snapshot (server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node) |
|
static void | server_push_log (server *srv, synode_no push, node_no node) |
|
static void | reply_push_log (synode_no push, linkage *reply_queue) |
|
static gcs_snapshot * | create_snapshot () |
|
static int | xcom_timer (task_arg arg) |
|
static void | stop_x_timer () |
|
static void | start_x_timer (double t) |
|
static int | x_fsm_completion_task (task_arg arg) |
|
void | send_x_fsm_complete () |
|
static void | reset_snapshot_mask () |
|
static int | got_all_snapshots () |
|
static int | better_snapshot (gcs_snapshot *gcs) |
|
static void | handle_x_snapshot (gcs_snapshot *gcs) |
|
static void | update_best_snapshot (gcs_snapshot *gcs) |
|
static void | send_need_boot () |
|
void | set_log_end (gcs_snapshot *gcs) |
|
static int | xcom_fsm_init (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_start_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_start (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_snapshot_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_snapshot_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_recover_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_recover_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_run_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | xcom_fsm_run (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | handle_fsm_net_boot (task_arg fsmargs, xcom_fsm_state *ctxt, int cont) |
|
static int | handle_fsm_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | handle_fsm_snapshot_wait (xcom_fsm_state *ctxt) |
|
static void | handle_fsm_exit () |
|
static int | handle_local_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | handle_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static int | handle_fsm_terminate (task_arg fsmargs, xcom_fsm_state *ctxt) |
|
static void | handle_fsm_force_config (task_arg fsmargs) |
|
xcom_fsm_state * | xcom_fsm_impl (xcom_actions action, task_arg fsmargs) |
|
char const * | xcom_fsm (xcom_actions action, task_arg fsmargs) |
|
void | set_app_snap_handler (app_snap_handler x) |
|
void | set_app_snap_getter (app_snap_getter x) |
|
static result | socket_read (connection_descriptor *rfd, void *buf, int n) |
|
static int64_t | socket_read_bytes (connection_descriptor *rfd, char *p, uint32_t n) |
|
connection_descriptor * | xcom_open_client_connection (char const *server, xcom_port port) |
|
static int | xcom_send_proto (connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag) |
|
static int | xcom_recv_proto (connection_descriptor *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag) |
|
static int | is_cargo_type (app_data_ptr a, cargo_type t) |
| Checks if a given app_data is from a given cargo_type. More...
|
|
static char * | get_add_node_address (app_data_ptr a, unsigned int *member) |
| Retrieves the address that was used in the add_node request. More...
|
|
int | is_node_v4_reachable_with_info (struct addrinfo *retrieved_addr_info) |
|
int | is_node_v4_reachable (char *node_address) |
|
int | are_we_allowed_to_upgrade_to_v6 (app_data_ptr a) |
|
int64_t | xcom_send_client_app_data (connection_descriptor *fd, app_data_ptr a, int force) |
|
int64_t | xcom_client_send_die (connection_descriptor *fd) |
|
void | warn_protoversion_mismatch (connection_descriptor *rfd) |
|
static pax_msg * | socket_read_msg (connection_descriptor *rfd, pax_msg *p) |
|
static xcom_send_app_wait_result | xcom_send_app_wait_and_get (connection_descriptor *fd, app_data *a, int force, pax_msg *p, leader_info_data *leaders) |
| Send a message and wait for response. More...
|
|
static int | xcom_send_app_wait (connection_descriptor *fd, app_data *a, int force, leader_info_data *leaders) |
|
int | xcom_send_cfg_wait (connection_descriptor *fd, node_list *nl, uint32_t group_id, cargo_type ct, int force) |
|
int | xcom_client_add_node (connection_descriptor *fd, node_list *nl, uint32_t group_id) |
|
int | xcom_client_remove_node (connection_descriptor *fd, node_list *nl, uint32_t group_id) |
|
static int | xcom_check_reply (int const res) |
|
int | xcom_client_get_synode_app_data (connection_descriptor *const fd, uint32_t group_id, synode_no_array *const synodes, synode_app_data_array *const reply) |
|
int | xcom_client_enable_arbitrator (connection_descriptor *fd) |
|
int | xcom_client_disable_arbitrator (connection_descriptor *fd) |
|
int | xcom_client_set_cache_limit (connection_descriptor *fd, uint64_t cache_limit) |
|
int | xcom_client_convert_into_local_server (connection_descriptor *const fd) |
|
void | init_set_max_leaders (uint32_t group_id, app_data *a, node_no max_leaders) |
|
int | xcom_client_set_max_leaders (connection_descriptor *fd, node_no max_leaders, uint32_t group_id) |
|
leader_array | new_leader_array (u_int n, char const *names[]) |
|
void | init_set_leaders (uint32_t group_id, app_data *a, leader_array const leaders) |
|
void | init_set_leaders (uint32_t group_id, app_data *a, u_int n, char const *names[]) |
|
void | init_set_leaders (uint32_t group_id, app_data *leader_app, leader_array const leaders, app_data *max_app, node_no max_leaders) |
|
void | init_set_leaders (uint32_t group_id, app_data *leader_app, u_int n, char const *names[], app_data *max_app, node_no max_leaders) |
|
int | xcom_client_set_leaders (connection_descriptor *fd, u_int n, char const *names[], uint32_t group_id) |
|
std::unique_ptr< Network_provider_management_interface > | get_network_management_interface () |
|
std::unique_ptr< Network_provider_operations_interface > | get_network_operations_interface () |
|
int | xcom_client_set_leaders (connection_descriptor *fd, u_int n, char const *names[], node_no max_leaders, uint32_t group_id) |
|
int | xcom_client_get_leaders (connection_descriptor *fd, uint32_t group_id, leader_info_data *leaders) |
|
static void | paxos_twait (pax_machine *p, unsigned int t) |
|
static void | paxos_twait_cancel (pax_machine *p) |
|
static void | paxos_wakeup (unsigned int t) |
|
static void | paxos_timer_advance () |
|
static int | paxos_fsm_p1_master_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p1_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p2_master_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p2_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p2_slave_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p2_slave_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p3_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p3_slave_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_p3_slave_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | paxos_fsm_finished (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
static int | accept_new_prepare (pax_machine *paxos, pax_msg *mess) |
|
static int | accept_new_accept (pax_machine *paxos, pax_msg *mess) |
|
static int | own_message (pax_msg *mess, site_def const *site) |
|
static void | action_paxos_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_paxos_accept (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_paxos_learn (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_paxos_start (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_new_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_ack_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_new_accept (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_ack_accept (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | action_ignorant (pax_machine *paxos, site_def const *site, pax_msg *mess) |
|
static void | dispatch_p_event (paxos_state_action *vtbl, pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
int | paxos_fsm_idle (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
|
xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision.
The interface to xcom is largely intact, one notable change is that xcom will consider the message delivered as soon as it has got a majority. Consequently, the VP set will not necessarily show all nodes which will actually receive the message.
OHKFIX Add wait for complete last known node set to mimic the old semantics.
IMPORTANT: What xcom does and what it does not do:
xcom messages are received in the same order on all nodes.
xcom guarantees that if a message is delivered to one node, it will eventually be seen on all other nodes as well.
xcom messages are available to a crashed node when it comes up again if at least one node which knows the value of the message has not crashed. The size of the message cache is configurable.
OHKFIX Add logging to disk to make messages durable across system crash and to increase the number of messages which may be cached.
There is no guarantee whatsoever about the order of messages from different nodes, not even the order of multiple messages from the same node. It is up to the client to impose such an order by waiting on a message before it sends the next.
xcom can notify the client that a message has timed out, and in that case will try to cancel the message, but it cannot guarantee that a message which has timed out will not be delivered.
xcom attaches a node set to each message as it is delivered to the client. This node set reflects the current node set that xcom believes is active, it does not mean that the message has been delivered yet to all nodes in the set. Neither does it mean that the message has not been delivered to the nodes not in the set.
A cache of Paxos state machines is central to the new design. The purpose of the cache is both to store a window of messages, and to decouple the different parts of xcom, like message proposal, message delivery and execution, and recovery. The old cache was limited to caching messages, and a single state machine ran the combined VP and Paxos algorithm. This constrained xcom to deliver only a single message at a time.
Each instance of the Paxos state machine implements the basic Paxos protocol. Unlike the cache in the old system, it is not cleared when a site is deleted. This removes some problems related to message delivery during site deletion. The cache is a classic fixed size LRU with a hash index.
Some extensions to the basic Paxos algorithm have been implemented:
A node has ownership of all synodes with its own node number. Only a node with node number N can propose a value for synode {X N}, where X is the sequence number, and N is the node number. Other nodes can only propose the special value no_op for synode {X N}. The reason for this is to retain the leaderless Paxos algorithm, but to avoid collisions between nodes which are competing for the same synode number. With this scheme, each node has its own unique number series during normal operation. The scheme has the following implications:
- If a node N has not already proposed a value for the synode {X N}, it may at any time send a LEARN message to the other nodes with the reserved value no_op, without going through phase 1 and 2 of Paxos. This is because the other nodes are constrained to propose no_op for this synode, so the final outcome will always be no_op. To avoid unnecessary message transmission, a node will try to broadcast the no_op LEARN messages by piggybacking the information on the messages of the basic Paxos protocol.
- Other nodes which want to find the value of synode {X N} may do so by trying to get the value no_op accepted by following the basic Paxos algorithm. The result will be the actual value proposed by node N if it has done so, otherwise no_op. This will typically only be necessary when a node is down, and the other nodes need to find the values from the missing node in order to be able to continue execution.
Messages are delivered in order to the client, and the order is determined by the sequence number and the node number, with the sequence number as the most significant part.
The xcom network interface has been redesigned and is now implemented directly on top of TCP, and has so far been completely trouble free. We use poll() or select() to implement non-blocking send and receive, but libev could equally well have been used.
Multicast is implemented on top of unicast as before, but the implementation is prepared to use real multicast with relatively minor changes.
The roles of proposer, acceptor/learner, and executor are now directly mapped to unique task types which interact with the Paxos state machines, whereas the previous implementation folded all the roles into a single event driven state machine.
The following terminology will be used:
A node is an instance of the xcom thread. There is only one instance of the xcom thread in the agent. A client is the application which is using xcom to send messages. A thread is a real OS thread. A task is a logical process. It is implemented by coroutines and an explicit stack.
The implementation of tasks and non-blocking socket operations is isolated in task.h and task.c.
A node will open a tcp connection to each of the other nodes. This connection is used for all communication initiated by the node, and replies to messages will arrive on the connection on which they were sent.
static int tcp_server(task_arg);
The tcp_server listens on the xcom port and starts an acceptor_learner_task whenever a new connection is detected.
static int tcp_reaper_task(task_arg);
Closes tcp connections which have been unused for too long.
static int sender_task(task_arg);
The sender_task waits for tcp messages on its input queue and sends them on the tcp socket. If the socket is closed for any reason, the sender_task will reconnect the socket. There is one sender_task for each socket. The sender task exists mainly to simplify the logic in the other tasks, but it could have been replaced with a coroutine which handles the connection logic after having reserved the socket for its client task.
static int generator_task(task_arg);
The generator_task reads messages from the client queue and moves them into the input queue of the proposer_task.
OHKFIX Use a tcp socket instead of the client queue. We can then remove the generator_task and let the acceptor_learner_task do the dispatching.
static int proposer_task(task_arg);
Assign a message number to an outgoing message and try to get it accepted. There may be several proposer tasks on each node working in parallel. If there are multiple proposer tasks, xcom cannot guarantee that the messages will be sent in the same order as received from the client.
static int acceptor_learner_task(task_arg);
This is the server part of the xcom thread. There is one acceptor_learner_task for each node in the system. The acceptor_learner_task reads messages from the socket, finds the correct Paxos state machine, and dispatches to the correct message handler with the state machine and message as arguments.
static int reply_handler_task(task_arg);
The reply_handler_task does the same job as the acceptor_learner_task, but listens on the socket which the node uses to send messages, so it will handle only replies on that socket.
static int executor_task(task_arg);
The executor_task waits for a Paxos message to be accepted. When the message is accepted, it is delivered to the client, unless it is a no-op. In either case, the executor_task steps to the next message and repeats the wait. If it times out waiting for a message, it will try to get a no-op accepted.
static int alive_task(task_arg);
Sends i-am-alive to other nodes if there has been no normal traffic for a while. It also pings nodes which seem to be inactive.
static int detector_task(task_arg);
The detector_task periodically scans the set of connections from other nodes and sees if there has been any activity. If there has been no activity for some time, it will assume that the node is dead, and send a view message to the client.
Reconfiguration:
The xcom reconfiguration process is essentially the one described in "Reconfiguring a State Machine" by Lamport et al. as the R-alpha algorithm. We execute the reconfiguration command immediately, but the config is only valid after a delay of alpha messages. The parameter alpha is the same as EVENT_HORIZON in this implementation. All tcp messages from beyond the event horizon will be ignored (see the definition matching the source search pattern `static.*too_far`).