MySQL 9.1.0
Source Code Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
task.h File Reference

Rudimentary task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs. More...

#include "xcom/xcom_profile.h"
#include <assert.h>
#include <errno.h>
#include "my_compiler.h"
#include "xcom/simset.h"
#include "xcom/task_arg.h"
#include "xcom/x_platform.h"
#include "xcom/xcom_common.h"
#include "xcom/node_connection.h"
#include "xcom/result.h"

Go to the source code of this file.

Classes

struct  task_ptr
 
struct  task_env
 
struct  task_queue
 
struct  channel
 

Macros

#define ADD_EVENTS(x)
 
#define ADD_DBG(d, x)
 
#define ADD_T_EV(when, file, state, what)
 
#define ADD_WAIT_EV(when, file, state, what, milli)
 
#define TASK_POOL_ELEMS   1000
 
#define MAXTASKS   1000
 
#define _ep   ((struct env *)(stack->sp->ptr))
 
#define TASK_ALLOC(pool, type)   (task_allocate(pool, (unsigned int)sizeof(type)))
 
#define TASK_DEBUG(x)
 
#define FINALLY    task_cleanup:
 
#define ON_STACK_TOP   (stack->sp == stack->stack_top + 1)
 
#define TERM_CHECK    if (ON_STACK_TOP && stack->terminate) goto task_cleanup
 
#define TERMINATE   goto task_cleanup
 
#define TASK_BEGIN
 
#define TASK_END
 
#define TASK_RETURN(x)
 
#define TASK_DUMP_ERR
 
#define TASK_FAIL
 
#define TASK_YIELD
 
#define TASK_DEACTIVATE
 
#define TASK_DELAY(t)
 
#define TASK_DELAY_UNTIL(t)
 
#define TASK_WAIT(queue)
 
#define TIMED_TASK_WAIT(queue, t)
 
#define CHANNEL_GET(channel, ptr, type)
 
#define CHANNEL_PEEK(channel, ptr, type)
 
#define CHANNEL_GET_REVERSE(channel, ptr, type)
 
#define TASK_CALL(funcall)
 
#define DECL_ENV   struct env {
 
#define ENV_INIT   void init() {
 
#define END_ENV_INIT   }
 
#define END_ENV
 
#define LOCK_FD(fd, op)
 
#define UNLOCK_FD(fd, op)   unlock_fd(fd, stack, op)
 
#define xstr(s)   #s
 

Typedefs

typedef struct task_ptr TaskAlign
 
typedef int(* task_func) (task_arg arg)
 
typedef enum terminate_enum terminate_enum
 
typedef struct task_env task_env
 
typedef struct task_queue task_queue
 
typedef struct channel channel
 
typedef result(* connnection_read_method) (connection_descriptor const *rfd, void *buf, int n)
 
typedef result(* connnection_write_method) (connection_descriptor const *rfd, void *buf, int n)
 

Enumerations

enum  terminate_enum { RUN = 0 , KILL = 1 , TERMINATED = 2 }
 

Functions

static void set_int_arg (task_arg *arg, int value)
 
static int get_int_arg (task_arg arg)
 
static void set_long_arg (task_arg *arg, long value)
 
static long get_long_arg (task_arg arg)
 
static void set_uint_arg (task_arg *arg, unsigned int value)
 
static unsigned int get_uint_arg (task_arg arg)
 
static void set_ulong_arg (task_arg *arg, unsigned long value)
 
static void set_ulong_long_arg (task_arg *arg, unsigned long long value)
 
static unsigned long get_ulong_arg (task_arg arg)
 
static unsigned long long get_ulong_long_arg (task_arg arg)
 
static void set_float_arg (task_arg *arg, float value)
 
static float get_float_arg (task_arg arg)
 
static void set_double_arg (task_arg *arg, double value)
 
static double get_double_arg (task_arg arg)
 
static void set_string_arg (task_arg *arg, char const *value)
 
static void set_void_arg (task_arg *arg, void *value)
 
static char const * get_string_arg (task_arg arg)
 
static void * get_void_arg (task_arg arg)
 
static task_arg int_arg (int i)
 
static task_arg uint_arg (unsigned int i)
 
static task_arg ulong_arg (unsigned long l)
 
static task_arg ulong_long_arg (unsigned long long ll)
 
static task_arg double_arg (double i)
 
static task_arg string_arg (char const *v)
 
static task_arg void_arg (void *v)
 
static task_arg end_arg ()
 
channel * channel_init (channel *c, unsigned int type)
 
void channel_put (channel *c, linkage *data)
 
void channel_put_front (channel *c, linkage *data)
 
void * task_allocate (task_env *p, unsigned int bytes)
 Allocate bytes from pool, initialized to zero. More...
 
void reset_state (task_env *p)
 
void pushp (task_env *p, void *ptr)
 
void popp (task_env *p)
 
double seconds ()
 
double task_now ()
 
void task_delay_until (double time)
 
unsigned long long int get_time_since_the_epoch ()
 Return time in microseconds. More...
 
int unblock_fd (int fd)
 
int block_fd (int fd)
 
result con_read (connection_descriptor const *rfd, void *buf, int n)
 
result con_pipe_read (connection_descriptor const *rfd, void *buf, int n)
 
int task_read (connection_descriptor const *con, void *buf, int n, int64_t *ret, connnection_read_method read_function=con_read)
 
result con_write (connection_descriptor const *wfd, void *buf, int n)
 
result con_pipe_write (connection_descriptor const *wfd, void *buf, int n)
 
int task_write (connection_descriptor const *con, void *buf, uint32_t n, int64_t *ret)
 
int is_locked (int fd)
 
int lock_fd (int fd, task_env *t, int lock)
 
int unlock_fd (int fd, task_env *t, int lock)
 
void task_sys_init ()
 
task_env * task_new (task_func func, task_arg arg, const char *name, int debug)
 
void task_loop ()
 
void task_wait (task_env *t, linkage *queue)
 
void task_wakeup (linkage *queue)
 
task_env * task_terminate (task_env *t)
 
void set_task (task_env **p, task_env *t)
 
void task_terminate_all ()
 
void remove_and_wakeup (int i)
 
int is_only_task ()
 
task_env * task_activate (task_env *t)
 
task_env * task_deactivate (task_env *t)
 
const char * task_name ()
 
task_env * wait_io (task_env *t, int fd, int op)
 
result set_nodelay (int fd)
 
task_env * timed_wait_io (task_env *t, int fd, int op, double timeout)
 

Variables

task_env * stack
 
int task_errno
 
xcom_proto const my_min_xcom_version
 
xcom_proto const my_xcom_version
 

Detailed Description

Rudimentary task system in portable C, based on Tom Duff's switch-based coroutine trick and a stack of environment structs.

(continuations?) Nonblocking IO and event handling need to be rewritten for each new OS.

Macro Definition Documentation

◆ _ep

#define _ep   ((struct env *)(stack->sp->ptr))

◆ ADD_DBG

#define ADD_DBG (   d,
  x 
)

◆ ADD_EVENTS

#define ADD_EVENTS (   x)

◆ ADD_T_EV

#define ADD_T_EV (   when,
  file,
  state,
  what 
)

◆ ADD_WAIT_EV

#define ADD_WAIT_EV (   when,
  file,
  state,
  what,
  milli 
)

◆ CHANNEL_GET

#define CHANNEL_GET (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_extract_first(&(channel)->data); \
IFDBG(D_TRANSPORT, FN; STRLIT("CHANNEL_GET "); PTREXP(*(ptr)); \
PTREXP(&((channel)->data))); \
}
#define PTREXP(x)
Definition: gcs_debug.h:312
#define FN
Definition: gcs_debug.h:308
@ D_TRANSPORT
Definition: gcs_debug.h:178
#define STRLIT(x)
Definition: gcs_debug.h:316
static QUEUE queue
Definition: myisampack.cc:210
required string type
Definition: replication_group_member_actions.proto:34
static linkage * link_extract_first(linkage *self)
Definition: simset.h:106
static int link_empty(linkage *self)
Definition: simset.h:114
Definition: task.h:427

◆ CHANNEL_GET_REVERSE

#define CHANNEL_GET_REVERSE (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_extract_last(&(channel)->data); \
}
static linkage * link_extract_last(linkage *self)
Definition: simset.h:110

◆ CHANNEL_PEEK

#define CHANNEL_PEEK (   channel,
  ptr,
  type 
)
Value:
{ \
while (link_empty(&(channel)->data)) { \
TASK_WAIT(&(channel)->queue); \
} \
*(ptr) = (type *)link_first(&(channel)->data); \
}
static linkage * link_first(linkage *self)
Definition: simset.h:102

◆ DECL_ENV

#define DECL_ENV   struct env {

◆ END_ENV

#define END_ENV
Value:
} \
; \
[[maybe_unused]] struct env *ep

◆ END_ENV_INIT

#define END_ENV_INIT   }

◆ ENV_INIT

#define ENV_INIT   void init() {

◆ FINALLY

#define FINALLY    task_cleanup:

◆ LOCK_FD

#define LOCK_FD (   fd,
  op 
)
Value:
{ \
while (!lock_fd( \
fd, stack, \
op)) { /* Effectively a spin lock, but should not happen very often */ \
wait_io(stack, fd, op); \
TASK_YIELD; \
/* TASK_DELAY(1.0); */ \
} \
}
task_env * stack
Definition: task.cc:892
int lock_fd(int fd, task_env *t, int lock)

◆ MAXTASKS

#define MAXTASKS   1000

◆ ON_STACK_TOP

#define ON_STACK_TOP   (stack->sp == stack->stack_top + 1)

◆ TASK_ALLOC

#define TASK_ALLOC (   pool,
  type 
)    (task_allocate(pool, (unsigned int)sizeof(type)))

◆ TASK_BEGIN

#define TASK_BEGIN
Value:
/* assert(ep); */ \
ADD_DBG( \
D_TASK, \
if (stack->sp->state) { \
add_event(EVENT_DUMP_PAD, string_arg("state")); \
add_event(EVENT_DUMP_PAD, int_arg(stack->sp->state)); \
} add_event(EVENT_DUMP_PAD, string_arg("TASK_BEGIN")); \
add_event(EVENT_DUMP_PAD, void_arg(stack));); \
TASK_DEBUG("TASK_BEGIN"); \
switch (stack->sp->state) { \
case 0: \
pushp(stack, TASK_ALLOC(stack, struct env)); \
ep = _ep; \
assert(ep); \
ep->init(); \
TERM_CHECK;
static char * add_event(const char *var, LEX_CSTRING event, const char *data, size_t data_length)
Definition: audit_null.cc:313
@ D_TASK
Definition: gcs_debug.h:175
TaskAlign * sp
Definition: task.h:256
int state
Definition: task.h:215
static task_arg string_arg(char const *v)
Definition: task.h:195
static task_arg void_arg(void *v)
Definition: task.h:201
#define TASK_ALLOC(pool, type)
Definition: task.h:277
#define _ep
Definition: task.h:275

◆ TASK_CALL

#define TASK_CALL (   funcall)
Value:
{ \
reset_state(stack); \
TASK_DEBUG("BEFORE CALL"); \
do { \
stack->sp--; \
stack->taskret = funcall; \
stack->sp++; \
TERM_CHECK; \
if (stack->taskret) TASK_YIELD; \
} while (stack->taskret); \
TASK_DEBUG("AFTER CALL"); \
}
int taskret
Definition: task.h:250
#define TASK_YIELD
Definition: task.h:370
#define TASK_DEBUG(x)
Definition: task.h:286

◆ TASK_DEACTIVATE

#define TASK_DEACTIVATE
Value:
{ \
TASK_DEBUG("TASK_DEACTIVATE"); \
task_deactivate(stack); \
TASK_YIELD; \
}

◆ TASK_DEBUG

#define TASK_DEBUG (   x)

◆ TASK_DELAY

#define TASK_DELAY (   t)
Value:
{ \
TASK_DEBUG("TASK_DELAY"); \
task_delay_until(seconds() + t); \
TASK_YIELD; \
}
double seconds()
Definition: task.cc:310

◆ TASK_DELAY_UNTIL

#define TASK_DELAY_UNTIL (   t)
Value:
{ \
TASK_DEBUG("TASK_DELAY_UNTIL"); \
task_delay_until(t); \
TASK_YIELD; \
}

◆ TASK_DUMP_ERR

#define TASK_DUMP_ERR
Value:
if (errno || SOCK_ERRNO || task_errno) { \
IFDBG(D_NONE, FN; NDBG(errno, d); STREXP(strerror(errno)); \
NDBG(SOCK_ERRNO, d); STREXP(strerror(SOCK_ERRNO)); \
NDBG(task_errno, d); STREXP(strerror(task_errno))); \
}
@ D_NONE
Definition: gcs_debug.h:174
#define NDBG(x, f)
Definition: gcs_debug.h:318
#define STREXP(x)
Definition: gcs_debug.h:315
int task_errno
Definition: task.cc:134
#define SOCK_ERRNO
Definition: task_os.h:97

◆ TASK_END

#define TASK_END
Value:
ADD_DBG( \
D_TASK, \
if (stack->sp->state) { \
add_event(EVENT_DUMP_PAD, string_arg("state")); \
add_event(EVENT_DUMP_PAD, int_arg(stack->sp->state)); \
} add_event(EVENT_DUMP_PAD, string_arg("TASK_END")); \
add_event(EVENT_DUMP_PAD, void_arg(stack));); \
TASK_DEBUG("TASK_END"); \
stack->sp->state = 0; \
stack->where = (TaskAlign *)stack->sp->ptr; \
assert(stack->where); \
popp(stack); \
return 0; \
} \
return 0
TaskAlign * where
Definition: task.h:254
Definition: task.h:214
void * ptr
Definition: task.h:216
#define ADD_DBG(d, x)
Definition: task.h:70

◆ TASK_FAIL

#define TASK_FAIL
Value:
{ \
*ret = (-1); \
TASK_DUMP_ERR; \
IFDBG(D_NONE, FN; STRLIT("TASK_FAIL")); \
ADD_DBG(D_TASK, add_event(EVENT_DUMP_PAD, string_arg("task failed"))); \
goto task_cleanup; \
}

◆ TASK_POOL_ELEMS

#define TASK_POOL_ELEMS   1000

◆ TASK_RETURN

#define TASK_RETURN (   x)
Value:
{ \
*ret = (x); \
goto task_cleanup; \
}

◆ TASK_WAIT

#define TASK_WAIT (   queue)
Value:
{ \
TASK_DEBUG("TASK_WAIT"); \
task_wait(stack, queue); \
TASK_YIELD; \
}

◆ TASK_YIELD

#define TASK_YIELD
Value:
{ \
TASK_DEBUG("TASK_YIELD"); \
stack->sp->state = __LINE__; \
return 1; \
case __LINE__: \
TASK_DEBUG("RETURN FROM YIELD"); \
ep = _ep; \
assert(ep); \
TERM_CHECK; \
}

◆ TERM_CHECK

#define TERM_CHECK    if (ON_STACK_TOP && stack->terminate) goto task_cleanup

◆ TERMINATE

#define TERMINATE   goto task_cleanup

◆ TIMED_TASK_WAIT

#define TIMED_TASK_WAIT (   queue,
  t 
)
Value:
{ \
TASK_DEBUG("TIMED_TASK_WAIT"); \
task_delay_until(seconds() + (t)); \
task_wait(stack, queue); \
TASK_YIELD; \
}

◆ UNLOCK_FD

#define UNLOCK_FD (   fd,
  op 
)    unlock_fd(fd, stack, op)

◆ xstr

#define xstr (   s)    #s

Typedef Documentation

◆ channel

typedef struct channel channel

◆ connnection_read_method

typedef result(* connnection_read_method) (connection_descriptor const *rfd, void *buf, int n)

◆ connnection_write_method

typedef result(* connnection_write_method) (connection_descriptor const *rfd, void *buf, int n)

◆ task_env

typedef struct task_env task_env

◆ task_func

typedef int(* task_func) (task_arg arg)

◆ task_queue

typedef struct task_queue task_queue

◆ TaskAlign

typedef struct task_ptr TaskAlign

◆ terminate_enum

typedef enum terminate_enum terminate_enum

Enumeration Type Documentation

◆ terminate_enum

enum terminate_enum

Enumerator
RUN 
KILL 
TERMINATED 

Function Documentation

◆ block_fd()

int block_fd ( int  fd)

◆ channel_init()

channel * channel_init ( channel *  c,
unsigned int  type 
)

◆ channel_put()

void channel_put ( channel *  c,
linkage *  data 
)

◆ channel_put_front()

void channel_put_front ( channel *  c,
linkage *  data 
)

◆ con_pipe_read()

result con_pipe_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_pipe_write()

result con_pipe_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ con_read()

result con_read ( connection_descriptor const *  rfd,
void *  buf,
int  n 
)

◆ con_write()

result con_write ( connection_descriptor const *  wfd,
void *  buf,
int  n 
)

◆ double_arg()

static task_arg double_arg ( double  i)
inline static

◆ end_arg()

static task_arg end_arg ( )
inline static

◆ get_double_arg()

static double get_double_arg ( task_arg  arg)
inline static

◆ get_float_arg()

static float get_float_arg ( task_arg  arg)
inline static

◆ get_int_arg()

static int get_int_arg ( task_arg  arg)
inline static

◆ get_long_arg()

static long get_long_arg ( task_arg  arg)
inline static

◆ get_string_arg()

static char const * get_string_arg ( task_arg  arg)
inline static

◆ get_time_since_the_epoch()

unsigned long long int get_time_since_the_epoch ( )

Return time in microseconds.

Uses std::chrono::high_resolution_clock

Return values
Number of microseconds since the Epoch, 1970-01-01 00:00:00 +0000 (UTC)

◆ get_uint_arg()

static unsigned int get_uint_arg ( task_arg  arg)
inline static

◆ get_ulong_arg()

static unsigned long get_ulong_arg ( task_arg  arg)
inline static

◆ get_ulong_long_arg()

static unsigned long long get_ulong_long_arg ( task_arg  arg)
inline static

◆ get_void_arg()

static void * get_void_arg ( task_arg  arg)
inline static

◆ int_arg()

static task_arg int_arg ( int  i)
inline static

◆ is_locked()

int is_locked ( int  fd)

◆ is_only_task()

int is_only_task ( )

◆ lock_fd()

int lock_fd ( int  fd,
task_env *  t,
int  lock 
)

◆ popp()

void popp ( task_env *  p)

◆ pushp()

void pushp ( task_env *  p,
void *  ptr 
)

◆ remove_and_wakeup()

void remove_and_wakeup ( int  i)

◆ reset_state()

void reset_state ( task_env *  p)

◆ seconds()

double seconds ( )

◆ set_double_arg()

static void set_double_arg ( task_arg *  arg,
double  value 
)
inline static

◆ set_float_arg()

static void set_float_arg ( task_arg *  arg,
float  value 
)
inline static

◆ set_int_arg()

static void set_int_arg ( task_arg *  arg,
int  value 
)
inline static

◆ set_long_arg()

static void set_long_arg ( task_arg *  arg,
long  value 
)
inline static

◆ set_nodelay()

result set_nodelay ( int  fd)

◆ set_string_arg()

static void set_string_arg ( task_arg *  arg,
char const *  value 
)
inline static

◆ set_task()

void set_task ( task_env **  p,
task_env *  t 
)

◆ set_uint_arg()

static void set_uint_arg ( task_arg *  arg,
unsigned int  value 
)
inline static

◆ set_ulong_arg()

static void set_ulong_arg ( task_arg *  arg,
unsigned long  value 
)
inline static

◆ set_ulong_long_arg()

static void set_ulong_long_arg ( task_arg *  arg,
unsigned long long  value 
)
inline static

◆ set_void_arg()

static void set_void_arg ( task_arg *  arg,
void *  value 
)
inline static

◆ string_arg()

static task_arg string_arg ( char const *  v)
inline static

◆ task_activate()

task_env * task_activate ( task_env *  t)

◆ task_allocate()

void * task_allocate ( task_env *  p,
unsigned int  bytes 
)

Allocate bytes from pool, initialized to zero.

◆ task_deactivate()

task_env * task_deactivate ( task_env *  t)

◆ task_delay_until()

void task_delay_until ( double  time)

◆ task_loop()

void task_loop ( )

◆ task_name()

const char * task_name ( )

◆ task_new()

task_env * task_new ( task_func  func,
task_arg  arg,
const char *  name,
int  debug 
)

◆ task_now()

double task_now ( )

◆ task_read()

int task_read ( connection_descriptor const *  con,
void *  buf,
int  n,
int64_t *  ret,
connnection_read_method  read_function = con_read 
)

◆ task_sys_init()

void task_sys_init ( )

◆ task_terminate()

task_env * task_terminate ( task_env *  t)

◆ task_terminate_all()

void task_terminate_all ( )

◆ task_wait()

void task_wait ( task_env *  t,
linkage *  queue 
)

◆ task_wakeup()

void task_wakeup ( linkage *  queue)

◆ task_write()

int task_write ( connection_descriptor const *  con,
void *  buf,
uint32_t  n,
int64_t *  ret 
)

◆ timed_wait_io()

task_env * timed_wait_io ( task_env *  t,
int  fd,
int  op,
double  timeout 
)

◆ uint_arg()

static task_arg uint_arg ( unsigned int  i)
inline static

◆ ulong_arg()

static task_arg ulong_arg ( unsigned long  l)
inline static

◆ ulong_long_arg()

static task_arg ulong_long_arg ( unsigned long long  ll)
inline static

◆ unblock_fd()

int unblock_fd ( int  fd)

◆ unlock_fd()

int unlock_fd ( int  fd,
task_env *  t,
int  lock 
)

◆ void_arg()

static task_arg void_arg ( void *  v)
inline static

◆ wait_io()

task_env * wait_io ( task_env *  t,
int  fd,
int  op 
)

Variable Documentation

◆ my_min_xcom_version

xcom_proto const my_min_xcom_version
extern

◆ my_xcom_version

xcom_proto const my_xcom_version
extern

◆ stack

task_env* stack
extern

◆ task_errno

int task_errno
extern