Commit 3878ccf7 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Make it possible to have multiple worker pools.

The acceptor selects the pool based on filedescriptor modulus
number of pools.

This is an attempt to reduce lock contention.


git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@1031 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent 744ced55
......@@ -29,16 +29,21 @@
#include "cli_priv.h"
#include "cache.h"
static MTX wrk_mtx;
TAILQ_HEAD(workerhead, worker);
/* Number of work requests queued in excess of worker threads available */
static unsigned wrk_overflow;
TAILQ_HEAD(workerhead, worker);
struct wq {
MTX mtx;
struct workerhead idle;
TAILQ_HEAD(, workreq) req;
unsigned overflow;
};
static struct workerhead wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle);
static struct workerhead wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy);
static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
static MTX tmtx;
static struct wq **wq;
static unsigned nwq;
/*--------------------------------------------------------------------
* Write data to fd
......@@ -169,9 +174,10 @@ static void *
wrk_thread(void *priv)
{
struct worker *w, ww;
struct wq *qp;
char c;
(void)priv;
qp = priv;
w = &ww;
memset(w, 0, sizeof *w);
w->magic = WORKER_MAGIC;
......@@ -179,40 +185,38 @@ wrk_thread(void *priv)
AZ(pipe(w->pipe));
VSL(SLT_WorkThread, 0, "%p start", w);
LOCK(&wrk_mtx);
LOCK(&qp->mtx);
VSL_stats->n_wrk_create++;
TAILQ_INSERT_HEAD(&wrk_busy, w, list);
VSL_stats->n_wrk_busy++;
while (1) {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
/* Process overflow requests, if any */
if (wrk_overflow > 0) {
wrk_overflow--;
w->wrq = TAILQ_FIRST(&wrk_reqhead);
if (qp->overflow > 0) {
qp->overflow--;
w->wrq = TAILQ_FIRST(&qp->req);
AN(w->wrq);
TAILQ_REMOVE(&wrk_reqhead, w->wrq, list);
TAILQ_REMOVE(&qp->req, w->wrq, list);
VSL_stats->n_wrk_queue--;
UNLOCK(&wrk_mtx);
UNLOCK(&qp->mtx);
wrk_do_one(w);
LOCK(&wrk_mtx);
LOCK(&qp->mtx);
continue;
}
TAILQ_REMOVE(&wrk_busy, w, list);
TAILQ_INSERT_HEAD(&wrk_idle, w, list);
TAILQ_INSERT_HEAD(&qp->idle, w, list);
assert(w->idle != 0);
VSL_stats->n_wrk_busy--;
UNLOCK(&wrk_mtx);
UNLOCK(&qp->mtx);
assert(1 == read(w->pipe[0], &c, 1));
if (w->idle == 0)
break;
wrk_do_one(w);
LOCK(&wrk_mtx);
LOCK(&qp->mtx);
}
LOCK(&wrk_mtx);
LOCK(&tmtx);
VSL_stats->n_wrk--;
UNLOCK(&wrk_mtx);
UNLOCK(&tmtx);
VSL(SLT_WorkThread, 0, "%p end", w);
close(w->pipe[0]);
close(w->pipe[1]);
......@@ -226,39 +230,42 @@ WRK_QueueSession(struct sess *sp)
{
struct worker *w;
pthread_t tp;
struct wq *qp;
sp->workreq.sess = sp;
qp = wq[sp->fd % nwq];
LOCK(&wrk_mtx);
LOCK(&qp->mtx);
/* If there are idle threads, we tickle the first one into action */
w = TAILQ_FIRST(&wrk_idle);
w = TAILQ_FIRST(&qp->idle);
if (w != NULL) {
TAILQ_REMOVE(&wrk_idle, w, list);
TAILQ_INSERT_TAIL(&wrk_busy, w, list);
TAILQ_REMOVE(&qp->idle, w, list);
VSL_stats->n_wrk_busy++;
UNLOCK(&wrk_mtx);
UNLOCK(&qp->mtx);
w->wrq = &sp->workreq;
assert(1 == write(w->pipe[1], w, 1));
return;
}
TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
TAILQ_INSERT_TAIL(&qp->req, &sp->workreq, list);
VSL_stats->n_wrk_queue++;
wrk_overflow++;
qp->overflow++;
UNLOCK(&qp->mtx);
LOCK(&tmtx);
/* Can we create more threads ? */
if (VSL_stats->n_wrk >= params->wthread_max) {
VSL_stats->n_wrk_max++;
UNLOCK(&wrk_mtx);
UNLOCK(&tmtx);
return;
}
/* Try to create a thread */
VSL_stats->n_wrk++;
UNLOCK(&wrk_mtx);
UNLOCK(&tmtx);
if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
if (!pthread_create(&tp, NULL, wrk_thread, qp)) {
AZ(pthread_detach(tp));
return;
}
......@@ -266,11 +273,40 @@ WRK_QueueSession(struct sess *sp)
VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
errno, strerror(errno));
LOCK(&tmtx);
/* Register overflow */
LOCK(&wrk_mtx);
VSL_stats->n_wrk--;
VSL_stats->n_wrk_failed++;
UNLOCK(&wrk_mtx);
UNLOCK(&tmtx);
}
/*--------------------------------------------------------------------*/
static void
wrk_addpools(unsigned t)
{
struct wq **pwq, **owq;
unsigned u;
if (t <= nwq)
return;
pwq = calloc(sizeof *pwq, params->wthread_pools);
if (pwq == NULL)
return;
if (wq != NULL)
memcpy(pwq, wq, sizeof *pwq * nwq);
owq = wq;
wq = pwq;
for (u = nwq; u < t; u++) {
wq[u] = calloc(sizeof *wq[u], 1);
XXXAN(wq[u]);
MTX_INIT(&wq[u]->mtx);
TAILQ_INIT(&wq[u]->idle);
TAILQ_INIT(&wq[u]->req);
}
free(owq);
nwq = t;
}
/*--------------------------------------------------------------------*/
......@@ -280,26 +316,32 @@ wrk_reaperthread(void *priv)
{
time_t now;
struct worker *w;
struct wq *qp;
unsigned u;
(void)priv;
while (1) {
wrk_addpools(params->wthread_pools);
sleep(1);
if (VSL_stats->n_wrk <= params->wthread_min)
continue;
now = time(NULL);
LOCK(&wrk_mtx);
w = TAILQ_LAST(&wrk_idle, workerhead);
if (w != NULL &&
(w->idle + params->wthread_timeout < now ||
VSL_stats->n_wrk <= params->wthread_max))
TAILQ_REMOVE(&wrk_idle, w, list);
else
w = NULL;
UNLOCK(&wrk_mtx);
if (w == NULL)
continue;
w->idle = 0;
assert(1 == write(w->pipe[1], w, 1));
for (u = 0; u < nwq; u++) {
qp = wq[u];
LOCK(&qp->mtx);
w = TAILQ_LAST(&qp->idle, workerhead);
if (w != NULL &&
(w->idle + params->wthread_timeout < now ||
VSL_stats->n_wrk <= params->wthread_max))
TAILQ_REMOVE(&qp->idle, w, list);
else
w = NULL;
UNLOCK(&qp->mtx);
if (w == NULL)
continue;
w->idle = 0;
assert(1 == write(w->pipe[1], w, 1));
}
}
INCOMPL();
}
......@@ -310,53 +352,20 @@ void
WRK_Init(void)
{
pthread_t tp;
int i;
MTX_INIT(&wrk_mtx);
wrk_addpools(params->wthread_pools);
MTX_INIT(&tmtx);
AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL));
AZ(pthread_detach(tp));
VSL(SLT_Debug, 0, "Starting %u worker threads", params->wthread_min);
for (i = 0; i < params->wthread_min; i++) {
VSL_stats->n_wrk++;
AZ(pthread_create(&tp, NULL, wrk_thread, NULL));
AZ(pthread_detach(tp));
}
}
/*--------------------------------------------------------------------*/
void
cli_func_dump_pool(struct cli *cli, char **av, void *priv)
{
unsigned u;
struct sess *s;
time_t t;
(void)cli;
(void)av;
(void)priv;
struct worker *w;
LOCK(&wrk_mtx);
t = time(NULL);
TAILQ_FOREACH(w, &wrk_busy, list) {
cli_out(cli, "\n");
cli_out(cli, "W %p", w);
if (w->wrq == NULL)
continue;
s = w->wrq->sess;
if (s == NULL)
continue;
cli_out(cli, "sess %p fd %d xid %u step %d handling %d age %d",
s, s->fd, s->xid, s->step, s->handling,
t - s->t_req.tv_sec);
}
cli_out(cli, "\n");
u = 0;
TAILQ_FOREACH(w, &wrk_idle, list)
u++;
cli_out(cli, "%u idle workers\n", u);
UNLOCK(&wrk_mtx);
}
......@@ -36,6 +36,7 @@ struct params {
unsigned wthread_min;
unsigned wthread_max;
unsigned wthread_timeout;
unsigned wthread_pools;
/* Memory allocation hints */
unsigned mem_workspace;
......
......@@ -115,6 +115,18 @@ tweak_default_ttl(struct cli *cli, struct parspec *par, const char *arg)
tweak_generic_uint(cli, &params->default_ttl, arg, 0, UINT_MAX);
}
/*--------------------------------------------------------------------*/
static void
tweak_thread_pools(struct cli *cli, struct parspec *par, const char *arg)
{
(void)par;
tweak_generic_uint(cli, &params->wthread_pools, arg,
1, UINT_MAX);
}
/*--------------------------------------------------------------------*/
static void
......@@ -296,6 +308,9 @@ static struct parspec parspec[] = {
"To force an immediate effect at the expense of a total "
"flush of the cache use \"url.purge .\"",
"120", "seconds" },
{ "thread_pools", tweak_thread_pools,
"Number of thread pools.\n",
"1", "pools" },
{ "thread_pool_max", tweak_thread_pool_max,
"The maximum number of threads in the worker pool.\n"
"-1 is unlimited.\n"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment