Commit 94f84a8a authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Properly lock n_wrk* statics variables without using more lock operations.

Have the worker threads add themselves to the pool, while the hold the pool
lock anyway.

Collect stats per pool, and have the decimator aggregate over all pools
and into the shared memory counters.


git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@2655 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent 1b3a0665
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
* $Id$ * $Id$
* *
* XXX: automatic thread-pool size adaptation. * XXX: automatic thread-pool size adaptation.
* XXX: unlocked stats variables: consider summing over pools in timer thread.
*/ */
#include "config.h" #include "config.h"
...@@ -72,7 +71,8 @@ struct wq { ...@@ -72,7 +71,8 @@ struct wq {
VTAILQ_HEAD(, workreq) overflow; VTAILQ_HEAD(, workreq) overflow;
unsigned nthr; unsigned nthr;
unsigned nqueue; unsigned nqueue;
uintmax_t drops; uintmax_t ndrop;
uintmax_t noverflow;
}; };
static struct wq **wq; static struct wq **wq;
...@@ -226,30 +226,33 @@ wrk_thread(void *priv) ...@@ -226,30 +226,33 @@ wrk_thread(void *priv)
VSL(SLT_WorkThread, 0, "%p start", w); VSL(SLT_WorkThread, 0, "%p start", w);
LOCK(&qp->mtx);
qp->nthr++;
while (1) { while (1) {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
assert(!isnan(w->used)); assert(!isnan(w->used));
/* Process overflow requests, if any */ /* Process overflow requests, if any */
LOCK(&qp->mtx);
w->wrq = VTAILQ_FIRST(&qp->overflow); w->wrq = VTAILQ_FIRST(&qp->overflow);
if (w->wrq != NULL) { if (w->wrq != NULL) {
VTAILQ_REMOVE(&qp->overflow, w->wrq, list); VTAILQ_REMOVE(&qp->overflow, w->wrq, list);
qp->nqueue--; qp->nqueue--;
VSL_stats->n_wrk_queue--; /* XXX: unlocked */
} else { } else {
VTAILQ_INSERT_HEAD(&qp->idle, w, list); VTAILQ_INSERT_HEAD(&qp->idle, w, list);
AZ(pthread_cond_wait(&w->cond, &qp->mtx)); AZ(pthread_cond_wait(&w->cond, &qp->mtx));
} }
UNLOCK(&qp->mtx);
if (w->wrq == NULL) if (w->wrq == NULL)
break; break;
UNLOCK(&qp->mtx);
AN(w->wrq); AN(w->wrq);
wrq = w->wrq; wrq = w->wrq;
AN(wrq->func); AN(wrq->func);
wrq->func(w, wrq->priv); wrq->func(w, wrq->priv);
w->wrq = NULL; w->wrq = NULL;
LOCK(&qp->mtx);
} }
qp->nthr--;
UNLOCK(&qp->mtx);
VSL(SLT_WorkThread, 0, "%p end", w); VSL(SLT_WorkThread, 0, "%p end", w);
if (w->vcl != NULL) if (w->vcl != NULL)
...@@ -302,15 +305,13 @@ WRK_Queue(struct workreq *wrq) ...@@ -302,15 +305,13 @@ WRK_Queue(struct workreq *wrq)
/* If we have too much in the overflow already, refuse */ /* If we have too much in the overflow already, refuse */
if (qp->nqueue > ovfl_max) { if (qp->nqueue > ovfl_max) {
qp->drops++; qp->ndrop++;
VSL_stats->n_wrk_drop++; /* XXX: unlocked */
UNLOCK(&qp->mtx); UNLOCK(&qp->mtx);
return (-1); return (-1);
} }
VTAILQ_INSERT_TAIL(&qp->overflow, wrq, list); VTAILQ_INSERT_TAIL(&qp->overflow, wrq, list);
VSL_stats->n_wrk_queue++; /* XXX: unlocked */ qp->noverflow++;
VSL_stats->n_wrk_overflow++; /* XXX: unlocked */
qp->nqueue++; qp->nqueue++;
UNLOCK(&qp->mtx); UNLOCK(&qp->mtx);
AZ(pthread_cond_signal(&herder_cond)); AZ(pthread_cond_signal(&herder_cond));
...@@ -373,11 +374,11 @@ wrk_addpools(const unsigned pools) ...@@ -373,11 +374,11 @@ wrk_addpools(const unsigned pools)
} }
/*-------------------------------------------------------------------- /*--------------------------------------------------------------------
* If a thread is idle or excess, pick it out of the pool... * If a thread is idle or excess, pick it out of the pool.
*/ */
static void static void
wrk_decimate_flock(struct wq *qp, double t_idle) wrk_decimate_flock(struct wq *qp, double t_idle, struct varnish_stats *vs)
{ {
struct worker *w; struct worker *w;
...@@ -390,14 +391,16 @@ wrk_decimate_flock(struct wq *qp, double t_idle) ...@@ -390,14 +391,16 @@ wrk_decimate_flock(struct wq *qp, double t_idle)
VTAILQ_REMOVE(&qp->idle, w, list); VTAILQ_REMOVE(&qp->idle, w, list);
else else
w = NULL; w = NULL;
vs->n_wrk += qp->nthr;
vs->n_wrk_queue += qp->nqueue;
vs->n_wrk_drop += qp->ndrop;
vs->n_wrk_overflow += qp->noverflow;
UNLOCK(&qp->mtx); UNLOCK(&qp->mtx);
/* And give it a kiss on the cheek... */ /* And give it a kiss on the cheek... */
if (w != NULL) { if (w != NULL) {
AZ(w->wrq); AZ(w->wrq);
AZ(pthread_cond_signal(&w->cond)); AZ(pthread_cond_signal(&w->cond));
qp->nthr--;
VSL_stats->n_wrk--; /* XXX: unlocked */
(void)usleep(params->wthread_purge_delay * 1000); (void)usleep(params->wthread_purge_delay * 1000);
} }
} }
...@@ -409,6 +412,7 @@ wrk_decimate_flock(struct wq *qp, double t_idle) ...@@ -409,6 +412,7 @@ wrk_decimate_flock(struct wq *qp, double t_idle)
* Add pools * Add pools
* Scale constants * Scale constants
* Get rid of excess threads * Get rid of excess threads
* Aggregate stats across pools
*/ */
static void * static void *
...@@ -416,9 +420,13 @@ wrk_herdtimer_thread(void *priv) ...@@ -416,9 +420,13 @@ wrk_herdtimer_thread(void *priv)
{ {
volatile unsigned u; volatile unsigned u;
double t_idle; double t_idle;
struct varnish_stats vsm, *vs;
THR_Name("wrk_herdtimer"); THR_Name("wrk_herdtimer");
memset(&vsm, 0, sizeof vsm);
vs = &vsm;
(void)priv; (void)priv;
while (1) { while (1) {
/* Add Pools */ /* Add Pools */
...@@ -439,9 +447,19 @@ wrk_herdtimer_thread(void *priv) ...@@ -439,9 +447,19 @@ wrk_herdtimer_thread(void *priv)
ovfl_max = (nthr_max * params->overflow_max) / 100; ovfl_max = (nthr_max * params->overflow_max) / 100;
vs->n_wrk = 0;
vs->n_wrk_queue = 0;
vs->n_wrk_drop = 0;
vs->n_wrk_overflow = 0;
t_idle = TIM_real() - params->wthread_timeout; t_idle = TIM_real() - params->wthread_timeout;
for (u = 0; u < nwq; u++) for (u = 0; u < nwq; u++)
wrk_decimate_flock(wq[u], t_idle); wrk_decimate_flock(wq[u], t_idle, vs);
VSL_stats->n_wrk= vs->n_wrk;
VSL_stats->n_wrk_queue = vs->n_wrk_queue;
VSL_stats->n_wrk_drop = vs->n_wrk_drop;
VSL_stats->n_wrk_overflow = vs->n_wrk_overflow;
(void)usleep(params->wthread_purge_delay * 1000); (void)usleep(params->wthread_purge_delay * 1000);
} }
...@@ -470,9 +488,7 @@ wrk_breed_flock(struct wq *qp) ...@@ -470,9 +488,7 @@ wrk_breed_flock(struct wq *qp)
VSL_stats->n_wrk_failed++; VSL_stats->n_wrk_failed++;
(void)usleep(params->wthread_fail_delay * 1000); (void)usleep(params->wthread_fail_delay * 1000);
} else { } else {
qp->nthr++;
AZ(pthread_detach(tp)); AZ(pthread_detach(tp));
VSL_stats->n_wrk++; /* XXX: unlocked */
VSL_stats->n_wrk_create++; VSL_stats->n_wrk_create++;
(void)usleep(params->wthread_add_delay * 1000); (void)usleep(params->wthread_add_delay * 1000);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment