Commit fa8368ad authored by Geoff Simmons's avatar Geoff Simmons

extend DATA_Clear_Tx() to move free data back to free lists

parent 2723d5bc
......@@ -76,7 +76,11 @@ data_Cleanup(void)
}
void
DATA_Clear_Tx(tx_t *tx)
DATA_Clear_Tx(tx_t * const tx, txhead_t * const freetx,
linehead_t * const freerec, chunkhead_t * const freechunk,
unsigned * restrict const nfree_tx,
unsigned * restrict const nfree_rec,
unsigned * restrict const nfree_chunk)
{
logline_t *rec;
chunk_t *chunk;
......@@ -86,20 +90,29 @@ DATA_Clear_Tx(tx_t *tx)
tx->state = TX_EMPTY;
tx->vxid = -1;
tx->type = VSL_t_unknown;
tx->t = 0.;
VSTAILQ_FOREACH(rec, &tx->lines, linelist) {
CHECK_OBJ_NOTNULL(rec, LOGLINE_MAGIC);
while ((rec = VSTAILQ_FIRST(&tx->lines)) != NULL) {
CHECK_OBJ(rec, LOGLINE_MAGIC);
rec->state = DATA_EMPTY;
rec->tag = SLT__Bogus;
rec->len = 0;
VSTAILQ_FOREACH(chunk, &rec->chunks, chunklist) {
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
while ((chunk = VSTAILQ_FIRST(&rec->chunks)) != NULL) {
CHECK_OBJ(chunk, CHUNK_MAGIC);
chunk->state = DATA_EMPTY;
*chunk->data = '\0';
VSTAILQ_REMOVE_HEAD(&rec->chunks, chunklist);
VSTAILQ_INSERT_HEAD(freechunk, chunk, freelist);
*nfree_chunk += 1;
}
VSTAILQ_INIT(&rec->chunks);
assert(VSTAILQ_EMPTY(&rec->chunks));
VSTAILQ_REMOVE_HEAD(&tx->lines, linelist);
VSTAILQ_INSERT_HEAD(freerec, rec, freelist);
*nfree_rec += 1;
}
VSTAILQ_INIT(&tx->lines);
assert(VSTAILQ_EMPTY(&tx->lines));
VSTAILQ_INSERT_HEAD(freetx, tx, freelist);
*nfree_tx += 1;
}
int
......
......@@ -239,6 +239,80 @@ static const char
return NULL;
}
static const char
*test_data_clear_tx(void)
{
#define NRECS 10
#define CHUNKS_PER_REC 3
tx_t tx;
logline_t r[NRECS], *rec;
chunk_t c[NRECS * CHUNKS_PER_REC], *chunk;
int n = 0;
unsigned nfree_tx = 4711, nfree_recs = 815, nfree_chunks = 1147;
printf("... testing transaction clear\n");
VSTAILQ_INIT(&local_freetx);
VSTAILQ_INIT(&local_freeline);
VSTAILQ_INIT(&local_freechunk);
tx.magic = TX_MAGIC;
VSTAILQ_INIT(&tx.lines);
for (int i = 0; i < NRECS; i++) {
VSTAILQ_INSERT_TAIL(&tx.lines, &r[i], linelist);
r[i].magic = LOGLINE_MAGIC;
VSTAILQ_INIT(&r[i].chunks);
for (int j = 0; j < CHUNKS_PER_REC; j++) {
chunk = &c[i*CHUNKS_PER_REC + j];
VSTAILQ_INSERT_TAIL(&r[i].chunks, chunk, chunklist);
chunk->magic = CHUNK_MAGIC;
chunk->data = (char *) calloc(1, config.chunk_size);
}
}
DATA_Clear_Tx(&tx, &local_freetx, &local_freeline, &local_freechunk,
&nfree_tx, &nfree_recs, &nfree_chunks);
MASSERT(nfree_tx == 4712);
MASSERT(nfree_recs == 815 + NRECS);
MASSERT(nfree_chunks == 1147 + NRECS*CHUNKS_PER_REC);
MCHECK_OBJ(&tx, TX_MAGIC);
MASSERT(tx.state == TX_EMPTY);
MASSERT(tx.vxid == -1);
MASSERT(tx.type == VSL_t_unknown);
MAZ(tx.t);
MASSERT(VSTAILQ_EMPTY(&tx.lines));
MASSERT(!VSTAILQ_EMPTY(&local_freetx));
MASSERT(VSTAILQ_FIRST(&local_freetx) == &tx);
MAZ(VSTAILQ_NEXT(&tx, freelist));
MASSERT(!VSTAILQ_EMPTY(&local_freeline));
VSTAILQ_FOREACH(rec, &local_freeline, freelist) {
MCHECK_OBJ_NOTNULL(rec, LOGLINE_MAGIC);
MASSERT(rec->state == DATA_EMPTY);
MASSERT(rec->tag == SLT__Bogus);
MAZ(rec->len);
MASSERT(VSTAILQ_EMPTY(&tx.lines));
n++;
}
MASSERT(n == NRECS);
MASSERT(!VSTAILQ_EMPTY(&local_freechunk));
n = 0;
VSTAILQ_FOREACH(chunk, &local_freechunk, freelist) {
MCHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
MASSERT(chunk->state == DATA_EMPTY);
MAZ(chunk->data[0]);
n++;
free(chunk->data);
}
MASSERT(n == NRECS * CHUNKS_PER_REC);
return NULL;
}
static const char
*all_tests(void)
{
......@@ -250,6 +324,7 @@ static const char
mu_run_test(test_data_return_rec);
mu_run_test(test_data_return_chunk);
mu_run_test(test_data_prepend);
mu_run_test(test_data_clear_tx);
return NULL;
}
......
......@@ -69,8 +69,8 @@ static char
MAZ(WRT_Init());
VSTAILQ_INIT(&wrt_freelist);
MASSERT(VSTAILQ_EMPTY(&wrt_freelist));
VSTAILQ_INIT(&wrt_freetx);
MASSERT(VSTAILQ_EMPTY(&wrt_freetx));
/* XXX: common helper functions with test_format */
tx.magic = TX_MAGIC;
......
......@@ -249,7 +249,11 @@ int LOG_Open(const char *progname);
/* data.c */
int DATA_Init(void);
void DATA_Clear_Tx(tx_t *tx);
void DATA_Clear_Tx(tx_t * const tx, txhead_t * const freetx,
linehead_t * const freerec, chunkhead_t * const freechunk,
unsigned * restrict const nfree_tx,
unsigned * restrict const nfree_rec,
unsigned * restrict const nfree_chunk);
unsigned DATA_Take_Freetx(struct txhead_s *dst);
unsigned DATA_Take_Freeline(struct linehead_s *dst);
unsigned DATA_Take_Freechunk(struct chunkhead_s *dst);
......
......@@ -67,7 +67,10 @@ static const char* statename[WRT_STATE_E_LIMIT] = {
/* Single writer thread, consumer for the SPSC queue. */
static pthread_t writer;
static unsigned wrt_nfree;
linehead_t wrt_freerecs;
chunkhead_t wrt_freechunks;
static unsigned wrt_nfree_tx, wrt_nfree_recs, wrt_nfree_chunks;
static struct vsb *os;
......@@ -136,10 +139,10 @@ open_log(void)
static inline void
wrt_return_freelist(void)
{
DATA_Return_Freetx(&wrt_freelist, wrt_nfree);
LOG_Log(LOG_DEBUG, "Writer: returned %u to free list", wrt_nfree);
wrt_nfree = 0;
assert(VSTAILQ_EMPTY(&wrt_freelist));
DATA_Return_Freetx(&wrt_freetx, wrt_nfree_tx);
LOG_Log(LOG_DEBUG, "Writer: returned %u to free list", wrt_nfree_tx);
wrt_nfree_tx = 0;
assert(VSTAILQ_EMPTY(&wrt_freetx));
if (RDR_Waiting()) {
AZ(pthread_mutex_lock(&data_ready_lock));
AZ(pthread_cond_signal(&data_ready_cond));
......@@ -219,9 +222,8 @@ wrt_write(tx_t *tx)
MON_StatsUpdate(STATS_WRITTEN);
/* clean up */
DATA_Clear_Tx(tx);
VSTAILQ_INSERT_TAIL(&wrt_freelist, tx, freelist);
wrt_nfree++;
DATA_Clear_Tx(tx, &wrt_freetx, &wrt_freerecs, &wrt_freechunks,
&wrt_nfree_tx, &wrt_nfree_recs, &wrt_nfree_chunks);
if (global_nfree_tx < (config.max_data >> 1) || RDR_Waiting())
wrt_return_freelist();
......@@ -237,8 +239,8 @@ static void
CHECK_OBJ_NOTNULL(wrt, WRITER_DATA_MAGIC);
wrt->state = WRT_INITIALIZING;
VSTAILQ_INIT(&wrt_freelist);
wrt_nfree = 0;
VSTAILQ_INIT(&wrt_freetx);
wrt_nfree_tx = 0;
wrt->state = WRT_RUNNING;
......@@ -260,7 +262,7 @@ static void
errno, strerror(errno));
errors++;
}
if (wrt_nfree > 0)
if (wrt_nfree_tx > 0)
wrt_return_freelist();
wrt->state = WRT_WAITING;
......@@ -334,8 +336,8 @@ WRT_Stats(void)
LOG_Log(LOG_INFO,
"Writer (%s): seen=%lu writes=%lu bytes=%lu errors=%lu timeouts=%lu"
" waits=%lu free=%u",
statename[wrt_data.state], deqs, writes, bytes, errors, timeouts, waits,
wrt_nfree);
statename[wrt_data.state], deqs, writes, bytes, errors, timeouts,
waits, wrt_nfree_tx);
}
int
......
......@@ -32,7 +32,7 @@
#include "varnishevent.h"
/* local freelist - return space in chunks */
struct txhead_s wrt_freelist;
struct txhead_s wrt_freetx;
/*
* Set to the write timeout, and indicates the time elapsed after the
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment