Commit 1152ec30 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Rework the worker thread pool logic slightly, we were leaking

threads before.


git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@510 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent 168c51ae
......@@ -26,11 +26,37 @@ static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
/*--------------------------------------------------------------------*/
static void
wrk_do_one(struct worker *w)
{
struct workreq *wrq;
wrq = TAILQ_FIRST(&wrk_reqhead);
assert(wrq != NULL);
VSL_stats->n_wrk_busy++;
TAILQ_REMOVE(&wrk_reqhead, wrq, list);
VSL_stats->n_wrk_queue--;
AZ(pthread_mutex_unlock(&wrk_mtx));
assert(wrq->sess != NULL);
wrq->sess->wrk = w;
CHECK_OBJ_NOTNULL(wrq->sess, SESS_MAGIC);
if (w->nobj != NULL)
CHECK_OBJ(w->nobj, OBJECT_MAGIC);
if (w->nobjhead != NULL)
CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC);
CNT_Session(wrq->sess);
if (w->nobj != NULL)
CHECK_OBJ(w->nobj, OBJECT_MAGIC);
if (w->nobjhead != NULL)
CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC);
AZ(pthread_mutex_lock(&wrk_mtx));
VSL_stats->n_wrk_busy--;
}
static void *
wrk_thread(void *priv)
{
struct worker *w, ww;
struct workreq *wrq;
struct timespec ts;
w = &ww;
......@@ -50,66 +76,40 @@ wrk_thread(void *priv)
if (priv == NULL) {
VSL_stats->n_wrk_create++;
VSL(SLT_WorkThread, 0, "%u born dynamic", w->nbr);
} else {
VSL(SLT_WorkThread, 0, "%u born permanent", w->nbr);
}
TAILQ_INSERT_HEAD(&wrk_head, w, list);
while (1) {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
if (w->nobj != NULL)
CHECK_OBJ(w->nobj, OBJECT_MAGIC);
if (w->nobjhead != NULL)
CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC);
wrq = TAILQ_FIRST(&wrk_reqhead);
if (wrq != NULL) {
VSL_stats->n_wrk_busy++;
TAILQ_REMOVE(&wrk_head, w, list);
TAILQ_REMOVE(&wrk_reqhead, wrq, list);
AZ(pthread_mutex_unlock(&wrk_mtx));
assert(wrq->sess != NULL);
wrq->sess->wrk = w;
CHECK_OBJ_NOTNULL(wrq->sess, SESS_MAGIC);
CNT_Session(wrq->sess);
AZ(pthread_mutex_lock(&wrk_mtx));
VSL_stats->n_wrk_busy--;
TAILQ_INSERT_HEAD(&wrk_head, w, list);
}
/* Process overflow requests, if any */
if (wrk_overflow > 0) {
wrk_overflow--;
wrk_do_one(w);
continue;
}
if (w->nobj != NULL)
CHECK_OBJ(w->nobj, OBJECT_MAGIC);
if (w->nobjhead != NULL)
CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC);
TAILQ_INSERT_HEAD(&wrk_head, w, list);
/* If we are a reserved thread we don't die */
if (priv != NULL) {
AZ(pthread_cond_wait(&w->cv, &wrk_mtx));
continue;
} else {
/* If we are a dynamic thread, time out and die */
AZ(clock_gettime(CLOCK_REALTIME, &ts));
ts.tv_sec += heritage.wthread_timeout;
if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) {
VSL_stats->n_wrk--;
TAILQ_REMOVE(&wrk_head, w, list);
AZ(pthread_mutex_unlock(&wrk_mtx));
VSL(SLT_WorkThread, 0, "%u suicide", w->nbr);
sbuf_delete(w->sb);
event_base_free(w->eb);
AZ(pthread_cond_destroy(&w->cv));
return (NULL);
}
}
if (w->nobj != NULL)
CHECK_OBJ(w->nobj, OBJECT_MAGIC);
if (w->nobjhead != NULL)
CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC);
/* If we are a dynamic thread, time out and die */
AZ(clock_gettime(CLOCK_REALTIME, &ts));
ts.tv_sec += heritage.wthread_timeout;
if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) {
VSL_stats->n_wrk--;
TAILQ_REMOVE(&wrk_head, w, list);
AZ(pthread_mutex_unlock(&wrk_mtx));
VSL(SLT_WorkThread, 0, "%u suicide", w->nbr);
sbuf_delete(w->sb);
event_base_free(w->eb);
AZ(pthread_cond_destroy(&w->cv));
return (NULL);
}
if (w->nobj != NULL)
CHECK_OBJ(w->nobj, OBJECT_MAGIC);
if (w->nobjhead != NULL)
CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC);
/* we are already removed from wrk_head */
wrk_do_one(w);
}
}
......@@ -127,19 +127,22 @@ WRK_QueueSession(struct sess *sp)
AZ(pthread_mutex_lock(&wrk_mtx));
TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
VSL_stats->n_wrk_queue++;
/* If there are idle threads, we tickle the first one into action */
w = TAILQ_FIRST(&wrk_head);
if (w != NULL) {
AZ(pthread_cond_signal(&w->cv));
TAILQ_REMOVE(&wrk_head, w, list);
AZ(pthread_mutex_unlock(&wrk_mtx));
return;
}
/* Register overflow if max threads reached */
wrk_overflow++;
/* Can we create more threads ? */
if (VSL_stats->n_wrk >= heritage.wthread_max) {
wrk_overflow++;
VSL_stats->n_wrk_short++;
VSL_stats->n_wrk_max++;
AZ(pthread_mutex_unlock(&wrk_mtx));
return;
}
......@@ -147,6 +150,7 @@ WRK_QueueSession(struct sess *sp)
/* Try to create a thread */
VSL_stats->n_wrk++;
AZ(pthread_mutex_unlock(&wrk_mtx));
if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
AZ(pthread_detach(tp));
return;
......@@ -158,9 +162,7 @@ WRK_QueueSession(struct sess *sp)
/* Register overflow */
AZ(pthread_mutex_lock(&wrk_mtx));
VSL_stats->n_wrk--;
wrk_overflow++;
VSL_stats->n_wrk_failed++;
VSL_stats->n_wrk_short++;
AZ(pthread_mutex_unlock(&wrk_mtx));
}
......
......@@ -21,7 +21,8 @@ MAC_STAT(n_vbe_conn, uint64_t, "u", "N struct vbe_conn")
MAC_STAT(n_wrk, uint64_t, "u", "N worker threads")
MAC_STAT(n_wrk_create, uint64_t, "u", "N worker threads created")
MAC_STAT(n_wrk_failed, uint64_t, "u", "N worker threads not created")
MAC_STAT(n_wrk_short, uint64_t, "u", "N worker threads shortages")
MAC_STAT(n_wrk_max, uint64_t, "u", "N worker threads limited")
MAC_STAT(n_wrk_busy, uint64_t, "u", "N busy worker threads")
MAC_STAT(n_wrk_queue, uint64_t, "u", "N queued work requests")
MAC_STAT(losthdr, uint64_t, "u", "HTTP header overflows")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment