Commit 7e25234d authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Overhaul the thread-pool herding code.

NB: Changes to parameter defaults & units included in this commit!

I tried to apply some queuing theory to this problem and as much as
I admire Agner Krarup Erlang, his math isn't much help when you don't
know any of the relevant metrics for your queue.

Instead I took a much simpler approach:  "If we fail to get a thread,
we probably need more threads", and have rewritten the herder to to
react faster and more reliably to such events.

I went over the parameters for thread-pools and normalized timeouts
to seconds rather than milliseconds (beware!) and polished descriptions
etc.
parent 590299ee
......@@ -31,13 +31,6 @@
* Pools can be added on the fly, as a means to mitigate lock contention,
* but can only be removed again by a restart. (XXX: we could fix that)
*
* Two threads herd the pools, one eliminates idle threads and aggregates
* statistics for all the pools, the other thread creates new threads
* on demand, subject to various numerical constraints.
*
* The algorithm for when to create threads needs to be reactive enough
* to handle startup spikes, but sufficiently attenuated to not cause
* thread pileups. This remains subject for improvement.
*/
#include "config.h"
......@@ -50,36 +43,6 @@
#include "vtim.h"
/*--------------------------------------------------------------------
* MAC OS/X is incredibly moronic when it comes to time and such...
*/
#ifndef CLOCK_MONOTONIC
#define CLOCK_MONOTONIC 0
#include <sys/time.h>
static int
clock_gettime(int foo, struct timespec *ts)
{
struct timeval tv;
(void)foo;
gettimeofday(&tv, NULL);
ts->tv_sec = tv.tv_sec;
ts->tv_nsec = tv.tv_usec * 1000;
return (0);
}
static int
pthread_condattr_setclock(pthread_condattr_t *attr, int foo)
{
(void)attr;
(void)foo;
return (0);
}
#endif /* !CLOCK_MONOTONIC */
VTAILQ_HEAD(taskhead, pool_task);
struct poolsock {
......@@ -97,7 +60,6 @@ struct pool {
VTAILQ_ENTRY(pool) list;
pthread_cond_t herder_cond;
struct lock herder_mtx;
pthread_t herder_thr;
struct vxid vxid;
......@@ -107,8 +69,8 @@ struct pool {
struct taskhead front_queue;
struct taskhead back_queue;
unsigned nthr;
unsigned dry;
unsigned lqueue;
unsigned last_lqueue;
uintmax_t ndropped;
uintmax_t nqueued;
struct sesspool *sesspool;
......@@ -121,19 +83,21 @@ static pthread_t thr_pool_herder;
*/
static struct worker *
pool_getidleworker(const struct pool *pp, int back)
pool_getidleworker(struct pool *pp)
{
struct pool_task *pt;
struct worker *wrk;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
Lck_AssertHeld(&pp->mtx);
if (back)
pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
else
pt = VTAILQ_FIRST(&pp->idle_queue);
if (pt == NULL)
if (pt == NULL) {
if (pp->nthr < cache_param->wthread_max) {
pp->dry++;
AZ(pthread_cond_signal(&pp->herder_cond));
}
return (NULL);
}
AZ(pt->func);
CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
return (wrk);
......@@ -185,7 +149,7 @@ pool_accept(struct worker *wrk, void *arg)
Lck_Lock(&pp->mtx);
wa->vxid = VXID_Get(&pp->vxid);
wrk2 = pool_getidleworker(pp, 0);
wrk2 = pool_getidleworker(pp);
if (wrk2 == NULL) {
/* No idle threads, do it ourselves */
Lck_Unlock(&pp->mtx);
......@@ -225,7 +189,7 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
* The common case first: Take an idle thread, do it.
*/
wrk = pool_getidleworker(pp, 0);
wrk = pool_getidleworker(pp);
if (wrk != NULL) {
VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
AZ(wrk->task.func);
......@@ -242,7 +206,7 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
break;
case POOL_QUEUE_FRONT:
/* If we have too much in the queue already, refuse. */
if (pp->lqueue > (cache_param->queue_max * pp->nthr) / 100) {
if (pp->lqueue > cache_param->wthread_queue_limit) {
pp->ndropped++;
retval = -1;
} else {
......@@ -258,8 +222,6 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
WRONG("Unknown enum pool_how");
}
Lck_Unlock(&pp->mtx);
if (retval)
AZ(pthread_cond_signal(&pp->herder_cond));
return (retval);
}
......@@ -320,7 +282,7 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
}
/*--------------------------------------------------------------------
* Create another thread, if necessary & possible
* Create another thread.
*/
static void
......@@ -328,35 +290,23 @@ pool_breed(struct pool *qp, const pthread_attr_t *tp_attr)
{
pthread_t tp;
/*
* If we need more threads, and have space, create
* one more thread.
*/
if (qp->nthr < cache_param->wthread_min || /* Not enough threads yet */
(qp->lqueue > cache_param->wthread_add_threshold && /* need more */
qp->lqueue > qp->last_lqueue)) { /* not getting better since last */
if (qp->nthr > cache_param->wthread_max) {
Lck_Lock(&pool_mtx);
VSC_C_main->threads_limited++;
Lck_Unlock(&pool_mtx);
} else if (pthread_create(&tp, tp_attr, WRK_thread, qp)) {
if (pthread_create(&tp, tp_attr, WRK_thread, qp)) {
VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
errno, strerror(errno));
Lck_Lock(&pool_mtx);
VSC_C_main->threads_failed++;
Lck_Unlock(&pool_mtx);
VTIM_sleep(cache_param->wthread_fail_delay * 1e-3);
VTIM_sleep(cache_param->wthread_fail_delay);
} else {
AZ(pthread_detach(tp));
VTIM_sleep(cache_param->wthread_add_delay * 1e-3);
qp->dry = 0;
qp->nthr++;
Lck_Lock(&pool_mtx);
VSC_C_main->threads++;
VSC_C_main->threads_created++;
Lck_Unlock(&pool_mtx);
VTIM_sleep(cache_param->wthread_add_delay);
}
}
qp->last_lqueue = qp->lqueue;
}
/*--------------------------------------------------------------------
......@@ -378,11 +328,10 @@ static void*
pool_herder(void *priv)
{
struct pool *pp;
struct pool_task *pt;
pthread_attr_t tp_attr;
struct timespec ts;
double t_idle;
struct worker *wrk;
int i;
CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
AZ(pthread_attr_init(&tp_attr));
......@@ -397,42 +346,36 @@ pool_herder(void *priv)
AZ(pthread_attr_init(&tp_attr));
}
/* Make more threads if needed and allowed */
if (pp->nthr < cache_param->wthread_min ||
(pp->dry && pp->nthr < cache_param->wthread_max)) {
pool_breed(pp, &tp_attr);
if (pp->nthr < cache_param->wthread_min)
continue;
AZ(clock_gettime(CLOCK_MONOTONIC, &ts));
ts.tv_sec += cache_param->wthread_purge_delay / 1000;
ts.tv_nsec +=
(cache_param->wthread_purge_delay % 1000) * 1000000;
if (ts.tv_nsec >= 1000000000) {
ts.tv_sec++;
ts.tv_nsec -= 1000000000;
}
Lck_Lock(&pp->herder_mtx);
i = Lck_CondWait(&pp->herder_cond, &pp->herder_mtx, &ts);
Lck_Unlock(&pp->herder_mtx);
if (!i)
continue;
if (pp->nthr <= cache_param->wthread_min)
continue;
if (pp->nthr > cache_param->wthread_min) {
t_idle = VTIM_real() - cache_param->wthread_timeout;
Lck_Lock(&pp->mtx);
/* XXX: unsafe counters */
VSC_C_main->sess_queued += pp->nqueued;
VSC_C_main->sess_dropped += pp->ndropped;
pp->nqueued = pp->ndropped = 0;
wrk = pool_getidleworker(pp, 1);
if (wrk != NULL && (wrk->lastused < t_idle ||
pp->nthr > cache_param->wthread_max)) {
VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
AZ(wrk->task.func);
} else
wrk = NULL;
pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
if (pt != NULL) {
AZ(pt->func);
CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
if (wrk->lastused < t_idle ||
pp->nthr > cache_param->wthread_max)
VTAILQ_REMOVE(&pp->idle_queue,
&wrk->task, list);
else
wrk = NULL;
}
Lck_Unlock(&pp->mtx);
/* And give it a kiss on the cheek... */
......@@ -444,8 +387,15 @@ pool_herder(void *priv)
Lck_Unlock(&pool_mtx);
wrk->task.func = NULL;
wrk->task.priv = NULL;
AZ(pthread_cond_signal(&wrk->cond));
VTIM_sleep(cache_param->wthread_destroy_delay);
continue;
}
}
Lck_Lock(&pp->mtx);
if (!pp->dry)
(void)Lck_CondWait(&pp->herder_cond, &pp->mtx, NULL);
Lck_Unlock(&pp->mtx);
}
NEEDLESS_RETURN(NULL);
}
......@@ -460,10 +410,10 @@ pool_mkpool(unsigned pool_no)
struct pool *pp;
struct listen_sock *ls;
struct poolsock *ps;
pthread_condattr_t cv_attr;
ALLOC_OBJ(pp, POOL_MAGIC);
XXXAN(pp);
if (pp == NULL)
return (NULL);
Lck_New(&pp->mtx, lck_wq);
VTAILQ_INIT(&pp->idle_queue);
......@@ -483,11 +433,7 @@ pool_mkpool(unsigned pool_no)
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
}
AZ(pthread_condattr_init(&cv_attr));
AZ(pthread_condattr_setclock(&cv_attr, CLOCK_MONOTONIC));
AZ(pthread_cond_init(&pp->herder_cond, &cv_attr));
AZ(pthread_condattr_destroy(&cv_attr));
Lck_New(&pp->herder_mtx, lck_herder);
AZ(pthread_cond_init(&pp->herder_cond, NULL));
AZ(pthread_create(&pp->herder_thr, NULL, pool_herder, pp));
return (pp);
......
......@@ -62,16 +62,15 @@ struct params {
/* Worker threads and pool */
unsigned wthread_min;
unsigned wthread_max;
unsigned wthread_timeout;
double wthread_timeout;
unsigned wthread_pools;
unsigned wthread_add_threshold;
unsigned wthread_add_delay;
unsigned wthread_fail_delay;
unsigned wthread_purge_delay;
unsigned wthread_stats_rate;
double wthread_add_delay;
double wthread_fail_delay;
double wthread_destroy_delay;
double wthread_stats_rate;
ssize_t wthread_stacksize;
unsigned queue_max;
unsigned wthread_queue_limit;
/* Memory allocation hints */
unsigned workspace_client;
......
......@@ -93,7 +93,7 @@ tweak_generic_timeout(struct cli *cli, volatile unsigned *dst, const char *arg)
/*--------------------------------------------------------------------*/
void
static void
tweak_timeout(struct cli *cli, const struct parspec *par, const char *arg)
{
volatile unsigned *dest;
......@@ -137,7 +137,7 @@ tweak_generic_timeout_double(struct cli *cli, volatile double *dest,
return (0);
}
static void
void
tweak_timeout_double(struct cli *cli, const struct parspec *par,
const char *arg)
{
......
......@@ -53,7 +53,7 @@ struct parspec {
int tweak_generic_uint(struct cli *cli,
volatile unsigned *dest, const char *arg, unsigned min, unsigned max);
void tweak_uint(struct cli *cli, const struct parspec *par, const char *arg);
void tweak_timeout(struct cli *cli,
void tweak_timeout_double(struct cli *cli,
const struct parspec *par, const char *arg);
void tweak_bytes(struct cli *cli, const struct parspec *par, const char *arg);
......
......@@ -97,7 +97,8 @@ tweak_thread_pool_max(struct cli *cli, const struct parspec *par,
/*--------------------------------------------------------------------*/
const struct parspec WRK_parspec[] = {
{ "thread_pools", tweak_uint, &mgt_param.wthread_pools, 1, UINT_MAX,
{ "thread_pools", tweak_uint, &mgt_param.wthread_pools,
1, UINT_MAX,
"Number of worker thread pools.\n"
"\n"
"Increasing number of worker pools decreases lock "
......@@ -110,71 +111,69 @@ const struct parspec WRK_parspec[] = {
"restart to take effect.",
EXPERIMENTAL | DELAYED_EFFECT,
"2", "pools" },
{ "thread_pool_max", tweak_thread_pool_max, NULL, 1, 0,
{ "thread_pool_max", tweak_thread_pool_max, NULL, 10, 0,
"The maximum number of worker threads in each pool.\n"
"\n"
"Do not set this higher than you have to, since excess "
"worker threads soak up RAM and CPU and generally just get "
"in the way of getting work done.\n",
EXPERIMENTAL | DELAYED_EFFECT,
"500", "threads" },
{ "thread_pool_min", tweak_thread_pool_min, NULL, 2, 0,
"in the way of getting work done.\n"
"\n"
"Minimum is 10 threads.",
DELAYED_EFFECT,
"5000", "threads" },
{ "thread_pool_min", tweak_thread_pool_min, NULL, 10, 0,
"The minimum number of worker threads in each pool.\n"
"\n"
"Increasing this may help ramp up faster from low load "
"situations where threads have expired.\n"
"\n"
"Minimum is 2 threads.",
EXPERIMENTAL | DELAYED_EFFECT,
"5", "threads" },
{ "thread_pool_timeout", tweak_timeout, &mgt_param.wthread_timeout,
1, 0,
"situations or when threads have expired.\n"
"\n"
"Minimum is 10 threads.",
DELAYED_EFFECT,
"100", "threads" },
{ "thread_pool_timeout",
tweak_timeout_double, &mgt_param.wthread_timeout,
10, UINT_MAX,
"Thread idle threshold.\n"
"\n"
"Threads in excess of thread_pool_min, which have been idle "
"for at least this long are candidates for purging.\n"
"for at least this long, will be destroyed.\n"
"\n"
"Minimum is 1 second.",
"Minimum is 10 seconds.",
EXPERIMENTAL | DELAYED_EFFECT,
"300", "seconds" },
{ "thread_pool_purge_delay",
tweak_timeout, &mgt_param.wthread_purge_delay, 100, 0,
"Wait this long between purging threads.\n"
{ "thread_pool_destroy_delay",
tweak_timeout_double, &mgt_param.wthread_destroy_delay,
0.01, UINT_MAX,
"Wait this long after destroying a thread.\n"
"\n"
"This controls the decay of thread pools when idle(-ish).\n"
"\n"
"Minimum is 100 milliseconds.",
"Minimum is 0.01 second.",
EXPERIMENTAL | DELAYED_EFFECT,
"1000", "milliseconds" },
{ "thread_pool_add_threshold",
tweak_uint, &mgt_param.wthread_add_threshold, 0, UINT_MAX,
"Overflow threshold for worker thread creation.\n"
"\n"
"Setting this too low, will result in excess worker threads, "
"which is generally a bad idea.\n"
"\n"
"Setting it too high results in insuffient worker threads.\n",
EXPERIMENTAL,
"2", "requests" },
"1", "seconds" },
{ "thread_pool_add_delay",
tweak_timeout, &mgt_param.wthread_add_delay, 0, UINT_MAX,
"Wait at least this long between creating threads.\n"
tweak_timeout_double, &mgt_param.wthread_add_delay,
0, UINT_MAX,
"Wait at least this long after creating a thread.\n"
"\n"
"Setting this too long results in insuffient worker threads.\n"
"Some (buggy) systems may need a short (sub-second) "
"delay between creating threads.\n"
"Set this to a few milliseconds if you see the "
"'threads_failed' counter grow too much.\n"
"\n"
"Setting this too short increases the risk of worker "
"thread pile-up.\n",
0,
"2", "milliseconds" },
"Setting this too high results in insuffient worker threads.\n",
EXPERIMENTAL,
"0", "seconds" },
{ "thread_pool_fail_delay",
tweak_timeout, &mgt_param.wthread_fail_delay, 100, UINT_MAX,
tweak_timeout_double, &mgt_param.wthread_fail_delay,
10e-3, UINT_MAX,
"Wait at least this long after a failed thread creation "
"before trying to create another thread.\n"
"\n"
"Failure to create a worker thread is often a sign that "
" the end is near, because the process is running out of "
"RAM resources for thread stacks.\n"
"This delay tries to not rush it on needlessly.\n"
"some resource. "
"This delay tries to not rush the end on needlessly.\n"
"\n"
"If thread creation failures are a problem, check that "
"thread_pool_max is not too high.\n"
......@@ -183,7 +182,7 @@ const struct parspec WRK_parspec[] = {
"thread_pool_min, to reduce the rate at which treads are "
"destroyed and later recreated.\n",
EXPERIMENTAL,
"200", "milliseconds" },
"0.2", "seconds" },
{ "thread_stats_rate",
tweak_uint, &mgt_param.wthread_stats_rate, 0, UINT_MAX,
"Worker threads accumulate statistics, and dump these into "
......@@ -194,13 +193,15 @@ const struct parspec WRK_parspec[] = {
"its accumulated stats into the global counters.\n",
EXPERIMENTAL,
"10", "requests" },
{ "queue_max", tweak_uint, &mgt_param.queue_max, 0, UINT_MAX,
"Percentage permitted queue length.\n"
{ "thread_queue_limit", tweak_uint, &mgt_param.wthread_queue_limit,
0, UINT_MAX,
"Permitted queue length per thread-pool.\n"
"\n"
"This sets the ratio of queued requests to worker threads, "
"above which sessions will be dropped instead of queued.\n",
"This sets the number of requests we will queue, waiting "
"for an available thread. Above this limit sessions will "
"be dropped instead of queued.\n",
EXPERIMENTAL,
"100", "%" },
"20", "" },
{ "rush_exponent", tweak_uint, &mgt_param.rush_exponent, 2, UINT_MAX,
"How many parked request we start for each completed "
"request on the object.\n"
......
......@@ -5,7 +5,7 @@ server s1 {
txresp -hdr "Connection: close" -body "012345\n"
} -start
varnish v1 -arg "-p thread_pool_min=2 -p thread_pool_max=8 -p thread_pools=4 -p thread_pool_purge_delay=100 -p thread_pool_timeout=1 -p thread_pool_add_delay=100"
varnish v1 -arg "-p thread_pool_min=10 -p thread_pool_max=10 -p thread_pools=2"
varnish v1 -vcl+backend {} -start
......@@ -16,4 +16,4 @@ client c1 {
expect resp.status == 200
} -run
varnish v1 -expect threads == 8
varnish v1 -expect threads == 20
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment