![]() |
MySQL 9.2.0
Source Code Documentation
|
xcom/xcom_base.c: The new version of xcom is a major rewrite that allows multiple messages from several sources to be transmitted simultaneously without collision. More...
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <poll.h>
#include "xcom/xcom_profile.h"
#include "my_compiler.h"
#include "xcom/x_platform.h"
#include <arpa/inet.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/sockio.h>
#include <memory>
#include "xcom/app_data.h"
#include "xcom/get_synode_app_data.h"
#include "xcom/node_no.h"
#include "xcom/server_struct.h"
#include "xcom/simset.h"
#include "xcom/site_struct.h"
#include "xcom/task.h"
#include "xcom/task_net.h"
#include "xcom/task_os.h"
#include "xcom/xcom_base.h"
#include "xcom/xcom_common.h"
#include "xcom/xcom_detector.h"
#include "xcom/xcom_transport.h"
#include "xcom/xdr_utils.h"
#include "xdr_gen/xcom_vp.h"
#include "xcom/bitset.h"
#include "xcom/leader_info_data.h"
#include "xcom/node_list.h"
#include "xcom/node_set.h"
#include "xcom/pax_msg.h"
#include "xcom/site_def.h"
#include "xcom/sock_probe.h"
#include "xcom/synode_no.h"
#include "xcom/task_debug.h"
#include "xcom/xcom_cache.h"
#include "xcom/xcom_cfg.h"
#include "xcom/xcom_interface.h"
#include "xcom/xcom_memory.h"
#include "xcom/xcom_msg_queue.h"
#include "xcom/xcom_recover.h"
#include "xcom/xcom_statistics.h"
#include "xcom/xcom_vp_str.h"
#include "xcom/network/xcom_network_provider.h"
#include <openssl/ssl.h>
#include <chrono>
#include <future>
#include <queue>
#include <tuple>
#include "xcom/retry.h"
#include <sys/utsname.h>
#include <netinet/in.h>
Classes | |
struct | synode_pool |
struct | execute_context |
struct | fp_name |
struct | xcom_fsm_state |
Macros | |
#define | __STDC_FORMAT_MACROS |
#define | SYS_STRERROR_SIZE 512 |
#define | XCOM_SEND_APP_WAIT_TIMEOUT 20 |
#define | PROTOVERSION_WARNING_TIMEOUT 600.0 /** Every 10 minutes */ |
#define | IS_CONS_ALL(p) ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0) |
#define | PROP_ITER |
#define | FNVSTART 0x811c9dc5 |
#define | GOTO(x) |
#define | FIFO_SIZE 1000 |
#define | NAME(f) { f, #f } |
#define | reply_msg(m) |
#define | CREATE_REPLY(x) |
#define | SEND_REPLY |
#define | PING_GATHERING_TIME_WINDOW 5.0 |
#define | PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN 3 |
#define | BAL_FMT "ballot {cnt %d node %d}" |
#define | BAL_MEM(x) (x).cnt, (x).node |
#define | SERIALIZE_REPLY(msg) |
#define | WRITE_REPLY |
#define | X(b) #b |
#define | X_FSM_STATE(s) { s, #s } |
#define | SET_X_FSM_STATE(s) |
#define | CONNECT_FAIL |
#define | TICK_PERIOD 0.01 |
#define | X(b) #b |
Typedefs | |
typedef bool(* | unsafe_node_check) (node_address const *node) |
typedef enum allow_event_horizon_result | allow_event_horizon_result |
typedef struct execute_context | execute_context |
typedef void(* | exec_fp) (execute_context *xc) |
typedef struct xcom_fsm_state | xcom_fsm_state |
typedef int(* | xcom_fsm_fp) (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
typedef enum xcom_send_app_wait_result | xcom_send_app_wait_result |
typedef void(* | paxos_state_action) (pax_machine *paxos, site_def const *site, pax_msg *mess) |
Enumerations | |
enum class | synode_allocation_type { todo = 0 , local , remote , global } |
enum class | synode_reservation_status : int { number_ok , no_nodes , delivery_timeout } |
enum | allow_event_horizon_result { EVENT_HORIZON_ALLOWED , EVENT_HORIZON_INVALID , EVENT_HORIZON_UNCHANGEABLE } |
Check if we can reconfigure the event horizon. More... | |
enum | { TAG_START = 313 } |
enum | xcom_send_app_wait_result { SEND_REQUEST_FAILED = 0 , RECEIVE_REQUEST_FAILED , REQUEST_BOTCHED , RETRIES_EXCEEDED , REQUEST_OK_RECEIVED , REQUEST_FAIL_RECEIVED , REQUEST_OK_REDIRECT } |
enum | { paxos_timer_range = 1000 } |
Functions | |
long | xcom_unique_long (void) |
static int64_t | socket_write (connection_descriptor *wfd, void *_buf, uint32_t n, connnection_write_method write_function=con_write) |
static double | wakeup_delay (double old) |
static void | note_snapshot (node_no node) |
static int | proposer_task (task_arg arg) |
static int | executor_task (task_arg arg) |
static int | sweeper_task (task_arg arg) |
int | alive_task (task_arg arg) |
int | cache_manager_task (task_arg arg) |
int | detector_task (task_arg arg) |
static int | finished (pax_machine *p) |
static int | accepted (pax_machine *p) |
static int | started (pax_machine *p) |
static synode_no | first_free_synode_local (synode_no msgno) |
static void | free_forced_config_site_def () |
static void | activate_sweeper () |
static void | force_pax_machine (pax_machine *p, int enforcer) |
static void | propose_noop_2p (synode_no find, pax_machine *p) |
static void | handle_need_snapshot (linkage *reply_queue, pax_msg *pm) |
static void | handle_skip (site_def const *site, pax_machine *p, pax_msg *m) |
static void | paxos_fsm (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static bool | is_leader (site_def *site) |
int | is_active_leader (node_no x, site_def *site) |
static msg_handler * | primary_dispatch_table () |
static msg_handler * | secondary_dispatch_table () |
static void | recompute_node_sets (site_def const *old_site, site_def *new_site) |
void | recompute_timestamps (detector_state const old_timestamp, node_list const *old_nodes, detector_state new_timestamp, node_list const *new_nodes) |
void | analyze_leaders (site_def *site) |
static int | ignore_message (synode_no x, site_def *site, char const *dbg) |
static void | init_proposers () |
void | initialize_lsn (uint64_t n) |
void | init_base_vars () |
uint32_t | get_my_xcom_id () |
synode_no | get_current_message () |
synode_no | get_max_synode () |
static bool_t | is_latest_config (site_def const *const config) |
static site_def const * | first_event_horizon_reconfig () |
Get the first pending configuration that reconfigures the event horizon. More... | |
static site_def const * | latest_event_horizon_reconfig () |
Get the latest pending configuration that reconfigures the event horizon. More... | |
static synode_no | add_event_horizon (synode_no s) |
Add the event horizon to the given base synod s. More... | |
void | set_group (uint32_t id) |
Set node group. More... | |
static void | bury_site (uint32_t id) |
static bool_t | is_dead_site (uint32_t id) |
node_set * | init_node_set (node_set *set, u_int n) |
node_set * | alloc_node_set (node_set *set, u_int n) |
static synode_no | incr_msgno (synode_no msgno) |
synode_no | incr_synode (synode_no synode) |
static void | skip_value (pax_msg *p) |
static int | ignoresig (int signum) |
static int | recently_active (pax_machine *p) |
int | pm_finished (pax_machine *p) |
static int | accepted_noop (pax_machine *p) |
static int | noop_match (pax_machine *p, pax_msg *pm) |
void | set_last_received_config (synode_no received_config_change) |
static node_no | max_check (site_def const *site) |
static int | is_forcing_node (pax_machine const *p) |
static int | majority (bit_set const *nodeset, site_def const *s, int all, int delay, int force) |
static int | prep_majority (site_def const *site, pax_machine const *p) |
static int | prop_majority (site_def const *site, pax_machine const *p) |
site_def const * | get_executor_site () |
site_def * | get_executor_site_rw () |
site_def const * | get_proposer_site () |
synode_no | get_delivered_msg () |
synode_no | get_last_delivered_msg () |
void | init_xcom_base () |
static void | init_tasks () |
void | xcom_thread_init () |
static void | empty_prop_input_queue () |
static void | empty_synode_number_pool () |
void | xcom_thread_deinit () |
static void | create_proposers () |
static void | add_proposer_synode (int i, synode_no *syn_ptr) |
static void | remove_proposer_synode (int i) |
static synode_no | get_proposer_synode (int i) |
static synode_no | min_proposer_synode () |
static void | terminate_proposers () |
static void | set_proposer_startpoint () |
void | set_xcom_run_cb (xcom_state_change_cb x) |
void | set_xcom_exit_cb (xcom_state_change_cb x) |
void | set_xcom_comms_cb (xcom_state_change_cb x) |
void | set_xcom_expel_cb (xcom_state_change_cb x) |
void | set_xcom_input_try_pop_cb (xcom_input_try_pop_cb pop) |
static bool_t | xcom_input_signal_connection_shutdown_ssl_wait_for_peer () |
static bool_t | xcom_input_signal_connection_shutdown_ssl () |
bool_t | xcom_input_new_signal_connection (char const *address, xcom_port port) |
bool_t | xcom_input_signal () |
void | xcom_input_free_signal_connection () |
static int | local_server_shutdown_ssl (connection_descriptor *con, void *buf, int n, int *ret) |
int | local_server (task_arg arg) |
static bool_t | local_server_is_setup () |
static void | init_time_queue () |
static int | paxos_timer_task (task_arg arg) |
int | xcom_taskmain2 (xcom_port listen_port) |
static void | prepare (pax_msg *p, pax_op op) |
void | init_prepare_msg (pax_msg *p) |
Initializes the message p as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol. More... | |
static int | prepare_msg (pax_msg *p) |
pax_msg * | create_noop (pax_msg *p) |
Initializes the message p as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol. More... | |
static pax_msg * | create_read (site_def const *site, pax_msg *p) |
static int | skip_msg (pax_msg *p) |
static void | brand_app_data (pax_msg *p) |
static synode_no | my_unique_id (synode_no synode) |
static void | set_unique_id (pax_msg *msg, synode_no synode) |
void | init_propose_msg (pax_msg *p) |
Initializes the message p as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol. More... | |
static int | send_propose_msg (pax_msg *p) |
static int | propose_msg (pax_msg *p) |
static void | set_learn_type (pax_msg *p) |
static void | init_learn_msg (pax_msg *p) |
static int | send_learn_msg (site_def const *site, pax_msg *p) |
static pax_msg * | create_tiny_learn_msg (pax_machine *pm, pax_msg *p) |
static int | send_tiny_learn_msg (site_def const *site, pax_msg *p) |
void | prepare_push_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type) |
Initializes the message msg to go through a 3-phase, regular Paxos. More... | |
void | prepare_push_2p (site_def const *site, pax_machine *p) |
Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_). More... | |
static void | push_msg_2p (site_def const *site, pax_machine *p) |
static void | push_msg_3p (site_def const *site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type) |
static void | brand_client_msg (pax_msg *msg, synode_no msgno) |
void | xcom_send (app_data_ptr a, pax_msg *msg) |
static uint32_t | fnv_hash (unsigned char *buf, size_t length, uint32_t sum) |
uint32_t | new_id () |
Create a new (hopefully unique) ID. More... | |
static synode_no | getstart (app_data_ptr a) |
void | site_install_action (site_def *site, cargo_type operation) |
static void | active_leaders (site_def *site, leader_array *leaders) |
void | synthesize_leaders (leader_array *leaders) |
static bool | leaders_set_by_client (site_def const *site) |
static site_def * | create_site_def_with_start (app_data_ptr a, synode_no start) |
static site_def * | install_ng_with_start (app_data_ptr a, synode_no start) |
site_def * | install_node_group (app_data_ptr a) |
void | set_max_synode (synode_no synode) |
static int | is_busy (synode_no s) |
bool_t | match_my_msg (pax_msg *learned, pax_msg *mine) |
static uint64_t | assign_lsn () |
Assign the next log sequence number (lsn) for a message. More... | |
static void | propose_noop (synode_no find, pax_machine *p) |
static uint64_t | too_far_threshold (xcom_event_horizon active_event_horizon) |
Checks if the given synod s is outside the event horizon. More... | |
static uint64_t | too_far_threshold_new_event_horizon_pending (site_def const *new_config) |
static int | too_far (synode_no s) |
static int | is_view (cargo_type x) |
static int | is_config (cargo_type x) |
static int | wait_for_cache (pax_machine **pm, synode_no synode, double timeout) |
static synode_no | local_synode_allocator (synode_no synode) |
static synode_no | global_synode_allocator (site_def *site, synode_no synode) |
static node_no | remote_synode_allocator (site_def *site, app_data const &a) |
static int | reserve_synode_number (synode_allocation_type *synode_allocation, site_def **site, synode_no *msgno, int *remote_retry, app_data *a, synode_reservation_status *ret) |
static constexpr bool | should_ignore_forced_config_or_view (xcom_proto protocol_version) |
static node_no | get_leader (site_def const *s) |
int | iamthegreatest (site_def const *s) |
static site_def * | update_site (site_def *site, node_set const *ns, synode_no boot_key, synode_no start) |
void | execute_msg (site_def *site, pax_machine *pma, pax_msg *p) |
static void | read_missing_values (int n) |
static void | propose_missing_values (int n) |
static void | find_value (site_def const *site, unsigned int *wait, int n) |
static void | dump_debug_exec_state () |
int | get_xcom_message (pax_machine **p, synode_no msgno, int n) |
synode_no | set_executed_msg (synode_no msgno) |
synode_no | set_current_message (synode_no msgno) |
static void | update_max_synode (pax_msg *p) |
static int | match_leader (char const *addr, leader_array const leaders) |
static bool | alive_node (site_def const *site, u_int i) |
node_no | found_active_leaders (site_def *site) |
static void | debug_loser (synode_no x) |
static void | send_value (site_def const *site, node_no to, synode_no synode) |
static synode_no | compute_delay (synode_no start, xcom_event_horizon event_horizon) |
Returns the message number where it is safe for nodes in the previous configuration to exit. More... | |
static void | inform_removed (int index, int all) |
static bool_t | backwards_compatible (xcom_event_horizon event_horizon) |
static bool_t | reconfigurable_event_horizon (xcom_proto protocol_version) |
static bool_t | add_node_unsafe_against_ipv4_old_nodes (app_data_ptr a) |
static bool_t | add_node_adding_own_address (app_data_ptr a) |
This will test if we are receiving a boot request that contains ourselves. More... | |
static bool | unsafe_against_event_horizon (node_address const *node) |
Check if a node is compatible with the group's event horizon. More... | |
static bool | check_if_add_node_is_unsafe (app_data_ptr a, unsafe_node_check unsafe) |
static bool | check_if_add_node_is_unsafe_against_event_horizon (app_data_ptr a) |
void | recompute_node_set (node_set const *old_set, node_list const *old_nodes, node_set *new_set, node_list const *new_nodes) |
static bool | incompatible_proto_and_max_leaders (xcom_proto x_proto, node_no max_leaders) |
static bool | incompatible_proto_and_leaders (xcom_proto x_proto) |
static bool | incompatible_proto_and_max_leaders (node_address const *node) |
static bool | incompatible_proto_and_leaders (node_address const *node) |
bool | unsafe_leaders (app_data *a) |
static void | set_start_and_boot (site_def *new_config, app_data_ptr a) |
site_def * | handle_add_node (app_data_ptr a) |
Reconfigure the group membership: add new member(s). More... | |
static void | log_event_horizon_reconfiguration_failure (allow_event_horizon_result error_code, xcom_event_horizon attempted_event_horizon) |
static allow_event_horizon_result | allow_event_horizon (xcom_event_horizon event_horizon) |
static bool_t | is_unsafe_event_horizon_reconfiguration (app_data_ptr a) |
static bool_t | is_unsafe_max_leaders_reconfiguration (app_data_ptr a) |
static bool_t | is_unsafe_set_leaders_reconfiguration (app_data_ptr a) |
static bool_t | is_unsafe_leaders_reconfiguration (app_data_ptr a) |
static bool_t | are_there_dead_nodes_in_new_config (app_data_ptr a) |
bool_t | handle_event_horizon (app_data_ptr a) |
Reconfigure the event horizon. More... | |
static bool_t | handle_max_leaders (site_def *new_config, app_data_ptr a) |
bool_t | handle_max_leaders (app_data_ptr a) |
static void | zero_leader_array (leader_array *l) |
static void | move_leader_array (leader_array *target, leader_array *source) |
static bool_t | handle_set_leaders (site_def *new_config, app_data_ptr a) |
bool_t | handle_set_leaders (app_data_ptr a) |
bool_t | handle_leaders (app_data_ptr a) |
void | terminate_and_exit () |
static int | is_empty_site (site_def const *s) |
site_def * | handle_remove_node (app_data_ptr a) |
static void | log_ignored_forced_config (app_data_ptr a, char const *const caller_name) |
bool_t | handle_config (app_data_ptr a, bool const forced) |
static int | is_member (site_def const *site) |
static int | addone (int i) |
static int | fifo_empty () |
static int | fifo_full () |
static void | fifo_insert (synode_no s) |
static synode_no | fifo_extract () |
static synode_no | fifo_front () |
static void | dump_exec_state (execute_context *xc, long dbg) |
static int | x_check_exit (execute_context *xc) |
static int | x_check_execute_inform (execute_context *xc) |
static void | x_fetch (execute_context *xc) |
static void | x_execute (execute_context *xc) |
static void | x_check_increment_fetch (execute_context *xc) |
static void | x_check_increment_execute (execute_context *xc) |
static void | x_terminate (execute_context *xc) |
static void | setup_exit_handling (execute_context *xc, site_def *site) |
static synode_no | get_sweep_start () |
static bool | allow_channel_takeover (site_def const *site) |
static void | broadcast_noop (synode_no find, pax_machine *p) |
static site_def const * | init_noop (synode_no find, pax_machine *p) |
static void | send_read (synode_no find) |
static int | ok_to_propose (pax_machine *p) |
bool_t | safe_app_data_copy (pax_msg **target, app_data_ptr source) |
Copies app data source into target and checks if the copy succeeded. More... | |
static pax_msg * | create_learn_msg_for_ignorant_node (pax_machine *p, pax_msg *pm, synode_no synode) |
static void | teach_ignorant_node (site_def const *site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue) |
static void | handle_read (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm) |
static pax_msg * | create_ack_prepare_msg (pax_machine *p, pax_msg *pm, synode_no synode) |
pax_msg * | handle_simple_prepare (pax_machine *p, pax_msg *pm, synode_no synode) |
Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol. More... | |
static void | handle_prepare (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *pm) |
bool_t | check_propose (site_def const *site, pax_machine *p) |
static bool | learn_ok (site_def const *site, pax_machine const *p) |
static pax_msg * | check_learn (site_def const *site, pax_machine *p) |
static void | do_learn (site_def const *site, pax_machine *p, pax_msg *m) |
bool_t | handle_simple_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m) |
Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol. More... | |
static void | handle_ack_prepare (site_def const *site, pax_machine *p, pax_msg *m) |
static pax_msg * | create_ack_accept_msg (pax_msg *m, synode_no synode) |
pax_msg * | handle_simple_accept (pax_machine *p, pax_msg *m, synode_no synode) |
Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol. More... | |
static void | handle_accept (site_def const *site, pax_machine *p, linkage *reply_queue, pax_msg *m) |
pax_msg * | handle_simple_ack_accept (site_def const *site, pax_machine *p, pax_msg *m) |
Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol. More... | |
static void | handle_ack_accept (site_def const *site, pax_machine *p, pax_msg *m) |
void | handle_tiny_learn (site_def const *site, pax_machine *pm, pax_msg *p) |
Process the incoming tiny (i.e. without the learned value) Learn message. More... | |
static void | force_interval (synode_no start, synode_no end, int enforcer) |
static void | start_force_config (site_def *s, int enforcer) |
void | handle_learn (site_def const *site, pax_machine *p, pax_msg *m) |
Process the incoming Learn message. More... | |
static void | handle_client_msg (pax_msg *p) |
static void | handle_boot (site_def const *site, linkage *reply_queue, pax_msg *p) |
bool_t | should_handle_need_boot (site_def const *site, pax_msg *p) |
void | init_need_boot_op (pax_msg *p, node_address *identity) |
Initializes the message p as a need_boot_op message. More... | |
int | pre_process_incoming_ping (site_def const *site, pax_msg const *pm, int has_client_already_booted, double current_time) |
Process incoming are_you_alive (i.e. ping) messages. More... | |
static void | handle_alive (site_def const *site, linkage *reply_queue, pax_msg *pm) |
xcom_event_horizon | xcom_get_minimum_event_horizon () |
xcom_event_horizon | xcom_get_maximum_event_horizon () |
static client_reply_code | xcom_get_event_horizon (xcom_event_horizon *event_horizon) |
Retrieves the latest event horizon. More... | |
static bool | add_node_test_connectivity_to_added_nodes (node_address *nodes_to_change, u_int number_of_nodes_to_change) |
static u_int | allow_add_node (app_data_ptr a) |
static u_int | allow_remove_node (app_data_ptr a) |
static void | log_cfgchange_wrong_group (app_data_ptr a, const char *const message_fmt) |
Logs the fact that an add/remove node request is aimed at another group. More... | |
static client_reply_code | can_execute_cfgchange (pax_msg *p) |
Validates if a configuration command can be executed. More... | |
void | dispatch_get_event_horizon (site_def const *site, pax_msg *p, linkage *reply_queue) |
static reply_data * | new_leader_info (site_def *site) |
void | dispatch_get_leaders (site_def *site, pax_msg *p, linkage *reply_queue) |
static void | log_get_synode_app_data_failure (xcom_get_synode_app_data_result error_code) |
void | dispatch_get_synode_app_data (site_def const *site, pax_msg *p, linkage *reply_queue) |
static int | can_send_snapshot () |
static void | process_client_msg (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_prepare_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static int | abort_processing (pax_msg *p) |
static void | process_ack_prepare_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_accept_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_ack_accept_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_recover_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_skip_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_i_am_alive_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_are_you_alive_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_need_boot_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_die_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_read_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_gcs_snapshot_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_tiny_learn_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_synode_request (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | process_synode_allocated (site_def const *site, pax_msg *p, linkage *reply_queue) |
static msg_handler * | clone_dispatch_table (msg_handler const *proto) |
pax_msg * | dispatch_op (site_def const *site, pax_msg *p, linkage *reply_queue) |
static void | update_srv (server **target, server *srv) |
static int | harmless (pax_msg const *p) |
static bool_t | should_poll_cache (pax_op op) |
int | acceptor_learner_task (task_arg arg) |
static void | server_handle_need_snapshot (server *srv, site_def const *s, node_no node) |
int | reply_handler_task (task_arg arg) |
void | xcom_sleep (unsigned int seconds) |
app_data_ptr | init_config_with_group (app_data *a, node_list *nl, cargo_type type, uint32_t group_id) |
app_data_ptr | init_set_event_horizon_msg (app_data *a, uint32_t group_id, xcom_event_horizon event_horizon) |
app_data_ptr | init_get_msg (app_data *a, uint32_t group_id, cargo_type const t) |
app_data_ptr | init_get_leaders_msg (app_data *a, uint32_t group_id) |
app_data_ptr | init_get_event_horizon_msg (app_data *a, uint32_t group_id) |
app_data_ptr | init_app_msg (app_data *a, char *payload, u_int payload_size) |
static app_data_ptr | init_get_synode_app_data_msg (app_data *a, uint32_t group_id, synode_no_array *const synodes) |
app_data_ptr | init_set_cache_size_msg (app_data *a, uint64_t cache_limit) |
app_data_ptr | init_convert_into_local_server_msg (app_data *a) |
static void | server_send_snapshot (server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node) |
static void | server_push_log (server *srv, synode_no push, node_no node) |
static void | reply_push_log (synode_no push, linkage *reply_queue) |
static gcs_snapshot * | create_snapshot () |
static int | xcom_timer (task_arg arg) |
static void | stop_x_timer () |
static void | start_x_timer (double t) |
static int | x_fsm_completion_task (task_arg arg) |
void | send_x_fsm_complete () |
static void | reset_snapshot_mask () |
static int | got_all_snapshots () |
static int | better_snapshot (gcs_snapshot *gcs) |
static void | handle_x_snapshot (gcs_snapshot *gcs) |
static void | update_best_snapshot (gcs_snapshot *gcs) |
static void | send_need_boot () |
void | set_log_end (gcs_snapshot *gcs) |
static int | xcom_fsm_init (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_start_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_start (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_snapshot_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_snapshot_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_recover_wait_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_recover_wait (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_run_enter (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | xcom_fsm_run (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | handle_fsm_net_boot (task_arg fsmargs, xcom_fsm_state *ctxt, int cont) |
static int | handle_fsm_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | handle_fsm_snapshot_wait (xcom_fsm_state *ctxt) |
static void | handle_fsm_exit () |
static int | handle_local_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | handle_snapshot (task_arg fsmargs, xcom_fsm_state *ctxt) |
static int | handle_fsm_terminate (task_arg fsmargs, xcom_fsm_state *ctxt) |
static void | handle_fsm_force_config (task_arg fsmargs) |
xcom_fsm_state * | xcom_fsm_impl (xcom_actions action, task_arg fsmargs) |
char const * | xcom_fsm (xcom_actions action, task_arg fsmargs) |
void | set_app_snap_handler (app_snap_handler x) |
void | set_app_snap_getter (app_snap_getter x) |
static result | socket_read (connection_descriptor *rfd, void *buf, int n) |
static int64_t | socket_read_bytes (connection_descriptor *rfd, char *p, uint32_t n) |
connection_descriptor * | xcom_open_client_connection (char const *server, xcom_port port) |
static int | xcom_send_proto (connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag) |
static int | xcom_recv_proto (connection_descriptor *rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag) |
static int | is_cargo_type (app_data_ptr a, cargo_type t) |
Checks if a given app_data is of a given cargo_type. More... | |
static char * | get_add_node_address (app_data_ptr a, unsigned int *member) |
Retrieves the address that was used in the add_node request. More... | |
int | is_node_v4_reachable_with_info (struct addrinfo *retrieved_addr_info) |
int | is_node_v4_reachable (char *node_address) |
int | are_we_allowed_to_upgrade_to_v6 (app_data_ptr a) |
int64_t | xcom_send_client_app_data (connection_descriptor *fd, app_data_ptr a, int force) |
int64_t | xcom_client_send_die (connection_descriptor *fd) |
void | warn_protoversion_mismatch (connection_descriptor *rfd) |
static pax_msg * | socket_read_msg (connection_descriptor *rfd, pax_msg *p) |
static xcom_send_app_wait_result | xcom_send_app_wait_and_get (connection_descriptor *fd, app_data *a, int force, pax_msg *p, leader_info_data *leaders) |
Send a message and wait for response. More... | |
static int | xcom_send_app_wait (connection_descriptor *fd, app_data *a, int force, leader_info_data *leaders) |
int | xcom_send_cfg_wait (connection_descriptor *fd, node_list *nl, uint32_t group_id, cargo_type ct, int force) |
int | xcom_client_add_node (connection_descriptor *fd, node_list *nl, uint32_t group_id) |
int | xcom_client_remove_node (connection_descriptor *fd, node_list *nl, uint32_t group_id) |
static int | xcom_check_reply (int const res) |
int | xcom_client_get_synode_app_data (connection_descriptor *const fd, uint32_t group_id, synode_no_array *const synodes, synode_app_data_array *const reply) |
int | xcom_client_enable_arbitrator (connection_descriptor *fd) |
int | xcom_client_disable_arbitrator (connection_descriptor *fd) |
int | xcom_client_set_cache_limit (connection_descriptor *fd, uint64_t cache_limit) |
int | xcom_client_convert_into_local_server (connection_descriptor *const fd) |
void | init_set_max_leaders (uint32_t group_id, app_data *a, node_no max_leaders) |
int | xcom_client_set_max_leaders (connection_descriptor *fd, node_no max_leaders, uint32_t group_id) |
leader_array | new_leader_array (u_int n, char const *names[]) |
void | init_set_leaders (uint32_t group_id, app_data *a, leader_array const leaders) |
void | init_set_leaders (uint32_t group_id, app_data *a, u_int n, char const *names[]) |
void | init_set_leaders (uint32_t group_id, app_data *leader_app, leader_array const leaders, app_data *max_app, node_no max_leaders) |
void | init_set_leaders (uint32_t group_id, app_data *leader_app, u_int n, char const *names[], app_data *max_app, node_no max_leaders) |
int | xcom_client_set_leaders (connection_descriptor *fd, u_int n, char const *names[], uint32_t group_id) |
std::unique_ptr< Network_provider_management_interface > | get_network_management_interface () |
std::unique_ptr< Network_provider_operations_interface > | get_network_operations_interface () |
int | xcom_client_set_leaders (connection_descriptor *fd, u_int n, char const *names[], node_no max_leaders, uint32_t group_id) |
int | xcom_client_get_leaders (connection_descriptor *fd, uint32_t group_id, leader_info_data *leaders) |
static void | paxos_twait (pax_machine *p, unsigned int t) |
static void | paxos_twait_cancel (pax_machine *p) |
static void | paxos_wakeup (unsigned int t) |
static void | paxos_timer_advance () |
static int | paxos_fsm_p1_master_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p1_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p2_master_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p2_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p2_slave_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p2_slave_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p3_master_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p3_slave_enter (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_p3_slave_wait (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | paxos_fsm_finished (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
static int | accept_new_prepare (pax_machine *paxos, pax_msg *mess) |
static int | accept_new_accept (pax_machine *paxos, pax_msg *mess) |
static int | own_message (pax_msg *mess, site_def const *site) |
static void | action_paxos_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_paxos_accept (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_paxos_learn (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_paxos_start (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_new_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_ack_prepare (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_new_accept (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_ack_accept (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | action_ignorant (pax_machine *paxos, site_def const *site, pax_msg *mess) |
static void | dispatch_p_event (paxos_state_action *vtbl, pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
int | paxos_fsm_idle (pax_machine *paxos, site_def const *site, paxos_event event, pax_msg *mess) |
xcom/xcom_base.c The new version of xcom is a major rewrite to allow transmission of multiple messages from several sources simultaneously without collision.
The interface to xcom is largely intact; one notable change is that xcom will consider the message delivered as soon as it has got a majority. Consequently, the VP set will not necessarily show all nodes which will actually receive the message.
OHKFIX Add wait for complete last known node set to mimic the old semantics.
IMPORTANT: What xcom does and what it does not do:
xcom messages are received in the same order on all nodes.
xcom guarantees that if a message is delivered to one node, it will eventually be seen on all other nodes as well.
xcom messages are available to a crashed node when it comes up again if at least one node which knows the value of the message has not crashed. The size of the message cache is configurable.
OHKFIX Add logging to disk to make messages durable across system crash and to increase the number of messages which may be cached.
There is no guarantee whatsoever about the order of messages from different nodes, not even the order of multiple messages from the same node. It is up to the client to impose such an order by waiting on a message before it sends the next.
xcom can notify the client that a message has timed out, and in that case will try to cancel the message, but it cannot guarantee that a message which has timed out will not be delivered.
xcom attaches a node set to each message as it is delivered to the client. This node set reflects the current node set that xcom believes is active, it does not mean that the message has been delivered yet to all nodes in the set. Neither does it mean that the message has not been delivered to the nodes not in the set.
A cache of Paxos state machines is central to the new design. The purpose of the cache is both to store a window of messages, and to decouple the different parts of xcom, like message proposal, message delivery and execution, and recovery. The old cache was limited to caching messages, and a single state machine ran the combined VP and Paxos algorithm. This constrained xcom to deliver only a single message at a time.
Each instance of the Paxos state machine implements the basic Paxos protocol. Unlike the cache in the old system, it is not cleared when a site is deleted. This removes some problems related to message delivery during site deletion. The cache is a classic fixed size LRU with a hash index.
Some extensions to the basic Paxos algorithm have been implemented:
A node has ownership of all synodes with its own node number. Only a node with node number N can propose a value for synode {X N}, where X is the sequence number, and N is the node number. Other nodes can only propose the special value no_op for synode {X N}. The reason for this is to retain the leaderless Paxos algorithm, but to avoid collisions between nodes which are competing for the same synode number. With this scheme, each node has its own unique number series during normal operation. The scheme has the following implications:
Messages are delivered in order to the client, and the order is determined by the sequence number and the node number, with the sequence number as the most significant part.
The xcom network interface has been redesigned and is now implemented directly on top of TCP, and has so far been completely trouble free. We use poll() or select() to implement non-blocking send and receive, but libev could equally well have been used.
Multicast is implemented on top of unicast as before, but the implementation is prepared to use real multicast with relatively minor changes.
The roles of proposer, acceptor/learner, and executor are now directly mapped to unique task types which interact with the Paxos state machines, whereas the previous implementation folded all the roles into a single event driven state machine.
The following terminology will be used:
A node is an instance of the xcom thread. There is only one instance of the xcom thread in the agent. A client is the application which is using xcom to send messages. A thread is a real OS thread. A task is a logical process. It is implemented by coroutines and an explicit stack.
The implementation of tasks and non-blocking socket operations is isolated in task.h and task.c.
A node will open a tcp connection to each of the other nodes. This connection is used for all communication initiated by the node, and replies to messages will arrive on the connection on which they were sent.
static int tcp_server(task_arg);
The tcp_server listens on the xcom port and starts an acceptor_learner_task whenever a new connection is detected.
static int tcp_reaper_task(task_arg);
Closes tcp connections which have been unused for too long.
static int sender_task(task_arg);
The sender_task waits for tcp messages on its input queue and sends them on the tcp socket. If the socket is closed for any reason, the sender_task will reconnect the socket. There is one sender_task for each socket. The sender task exists mainly to simplify the logic in the other tasks, but it could have been replaced with a coroutine which handles the connection logic after having reserved the socket for its client task.
static int generator_task(task_arg);
The generator_task reads messages from the client queue and moves them into the input queue of the proposer_task.
OHKFIX Use a tcp socket instead of the client queue. We can then remove the generator_task and let the acceptor_learner_task do the dispatching.
static int proposer_task(task_arg);
Assign a message number to an outgoing message and try to get it accepted. There may be several proposer tasks on each node working in parallel. If there are multiple proposer tasks, xcom can not guarantee that the messages will be sent in the same order as received from the client.
static int acceptor_learner_task(task_arg);
This is the server part of the xcom thread. There is one acceptor_learner_task for each node in the system. The acceptor_learner_task reads messages from the socket, finds the correct Paxos state machine, and dispatches to the correct message handler with the state machine and message as arguments.
static int reply_handler_task(task_arg);
The reply_handler_task does the same job as the acceptor_learner_task, but listens on the socket which the node uses to send messages, so it will handle only replies on that socket.
static int executor_task(task_arg);
The executor_task waits for a Paxos message to be accepted. When the message is accepted, it is delivered to the client, unless it is a no-op. In either case, the executor_task steps to the next message and repeats the wait. If it times out waiting for a message, it will try to get a no-op accepted.
static int alive_task(task_arg);
Sends i-am-alive to other nodes if there has been no normal traffic for a while. It also pings nodes which seem to be inactive.
static int detector_task(task_arg);
The detector_task periodically scans the set of connections from other nodes and sees if there has been any activity. If there has been no activity for some time, it will assume that the node is dead, and send a view message to the client.
Reconfiguration:
The xcom reconfiguration process is essentially the one described in "Reconfiguring a State Machine" by Lamport et al. as the R-alpha algorithm. We execute the reconfiguration command immediately, but the config is only valid after a delay of alpha messages. The parameter alpha is the same as EVENT_HORIZON in this implementation. :/static.*too_far All tcp messages from beyond the event horizon will be ignored.
#define __STDC_FORMAT_MACROS |
#define BAL_FMT "ballot {cnt %d node %d}" |
#define BAL_MEM | ( | x | ) | (x).cnt, (x).node |
#define CONNECT_FAIL |
#define CREATE_REPLY | ( | x | ) |
#define FIFO_SIZE 1000 |
#define FNVSTART 0x811c9dc5 |
#define GOTO | ( | x | ) |
#define NAME | ( | f | ) | { f, #f } |
#define PING_GATHERING_TIME_WINDOW 5.0 |
#define PINGS_GATHERED_BEFORE_CONNECTION_SHUTDOWN 3 |
#define PROP_ITER |
#define PROTOVERSION_WARNING_TIMEOUT 600.0 /** Every 10 minutes */ |
#define reply_msg | ( | m | ) |
#define SEND_REPLY |
#define SERIALIZE_REPLY | ( | msg | ) |
#define SET_X_FSM_STATE | ( | s | ) |
#define SYS_STRERROR_SIZE 512 |
#define TICK_PERIOD 0.01 |
#define WRITE_REPLY |
#define X | ( | b | ) | #b |
#define X | ( | b | ) | #b |
#define X_FSM_STATE | ( | s | ) | { s, #s } |
#define XCOM_SEND_APP_WAIT_TIMEOUT 20 |
typedef enum allow_event_horizon_result allow_event_horizon_result |
typedef void(* exec_fp) (execute_context *xc) |
typedef struct execute_context execute_context |
typedef void(* paxos_state_action) (pax_machine *paxos, site_def const *site, pax_msg *mess) |
typedef bool(* unsafe_node_check) (node_address const *node) |
typedef int(* xcom_fsm_fp) (xcom_actions action, task_arg fsmargs, xcom_fsm_state *ctxt) |
typedef struct xcom_fsm_state xcom_fsm_state |
typedef enum xcom_send_app_wait_result xcom_send_app_wait_result |
Check if we can reconfigure the event horizon.
We can reconfigure the event horizon if all group members support reconfiguring the event horizon, and the new event horizon is in the domain [EVENT_HORIZON_MIN, EVENT_HORIZON_MAX].
We use the group's latest common XCom protocol as a proxy to decide if all members support reconfiguring the event horizon.
If the common protocol is at least version 5 (x_1_4) then all members run compatible server instances.
Otherwise there are older instances, and it follows that the event horizon must be the default and cannot be reconfigured.
Enumerator | |
---|---|
EVENT_HORIZON_ALLOWED | |
EVENT_HORIZON_INVALID | |
EVENT_HORIZON_UNCHANGEABLE |
|
strong |
|
strong |
|
inlinestatic |
|
static |
|
static |
|
inlinestatic |
|
inlinestatic |
int acceptor_learner_task | ( | task_arg | arg | ) |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
Add the event horizon to the given base synod s.
We are assuming right now that this function is used solely in the context of "we have received a reconfiguration command at synod s, when should it be scheduled to take effect?" The result of this function is when it should take effect.
Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case result is:
s + event_horizon(active_config) + 1
If an event horizon reconfiguration R is pending, it means that the command C proposed for synod s is concurrent with R, i.e., s falls in the interval ]proposed(R), start(R)[.
In this situation we apply the command C proposed for synod s after taking into account R's event horizon.
This means that the result is:
start(R) + event_horizon(R) + 1
|
static |
This will test if we are receiving a boot request that contains ourselves.
This could happen in case of a misconfiguration of a local_address that causes an add_node request to be erroneously delivered.
a | app_data with an add node request |
|
static |
|
static |
|
static |
|
inlinestatic |
int alive_task | ( | task_arg | arg | ) |
node_set * alloc_node_set | ( | node_set * | set, |
u_int | n | ||
) |
|
static |
|
static |
|
static |
|
static |
void analyze_leaders | ( | site_def * | site | ) |
|
static |
int are_we_allowed_to_upgrade_to_v6 | ( | app_data_ptr | a | ) |
|
static |
Assign the next log sequence number (lsn) for a message.
The initial propose sets lsn to the msgno of the max message number as a safe starting point; otherwise lsn shall be ever increasing. lsn ensures that the sender order is known on the receiver side, as messages may arrive "out of order" due to retransmission. We use max_synode instead of current_message to avoid any conflict with an lsn allocated by a previous instance of the node.
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
int cache_manager_task | ( | task_arg | arg | ) |
|
static |
Validates if a configuration command can be executed.
Checks whether the configuration command is aimed at the correct group. Checks whether the configuration command pertains to a node reincarnation.
p | a pointer to the pax_msg of the configuration command |
REQUEST_OK | if the reconfiguration command can be executed |
REQUEST_RETRY | if XCom is still booting |
REQUEST_FAIL | if the configuration command cannot be executed |
|
static |
|
static |
|
static |
|
static |
bool_t check_propose | ( | site_def const * | site, |
pax_machine * | p | ||
) |
|
static |
|
static |
Returns the message number where it is safe for nodes in previous configuration to exit.
start | start synod of the next configuration |
event_horizon | event horizon of the next configuration |
|
static |
|
static |
Initializes the message p
as a Prepare message for a no-op, as in the message for Phase 1 (a) of the Paxos protocol.
Executed by Proposers.
p | The no-op message to send |
created | paxos message of type no_op |
|
static |
|
static |
|
static |
|
static |
|
static |
int detector_task | ( | task_arg | arg | ) |
|
inlinestatic |
|
static |
|
static |
|
static |
|
static |
|
static |
void execute_msg | ( | site_def * | site, |
pax_machine * | pma, | ||
pax_msg * | p | ||
) |
|
static |
|
inlinestatic |
|
inlinestatic |
|
inlinestatic |
|
inlinestatic |
|
inlinestatic |
|
static |
|
inlinestatic |
|
static |
Get the first pending configuration that reconfigures the event horizon.
Retrieve the first pending site_def, i.e. with the smallest start synod that is greater than executed_msg, that reconfigures the event horizon.
|
static |
|
static |
|
static |
|
static |
node_no found_active_leaders | ( | site_def * | site | ) |
|
static |
|
static |
Retrieves the address that was used in the add_node request.
a | app data containing the node to add |
member | address we used to present ourselves to other nodes |
synode_no get_current_message | ( | ) |
synode_no get_delivered_msg | ( | ) |
site_def const * get_executor_site | ( | ) |
site_def * get_executor_site_rw | ( | ) |
synode_no get_last_delivered_msg | ( | ) |
|
static |
synode_no get_max_synode | ( | ) |
uint32_t get_my_xcom_id | ( | ) |
std::unique_ptr< Network_provider_management_interface > get_network_management_interface | ( | ) |
std::unique_ptr< Network_provider_operations_interface > get_network_operations_interface | ( | ) |
site_def const * get_proposer_site | ( | ) |
|
static |
|
static |
int get_xcom_message | ( | pax_machine ** | p, |
synode_no | msgno, | ||
int | n | ||
) |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
site_def * handle_add_node | ( | app_data_ptr | a | ) |
Reconfigure the group membership: add new member(s).
It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.
Particularly, it is possible that any of the concurrent reconfigurations modified the event horizon and that the new member(s) do not support event horizon reconfigurations.
We account for these situations by validating if adding the new members is still possible under the current state.
If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.
|
inlinestatic |
|
static |
bool_t handle_config | ( | app_data_ptr | a, |
bool const | forced | ||
) |
bool_t handle_event_horizon | ( | app_data_ptr | a | ) |
Reconfigure the event horizon.
It is possible that concurrent reconfigurations take effect between the time this reconfiguration was proposed and now.
Particularly, it is possible that any of the concurrent reconfigurations added a new member which does not support reconfiguring the event horizon.
We account for these situations by validating if the event horizon reconfiguration is still possible under the current state.
If it is not, this reconfiguration does not produce any effect, i.e. no new configuration is installed.
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
bool_t handle_leaders | ( | app_data_ptr | a | ) |
void handle_learn | ( | site_def const * | site, |
pax_machine * | p, | ||
pax_msg * | m | ||
) |
Process the incoming Learn message.
Executed by Learners.
site | XCom configuration |
p | Paxos instance |
m | Incoming message |
|
static |
bool_t handle_max_leaders | ( | app_data_ptr | a | ) |
|
static |
|
static |
site_def * handle_remove_node | ( | app_data_ptr | a | ) |
bool_t handle_set_leaders | ( | app_data_ptr | a | ) |
pax_msg * handle_simple_accept | ( | pax_machine * | p, |
pax_msg * | m, | ||
synode_no | synode | ||
) |
Process the incoming Accept from a Proposer, as in the message for Phase 2 (a) of the Paxos protocol.
Executed by Acceptors.
p | Paxos instance |
m | Incoming Accept message |
synode | Synode of the Paxos instance/Accept message |
pax_msg* | the reply to send to the Proposer (as in the Phase 2 (b) message of the Paxos protocol) if the Acceptor accepts the Accept |
NULL | otherwise |
pax_msg * handle_simple_ack_accept | ( | site_def const * | site, |
pax_machine * | p, | ||
pax_msg * | m | ||
) |
Process the incoming acknowledge from an Acceptor to a sent Accept, as in the message for Phase 2 (b) of the Paxos protocol.
Executed by Proposers.
site | XCom configuration |
p | Paxos instance |
m | Incoming message |
pax_msg* | the Learn message to send to Learners if a majority of Acceptors replied to the Proposer's Accept |
NULL | otherwise |
bool_t handle_simple_ack_prepare | ( | site_def const * | site, |
pax_machine * | p, | ||
pax_msg * | m | ||
) |
Process the incoming acknowledge from an Acceptor to a sent Prepare, as in the message for Phase 1 (b) of the Paxos protocol.
Executed by Proposers.
site | XCom configuration |
p | Paxos instance |
m | Incoming message |
TRUE | if a majority of Acceptors replied to the Proposer's Prepare |
FALSE | otherwise |
pax_msg * handle_simple_prepare | ( | pax_machine * | p, |
pax_msg * | pm, | ||
synode_no | synode | ||
) |
Process the incoming Prepare message from a Proposer, as in the message for Phase 1 (a) of the Paxos protocol.
Executed by Acceptors.
p | Paxos instance |
pm | Incoming Prepare message |
synode | Synode of the Paxos instance/Prepare message |
pax_msg* | the reply to send to the Proposer (as in the Phase 1 (b) message of the Paxos protocol) if the Acceptor accepts the Prepare |
NULL | otherwise |
|
static |
|
static |
void handle_tiny_learn | ( | site_def const * | site, |
pax_machine * | pm, | ||
pax_msg * | p | ||
) |
Process the incoming tiny Learn message, i.e. a Learn message without the learned value.
Executed by Learners.
site | XCom configuration |
pm | Paxos instance |
p | Incoming message |
|
static |
|
static |
int iamthegreatest | ( | site_def const * | s | ) |
|
inlinestatic |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
synode_no incr_synode | ( | synode_no | synode | ) |
|
static |
app_data_ptr init_app_msg | ( | app_data * | a, |
char * | payload, | ||
u_int | payload_size | ||
) |
void init_base_vars | ( | ) |
app_data_ptr init_config_with_group | ( | app_data * | a, |
node_list * | nl, | ||
cargo_type | type, | ||
uint32_t | group_id | ||
) |
app_data_ptr init_convert_into_local_server_msg | ( | app_data * | a | ) |
app_data_ptr init_get_event_horizon_msg | ( | app_data * | a, |
uint32_t | group_id | ||
) |
app_data_ptr init_get_leaders_msg | ( | app_data * | a, |
uint32_t | group_id | ||
) |
app_data_ptr init_get_msg | ( | app_data * | a, |
uint32_t | group_id, | ||
cargo_type const | t | ||
) |
|
static |
|
static |
void init_need_boot_op | ( | pax_msg * | p, |
node_address * | identity | ||
) |
Initializes the message p
as a need_boot_op message.
p | The message to send |
identity | The unique incarnation identifier of this XCom instance |
node_set * init_node_set | ( | node_set * | set, |
u_int | n | ||
) |
|
static |
void init_prepare_msg | ( | pax_msg * | p | ) |
Initializes the message p
as a Prepare message, as in the message for Phase 1 (a) of the Paxos protocol.
Executed by Proposers.
p | The message to send |
void init_propose_msg | ( | pax_msg * | p | ) |
Initializes the message p
as an Accept message, as in the message for Phase 2 (a) of the Paxos protocol.
Executed by Proposers.
p | The message to send |
|
static |
app_data_ptr init_set_cache_size_msg | ( | app_data * | a, |
uint64_t | cache_limit | ||
) |
app_data_ptr init_set_event_horizon_msg | ( | app_data * | a, |
uint32_t | group_id, | ||
xcom_event_horizon | event_horizon | ||
) |
void init_set_leaders | ( | uint32_t | group_id, |
app_data * | a, | ||
leader_array const | leaders | ||
) |
void init_set_leaders | ( | uint32_t | group_id, |
app_data * | a, | ||
u_int | n, | ||
char const * | names[] | ||
) |
void init_set_leaders | ( | uint32_t | group_id, |
app_data * | leader_app, | ||
leader_array const | leaders, | ||
app_data * | max_app, | ||
node_no | max_leaders | ||
) |
void init_set_leaders | ( | uint32_t | group_id, |
app_data * | leader_app, | ||
u_int | n, | ||
char const * | names[], | ||
app_data * | max_app, | ||
node_no | max_leaders | ||
) |
void init_set_max_leaders | ( | uint32_t | group_id, |
app_data * | a, | ||
node_no | max_leaders | ||
) |
|
static |
|
static |
void init_xcom_base | ( | ) |
Reset lsn
void initialize_lsn | ( | uint64_t | n | ) |
|
static |
site_def * install_node_group | ( | app_data_ptr | a | ) |
int is_active_leader | ( | node_no | x, |
site_def * | site | ||
) |
|
static |
|
inlinestatic |
Checks if a given app_data is from a given cargo_type.
a | the app_data |
t | the cargo type |
|
inlinestatic |
|
static |
|
inlinestatic |
|
static |
|
inlinestatic |
|
inlinestatic |
int is_node_v4_reachable | ( | char * | node_address | ) |
int is_node_v4_reachable_with_info | ( | struct addrinfo * | retrieved_addr_info | ) |
|
static |
|
static |
|
static |
|
static |
|
inlinestatic |
|
static |
Get the latest pending configuration that reconfigures the event horizon.
Retrieve the last pending site_def, i.e. with the greatest start synod that is greater than executed_msg, that reconfigures the event horizon.
|
static |
|
static |
int local_server | ( | task_arg | arg | ) |
If an error occurs or if the client connection for the local server is forcefully shut down, we continue processing the queue until the end, resorting to time-based waits.
|
static |
|
static |
|
static |
|
static |
Logs the fact that an add/remove node request is aimed at another group.
a | a pointer to the app_data of the configuration command |
message_fmt | a formatted message to log, containing a single %s that will be replaced by the node's address |
|
static |
|
static |
|
static |
|
inlinestatic |
|
inlinestatic |
|
inlinestatic |
|
static |
|
static |
|
static |
uint32_t new_id | ( | ) |
Create a new (hopefully unique) ID.
The basic idea is to create a hash from the host ID and a timestamp.
leader_array new_leader_array | ( | u_int | n, |
char const * | names[] | ||
) |
|
static |
|
inlinestatic |
|
static |
|
static |
|
static |
|
static |
int paxos_fsm_idle | ( | pax_machine * | paxos, |
site_def const * | site, | ||
paxos_event | event, | ||
pax_msg * | mess | ||
) |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
int pm_finished | ( | pax_machine * | p | ) |
1 | if the value for the Paxos instance *p has been learned |
0 | otherwise |
int pre_process_incoming_ping | ( | site_def const * | site, |
pax_msg const * | pm, | ||
int | has_client_already_booted, | ||
double | current_time | ||
) |
Process incoming are_you_alive (i.e. ping) messages and act accordingly.
GCS/XCom has a full mesh of connections between all nodes. A connects to B and B connects back to A.
If we cut out B with, for instance, a firewall, we have the A->B connection silently dead, but we have the B->A connection alive. Since we only do monitoring on one half of the connection (the incoming messages), we will consider that B is alive, although we can't contact it. In the same way, B will consider that A is dead, since it does not receive any message from it.
We must be able to break the outgoing connection if we detect that something is wrong, in order to make the bi-directional connection state consistent and report that node as unreachable. That can be done if we start receiving pings from a node that we consider to be alive. After some pings, we just kill the outgoing connection, thus creating a consistent state.
Breaking this connection should only occur if the node has already booted, meaning that the whole joining process is complete and the node is up and running. This is due to the fact that we receive pings as part of the process of joining a group.
site | current site definitions |
pm | a possible ping message: |
has_client_already_booted | check if this node has already booted |
current_time | current XCom time |
|
static |
|
static |
|
static |
void prepare_push_2p | ( | site_def const * | site, |
pax_machine * | p | ||
) |
Initializes the proposer's message to go through a 2-phase Paxos on the proposer's reserved ballot (0,_).
Executed by Proposers.
site | XCom configuration |
p | Paxos instance |
void prepare_push_3p | ( | site_def const * | site, |
pax_machine * | p, | ||
pax_msg * | msg, | ||
synode_no | msgno, | ||
pax_msg_type | msg_type | ||
) |
Initializes the message msg
to go through a 3-phase, regular Paxos.
Executed by Proposers.
site | XCom configuration |
p | Paxos instance |
msg | Message to send |
msgno | Synode where msg will be proposed |
msg_type | The type of the message, e.g. normal or no_op |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
void recompute_node_set | ( | node_set const * | old_set, |
node_list const * | old_nodes, | ||
node_set * | new_set, | ||
node_list const * | new_nodes | ||
) |
void recompute_timestamps | ( | detector_state const | old_timestamp, |
node_list const * | old_nodes, | ||
detector_state | new_timestamp, | ||
node_list const * | new_nodes | ||
) |
|
static |
|
static |
|
static |
int reply_handler_task | ( | task_arg | arg | ) |
|
static |
|
static |
|
static |
Copies app data source
into target
and checks if the copy succeeded.
Sets *target to NULL if the copy fails.
[in,out] | target | The pax_msg to which the app_data will be copied. |
source | The app data that will be copied. |
TRUE | if the copy was successful. |
FALSE | if the copy failed, in which case *target is set to NULL; a failed copy means that there was an error allocating memory for the copy. |
|
static |
|
static |
|
static |
|
static |
|
static |
void send_x_fsm_complete | ( | ) |
|
static |
|
static |
void set_app_snap_getter | ( | app_snap_getter | x | ) |
void set_app_snap_handler | ( | app_snap_handler | x | ) |
synode_no set_current_message | ( | synode_no | msgno | ) |
synode_no set_executed_msg | ( | synode_no | msgno | ) |
void set_group | ( | uint32_t | id | ) |
Set node group.
void set_last_received_config | ( | synode_no | received_config_change | ) |
|
static |
void set_log_end | ( | gcs_snapshot * | gcs | ) |
void set_max_synode | ( | synode_no | synode | ) |
|
static |
|
static |
|
static |
void set_xcom_comms_cb | ( | xcom_state_change_cb | x | ) |
void set_xcom_exit_cb | ( | xcom_state_change_cb | x | ) |
void set_xcom_expel_cb | ( | xcom_state_change_cb | x | ) |
void set_xcom_input_try_pop_cb | ( | xcom_input_try_pop_cb | pop | ) |
void set_xcom_run_cb | ( | xcom_state_change_cb | x | ) |
|
static |
|
staticconstexpr |
|
static |
void site_install_action | ( | site_def * | site, |
cargo_type | operation | ||
) |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
inlinestatic |
|
static |
|
static |
void synthesize_leaders | ( | leader_array * | leaders | ) |
|
static |
void terminate_and_exit | ( | ) |
|
static |
|
inlinestatic |
|
static |
Checks if the given synod s is outside the event horizon.
Common case: there are no configurations pending, or if there are, none of them reconfigure the event horizon. The common case threshold is:
last_executed_synod + event_horizon(active_config)
If an event horizon reconfiguration R is pending, it is possible that it reduces the event horizon. In that case, it is possible that the threshold above falls outside the new event horizon.
For example, consider last_executed_synod = 42 and event_horizon(active_config) = 10. At this point this member participates in synods up to 52. Now consider an event horizon reconfiguration that takes effect at synod 45, which modifies the event horizon to 2. This means that when last_executed_synod = 45, event_horizon(active_config) = 2. At this point this member should only be able to participate in synods up to 47. The member may have previously started processing messages directed to synods between 47 and 52, but will now ignore messages directed to those same synods.
We do not want to start processing messages that will eventually fall out of the event horizon. More importantly, the threshold above may not be safe due to the exit logic of executor_task.
When a node removes itself from the group on configuration C starting at synod start(C), the exit logic relies on knowing when a majority has executed synod start(C) - 1, i.e. the last message of the last configuration to contain the leaving node.
With a constant event horizon, we know that when synod start(C) + event_horizon is learnt, it is because a majority already executed or is ready to execute (and thus learned) synod start(C). This implies that a majority already executed start(C) - 1.
With a dynamic event horizon, we cannot be sure that when synod start(C) + event_horizon(C) is learnt, a majority already executed or is ready to execute synod start(C). This is because it is possible for a new, smaller, event horizon to take effect between start(C) and start(C) + event_horizon(C). If that happens, the threshold above allows nodes to participate in synods which are possibly beyond start(C) + event_horizon(C), which can lead to the value of synod start(C) + event_horizon(C) being learnt without a majority already having executed or being ready to execute synod start(C).
In order to maintain the assumption made by the executor_task's exit logic, when an event horizon reconfiguration R is pending we set the threshold to the minimum between:
last_executed_synod + event_horizon(active_config)
and:
start(R) - 1 + event_horizon(R)
|
static |
|
static |
Check if a node is compatible with the group's event horizon.
A node is compatible with the group's configuration if:
a) The node supports event horizon reconfigurations, or b) The group's event horizon is, or is scheduled to be, the default event horizon.
bool unsafe_leaders | ( | app_data * | a | ) |
|
static |
|
static |
|
static |
|
static |
|
static |
void warn_protoversion_mismatch | ( | connection_descriptor * | rfd | ) |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
int xcom_client_add_node | ( | connection_descriptor * | fd, |
node_list * | nl, | ||
uint32_t | group_id | ||
) |
int xcom_client_convert_into_local_server | ( | connection_descriptor *const | fd | ) |
int xcom_client_disable_arbitrator | ( | connection_descriptor * | fd | ) |
int xcom_client_enable_arbitrator | ( | connection_descriptor * | fd | ) |
int xcom_client_get_leaders | ( | connection_descriptor * | fd, |
uint32_t | group_id, | ||
leader_info_data * | leaders | ||
) |
int xcom_client_get_synode_app_data | ( | connection_descriptor *const | fd, |
uint32_t | group_id, | ||
synode_no_array *const | synodes, | ||
synode_app_data_array *const | reply | ||
) |
int xcom_client_remove_node | ( | connection_descriptor * | fd, |
node_list * | nl, | ||
uint32_t | group_id | ||
) |
int64_t xcom_client_send_die | ( | connection_descriptor * | fd | ) |
int xcom_client_set_cache_limit | ( | connection_descriptor * | fd, |
uint64_t | cache_limit | ||
) |
int xcom_client_set_leaders | ( | connection_descriptor * | fd, |
u_int | n, | ||
char const * | names[], | ||
node_no | max_leaders, | ||
uint32_t | group_id | ||
) |
int xcom_client_set_leaders | ( | connection_descriptor * | fd, |
u_int | n, | ||
char const * | names[], | ||
uint32_t | group_id | ||
) |
int xcom_client_set_max_leaders | ( | connection_descriptor * | fd, |
node_no | max_leaders, | ||
uint32_t | group_id | ||
) |
char const * xcom_fsm | ( | xcom_actions | action, |
task_arg | fsmargs | ||
) |
xcom_fsm_state * xcom_fsm_impl | ( | xcom_actions | action, |
task_arg | fsmargs | ||
) |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
Retrieves the latest event horizon.
There is no specific reason for this method to return the latest event horizon instead of the current one. Either would be an acceptable result for this function, but we had to choose one of them.
Parameters: [out] event_horizon: the latest event horizon.
Return value REQUEST_FAIL: XCom is not initialized yet.
Return value REQUEST_OK: the function was successful and event_horizon contains the latest event horizon.
xcom_event_horizon xcom_get_maximum_event_horizon | ( | ) |
xcom_event_horizon xcom_get_minimum_event_horizon | ( | ) |
void xcom_input_free_signal_connection | ( | void | ) |
bool_t xcom_input_signal | ( | void | ) |
|
static |
|
static |
connection_descriptor * xcom_open_client_connection | ( | char const * | server, |
xcom_port | port | ||
) |
|
static |
void xcom_send | ( | app_data_ptr | a, |
pax_msg * | msg | ||
) |
|
static |
|
static |
Send a message and wait for response.
The caller is responsible for freeing p after calling this function, i.e. xdr_free((xdrproc_t)xdr_pax_msg, (char *)p)
int xcom_send_cfg_wait | ( | connection_descriptor * | fd, |
node_list * | nl, | ||
uint32_t | group_id, | ||
cargo_type | ct, | ||
int | force | ||
) |
int64_t xcom_send_client_app_data | ( | connection_descriptor * | fd, |
app_data_ptr | a, | ||
int | force | ||
) |
|
static |
void xcom_sleep | ( | unsigned int | seconds | ) |
int xcom_taskmain2 | ( | xcom_port | listen_port | ) |
void xcom_thread_deinit | ( | ) |
void xcom_thread_init | ( | ) |
|
static |
long xcom_unique_long | ( | void | ) |
|
static |
int ARBITRATOR_HACK = 0 |
|
static |
|
extern |
|
static |
|
static |
struct { ... } dead_sites |
|
static |
struct { ... } delay_fifo |
|
static |
linkage detector_wait |
|
static |
|
static |
synode_no executed_msg |
|
static |
|
staticconstexpr |
int front |
|
static |
|
static |
unsigned long id[MAX_DEAD] |
|
static |
These fields are used to signal XCom's request queue.
After a request is added, one will write 1 byte to warn local_server_task that it has work to do.
We use two types of signalling connection: an anonymous pipe, when one can be created, and a regular socket connection otherwise.
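input_signal_connection is the connection_descriptor returned when one opens a local signalling connection. It will contain either the write side of the connection, when a pipe is used, or a bidirectional connection, when a regular socket connection is used.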
input_signal_connection_pipe is the connection_descriptor that holds the read side of a pipe connection. It is only allocated when we are able to have a pipe connection.
connection_descriptor* input_signal_connection_pipe {nullptr} |
|
static |
|
static |
|
static |
|
static |
|
static |
synode_no max_synode |
|
static |
int n |
|
extern |
|
static |
|
static |
int oom_abort = 0 |
paxos_state_action p1_idle_vtbl[last_p_event] |
paxos_state_action p1_master_enter_vtbl[last_p_event] |
paxos_state_action p1_master_wait_vtbl[last_p_event] |
paxos_state_action p2_master_enter_vtbl[last_p_event] |
paxos_state_action p2_master_wait_vtbl[last_p_event] |
paxos_state_action p2_slave_wait_vtbl[last_p_event] |
paxos_state_action p3_master_wait_vtbl[last_p_event] |
paxos_state_action p3_slave_wait_vtbl[last_p_event] |
paxos_state_action p_finished_vtbl[last_p_event] |
|
constexpr |
const char* paxos_event_name[] = {p_events} |
int pipe_signal_connections[2] = {-1, -1} |
|
static |
|
static |
|
static |
|
static |
|
static |
synode_no q[FIFO_SIZE] |
int rear |
|
static |
|
static |
|
static |
|
static |
|
static |
|
staticconstexpr |
|
static |
|
static |
synode_pool synode_number_pool |
int const threephase = 0 |
Timestamp of previous protoversion warning.
|
static |
|
static |
|
static |
const char* xcom_actions_name[] = {x_actions} |
|
static |
long xcom_dbg_stack[DBG_STACK_SIZE] |
int xcom_dbg_stack_top = 0 |
long xcom_debug_mask |
|
static |
|
static |
|
static |
int xcom_shutdown = 0 |
|
static |
void* xcom_thread_input = nullptr |
|
static |