WL#9855: MySQL GCS: Instrument memory usage in GCS/XCom

Affects: Server-8.0   —   Status: Complete

Executive Summary
=================

This worklog will instrument relevant memory management operations in GCS/XCom and
expose them automatically as metrics in Performance Schema (P_S) tables, in order
to provide users with useful information for monitoring and debugging. As a result
of this worklog, the source code of GCS/XCom will be modified to use the Server's
Performance Schema Interface to register keys to be associated with the
instrumented memory elements and update the memory usage statistics in P_S tables.


User Stories
============

- As a MySQL DBA I want to analyse memory usage within the XCom Cache, so that I
am able to detect memory-related problems that may affect the workflow of consensus
messages.

Functional requirements:
========================

FR1. GCS/XCom must be able to be built with or without instrumentation.
FR2. If instrumentation is enabled, GCS/XCom must instrument and show
instrumentation information on memory usage within the XCom Cache.
FR3. If instrumentation is enabled, there must be an entry corresponding to the
XCom cache in the 'setup_instruments' table, named
'memory/group_rpl/GCS_XCom::xcom_cache'.
FR4. If instrumentation is enabled, there must be an entry (named
'memory/group_rpl/GCS_XCom::xcom_cache') corresponding to the XCom cache in
the 'memory_summary_global_by_event_name' table, showing its memory usage
statistics.
FR5. If instrumentation is enabled, then the instrumentation of the XCom cache
must also be enabled even if the XCom thread is not instrumented.
FR6. If instrumentation is disabled globally, no instrumentation information
should be displayed.

Non-Functional requirements:
============================

NFR1. The code modifications resulting from this WL must not degrade the
performance of GCS and XCom.
NFR2. Each instrumented code element must be uniquely identified so that
we can monitor and understand the behavior of each individual element as a
separate entity.

In order to enable memory management instrumentation, the following changes will
have to be made to XCom code:

- Key management: when instrumentation is enabled, instrumentation keys will have
to be declared and registered in order to report memory usage in P_S tables.

- Memory management: the memory management operations to instrument must be
reported on P_S tables. Because some of the relevant memory management operations
are implemented in the XDR library, it is not possible to use the mysys library
of MySQL to automatically update statistics. Instead, Performance Schema
Instrumentation (PSI) functions will be used directly to update the statistics
manually.

Changes to the build process of GCS
===================================

In this WL, the building process of GCS/XCom does not need any modification. As
mentioned in WL#9854, the existing XCOM_STANDALONE macro will be used in GCS/XCom
to determine whether it is being built separately from the Server, as required by
FR1. When this macro is not defined, GCS/XCom will execute the instrumentation
code; otherwise, the instrumentation code will be discarded. The implementation to
use when XCOM_STANDALONE is defined is out of the scope of this WL.
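
The compile-time switch described above can be sketched as follows. This is only an illustration: the names REPORT_CACHE_ALLOC, reported_bytes and cache_message are hypothetical and do not appear in XCom, and the counter merely stands in for the P_S statistics. The sketch hides the #ifndef inside a macro so that a standalone build discards the reporting code entirely.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical counter standing in for the P_S statistics. */
size_t reported_bytes = 0;

#ifndef XCOM_STANDALONE
/* Server build: the reporting hook updates the statistics. */
#define REPORT_CACHE_ALLOC(size) (reported_bytes += (size))
#else
/* Standalone build: the hook compiles to a no-op. */
#define REPORT_CACHE_ALLOC(size) ((void)(size))
#endif

/* Hypothetical caching routine: the hook is always written at the call
   site and the pre-processor decides whether it does anything. */
size_t cache_message(size_t size) {
  REPORT_CACHE_ALLOC(size);
  return reported_bytes;
}
```

Whether the guard is placed in a macro, as here, or directly around each call site is a design choice; both keep the standalone build free of any dependency on the Server.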

XCom Cache
==========

This WL will focus on instrumenting memory operations that take place within the
XCom cache. The XCom cache is the component of XCom that holds the state of
recently executed consensus rounds. Internally, the cache maintains an LRU-ordered
linked list of Paxos Machines, an internal structure of XCom that represents the
state of an individual consensus round. The cache itself is statically allocated
with a fixed number of paxos machines; hence, paxos machines are also statically
allocated when the cache is created.

Each paxos machine stores three messages, each representing the three main phases
of the consensus round: propose, accept and learn. Unlike paxos machines, these
messages are dynamically allocated. While each paxos machine has additional data,
the messages are the most relevant items and those that consume the most memory
in the cache. Hence, for this WL, the XCom cache memory operations that will be
instrumented are the addition/removal of messages to/from each of the several
paxos machines that are contained in the cache.

While each of the three messages is physically allocated at a different stage,
the XCom cache only considers them as cached when the 'learn' phase of the
consensus round terminates, which occurs at a moment when all messages have
already been allocated. Hence, in the context of this worklog, we also consider
that the messages are all allocated in the cache at this moment and, thus, all at
once. For this reason, we consider this as a single alloc operation, rather than
three separate operations.

Deallocation, on the other hand, is physically applied to all three messages at
once when a paxos machine is uncached. To keep memory usage statistics consistent,
we also consider the uncache operation as a single deallocation instead of three
separate deallocations. The uncaching operation occurs on three occasions:
1) when all the (statically allocated) paxos machines have been cached, which
will require the cache to uncache a paxos machine before using it;
2) when the cache exceeds its maximum size, at which point XCom loops through the
LRU and uncaches paxos machines until the size is below the limit;
3) when XCom terminates (i.e., when the node leaves the group), at which point
the XCom cache will reset all paxos machines.

Currently, the cache is composed of 50000 paxos machines and the maximum size of
the cache is 1000000000 bytes. This means that 1) the cache will be able to cache
50000 entries without having to uncache anything (provided that the space limit
has not been reached) and 2) the cache will be able to cache 1000000000 bytes
without having to uncache anything (provided that there are non-cached paxos
machines available). As a consequence, the XCom cache will simply continue to
allocate data as long as it has free space and free slots (i.e., non-cached paxos
machines) available. When either the space limit or the slots limit is reached,
it will have to remove some of the old entries to make space for new ones.
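
The interplay between the slots limit and the space limit can be illustrated with a toy model. It is a sketch under stated assumptions, not the XCom cache: all names (toy_cache, toy_cache_add, toy_evict_oldest) are hypothetical, the limits are scaled down to 4 slots and 100 bytes, and a ring buffer replaces the LRU-ordered linked list.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_SLOTS 4       /* stands in for the 50000 paxos machines */
#define TOY_MAX_BYTES 100 /* stands in for the 1000000000-byte limit */

typedef struct {
  size_t sizes[TOY_SLOTS]; /* ring buffer, oldest entry at 'head' */
  size_t head;
  size_t count;     /* number of occupied slots */
  size_t bytes;     /* total cached bytes */
  size_t evictions; /* number of uncache operations so far */
} toy_cache;

/* Uncaches the oldest entry. */
void toy_evict_oldest(toy_cache *c) {
  assert(c->count > 0);
  c->bytes -= c->sizes[c->head];
  c->head = (c->head + 1) % TOY_SLOTS;
  c->count--;
  c->evictions++;
}

/* Caches an entry of 'size' bytes, uncaching old entries as needed. */
void toy_cache_add(toy_cache *c, size_t size) {
  /* Slots limit: no free slot left, so one must be reclaimed first. */
  if (c->count == TOY_SLOTS) toy_evict_oldest(c);
  c->sizes[(c->head + c->count) % TOY_SLOTS] = size;
  c->count++;
  c->bytes += size;
  /* Space limit: uncache until within the limit, keeping the newest. */
  while (c->bytes > TOY_MAX_BYTES && c->count > 1) toy_evict_oldest(c);
}
```

In this model, four 10-byte entries fit without any uncaching, a fifth one reclaims a slot, and a subsequent 90-byte entry triggers both limits.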

Instrumentation Data
====================

Each memory instrument in MySQL is identified by a name, which is of the form
memory/[component]/[instrument_name]. For this worklog, the component part of the
name will be 'group_rpl' and the instrument_name part will be
'GCS_XCom::xcom_cache', resulting in the following full name:
'memory/group_rpl/GCS_XCom::xcom_cache'
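
As a small illustration of the naming scheme, the sketch below composes the full name from its parts. The helper build_memory_instrument_name is hypothetical and is not part of the Server or of XCom; in practice the full name is derived by Performance Schema when the key is registered.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Builds "memory/<component>/<instrument_name>" into 'out'.
   Returns 0 on success, -1 if the buffer is too small. */
int build_memory_instrument_name(char *out, size_t out_len,
                                 const char *component,
                                 const char *instrument_name) {
  int n = snprintf(out, out_len, "memory/%s/%s", component, instrument_name);
  return (n < 0 || (size_t)n >= out_len) ? -1 : 0;
}
```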

Memory instrumentation in MySQL allows data to be aggregated globally or
per-thread. Since the XCom cache is executed by a single thread (the XCom thread)
this worklog will collect statistics globally only. As a result, memory
instrumentation statistics for the XCom cache will be shown only in the
'memory_summary_global_by_event_name' table.

setup_instruments
-----------------
An entry corresponding to the XCom cache will be added to the 'setup_instruments'
table of performance schema. The entry will contain the following information:

  NAME - 'memory/group_rpl/GCS_XCom::xcom_cache'.
  ENABLED - YES or NO (YES by default, but can be set to NO by the user).
  TIMED - NULL (set by P_S).
  PROPERTIES - 'global_statistics'.
  VOLATILITY - 0 (set when registering the instrument).
  DOCUMENTATION - "Memory usage statistics for the XCom cache."

All values are constants except ENABLED. The value of the PROPERTIES column
indicates that statistics are aggregated globally only (i.e., not per-thread).
DOCUMENTATION is the description string for the XCom cache entry. TIMED and
VOLATILITY are not relevant for this WL.

memory_summary_global_by_event_name
-----------------------------------
An entry with EVENT_NAME 'memory/group_rpl/GCS_XCom::xcom_cache' will be added
to the 'memory_summary_global_by_event_name' table. This table will be the one in
which the memory usage statistics of the XCom cache will be shown. In addition
to EVENT_NAME, the table contains the following summary columns:

- COUNT_ALLOC, COUNT_FREE: Aggregated number of allocations and deallocations.
  In this worklog, these counts will correspond, respectively, to the aggregated
  number of caching and uncaching operations executed in the cache.
- SUM_NUMBER_OF_BYTES_ALLOC, SUM_NUMBER_OF_BYTES_FREE: Aggregated sizes of
  allocated and freed memory blocks. In this worklog, these columns will
  correspond to the aggregated sizes of cached and uncached paxos machines,
  respectively.
- CURRENT_COUNT_USED: Aggregated number of currently allocated blocks that
  have not been freed yet; its value is equal to COUNT_ALLOC − COUNT_FREE, which
  corresponds to the number of currently cached entries.
- CURRENT_NUMBER_OF_BYTES_USED: Aggregated size of currently allocated
  memory blocks that have not been freed yet; its value is equal to
  SUM_NUMBER_OF_BYTES_ALLOC − SUM_NUMBER_OF_BYTES_FREE, which corresponds to the
  aggregated size of all the currently cached paxos machines.  
- LOW_COUNT_USED, HIGH_COUNT_USED: Low and high water marks corresponding to
  the CURRENT_COUNT_USED column.
- LOW_NUMBER_OF_BYTES_USED, HIGH_NUMBER_OF_BYTES_USED: Low and high water
  marks corresponding to the CURRENT_NUMBER_OF_BYTES_USED column.
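
The relationships between these columns can be made concrete with a toy model. This is a sketch only: toy_mem_stats and its helpers are hypothetical names, the fields merely mirror the column names, and the low water marks are omitted because their exact semantics are defined by Performance Schema and are not modeled here.

```c
#include <assert.h>
#include <stddef.h>

/* Toy counters named after the summary columns; not the P_S code. */
typedef struct {
  size_t count_alloc;
  size_t count_free;
  size_t sum_bytes_alloc;
  size_t sum_bytes_free;
  size_t high_count_used;
  size_t high_bytes_used;
} toy_mem_stats;

/* CURRENT_COUNT_USED = COUNT_ALLOC - COUNT_FREE */
size_t toy_current_count_used(const toy_mem_stats *s) {
  return s->count_alloc - s->count_free;
}

/* CURRENT_NUMBER_OF_BYTES_USED = SUM_..._ALLOC - SUM_..._FREE */
size_t toy_current_bytes_used(const toy_mem_stats *s) {
  return s->sum_bytes_alloc - s->sum_bytes_free;
}

/* One caching operation counts as a single allocation. */
void toy_record_cache(toy_mem_stats *s, size_t bytes) {
  s->count_alloc++;
  s->sum_bytes_alloc += bytes;
  if (toy_current_count_used(s) > s->high_count_used)
    s->high_count_used = toy_current_count_used(s);
  if (toy_current_bytes_used(s) > s->high_bytes_used)
    s->high_bytes_used = toy_current_bytes_used(s);
}

/* One uncaching operation counts as a single deallocation. */
void toy_record_uncache(toy_mem_stats *s, size_t bytes) {
  assert(toy_current_count_used(s) > 0);
  assert(toy_current_bytes_used(s) >= bytes);
  s->count_free++;
  s->sum_bytes_free += bytes;
}
```

For example, caching entries of 100 and 50 bytes and then uncaching the first leaves one entry and 50 bytes in use, with high water marks of 2 entries and 150 bytes.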

Instrumentation Configuration
=============================

Two conditions must be met for instrumentation to be enabled in GCS/XCom:
1) instrumentation must be enabled in MySQL and 2) the XCOM_STANDALONE
pre-processor directive must not be enabled. Memory instrumentation in MySQL can
be enabled or disabled at startup, or dynamically at runtime by setting the
ENABLED column of the instrument in the setup_instruments table to YES or NO. The
XCOM_STANDALONE pre-processor directive is always disabled in GCS and, hence,
does not affect this worklog.

Note that, following the memory instrumentation semantics of performance schema,
if instrumentation is disabled at runtime, the system will still instrument the
deallocation of cache entries whose allocation has been instrumented.
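
The effect of disabling instrumentation at runtime can be pictured with the toy model below. It only illustrates the observable behavior described above; how Performance Schema tracks this internally is not modeled, and every name (toy_entry, toy_instrument_enabled, toy_bytes_used) is hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
  size_t size;
  int counted; /* 1 if the allocation of this entry was instrumented */
} toy_entry;

int toy_instrument_enabled = 1; /* stands in for the ENABLED column */
size_t toy_bytes_used = 0;      /* stands in for the bytes-used statistic */

/* Caching is reported only while the instrument is enabled. */
void toy_cache_entry(toy_entry *e, size_t size) {
  e->size = size;
  e->counted = toy_instrument_enabled;
  if (e->counted) toy_bytes_used += size;
}

/* Uncaching is reported whenever the matching allocation was reported,
   regardless of the current state of the instrument. */
void toy_uncache_entry(toy_entry *e) {
  if (e->counted) toy_bytes_used -= e->size;
  e->counted = 0;
  e->size = 0;
}
```

This keeps the statistic from drifting: entries cached before the switch are still subtracted afterwards, while entries cached after it are never added or subtracted.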

Because XCom uses XDR to create paxos messages (more specifically, to create the
'app_data' contained inside the paxos message), this WL will not use
the mysys library. Mysys has memory management functions that, in addition to
managing memory, automatically update memory usage in the appropriate P_S tables.
If one were to use the mysys functions, it would be necessary to instrument
functions from the XDR library, which would force the implementation of the XDR
library itself inside XCom, which is highly undesirable.

Instead, to accomplish the goal of reporting memory usage to users in P_S tables,
we will make direct use of the functions of P_S that are used internally by mysys
to update statistics. This way, XCom will continue to use the regular
functions (such as malloc or calloc) for memory management and, then, will
manually update the statistics in P_S tables by using the appropriate functions,
namely those wrapped by the PSI_MEMORY_CALL macro.
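
The "allocate with the regular functions, then report manually" pattern can be sketched as follows. The toy_report_* functions are hypothetical stand-ins for the P_S calls, and toy_cache_alloc/toy_cache_free are illustrative wrappers that do not exist in XCom.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

size_t toy_bytes_in_use = 0; /* stands in for the P_S statistics */

/* Hypothetical stand-ins for the P_S reporting calls. */
void toy_report_alloc(size_t size) { toy_bytes_in_use += size; }
void toy_report_free(size_t size) { toy_bytes_in_use -= size; }

/* Allocates with plain calloc, then reports the allocation manually. */
void *toy_cache_alloc(size_t size) {
  void *p = calloc(1, size);
  if (p != NULL) toy_report_alloc(size);
  return p;
}

/* Frees with plain free, then reports the deallocation manually. The
   caller must supply the size, since free() does not know it. */
void toy_cache_free(void *p, size_t size) {
  if (p == NULL) return;
  free(p);
  toy_report_free(size);
}
```

One consequence of this approach is that the caller must be able to compute the size of what is being released, which is why the functions in this WL derive sizes from the paxos messages themselves.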


Instrumentation Keys
====================

Memory management instrumentation keys must be declared with the PSI_memory_key
type in a new file (xcom_psi.c), as well as declared as extern in the
corresponding header (xcom_psi.h) to allow their usage throughout the code.

xcom_psi.c
----------

Since only the XCom cache will be instrumented, a single key will be declared:
- PSI_memory_key key_MEM_XCOM_xcom_cache;

As required by the PSI architecture, a PSI_memory_info static array containing
the information necessary to register the instrumentation key must also be
declared:

static PSI_memory_info all_xcom_memory_info[]=
{
  {
    /* Pointer to the instrumentation key. */
    &key_MEM_XCOM_xcom_cache,
    /* Name of the instrumented element (shown in the P_S tables). */
    "GCS_XCom::xcom_cache",
    /* Flag stating that statistics are aggregated globally (not per-thread). */
    PSI_FLAG_ONLY_GLOBAL_STAT,
    /* Volatility index to use. */
    PSI_VOLATILITY_UNKNOWN,
    /* Text shown in the DOCUMENTATION column of 'setup_instruments'. */
    "Memory usage statistics for the XCom cache."
  }
};

xcom_base.c
-----------

The keys used for memory management instrumentation must be
registered during the initialization of the component, by invoking the
mysql_memory_register function with the PSI_memory_info static
array as parameter, and identifying the category as 'group_rpl', as it is
the Server's plugin that uses GCS/XCom. This registration will be done inside
a new 'xcom_init_psi_keys' function, which will be executed during XCom's
initialization.

static void register_xcom_memory_psi_keys()
{
  const char *category = "group_rpl";
  int count= (int)array_elements(all_xcom_memory_info);

  mysql_memory_register(category, all_xcom_memory_info, count);
}
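
The registration step can be pictured with the standalone model below. It assumes only what the text states, namely that each info entry carries a pointer to its key and that registration receives the array and its element count. All toy_* names are hypothetical, and the way real key values are assigned by Performance Schema is not modeled.

```c
#include <assert.h>
#include <stddef.h>

/* Toy equivalent of an info entry: pointer to the key plus a name. */
typedef struct {
  unsigned *key;
  const char *name;
} toy_memory_info;

unsigned toy_key_xcom_cache = 0; /* 0 means "not registered" here */

toy_memory_info toy_all_info[] = {
    {&toy_key_xcom_cache, "GCS_XCom::xcom_cache"}};

/* Number of elements of a true array (not of a pointer). */
#define TOY_ARRAY_ELEMENTS(a) (sizeof(a) / sizeof((a)[0]))

unsigned toy_next_key = 1;

/* Hands out one key per entry, writing it through the stored pointer. */
void toy_memory_register(const char *category, toy_memory_info *info,
                         int count) {
  (void)category; /* the category only affects the displayed name */
  for (int i = 0; i < count; i++) *info[i].key = toy_next_key++;
}
```

Computing the count from the array itself means that adding a second instrument later only requires a new array entry.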


Instrumentation Functions
=========================

The actual reporting of memory usage will be performed by a set of new 'report_*'
functions. These functions will be called by the existing functions in XCom that
create/delete the data that we want to instrument.

The 'report_*' functions will use the PSI_MEMORY_CALL macro of the P_S API. This
macro has the following format:

PSI_MEMORY_CALL(operation)(key, size, thread_owner)

The parameters of this macro are:
- operation is the memory management operation we want to report on; for this
WL its value will be either 'memory_alloc' or 'memory_free';
- key is the instrumentation key, which in this WL will be
'key_MEM_XCOM_xcom_cache';
- size is the number of bytes that is being allocated/deallocated; it will be
specified on a per call basis;
- thread_owner is the PSI_THREAD that is instrumenting the memory; since XCom
is single threaded, this parameter will be a constant, which will be stored as
a static variable in xcom_psi.c.
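
The two-step call shape of the macro, where the first parenthesis selects the operation and the second supplies its arguments, can be reproduced with a toy macro. This is an illustration only: TOY_MEMORY_CALL, toy_memory_service and the simplified signatures are assumptions made for this sketch, and the real expansion and signatures are defined by the Server's PSI headers.

```c
#include <assert.h>
#include <stddef.h>

/* Toy service table with one function pointer per operation. */
typedef struct {
  void (*memory_alloc)(unsigned key, size_t size, void *owner);
  void (*memory_free)(unsigned key, size_t size, void *owner);
} toy_memory_service;

size_t toy_used = 0; /* stands in for the P_S statistics */

void toy_alloc_impl(unsigned key, size_t size, void *owner) {
  (void)key;
  (void)owner;
  toy_used += size;
}

void toy_free_impl(unsigned key, size_t size, void *owner) {
  (void)key;
  (void)owner;
  toy_used -= size;
}

toy_memory_service toy_service = {toy_alloc_impl, toy_free_impl};

/* Selects the operation; the caller then applies it to the arguments. */
#define TOY_MEMORY_CALL(op) toy_service.op
```

A call such as TOY_MEMORY_CALL(memory_alloc)(1, 64, NULL) therefore expands to toy_service.memory_alloc(1, 64, NULL).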

xcom_psi.c
----------

/*
  Updates the P_S tables statistics on memory used by the XCom cache. Paxos
  messages are added to the XCom cache using 'replace_pax_msg'; for ex.:

    replace_pax_msg(&p->acceptor.msg, source); // 'p' is a paxos machine

  This function reports memory used by the new paxos message when the source of
  the replacement is a new paxos message, i.e., a pax_msg created using
  'pax_msg_new'.

  @param[in] prev The message (either propose, accept or learn), currently held
                  by a paxos machine in the cache, to be replaced.

*/
void report_cache_mem_new(pax_msg *prev) {
  if (prev) {
    int app_data_size = app_data_list_size(prev->a);
    // If 'prev' exists, then it is either bigger (if it has app_data) or the
    // same size as any new pax_msg.
    if (app_data_size) // If it is bigger, 'free' the size difference.
      PSI_MEMORY_CALL(memory_free)(key_MEM_XCOM_xcom_cache,
                                   app_data_size, &thd_owner);
  } else { // If prev does not exist we need to report on the new size.
    PSI_MEMORY_CALL(memory_alloc)(key_MEM_XCOM_xcom_cache, sizeof(pax_msg),
                                  &thd_owner);
  }
}


/*
  Updates the P_S tables statistics on memory used by the XCom cache. Paxos
  messages are added to the XCom cache with 'replace_pax_msg'; for ex.:

    replace_pax_msg(&p->acceptor.msg, source);

  This function reports memory used by the new paxos message when the source of
  the replacement is an existing pax_msg.

  @param[in] prev The message (either propose, accept or learn), currently held
                  by a paxos machine in the cache, to be replaced.
  @param[in] new  The message that is being added to the cache. This parameter can
                  be NULL, signaling that prev is being free'd.
*/
void report_cache_mem_replace(pax_msg *prev, pax_msg *new) {
  int new_data_size = new ? app_data_list_size(new->a) : 0;
  if (prev) {
    int prev_data_size = app_data_list_size(prev->a);
    if (prev_data_size > new_data_size) { // New cache size will be smaller
      PSI_MEMORY_CALL(memory_free)(key_MEM_XCOM_xcom_cache,
                                   prev_data_size - new_data_size,
                                   &thd_owner);
    }
    if (prev_data_size < new_data_size) { // New cache size will be bigger
      PSI_MEMORY_CALL(memory_alloc)(key_MEM_XCOM_xcom_cache,
                                    new_data_size - prev_data_size,
                                    &thd_owner);
    }
  } else {
    PSI_MEMORY_CALL(memory_alloc)(key_MEM_XCOM_xcom_cache,
                                  sizeof(pax_msg) + new_data_size,
                                  &thd_owner);
  }
}
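
The delta accounting performed by the replace operation can be checked with a standalone model. The function toy_replace_delta is hypothetical; it returns the net change in reported bytes (positive for an alloc report, negative for a free report) instead of calling P_S, and TOY_MSG_HEADER is an arbitrary stand-in for sizeof(pax_msg).

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MSG_HEADER 100L /* hypothetical stand-in for sizeof(pax_msg) */

/*
  Models the branches of the replace report.
  has_prev / has_new say whether the old / new message exists;
  prev_data / new_data are the corresponding app_data sizes.
*/
long toy_replace_delta(int has_prev, long prev_data, int has_new,
                       long new_data) {
  long new_size = has_new ? new_data : 0;
  if (has_prev) {
    /* The slot already holds a message: only the app_data size
       difference is reported (nothing if the sizes are equal). */
    return new_size - prev_data;
  }
  /* Empty slot: report the message header plus its app_data. */
  return TOY_MSG_HEADER + new_size;
}
```

For instance, filling an empty slot with a message carrying 40 bytes of app_data reports 140 bytes, and replacing it with one carrying 70 bytes reports a further 30.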

/*
  Directly reports a 'free()' operation.

  @param[in] size The number of bytes free'd.
*/
void report_cache_mem_free(size_t size) {
  PSI_MEMORY_CALL(memory_free)(key_MEM_XCOM_xcom_cache,
                                size,
                                &thd_owner);
}


XCom Cache
==========

Paxos messages are added to a paxos machine at different stages, mirroring the
flow of the consensus round. The corresponding calls to the 'report_*' functions
that update memory usage statistics will be added to the appropriate functions,
which are described as follows.

xcom_base.c
-----------

static void propose_noop(synode_no find, pax_machine *p) {
  /* Prepare to send a noop */
  site_def const *site = find_site_def(find);
  assert(!too_far(find));
+#ifndef XCOM_STANDALONE
+  report_cache_mem_new(p->proposer.msg);
+#endif
  replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
  ...
}

-----

void request_values(synode_no find, synode_no end) {
  DBGOUT(FN; SYCEXP(find); SYCEXP(end););
  while (!synode_gt(find, end) && !too_far(find)) {
    ...
    if (!finished(p) && !is_busy_machine(p)) {
      /* Prepare to send a noop */
+#ifndef XCOM_STANDALONE
+      report_cache_mem_new(p->proposer.msg);
+#endif
      replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
      ...
    }
    find = incr_synode(find);
  }
}

-----

static void handle_ack_prepare(site_def const *site, pax_machine *p,
                               pax_msg *m) {
  ...
  if (m->from != VOID_NODE_NO &&
      eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my prepare */
    handle_simple_ack_prepare(site, p, m);
    if (gt_ballot(m->proposal, p->proposer.msg->proposal)) { /* greater */
+#ifndef XCOM_STANDALONE
+      report_cache_mem_replace(p->proposer.msg, m);
+#endif
      replace_pax_msg(&p->proposer.msg, m);
      assert(p->proposer.msg);
    }
    if (gt_ballot(m->reply_to, p->proposer.sent_prop)) check_propose(site, p);
  }
}

-----

static void handle_simple_accept(site_def const *site, pax_machine *p,
                                 pax_msg *m, synode_no synode,
                                 linkage *reply_queue) {
  if (finished(p)) { /* We have learned a value */
    teach_ignorant_node(site, p, m, synode, reply_queue);
  } else if (!gt_ballot(p->acceptor.promise,
                        m->proposal) || /* Paxos acceptor phase 2 decision */
             noop_match(p, m)) {
    MAY_DBG(FN; SYCEXP(m->synode); STRLIT("accept "); BALCEXP(m->proposal));
+#ifndef XCOM_STANDALONE
+   report_cache_mem_replace(p->acceptor.msg, m);
+#endif
    replace_pax_msg(&p->acceptor.msg, m);
    ...
  }
}

-----

static void do_learn(site_def const *site MY_ATTRIBUTE((unused)),
                     pax_machine *p, pax_msg *m) {
  ...
+#ifndef XCOM_STANDALONE
+ report_cache_mem_replace(p->acceptor.msg, m);
+ report_cache_mem_replace(p->learner.msg, m);
+#endif
  replace_pax_msg(&p->acceptor.msg, m);
  replace_pax_msg(&p->learner.msg, m);
  ...
}

----------

Paxos machines are cleared inside the cache when they are initialized (to be
reused). The cache clears all messages at once and updates the cache size.

xcom_cache.c
------------

static pax_machine *init_pax_machine(pax_machine *p, lru_machine *lru,
                                     synode_no synode) {
+ size_t dealloc_size = pax_machine_size(p);
+#ifndef XCOM_STANDALONE
+ report_cache_mem_free(dealloc_size);
+#endif
+ sub_cache_size(dealloc_size);
- sub_cache_size(pax_machine_size(p));
  link_init(&p->hash_link, type_hash("pax_machine"));
  p->lru = lr
  ...
}