Commit 6dfcd17b authored by Martin Blix Grydeland's avatar Martin Blix Grydeland

Pipe byte accounting

Add VSC counters for pipe traffic. Traffic is split between header
bytes, and input and output piped bytes.

Log SLT_PipeAcct records showing per pipe session byte counts.
parent c654dc51
......@@ -699,6 +699,7 @@ struct req {
#define RES_ESI (1<<4)
#define RES_ESI_CHILD (1<<5)
#define RES_GUNZIP (1<<6)
#define RES_PIPE (1<<7)
/* Deliver pipeline */
#define N_VDPS 5
......@@ -1088,6 +1089,7 @@ const char *reqbody_status_2str(enum req_body_state_e e);
const char *sess_close_2str(enum sess_close sc, int want_desc);
/* cache_pipe.c */
void Pipe_Init(void);
void PipeRequest(struct req *req, struct busyobj *bo);
/* cache_pool.c */
......
......@@ -219,6 +219,7 @@ child_main(void)
VBP_Init();
WRK_Init();
Pool_Init();
Pipe_Init();
EXP_Init();
HSH_Init(heritage.hash);
......
......@@ -40,8 +40,17 @@
#include "vtcp.h"
#include "vtim.h"
static struct lock pipestat_mtx;
struct acct_pipe {
ssize_t req;
ssize_t bereq;
ssize_t in;
ssize_t out;
};
static int
rdf(int fd0, int fd1)
rdf(int fd0, int fd1, ssize_t *pcnt)
{
int i, j;
char buf[BUFSIZ], *p;
......@@ -53,12 +62,30 @@ rdf(int fd0, int fd1)
j = write(fd1, p, i);
if (j <= 0)
return (1);
*pcnt += j;
if (i != j)
(void)usleep(100000); /* XXX hack */
}
return (0);
}
static void
pipecharge(struct req *req, const struct acct_pipe *a)
{
VSLb(req->vsl, SLT_PipeAcct, "%ju %ju %ju %ju",
(uintmax_t)a->req,
(uintmax_t)a->bereq,
(uintmax_t)a->in,
(uintmax_t)a->out);
Lck_Lock(&pipestat_mtx);
VSC_C_main->s_pipe_hdrbytes += a->req;
VSC_C_main->s_pipe_in += a->in;
VSC_C_main->s_pipe_out += a->out;
Lck_Unlock(&pipestat_mtx);
}
void
PipeRequest(struct req *req, struct busyobj *bo)
{
......@@ -66,7 +93,8 @@ PipeRequest(struct req *req, struct busyobj *bo)
struct worker *wrk;
struct pollfd fds[2];
int i;
ssize_t txcnt;
struct acct_pipe acct;
ssize_t hdrbytes;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(req->sp, SESS_MAGIC);
......@@ -74,24 +102,38 @@ PipeRequest(struct req *req, struct busyobj *bo)
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
req->res_mode = RES_PIPE;
memset(&acct, 0, sizeof acct);
acct.req = req->acct.req_hdrbytes;
req->acct.req_hdrbytes = 0;
vc = VDI_GetFd(bo);
if (vc == NULL)
if (vc == NULL) {
pipecharge(req, &acct);
SES_Close(req->sp, SC_OVERLOAD);
return;
}
bo->vbc = vc; /* For panic dumping */
(void)VTCP_blocking(vc->fd);
WRW_Reserve(wrk, &vc->fd, bo->vsl, req->t_req);
(void)HTTP1_Write(wrk, bo->bereq, 0);
hdrbytes = HTTP1_Write(wrk, bo->bereq, 0);
if (req->htc->pipeline.b != NULL)
(void)WRW_Write(wrk, req->htc->pipeline.b,
Tlen(req->htc->pipeline));
i = WRW_FlushRelease(wrk, &txcnt);
i = WRW_FlushRelease(wrk, &acct.bereq);
if (acct.bereq > hdrbytes) {
acct.in = acct.bereq - hdrbytes;
acct.bereq = hdrbytes;
}
VSLb_ts_req(req, "Pipe", W_TIM_real(wrk));
if (i) {
pipecharge(req, &acct);
SES_Close(req->sp, SC_TX_PIPE);
VDI_CloseFd(&vc);
return;
......@@ -113,7 +155,7 @@ PipeRequest(struct req *req, struct busyobj *bo)
i = poll(fds, 2, (int)(cache_param->pipe_timeout * 1e3));
if (i < 1)
break;
if (fds[0].revents && rdf(vc->fd, req->sp->fd)) {
if (fds[0].revents && rdf(vc->fd, req->sp->fd, &acct.out)) {
if (fds[1].fd == -1)
break;
(void)shutdown(vc->fd, SHUT_RD);
......@@ -121,7 +163,7 @@ PipeRequest(struct req *req, struct busyobj *bo)
fds[0].events = 0;
fds[0].fd = -1;
}
if (fds[1].revents && rdf(req->sp->fd, vc->fd)) {
if (fds[1].revents && rdf(req->sp->fd, vc->fd, &acct.in)) {
if (fds[0].fd == -1)
break;
(void)shutdown(req->sp->fd, SHUT_RD);
......@@ -131,7 +173,17 @@ PipeRequest(struct req *req, struct busyobj *bo)
}
}
VSLb_ts_req(req, "PipeSess", W_TIM_real(wrk));
pipecharge(req, &acct);
SES_Close(req->sp, SC_TX_PIPE);
VDI_CloseFd(&vc);
bo->vbc = NULL;
}
/*--------------------------------------------------------------------*/
void
Pipe_Init(void)
{
Lck_New(&pipestat_mtx, lck_pipestat);
}
......@@ -896,7 +896,6 @@ CNT_Request(struct worker *wrk, struct req *req)
CHECK_OBJ_ORNULL(wrk->nobjhead, OBJHEAD_MAGIC);
}
if (nxt == REQ_FSM_DONE) {
/* XXX: Workaround for pipe */
AN(req->vsl->wid);
if (req->res_mode & (RES_ESI|RES_ESI_CHILD))
VSLb(req->vsl, SLT_ESI_BodyBytes, "%ju",
......@@ -923,13 +922,15 @@ CNT_AcctLogCharge(struct dstat *ds, struct req *req)
a = &req->acct;
VSLb(req->vsl, SLT_ReqAcct, "%ju %ju %ju %ju %ju %ju",
(uintmax_t)a->req_hdrbytes,
(uintmax_t)a->req_bodybytes,
(uintmax_t)(a->req_hdrbytes + a->req_bodybytes),
(uintmax_t)a->resp_hdrbytes,
(uintmax_t)a->resp_bodybytes,
(uintmax_t)(a->resp_hdrbytes + a->resp_bodybytes));
if (!(req->res_mode & RES_PIPE)) {
VSLb(req->vsl, SLT_ReqAcct, "%ju %ju %ju %ju %ju %ju",
(uintmax_t)a->req_hdrbytes,
(uintmax_t)a->req_bodybytes,
(uintmax_t)(a->req_hdrbytes + a->req_bodybytes),
(uintmax_t)a->resp_hdrbytes,
(uintmax_t)a->resp_bodybytes,
(uintmax_t)(a->resp_hdrbytes + a->resp_bodybytes));
}
#define ACCT(foo) \
ds->s_##foo += a->foo; \
......
varnishtest "Test request byte counters on pipe"
server s1 {
rxreq
expect req.url == "/"
expect req.http.test == "yes"
txresp -body "fdsa"
} -start
varnish v1 -vcl+backend {
sub vcl_recv {
return (pipe);
}
sub vcl_pipe {
set bereq.http.test = "yes";
unset bereq.http.x-forwarded-for;
unset bereq.http.x-varnish;
unset bereq.http.connection;
}
} -start
# req:
# POST / HTTP/1.1\r\n 17 bytes
# Content-Length: 4\r\n 19 bytes
# \r\n 2 bytes
# Total: 38 bytes
# bereq:
# POST / HTTP/1.1\r\n 17 bytes
# Content-Length: 4\r\n 19 bytes
# test: yes\r\n 11 bytes
# \r\n 2 bytes
# Total: 49 bytes
# reqbody
# asdf 4 bytes
# resp:
# HTTP/1.1 200 Ok\r\n 17 bytes
# Content-Length: 4\r\n 19 bytes
# \r\n 2 bytes
# fdsa 4 bytes
# Total: 42 bytes
logexpect l1 -v v1 -g request {
expect 0 1001 Begin "^req .* rxreq"
expect * = PipeAcct "^38 49 4 42$"
expect 0 = End
} -start
client c1 {
txreq -req "POST" -url "/" -hdr "Content-Length: 4"
send "asdf"
rxresp
expect resp.status == 200
} -run
logexpect l1 -wait
varnish v1 -expect s_pipe_hdrbytes == 38
varnish v1 -expect s_pipe_in == 4
varnish v1 -expect s_pipe_out == 42
......@@ -53,4 +53,5 @@ LOCK(nbusyobj)
LOCK(busyobj)
LOCK(mempool)
LOCK(vxid)
LOCK(pipestat)
/*lint -restore */
......@@ -400,6 +400,20 @@ VSC_F(s_resp_bodybytes, uint64_t, 1, 'a', info,
"Reponse body bytes",
"Total response body bytes transmitted"
)
VSC_F(s_pipe_hdrbytes, uint64_t, 0, 'a', info,
"Pipe request header bytes",
"Total request bytes received for piped sessions"
)
VSC_F(s_pipe_in, uint64_t, 0, 'a', info,
"Piped bytes from client",
"Total number of bytes forwarded from clients in"
" pipe sessions"
)
VSC_F(s_pipe_out, uint64_t, 0, 'a', info,
"Piped bytes to client",
"Total number of bytes forwarded to clients in"
" pipe sessions"
)
VSC_F(sess_closed, uint64_t, 1, 'a', info,
"Session Closed",
......
......@@ -500,4 +500,16 @@ SLTM(ESI_BodyBytes, 0, "ESI body fragment byte counter",
"\n"
)
SLTM(PipeAcct, 0, "Pipe byte counts",
"Contains byte counters for pipe sessions.\n\n"
"The format is::\n\n"
"\t%d %d %d %d\n"
"\t| | | |\n"
"\t| | | +------- Piped bytes to client\n"
"\t| | +---------- Piped bytes from client\n"
"\t| +------------- Backend request headers\n"
"\t+---------------- Client request headers\n"
"\n"
)
#undef NODEF_NOTICE
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment