MySQL  8.0.19
Source Code Documentation
task.c File Reference

Classes

struct  pollfd_array
 
struct  task_env_p_array
 
struct  xcom_clock
 

Macros

#define crash(x)   g_critical("%s\n", x)
 
#define FIX_POS(i)   q->x[i]->heap_pos = (i)
 
#define TASK_SWAP(i, j)
 
#define TASK_MOVE(i, j)
 
#define STAT_INTERVAL   1.0
 

Typedefs

typedef struct iotasks iotasks
 
typedef task_env * task_env_p
 
typedef struct xcom_clock xcom_clock
 

Functions

char * pax_op_to_str (int x)
 
 init_xdr_array (pollfd)
 
static task_env * extract_first_delayed ()
 
static task_env * task_ref (task_env *t)
 
static task_env * task_unref (task_env *t)
 
static void wake_all_io ()
 
static void task_sys_deinit ()
 
static double ts_to_sec (struct timespec *ts)
 
static double get_monotonic_time ()
 
static double get_real_time ()
 
static double xcom_monotonic_seconds (xcom_clock *clock)
 
static void xcom_init_clock (xcom_clock *clock)
 
double seconds ()
 
double task_now ()
 
static void task_queue_siftup (task_queue *q, int n)
 
static void task_queue_siftdown (task_queue *q, int l, int n)
 
static task_env * task_queue_remove (task_queue *q, int i)
 
static void task_queue_insert (task_queue *q, task_env *t)
 
static int task_queue_empty (task_queue *q)
 
static task_env * task_queue_min (task_queue *q)
 
static task_env * task_queue_extractmin (task_queue *q)
 
static void task_init (task_env *t)
 Initialize task memory. More...
 
void * task_allocate (task_env *p, unsigned int bytes)
 Allocate bytes from pool, initialized to zero. More...
 
static task_env * activate (task_env *t)
 
static task_env * deactivate (task_env *t)
 
void task_delay_until (double time)
 
void task_wait (task_env *t, linkage *queue)
 
void task_wakeup (linkage *queue)
 
static void task_wakeup_first (linkage *queue)
 
channel * channel_init (channel *c, unsigned int type)
 
channel * channel_new ()
 
void channel_put (channel *c, linkage *data)
 
void channel_put_front (channel *c, linkage *data)
 
task_env * task_new (task_func func, task_arg arg, const char *name, int debug)
 
void reset_state (task_env *p)
 
void pushp (task_env *p, void *ptr)
 
void popp (task_env *p)
 
static int runnable_tasks ()
 
static int delayed_tasks ()
 
static void task_delete (task_env *t)
 
task_env * task_activate (task_env *t)
 
task_env * task_deactivate (task_env *t)
 
task_env * task_terminate (task_env *t)
 
void task_terminate_all ()
 
static task_env * first_delayed ()
 
static void iotasks_init (iotasks *iot_to_init)
 
static void iotasks_deinit (iotasks *iot_to_deinit)
 
static void poll_wakeup (int i)
 
static int poll_wait (int ms)
 
static void add_fd (task_env *t, int fd, int op)
 
void unpoll (int i)
 
void remove_and_wakeup (int fd)
 
task_env * wait_io (task_env *t, int fd, int op)
 
static task_env * timed_wait_io (task_env *t, int fd, int op, double timeout)
 
result con_read (connection_descriptor const *rfd, void *buf, int n)
 
int task_read (connection_descriptor const *con, void *buf, int n, int64_t *ret)
 
result con_write (connection_descriptor const *wfd, void *buf, int n)
 
int task_write (connection_descriptor const *con, void *_buf, uint32_t n, int64_t *ret)
 
int unblock_fd (int fd)
 
int block_fd (int fd)
 
int is_only_task ()
 
static task_env * first_runnable ()
 
static task_env * next_task (task_env *t)
 
static int is_task_head (task_env *t)
 
static int msdiff (double time)
 
void set_should_exit_getter (should_exit_getter x)
 
void task_loop ()
 
int connect_tcp (char *server, xcom_port port, int *ret)
 
result set_nodelay (int fd)
 
static result create_server_socket ()
 
static result create_server_socket_v4 ()
 
static void init_server_addr (struct sockaddr **sock_addr, socklen_t *sock_len, xcom_port port, int family)
 Initializes a sockaddr prepared to be used in bind() More...
 
result announce_tcp (xcom_port port)
 
int accept_tcp (int fd, int *ret)
 
static void init_task_vars ()
 
void task_sys_init ()
 
int is_running (task_env *t)
 
void set_task (task_env **p, task_env *t)
 
const char * task_name ()
 

Variables

task_arg null_arg = {a_end, {0}}
 
int task_errno = 0
 
static xcom_clock task_timer
 
static linkage ash_nazg_gimbatul
 
static linkage tasks = {0, &tasks, &tasks}
 
static task_queue task_time_q
 
static linkage free_tasks = {0, &free_tasks, &free_tasks}
 
static int active_tasks = 0
 
static iotasks iot
 
task_env * stack = NULL
 
static uint64_t send_count
 
static uint64_t receive_count
 
static uint64_t send_bytes
 
static uint64_t receive_bytes
 
static should_exit_getter get_should_exit
 
static double idle_time = 0.0
 

Detailed Description

Rudimentary, non-preemptive task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs. (continuations?) Nonblocking IO and event handling need to be rewritten for each new OS. The code is not MT-safe, but could be made safe by moving all global variables into a context struct which could be the first parameter to all the functions.

Macro Definition Documentation

◆ crash

#define crash (   x)    g_critical("%s\n", x)

◆ FIX_POS

#define FIX_POS (   i)    q->x[i]->heap_pos = (i)

◆ STAT_INTERVAL

#define STAT_INTERVAL   1.0

◆ TASK_MOVE

#define TASK_MOVE (   i,
   j 
)
Value:
{ \
q->x[i] = q->x[j]; \
FIX_POS(i); \
}

◆ TASK_SWAP

#define TASK_SWAP (   i,
   j 
)
Value:
{ \
task_env *tmp = q->x[i]; \
q->x[i] = q->x[j]; \
q->x[j] = tmp; \
FIX_POS(i); \
FIX_POS(j); \
}

Typedef Documentation

◆ iotasks

typedef struct iotasks iotasks

◆ task_env_p

typedef task_env* task_env_p

◆ xcom_clock

typedef struct xcom_clock xcom_clock

Function Documentation

◆ accept_tcp()

int accept_tcp ( int  fd,
int *  ret 
)

◆ activate()

static task_env* activate ( task_env *  t)
static

◆ add_fd()

static void add_fd ( task_env *  t,
int  fd,
int  op 
)
static

◆ announce_tcp()

result announce_tcp ( xcom_port  port)

◆ block_fd()

int block_fd ( int  fd)

◆ channel_init()

channel* channel_init ( channel *  c,
unsigned int  type 
)

◆ channel_new()

channel* channel_new ( )

◆ channel_put()

void channel_put ( channel *  c,
linkage *  data 
)

◆ channel_put_front()

void channel_put_front ( channel *  c,
linkage *  data 
)

◆ con_read()

result con_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_write()

result con_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ connect_tcp()

int connect_tcp ( char *  server,
xcom_port  port,
int *  ret 
)

◆ create_server_socket()

static result create_server_socket ( )
static

◆ create_server_socket_v4()

static result create_server_socket_v4 ( )
static

◆ deactivate()

static task_env* deactivate ( task_env *  t)
static

◆ delayed_tasks()

static int delayed_tasks ( )
static

◆ extract_first_delayed()

static task_env * extract_first_delayed ( )
static

◆ first_delayed()

static task_env* first_delayed ( )
static

◆ first_runnable()

static task_env* first_runnable ( )
static

◆ get_monotonic_time()

static double get_monotonic_time ( )
inline static

◆ get_real_time()

static double get_real_time ( )
inline static

◆ init_server_addr()

static void init_server_addr ( struct sockaddr **  sock_addr,
socklen_t *  sock_len,
xcom_port  port,
int  family 
)
static

Initializes a sockaddr prepared to be used in bind()

Parameters
sock_addr: struct sockaddr out parameter. The caller must free it after use.
sock_len: socklen_t out parameter. It will contain the length of sock_addr.
port: the port to bind.
family: the address family.

◆ init_task_vars()

static void init_task_vars ( )
static

◆ init_xdr_array()

init_xdr_array ( pollfd  )

◆ iotasks_deinit()

static void iotasks_deinit ( iotasks *  iot_to_deinit)
static

◆ iotasks_init()

static void iotasks_init ( iotasks *  iot_to_init)
static

◆ is_only_task()

int is_only_task ( )

◆ is_running()

int is_running ( task_env *  t)

◆ is_task_head()

static int is_task_head ( task_env *  t)
static

◆ msdiff()

static int msdiff ( double  time)
static

◆ next_task()

static task_env* next_task ( task_env *  t)
static

◆ pax_op_to_str()

char* pax_op_to_str ( int  x)

◆ poll_wait()

static int poll_wait ( int  ms)
static

◆ poll_wakeup()

static void poll_wakeup ( int  i)
static

◆ popp()

void popp ( task_env *  p)

◆ pushp()

void pushp ( task_env *  p,
void *  ptr 
)

◆ remove_and_wakeup()

void remove_and_wakeup ( int  fd)

◆ reset_state()

void reset_state ( task_env *  p)

◆ runnable_tasks()

static int runnable_tasks ( )
static

◆ seconds()

double seconds ( )

◆ set_nodelay()

result set_nodelay ( int  fd)

◆ set_should_exit_getter()

void set_should_exit_getter ( should_exit_getter  x)

◆ set_task()

void set_task ( task_env **  p,
task_env *  t 
)

◆ task_activate()

task_env* task_activate ( task_env *  t)

◆ task_allocate()

void * task_allocate ( task_env *  p,
unsigned int  bytes 
)

Allocate bytes from pool, initialized to zero.

◆ task_deactivate()

task_env* task_deactivate ( task_env *  t)

◆ task_delay_until()

void task_delay_until ( double  time)

◆ task_delete()

static void task_delete ( task_env *  t)
static

◆ task_init()

static void task_init ( task_env *  p)
static

Initialize task memory.

◆ task_loop()

void task_loop ( )

◆ task_name()

const char* task_name ( )

◆ task_new()

task_env* task_new ( task_func  func,
task_arg  arg,
const char *  name,
int  debug 
)

◆ task_now()

double task_now ( )

◆ task_queue_empty()

static int task_queue_empty ( task_queue *  q)
static

◆ task_queue_extractmin()

static task_env* task_queue_extractmin ( task_queue *  q)
static

◆ task_queue_insert()

static void task_queue_insert ( task_queue *  q,
task_env *  t 
)
static

◆ task_queue_min()

static task_env* task_queue_min ( task_queue *  q)
static

◆ task_queue_remove()

static task_env* task_queue_remove ( task_queue *  q,
int  i 
)
static

◆ task_queue_siftdown()

static void task_queue_siftdown ( task_queue *  q,
int  l,
int  n 
)
static

◆ task_queue_siftup()

static void task_queue_siftup ( task_queue *  q,
int  n 
)
static

◆ task_read()

int task_read ( connection_descriptor const *  con,
void *  buf,
int  n,
int64_t *  ret 
)

◆ task_ref()

static task_env * task_ref ( task_env *  t)
static

◆ task_sys_deinit()

static void task_sys_deinit ( )
static

◆ task_sys_init()

void task_sys_init ( )

◆ task_terminate()

task_env* task_terminate ( task_env *  t)

◆ task_terminate_all()

void task_terminate_all ( )

◆ task_unref()

static task_env * task_unref ( task_env *  t)
static

◆ task_wait()

void task_wait ( task_env *  t,
linkage *  queue 
)

◆ task_wakeup()

void task_wakeup ( linkage *  queue)

◆ task_wakeup_first()

static void task_wakeup_first ( linkage *  queue)
static

◆ task_write()

int task_write ( connection_descriptor const *  con,
void *  _buf,
uint32_t  n,
int64_t *  ret 
)

◆ timed_wait_io()

static task_env* timed_wait_io ( task_env *  t,
int  fd,
int  op,
double  timeout 
)
static

◆ ts_to_sec()

static double ts_to_sec ( struct timespec *  ts)
inline static

◆ unblock_fd()

int unblock_fd ( int  fd)

◆ unpoll()

void unpoll ( int  i)

◆ wait_io()

task_env* wait_io ( task_env *  t,
int  fd,
int  op 
)

◆ wake_all_io()

static void wake_all_io ( )
static

◆ xcom_init_clock()

static void xcom_init_clock ( xcom_clock *  clock)
static

◆ xcom_monotonic_seconds()

static double xcom_monotonic_seconds ( xcom_clock *  clock)
static

Variable Documentation

◆ active_tasks

int active_tasks = 0
static

◆ ash_nazg_gimbatul

linkage ash_nazg_gimbatul
static
Initial value:
= {0, &ash_nazg_gimbatul, &ash_nazg_gimbatul}

◆ free_tasks

linkage free_tasks = {0, &free_tasks, &free_tasks}
static

◆ get_should_exit

should_exit_getter get_should_exit
static

◆ idle_time

double idle_time = 0.0
static

◆ iot

iotasks iot
static

◆ null_arg

task_arg null_arg = {a_end, {0}}

◆ receive_bytes

uint64_t receive_bytes
static

◆ receive_count

uint64_t receive_count
static

◆ send_bytes

uint64_t send_bytes
static

◆ send_count

uint64_t send_count
static

◆ stack

task_env* stack = NULL

◆ task_errno

int task_errno = 0

◆ task_time_q

task_queue task_time_q
static

◆ task_timer

xcom_clock task_timer
static

◆ tasks

linkage tasks = {0, &tasks, &tasks}
static
ash_nazg_gimbatul
static linkage ash_nazg_gimbatul
Definition: task.c:499
q
synode_no q[FIFO_SIZE]
Definition: xcom_base.c:2940