Commit ef03e1c4 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Try another strategy to marry waiters to backend connection pools.

parent e8d89346
......@@ -84,15 +84,6 @@ struct backend {
/* -------------------------------------------------------------------*/
enum vbc_waiter {
VBC_W_NEW,
VBC_W_INWAIT,
VBC_W_STOLEN,
VBC_W_NOWAIT,
VBC_W_PENDING,
VBC_W_KILL,
};
/* Backend connection */
struct vbc {
unsigned magic;
......@@ -101,7 +92,12 @@ struct vbc {
int fd;
const struct suckaddr *addr;
uint8_t recycled;
enum vbc_waiter in_waiter;
uint8_t state;
#define VBC_STATE_AVAIL (1<<0)
#define VBC_STATE_USED (1<<1)
#define VBC_STATE_CLEANUP (1<<2)
uint8_t in_waiter;
uint8_t stolen;
struct waited waited[1];
struct backend *backend;
......
......@@ -47,6 +47,8 @@
#include "vtcp.h"
#include "vsa.h"
#include "waiter/waiter.h"
#include "vtim.h"
struct tcp_pool {
unsigned magic;
......@@ -69,9 +71,6 @@ struct tcp_pool {
VTAILQ_HEAD(, vbc) killlist;
int n_kill;
VTAILQ_HEAD(, vbc) pendlist;
int n_pend;
int n_used;
};
......@@ -94,42 +93,44 @@ tcp_handle(struct waited *w, enum wait_event ev, double now)
tp = vbc->backend->tcp_pool; // NB: Incestous
Lck_Lock(&tp->mtx);
switch (vbc->in_waiter) {
case VBC_W_KILL:
VSL(SLT_Debug, 0, "==========> Handle %s fd %d iw %d ev %d KILL",
vbc->backend->vcl_name, vbc->fd, vbc->in_waiter, ev);
AN(vbc->in_waiter);
VSL(SLT_Debug, 0, "------> Handler fd %d in_w %d state %d ev %d stolen %d", vbc->fd, vbc->in_waiter, vbc->state, ev, vbc->stolen);
switch(vbc->state) {
case VBC_STATE_AVAIL:
if (ev != WAITER_ACTION || !vbc->stolen) {
VSL(SLT_Debug, 0, "------> Handler avail + !action -> close");
VTCP_close(&vbc->fd);
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
FREE_OBJ(vbc);
} else {
VSL(SLT_Debug, 0, "------> Handler avail + action -> re-wait");
vbc->stolen = 0;
if (Wait_Enter(tp->waiter, vbc->waited)) {
VSL(SLT_Debug, 0, "------> Handler avail + !timeout -> re-wait failed");
VTCP_close(&vbc->fd);
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
FREE_OBJ(vbc);
}
}
break;
case VBC_STATE_USED:
VSL(SLT_Debug, 0, "------> Handler used");
vbc->in_waiter = 0;
break;
case VBC_STATE_CLEANUP:
VSL(SLT_Debug, 0, "------> Handler cleanup");
assert(vbc->fd < 0);
tp->n_kill--;
VTAILQ_REMOVE(&tp->killlist, vbc, list);
FREE_OBJ(vbc);
break;
case VBC_W_PENDING:
VSL(SLT_Debug, 0, "==========> Handle %s fd %d iw %d ev %d PENDING",
vbc->backend->vcl_name, vbc->fd, vbc->in_waiter, ev);
vbc->in_waiter = VBC_W_NOWAIT;
VTAILQ_REMOVE(&tp->pendlist, vbc, list);
tp->n_pend--;
break;
case VBC_W_STOLEN:
VSL(SLT_Debug, 0, "==========> Handle %s fd %d iw %d ev %d STOLEN",
vbc->backend->vcl_name, vbc->fd, vbc->in_waiter, ev);
vbc->in_waiter = VBC_W_NOWAIT;
vbc = NULL;
break;
case VBC_W_INWAIT:
VSL(SLT_Debug, 0, "==========> Handle %s fd %d iw %d ev %d INWAIT",
vbc->backend->vcl_name, vbc->fd, vbc->in_waiter, ev);
VTCP_close(&vbc->fd);
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
FREE_OBJ(vbc);
break;
default:
WRONG("Wrong vbc in_wait state");
WRONG("Wrong vbc state");
}
Lck_Unlock(&tp->mtx);
if (vbc != NULL)
VBT_Recycle(tp, &vbc);
}
/*--------------------------------------------------------------------
......@@ -209,21 +210,26 @@ VBT_Rel(struct tcp_pool **tpp)
free(tp->name);
free(tp->ip4);
free(tp->ip6);
Lck_Delete(&tp->mtx);
Lck_Lock(&tp->mtx);
VTAILQ_FOREACH_SAFE(vbc, &tp->connlist, list, vbc2) {
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
vbc->in_waiter = VBC_W_STOLEN;
vbc->state = VBC_STATE_CLEANUP;
VTCP_close(&vbc->fd);
FREE_OBJ(vbc);
if (vbc->in_waiter) {
VTAILQ_INSERT_TAIL(&tp->killlist, vbc, list);
tp->n_kill++;
} else {
FREE_OBJ(vbc);
}
}
VTAILQ_FOREACH_SAFE(vbc, &tp->killlist, list, vbc2) {
VTAILQ_REMOVE(&tp->killlist, vbc, list);
tp->n_kill--;
assert(vbc->in_waiter == VBC_W_STOLEN); // XXX ?
VTCP_close(&vbc->fd);
FREE_OBJ(vbc);
while (tp->n_kill) {
Lck_Unlock(&tp->mtx);
(void)usleep(20000);
Lck_Lock(&tp->mtx);
}
Lck_Unlock(&tp->mtx);
Lck_Delete(&tp->mtx);
AZ(tp->n_conn);
AZ(tp->n_kill);
Wait_Destroy(&tp->waiter);
......@@ -264,8 +270,6 @@ VBT_Open(struct tcp_pool *tp, double tmo, const struct suckaddr **sa)
* Recycle a connection.
*/
#include "vtim.h"
void
VBT_Recycle(struct tcp_pool *tp, struct vbc **vbcp)
{
......@@ -277,41 +281,33 @@ VBT_Recycle(struct tcp_pool *tp, struct vbc **vbcp)
*vbcp = NULL;
CHECK_OBJ_NOTNULL(vbc, VBC_MAGIC);
Lck_Lock(&tp->mtx);
assert(vbc->state == VBC_STATE_USED);
assert(vbc->fd > 0);
Lck_Lock(&tp->mtx);
tp->n_used--;
VSL(SLT_Debug, 0, "------> Recycle fd %d in_w %d", vbc->fd, vbc->in_waiter);
switch (vbc->in_waiter) {
case VBC_W_NEW:
case VBC_W_NOWAIT:
vbc->in_waiter = VBC_W_INWAIT;
if (!vbc->in_waiter) {
vbc->in_waiter = 1;
vbc->waited->ptr = vbc;
vbc->waited->fd = vbc->fd;
vbc->waited->idle = VTIM_real();
VSL(SLT_Debug, 0, "------> Recycle fd %d Enter", vbc->fd);
VSL(SLT_Debug, 0, "------> Recycle fd %d Wait_Enter", vbc->fd);
if (Wait_Enter(tp->waiter, vbc->waited)) {
VTCP_close(&vbc->fd);
FREE_OBJ(vbc);
} else {
VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
tp->n_conn++;
vbc->recycled = 1;
}
break;
case VBC_W_STOLEN:
/*
* We stole the fd from the waiter and it hasn't noticed
* this yet.
*/
VSL(SLT_Debug, 0, "------> Recycle fd %d Still Stolen -> Pending", vbc->fd);
vbc->in_waiter = VBC_W_PENDING;
VTAILQ_INSERT_HEAD(&tp->pendlist, vbc, list);
tp->n_pend++;
i = 1;
break;
default:
WRONG("Wrong vbc in_wait state");
}
if (vbc != NULL) {
vbc->state = VBC_STATE_AVAIL;
vbc->stolen = 1;
VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
tp->n_conn++;
vbc->recycled = 1;
}
Lck_Unlock(&tp->mtx);
if (i && DO_DEBUG(DBG_VTC_MODE)) {
......@@ -346,22 +342,19 @@ VBT_Close(struct tcp_pool *tp, struct vbc **vbcp)
*vbcp = NULL;
CHECK_OBJ_NOTNULL(vbc, VBC_MAGIC);
VTCP_close(&vbc->fd);
assert(vbc->state == VBC_STATE_USED);
assert(vbc->fd > 0);
VSL(SLT_Debug, 0, "------> Close fd %d in_w %d", vbc->fd, vbc->in_waiter);
VTCP_close(&vbc->fd);
Lck_Lock(&tp->mtx);
tp->n_used--;
switch (vbc->in_waiter) {
case VBC_W_NEW:
case VBC_W_NOWAIT:
FREE_OBJ(vbc);
break;
case VBC_W_STOLEN:
vbc->in_waiter = VBC_W_KILL;
if (vbc->in_waiter) {
vbc->state = VBC_STATE_CLEANUP;
VTAILQ_INSERT_HEAD(&tp->killlist, vbc, list);
tp->n_kill++;
break;
default:
WRONG("Wrong vbc in_waiter state");
} else {
FREE_OBJ(vbc);
}
Lck_Unlock(&tp->mtx);
}
......@@ -374,7 +367,6 @@ struct vbc *
VBT_Get(struct tcp_pool *tp, double tmo)
{
struct vbc *vbc;
struct pollfd pfd;
CHECK_OBJ_NOTNULL(tp, TCP_POOL_MAGIC);
......@@ -383,34 +375,13 @@ VBT_Get(struct tcp_pool *tp, double tmo)
if (vbc != NULL) {
CHECK_OBJ_NOTNULL(vbc, VBC_MAGIC);
assert(vbc->in_waiter == VBC_W_INWAIT);
assert(vbc->state == VBC_STATE_AVAIL);
VSL(SLT_Debug, 0, "------> Steal fd %d", vbc->fd);
Wait_Steal(tp->waiter, vbc->waited);
vbc->in_waiter = VBC_W_STOLEN;
pfd.fd = vbc->fd;
pfd.events = POLLIN;
pfd.revents = 0;
if (0 && poll(&pfd, 1, 0)) { // XXX
/*
* If this vbc is dead assume the rest of the list
* has also been chopped from the other end.
* XXX: Not sure if this makes any sense with waiter
*/
VSC_C_main->backend_toolate++;
do {
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
VTCP_close(&vbc->fd);
vbc->in_waiter = VBC_W_KILL;
VTAILQ_INSERT_TAIL(&tp->killlist, vbc, list);
tp->n_kill++;
vbc = VTAILQ_FIRST(&tp->connlist);
} while (vbc != NULL);
} else {
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
VSC_C_main->backend_reuse += 1;
}
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
VSC_C_main->backend_reuse += 1;
vbc->state = VBC_STATE_USED;
}
tp->n_used++; // Opening mostly works
Lck_Unlock(&tp->mtx);
......@@ -421,16 +392,16 @@ VSL(SLT_Debug, 0, "------> Steal fd %d", vbc->fd);
ALLOC_OBJ(vbc, VBC_MAGIC);
AN(vbc);
INIT_OBJ(vbc->waited, WAITED_MAGIC);
vbc->in_waiter = VBC_W_NEW;
if (vbc != NULL) {
vbc->fd = VBT_Open(tp, tmo, &vbc->addr);
if (vbc->fd < 0)
FREE_OBJ(vbc);
}
vbc->state = VBC_STATE_USED;
vbc->fd = VBT_Open(tp, tmo, &vbc->addr);
if (vbc->fd < 0)
FREE_OBJ(vbc);
if (vbc == NULL) {
VSL(SLT_Debug, 0, "------> No new fd");
Lck_Lock(&tp->mtx);
tp->n_used--; // Nope, didn't work after all.
Lck_Unlock(&tp->mtx);
}
} else
VSL(SLT_Debug, 0, "------> New fd %d", vbc->fd);
return (vbc);
}
......@@ -195,37 +195,6 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
assert(w->pipes[1] > 0);
up = (uintptr_t)wp;
AZ(up & 1);
written = write(w->pipes[1], &up, sizeof up);
if (written != sizeof up && (errno == EAGAIN || errno == EWOULDBLOCK))
return (-1);
assert (written == sizeof up);
return (0);
}
int
Wait_Steal(const struct waiter *w, struct waited *wp)
{
ssize_t written;
uintptr_t up;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
assert(wp->fd > 0); // stdin never comes here
AZ(w->dismantle);
if (w->impl->pass != NULL) {
INCOMPL();
}
assert(w->pipes[1] > 0);
if (w->impl->evict == NULL)
return (0);
up = (uintptr_t)wp;
AZ(up & 1);
up |= 1;
written = write(w->pipes[1], &up, sizeof up);
if (written != sizeof up && (errno == EAGAIN || errno == EWOULDBLOCK))
return (-1);
......@@ -256,24 +225,33 @@ Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
uintptr_t ss[NEV];
struct waited *wp2;
int i, j, dotimer = 0;
int steal;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
if (wp != w->pipe_w) {
if (w->impl->evict != NULL)
w->impl->evict(w, wp);
CHECK_OBJ_ORNULL(wp, WAITED_MAGIC);
if (wp != NULL) {
if (wp == w->pipe_w) {
w->do_pipe = 1;
VTAILQ_REMOVE(&w->waithead, w->pipe_w, list);
wp->idle = now;
VTAILQ_INSERT_TAIL(&w->waithead, w->pipe_w, list);
} else {
if (w->impl->evict != NULL)
w->impl->evict(w, wp);
VTAILQ_REMOVE(&w->waithead, wp, list);
w->func(wp, ev, now);
wait_updidle(w, now);
VTAILQ_REMOVE(&w->waithead, wp, list);
w->func(wp, ev, now);
wait_updidle(w, now);
}
return;
}
VTAILQ_REMOVE(&w->waithead, wp, list);
wp->idle = now;
VTAILQ_INSERT_TAIL(&w->waithead, wp, list);
AZ(wp);
if (!w->do_pipe)
return;
w->do_pipe = 0;
i = read(w->pipes[0], ss, sizeof ss);
if (i == -1 && errno == EAGAIN)
......@@ -284,17 +262,10 @@ Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
AN(w->dismantle);
continue;
}
steal = ss[j] & 1;
ss[j] &= ~1;
CAST_OBJ_NOTNULL(wp2, (void*)ss[j], WAITED_MAGIC);
if (wp2 == w->pipe_w) {
dotimer = 1;
} else if (steal) {
assert(wp2->fd >= 0);
VTAILQ_REMOVE(&w->waithead, wp2, list);
AN (w->impl->evict);
w->impl->evict(w, wp2);
w->func(wp2, WAITER_ACTION, now);
} else {
assert(wp2->fd >= 0);
VTAILQ_INSERT_TAIL(&w->waithead, wp2, list);
......
......@@ -118,6 +118,7 @@ vwe_thread(void *priv)
now = VTIM_real();
for (ep = ev, i = 0; i < n; i++, ep++)
vwe_eev(vwe, ep, now);
Wait_Handle(vwe->waiter, NULL, WAITER_ACTION, now);
}
return (NULL);
}
......
......@@ -116,16 +116,19 @@ static void
vwk_sess_ev(const struct vwk *vwk, const struct kevent *kp, double now)
{
struct waited *sp;
double idle;
AN(kp->udata);
CAST_OBJ_NOTNULL(sp, kp->udata, WAITED_MAGIC);
idle = now - *vwk->waiter->tmo;
if (kp->data > 0) {
Wait_Handle(vwk->waiter, sp, WAITER_ACTION, now);
return;
} else if (sp->idle <= idle) {
Wait_Handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
} else if (kp->flags & EV_EOF) {
Wait_Handle(vwk->waiter, sp, WAITER_REMCLOSE, now);
return;
} else {
WRONG("unknown kqueue state");
}
......@@ -160,6 +163,7 @@ vwk_thread(void *priv)
assert(kp->filter == EVFILT_READ);
vwk_sess_ev(vwk, kp, now);
}
Wait_Handle(vwk->waiter, NULL, WAITER_ACTION, now);
}
NEEDLESS_RETURN(NULL);
}
......@@ -204,7 +208,6 @@ const struct waiter_impl waiter_kqueue = {
.init = vwk_init,
.fini = vwk_fini,
.inject = vwk_inject,
// .evict = vwk_evict,
.size = sizeof(struct vwk),
};
......
......@@ -164,6 +164,7 @@ VSL(SLT_Debug, 0, "POLL Handle %d %x", fd, vwp->pollfd[fd].revents);
now);
}
}
Wait_Handle(vwp->waiter, NULL, WAITER_ACTION, now);
}
NEEDLESS_RETURN(NULL);
}
......
......@@ -58,7 +58,6 @@ typedef void waiter_handle_f(struct waited *, enum wait_event, double now);
/* cache_waiter.c */
int Wait_Enter(const struct waiter *, struct waited *);
int Wait_Steal(const struct waiter *, struct waited *);
struct waiter *Wait_New(waiter_handle_f *, volatile double *timeout);
void Wait_Destroy(struct waiter **);
const char *Wait_GetName(void);
......
......@@ -43,6 +43,7 @@ struct waiter {
int pipes[2];
struct waited *pipe_w;
double next_idle;
int do_pipe;
volatile double *tmo;
VTAILQ_HEAD(,waited) waithead;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment