Commit 967f857c authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Rename the write-buffering functions to WRW_*().

Make reservation and release explicit.

Add asserts that this it happens.


git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@3429 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent fbd8cb9d
...@@ -529,12 +529,15 @@ void PipeSession(struct sess *sp); ...@@ -529,12 +529,15 @@ void PipeSession(struct sess *sp);
void WRK_Init(void); void WRK_Init(void);
int WRK_Queue(struct workreq *wrq); int WRK_Queue(struct workreq *wrq);
void WRK_QueueSession(struct sess *sp); void WRK_QueueSession(struct sess *sp);
void WRK_Reset(struct worker *w, int *fd);
unsigned WRK_Flush(struct worker *w); void WRW_Reserve(struct worker *w, int *fd);
unsigned WRK_Write(struct worker *w, const void *ptr, int len); void WRW_Release(struct worker *w);
unsigned WRK_WriteH(struct worker *w, const txt *hh, const char *suf); unsigned WRW_Flush(struct worker *w);
unsigned WRW_FlushRelease(struct worker *w);
unsigned WRW_Write(struct worker *w, const void *ptr, int len);
unsigned WRW_WriteH(struct worker *w, const txt *hh, const char *suf);
#ifdef SENDFILE_WORKS #ifdef SENDFILE_WORKS
void WRK_Sendfile(struct worker *w, int fd, off_t off, unsigned len); void WRW_Sendfile(struct worker *w, int fd, off_t off, unsigned len);
#endif /* SENDFILE_WORKS */ #endif /* SENDFILE_WORKS */
/* cache_session.c [SES] */ /* cache_session.c [SES] */
......
...@@ -288,8 +288,8 @@ FetchReqBody(struct sess *sp) ...@@ -288,8 +288,8 @@ FetchReqBody(struct sess *sp)
content_length -= rdcnt; content_length -= rdcnt;
if (!sp->sendbody) if (!sp->sendbody)
continue; continue;
WRK_Write(sp->wrk, buf, rdcnt); /* XXX: stats ? */ (void)WRW_Write(sp->wrk, buf, rdcnt); /* XXX: stats ? */
if (WRK_Flush(sp->wrk)) if (WRW_Flush(sp->wrk))
return (2); return (2);
} }
} }
...@@ -348,7 +348,7 @@ Fetch(struct sess *sp) ...@@ -348,7 +348,7 @@ Fetch(struct sess *sp)
VBE_AddHostHeader(sp); VBE_AddHostHeader(sp);
TCP_blocking(vc->fd); /* XXX: we should timeout instead */ TCP_blocking(vc->fd); /* XXX: we should timeout instead */
WRK_Reset(w, &vc->fd); WRW_Reserve(w, &vc->fd);
http_Write(w, hp, 0); /* XXX: stats ? */ http_Write(w, hp, 0); /* XXX: stats ? */
/* Deal with any message-body the request might have */ /* Deal with any message-body the request might have */
...@@ -358,7 +358,7 @@ Fetch(struct sess *sp) ...@@ -358,7 +358,7 @@ Fetch(struct sess *sp)
return (__LINE__); return (__LINE__);
} }
if (WRK_Flush(w)) { if (WRW_FlushRelease(w)) {
VBE_ClosedFd(sp); VBE_ClosedFd(sp);
/* XXX: other cleanup ? */ /* XXX: other cleanup ? */
return (__LINE__); return (__LINE__);
......
...@@ -809,28 +809,28 @@ http_Write(struct worker *w, const struct http *hp, int resp) ...@@ -809,28 +809,28 @@ http_Write(struct worker *w, const struct http *hp, int resp)
if (resp) { if (resp) {
AN(hp->hd[HTTP_HDR_STATUS].b); AN(hp->hd[HTTP_HDR_STATUS].b);
l = WRK_WriteH(w, &hp->hd[HTTP_HDR_PROTO], " "); l = WRW_WriteH(w, &hp->hd[HTTP_HDR_PROTO], " ");
WSLH(w, *w->wfd, hp, HTTP_HDR_PROTO); WSLH(w, *w->wfd, hp, HTTP_HDR_PROTO);
l += WRK_WriteH(w, &hp->hd[HTTP_HDR_STATUS], " "); l += WRW_WriteH(w, &hp->hd[HTTP_HDR_STATUS], " ");
WSLH(w, *w->wfd, hp, HTTP_HDR_STATUS); WSLH(w, *w->wfd, hp, HTTP_HDR_STATUS);
l += WRK_WriteH(w, &hp->hd[HTTP_HDR_RESPONSE], "\r\n"); l += WRW_WriteH(w, &hp->hd[HTTP_HDR_RESPONSE], "\r\n");
WSLH(w, *w->wfd, hp, HTTP_HDR_RESPONSE); WSLH(w, *w->wfd, hp, HTTP_HDR_RESPONSE);
} else { } else {
AN(hp->hd[HTTP_HDR_URL].b); AN(hp->hd[HTTP_HDR_URL].b);
l = WRK_WriteH(w, &hp->hd[HTTP_HDR_REQ], " "); l = WRW_WriteH(w, &hp->hd[HTTP_HDR_REQ], " ");
WSLH(w, *w->wfd, hp, HTTP_HDR_REQ); WSLH(w, *w->wfd, hp, HTTP_HDR_REQ);
l += WRK_WriteH(w, &hp->hd[HTTP_HDR_URL], " "); l += WRW_WriteH(w, &hp->hd[HTTP_HDR_URL], " ");
WSLH(w, *w->wfd, hp, HTTP_HDR_URL); WSLH(w, *w->wfd, hp, HTTP_HDR_URL);
l += WRK_WriteH(w, &hp->hd[HTTP_HDR_PROTO], "\r\n"); l += WRW_WriteH(w, &hp->hd[HTTP_HDR_PROTO], "\r\n");
WSLH(w, *w->wfd, hp, HTTP_HDR_PROTO); WSLH(w, *w->wfd, hp, HTTP_HDR_PROTO);
} }
for (u = HTTP_HDR_FIRST; u < hp->nhd; u++) { for (u = HTTP_HDR_FIRST; u < hp->nhd; u++) {
AN(hp->hd[u].b); AN(hp->hd[u].b);
AN(hp->hd[u].e); AN(hp->hd[u].e);
l += WRK_WriteH(w, &hp->hd[u], "\r\n"); l += WRW_WriteH(w, &hp->hd[u], "\r\n");
WSLH(w, *w->wfd, hp, u); WSLH(w, *w->wfd, hp, u);
} }
l += WRK_Write(w, "\r\n", -1); l += WRW_Write(w, "\r\n", -1);
return (l); return (l);
} }
......
...@@ -83,14 +83,14 @@ PipeSession(struct sess *sp) ...@@ -83,14 +83,14 @@ PipeSession(struct sess *sp)
vc = sp->vbe; vc = sp->vbe;
TCP_blocking(vc->fd); TCP_blocking(vc->fd);
WRK_Reset(w, &vc->fd); WRW_Reserve(w, &vc->fd);
w->acct.hdrbytes += http_Write(w, bereq->http, 0); w->acct.hdrbytes += http_Write(w, bereq->http, 0);
if (sp->htc->pipeline.b != NULL) if (sp->htc->pipeline.b != NULL)
w->acct.bodybytes += w->acct.bodybytes +=
WRK_Write(w, sp->htc->pipeline.b, Tlen(sp->htc->pipeline)); WRW_Write(w, sp->htc->pipeline.b, Tlen(sp->htc->pipeline));
if (WRK_Flush(w)) { if (WRW_FlushRelease(w)) {
vca_close_session(sp, "pipe"); vca_close_session(sp, "pipe");
VBE_ClosedFd(sp); VBE_ClosedFd(sp);
return; return;
......
...@@ -106,22 +106,35 @@ static struct lock herder_mtx; ...@@ -106,22 +106,35 @@ static struct lock herder_mtx;
*/ */
void void
WRK_Reset(struct worker *w, int *fd) WRW_Reserve(struct worker *w, int *fd)
{ {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
AZ(w->wfd);
w->werr = 0; w->werr = 0;
w->liov = 0; w->liov = 0;
w->niov = 0; w->niov = 0;
w->wfd = fd; w->wfd = fd;
} }
void
WRW_Release(struct worker *w)
{
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
w->werr = 0;
w->liov = 0;
w->niov = 0;
w->wfd = NULL;
}
unsigned unsigned
WRK_Flush(struct worker *w) WRW_Flush(struct worker *w)
{ {
ssize_t i; ssize_t i;
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
AN(w->wfd);
if (*w->wfd >= 0 && w->niov > 0 && w->werr == 0) { if (*w->wfd >= 0 && w->niov > 0 && w->werr == 0) {
i = writev(*w->wfd, w->iov, w->niov); i = writev(*w->wfd, w->iov, w->niov);
if (i != w->liov) { if (i != w->liov) {
...@@ -137,32 +150,46 @@ WRK_Flush(struct worker *w) ...@@ -137,32 +150,46 @@ WRK_Flush(struct worker *w)
} }
unsigned unsigned
WRK_WriteH(struct worker *w, const txt *hh, const char *suf) WRW_FlushRelease(struct worker *w)
{
unsigned u;
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
AN(w->wfd);
u = WRW_Flush(w);
WRW_Release(w);
return (u);
}
unsigned
WRW_WriteH(struct worker *w, const txt *hh, const char *suf)
{ {
unsigned u; unsigned u;
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
AN(w->wfd);
AN(w); AN(w);
AN(hh); AN(hh);
AN(hh->b); AN(hh->b);
AN(hh->e); AN(hh->e);
u = WRK_Write(w, hh->b, hh->e - hh->b); u = WRW_Write(w, hh->b, hh->e - hh->b);
if (suf != NULL) if (suf != NULL)
u += WRK_Write(w, suf, -1); u += WRW_Write(w, suf, -1);
return (u); return (u);
} }
unsigned unsigned
WRK_Write(struct worker *w, const void *ptr, int len) WRW_Write(struct worker *w, const void *ptr, int len)
{ {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
AN(w->wfd);
if (len == 0 || *w->wfd < 0) if (len == 0 || *w->wfd < 0)
return (0); return (0);
if (len == -1) if (len == -1)
len = strlen(ptr); len = strlen(ptr);
if (w->niov == MAX_IOVS) if (w->niov == MAX_IOVS)
(void)WRK_Flush(w); (void)WRW_Flush(w);
w->iov[w->niov].iov_base = TRUST_ME(ptr); w->iov[w->niov].iov_base = TRUST_ME(ptr);
w->iov[w->niov].iov_len = len; w->iov[w->niov].iov_len = len;
w->liov += len; w->liov += len;
...@@ -172,10 +199,11 @@ WRK_Write(struct worker *w, const void *ptr, int len) ...@@ -172,10 +199,11 @@ WRK_Write(struct worker *w, const void *ptr, int len)
#ifdef SENDFILE_WORKS #ifdef SENDFILE_WORKS
void void
WRK_Sendfile(struct worker *w, int fd, off_t off, unsigned len) WRW_Sendfile(struct worker *w, int fd, off_t off, unsigned len)
{ {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
AN(w->wfd);
assert(fd >= 0); assert(fd >= 0);
assert(len > 0); assert(len > 0);
......
...@@ -137,16 +137,24 @@ RES_WriteObj(struct sess *sp) ...@@ -137,16 +137,24 @@ RES_WriteObj(struct sess *sp)
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
WRK_Reset(sp->wrk, &sp->fd); WRW_Reserve(sp->wrk, &sp->fd);
if (sp->esis == 0) if (sp->esis == 0)
sp->wrk->acct.hdrbytes += http_Write(sp->wrk, sp->http, 1); sp->wrk->acct.hdrbytes += http_Write(sp->wrk, sp->http, 1);
if (sp->wantbody && !VTAILQ_EMPTY(&sp->obj->esibits)) { if (sp->wantbody && !VTAILQ_EMPTY(&sp->obj->esibits)) {
if (WRW_FlushRelease(sp->wrk)) {
vca_close_session(sp, "remote closed");
return;
}
ESI_Deliver(sp); ESI_Deliver(sp);
} else if (sp->wantbody) { return;
}
if (sp->wantbody) {
if (sp->esis > 0 && sp->http->protover >= 1.1) { if (sp->esis > 0 && sp->http->protover >= 1.1) {
sprintf(lenbuf, "%x\r\n", sp->obj->len); sprintf(lenbuf, "%x\r\n", sp->obj->len);
(void)WRK_Write(sp->wrk, lenbuf, -1); (void)WRW_Write(sp->wrk, lenbuf, -1);
} }
VTAILQ_FOREACH(st, &sp->obj->store, list) { VTAILQ_FOREACH(st, &sp->obj->store, list) {
...@@ -164,18 +172,18 @@ RES_WriteObj(struct sess *sp) ...@@ -164,18 +172,18 @@ RES_WriteObj(struct sess *sp)
if (st->fd >= 0 && if (st->fd >= 0 &&
st->len >= params->sendfile_threshold) { st->len >= params->sendfile_threshold) {
VSL_stats->n_objsendfile++; VSL_stats->n_objsendfile++;
WRK_Sendfile(sp->wrk, st->fd, WRW_Sendfile(sp->wrk, st->fd,
st->where, st->len); st->where, st->len);
continue; continue;
} }
#endif /* SENDFILE_WORKS */ #endif /* SENDFILE_WORKS */
VSL_stats->n_objwrite++; VSL_stats->n_objwrite++;
(void)WRK_Write(sp->wrk, st->ptr, st->len); (void)WRW_Write(sp->wrk, st->ptr, st->len);
} }
assert(u == sp->obj->len); assert(u == sp->obj->len);
if (sp->esis > 0 && sp->http->protover >= 1.1) if (sp->esis > 0 && sp->http->protover >= 1.1)
(void)WRK_Write(sp->wrk, "\r\n", -1); (void)WRW_Write(sp->wrk, "\r\n", -1);
} }
if (WRK_Flush(sp->wrk)) if (WRW_FlushRelease(sp->wrk))
vca_close_session(sp, "remote closed"); vca_close_session(sp, "remote closed");
} }
...@@ -799,27 +799,24 @@ ESI_Deliver(struct sess *sp) ...@@ -799,27 +799,24 @@ ESI_Deliver(struct sess *sp)
struct esi_bit *eb; struct esi_bit *eb;
struct object *obj; struct object *obj;
WRW_Reserve(sp->wrk, &sp->fd);
VTAILQ_FOREACH(eb, &sp->obj->esibits, list) { VTAILQ_FOREACH(eb, &sp->obj->esibits, list) {
assert(sp->wrk->wfd == &sp->fd);
if (Tlen(eb->verbatim)) { if (Tlen(eb->verbatim)) {
if (sp->http->protover >= 1.1) if (sp->http->protover >= 1.1)
(void)WRK_Write(sp->wrk, eb->chunk_length, -1); (void)WRW_Write(sp->wrk, eb->chunk_length, -1);
sp->wrk->acct.bodybytes += WRK_Write(sp->wrk, sp->wrk->acct.bodybytes += WRW_Write(sp->wrk,
eb->verbatim.b, Tlen(eb->verbatim)); eb->verbatim.b, Tlen(eb->verbatim));
if (sp->http->protover >= 1.1) if (sp->http->protover >= 1.1)
(void)WRK_Write(sp->wrk, "\r\n", -1); (void)WRW_Write(sp->wrk, "\r\n", -1);
} }
if (eb->include.b == NULL || if (eb->include.b == NULL ||
sp->esis >= params->max_esi_includes) sp->esis >= params->max_esi_includes)
continue; continue;
/* if (WRW_FlushRelease(sp->wrk)) {
* We flush here, because the next transaction is vca_close_session(sp, "remote closed");
* quite likely to take some time, so we should get return;
* as many bits to the client as we can already. }
*/
if (WRK_Flush(sp->wrk))
break;
sp->esis++; sp->esis++;
obj = sp->obj; obj = sp->obj;
...@@ -853,11 +850,12 @@ ESI_Deliver(struct sess *sp) ...@@ -853,11 +850,12 @@ ESI_Deliver(struct sess *sp)
assert(sp->step == STP_DONE); assert(sp->step == STP_DONE);
sp->esis--; sp->esis--;
sp->obj = obj; sp->obj = obj;
WRW_Reserve(sp->wrk, &sp->fd);
} }
assert(sp->wrk->wfd == &sp->fd);
if (sp->esis == 0 && sp->http->protover >= 1.1) if (sp->esis == 0 && sp->http->protover >= 1.1)
(void)WRK_Write(sp->wrk, "0\r\n\r\n", -1); (void)WRW_Write(sp->wrk, "0\r\n\r\n", -1);
if (WRW_FlushRelease(sp->wrk))
vca_close_session(sp, "remote closed");
} }
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment