Commit 73909737 authored by Geoff Simmons's avatar Geoff Simmons

varnishevent: fix freelist mgmt (was causing thread deadlock)

	also fix a bug formatting the timestamp
parent 385cfc3c
...@@ -39,17 +39,6 @@ ...@@ -39,17 +39,6 @@
#include "vqueue.h" #include "vqueue.h"
#include "vsb.h" #include "vsb.h"
/*
* Place the contents of head1 before head2, then empty head2
*/
#define VSTAILQ_PREPEND(head1, head2) do { \
if (!VSTAILQ_EMPTY((head2))) { \
*(head2)->vstqh_last = (head1)->vstqh_first; \
(head1)->vstqh_first = (head2)->vstqh_first; \
VSTAILQ_INIT((head2)); \
} \
} while (0)
static const char *statename[3] = { "EMPTY", "OPEN", "DONE" }; static const char *statename[3] = { "EMPTY", "OPEN", "DONE" };
static pthread_mutex_t freelist_lock; static pthread_mutex_t freelist_lock;
...@@ -175,7 +164,7 @@ unsigned ...@@ -175,7 +164,7 @@ unsigned
DATA_Take_Freelist(struct freehead_s *dst) DATA_Take_Freelist(struct freehead_s *dst)
{ {
unsigned nfree; unsigned nfree;
AZ(pthread_mutex_lock(&freelist_lock)); AZ(pthread_mutex_lock(&freelist_lock));
VSTAILQ_CONCAT(dst, &freehead); VSTAILQ_CONCAT(dst, &freehead);
nfree = global_nfree; nfree = global_nfree;
...@@ -194,7 +183,7 @@ void ...@@ -194,7 +183,7 @@ void
DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned) DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
{ {
AZ(pthread_mutex_lock(&freelist_lock)); AZ(pthread_mutex_lock(&freelist_lock));
VSTAILQ_PREPEND(&freehead, returned); VSTAILQ_CONCAT(&freehead, returned);
global_nfree += nreturned; global_nfree += nreturned;
AZ(pthread_mutex_unlock(&freelist_lock)); AZ(pthread_mutex_unlock(&freelist_lock));
} }
......
...@@ -119,18 +119,16 @@ get_tm(logline_t *ll, struct tm * t) { ...@@ -119,18 +119,16 @@ get_tm(logline_t *ll, struct tm * t) {
time_t tt = 0; time_t tt = 0;
if (C(ll->spec)) { if (C(ll->spec)) {
if (TAG(ll,SLT_ReqEnd).len) { if (TAG(ll,SLT_ReqEnd).len
ts = get_fld(&TAG(ll,SLT_ReqEnd), 2); && (ts = get_fld(&TAG(ll,SLT_ReqEnd), 2)) != NULL)
tt = (time_t) atol(ts); tt = (time_t) atol(ts);
}
else else
date_rec = GET_HDR(ll, rx, "Date"); date_rec = GET_HDR(ll, rx, "Date");
} }
else if (B(ll->spec)) { else if (B(ll->spec)) {
if (TAG(ll,SLT_BackendReq).len) { if (TAG(ll,SLT_BackendReq).len
ts = get_fld(&TAG(ll,SLT_BackendReq), 2); && (ts = get_fld(&TAG(ll,SLT_BackendReq), 2)) != NULL)
tt = (time_t) (atol(ts) + TIM_real_mono_diff); tt = (time_t) (atol(ts) + TIM_real_mono_diff);
}
else else
date_rec = GET_HDR(ll, tx, "Date"); date_rec = GET_HDR(ll, tx, "Date");
} }
......
...@@ -145,12 +145,6 @@ static int waiting = 0; ...@@ -145,12 +145,6 @@ static int waiting = 0;
static char cli_config_filename[BUFSIZ] = ""; static char cli_config_filename[BUFSIZ] = "";
#define SIGNAL_SPSCQ_READY() do { \
AZ(pthread_mutex_lock(&spscq_ready_lock)); \
AZ(pthread_cond_signal(&spscq_ready_cond)); \
AZ(pthread_mutex_unlock(&spscq_ready_lock)); \
} while (0)
int int
RDR_Waiting(void) RDR_Waiting(void)
{ {
...@@ -170,6 +164,16 @@ RDR_Stats(void) ...@@ -170,6 +164,16 @@ RDR_Stats(void)
spec_mismatches, wrong_tags); spec_mismatches, wrong_tags);
} }
static inline void
signal_spscq_ready(void)
{
if (WRT_Waiting()) {
AZ(pthread_mutex_lock(&spscq_ready_lock));
AZ(pthread_cond_signal(&spscq_ready_cond));
AZ(pthread_mutex_unlock(&spscq_ready_lock));
}
}
static inline logline_t static inline logline_t
*take(void) *take(void)
{ {
...@@ -177,19 +181,19 @@ static inline logline_t ...@@ -177,19 +181,19 @@ static inline logline_t
while (VSTAILQ_EMPTY(&reader_freelist)) { while (VSTAILQ_EMPTY(&reader_freelist)) {
rdr_free = DATA_Take_Freelist(&reader_freelist); rdr_free = DATA_Take_Freelist(&reader_freelist);
if (VSTAILQ_EMPTY(&reader_freelist)) { if (VSTAILQ_EMPTY(&reader_freelist)) {
LOG_Log0(LOG_DEBUG, "Reader: waiting for free space"); AZ(rdr_free);
waiting = 1; signal_spscq_ready();
LOG_Log0(LOG_DEBUG, "Reader: waiting for free list");
AZ(pthread_mutex_lock(&data_ready_lock)); AZ(pthread_mutex_lock(&data_ready_lock));
if (WRT_Waiting()) waiting = 1;
SIGNAL_SPSCQ_READY();
waits++; waits++;
AZ(pthread_cond_wait(&data_ready_cond, &data_ready_lock)); if (!WRT_Waiting())
rdr_free = DATA_Take_Freelist(&reader_freelist); AZ(pthread_cond_wait(&data_ready_cond, &data_ready_lock));
waiting = 0;
AZ(pthread_mutex_unlock(&data_ready_lock)); AZ(pthread_mutex_unlock(&data_ready_lock));
LOG_Log0(LOG_DEBUG, "Reader: done waiting for free space"); waiting = 0;
rdr_free = DATA_Take_Freelist(&reader_freelist);
LOG_Log(LOG_DEBUG, "Reader: took %u from free list", rdr_free);
} }
} }
data = VSTAILQ_FIRST(&reader_freelist); data = VSTAILQ_FIRST(&reader_freelist);
...@@ -219,8 +223,7 @@ submit(fd_t *entry) ...@@ -219,8 +223,7 @@ submit(fd_t *entry)
return; return;
} }
SPSCQ_Enq((void *) lp); SPSCQ_Enq((void *) lp);
if (WRT_Waiting()) signal_spscq_ready();
SIGNAL_SPSCQ_READY();
MON_StatsUpdate(STATS_DONE); MON_StatsUpdate(STATS_DONE);
submitted++; submitted++;
} }
...@@ -408,7 +411,7 @@ event(void *priv, enum VSL_tag_e tag, unsigned fd, ...@@ -408,7 +411,7 @@ event(void *priv, enum VSL_tag_e tag, unsigned fd,
if (term && open == 0) if (term && open == 0)
return 1; return 1;
LOG_Log(LOG_DEBUG, "Data: [%u %s %c %.*s]", fd, VSL_tags[tag], LOG_Log(LOG_DEBUG, "Data: [%u %s %c %.*s]", fd, VSL_tags[tag],
C(spec) ? 'c' : B(spec) ? 'b' : '-', len, ptr); C(spec) ? 'c' : B(spec) ? 'b' : '-', len, ptr);
......
...@@ -117,7 +117,9 @@ static inline void ...@@ -117,7 +117,9 @@ static inline void
wrt_return_freelist(void) wrt_return_freelist(void)
{ {
DATA_Return_Freelist(&wrt_freelist, wrt_nfree); DATA_Return_Freelist(&wrt_freelist, wrt_nfree);
LOG_Log(LOG_DEBUG, "Writer: returned %u to free list", wrt_nfree);
wrt_nfree = 0; wrt_nfree = 0;
assert(VSTAILQ_EMPTY(&wrt_freelist));
if (RDR_Waiting()) { if (RDR_Waiting()) {
AZ(pthread_mutex_lock(&data_ready_lock)); AZ(pthread_mutex_lock(&data_ready_lock));
AZ(pthread_cond_signal(&data_ready_cond)); AZ(pthread_cond_signal(&data_ready_cond));
...@@ -171,8 +173,6 @@ wrt_write(logline_t *ll) ...@@ -171,8 +173,6 @@ wrt_write(logline_t *ll)
VSTAILQ_INSERT_TAIL(&wrt_freelist, ll, freelist); VSTAILQ_INSERT_TAIL(&wrt_freelist, ll, freelist);
wrt_nfree++; wrt_nfree++;
/* XXX: shouldn't we return free space sooner?
e.g. nfree < (max >> 1), i.e. less than half */
if (global_nfree == 0 || RDR_Waiting()) if (global_nfree == 0 || RDR_Waiting())
wrt_return_freelist(); wrt_return_freelist();
} }
...@@ -210,15 +210,15 @@ static void ...@@ -210,15 +210,15 @@ static void
errno, strerror(errno)); errno, strerror(errno));
errors++; errors++;
} }
wrt->state = WRT_WAITING;
AZ(pthread_mutex_lock(&spscq_ready_lock));
if (wrt_nfree > 0) if (wrt_nfree > 0)
wrt_return_freelist(); wrt_return_freelist();
AZ(pthread_mutex_lock(&spscq_ready_lock));
/* /*
* run is guaranteed to be fresh after the lock * run is guaranteed to be fresh after the lock
*/ */
if (run) { if (run) {
wrt->state = WRT_WAITING;
waits++; waits++;
AZ(pthread_cond_wait(&spscq_ready_cond, &spscq_ready_lock)); AZ(pthread_cond_wait(&spscq_ready_cond, &spscq_ready_lock));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment