Commit d7cba52b authored by Geoff Simmons's avatar Geoff Simmons

encapsulate implementation details of data buffering

parent 45f2e9aa
...@@ -36,6 +36,8 @@ ...@@ -36,6 +36,8 @@
#include <errno.h> #include <errno.h>
#include "trackrdrd.h" #include "trackrdrd.h"
#include "data.h"
#include "vas.h" #include "vas.h"
#include "miniobj.h" #include "miniobj.h"
...@@ -51,56 +53,50 @@ ...@@ -51,56 +53,50 @@
VSTAILQ_INIT((head2)); \ VSTAILQ_INIT((head2)); \
} while (0) } while (0)
static pthread_mutex_t freelist_lock;
static char *buf;
static void static void
data_Cleanup(void) data_Cleanup(void)
{ {
free(dtbl.entry); free(entrytbl);
free(dtbl.buf); free(buf);
AZ(pthread_mutex_destroy(&dtbl.freelist_lock)); AZ(pthread_mutex_destroy(&freelist_lock));
} }
int int
DATA_Init(void) DATA_Init(void)
{ {
dataentry *entryptr;
char *bufptr;
unsigned bufsize = config.maxdata + config.maxkeylen; unsigned bufsize = config.maxdata + config.maxkeylen;
/* /*
* we want enough space to accomodate all open and done records * we want enough space to accomodate all open and done records
* *
*/ */
entryptr = (dataentry *) calloc(config.maxdone, sizeof(dataentry)); entrytbl = (dataentry *) calloc(config.maxdone, sizeof(dataentry));
if (entryptr == NULL) if (entrytbl == NULL)
return(errno); return(errno);
bufptr = (char *) calloc(config.maxdone, bufsize); buf = (char *) calloc(config.maxdone, bufsize);
if (bufptr == NULL) { if (buf == NULL) {
free(entryptr); free(entrytbl);
return(errno); return(errno);
} }
memset(&dtbl, 0, sizeof(datatable)); VSTAILQ_INIT(&freehead);
dtbl.magic = DATATABLE_MAGIC; AZ(pthread_mutex_init(&freelist_lock, NULL));
dtbl.len = config.maxdone;
VSTAILQ_INIT(&dtbl.freehead);
AZ(pthread_mutex_init(&dtbl.freelist_lock, NULL));
dtbl.entry = entryptr; global_nfree = 0;
dtbl.buf = bufptr;
dtbl.nfree = 0;
for (unsigned i = 0; i < config.maxdone; i++) { for (unsigned i = 0; i < config.maxdone; i++) {
dtbl.entry[i].magic = DATA_MAGIC; entrytbl[i].magic = DATA_MAGIC;
dtbl.entry[i].data = &dtbl.buf[i * bufsize]; entrytbl[i].data = &buf[i * bufsize];
dtbl.entry[i].key = &dtbl.buf[(i * bufsize) + config.maxdata]; entrytbl[i].key = &buf[(i * bufsize) + config.maxdata];
VSTAILQ_INSERT_TAIL(&dtbl.freehead, &dtbl.entry[i], freelist); VSTAILQ_INSERT_TAIL(&freehead, &entrytbl[i], freelist);
dtbl.nfree++; global_nfree++;
} }
assert(dtbl.nfree == config.maxdone); assert(global_nfree == config.maxdone);
assert(VSTAILQ_FIRST(&dtbl.freehead)); assert(VSTAILQ_FIRST(&freehead));
atexit(data_Cleanup); atexit(data_Cleanup);
return(0); return(0);
...@@ -131,33 +127,33 @@ DATA_Take_Freelist(struct freehead_s *dst) ...@@ -131,33 +127,33 @@ DATA_Take_Freelist(struct freehead_s *dst)
{ {
unsigned nfree; unsigned nfree;
AZ(pthread_mutex_lock(&dtbl.freelist_lock)); AZ(pthread_mutex_lock(&freelist_lock));
nfree = dtbl.nfree; nfree = global_nfree;
dtbl.nfree = 0; global_nfree = 0;
VSTAILQ_PREPEND(dst, &dtbl.freehead); VSTAILQ_PREPEND(dst, &freehead);
AZ(pthread_mutex_unlock(&dtbl.freelist_lock)); AZ(pthread_mutex_unlock(&freelist_lock));
return nfree; return nfree;
} }
/* /*
* return to dtbl.freehead * return to freehead
* *
* returned must be locked by caller, if required * returned must be locked by caller, if required
*/ */
void void
DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned) DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
{ {
AZ(pthread_mutex_lock(&dtbl.freelist_lock)); AZ(pthread_mutex_lock(&freelist_lock));
VSTAILQ_PREPEND(&dtbl.freehead, returned); VSTAILQ_PREPEND(&freehead, returned);
dtbl.nfree += nreturned; global_nfree += nreturned;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock)); AZ(pthread_mutex_unlock(&freelist_lock));
} }
void void
DATA_Dump(void) DATA_Dump(void)
{ {
for (int i = 0; i < dtbl.len; i++) { for (int i = 0; i < config.maxdone; i++) {
dataentry *entry = &dtbl.entry[i]; dataentry *entry = &entrytbl[i];
if (!OCCUPIED(entry)) if (!OCCUPIED(entry))
continue; continue;
......
/*-
* Copyright (c) 2015 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2015 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/* Head of the global free list */
struct freehead_s freehead;
/* Table of data entries */
dataentry *entrytbl;
...@@ -49,7 +49,7 @@ static unsigned long reconnects = 0; /* Reconnects to MQ */ ...@@ -49,7 +49,7 @@ static unsigned long reconnects = 0; /* Reconnects to MQ */
static unsigned long restarts = 0; /* Worker thread restarts */ static unsigned long restarts = 0; /* Worker thread restarts */
static unsigned occ_hi = 0; /* Occupancy high water mark */ static unsigned occ_hi = 0; /* Occupancy high water mark */
static unsigned occ_hi_this = 0;/* Occupancy high water mark static unsigned occ_hi_this = 0;/* Occupancy high water mark
this reporting interval*/ this reporting interval */
static void static void
log_output(void) log_output(void)
...@@ -63,7 +63,7 @@ log_output(void) ...@@ -63,7 +63,7 @@ log_output(void)
LOG_Log(LOG_INFO, "Data table: len=%u occ=%u occ_hi=%u occ_hi_this=%u " LOG_Log(LOG_INFO, "Data table: len=%u occ=%u occ_hi=%u occ_hi_this=%u "
"global_free=%u", "global_free=%u",
dtbl.len, occ, occ_hi, occ_hi_this, dtbl.nfree); config.maxdone, occ, occ_hi, occ_hi_this, global_nfree);
/* Eliminate the dependency of trackrdrd.o for unit tests */ /* Eliminate the dependency of trackrdrd.o for unit tests */
#ifndef TEST_DRIVER #ifndef TEST_DRIVER
......
...@@ -19,6 +19,7 @@ test_parse_LDADD = \ ...@@ -19,6 +19,7 @@ test_parse_LDADD = \
test_data_SOURCES = \ test_data_SOURCES = \
minunit.h \ minunit.h \
test_data.c \ test_data.c \
../data.h \
../trackrdrd.h ../trackrdrd.h
test_data_LDADD = \ test_data_LDADD = \
...@@ -83,6 +84,7 @@ test_config_LDADD = \ ...@@ -83,6 +84,7 @@ test_config_LDADD = \
test_worker_SOURCES = \ test_worker_SOURCES = \
minunit.h \ minunit.h \
test_worker.c \ test_worker.c \
../data.h \
../trackrdrd.h ../trackrdrd.h
test_worker_LDADD = \ test_worker_LDADD = \
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "minunit.h" #include "minunit.h"
#include "../trackrdrd.h" #include "../trackrdrd.h"
#include "../data.h"
int tests_run = 0; int tests_run = 0;
...@@ -55,19 +56,18 @@ static char ...@@ -55,19 +56,18 @@ static char
config.maxkeylen = DEF_MAXKEYLEN; config.maxkeylen = DEF_MAXKEYLEN;
err = DATA_Init(); err = DATA_Init();
VMASSERT(err == 0, "DATA_Init: %s", strerror(err)); VMASSERT(err == 0, "DATA_Init: %s", strerror(err));
MASSERT(dtbl.len == DEF_MAXDONE);
for (int i = 0; i < config.maxdone; i++) {
for (int i = 0; i < dtbl.len; i++) { MCHECK_OBJ_NOTNULL(&entrytbl[i], DATA_MAGIC);
MCHECK_OBJ_NOTNULL(&dtbl.entry[i], DATA_MAGIC); MASSERT(!OCCUPIED(&entrytbl[i]));
MASSERT(!OCCUPIED(&dtbl.entry[i])); MAZ(entrytbl[i].hasdata);
MAZ(dtbl.entry[i].hasdata); MAN(entrytbl[i].data);
MAN(dtbl.entry[i].data); MAN(entrytbl[i].key);
MAN(dtbl.entry[i].key); MAZ(entrytbl[i].xid);
MAZ(dtbl.entry[i].xid); MAZ(entrytbl[i].end);
MAZ(dtbl.entry[i].end); MAZ(entrytbl[i].keylen);
MAZ(dtbl.entry[i].keylen); MAZ(entrytbl[i].reqend_t.tv_sec);
MAZ(dtbl.entry[i].reqend_t.tv_sec); MAZ(entrytbl[i].reqend_t.tv_usec);
MAZ(dtbl.entry[i].reqend_t.tv_usec);
} }
return NULL; return NULL;
...@@ -80,17 +80,17 @@ static const char ...@@ -80,17 +80,17 @@ static const char
printf("... testing data write and read\n"); printf("... testing data write and read\n");
for (int i = 0; i < dtbl.len; i++) { for (int i = 0; i < config.maxdone; i++) {
memset(dtbl.entry[i].data, 'd', DEF_MAXDATA); memset(entrytbl[i].data, 'd', DEF_MAXDATA);
memset(dtbl.entry[i].key, 'k', DEF_MAXKEYLEN); memset(entrytbl[i].key, 'k', DEF_MAXKEYLEN);
} }
memset(data, 'd', DEF_MAXDATA); memset(data, 'd', DEF_MAXDATA);
memset(key, 'k', DEF_MAXKEYLEN); memset(key, 'k', DEF_MAXKEYLEN);
for (int i = 0; i < dtbl.len; i++) { for (int i = 0; i < config.maxdone; i++) {
MASSERT(memcmp(dtbl.entry[i].data, data, DEF_MAXDATA) == 0); MASSERT(memcmp(entrytbl[i].data, data, DEF_MAXDATA) == 0);
MASSERT(memcmp(dtbl.entry[i].key, key, DEF_MAXKEYLEN) == 0); MASSERT(memcmp(entrytbl[i].key, key, DEF_MAXKEYLEN) == 0);
} }
return NULL; return NULL;
...@@ -108,10 +108,10 @@ static const char ...@@ -108,10 +108,10 @@ static const char
MASSERT0(!VSTAILQ_EMPTY(&local_freehead), MASSERT0(!VSTAILQ_EMPTY(&local_freehead),
"Local freelist empty after take"); "Local freelist empty after take");
VMASSERT(dtbl.nfree == 0, "Global free count non-zero after take (%u)", VMASSERT(global_nfree == 0, "Global free count non-zero after take (%u)",
dtbl.nfree); global_nfree);
MASSERT0(VSTAILQ_EMPTY(&dtbl.freehead), MASSERT0(VSTAILQ_EMPTY(&freehead),
"Global free list non-empty after take"); "Global free list non-empty after take");
return NULL; return NULL;
...@@ -127,9 +127,9 @@ static const char ...@@ -127,9 +127,9 @@ static const char
MASSERT0(VSTAILQ_EMPTY(&local_freehead), MASSERT0(VSTAILQ_EMPTY(&local_freehead),
"Local freelist non-empty after return"); "Local freelist non-empty after return");
MASSERT(dtbl.nfree == DEF_MAXDONE); MASSERT(global_nfree == DEF_MAXDONE);
MASSERT0(!VSTAILQ_EMPTY(&dtbl.freehead), MASSERT0(!VSTAILQ_EMPTY(&freehead),
"Global free list empty after take"); "Global free list empty after take");
return NULL; return NULL;
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "minunit.h" #include "minunit.h"
#include "../trackrdrd.h" #include "../trackrdrd.h"
#include "../data.h"
#include "../vtim.h" #include "../vtim.h"
#define DEBUG 0 #define DEBUG 0
...@@ -142,7 +143,7 @@ static const char ...@@ -142,7 +143,7 @@ static const char
"%d of %d worker threads running", wrk_running, NWORKERS); "%d of %d worker threads running", wrk_running, NWORKERS);
for (int i = 0; i < config.maxdone; i++) { for (int i = 0; i < config.maxdone; i++) {
entry = &dtbl.entry[i]; entry = &entrytbl[i];
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC); MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
entry->xid = xid; entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1); sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
...@@ -162,7 +163,7 @@ static const char ...@@ -162,7 +163,7 @@ static const char
* empty states after worker threads are shut down. * empty states after worker threads are shut down.
*/ */
for (int i = 0; i < config.maxdone; i++) { for (int i = 0; i < config.maxdone; i++) {
entry = &dtbl.entry[i]; entry = &entrytbl[i];
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC); MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
MASSERT(!OCCUPIED(entry)); MASSERT(!OCCUPIED(entry));
MAZ(entry->end); MAZ(entry->end);
......
...@@ -90,6 +90,9 @@ void PRIV_Sandbox(void); ...@@ -90,6 +90,9 @@ void PRIV_Sandbox(void);
/* worker.c */ /* worker.c */
/* stats */
unsigned abandoned;
/** /**
* Initializes resources for worker threads -- allocates memory, * Initializes resources for worker threads -- allocates memory,
* initializes mutexes and condition variables. * initializes mutexes and condition variables.
...@@ -109,6 +112,8 @@ void WRK_Shutdown(void); ...@@ -109,6 +112,8 @@ void WRK_Shutdown(void);
#define OCCUPIED(e) ((e)->occupied == 1) #define OCCUPIED(e) ((e)->occupied == 1)
unsigned global_nfree;
/* XXX: do we need xid, hasdata, reqend_t? all temp in dispatch? */ /* XXX: do we need xid, hasdata, reqend_t? all temp in dispatch? */
struct dataentry_s { struct dataentry_s {
unsigned magic; unsigned magic;
...@@ -130,27 +135,6 @@ typedef struct dataentry_s dataentry; ...@@ -130,27 +135,6 @@ typedef struct dataentry_s dataentry;
VSTAILQ_HEAD(freehead_s, dataentry_s); VSTAILQ_HEAD(freehead_s, dataentry_s);
/* stats */
unsigned abandoned;
struct datatable_s {
unsigned magic;
#define DATATABLE_MAGIC 0xd3ef3bd4
unsigned len;
/* protected by freelist_lock */
struct freehead_s freehead;
pthread_mutex_t freelist_lock;
unsigned nfree;
dataentry *entry;
char *buf;
};
typedef struct datatable_s datatable;
datatable dtbl;
int DATA_Init(void); int DATA_Init(void);
void DATA_Reset(dataentry *entry); void DATA_Reset(dataentry *entry);
unsigned DATA_Take_Freelist(struct freehead_s *dst); unsigned DATA_Take_Freelist(struct freehead_s *dst);
......
...@@ -188,7 +188,7 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk) ...@@ -188,7 +188,7 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
VSTAILQ_INSERT_TAIL(&wrk->wrk_freelist, entry, freelist); VSTAILQ_INSERT_TAIL(&wrk->wrk_freelist, entry, freelist);
wrk->wrk_nfree++; wrk->wrk_nfree++;
if (dtbl.nfree == 0) { if (global_nfree == 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree); DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0; wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist)); assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment