MySQL 9.1.0
Source Code Documentation
task.h File Reference

Rudimentary task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs. More...

#include "xcom/xcom_profile.h"
#include <assert.h>
#include <errno.h>
#include "my_compiler.h"
#include "xcom/simset.h"
#include "xcom/task_arg.h"
#include "xcom/x_platform.h"
#include "xcom/xcom_common.h"
#include "xcom/node_connection.h"
#include "xcom/result.h"

Go to the source code of this file.

Classes

struct  task_ptr
 
struct  task_env
 
struct  task_queue
 
struct  channel
 

Macros

#define ADD_EVENTS(x)
 
#define ADD_DBG(d, x)
 
#define ADD_T_EV(when, file, state, what)
 
#define ADD_WAIT_EV(when, file, state, what, milli)
 
#define TASK_POOL_ELEMS   1000
 
#define MAXTASKS   1000
 
#define _ep   ((struct env *)(stack->sp->ptr))
 
#define TASK_ALLOC(pool, type)   (task_allocate(pool, (unsigned int)sizeof(type)))
 
#define TASK_DEBUG(x)
 
#define FINALLY    task_cleanup:
 
#define ON_STACK_TOP   (stack->sp == stack->stack_top + 1)
 
#define TERM_CHECK    if (ON_STACK_TOP && stack->terminate) goto task_cleanup
 
#define TERMINATE   goto task_cleanup
 
#define TASK_BEGIN
 
#define TASK_END
 
#define TASK_RETURN(x)
 
#define TASK_DUMP_ERR
 
#define TASK_FAIL
 
#define TASK_YIELD
 
#define TASK_DEACTIVATE
 
#define TASK_DELAY(t)
 
#define TASK_DELAY_UNTIL(t)
 
#define TASK_WAIT(queue)
 
#define TIMED_TASK_WAIT(queue, t)
 
#define CHANNEL_GET(channel, ptr, type)
 
#define CHANNEL_PEEK(channel, ptr, type)
 
#define CHANNEL_GET_REVERSE(channel, ptr, type)
 
#define TASK_CALL(funcall)
 
#define DECL_ENV   struct env {
 
#define ENV_INIT   void init() {
 
#define END_ENV_INIT   }
 
#define END_ENV
 
#define LOCK_FD(fd, op)
 
#define UNLOCK_FD(fd, op)   unlock_fd(fd, stack, op)
 
#define xstr(s)   #s
 

Typedefs

typedef struct task_ptr TaskAlign
 
typedef int(* task_func) (task_arg arg)
 
typedef enum terminate_enum terminate_enum
 
typedef struct task_env task_env
 
typedef struct task_queue task_queue
 
typedef struct channel channel
 
typedef result(* connnection_read_method) (connection_descriptor const *rfd, void *buf, int n)
 
typedef result(* connnection_write_method) (connection_descriptor const *rfd, void *buf, int n)
 

Enumerations

enum  terminate_enum { RUN = 0 , KILL = 1 , TERMINATED = 2 }
 

Functions

static void set_int_arg (task_arg *arg, int value)
 
static int get_int_arg (task_arg arg)
 
static void set_long_arg (task_arg *arg, long value)
 
static long get_long_arg (task_arg arg)
 
static void set_uint_arg (task_arg *arg, unsigned int value)
 
static unsigned int get_uint_arg (task_arg arg)
 
static void set_ulong_arg (task_arg *arg, unsigned long value)
 
static void set_ulong_long_arg (task_arg *arg, unsigned long long value)
 
static unsigned long get_ulong_arg (task_arg arg)
 
static unsigned long long get_ulong_long_arg (task_arg arg)
 
static void set_float_arg (task_arg *arg, float value)
 
static float get_float_arg (task_arg arg)
 
static void set_double_arg (task_arg *arg, double value)
 
static double get_double_arg (task_arg arg)
 
static void set_string_arg (task_arg *arg, char const *value)
 
static void set_void_arg (task_arg *arg, void *value)
 
static char const * get_string_arg (task_arg arg)
 
static void * get_void_arg (task_arg arg)
 
static task_arg int_arg (int i)
 
static task_arg uint_arg (unsigned int i)
 
static task_arg ulong_arg (unsigned long l)
 
static task_arg ulong_long_arg (unsigned long long ll)
 
static task_arg double_arg (double i)
 
static task_arg string_arg (char const *v)
 
static task_arg void_arg (void *v)
 
static task_arg end_arg ()
 
channel * channel_init (channel *c, unsigned int type)
 
void channel_put (channel *c, linkage *data)
 
void channel_put_front (channel *c, linkage *data)
 
void * task_allocate (task_env *p, unsigned int bytes)
 Allocate bytes from pool, initialized to zero. More...
 
void reset_state (task_env *p)
 
void pushp (task_env *p, void *ptr)
 
void popp (task_env *p)
 
double seconds ()
 
double task_now ()
 
void task_delay_until (double time)
 
unsigned long long int get_time_since_the_epoch ()
 Return time in microseconds. More...
 
int unblock_fd (int fd)
 
int block_fd (int fd)
 
result con_read (connection_descriptor const *rfd, void *buf, int n)
 
result con_pipe_read (connection_descriptor const *rfd, void *buf, int n)
 
int task_read (connection_descriptor const *con, void *buf, int n, int64_t *ret, connnection_read_method read_function=con_read)
 
result con_write (connection_descriptor const *wfd, void *buf, int n)
 
result con_pipe_write (connection_descriptor const *wfd, void *buf, int n)
 
int task_write (connection_descriptor const *con, void *buf, uint32_t n, int64_t *ret)
 
int is_locked (int fd)
 
int lock_fd (int fd, task_env *t, int lock)
 
int unlock_fd (int fd, task_env *t, int lock)
 
void task_sys_init ()
 
task_env * task_new (task_func func, task_arg arg, const char *name, int debug)
 
void task_loop ()
 
void task_wait (task_env *t, linkage *queue)
 
void task_wakeup (linkage *queue)
 
task_env * task_terminate (task_env *t)
 
void set_task (task_env **p, task_env *t)
 
void task_terminate_all ()
 
void remove_and_wakeup (int i)
 
int is_only_task ()
 
task_env * task_activate (task_env *t)
 
task_env * task_deactivate (task_env *t)
 
const char * task_name ()
 
task_env * wait_io (task_env *t, int fd, int op)
 
result set_nodelay (int fd)
 
task_env * timed_wait_io (task_env *t, int fd, int op, double timeout)
 

Variables

task_env * stack
 
int task_errno
 
xcom_proto const my_min_xcom_version
 
xcom_proto const my_xcom_version
 

Detailed Description

Rudimentary task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs.

(continuations?) Nonblocking IO and event handling need to be rewritten for each new OS.

Macro Definition Documentation

◆ _ep

#define _ep   ((struct env *)(stack->sp->ptr))

◆ ADD_DBG

#define ADD_DBG (   d,
  x 
)

◆ ADD_EVENTS

#define ADD_EVENTS (   x)

◆ ADD_T_EV

#define ADD_T_EV (   when,
  file,
  state,
  what 
)

◆ ADD_WAIT_EV

#define ADD_WAIT_EV (   when,
  file,
  state,
  what,
  milli 
)

◆ CHANNEL_GET

#define CHANNEL_GET (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_extract_first(&(channel)->data); \
IFDBG(D_TRANSPORT, FN; STRLIT("CHANNEL_GET "); PTREXP(*(ptr)); \
PTREXP(&((channel)->data))); \
}
#define PTREXP(x)
Definition: gcs_debug.h:312
#define FN
Definition: gcs_debug.h:308
@ D_TRANSPORT
Definition: gcs_debug.h:178
#define STRLIT(x)
Definition: gcs_debug.h:316
static QUEUE queue
Definition: myisampack.cc:210
required string type
Definition: replication_group_member_actions.proto:34
static linkage * link_extract_first(linkage *self)
Definition: simset.h:106
static int link_empty(linkage *self)
Definition: simset.h:114
Definition: task.h:427

◆ CHANNEL_GET_REVERSE

#define CHANNEL_GET_REVERSE (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_extract_last(&(channel)->data); \
}
static linkage * link_extract_last(linkage *self)
Definition: simset.h:110

◆ CHANNEL_PEEK

#define CHANNEL_PEEK (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_first(&(channel)->data); \
}
static linkage * link_first(linkage *self)
Definition: simset.h:102

◆ DECL_ENV

#define DECL_ENV   struct env {

◆ END_ENV

#define END_ENV
Value:
} \
; \
[[maybe_unused]] struct env *ep

◆ END_ENV_INIT

#define END_ENV_INIT   }

◆ ENV_INIT

#define ENV_INIT   void init() {

◆ FINALLY

#define FINALLY    task_cleanup:

◆ LOCK_FD

#define LOCK_FD (   fd,
  op 
)
Value:
{ \
while (!lock_fd( \
fd, stack, \
op)) { /* Effectively a spin lock, but should not happen very often */ \
wait_io(stack, fd, op); \
TASK_YIELD; \
/* TASK_DELAY(1.0); */ \
} \
}
task_env * stack
Definition: task.cc:892
int lock_fd(int fd, task_env *t, int lock)

◆ MAXTASKS

#define MAXTASKS   1000

◆ ON_STACK_TOP

#define ON_STACK_TOP   (stack->sp == stack->stack_top + 1)

◆ TASK_ALLOC

#define TASK_ALLOC (   pool,
  type 
)    (task_allocate(pool, (unsigned int)sizeof(type)))

◆ TASK_BEGIN

#define TASK_BEGIN
Value:
/* assert(ep); */ \
ADD_DBG( \
D_TASK, \
if (stack->sp->state) { \
add_event(EVENT_DUMP_PAD, string_arg("state")); \
add_event(EVENT_DUMP_PAD, int_arg(stack->sp->state)); \
} add_event(EVENT_DUMP_PAD, string_arg("TASK_BEGIN")); \
add_event(EVENT_DUMP_PAD, void_arg(stack));); \
TASK_DEBUG("TASK_BEGIN"); \
switch (stack->sp->state) { \
case 0: \
pushp(stack, TASK_ALLOC(stack, struct env)); \
ep = _ep; \
assert(ep); \
ep->init(); \
TERM_CHECK;
static char * add_event(const char *var, LEX_CSTRING event, const char *data, size_t data_length)
Definition: audit_null.cc:313
@ D_TASK
Definition: gcs_debug.h:175
TaskAlign * sp
Definition: task.h:256
int state
Definition: task.h:215
static task_arg string_arg(char const *v)
Definition: task.h:195
static task_arg void_arg(void *v)
Definition: task.h:201
#define TASK_ALLOC(pool, type)
Definition: task.h:277
#define _ep
Definition: task.h:275

◆ TASK_CALL

#define TASK_CALL (   funcall)
Value:
{ \
reset_state(stack); \
TASK_DEBUG("BEFORE CALL"); \
do { \
stack->sp--; \
stack->taskret = funcall; \
stack->sp++; \
TERM_CHECK; \
} while (stack->taskret); \
TASK_DEBUG("AFTER CALL"); \
}
int taskret
Definition: task.h:250
#define TASK_YIELD
Definition: task.h:370
#define TASK_DEBUG(x)
Definition: task.h:286

◆ TASK_DEACTIVATE

#define TASK_DEACTIVATE
Value:
{ \
TASK_DEBUG("TASK_DEACTIVATE"); \
task_deactivate(stack); \
TASK_YIELD; \
}

◆ TASK_DEBUG

#define TASK_DEBUG (   x)

◆ TASK_DELAY

#define TASK_DELAY (   t)
Value:
{ \
TASK_DEBUG("TASK_DELAY"); \
task_delay_until(seconds() + t); \
TASK_YIELD; \
}
double seconds()
Definition: task.cc:310

◆ TASK_DELAY_UNTIL

#define TASK_DELAY_UNTIL (   t)
Value:
{ \
TASK_DEBUG("TASK_DELAY_UNTIL"); \
task_delay_until(t); \
TASK_YIELD; \
}

◆ TASK_DUMP_ERR

#define TASK_DUMP_ERR
Value:
if (errno || SOCK_ERRNO || task_errno) { \
IFDBG(D_NONE, FN; NDBG(errno, d); STREXP(strerror(errno)); \
NDBG(SOCK_ERRNO, d); STREXP(strerror(SOCK_ERRNO)); \
NDBG(task_errno, d); STREXP(strerror(task_errno))); \
}
@ D_NONE
Definition: gcs_debug.h:174
#define NDBG(x, f)
Definition: gcs_debug.h:318
#define STREXP(x)
Definition: gcs_debug.h:315
int task_errno
Definition: task.cc:134
#define SOCK_ERRNO
Definition: task_os.h:97

◆ TASK_END

#define TASK_END
Value:
ADD_DBG( \
D_TASK, \
if (stack->sp->state) { \
add_event(EVENT_DUMP_PAD, string_arg("state")); \
add_event(EVENT_DUMP_PAD, int_arg(stack->sp->state)); \
} add_event(EVENT_DUMP_PAD, string_arg("TASK_END")); \
add_event(EVENT_DUMP_PAD, void_arg(stack));); \
TASK_DEBUG("TASK_END"); \
stack->sp->state = 0; \
stack->where = (TaskAlign *)stack->sp->ptr; \
assert(stack->where); \
popp(stack); \
return 0; \
} \
return 0
TaskAlign * where
Definition: task.h:254
Definition: task.h:214
void * ptr
Definition: task.h:216
#define ADD_DBG(d, x)
Definition: task.h:70

◆ TASK_FAIL

#define TASK_FAIL
Value:
{ \
*ret = (-1); \
TASK_DUMP_ERR; \
IFDBG(D_NONE, FN; STRLIT("TASK_FAIL")); \
ADD_DBG(D_TASK, add_event(EVENT_DUMP_PAD, string_arg("task failed"))); \
goto task_cleanup; \
}

◆ TASK_POOL_ELEMS

#define TASK_POOL_ELEMS   1000

◆ TASK_RETURN

#define TASK_RETURN (   x)
Value:
{ \
*ret = (x); \
goto task_cleanup; \
}

◆ TASK_WAIT

#define TASK_WAIT (   queue)
Value:
{ \
TASK_DEBUG("TASK_WAIT"); \
task_wait(stack, queue); \
TASK_YIELD; \
}

◆ TASK_YIELD

#define TASK_YIELD
Value:
{ \
TASK_DEBUG("TASK_YIELD"); \
stack->sp->state = __LINE__; \
return 1; \
case __LINE__: \
TASK_DEBUG("RETURN FROM YIELD"); \
ep = _ep; \
assert(ep); \
TERM_CHECK; \
}

◆ TERM_CHECK

#define TERM_CHECK    if (ON_STACK_TOP && stack->terminate) goto task_cleanup

◆ TERMINATE

#define TERMINATE   goto task_cleanup

◆ TIMED_TASK_WAIT

#define TIMED_TASK_WAIT (   queue,
  t 
)
Value:
{ \
TASK_DEBUG("TIMED_TASK_WAIT"); \
task_delay_until(seconds() + (t)); \
task_wait(stack, queue); \
TASK_YIELD; \
}

◆ UNLOCK_FD

#define UNLOCK_FD (   fd,
  op 
)    unlock_fd(fd, stack, op)

◆ xstr

#define xstr (   s)    #s

Typedef Documentation

◆ channel

typedef struct channel channel

◆ connnection_read_method

typedef result(* connnection_read_method) (connection_descriptor const *rfd, void *buf, int n)

◆ connnection_write_method

typedef result(* connnection_write_method) (connection_descriptor const *rfd, void *buf, int n)

◆ task_env

typedef struct task_env task_env

◆ task_func

typedef int(* task_func) (task_arg arg)

◆ task_queue

typedef struct task_queue task_queue

◆ TaskAlign

typedef struct task_ptr TaskAlign

◆ terminate_enum

typedef enum terminate_enum terminate_enum

Enumeration Type Documentation

◆ terminate_enum

enum terminate_enum

Enumerator
RUN 
KILL 
TERMINATED 

Function Documentation

◆ block_fd()

int block_fd ( int  fd)

◆ channel_init()

channel * channel_init ( channel *  c,
unsigned int  type 
)

◆ channel_put()

void channel_put ( channel *  c,
linkage *  data 
)

◆ channel_put_front()

void channel_put_front ( channel *  c,
linkage *  data 
)

◆ con_pipe_read()

result con_pipe_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_pipe_write()

result con_pipe_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ con_read()

result con_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_write()

result con_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ double_arg()

static task_arg double_arg ( double  i)
inlinestatic

◆ end_arg()

static task_arg end_arg ( )
inlinestatic

◆ get_double_arg()

static double get_double_arg ( task_arg  arg)
inlinestatic

◆ get_float_arg()

static float get_float_arg ( task_arg  arg)
inlinestatic

◆ get_int_arg()

static int get_int_arg ( task_arg  arg)
inlinestatic

◆ get_long_arg()

static long get_long_arg ( task_arg  arg)
inlinestatic

◆ get_string_arg()

static char const * get_string_arg ( task_arg  arg)
inlinestatic

◆ get_time_since_the_epoch()

unsigned long long int get_time_since_the_epoch ( )

Return time in microseconds.

Uses std::chrono::high_resolution_clock

Return values
Number of microseconds since the Epoch, 1970-01-01 00:00:00 +0000 (UTC)

◆ get_uint_arg()

static unsigned int get_uint_arg ( task_arg  arg)
inlinestatic

◆ get_ulong_arg()

static unsigned long get_ulong_arg ( task_arg  arg)
inlinestatic

◆ get_ulong_long_arg()

static unsigned long long get_ulong_long_arg ( task_arg  arg)
inlinestatic

◆ get_void_arg()

static void * get_void_arg ( task_arg  arg)
inlinestatic

◆ int_arg()

static task_arg int_arg ( int  i)
inlinestatic

◆ is_locked()

int is_locked ( int  fd)

◆ is_only_task()

int is_only_task ( )

◆ lock_fd()

int lock_fd ( int  fd,
task_env *  t,
int  lock 
)

◆ popp()

void popp ( task_env *  p)

◆ pushp()

void pushp ( task_env *  p,
void *  ptr 
)

◆ remove_and_wakeup()

void remove_and_wakeup ( int  i)

◆ reset_state()

void reset_state ( task_env *  p)

◆ seconds()

double seconds ( )

◆ set_double_arg()

static void set_double_arg ( task_arg *  arg,
double  value 
)
inlinestatic

◆ set_float_arg()

static void set_float_arg ( task_arg *  arg,
float  value 
)
inlinestatic

◆ set_int_arg()

static void set_int_arg ( task_arg *  arg,
int  value 
)
inlinestatic

◆ set_long_arg()

static void set_long_arg ( task_arg *  arg,
long  value 
)
inlinestatic

◆ set_nodelay()

result set_nodelay ( int  fd)

◆ set_string_arg()

static void set_string_arg ( task_arg *  arg,
char const *  value 
)
inlinestatic

◆ set_task()

void set_task ( task_env **  p,
task_env t 
)

◆ set_uint_arg()

static void set_uint_arg ( task_arg *  arg,
unsigned int  value 
)
inlinestatic

◆ set_ulong_arg()

static void set_ulong_arg ( task_arg *  arg,
unsigned long  value 
)
inlinestatic

◆ set_ulong_long_arg()

static void set_ulong_long_arg ( task_arg *  arg,
unsigned long long  value 
)
inlinestatic

◆ set_void_arg()

static void set_void_arg ( task_arg *  arg,
void *  value 
)
inlinestatic

◆ string_arg()

static task_arg string_arg ( char const *  v)
inlinestatic

◆ task_activate()

task_env * task_activate ( task_env *  t)

◆ task_allocate()

void * task_allocate ( task_env *  p,
unsigned int  bytes 
)

Allocate bytes from pool, initialized to zero.

◆ task_deactivate()

task_env * task_deactivate ( task_env *  t)

◆ task_delay_until()

void task_delay_until ( double  time)

◆ task_loop()

void task_loop ( )

◆ task_name()

const char * task_name ( )

◆ task_new()

task_env * task_new ( task_func  func,
task_arg  arg,
const char *  name,
int  debug 
)

◆ task_now()

double task_now ( )

◆ task_read()

int task_read ( connection_descriptor const *  con,
void *  buf,
int  n,
int64_t *  ret,
connnection_read_method  read_function = con_read 
)

◆ task_sys_init()

void task_sys_init ( )

◆ task_terminate()

task_env * task_terminate ( task_env *  t)

◆ task_terminate_all()

void task_terminate_all ( )

◆ task_wait()

void task_wait ( task_env *  t,
linkage *  queue 
)

◆ task_wakeup()

void task_wakeup ( linkage *  queue)

◆ task_write()

int task_write ( connection_descriptor const *  con,
void *  buf,
uint32_t  n,
int64_t *  ret 
)

◆ timed_wait_io()

task_env * timed_wait_io ( task_env *  t,
int  fd,
int  op,
double  timeout 
)

◆ uint_arg()

static task_arg uint_arg ( unsigned int  i)
inlinestatic

◆ ulong_arg()

static task_arg ulong_arg ( unsigned long  l)
inlinestatic

◆ ulong_long_arg()

static task_arg ulong_long_arg ( unsigned long long  ll)
inlinestatic

◆ unblock_fd()

int unblock_fd ( int  fd)

◆ unlock_fd()

int unlock_fd ( int  fd,
task_env *  t,
int  lock 
)

◆ void_arg()

static task_arg void_arg ( void *  v)
inlinestatic

◆ wait_io()

task_env * wait_io ( task_env *  t,
int  fd,
int  op 
)

Variable Documentation

◆ my_min_xcom_version

xcom_proto const my_min_xcom_version
extern

◆ my_xcom_version

xcom_proto const my_xcom_version
extern

◆ stack

task_env* stack
extern

◆ task_errno

int task_errno
extern