Commit 946bc54d authored by Nils Goroll's avatar Nils Goroll

simplify thread control and use a condvar to wake up threads

parent 216fa71e
...@@ -294,6 +294,8 @@ vmod_random_add_backend(VRT_CTX, struct vmod_weightadjust_random *rr, ...@@ -294,6 +294,8 @@ vmod_random_add_backend(VRT_CTX, struct vmod_weightadjust_random *rr,
} }
ALLOC_OBJ(prop, WADJ_PROP_MAGIC); ALLOC_OBJ(prop, WADJ_PROP_MAGIC);
AZ(pthread_mutex_init(&prop->wakeup.mtx, NULL));
AZ(pthread_cond_init(&prop->wakeup.cv, NULL));
prop->vd = rr->vd; prop->vd = rr->vd;
prop->be = be; prop->be = be;
prop->vd_updates = &rr->extra.updates; prop->vd_updates = &rr->extra.updates;
......
...@@ -46,6 +46,7 @@ ...@@ -46,6 +46,7 @@
#include "vsa.h" #include "vsa.h"
#include "vtim.h" #include "vtim.h"
#include "vtcp.h" #include "vtcp.h"
#include "vrnd.h"
#include "waiter/waiter.h" #include "waiter/waiter.h"
#include "cache/cache_director.h" #include "cache/cache_director.h"
...@@ -155,14 +156,15 @@ wadj_poke(const struct wadj_prop *prop, char *buf, ...@@ -155,14 +156,15 @@ wadj_poke(const struct wadj_prop *prop, char *buf,
POKE_ERR(prop, "read error: %s", strerror(errno)); POKE_ERR(prop, "read error: %s", strerror(errno));
} }
static void * static void *
wadj_update(void *arg) wadj_update(void *arg)
{ {
const struct wadj_prop *pr; struct wadj_prop *pa;
CAST_OBJ_NOTNULL(pr, arg, WADJ_PROP_MAGIC); CAST_OBJ_NOTNULL(pa, arg, WADJ_PROP_MAGIC);
struct wadj_prop_wakeup *wa = &pa->wakeup;
const struct wadj_prop *pr = pa;
struct vdir *vd; struct vdir *vd;
double t, due; struct timespec due;
unsigned u = UINT_MAX; unsigned u = UINT_MAX;
const int ovsz = 2 * 3; const int ovsz = 2 * 3;
int i, ov[ovsz]; int i, ov[ovsz];
...@@ -180,17 +182,19 @@ wadj_update(void *arg) ...@@ -180,17 +182,19 @@ wadj_update(void *arg)
AN(pr->req[0]); AN(pr->req[0]);
POKE_TRACE(pr, "starting"); POKE_TRACE(pr, "starting");
due = VTIM_mono();
due.tv_sec = (long)VTIM_real();
due.tv_nsec = 1e9 * VRND_RandomTestableDouble();
AZ(pthread_mutex_lock(&wa->mtx));
while (pr->run >= STARTING) { while (pr->run >= STARTING) {
t = VTIM_mono(); i = pthread_cond_timedwait(&wa->cv, &wa->mtx,
if (t < due) { &due);
VTIM_sleep(due - t); if (i == 0 || i == EINTR)
continue; continue;
} assert(i == ETIMEDOUT);
due += pr->interval; due.tv_sec += pr->interval;
wadj_poke(pr, buf, &len); wadj_poke(pr, buf, &len);
if (pr->run < STARTING)
break;
if (len == 0) if (len == 0)
continue; continue;
...@@ -249,6 +253,7 @@ wadj_update(void *arg) ...@@ -249,6 +253,7 @@ wadj_update(void *arg)
VPOKE_TRACE(pr, "set to %f total %f", w, vd->total_weight); VPOKE_TRACE(pr, "set to %f total %f", w, vd->total_weight);
vdir_unlock(vd); vdir_unlock(vd);
} }
AZ(pthread_mutex_unlock(&wa->mtx));
POKE_TRACE(pr, "stopping"); POKE_TRACE(pr, "stopping");
return NULL; return NULL;
} }
...@@ -261,6 +266,11 @@ wadj_thr_start(struct wadj_prop *prop) ...@@ -261,6 +266,11 @@ wadj_thr_start(struct wadj_prop *prop)
if (prop->run >= STARTING) if (prop->run >= STARTING)
return; return;
AZ(pthread_mutex_lock(&prop->wakeup.mtx));
if (prop->run >= STARTING) {
AZ(pthread_mutex_unlock(&prop->wakeup.mtx));
return;
}
AZ(pthread_attr_init(&attr)); AZ(pthread_attr_init(&attr));
// 32K JIT + 32K general + buf // 32K JIT + 32K general + buf
AZ(pthread_attr_setstacksize(&attr, 64 * 1024 + prop->bufsz)); AZ(pthread_attr_setstacksize(&attr, 64 * 1024 + prop->bufsz));
...@@ -268,17 +278,29 @@ wadj_thr_start(struct wadj_prop *prop) ...@@ -268,17 +278,29 @@ wadj_thr_start(struct wadj_prop *prop)
prop->run = STARTING; prop->run = STARTING;
AZ(pthread_create(&prop->thread, &attr, wadj_update, prop)); AZ(pthread_create(&prop->thread, &attr, wadj_update, prop));
prop->run = RUNNING; prop->run = RUNNING;
AZ(pthread_mutex_unlock(&prop->wakeup.mtx));
AZ(pthread_attr_destroy(&attr)); AZ(pthread_attr_destroy(&attr));
} }
static void static void
wadj_thr_stop(struct wadj_prop *prop) wadj_thr_stop(struct wadj_prop *prop)
{ {
if (prop->run <= STOPPED) if (prop->run <= STOPPING)
return; return;
AZ(pthread_mutex_lock(&prop->wakeup.mtx));
if (prop->run <= STOPPING) {
AZ(pthread_mutex_unlock(&prop->wakeup.mtx));
return;
}
prop->run = STOPPING; prop->run = STOPPING;
AZ(pthread_cond_signal(&prop->wakeup.cv));
AZ(pthread_mutex_unlock(&prop->wakeup.mtx));
AZ(pthread_join(prop->thread, NULL)); AZ(pthread_join(prop->thread, NULL));
prop->run = STOPPED; AZ(pthread_mutex_lock(&prop->wakeup.mtx));
if (prop->run == STOPPING)
prop->run = STOPPED;
AZ(pthread_mutex_unlock(&prop->wakeup.mtx));
} }
/* wa_vcl->mtx must be held */ /* wa_vcl->mtx must be held */
...@@ -315,16 +337,7 @@ wadj_thr_ctl(struct vmod_wadj_vcl *wa_vcl, ...@@ -315,16 +337,7 @@ wadj_thr_ctl(struct vmod_wadj_vcl *wa_vcl,
assert(wa_vcl->state == VCL_EVENT_COLD); assert(wa_vcl->state == VCL_EVENT_COLD);
VTAILQ_FOREACH(prop, &wa_vcl->props, list_vcl) { VTAILQ_FOREACH(prop, &wa_vcl->props, list_vcl) {
if (prop->run <= STOPPING) wadj_thr_stop(prop);
break;
assert(prop->run == RUNNING);
prop->run = STOPPING;
}
VTAILQ_FOREACH(prop, &wa_vcl->props, list_vcl) {
if (prop->run != STOPPING)
break;
AZ(pthread_join(prop->thread, NULL));
prop->run = STOPPED;
} }
} }
...@@ -343,6 +356,8 @@ wadj_prop_fini(struct wadj_prop **propp) ...@@ -343,6 +356,8 @@ wadj_prop_fini(struct wadj_prop **propp)
VRE_free(&prop->vre); VRE_free(&prop->vre);
AN(prop->req); AN(prop->req);
free(prop->req); free(prop->req);
AZ(pthread_cond_destroy(&prop->wakeup.cv));
AZ(pthread_mutex_destroy(&prop->wakeup.mtx));
FREE_OBJ(prop); FREE_OBJ(prop);
} }
...@@ -359,18 +374,9 @@ wadj_dir_fini(const struct wadj_prop_head *vd_props, ...@@ -359,18 +374,9 @@ wadj_dir_fini(const struct wadj_prop_head *vd_props,
{ {
struct wadj_prop *prop, *save; struct wadj_prop *prop, *save;
VTAILQ_FOREACH(prop, vd_props, list_vd) {
if (prop->run <= STOPPING)
break;
assert(prop->run == RUNNING);
prop->run = STOPPING;
}
VTAILQ_FOREACH_SAFE(prop, vd_props, list_vd, save) { VTAILQ_FOREACH_SAFE(prop, vd_props, list_vd, save) {
VTAILQ_REMOVE(vcl_props, prop, list_vcl); VTAILQ_REMOVE(vcl_props, prop, list_vcl);
if (prop->run == STOPPING) { wadj_thr_stop(prop);
AZ(pthread_join(prop->thread, NULL));
prop->run = STOPPED;
}
wadj_prop_fini(&prop); wadj_prop_fini(&prop);
AZ(prop); AZ(prop);
} }
......
...@@ -34,6 +34,11 @@ enum run_state_e { ...@@ -34,6 +34,11 @@ enum run_state_e {
RUNNING RUNNING
}; };
struct wadj_prop_wakeup {
pthread_mutex_t mtx;
pthread_cond_t cv;
};
struct wadj_prop { struct wadj_prop {
unsigned magic; unsigned magic;
#define WADJ_PROP_MAGIC 0xa05991ff #define WADJ_PROP_MAGIC 0xa05991ff
...@@ -46,6 +51,7 @@ struct wadj_prop { ...@@ -46,6 +51,7 @@ struct wadj_prop {
int *vd_updates; int *vd_updates;
pthread_t thread; pthread_t thread;
struct wadj_prop_wakeup wakeup;
enum run_state_e run; enum run_state_e run;
vre_t *vre; vre_t *vre;
char *req; char *req;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment