Commit a622325e authored by Geoff Simmons's avatar Geoff Simmons

filled in the implementation of the VSL dispatcher function (still

not tested)
parent 7b421d6f
......@@ -80,9 +80,8 @@
static unsigned open = 0, occ_hi = 0, len_hi = 0;
static unsigned long seen = 0, submitted = 0, not_logged = 0,
waits = 0, fd_overflows = 0, len_overflows = 0,
hdr_overflows = 0, expired = 0, spec_mismatches = 0, wrong_tags = 0;
static unsigned long seen = 0, submitted = 0, len_overflows = 0, no_free_tx = 0,
no_free_rec = 0, no_free_chunk = 0;
/* Hack, because we cannot have #ifdef in the macro definition SIGDISP */
#define _UNDEFINED(SIG) ((#SIG)[0] == 0)
......@@ -109,13 +108,15 @@ static int cb_flag = 0;
static int z_flag = 0;
#endif
/* Local freelist */
static struct freehead_s reader_freelist =
VSTAILQ_HEAD_INITIALIZER(reader_freelist);
static unsigned rdr_free = 0;
/* Local freelists */
static chunkhead_t rdr_chunk_freelist
= VSTAILQ_HEAD_INITIALIZER(rdr_chunk_freelist);
static unsigned rdr_chunk_free = 0;
static struct txhead_s rdr_tx_freelist =
VSTAILQ_HEAD_INITIALIZER(rdr_tx_freelist);
static linehead_t rdr_rec_freelist = VSTAILQ_HEAD_INITIALIZER(rdr_rec_freelist);
static unsigned rdr_rec_free = 0;
static txhead_t rdr_tx_freelist = VSTAILQ_HEAD_INITIALIZER(rdr_tx_freelist);
static unsigned rdr_tx_free = 0;
static int waiting = 0;
......@@ -133,14 +134,10 @@ RDR_Waiting(void)
void
RDR_Stats(void)
{
LOG_Log(LOG_INFO, "Reader (%s): fd_max=%u seen=%lu open=%u load=%.2f "
"submitted=%lu not_logged=%lu occ_hi=%u waits=%lu expired=%lu free=%u "
"len_hi=%u fd_overflows=%lu len_overflows=%lu hdr_overflows=%lu "
"spec_mismatches=%lu wrong_tags=%lu",
waiting ? "waiting" : "running", config.max_fd, seen, open,
100.0 * open / config.max_fd, submitted, not_logged, occ_hi, waits,
expired, rdr_free, len_hi, fd_overflows, len_overflows, hdr_overflows,
spec_mismatches, wrong_tags);
LOG_Log(LOG_INFO, "Reader (%s): seen=%lu submitted=%lu occ_hi=%u "
"free_tx=%u free_rec=%u free_chunk=%u len_hi=%u len_overflows=%lu",
waiting ? "waiting" : "running", seen, open, submitted, occ_hi,
rdr_tx_free, rdr_rec_free, rdr_chunk_free, len_hi, len_overflows);
}
static inline void
......@@ -153,71 +150,58 @@ signal_spscq_ready(void)
}
}
#if 0
static inline chunk_t
*take_chunk(void)
{
chunk_t *chunk;
if (VSTAILQ_EMPTY(&rdr_chunk_freelist)) {
signal_spscq_ready();
rdr_chunk_free = DATA_Take_Freechunk(&rdr_chunk_freelist);
if (VSTAILQ_EMPTY(&rdr_chunk_freelist))
return NULL;
}
chunk = VSTAILQ_FIRST(&rdr_chunk_freelist);
VSTAILQ_REMOVE_HEAD(&rdr_chunk_freelist, freelist);
rdr_chunk_free--;
return (chunk);
}
static inline logline_t
*take(void)
*take_rec(void)
{
struct logline_t *data;
while (VSTAILQ_EMPTY(&reader_freelist)) {
rdr_free = DATA_Take_Freelist(&reader_freelist);
if (VSTAILQ_EMPTY(&reader_freelist)) {
AZ(rdr_free);
signal_spscq_ready();
LOG_Log0(LOG_DEBUG, "Reader: waiting for free list");
waiting = 1;
AZ(pthread_mutex_lock(&data_ready_lock));
if (!WRT_Waiting()) {
waits++;
AZ(pthread_cond_wait(&data_ready_cond, &data_ready_lock));
}
waiting = 0;
AZ(pthread_mutex_unlock(&data_ready_lock));
rdr_free = DATA_Take_Freelist(&reader_freelist);
LOG_Log(LOG_DEBUG, "Reader: took %u from free list", rdr_free);
}
logline_t *rec;
if (VSTAILQ_EMPTY(&rdr_rec_freelist)) {
signal_spscq_ready();
rdr_rec_free = DATA_Take_Freeline(&rdr_rec_freelist);
if (VSTAILQ_EMPTY(&rdr_rec_freelist))
return NULL;
}
data = VSTAILQ_FIRST(&reader_freelist);
VSTAILQ_REMOVE_HEAD(&reader_freelist, freelist);
rdr_free--;
return (data);
rec = VSTAILQ_FIRST(&rdr_rec_freelist);
VSTAILQ_REMOVE_HEAD(&rdr_rec_freelist, freelist);
rdr_rec_free--;
return (rec);
}
#endif
static inline tx_t
*take_tx(void)
{
struct tx_t *tx;
while (VSTAILQ_EMPTY(&rdr_tx_freelist)) {
tx_t *tx;
if (VSTAILQ_EMPTY(&rdr_tx_freelist)) {
signal_spscq_ready();
rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist);
if (VSTAILQ_EMPTY(&rdr_tx_freelist)) {
AZ(rdr_tx_free);
signal_spscq_ready();
LOG_Log0(LOG_DEBUG, "Reader: waiting for free list");
waiting = 1;
AZ(pthread_mutex_lock(&data_ready_lock));
if (!WRT_Waiting()) {
waits++;
AZ(pthread_cond_wait(&data_ready_cond, &data_ready_lock));
}
waiting = 0;
AZ(pthread_mutex_unlock(&data_ready_lock));
rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist);
LOG_Log(LOG_DEBUG, "Reader: took %u txen from free list",
rdr_tx_free);
}
if (VSTAILQ_EMPTY(&rdr_tx_freelist))
return NULL;
}
tx = VSTAILQ_FIRST(&rdr_tx_freelist);
VSTAILQ_REMOVE_HEAD(&rdr_tx_freelist, freelist);
rdr_tx_free--;
return (tx);
}
static inline void
take_chunks(linehead_t *lineh, unsigned _nchunks)
{
(void) lineh;
(void) _nchunks;
return (tx);
}
static inline void
......@@ -225,20 +209,7 @@ submit(tx_t *tx)
{
CHECK_OBJ_NOTNULL(tx, TX_MAGIC);
assert(tx->state == TX_DONE);
#if 0
assert(lp->state == DATA_DONE);
data_open--;
if ((m_flag && !VSL_Matched(vd, lp->bitmap))
|| (lp->spec && !(lp->spec & cb_flag))) {
not_logged++;
DATA_Clear_Logline(lp);
rdr_free++;
VSTAILQ_INSERT_TAIL(&reader_freelist, lp, freelist);
return;
}
SPSCQ_Enq((void *) lp);
#endif
SPSCQ_Enq(tx);
signal_spscq_ready();
MON_StatsUpdate(STATS_DONE);
submitted++;
......@@ -258,22 +229,29 @@ event(struct VSL_data *_vsl, struct VSL_transaction * const pt[], void *priv)
if (pt[0] == NULL)
return reopen;
/* XXX: assert length(pt) == 1? */
for (t = pt[0]; t != NULL; t = *++pt) {
assert(t->type == VSL_t_req || t->type == VSL_t_bereq
|| t->type == VSL_t_raw);
tx = take_tx();
if (tx == NULL) {
no_free_tx++;
LOG_Log(LOG_DEBUG, "Freelist exhausted, tx DISCARDED: [%u %c]",
t->vxid, tx_type_name[tx->type]);
continue;
}
CHECK_OBJ_NOTNULL(tx, TX_MAGIC);
assert(tx->state == TX_EMPTY);
assert(!VSTAILQ_EMPTY(&tx->lines));
assert(VSTAILQ_EMPTY(&tx->lines));
tx->type = t->type;
tx->vxid = t->vxid;
assert(t->type == VSL_t_req || t->type == VSL_t_bereq
|| t->type == VSL_t_raw);
LOG_Log(LOG_DEBUG, "Tx: [%u %c]", t->vxid, tx_type_name[tx->type]);
LOG_Log(LOG_DEBUG, "Tx: [%u %c]", tx->vxid, tx_type_name[tx->type]);
logline_t *line = VSTAILQ_FIRST(&tx->lines);
while (1) {
int len;
logline_t *rec;
status = VSL_Next(t->c);
if (status <= 0)
......@@ -286,32 +264,44 @@ event(struct VSL_data *_vsl, struct VSL_transaction * const pt[], void *priv)
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
if (line == NULL) {
/* XXX: increment counter */
#if 0
line = VSTAILQ_LAST(tx->lines, logline_t, linelist);
take_lines(tx->lines);
line = VSTAILQ_NEXT(line, linelist);
#endif
rec = take_rec();
if (rec == NULL) {
no_free_rec++;
LOG_Log(LOG_DEBUG, "Freelist exhausted, line DISCARDED: "
"[%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
continue;
}
CHECK_OBJ_NOTNULL(line, LOGLINE_MAGIC);
assert(line->state == DATA_EMPTY);
CHECK_OBJ_NOTNULL(rec, LOGLINE_MAGIC);
assert(rec->state == DATA_EMPTY);
line->tag = VSL_TAG(t->c->rec.ptr);
line->len = len;
rec->tag = VSL_TAG(t->c->rec.ptr);
rec->len = len;
if (len != 0) {
/* Copy the payload into chunks */
assert(VSTAILQ_EMPTY(&line->chunks));
chunk_t *chunk;
/* Copy the payload into chunks */
assert(VSTAILQ_EMPTY(&rec->chunks));
int nchunk = (len + config.chunk_size - 1) / config.chunk_size;
if (nchunk > 1)
/* XXX: increment counter */
take_chunks(&tx->lines, nchunk);
for (int i = 0; i < nchunk; i++) {
chunk = take_chunk();
if (chunk == NULL) {
no_free_chunk++;
LOG_Log(LOG_DEBUG,
"Freelist exhausted, payload TRUNCATED: "
"[%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
break;
}
VSTAILQ_INSERT_TAIL(&rec->chunks, chunk, chunklist);
}
int n = len;
chunk_t *chunk = VSTAILQ_FIRST(&line->chunks);
chunk = VSTAILQ_FIRST(&rec->chunks);
const char *p = (const char *) VSL_CDATA(t->c->rec.ptr);
while (n > 0) {
while (n > 0 && chunk != NULL) {
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
int cp = n;
if (cp > config.chunk_size)
......@@ -322,10 +312,10 @@ event(struct VSL_data *_vsl, struct VSL_transaction * const pt[], void *priv)
chunk = VSTAILQ_NEXT(chunk, chunklist);
}
}
line->state = DATA_DONE;
line = VSTAILQ_NEXT(line, linelist);
rec->state = DATA_DONE;
}
}
tx->state = TX_DONE;
seen++;
data_done++;
if (data_done > data_occ_hi)
......@@ -628,10 +618,14 @@ main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
rdr_free = DATA_Take_Freetx(&rdr_tx_freelist);
rdr_tx_free = DATA_Take_Freetx(&rdr_tx_freelist);
assert(!VSTAILQ_EMPTY(&rdr_tx_freelist));
assert(rdr_free == config.max_data);
assert(rdr_tx_free == config.max_data);
rdr_rec_free = DATA_Take_Freeline(&rdr_rec_freelist);
assert(!VSTAILQ_EMPTY(&rdr_rec_freelist));
rdr_chunk_free = DATA_Take_Freechunk(&rdr_chunk_freelist);
assert(!VSTAILQ_EMPTY(&rdr_chunk_freelist));
WRT_Start();
/* XXX: configure wrt_waits and sleep interval? */
int wrt_waits = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment