MySQL 8.4.0
Source Code Documentation
task.cc File Reference

Rudimentary, non-preemptive task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs. More...

#include "xcom/x_platform.h"
#include "xcom/xcom_memory.h"
#include "xcom/xcom_profile.h"
#include <openssl/err.h>
#include <openssl/ssl.h>
#include <limits.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <memory>
#include "xcom/node_connection.h"
#include "xdr_gen/xcom_vp.h"
#include "xcom/simset.h"
#include "xcom/site_def.h"
#include "xcom/task.h"
#include "xcom/task_debug.h"
#include "xcom/task_net.h"
#include "xcom/task_os.h"
#include "xcom/xcom_base.h"
#include "xcom/xcom_cfg.h"
#include "xcom/xcom_transport.h"
#include <poll.h>
#include "xcom/retry.h"
#include "xcom/xdr_utils.h"

Classes

struct  pollfd_array
 
struct  task_env_p_array
 
struct  xcom_clock
 

Macros

#define xcom_buf   void
 
#define crash(x)   g_critical("%s\n", x)
 
#define FIX_POS(i)   q->x[i]->heap_pos = (i)
 
#define TASK_SWAP(i, j)
 
#define TASK_MOVE(i, j)
 
#define STAT_INTERVAL   1.0
 

Typedefs

typedef struct iotasks iotasks
 
typedef task_envtask_env_p
 
typedef struct xcom_clock xcom_clock
 

Functions

const char * pax_op_to_str (int x)
 
uint32_t get_my_xcom_id ()
 
 init_xdr_array (pollfd) free_xdr_array(pollfd) set_xdr_array(pollfd) get_xdr_array(pollfd) init_xdr_array(task_env_p) free_xdr_array(task_env_p) set_xdr_array(task_env_p) get_xdr_array(task_env_p) struct iotasks
 
static task_envextract_first_delayed ()
 
static task_envtask_ref (task_env *t)
 
static task_envtask_unref (task_env *t)
 
static void wake_all_io ()
 
static void task_sys_deinit ()
 
static double ts_to_sec (struct timespec *ts)
 
static double get_monotonic_time ()
 
static double get_real_time ()
 
static double xcom_monotonic_seconds (xcom_clock *clock)
 
static void xcom_init_clock (xcom_clock *clock)
 
double seconds ()
 
double task_now ()
 
unsigned long long int get_time_since_the_epoch ()
 Return time in microseconds. More...
 
static void task_queue_siftup (task_queue *q, int n)
 
static void task_queue_siftdown (task_queue *q, int l, int n)
 
static task_envtask_queue_remove (task_queue *q, int i)
 
static void task_queue_insert (task_queue *q, task_env *t)
 
static int task_queue_empty (task_queue *q)
 
static task_envtask_queue_min (task_queue *q)
 
static task_envtask_queue_extractmin (task_queue *q)
 
static void task_init (task_env *t)
 Initialize task memory. More...
 
void * task_allocate (task_env *p, unsigned int bytes)
 Allocate bytes from pool, initialized to zero. More...
 
static task_envactivate (task_env *t)
 
static task_envdeactivate (task_env *t)
 
void task_delay_until (double time)
 
void task_wait (task_env *t, linkage *queue)
 
void task_wakeup (linkage *queue)
 
static void task_wakeup_first (linkage *queue)
 
channelchannel_init (channel *c, unsigned int type)
 
void channel_put (channel *c, linkage *data)
 
void channel_put_front (channel *c, linkage *data)
 
task_envtask_new (task_func func, task_arg arg, const char *name, int debug)
 
void reset_state (task_env *p)
 
void pushp (task_env *p, void *ptr)
 
void popp (task_env *p)
 
static int runnable_tasks ()
 
static int delayed_tasks ()
 
static void task_delete (task_env *t)
 
task_envtask_activate (task_env *t)
 
task_envtask_deactivate (task_env *t)
 
task_envtask_terminate (task_env *t)
 
void task_terminate_all ()
 
static task_envfirst_delayed ()
 
static void iotasks_init (iotasks *iot_to_init)
 
static void iotasks_deinit (iotasks *iot_to_deinit)
 
static void poll_wakeup (u_int i)
 
static int poll_wait (int ms)
 
static void add_fd (task_env *t, int fd, int op)
 
static void unpoll (u_int i)
 
void remove_and_wakeup (int fd)
 
task_envwait_io (task_env *t, int fd, int op)
 
result con_read (connection_descriptor const *rfd, void *buf, int n)
 
result con_pipe_read (connection_descriptor const *rfd, void *buf, int n)
 
int task_read (connection_descriptor const *con, void *buf, int n, int64_t *ret, connnection_read_method read_function)
 
result con_write (connection_descriptor const *wfd, void *buf, int n)
 
result con_pipe_write (connection_descriptor const *wfd, void *buf, int n)
 
int task_write (connection_descriptor const *con, void *_buf, uint32_t n, int64_t *ret)
 
int unblock_fd (int fd)
 
int block_fd (int fd)
 
int is_only_task ()
 
static task_envfirst_runnable ()
 
static task_envnext_task (task_env *t)
 
static int is_task_head (task_env *t)
 
static int msdiff (double time)
 
void set_should_exit_getter (should_exit_getter x)
 
void task_loop ()
 
static void init_task_vars ()
 
void task_sys_init ()
 
void set_task (task_env **p, task_env *t)
 
const char * task_name ()
 

Variables

task_arg null_arg = {a_end, {0}}
 
int task_errno = 0
 
static xcom_clock task_timer
 
static linkage ash_nazg_gimbatul
 
static linkage tasks = {0, &tasks, &tasks}
 
static task_queue task_time_q
 
static linkage free_tasks = {0, &free_tasks, &free_tasks}
 
static int active_tasks = 0
 
static iotasks iot
 
task_envstack = nullptr
 
static uint64_t send_count
 
static uint64_t receive_count
 
static uint64_t send_bytes
 
static uint64_t receive_bytes
 
static should_exit_getter get_should_exit
 
static double idle_time = 0.0
 

Detailed Description

Rudimentary, non-preemptive task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs.

(continuations?) Nonblocking IO and event handling need to be rewritten for each new OS. The code is not MT-safe, but could be made safe by moving all global variables into a context struct which could be the first parameter to all the functions.

Macro Definition Documentation

◆ crash

#define crash (   x)    g_critical("%s\n", x)

◆ FIX_POS

#define FIX_POS (   i)    q->x[i]->heap_pos = (i)

◆ STAT_INTERVAL

#define STAT_INTERVAL   1.0

◆ TASK_MOVE

#define TASK_MOVE (   i,
 
)
Value:
{ \
q->x[i] = q->x[j]; \
FIX_POS(i); \
}
synode_no q[FIFO_SIZE]
Definition: xcom_base.cc:4086

◆ TASK_SWAP

#define TASK_SWAP (   i,
 
)
Value:
{ \
task_env *tmp = q->x[i]; \
q->x[i] = q->x[j]; \
q->x[j] = tmp; \
FIX_POS(i); \
FIX_POS(j); \
}

◆ xcom_buf

#define xcom_buf   void

Typedef Documentation

◆ iotasks

typedef struct iotasks iotasks

◆ task_env_p

typedef task_env* task_env_p

◆ xcom_clock

typedef struct xcom_clock xcom_clock

Function Documentation

◆ activate()

static task_env * activate ( task_env t)
static

◆ add_fd()

static void add_fd ( task_env t,
int  fd,
int  op 
)
static

◆ block_fd()

int block_fd ( int  fd)

◆ channel_init()

channel * channel_init ( channel c,
unsigned int  type 
)

◆ channel_put()

void channel_put ( channel c,
linkage data 
)

◆ channel_put_front()

void channel_put_front ( channel c,
linkage data 
)

◆ con_pipe_read()

result con_pipe_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_pipe_write()

result con_pipe_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ con_read()

result con_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_write()

result con_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ deactivate()

static task_env * deactivate ( task_env t)
static

◆ delayed_tasks()

static int delayed_tasks ( )
static

◆ extract_first_delayed()

static task_env * extract_first_delayed ( )
static

◆ first_delayed()

static task_env * first_delayed ( )
static

◆ first_runnable()

static task_env * first_runnable ( )
static

◆ get_monotonic_time()

static double get_monotonic_time ( )
inlinestatic

◆ get_my_xcom_id()

uint32_t get_my_xcom_id ( )

◆ get_real_time()

static double get_real_time ( )
inlinestatic

◆ get_time_since_the_epoch()

unsigned long long int get_time_since_the_epoch ( )

Return time in microseconds.

Uses std::chrono::high_resolution_clock

Return values
Numberof microseconds since the Epoch, 1970-01-01 00:00:00 +0000 (UTC)

◆ init_task_vars()

static void init_task_vars ( )
static

◆ init_xdr_array()

init_xdr_array ( pollfd  )

◆ iotasks_deinit()

static void iotasks_deinit ( iotasks iot_to_deinit)
static

◆ iotasks_init()

static void iotasks_init ( iotasks iot_to_init)
static

◆ is_only_task()

int is_only_task ( )

◆ is_task_head()

static int is_task_head ( task_env t)
static

◆ msdiff()

static int msdiff ( double  time)
static

◆ next_task()

static task_env * next_task ( task_env t)
static

◆ pax_op_to_str()

const char * pax_op_to_str ( int  x)

◆ poll_wait()

static int poll_wait ( int  ms)
static

◆ poll_wakeup()

static void poll_wakeup ( u_int  i)
static

◆ popp()

void popp ( task_env p)

◆ pushp()

void pushp ( task_env p,
void *  ptr 
)

◆ remove_and_wakeup()

void remove_and_wakeup ( int  fd)

◆ reset_state()

void reset_state ( task_env p)

◆ runnable_tasks()

static int runnable_tasks ( )
static

◆ seconds()

double seconds ( )

◆ set_should_exit_getter()

void set_should_exit_getter ( should_exit_getter  x)

◆ set_task()

void set_task ( task_env **  p,
task_env t 
)

◆ task_activate()

task_env * task_activate ( task_env t)

◆ task_allocate()

void * task_allocate ( task_env p,
unsigned int  bytes 
)

Allocate bytes from pool, initialized to zero.

◆ task_deactivate()

task_env * task_deactivate ( task_env t)

◆ task_delay_until()

void task_delay_until ( double  time)

◆ task_delete()

static void task_delete ( task_env t)
static

◆ task_init()

static void task_init ( task_env p)
static

Initialize task memory.

◆ task_loop()

void task_loop ( )

◆ task_name()

const char * task_name ( )

◆ task_new()

task_env * task_new ( task_func  func,
task_arg  arg,
const char *  name,
int  debug 
)

◆ task_now()

double task_now ( )

◆ task_queue_empty()

static int task_queue_empty ( task_queue q)
static

◆ task_queue_extractmin()

static task_env * task_queue_extractmin ( task_queue q)
static

◆ task_queue_insert()

static void task_queue_insert ( task_queue q,
task_env t 
)
static

◆ task_queue_min()

static task_env * task_queue_min ( task_queue q)
static

◆ task_queue_remove()

static task_env * task_queue_remove ( task_queue q,
int  i 
)
static

◆ task_queue_siftdown()

static void task_queue_siftdown ( task_queue q,
int  l,
int  n 
)
static

◆ task_queue_siftup()

static void task_queue_siftup ( task_queue q,
int  n 
)
static

◆ task_read()

int task_read ( connection_descriptor const *  con,
void *  buf,
int  n,
int64_t *  ret,
connnection_read_method  read_function 
)

◆ task_ref()

static task_env * task_ref ( task_env t)
static

◆ task_sys_deinit()

static void task_sys_deinit ( )
static

◆ task_sys_init()

void task_sys_init ( )

◆ task_terminate()

task_env * task_terminate ( task_env t)

◆ task_terminate_all()

void task_terminate_all ( )

◆ task_unref()

static task_env * task_unref ( task_env t)
static

◆ task_wait()

void task_wait ( task_env t,
linkage queue 
)

◆ task_wakeup()

void task_wakeup ( linkage queue)

◆ task_wakeup_first()

static void task_wakeup_first ( linkage queue)
static

◆ task_write()

int task_write ( connection_descriptor const *  con,
void *  _buf,
uint32_t  n,
int64_t *  ret 
)

◆ ts_to_sec()

static double ts_to_sec ( struct timespec *  ts)
inlinestatic

◆ unblock_fd()

int unblock_fd ( int  fd)

◆ unpoll()

static void unpoll ( u_int  i)
static

◆ wait_io()

task_env * wait_io ( task_env t,
int  fd,
int  op 
)

◆ wake_all_io()

static void wake_all_io ( )
static

◆ xcom_init_clock()

static void xcom_init_clock ( xcom_clock clock)
static

◆ xcom_monotonic_seconds()

static double xcom_monotonic_seconds ( xcom_clock clock)
static

Variable Documentation

◆ active_tasks

int active_tasks = 0
static

◆ ash_nazg_gimbatul

linkage ash_nazg_gimbatul
static
Initial value:
= {
static linkage ash_nazg_gimbatul
Definition: task.cc:516

◆ free_tasks

linkage free_tasks = {0, &free_tasks, &free_tasks}
static

◆ get_should_exit

should_exit_getter get_should_exit
static

◆ idle_time

double idle_time = 0.0
static

◆ iot

iotasks iot
static

◆ null_arg

task_arg null_arg = {a_end, {0}}

◆ receive_bytes

uint64_t receive_bytes
static

◆ receive_count

uint64_t receive_count
static

◆ send_bytes

uint64_t send_bytes
static

◆ send_count

uint64_t send_count
static

◆ stack

task_env* stack = nullptr

◆ task_errno

int task_errno = 0

◆ task_time_q

task_queue task_time_q
static

◆ task_timer

xcom_clock task_timer
static

◆ tasks

linkage tasks = {0, &tasks, &tasks}
static