Commit 56e3c543 authored by Geoff Simmons's avatar Geoff Simmons

implement freelist take for tx, records and chunks, tested for txn

parent 1e5d8632
......@@ -46,10 +46,12 @@
static const char *statename[3] = { "EMPTY", "OPEN", "DONE" };
#endif
static pthread_mutex_t freelist_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t freetx_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t freeline_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t freechunk_lock = PTHREAD_MUTEX_INITIALIZER;
static char *bufptr;
static txhead_t freetxhead;
static linehead_t freerechead;
static linehead_t freelinehead;
static chunkhead_t freechunkhead;
static int lines_per_tx = FAKE_DEFAULT_LINES_PER_TX;
......@@ -73,7 +75,9 @@ data_Cleanup(void)
}
free(txn);
free(bufptr);
AZ(pthread_mutex_destroy(&freelist_lock));
AZ(pthread_mutex_destroy(&freetx_lock));
AZ(pthread_mutex_destroy(&freeline_lock));
AZ(pthread_mutex_destroy(&freechunk_lock));
}
void
......@@ -131,14 +135,14 @@ DATA_Init(void)
free(chunks);
return errno;
}
VSTAILQ_INIT(&freerechead);
VSTAILQ_INIT(&freelinehead);
for (int i = 0; i < nrecords; i++) {
lines[i].magic = LOGLINE_MAGIC;
lines[i].state = DATA_EMPTY;
lines[i].tag = SLT__Bogus;
lines[i].len = 0;
VSTAILQ_INIT(&lines[i].chunks);
VSTAILQ_INSERT_TAIL(&freechunkhead, &chunks[i], freelist);
VSTAILQ_INSERT_TAIL(&freelinehead, &lines[i], freelist);
}
LOG_Log(LOG_DEBUG, "Allocating table for %d transactions (%d bytes)",
......@@ -161,7 +165,7 @@ DATA_Init(void)
}
data_open = data_done = data_occ_hi = 0;
global_nfree = config.max_data;
global_nfree_tx = config.max_data;
atexit(data_Cleanup);
......@@ -171,19 +175,24 @@ DATA_Init(void)
/*
* take all free entries from the datatable for lockless allocation
*/
unsigned
DATA_Take_Freelist(struct txhead_s *dst)
{
unsigned nfree;
AZ(pthread_mutex_lock(&freelist_lock));
VSTAILQ_CONCAT(dst, &freetxhead);
nfree = global_nfree;
global_nfree = 0;
AZ(pthread_mutex_unlock(&freelist_lock));
return nfree;
#define DATA_Take_Free(type) \
unsigned \
DATA_Take_Free##type(struct type##head_s *dst) \
{ \
unsigned nfree; \
\
AZ(pthread_mutex_lock(&free##type##_lock)); \
VSTAILQ_CONCAT(dst, &free##type##head); \
nfree = global_nfree_##type; \
global_nfree_##type = 0; \
AZ(pthread_mutex_unlock(&free##type##_lock)); \
return nfree; \
}
DATA_Take_Free(tx)
DATA_Take_Free(line)
DATA_Take_Free(chunk)
/*
* return to global freelist
* returned must be locked by caller, if required
......@@ -191,10 +200,10 @@ DATA_Take_Freelist(struct txhead_s *dst)
void
DATA_Return_Freelist(struct txhead_s *returned, unsigned nreturned)
{
AZ(pthread_mutex_lock(&freelist_lock));
AZ(pthread_mutex_lock(&freetx_lock));
VSTAILQ_CONCAT(&freetxhead, returned);
global_nfree += nreturned;
AZ(pthread_mutex_unlock(&freelist_lock));
global_nfree_tx += nreturned;
AZ(pthread_mutex_unlock(&freetx_lock));
}
#define DUMP_HDRS(vsb, ll, hdr) do { \
......
......@@ -48,7 +48,7 @@ log_output(void)
LOG_Log(LOG_INFO, "Data table: len=%u open=%u done=%u load=%.2f occ_hi=%u "
"global_free=%u", config.max_data, data_open, data_done,
100.0 * (data_open + data_done) / config.max_data,
data_occ_hi, global_nfree);
data_occ_hi, global_nfree_tx);
RDR_Stats();
......
......@@ -208,7 +208,7 @@ static inline tx_t
{
struct tx_t *tx;
while (VSTAILQ_EMPTY(&rdr_tx_freelist)) {
rdr_tx_free = DATA_Take_Freelist(&rdr_tx_freelist);
rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist);
if (VSTAILQ_EMPTY(&rdr_tx_freelist)) {
AZ(rdr_tx_free);
signal_spscq_ready();
......@@ -221,7 +221,7 @@ static inline tx_t
}
waiting = 0;
AZ(pthread_mutex_unlock(&data_ready_lock));
rdr_tx_free = DATA_Take_Freelist(&rdr_tx_freelist);
rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist);
LOG_Log(LOG_DEBUG, "Reader: took %u txen from free list",
rdr_tx_free);
}
......@@ -668,7 +668,7 @@ main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
rdr_free = DATA_Take_Freelist(&rdr_tx_freelist);
rdr_free = DATA_Take_Freetx(&rdr_tx_freelist);
assert(!VSTAILQ_EMPTY(&rdr_tx_freelist));
assert(rdr_free == config.max_data);
......
......@@ -140,7 +140,7 @@ enum VSL_tag_e idx2tag[MAX_VSL_TAG];
VSTAILQ_HEAD(freehead_s, logline_t);
unsigned global_nfree;
unsigned global_nfree_tx, global_nfree_line, global_nfree_chunk;
/* Reader waits for this condition when the freelist is exhausted.
Writer signals the condition after returning space to the freelist. */
......@@ -240,7 +240,9 @@ int LOG_Open(const char *progname);
/* data.c */
int DATA_Init(void);
void DATA_Clear_Logline(tx_t *tx);
unsigned DATA_Take_Freelist(struct txhead_s *dst);
unsigned DATA_Take_Freetx(struct txhead_s *dst);
unsigned DATA_Take_Freeline(struct linehead_s *dst);
unsigned DATA_Take_Freechunk(struct chunkhead_s *dst);
void DATA_Return_Freelist(struct txhead_s *returned, unsigned nreturned);
void DATA_Dump(void);
......
......@@ -227,10 +227,10 @@ wrt_write(tx_t *tx)
VSTAILQ_INSERT_TAIL(&wrt_freelist, tx, freelist);
wrt_nfree++;
if (global_nfree < (config.max_data >> 1) || RDR_Waiting())
if (global_nfree_tx < (config.max_data >> 1) || RDR_Waiting())
wrt_return_freelist();
}
static void
*wrt_main(void *arg)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment