MySQL  8.0.18
Source Code Documentation
task.h File Reference

 Rudimentary task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs. More...

Go to the source code of this file.

Classes

struct  task_ptr
 
struct  task_env
 
struct  task_queue
 
struct  channel
 

Macros

#define ADD_EVENTS(x)
 
#define ADD_T_EV(when, file, state, what)
 
#define ADD_WAIT_EV(when, file, state, what, milli)
 
#define TASK_POOL_ELEMS   1000
 
#define MAXTASKS   1000
 
#define _ep   ((struct env *)(stack->sp->ptr))
 
#define TASK_ALLOC(pool, type)   (task_allocate(pool, (unsigned int)sizeof(type)))
 
#define TASK_DEBUG(x)
 
#define FINALLY   task_cleanup:
 
#define ON_STACK_TOP   (stack->sp == stack->stack_top + 1)
 
#define TERM_CHECK   if (ON_STACK_TOP && stack->terminate) goto task_cleanup
 
#define TERMINATE   goto task_cleanup
 
#define TASK_STACK_DEBUG
 
#define TASK_BEGIN
 
#define TASK_END
 
#define TASK_RETURN(x)
 
#define TASK_DUMP_ERR
 
#define TASK_FAIL
 
#define TASK_YIELD
 
#define TASK_DEACTIVATE
 
#define TASK_DELAY(t)
 
#define TASK_DELAY_UNTIL(t)
 
#define TASK_WAIT(queue)
 
#define TIMED_TASK_WAIT(queue, t)
 
#define CHANNEL_GET(channel, ptr, type)
 
#define CHANNEL_PEEK(channel, ptr, type)
 
#define CHANNEL_GET_REVERSE(channel, ptr, type)
 
#define TASK_CALL(funcall)
 
#define DECL_ENV   struct env {
 
#define END_ENV
 
#define LOCK_FD(fd, op)
 
#define UNLOCK_FD(fd, op)   unlock_fd(fd, stack, op)
 
#define IF_CHANGED(DESIRED, CURRENT, TRUE_ACTION, FALSE_ACTION)
 
#define IF_GUARD(DESIRED, GUARD, TRUE_ACTION, FALSE_ACTION)
 
#define mcm_close(x)   close(x)
 
#define xstr(s)   #s
 

Typedefs

typedef struct task_ptr TaskAlign
 
typedef int(* task_func) (task_arg arg)
 
typedef struct task_env task_env
 
typedef struct task_queue task_queue
 
typedef struct channel channel
 

Functions

static void set_int_arg (task_arg *arg, int value)
 
static int get_int_arg (task_arg arg)
 
static void set_long_arg (task_arg *arg, long value)
 
static long get_long_arg (task_arg arg)
 
static void set_uint_arg (task_arg *arg, unsigned int value)
 
static unsigned int get_uint_arg (task_arg arg)
 
static void set_ulong_arg (task_arg *arg, unsigned long value)
 
static void set_ulong_long_arg (task_arg *arg, unsigned long long value)
 
static unsigned long get_ulong_arg (task_arg arg)
 
static unsigned long long get_ulong_long_arg (task_arg arg)
 
static void set_float_arg (task_arg *arg, float value)
 
static float get_float_arg (task_arg arg)
 
static void set_double_arg (task_arg *arg, double value)
 
static double get_double_arg (task_arg arg)
 
static void set_string_arg (task_arg *arg, char const *value)
 
static void set_void_arg (task_arg *arg, void *value)
 
static char const * get_string_arg (task_arg arg)
 
static void * get_void_arg (task_arg arg)
 
static task_arg int_arg (int i)
 
static task_arg uint_arg (unsigned int i)
 
static task_arg ulong_arg (unsigned long l)
 
static task_arg ulong_long_arg (unsigned long long ll)
 
static task_arg double_arg (double i)
 
static task_arg string_arg (char const *v)
 
static task_arg void_arg (void *v)
 
static task_arg end_arg ()
 
channel * channel_init (channel *c, unsigned int type)
 
channel * channel_new ()
 
void channel_put (channel *c, linkage *data)
 
void channel_put_front (channel *c, linkage *data)
 
void * task_allocate (task_env *p, unsigned int bytes)
 Allocate bytes from pool, initialized to zero. More...
 
void reset_state (task_env *p)
 
void pushp (task_env *p, void *ptr)
 
void popp (task_env *p)
 
double seconds ()
 
double task_now ()
 
void task_delay_until (double time)
 
int unblock_fd (int fd)
 
int block_fd (int fd)
 
int connect_tcp (char *server, xcom_port port, int *ret)
 
result announce_tcp (xcom_port port)
 
int accept_tcp (int fd, int *ret)
 
int task_read (connection_descriptor const *con, void *buf, int n, int64_t *ret)
 
int task_write (connection_descriptor const *con, void *buf, uint32_t n, int64_t *ret)
 
int is_locked (int fd)
 
int lock_fd (int fd, task_env *t, int lock)
 
int unlock_fd (int fd, task_env *t, int lock)
 
void task_sys_init ()
 
task_env * task_new (task_func func, task_arg arg, const char *name, int debug)
 
void task_loop ()
 
void task_wait (task_env *t, linkage *queue)
 
void task_wakeup (linkage *queue)
 
task_env * task_terminate (task_env *t)
 
int is_running (task_env *t)
 
void set_task (task_env **p, task_env *t)
 
void task_terminate_all ()
 
void remove_and_wakeup (int i)
 
int is_only_task ()
 
task_env * task_activate (task_env *t)
 
task_env * task_deactivate (task_env *t)
 
const char * task_name ()
 
task_env * wait_io (task_env *t, int fd, int op)
 
result con_read (connection_descriptor const *rfd, void *buf, int n)
 
result con_write (connection_descriptor const *wfd, void *buf, int n)
 
result set_nodelay (int fd)
 
void xcom_enable_ssl ()
 
void xcom_disable_ssl ()
 

Variables

task_env * stack
 
int task_errno
 

Detailed Description

 Rudimentary task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs (continuations?).

Nonblocking IO and event handling need to be rewritten for each new OS.

Macro Definition Documentation

◆ _ep

#define _ep   ((struct env *)(stack->sp->ptr))

◆ ADD_EVENTS

#define ADD_EVENTS (   x)

◆ ADD_T_EV

#define ADD_T_EV (   when,
  file,
  state,
  what 
)

◆ ADD_WAIT_EV

#define ADD_WAIT_EV (   when,
  file,
  state,
  what,
  milli 
)

◆ CHANNEL_GET

#define CHANNEL_GET (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_extract_first(&(channel)->data); \
DBGOUT(FN; STRLIT("CHANNEL_GET "); PTREXP(*(ptr)); \
PTREXP(&((channel)->data))); \
}
#define PTREXP(x)
Definition: gcs_debug.h:220
#define link_extract_first(self)
Definition: simset.h:116
static QUEUE queue
Definition: myisampack.cc:206
Definition: task.h:430
#define FN
Definition: gcs_debug.h:217
#define STRLIT(x)
Definition: gcs_debug.h:223
int type
Definition: http_common.h:411
#define link_empty(self)
Definition: simset.h:117

◆ CHANNEL_GET_REVERSE

#define CHANNEL_GET_REVERSE (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_extract_last(&(channel)->data); \
}
linkage * link_extract_last(linkage *self)
Definition: simset.c:44
static QUEUE queue
Definition: myisampack.cc:206
Definition: task.h:430
int type
Definition: http_common.h:411
#define link_empty(self)
Definition: simset.h:117

◆ CHANNEL_PEEK

#define CHANNEL_PEEK (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_first(&(channel)->data); \
}
static QUEUE queue
Definition: myisampack.cc:206
Definition: task.h:430
linkage * link_first(linkage *self)
Definition: simset.c:33
int type
Definition: http_common.h:411
#define link_empty(self)
Definition: simset.h:117

◆ DECL_ENV

#define DECL_ENV   struct env {

◆ END_ENV

#define END_ENV
Value:
} \
; \
struct env MY_ATTRIBUTE((unused)) * ep

◆ FINALLY

#define FINALLY   task_cleanup:

◆ IF_CHANGED

#define IF_CHANGED (   DESIRED,
  CURRENT,
  TRUE_ACTION,
  FALSE_ACTION 
)
Value:
{ \
int _expr = DESIRED; \
if (_expr != (CURRENT)) { \
DBGOUT(FN; STRLIT("need change: "); NDBG(DESIRED, d); \
NDBG(CURRENT, d);); \
changed = 1; \
if (_expr) { \
DBGOUT(FN; STRLIT(#TRUE_ACTION " because: " #DESIRED " == TRUE");); \
TRUE_ACTION; \
} else { \
DBGOUT(FN; STRLIT(#FALSE_ACTION " because: " #DESIRED " == FALSE");); \
FALSE_ACTION; \
} \
} \
}
#define FN
Definition: gcs_debug.h:217
#define STRLIT(x)
Definition: gcs_debug.h:223
#define NDBG(x, f)
Definition: gcs_debug.h:225

◆ IF_GUARD

#define IF_GUARD (   DESIRED,
  GUARD,
  TRUE_ACTION,
  FALSE_ACTION 
)
Value:
{ \
int _expr = DESIRED; \
if (GUARD) { \
DBGOUT(FN; STRLIT("need change: "); NDBG(DESIRED, d); NDBG(GUARD, d);); \
changed = 1; \
if (_expr) { \
DBGOUT(FN; STRLIT(#TRUE_ACTION " because: " #DESIRED " == TRUE");); \
TRUE_ACTION; \
} else { \
DBGOUT(FN; STRLIT(#FALSE_ACTION " because: " #DESIRED " == FALSE");); \
FALSE_ACTION; \
} \
} \
}
#define FN
Definition: gcs_debug.h:217
#define STRLIT(x)
Definition: gcs_debug.h:223
#define NDBG(x, f)
Definition: gcs_debug.h:225

◆ LOCK_FD

#define LOCK_FD (   fd,
  op 
)
Value:
{ \
while (!lock_fd( \
fd, stack, \
op)) { /* Effectively a spin lock, but should not happen very often */ \
wait_io(stack, fd, op); \
TASK_YIELD; \
/* TASK_DELAY(1.0); */ \
} \
}
task_env * stack
Definition: task.c:881
int lock_fd(int fd, task_env *t, int lock)

◆ MAXTASKS

#define MAXTASKS   1000

◆ mcm_close

#define mcm_close (   x)    close(x)

◆ ON_STACK_TOP

#define ON_STACK_TOP   (stack->sp == stack->stack_top + 1)

◆ TASK_ALLOC

#define TASK_ALLOC (   pool,
  type 
)    (task_allocate(pool, (unsigned int)sizeof(type)))

◆ TASK_BEGIN

#define TASK_BEGIN
Value:
/* assert(ep); */ \
ADD_EVENTS(if (stack->sp->state) { \
add_event(string_arg("state")); \
add_event(int_arg(stack->sp->state)); \
} add_event(string_arg("TASK_BEGIN")); \
TASK_DEBUG("TASK_BEGIN"); \
switch (stack->sp->state) { \
case 0: \
pushp(stack, TASK_ALLOC(stack, struct env)); \
ep = _ep; \
assert(ep); \
TERM_CHECK;
task_env * stack
Definition: task.c:881
static task_arg void_arg(void *v)
Definition: task.h:201
const char * name
Definition: task.h:250
#define TASK_ALLOC(pool, type)
Definition: task.h:274
int state
Definition: task.h:215
#define _ep
Definition: task.h:272
TaskAlign * sp
Definition: task.h:253
static task_arg string_arg(char const *v)
Definition: task.h:195
static char * add_event(const char *var, LEX_CSTRING event, const char *data, size_t data_length)
Definition: audit_null.cc:299

◆ TASK_CALL

#define TASK_CALL (   funcall)
Value:
{ \
reset_state(stack); \
TASK_DEBUG("BEFORE CALL"); \
do { \
stack->sp--; \
stack->taskret = funcall; \
stack->sp++; \
TERM_CHECK; \
} while (stack->taskret); \
TASK_DEBUG("AFTER CALL"); \
}
task_env * stack
Definition: task.c:881
#define TASK_YIELD
Definition: task.h:373
int taskret
Definition: task.h:247
while(-- *argc > 0 &&*(pos= *(++*argv))=='-')
Definition: do_ctype.cc:83
#define TASK_DEBUG(x)
Definition: task.h:283

◆ TASK_DEACTIVATE

#define TASK_DEACTIVATE
Value:
{ \
TASK_DEBUG("TASK_DEACTIVATE"); \
task_deactivate(stack); \
TASK_YIELD; \
}
task_env * stack
Definition: task.c:881

◆ TASK_DEBUG

#define TASK_DEBUG (   x)

◆ TASK_DELAY

#define TASK_DELAY (   t)
Value:
{ \
TASK_DEBUG("TASK_DELAY"); \
task_delay_until(seconds() + t); \
TASK_YIELD; \
}
double seconds()
Definition: task.c:298

◆ TASK_DELAY_UNTIL

#define TASK_DELAY_UNTIL (   t)
Value:
{ \
TASK_DEBUG("TASK_DELAY_UNTIL"); \
task_delay_until(t); \
TASK_YIELD; \
}

◆ TASK_DUMP_ERR

#define TASK_DUMP_ERR
Value:
if (errno || SOCK_ERRNO || task_errno) { \
DBGOUT(FN; NDBG(errno, d); STREXP(strerror(errno)); NDBG(SOCK_ERRNO, d); \
STREXP(strerror(SOCK_ERRNO)); NDBG(task_errno, d); \
STREXP(strerror(task_errno))); \
}
#define FN
Definition: gcs_debug.h:217
#define STREXP(x)
Definition: gcs_debug.h:222
#define SOCK_ERRNO
Definition: task_os.h:96
int task_errno
Definition: task.c:122
#define NDBG(x, f)
Definition: gcs_debug.h:225

◆ TASK_END

#define TASK_END
Value:
ADD_EVENTS(if (stack->sp->state) { \
add_event(string_arg("state")); \
add_event(int_arg(stack->sp->state)); \
} add_event(string_arg("TASK_END")); \
TASK_DEBUG("TASK_END"); \
stack->sp->state = 0; \
stack->where = stack->sp->ptr; \
assert(stack->where); \
popp(stack); \
return 0; \
} \
return 0
void * ptr
Definition: task.h:216
task_env * stack
Definition: task.c:881
static task_arg void_arg(void *v)
Definition: task.h:201
const char * name
Definition: task.h:250
int state
Definition: task.h:215
TaskAlign * where
Definition: task.h:251
TaskAlign * sp
Definition: task.h:253
static task_arg string_arg(char const *v)
Definition: task.h:195
static char * add_event(const char *var, LEX_CSTRING event, const char *data, size_t data_length)
Definition: audit_null.cc:299
#define ADD_EVENTS(x)
Definition: task.h:69

◆ TASK_FAIL

#define TASK_FAIL
Value:
{ \
*ret = (-1); \
TASK_DUMP_ERR; \
DBGOUT(FN; STRLIT("TASK_FAIL")); \
ADD_EVENTS(add_event(string_arg("task failed"))); \
goto task_cleanup; \
}
#define FN
Definition: gcs_debug.h:217
static task_arg string_arg(char const *v)
Definition: task.h:195
#define STRLIT(x)
Definition: gcs_debug.h:223
static char * add_event(const char *var, LEX_CSTRING event, const char *data, size_t data_length)
Definition: audit_null.cc:299

◆ TASK_POOL_ELEMS

#define TASK_POOL_ELEMS   1000

◆ TASK_RETURN

#define TASK_RETURN (   x)
Value:
{ \
*ret = (x); \
goto task_cleanup; \
}

◆ TASK_STACK_DEBUG

#define TASK_STACK_DEBUG
Value:
if (stack->debug) { \
char *fnpos = strrchr(__FILE__, DIR_SEP); \
if (fnpos) \
fnpos++; \
else \
fnpos = __FILE__; \
DBGOUT(FN; STRLIT("TASK_BEGIN "); STREXP(stack->name); STRLIT(fnpos); \
STRLIT(":"); NPUT(stack->sp->state, d); NDBG(stack->terminate, d)); \
}
task_env * stack
Definition: task.c:881
const char * name
Definition: task.h:250
#define FN
Definition: gcs_debug.h:217
#define NPUT(x, f)
Definition: gcs_debug.h:224
int state
Definition: task.h:215
#define STREXP(x)
Definition: gcs_debug.h:222
int debug
Definition: task.h:256
TaskAlign * sp
Definition: task.h:253
enum task_env::@10 terminate
#define STRLIT(x)
Definition: gcs_debug.h:223
#define DIR_SEP
Definition: task_os.h:83
#define NDBG(x, f)
Definition: gcs_debug.h:225

◆ TASK_WAIT

#define TASK_WAIT (   queue)
Value:
{ \
TASK_DEBUG("TASK_WAIT"); \
task_wait(stack, queue); \
TASK_YIELD; \
}
task_env * stack
Definition: task.c:881
static QUEUE queue
Definition: myisampack.cc:206

◆ TASK_YIELD

#define TASK_YIELD
Value:
{ \
TASK_DEBUG("TASK_YIELD"); \
stack->sp->state = __LINE__; \
return 1; \
case __LINE__: \
TASK_DEBUG("RETURN FROM YIELD"); \
ep = _ep; \
assert(ep); \
TERM_CHECK; \
}
#define _ep
Definition: task.h:272

◆ TERM_CHECK

#define TERM_CHECK   if (ON_STACK_TOP && stack->terminate) goto task_cleanup

◆ TERMINATE

#define TERMINATE   goto task_cleanup

◆ TIMED_TASK_WAIT

#define TIMED_TASK_WAIT (   queue,
  t 
)
Value:
{ \
TASK_DEBUG("TIMED_TASK_WAIT"); \
task_delay_until(seconds() + (t)); \
task_wait(stack, queue); \
TASK_YIELD; \
}
task_env * stack
Definition: task.c:881
static QUEUE queue
Definition: myisampack.cc:206
double seconds()
Definition: task.c:298

◆ UNLOCK_FD

#define UNLOCK_FD (   fd,
  op 
)    unlock_fd(fd, stack, op)

◆ xstr

#define xstr (   s)    #s

Typedef Documentation

◆ channel

typedef struct channel channel

◆ task_env

typedef struct task_env task_env

◆ task_func

typedef int(* task_func) (task_arg arg)

◆ task_queue

typedef struct task_queue task_queue

◆ TaskAlign

typedef struct task_ptr TaskAlign

Function Documentation

◆ accept_tcp()

int accept_tcp ( int  fd,
int *  ret 
)

◆ announce_tcp()

result announce_tcp ( xcom_port  port)

◆ block_fd()

int block_fd ( int  fd)

◆ channel_init()

channel* channel_init ( channel *  c,
unsigned int  type 
)

◆ channel_new()

channel* channel_new ( )

◆ channel_put()

void channel_put ( channel *  c,
linkage *  data 
)

◆ channel_put_front()

void channel_put_front ( channel *  c,
linkage *  data 
)

◆ con_read()

result con_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_write()

result con_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ connect_tcp()

int connect_tcp ( char *  server,
xcom_port  port,
int *  ret 
)

◆ double_arg()

static task_arg double_arg ( double  i)
inlinestatic

◆ end_arg()

static task_arg end_arg ( )
inlinestatic

◆ get_double_arg()

static double get_double_arg ( task_arg  arg)
inlinestatic

◆ get_float_arg()

static float get_float_arg ( task_arg  arg)
inlinestatic

◆ get_int_arg()

static int get_int_arg ( task_arg  arg)
inlinestatic

◆ get_long_arg()

static long get_long_arg ( task_arg  arg)
inlinestatic

◆ get_string_arg()

static char const* get_string_arg ( task_arg  arg)
inlinestatic

◆ get_uint_arg()

static unsigned int get_uint_arg ( task_arg  arg)
inlinestatic

◆ get_ulong_arg()

static unsigned long get_ulong_arg ( task_arg  arg)
inlinestatic

◆ get_ulong_long_arg()

static unsigned long long get_ulong_long_arg ( task_arg  arg)
inlinestatic

◆ get_void_arg()

static void* get_void_arg ( task_arg  arg)
inlinestatic

◆ int_arg()

static task_arg int_arg ( int  i)
inlinestatic

◆ is_locked()

int is_locked ( int  fd)

◆ is_only_task()

int is_only_task ( )

◆ is_running()

int is_running ( task_env *  t)

◆ lock_fd()

int lock_fd ( int  fd,
task_env *  t,
int  lock 
)

◆ popp()

void popp ( task_env *  p)

◆ pushp()

void pushp ( task_env *  p,
void *  ptr 
)

◆ remove_and_wakeup()

void remove_and_wakeup ( int  i)

◆ reset_state()

void reset_state ( task_env *  p)

◆ seconds()

double seconds ( )

◆ set_double_arg()

static void set_double_arg ( task_arg *  arg,
double  value 
)
inlinestatic

◆ set_float_arg()

static void set_float_arg ( task_arg *  arg,
float  value 
)
inlinestatic

◆ set_int_arg()

static void set_int_arg ( task_arg *  arg,
int  value 
)
inlinestatic

◆ set_long_arg()

static void set_long_arg ( task_arg *  arg,
long  value 
)
inlinestatic

◆ set_nodelay()

result set_nodelay ( int  fd)

◆ set_string_arg()

static void set_string_arg ( task_arg *  arg,
char const *  value 
)
inlinestatic

◆ set_task()

void set_task ( task_env **  p,
task_env *  t 
)

◆ set_uint_arg()

static void set_uint_arg ( task_arg *  arg,
unsigned int  value 
)
inlinestatic

◆ set_ulong_arg()

static void set_ulong_arg ( task_arg *  arg,
unsigned long  value 
)
inlinestatic

◆ set_ulong_long_arg()

static void set_ulong_long_arg ( task_arg *  arg,
unsigned long long  value 
)
inlinestatic

◆ set_void_arg()

static void set_void_arg ( task_arg *  arg,
void *  value 
)
inlinestatic

◆ string_arg()

static task_arg string_arg ( char const *  v)
inlinestatic

◆ task_activate()

task_env* task_activate ( task_env *  t)

◆ task_allocate()

void* task_allocate ( task_env *  p,
unsigned int  bytes 
)

Allocate bytes from pool, initialized to zero.

◆ task_deactivate()

task_env* task_deactivate ( task_env *  t)

◆ task_delay_until()

void task_delay_until ( double  time)

◆ task_loop()

void task_loop ( )

◆ task_name()

const char* task_name ( )

◆ task_new()

task_env* task_new ( task_func  func,
task_arg  arg,
const char *  name,
int  debug 
)

◆ task_now()

double task_now ( )

◆ task_read()

int task_read ( connection_descriptor const *  con,
void *  buf,
int  n,
int64_t *  ret 
)

◆ task_sys_init()

void task_sys_init ( )

◆ task_terminate()

task_env* task_terminate ( task_env *  t)

◆ task_terminate_all()

void task_terminate_all ( )

◆ task_wait()

void task_wait ( task_env *  t,
linkage *  queue 
)

◆ task_wakeup()

void task_wakeup ( linkage *  queue)

◆ task_write()

int task_write ( connection_descriptor const *  con,
void *  buf,
uint32_t  n,
int64_t *  ret 
)

◆ uint_arg()

static task_arg uint_arg ( unsigned int  i)
inlinestatic

◆ ulong_arg()

static task_arg ulong_arg ( unsigned long  l)
inlinestatic

◆ ulong_long_arg()

static task_arg ulong_long_arg ( unsigned long long  ll)
inlinestatic

◆ unblock_fd()

int unblock_fd ( int  fd)

◆ unlock_fd()

int unlock_fd ( int  fd,
task_env *  t,
int  lock 
)

◆ void_arg()

static task_arg void_arg ( void *  v)
inlinestatic

◆ wait_io()

task_env* wait_io ( task_env *  t,
int  fd,
int  op 
)

◆ xcom_disable_ssl()

void xcom_disable_ssl ( )

◆ xcom_enable_ssl()

void xcom_enable_ssl ( )

Variable Documentation

◆ stack

task_env* stack

◆ task_errno

int task_errno