Commit 858aeded authored by Geoff Simmons's avatar Geoff Simmons

update the dispatcher and main loop

parent b9a5863e
......@@ -75,8 +75,15 @@
#define DEFAULT_CONFIG "/etc/varnishevent.conf"
#define DISPATCH_CONTINUE 0
#define DISPATCH_TERMINATE 7
#define DISPATCH_EOL 0
#define DISPATCH_RETURN_OK 0
#define DISPATCH_CONTINUE 1
#define DISPATCH_EOF -1
#define DISPATCH_CLOSED -2
#define DISPATCH_OVERRUN -3
#define DISPATCH_IOERR -4
#define DISPATCH_TERMINATE 10
#define DISPATCH_REOPEN 11
static unsigned occ_hi = 0, len_hi = 0;
......@@ -196,6 +203,14 @@ static inline tx_t
return (tx);
}
static inline void
take_free(void)
{
rdr_tx_free += DATA_Take_Freetx(&rdr_tx_freelist);
rdr_rec_free += DATA_Take_Freeline(&rdr_rec_freelist);
rdr_chunk_free += DATA_Take_Freechunk(&rdr_chunk_freelist);
}
static inline void
submit(tx_t *tx)
{
......@@ -210,27 +225,29 @@ submit(tx_t *tx)
static int
event(struct VSL_data *_vsl, struct VSL_transaction * const pt[], void *priv)
{
struct tx_t *tx = NULL;
struct VSL_transaction *t;
int status = DISPATCH_CONTINUE;
int status = DISPATCH_RETURN_OK;
(void) priv;
if (term)
return DISPATCH_TERMINATE;
if (pt[0] == NULL)
return reopen;
for (t = pt[0]; t != NULL; t = *++pt) {
assert(t->type == VSL_t_req || t->type == VSL_t_bereq
|| t->type == VSL_t_raw);
for (struct VSL_transaction *t = pt[0]; t != NULL; t = *++pt) {
struct tx_t *tx;
switch(t->type) {
case VSL_t_req:
case VSL_t_bereq:
case VSL_t_raw:
break;
default:
continue;
}
tx = take_tx();
if (tx == NULL) {
no_free_tx++;
LOG_Log(LOG_DEBUG, "Freelist exhausted, tx DISCARDED: [%u %c]",
t->vxid, tx_type_name[tx->type]);
t->vxid, tx_type_name[tx->type]);
continue;
}
CHECK_OBJ_NOTNULL(tx, TX_MAGIC);
......@@ -238,31 +255,34 @@ event(struct VSL_data *_vsl, struct VSL_transaction * const pt[], void *priv)
assert(VSTAILQ_EMPTY(&tx->lines));
tx->type = t->type;
tx->vxid = t->vxid;
if (tx->type == VSL_t_raw) {
/* XXX: set tx->t */
;
}
LOG_Log(LOG_DEBUG, "Tx: [%u %c]", tx->vxid, tx_type_name[tx->type]);
if (logconf.level == LOG_DEBUG)
LOG_Log(LOG_DEBUG, "Tx: [%u %c]", tx->vxid, tx_type_name[tx->type]);
while (1) {
while ((status = VSL_Next(t->c)) > 0) {
int len;
logline_t *rec;
status = VSL_Next(t->c);
if (status <= 0)
break;
if (!VSL_Match(_vsl, t->c))
continue;
len = VSL_LEN(t->c->rec.ptr);
LOG_Log(LOG_DEBUG, "Line: [%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
if (logconf.level == LOG_DEBUG)
LOG_Log(LOG_DEBUG, "Line: [%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
rec = take_rec();
if (rec == NULL) {
no_free_rec++;
LOG_Log(LOG_DEBUG, "Freelist exhausted, line DISCARDED: "
"[%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
"[%u %s %.*s]", VSL_ID(t->c->rec.ptr),
VSL_tags[VSL_TAG(t->c->rec.ptr)], len,
VSL_CDATA(t->c->rec.ptr));
continue;
}
CHECK_OBJ_NOTNULL(rec, LOGLINE_MAGIC);
......@@ -270,13 +290,16 @@ event(struct VSL_data *_vsl, struct VSL_transaction * const pt[], void *priv)
rec->tag = VSL_TAG(t->c->rec.ptr);
rec->len = len;
if (len != 0) {
if (len > 0) {
chunk_t *chunk;
int n = len;
const char *p = (const char *) VSL_CDATA(t->c->rec.ptr);
int nchunk = (len + config.chunk_size - 1) / config.chunk_size;
/* Copy the payload into chunks */
assert(VSTAILQ_EMPTY(&rec->chunks));
int nchunk = (len + config.chunk_size - 1) / config.chunk_size;
for (int i = 0; i < nchunk; i++) {
assert(n > 0);
chunk = take_chunk();
if (chunk == NULL) {
no_free_chunk++;
......@@ -288,33 +311,27 @@ event(struct VSL_data *_vsl, struct VSL_transaction * const pt[], void *priv)
break;
}
VSTAILQ_INSERT_TAIL(&rec->chunks, chunk, chunklist);
}
int n = len;
chunk = VSTAILQ_FIRST(&rec->chunks);
const char *p = (const char *) VSL_CDATA(t->c->rec.ptr);
while (n > 0 && chunk != NULL) {
CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
int cp = n;
if (cp > config.chunk_size)
cp = config.chunk_size;
memcpy(chunk->data, p, cp);
p += cp;
n -= cp;
chunk = VSTAILQ_NEXT(chunk, chunklist);
}
}
rec->state = DATA_DONE;
}
tx->state = TX_DONE;
seen++;
data_done++;
if (data_done > data_occ_hi)
data_occ_hi = data_done;
submit(tx);
}
tx->state = TX_DONE;
seen++;
data_done++;
if (data_done > data_occ_hi)
data_occ_hi = data_done;
submit(tx);
return reopen;
if (!reopen)
return status;
return DISPATCH_REOPEN;
}
/*--------------------------------------------------------------------*/
......@@ -385,7 +402,7 @@ usage(void)
int
main(int argc, char *argv[])
{
int c, errnum, finite = 0, a_flag = 0, g_flag = 0, format_flag = 0;
int c, errnum, status, a_flag = 0, g_flag = 0, format_flag = 0;
#if 0
int D_flag = 0;
const char *P_arg = NULL;
......@@ -453,7 +470,6 @@ main(int argc, char *argv[])
m_flag = 1; /* Fall through */
default:
if (c == 'r') {
finite = 1;
strcpy(config.varnish_bindump, optarg);
}
if (VSL_Arg(vsl, c, optarg) > 0)
......@@ -635,22 +651,56 @@ main(int argc, char *argv[])
/* Main loop */
term = 0;
/* XXX: TERM not noticed until request received */
while (VSLQ_Dispatch(vslq, event, NULL) >= 0)
if (term || finite)
break;
else if (reopen) {
while (!term) {
status = VSLQ_Dispatch(vslq, event, NULL);
switch(status) {
case DISPATCH_CONTINUE:
continue;
case DISPATCH_REOPEN:
take_free();
LOG_Log0(LOG_INFO, "Signal received to re-open output");
WRT_Reopen();
reopen = 0;
continue;
case DISPATCH_EOL:
take_free();
#if 0
TIM_sleep(config.idle_pause);
#endif
continue;
case DISPATCH_TERMINATE:
assert(term == 1);
LOG_Log0(LOG_INFO, "Termination signal received, will flush"
"pending transactions and exit");
break;
case DISPATCH_EOF:
term = 1;
LOG_Log0(LOG_INFO, "Reached end of file, will exit");
break;
/* XXX: for the rest of these, try to flush, re-acquire the log and
continue. */
case DISPATCH_CLOSED:
term = 1;
LOG_Log0(LOG_ERR, "Log was closed or abandoned, will exit");
break;
case DISPATCH_OVERRUN:
term = 1;
LOG_Log0(LOG_ERR, "Log reads were overrun, will exit");
break;
case DISPATCH_IOERR:
term = 1;
LOG_Log(LOG_ERR,
"IO error reading the log, will exit: %s (errno = %d)",
strerror(errno), errno);
break;
default:
WRONG("Unknown return status from dispatcher");
}
else
LOG_Log0(LOG_WARNING, "Log read interrupted, continuing");
}
if (status == DISPATCH_TERMINATE)
VSLQ_Flush(vslq, event, NULL);
if (term)
LOG_Log0(LOG_INFO, "Termination signal received");
else if (!finite)
LOG_Log0(LOG_WARNING, "Varnish log closed");
WRT_Halt();
WRT_Fini();
SPSCQ_Shutdown();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment