Commit 856d5627 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

The beginnings of a TX-scheduling facility.

parent c14a8185
......@@ -122,6 +122,9 @@ struct h2_req {
VTAILQ_ENTRY(h2_req) list;
int64_t window;
struct h2h_decode *decode;
struct worker *tx_wrk;
VTAILQ_ENTRY(h2_req) tx_list;
};
VTAILQ_HEAD(h2_req_s, h2_req);
......@@ -158,6 +161,10 @@ struct h2_sess {
struct req *new_req;
int go_away;
uint32_t go_away_last_stream;
VTAILQ_HEAD(,h2_req) txqueue;
struct h2_req req0[1];
};
/* http2/cache_http2_panic.c */
......@@ -191,6 +198,10 @@ h2_error h2h_decode_fini(const struct h2_sess *h2, struct h2h_decode *d);
h2_error h2h_decode_bytes(struct h2_sess *h2, struct h2h_decode *d,
const uint8_t *ptr, size_t len);
/* cache_http2_send.c */
void H2_Send_Get(struct worker *, struct h2_sess *, struct h2_req *);
void H2_Send_Rel(struct worker *, struct h2_sess *, struct h2_req *);
h2_error H2_Send_Frame(struct worker *, const struct h2_sess *,
h2_frame type, uint8_t flags, uint32_t len, uint32_t stream,
const void *);
......
......@@ -85,8 +85,10 @@ h2_bytes(struct req *req, enum vdp_action act, void **priv,
return (0);
AZ(req->vdp_nxt); /* always at the bottom of the pile */
H2_Send_Get(req->wrk, r2->h2sess, r2);
H2_Send(req->wrk, r2,
act == VDP_FLUSH ? 1 : 0, H2_F_DATA, H2FF_NONE, len, ptr);
H2_Send_Rel(req->wrk, r2->h2sess, r2);
return (0);
}
......@@ -140,11 +142,13 @@ h2_minimal_response(struct req *req, uint16_t status)
req->err_code = status;
/* XXX return code checking once H2_Send returns anything but 0 */
H2_Send_Get(req->wrk, r2->h2sess, r2);
H2_Send(req->wrk, r2, 1,
H2_F_HEADERS,
H2FF_HEADERS_END_HEADERS |
(status < 200 ? 0 : H2FF_HEADERS_END_STREAM),
l, buf);
H2_Send_Rel(req->wrk, r2->h2sess, r2);
return (0);
}
......@@ -240,8 +244,10 @@ h2_deliver(struct req *req, struct boc *boc, int sendbody)
sz = (char*)p - req->ws->f;
/* XXX: Optimize !sendbody case */
H2_Send_Get(req->wrk, r2->h2sess, r2);
H2_Send(req->wrk, r2, 1, H2_F_HEADERS, H2FF_HEADERS_END_HEADERS,
sz, req->ws->f);
H2_Send_Rel(req->wrk, r2->h2sess, r2);
WS_Release(req->ws, 0);
......@@ -254,7 +260,9 @@ h2_deliver(struct req *req, struct boc *boc, int sendbody)
err = VDP_DeliverObj(req);
/*XXX*/(void)err;
H2_Send_Get(req->wrk, r2->h2sess, r2);
H2_Send(req->wrk, r2, 1, H2_F_DATA, H2FF_DATA_END_STREAM, 0, NULL);
H2_Send_Rel(req->wrk, r2->h2sess, r2);
AZ(req->wrk->v1l);
VDP_close(req);
......
......@@ -186,10 +186,10 @@ h2_rx_ping(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
return (H2CE_PROTOCOL_ERROR);
if (h2->rxf_flags != 0) // We never send pings
return (H2SE_PROTOCOL_ERROR);
Lck_Lock(&h2->sess->mtx);
H2_Send_Get(wrk, h2, r2);
H2_Send_Frame(wrk, h2,
H2_F_PING, H2FF_PING_ACK, 8, 0, h2->rxf_data);
Lck_Unlock(&h2->sess->mtx);
H2_Send_Rel(wrk, h2, r2);
return (0);
}
......@@ -357,10 +357,10 @@ h2_rx_settings(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
if (retval)
return (retval);
}
Lck_Lock(&h2->sess->mtx);
H2_Send_Get(wrk, h2, r2);
H2_Send_Frame(wrk, h2,
H2_F_SETTINGS, H2FF_SETTINGS_ACK, 0, 0, NULL);
Lck_Unlock(&h2->sess->mtx);
H2_Send_Rel(wrk, h2, r2);
}
return (0);
}
......@@ -636,10 +636,10 @@ h2_procframe(struct worker *wrk, struct h2_sess *h2,
VSLb(h2->vsl, SLT_Debug, "H2: stream %u: %s", h2->rxf_stream, h2e->txt);
vbe32enc(b, h2e->val);
Lck_Lock(&h2->sess->mtx);
H2_Send_Get(wrk, h2, r2);
(void)H2_Send_Frame(wrk, h2, H2_F_RST_STREAM,
0, sizeof b, h2->rxf_stream, b);
Lck_Unlock(&h2->sess->mtx);
H2_Send_Rel(wrk, h2, r2);
h2_del_req(wrk, r2);
return (0);
......@@ -732,9 +732,9 @@ h2_rxframe(struct worker *wrk, struct h2_sess *h2)
if (h2e) {
vbe32enc(b, h2->highest_stream);
vbe32enc(b + 4, h2e->val);
Lck_Lock(&h2->sess->mtx);
H2_Send_Get(wrk, h2, h2->req0);
(void)H2_Send_Frame(wrk, h2, H2_F_GOAWAY, 0, 8, 0, b);
Lck_Unlock(&h2->sess->mtx);
H2_Send_Rel(wrk, h2, h2->req0);
}
return (h2e ? 0 : 1);
}
......@@ -36,6 +36,36 @@
#include "vend.h"
void
H2_Send_Get(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
{
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(h2, H2_SESS_MAGIC);
CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
r2->tx_wrk = wrk;
Lck_Lock(&h2->sess->mtx);
VTAILQ_INSERT_TAIL(&h2->txqueue, r2, tx_list);
while (VTAILQ_FIRST(&h2->txqueue) != r2)
Lck_CondWait(&wrk->cond, &h2->sess->mtx, 0);
Lck_Unlock(&h2->sess->mtx);
}
void
H2_Send_Rel(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
{
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(h2, H2_SESS_MAGIC);
CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
Lck_Lock(&h2->sess->mtx);
assert(VTAILQ_FIRST(&h2->txqueue) == r2);
VTAILQ_REMOVE(&h2->txqueue, r2, tx_list);
r2 = VTAILQ_FIRST(&h2->txqueue);
if (r2 != NULL)
AZ(pthread_cond_signal(&r2->tx_wrk->cond));
Lck_Unlock(&h2->sess->mtx);
}
static void
h2_mk_hdr(uint8_t *hdr, h2_frame ftyp, uint8_t flags,
uint32_t len, uint32_t stream)
......@@ -64,7 +94,6 @@ H2_Send_Frame(struct worker *wrk, const struct h2_sess *h2,
ssize_t s;
(void)wrk;
Lck_AssertHeld(&h2->sess->mtx);
AN(ftyp);
AZ(flags & ~(ftyp->flags));
......@@ -74,7 +103,9 @@ H2_Send_Frame(struct worker *wrk, const struct h2_sess *h2,
AZ(ftyp->act_snonzero);
h2_mk_hdr(hdr, ftyp, flags, len, stream);
Lck_Lock(&h2->sess->mtx);
VSLb_bin(h2->vsl, SLT_H2TxHdr, 9, hdr);
Lck_Unlock(&h2->sess->mtx);
s = write(h2->sess->fd, hdr, sizeof hdr);
if (s != sizeof hdr)
......@@ -83,7 +114,9 @@ H2_Send_Frame(struct worker *wrk, const struct h2_sess *h2,
s = write(h2->sess->fd, ptr, len);
if (s != len)
return (H2CE_PROTOCOL_ERROR); // XXX Need private ?
Lck_Lock(&h2->sess->mtx);
VSLb_bin(h2->vsl, SLT_H2TxBody, len, ptr);
Lck_Unlock(&h2->sess->mtx);
}
return (0);
}
......@@ -106,11 +139,14 @@ H2_Send(struct worker *wrk, struct h2_req *r2, int flush,
(void)flush;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
h2 = r2->h2sess;
CHECK_OBJ_NOTNULL(h2, H2_SESS_MAGIC);
assert(len == 0 || ptr != NULL);
assert(VTAILQ_FIRST(&h2->txqueue) == r2);
AN(ftyp);
AZ(flags & ~(ftyp->flags));
if (r2->stream == 0)
......@@ -120,6 +156,7 @@ H2_Send(struct worker *wrk, struct h2_req *r2, int flush,
Lck_Lock(&h2->sess->mtx);
mfs = h2->remote_settings.max_frame_size;
Lck_Unlock(&h2->sess->mtx);
if (len < mfs) {
retval = H2_Send_Frame(wrk, h2,
ftyp, flags, len, r2->stream, ptr);
......@@ -145,6 +182,5 @@ H2_Send(struct worker *wrk, struct h2_req *r2, int flush,
ftyp = ftyp->continuation;
} while (len > 0 && retval == 0);
}
Lck_Unlock(&h2->sess->mtx);
return (retval);
}
......@@ -99,6 +99,7 @@ h2_new_sess(const struct worker *wrk, struct sess *sp, struct req *srq)
h2->htc->rfd = &sp->fd;
h2->sess = sp;
VTAILQ_INIT(&h2->streams);
VTAILQ_INIT(&h2->txqueue);
h2->local_settings = H2_proto_settings;
h2->remote_settings = H2_proto_settings;
......@@ -108,6 +109,8 @@ h2_new_sess(const struct worker *wrk, struct sess *sp, struct req *srq)
SES_Reserve_xport_priv(sp, &up);
*up = (uintptr_t)h2;
INIT_OBJ(h2->req0, H2_REQ_MAGIC);
h2->req0->h2sess = h2;
}
AN(up);
CAST_OBJ_NOTNULL(h2, (void*)(*up), H2_SESS_MAGIC);
......@@ -333,13 +336,13 @@ h2_new_session(struct worker *wrk, void *arg)
THR_SetRequest(h2->srq);
Lck_Lock(&h2->sess->mtx);
H2_Send_Get(wrk, h2, h2->req0);
H2_Send_Frame(wrk, h2,
H2_F_SETTINGS, H2FF_NONE, sizeof H2_settings, 0, H2_settings);
H2_Send_Rel(wrk, h2, h2->req0);
/* and off we go... */
h2->cond = &wrk->cond;
Lck_Unlock(&h2->sess->mtx);
while (h2_rxframe(wrk, h2)) {
WS_Reset(h2->ws, wsp);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment