Commit 385930c3 authored by Poul-Henning Kamp

Rework the backend/waiter stuff.

Instead of trying to steal vbc's away from the waiter, send the
request and wait for the waiter to hand the vbc back to us.

All waiters but poll still disabled ... coming up next.
parent f2c1e642
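Roughly, the new protocol looks like this (a minimal sketch in plain pthreads, not the actual Varnish code; the struct, field and function names below are illustrative only): a pooled connection a worker has grabbed stays registered in the waiter and is marked STOLEN; when the waiter sees activity on the fd it marks the connection USED and signals the owning worker, which has been parked on its own condition variable under the pool lock.

/*
 * Illustrative sketch of the hand-over protocol this commit introduces.
 * Names (conn_state, pool_mtx, wrk_cond, ...) are made up, not the
 * Varnish API.
 */
#include <pthread.h>

enum conn_state { CS_AVAIL, CS_STOLEN, CS_USED, CS_CLEANUP };

struct conn {
	enum conn_state  state;
	pthread_mutex_t *pool_mtx;   /* the tcp_pool's lock */
	pthread_cond_t  *wrk_cond;   /* the owning worker's condvar */
};

/* Worker side (cf. VBT_Wait): park until the waiter hands the
 * connection back, instead of trying to steal it out of the waiter. */
static void
worker_wait_for_handover(struct conn *c)
{
	pthread_mutex_lock(c->pool_mtx);
	while (c->state == CS_STOLEN)
		pthread_cond_wait(c->wrk_cond, c->pool_mtx);
	pthread_mutex_unlock(c->pool_mtx);
}

/* Waiter side (cf. the VBC_STATE_STOLEN case in tcp_handle): flip the
 * state and wake the worker that owns the connection. */
static void
waiter_hand_back(struct conn *c)
{
	pthread_mutex_lock(c->pool_mtx);
	if (c->state == CS_STOLEN) {
		c->state = CS_USED;
		pthread_cond_signal(c->wrk_cond);
	}
	pthread_mutex_unlock(c->pool_mtx);
}

In the diff below, vbe_dir_gethdrs treats a connection that was still in the STOLEN state when the request was sent as eligible for one extra retry (the extrachance logic), since a reused connection may have been closed by the backend before the request arrived.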
@@ -78,7 +78,7 @@ VBE_Healthy(const struct backend *backend, double *changed)
*/
static int __match_proto__(vdi_getfd_f)
vbe_dir_getfd(const struct director *d, struct busyobj *bo)
vbe_dir_getfd(struct worker *wrk, const struct director *d, struct busyobj *bo)
{
struct vbc *vc;
struct backend *bp;
@@ -87,6 +87,7 @@ vbe_dir_getfd(const struct director *d, struct busyobj *bo)
char abuf1[VTCP_ADDRBUFSIZE], abuf2[VTCP_ADDRBUFSIZE];
char pbuf1[VTCP_PORTBUFSIZE], pbuf2[VTCP_PORTBUFSIZE];
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CHECK_OBJ_NOTNULL(d, DIRECTOR_MAGIC);
CAST_OBJ_NOTNULL(bp, d->priv, BACKEND_MAGIC);
@@ -112,7 +113,7 @@ vbe_dir_getfd(const struct director *d, struct busyobj *bo)
return (-1);
FIND_TMO(connect_timeout, tmod, bo, vrt);
vc = VBT_Get(bp->tcp_pool, tmod);
vc = VBT_Get(bp->tcp_pool, tmod, bp, wrk);
if (vc == NULL) {
// XXX: Per backend stats ?
VSC_C_main->backend_fail++;
@@ -197,7 +198,7 @@ static int __match_proto__(vdi_gethdrs_f)
vbe_dir_gethdrs(const struct director *d, struct worker *wrk,
struct busyobj *bo)
{
int i;
int i, extrachance = 0;
const struct vrt_backend *vrt;
CHECK_OBJ_NOTNULL(d, DIRECTOR_MAGIC);
@@ -205,12 +206,14 @@ vbe_dir_gethdrs(const struct director *d, struct worker *wrk,
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CAST_OBJ_NOTNULL(vrt, d->priv2, VRT_BACKEND_MAGIC);
i = vbe_dir_getfd(d, bo);
i = vbe_dir_getfd(wrk, d, bo);
if (i < 0) {
VSLb(bo->vsl, SLT_FetchError, "no backend connection");
return (-1);
}
AN(bo->htc);
if (bo->htc->vbc->state == VBC_STATE_STOLEN)
extrachance = 1;
i = V1F_fetch_hdr(wrk, bo, vrt->hosthdr);
/*
@@ -218,12 +221,12 @@ vbe_dir_gethdrs(const struct director *d, struct worker *wrk,
* that the backend closed it before we get a request to it.
* Do a single retry in that case.
*/
if (i == 1 && bo->htc->vbc->recycled) {
if (i == 1 && extrachance) {
vbe_dir_finish(d, wrk, bo);
AZ(bo->htc);
VSC_C_main->backend_retry++;
bo->doclose = SC_NULL;
i = vbe_dir_getfd(d, bo);
i = vbe_dir_getfd(wrk, d, bo);
if (i < 0) {
VSLb(bo->vsl, SLT_FetchError, "no backend connection");
bo->htc = NULL;
@@ -281,13 +284,15 @@ vbe_dir_http1pipe(const struct director *d, struct req *req, struct busyobj *bo)
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
i = vbe_dir_getfd(d, bo);
i = vbe_dir_getfd(req->wrk, d, bo);
if (i < 0) {
VSLb(bo->vsl, SLT_FetchError, "no backend connection");
SES_Close(req->sp, SC_RX_TIMEOUT);
return;
} else {
V1P_Process(req, bo, i);
vbe_dir_finish(d, req->wrk, bo);
}
V1P_Process(req, bo, i);
vbe_dir_finish(d, bo->wrk, bo);
}
/*--------------------------------------------------------------------*/
@@ -88,21 +88,19 @@ struct backend {
struct vbc {
unsigned magic;
#define VBC_MAGIC 0x0c5e6592
VTAILQ_ENTRY(vbc) list;
int fd;
VTAILQ_ENTRY(vbc) list;
const struct suckaddr *addr;
uint8_t recycled;
uint8_t state;
#define VBC_STATE_AVAIL (1<<0)
#define VBC_STATE_USED (1<<1)
#define VBC_STATE_STOLEN (1<<2)
#define VBC_STATE_CLEANUP (1<<3)
uint8_t in_waiter;
uint8_t have_been_in_waiter;
struct waited waited[1];
struct tcp_pool *tcp_pool;
struct backend *backend;
struct worker *wrk;
};
/* cache_backend_cfg.c */
@@ -123,6 +121,7 @@ void VBT_Rel(struct tcp_pool **tpp);
int VBT_Open(const struct tcp_pool *tp, double tmo, const struct suckaddr **sa);
void VBT_Recycle(struct tcp_pool *tp, struct vbc **vbc);
void VBT_Close(struct tcp_pool *tp, struct vbc **vbc);
struct vbc *VBT_Get(struct tcp_pool *tp, double tmo);
struct vbc *VBT_Get(struct tcp_pool *, double tmo, struct backend *,
struct worker *);
void VBT_Wait(struct worker *wrk, const struct vbc *vbc);
@@ -44,7 +44,6 @@
#include "waiter/waiter.h"
#include "vtim.h"
struct tcp_pool {
unsigned magic;
#define TCP_POOL_MAGIC 0x28b0e42a
@@ -89,24 +88,14 @@ tcp_handle(struct waited *w, enum wait_event ev, double now)
tp = vbc->tcp_pool;
Lck_Lock(&tp->mtx);
VSL(SLT_Debug, 0,
"------> Handler fd %d in_w %d state 0x%x ev %d have_been %d",
vbc->fd, vbc->in_waiter, vbc->state, ev, vbc->have_been_in_waiter);
AN(vbc->in_waiter);
switch(vbc->state) {
case VBC_STATE_STOLEN:
vbc->state = VBC_STATE_AVAIL;
vbc->state = VBC_STATE_USED;
VTAILQ_REMOVE(&tp->connlist, vbc, list);
if (Wait_Enter(tp->waiter, vbc->waited)) {
VSL(SLT_Debug, 0,
"------> Handler stolen -> re-wait failed");
VTCP_close(&vbc->fd);
tp->n_conn--;
FREE_OBJ(vbc);
} else {
VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
}
CHECK_OBJ_NOTNULL(vbc->backend, BACKEND_MAGIC);
CHECK_OBJ_NOTNULL(vbc->wrk, WORKER_MAGIC);
AZ(pthread_cond_signal(&vbc->wrk->cond));
break;
case VBC_STATE_AVAIL:
VTCP_close(&vbc->fd);
@@ -114,10 +103,6 @@ tcp_handle(struct waited *w, enum wait_event ev, double now)
tp->n_conn--;
FREE_OBJ(vbc);
break;
case VBC_STATE_USED:
vbc->in_waiter = 0;
vbc->have_been_in_waiter = 1;
break;
case VBC_STATE_CLEANUP:
VTCP_close(&vbc->fd);
tp->n_kill--;
@@ -208,16 +193,11 @@ VBT_Rel(struct tcp_pool **tpp)
VTAILQ_FOREACH_SAFE(vbc, &tp->connlist, list, vbc2) {
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
if (vbc->in_waiter) {
vbc->state = VBC_STATE_CLEANUP;
shutdown(vbc->fd, SHUT_WR);
VTAILQ_INSERT_TAIL(&tp->killlist, vbc, list);
tp->n_kill++;
} else {
VTCP_close(&vbc->fd);
memset(vbc, 0x22, sizeof *vbc);
free(vbc);
}
assert(vbc->state == VBC_STATE_AVAIL);
vbc->state = VBC_STATE_CLEANUP;
(void)shutdown(vbc->fd, SHUT_WR);
VTAILQ_INSERT_TAIL(&tp->killlist, vbc, list);
tp->n_kill++;
}
while (tp->n_kill) {
Lck_Unlock(&tp->mtx);
@@ -279,39 +259,30 @@ VBT_Recycle(struct tcp_pool *tp, struct vbc **vbcp)
assert(vbc->state == VBC_STATE_USED);
assert(vbc->fd > 0);
AZ(vbc->backend);
Lck_Lock(&tp->mtx);
tp->n_used--;
VSL(SLT_Debug, 0, "------> Recycle fd %d in_w %d",
vbc->fd, vbc->in_waiter);
if (!vbc->in_waiter) {
vbc->in_waiter = 1;
vbc->waited->ptr = vbc;
vbc->waited->fd = vbc->fd;
vbc->waited->idle = VTIM_real();
vbc->state = VBC_STATE_AVAIL;
VSL(SLT_Debug, 0, "------> Recycle fd %d Wait_Enter", vbc->fd);
if (Wait_Enter(tp->waiter, vbc->waited)) {
VTCP_close(&vbc->fd);
memset(vbc, 0x33, sizeof *vbc);
free(vbc);
vbc = NULL;
} else {
VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
}
i = 1;
vbc->waited->ptr = vbc;
vbc->waited->fd = vbc->fd;
vbc->waited->idle = VTIM_real();
vbc->state = VBC_STATE_AVAIL;
if (Wait_Enter(tp->waiter, vbc->waited)) {
VTCP_close(&vbc->fd);
memset(vbc, 0x33, sizeof *vbc);
free(vbc);
// XXX: stats
vbc = NULL;
} else {
vbc->state = VBC_STATE_STOLEN;
VTAILQ_INSERT_TAIL(&tp->connlist, vbc, list);
VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
i++;
}
if (vbc != NULL) {
if (vbc != NULL)
tp->n_conn++;
vbc->recycled = 1;
}
Lck_Unlock(&tp->mtx);
if (i && DO_DEBUG(DBG_VTC_MODE)) {
/*
* In varnishtest we do not have the luxury of using
@@ -346,17 +317,17 @@ VBT_Close(struct tcp_pool *tp, struct vbc **vbcp)
assert(vbc->state == VBC_STATE_USED);
assert(vbc->fd > 0);
VSL(SLT_Debug, 0, "------> Close fd %d in_w %d",
vbc->fd, vbc->in_waiter);
AZ(vbc->backend);
Lck_Lock(&tp->mtx);
tp->n_used--;
if (vbc->in_waiter) {
shutdown(vbc->fd, SHUT_WR);
if (vbc->state == VBC_STATE_STOLEN) {
(void)shutdown(vbc->fd, SHUT_WR);
vbc->state = VBC_STATE_CLEANUP;
VTAILQ_INSERT_HEAD(&tp->killlist, vbc, list);
tp->n_kill++;
} else {
assert(vbc->state == VBC_STATE_USED);
VTCP_close(&vbc->fd);
memset(vbc, 0x44, sizeof *vbc);
free(vbc);
@@ -369,27 +340,29 @@ VBT_Close(struct tcp_pool *tp, struct vbc **vbcp)
*/
struct vbc *
VBT_Get(struct tcp_pool *tp, double tmo)
VBT_Get(struct tcp_pool *tp, double tmo, struct backend *be, struct worker *wrk)
{
struct vbc *vbc;
CHECK_OBJ_NOTNULL(tp, TCP_POOL_MAGIC);
CHECK_OBJ_NOTNULL(be, BACKEND_MAGIC);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
Lck_Lock(&tp->mtx);
vbc = VTAILQ_FIRST(&tp->connlist);
if (vbc != NULL) {
CHECK_OBJ_NOTNULL(vbc, VBC_MAGIC);
VSL(SLT_Debug, 0, "------> Steal fd %d state 0x%x",
vbc->fd, vbc->state);
assert(vbc->state == VBC_STATE_AVAIL ||
vbc->state == VBC_STATE_STOLEN);
CHECK_OBJ_ORNULL(vbc, VBC_MAGIC);
if (vbc == NULL || vbc->backend != NULL)
vbc = NULL;
else {
assert(vbc->tcp_pool == tp);
assert(vbc->state == VBC_STATE_AVAIL);
VTAILQ_REMOVE(&tp->connlist, vbc, list);
VTAILQ_INSERT_TAIL(&tp->connlist, vbc, list);
tp->n_conn--;
VSC_C_main->backend_reuse += 1;
vbc->state = VBC_STATE_USED;
assert(vbc->tcp_pool == tp);
vbc->state = VBC_STATE_STOLEN;
vbc->backend = be;
vbc->wrk = wrk;
}
tp->n_used++; // Opening mostly works
Lck_Unlock(&tp->mtx);
@@ -402,15 +375,33 @@ VBT_Get(struct tcp_pool *tp, double tmo)
INIT_OBJ(vbc->waited, WAITED_MAGIC);
vbc->state = VBC_STATE_USED;
vbc->tcp_pool = tp;
vbc->backend = be;
vbc->fd = VBT_Open(tp, tmo, &vbc->addr);
if (vbc->fd < 0)
FREE_OBJ(vbc);
if (vbc == NULL) {
VSL(SLT_Debug, 0, "------> No new fd");
Lck_Lock(&tp->mtx);
tp->n_used--; // Nope, didn't work after all.
Lck_Unlock(&tp->mtx);
} else
VSL(SLT_Debug, 0, "------> New fd %d", vbc->fd);
}
return (vbc);
}
/*--------------------------------------------------------------------
*/
void
VBT_Wait(struct worker *wrk, const struct vbc *vbc)
{
struct tcp_pool *tp;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(vbc, VBC_MAGIC);
tp = vbc->tcp_pool;
CHECK_OBJ_NOTNULL(tp, TCP_POOL_MAGIC);
assert(vbc->wrk == wrk);
Lck_Lock(&tp->mtx);
while (vbc->state == VBC_STATE_STOLEN)
AZ(Lck_CondWait(&wrk->cond, &tp->mtx, 0));
Lck_Unlock(&tp->mtx);
}
@@ -37,6 +37,7 @@
#include "hash/hash_slinger.h"
#include "cache/cache_backend.h"
#include "cache/cache_director.h"
#include "vcli_priv.h"
#include "vtcp.h"
@@ -144,6 +145,11 @@ V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, const char *def_host)
/* Receive response */
if (htc->vbc->state != VBC_STATE_USED)
VBT_Wait(wrk, htc->vbc);
assert(htc->vbc->state == VBC_STATE_USED);
SES_RxInit(htc, bo->ws, cache_param->http_resp_size,
cache_param->http_resp_hdr_len);
CHECK_OBJ_NOTNULL(htc, HTTP_CONN_MAGIC);
@@ -105,6 +105,7 @@ V1P_Process(struct req *req, struct busyobj *bo, int fd)
wrk = req->wrk;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
assert(fd > 0);
req->res_mode = RES_PIPE;
@@ -112,12 +113,8 @@ V1P_Process(struct req *req, struct busyobj *bo, int fd)
acct_pipe.req = req->acct.req_hdrbytes;
req->acct.req_hdrbytes = 0;
if (fd < 0) {
pipecharge(req, &acct_pipe, NULL);
SES_Close(req->sp, SC_OVERLOAD);
return;
}
CHECK_OBJ_NOTNULL(bo->htc, HTTP_CONN_MAGIC);
CHECK_OBJ_NOTNULL(bo->htc->vbc, VBC_MAGIC);
bo->wrk = req->wrk;
bo->director_state = DIR_S_BODY;
(void)VTCP_blocking(fd);
@@ -134,6 +131,9 @@ V1P_Process(struct req *req, struct busyobj *bo, int fd)
VSLb_ts_req(req, "Pipe", W_TIM_real(wrk));
if (i == 0) {
if (bo->htc->vbc->state == VBC_STATE_STOLEN)
VBT_Wait(req->wrk, bo->htc->vbc);
memset(fds, 0, sizeof fds);
fds[0].fd = fd;
fds[0].events = POLLIN | POLLERR;
@@ -62,7 +62,16 @@ struct vwp {
static void
vwp_extend_pollspace(struct vwp *vwp)
{
size_t inc = (1<<16);
size_t inc;
if (vwp->npoll < (1<<12))
inc = (1<<10);
else if (vwp->npoll < (1<<14))
inc = (1<<12);
else if (vwp->npoll < (1<<16))
inc = (1<<14);
else
inc = (1<<16);
VSL(SLT_Debug, 0, "Acceptor poll space increased by %zu to %zu",
inc, vwp->npoll + inc);
@@ -86,6 +95,8 @@ static void
vwp_add(struct vwp *vwp, struct waited *w)
{
CHECK_OBJ_NOTNULL(w, WAITED_MAGIC);
CHECK_OBJ_NOTNULL(vwp, VWP_MAGIC);
if (vwp->hpoll == vwp->npoll)
vwp_extend_pollspace(vwp);
assert(vwp->hpoll < vwp->npoll);
@@ -123,9 +134,15 @@ vwp_dopipe(struct vwp *vwp)
assert(ss > 0);
i = 0;
while (ss) {
if (w[i] == NULL) {
assert(ss == sizeof w[0]);
assert(vwp->hpoll == 1);
pthread_exit(NULL);
}
CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
assert(w[i]->fd > 0); // no stdin
vwp_add(vwp, w[i++]);
ss -= sizeof w[0];
}
}
@@ -134,47 +151,49 @@ vwp_dopipe(struct vwp *vwp)
static void *
vwp_main(void *priv)
{
int v, v2;
int v;
struct vwp *vwp;
struct waited *wp;
double now, idle;
int i;
int i, dopipe;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
THR_SetName("cache-poll");
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
while (1) {
// Try to keep the high point as low as possible
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
// XXX: sleep on ->tmo
v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
v = poll(vwp->pollfd, vwp->hpoll,
(int)floor(1e3 * *vwp->waiter->tmo));
assert(v >= 0);
v2 = v;
if (v == 0)
v = vwp->hpoll;
now = VTIM_real();
idle = now - *vwp->waiter->tmo;
i = 1;
while (v2 > 0 && i < vwp->hpoll) {
i = 0;
dopipe = 0;
while (v > 0 && i < vwp->hpoll) {
if (vwp->pollfd[i].revents)
v--;
if (vwp->pollfd[i].fd == vwp->pipes[0]) {
if (vwp->pollfd[i].revents)
dopipe = 1;
i++;
continue;
}
wp = vwp->idx[i];
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
if (vwp->pollfd[i].revents != 0) {
v2--;
if (wp->idle <= idle) {
vwp->waiter->func(wp, WAITER_TIMEOUT, now);
vwp_del(vwp, i);
} else if (vwp->pollfd[i].revents & POLLIN) {
assert(wp->fd > 0);
assert(wp->fd == vwp->pollfd[i].fd);
VSL(SLT_Debug, wp->fd, "POLL Handle %d %x",
wp->fd, vwp->pollfd[i].revents);
vwp_del(vwp, i);
vwp->waiter->func(wp, WAITER_ACTION, now);
} else if (wp->idle <= idle) {
vwp_del(vwp, i);
vwp->waiter->func(wp, WAITER_TIMEOUT, now);
} else {
i++;
}
}
if (vwp->pollfd[0].revents)
if (dopipe)
vwp_dopipe(vwp);
}
NEEDLESS_RETURN(NULL);
@@ -208,10 +227,10 @@ vwp_init(struct waiter *w)
AZ(pipe(vwp->pipes));
// XXX: set write pipe non-blocking
vwp->hpoll = 1;
vwp_extend_pollspace(vwp);
vwp->pollfd[0].fd = vwp->pipes[0];
vwp->pollfd[0].events = POLLIN;
vwp->hpoll = 1;
AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
}
@@ -228,7 +247,10 @@ vwp_fini(struct waiter *w)
// XXX: set write pipe blocking
assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
AZ(pthread_join(vwp->thread, &vp));
AZ(close(vwp->pipes[0]));
AZ(close(vwp->pipes[1]));
free(vwp->pollfd);
free(vwp->idx);
}
/*--------------------------------------------------------------------*/