Commit 799db78c authored by Geoff Simmons's avatar Geoff Simmons

encapsulate implementation details of data buffering

parent 799749ce
......@@ -36,6 +36,8 @@
#include <errno.h>
#include "trackrdrd.h"
#include "data.h"
#include "vas.h"
#include "miniobj.h"
......@@ -51,56 +53,50 @@
VSTAILQ_INIT((head2)); \
} while (0)
static pthread_mutex_t freelist_lock;
static char *buf;
static void
data_Cleanup(void)
{
free(dtbl.entry);
free(dtbl.buf);
AZ(pthread_mutex_destroy(&dtbl.freelist_lock));
free(entrytbl);
free(buf);
AZ(pthread_mutex_destroy(&freelist_lock));
}
int
DATA_Init(void)
{
dataentry *entryptr;
char *bufptr;
unsigned bufsize = config.maxdata + config.maxkeylen;
/*
* we want enough space to accomodate all open and done records
*
*/
entryptr = (dataentry *) calloc(config.maxdone, sizeof(dataentry));
if (entryptr == NULL)
entrytbl = (dataentry *) calloc(config.maxdone, sizeof(dataentry));
if (entrytbl == NULL)
return(errno);
bufptr = (char *) calloc(config.maxdone, bufsize);
if (bufptr == NULL) {
free(entryptr);
buf = (char *) calloc(config.maxdone, bufsize);
if (buf == NULL) {
free(entrytbl);
return(errno);
}
memset(&dtbl, 0, sizeof(datatable));
dtbl.magic = DATATABLE_MAGIC;
dtbl.len = config.maxdone;
VSTAILQ_INIT(&dtbl.freehead);
AZ(pthread_mutex_init(&dtbl.freelist_lock, NULL));
VSTAILQ_INIT(&freehead);
AZ(pthread_mutex_init(&freelist_lock, NULL));
dtbl.entry = entryptr;
dtbl.buf = bufptr;
dtbl.nfree = 0;
global_nfree = 0;
for (unsigned i = 0; i < config.maxdone; i++) {
dtbl.entry[i].magic = DATA_MAGIC;
dtbl.entry[i].data = &dtbl.buf[i * bufsize];
dtbl.entry[i].key = &dtbl.buf[(i * bufsize) + config.maxdata];
VSTAILQ_INSERT_TAIL(&dtbl.freehead, &dtbl.entry[i], freelist);
dtbl.nfree++;
entrytbl[i].magic = DATA_MAGIC;
entrytbl[i].data = &buf[i * bufsize];
entrytbl[i].key = &buf[(i * bufsize) + config.maxdata];
VSTAILQ_INSERT_TAIL(&freehead, &entrytbl[i], freelist);
global_nfree++;
}
assert(dtbl.nfree == config.maxdone);
assert(VSTAILQ_FIRST(&dtbl.freehead));
assert(global_nfree == config.maxdone);
assert(VSTAILQ_FIRST(&freehead));
atexit(data_Cleanup);
return(0);
......@@ -131,33 +127,33 @@ DATA_Take_Freelist(struct freehead_s *dst)
{
unsigned nfree;
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
nfree = dtbl.nfree;
dtbl.nfree = 0;
VSTAILQ_PREPEND(dst, &dtbl.freehead);
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
AZ(pthread_mutex_lock(&freelist_lock));
nfree = global_nfree;
global_nfree = 0;
VSTAILQ_PREPEND(dst, &freehead);
AZ(pthread_mutex_unlock(&freelist_lock));
return nfree;
}
/*
* return to dtbl.freehead
* return to freehead
*
* returned must be locked by caller, if required
*/
void
DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
{
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_PREPEND(&dtbl.freehead, returned);
dtbl.nfree += nreturned;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
AZ(pthread_mutex_lock(&freelist_lock));
VSTAILQ_PREPEND(&freehead, returned);
global_nfree += nreturned;
AZ(pthread_mutex_unlock(&freelist_lock));
}
void
DATA_Dump(void)
{
for (int i = 0; i < dtbl.len; i++) {
dataentry *entry = &dtbl.entry[i];
for (int i = 0; i < config.maxdone; i++) {
dataentry *entry = &entrytbl[i];
if (!OCCUPIED(entry))
continue;
......
/*-
* Copyright (c) 2015 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2015 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Authors: Geoffrey Simmons <geoffrey.simmons@uplex.de>
* Nils Goroll <nils.goroll@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
/* Head of the global free list */
struct freehead_s freehead;
/* Table of data entries */
dataentry *entrytbl;
......@@ -49,7 +49,7 @@ static unsigned long reconnects = 0; /* Reconnects to MQ */
static unsigned long restarts = 0; /* Worker thread restarts */
static unsigned occ_hi = 0; /* Occupancy high water mark */
static unsigned occ_hi_this = 0;/* Occupancy high water mark
this reporting interval*/
this reporting interval */
static void
log_output(void)
......@@ -63,7 +63,7 @@ log_output(void)
LOG_Log(LOG_INFO, "Data table: len=%u occ=%u occ_hi=%u occ_hi_this=%u "
"global_free=%u",
dtbl.len, occ, occ_hi, occ_hi_this, dtbl.nfree);
config.maxdone, occ, occ_hi, occ_hi_this, global_nfree);
/* Eliminate the dependency of trackrdrd.o for unit tests */
#ifndef TEST_DRIVER
......
......@@ -19,6 +19,7 @@ test_parse_LDADD = \
test_data_SOURCES = \
minunit.h \
test_data.c \
../data.h \
../trackrdrd.h
test_data_LDADD = \
......@@ -83,6 +84,7 @@ test_config_LDADD = \
test_worker_SOURCES = \
minunit.h \
test_worker.c \
../data.h \
../trackrdrd.h
test_worker_LDADD = \
......
......@@ -34,6 +34,7 @@
#include "minunit.h"
#include "../trackrdrd.h"
#include "../data.h"
int tests_run = 0;
......@@ -55,19 +56,18 @@ static char
config.maxkeylen = DEF_MAXKEYLEN;
err = DATA_Init();
VMASSERT(err == 0, "DATA_Init: %s", strerror(err));
MASSERT(dtbl.len == DEF_MAXDONE);
for (int i = 0; i < dtbl.len; i++) {
MCHECK_OBJ_NOTNULL(&dtbl.entry[i], DATA_MAGIC);
MASSERT(!OCCUPIED(&dtbl.entry[i]));
MAZ(dtbl.entry[i].hasdata);
MAN(dtbl.entry[i].data);
MAN(dtbl.entry[i].key);
MAZ(dtbl.entry[i].xid);
MAZ(dtbl.entry[i].end);
MAZ(dtbl.entry[i].keylen);
MAZ(dtbl.entry[i].reqend_t.tv_sec);
MAZ(dtbl.entry[i].reqend_t.tv_usec);
for (int i = 0; i < config.maxdone; i++) {
MCHECK_OBJ_NOTNULL(&entrytbl[i], DATA_MAGIC);
MASSERT(!OCCUPIED(&entrytbl[i]));
MAZ(entrytbl[i].hasdata);
MAN(entrytbl[i].data);
MAN(entrytbl[i].key);
MAZ(entrytbl[i].xid);
MAZ(entrytbl[i].end);
MAZ(entrytbl[i].keylen);
MAZ(entrytbl[i].reqend_t.tv_sec);
MAZ(entrytbl[i].reqend_t.tv_usec);
}
return NULL;
......@@ -80,17 +80,17 @@ static const char
printf("... testing data write and read\n");
for (int i = 0; i < dtbl.len; i++) {
memset(dtbl.entry[i].data, 'd', DEF_MAXDATA);
memset(dtbl.entry[i].key, 'k', DEF_MAXKEYLEN);
for (int i = 0; i < config.maxdone; i++) {
memset(entrytbl[i].data, 'd', DEF_MAXDATA);
memset(entrytbl[i].key, 'k', DEF_MAXKEYLEN);
}
memset(data, 'd', DEF_MAXDATA);
memset(key, 'k', DEF_MAXKEYLEN);
for (int i = 0; i < dtbl.len; i++) {
MASSERT(memcmp(dtbl.entry[i].data, data, DEF_MAXDATA) == 0);
MASSERT(memcmp(dtbl.entry[i].key, key, DEF_MAXKEYLEN) == 0);
for (int i = 0; i < config.maxdone; i++) {
MASSERT(memcmp(entrytbl[i].data, data, DEF_MAXDATA) == 0);
MASSERT(memcmp(entrytbl[i].key, key, DEF_MAXKEYLEN) == 0);
}
return NULL;
......@@ -108,10 +108,10 @@ static const char
MASSERT0(!VSTAILQ_EMPTY(&local_freehead),
"Local freelist empty after take");
VMASSERT(dtbl.nfree == 0, "Global free count non-zero after take (%u)",
dtbl.nfree);
VMASSERT(global_nfree == 0, "Global free count non-zero after take (%u)",
global_nfree);
MASSERT0(VSTAILQ_EMPTY(&dtbl.freehead),
MASSERT0(VSTAILQ_EMPTY(&freehead),
"Global free list non-empty after take");
return NULL;
......@@ -127,9 +127,9 @@ static const char
MASSERT0(VSTAILQ_EMPTY(&local_freehead),
"Local freelist non-empty after return");
MASSERT(dtbl.nfree == DEF_MAXDONE);
MASSERT(global_nfree == DEF_MAXDONE);
MASSERT0(!VSTAILQ_EMPTY(&dtbl.freehead),
MASSERT0(!VSTAILQ_EMPTY(&freehead),
"Global free list empty after take");
return NULL;
......
......@@ -36,6 +36,7 @@
#include "minunit.h"
#include "../trackrdrd.h"
#include "../data.h"
#include "../vtim.h"
#define DEBUG 0
......@@ -142,7 +143,7 @@ static const char
"%d of %d worker threads running", wrk_running, NWORKERS);
for (int i = 0; i < config.maxdone; i++) {
entry = &dtbl.entry[i];
entry = &entrytbl[i];
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
entry->xid = xid;
sprintf(entry->data, "XID=%d&foo=bar&baz=quux&record=%d", xid, i+1);
......@@ -162,7 +163,7 @@ static const char
* empty states after worker threads are shut down.
*/
for (int i = 0; i < config.maxdone; i++) {
entry = &dtbl.entry[i];
entry = &entrytbl[i];
MCHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
MASSERT(!OCCUPIED(entry));
MAZ(entry->end);
......
......@@ -90,6 +90,9 @@ void PRIV_Sandbox(void);
/* worker.c */
/* stats */
unsigned abandoned;
/**
* Initializes resources for worker threads -- allocates memory,
* initializes mutexes and condition variables.
......@@ -109,6 +112,8 @@ void WRK_Shutdown(void);
#define OCCUPIED(e) ((e)->occupied == 1)
unsigned global_nfree;
/* XXX: do we need xid, hasdata, reqend_t? all temp in dispatch? */
struct dataentry_s {
unsigned magic;
......@@ -130,27 +135,6 @@ typedef struct dataentry_s dataentry;
VSTAILQ_HEAD(freehead_s, dataentry_s);
/* stats */
unsigned abandoned;
struct datatable_s {
unsigned magic;
#define DATATABLE_MAGIC 0xd3ef3bd4
unsigned len;
/* protected by freelist_lock */
struct freehead_s freehead;
pthread_mutex_t freelist_lock;
unsigned nfree;
dataentry *entry;
char *buf;
};
typedef struct datatable_s datatable;
datatable dtbl;
int DATA_Init(void);
void DATA_Reset(dataentry *entry);
unsigned DATA_Take_Freelist(struct freehead_s *dst);
......
......@@ -188,7 +188,7 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
VSTAILQ_INSERT_TAIL(&wrk->wrk_freelist, entry, freelist);
wrk->wrk_nfree++;
if (dtbl.nfree == 0) {
if (global_nfree == 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment