Commit d9f0d00b authored by Geoff Simmons's avatar Geoff Simmons

Add the VDP, passes intial tests with cat, /bin/true and /bin/false.

parent 5950a65d
......@@ -49,7 +49,15 @@ DESCRIPTION
.. _Varnish: http://www.varnish-cache.org/
The pipe library provides `Varnish`_ delivery and fetch processors
that pipe client and backend responses through external commands.
(VDPs and VFPs) that pipe client and backend responses through
external commands.
XXX ...
.. _pipe.vdp():
new xvdp = pipe.vdp(STRING name, STRING path)
---------------------------------------------
XXX ...
......
# looks like -*- vcl -*-
varnishtest "vdp object"
server s1 {
rxreq
txresp -body {foo bar baz quux}
} -start
varnish v1 -vcl+backend {
import ${vmod_pipe};
sub vcl_init {
new cat = pipe.vdp(name="cat", path="/bin/cat");
new t = pipe.vdp(name="true", path="/bin/true");
new f = pipe.vdp(name="false", path="/bin/false");
}
sub vcl_deliver {
set resp.filters = req.http.X-Filters;
}
} -start
client c1 {
txreq -hdr "X-Filters: cat"
rxresp
expect resp.status == 200
expect resp.body == "foo bar baz quux"
txreq -hdr "X-Filters: cat true"
rxresp
expect resp.status == 200
expect resp.body == ""
# Only read response headers, the body may fail.
txreq -hdr "X-Filters: cat false"
rxresp -no_obj
expect resp.status == 200
} -run
logexpect l1 -v v1 -g vxid -d 1 -q {Notice ~ "^vdfp_pipe: vdp cat:"} {
expect 0 * Begin {^req \d+ rxreq$}
expect * = Notice {^vdfp_pipe: vdp cat: exec'd /bin/cat as pid \d+$}
expect * = Notice {^vdfp_pipe: vdp cat: /bin/cat exited with status 0$}
expect * = End
expect 0 * Begin {^req \d+ rxreq$}
expect * = Notice {^vdfp_pipe: vdp cat: exec'd /bin/cat as pid \d+$}
expect * = Notice {^vdfp_pipe: vdp cat: /bin/cat exited with status 0$}
expect * = End
expect 0 * Begin {^req \d+ rxreq$}
expect * = Notice {^vdfp_pipe: vdp cat: exec'd /bin/cat as pid \d+$}
expect * = Notice {^vdfp_pipe: vdp cat: /bin/cat exited with status 0$}
expect * = End
} -run
logexpect l1 -v v1 -g vxid -d 1 -q {Notice ~ "^vdfp_pipe: vdp t:"} {
expect 0 * Begin {^req \d+ rxreq$}
expect * = Notice {^vdfp_pipe: vdp t: exec'd /bin/true as pid \d+$}
expect * = Notice {^vdfp_pipe: vdp t: /bin/true exited with status 0$}
expect * = End
} -run
logexpect l1 -v v1 -g vxid -d 1 -q {Error ~ "^vdfp_pipe: vdp f:" or Notice ~ "^vdfp_pipe: vdp f"} {
expect 0 * Begin {^req \d+ rxreq$}
expect * = Notice {^vdfp_pipe: vdp f: exec'd /bin/false as pid \d+$}
expect * = Error {^vdfp_pipe: vdp f: /bin/false exited with status \d+$}
expect * = End
} -run
......@@ -26,13 +26,544 @@
* SUCH DAMAGE.
*/
/* for strdup() */
#define _POSIX_C_SOURCE 200809L
#include "config.h"
#include "vdef.h"
#include "vrt.h"
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "cache/cache.h"
#include "cache/cache_filter.h"
#include "vtree.h"
#include "vcc_if.h"
#define VFAIL(ctx, type, fmt, ...) \
VRT_fail((ctx), type " pipe failure: " fmt, __VA_ARGS__)
#define VDPFAIL(ctx, fmt, ...) \
VFAIL((ctx), "vdp", fmt, __VA_ARGS__)
/* == default first_byte_ and between_bytes_timeout (in ms) */
#define TIMEOUT_MS 60000
struct VPFX(pipe_vdp) {
unsigned magic;
#define PIPE_VDP_MAGIC 0xa887d6c3
char *name;
char *path;
struct vdp *vdp;
};
struct vdp_map {
unsigned magic;
#define PIPE_VDP_MAP_MAGIC 0x87a66950
VRBT_ENTRY(vdp_map) entry;
const char *vcl_name;
const char *vdp_name;
struct VPFX(pipe_vdp) *obj;
};
static inline int
map_cmp(const struct vdp_map *v1, const struct vdp_map *v2)
{
int vclcmp = strcmp(v1->vcl_name, v2->vcl_name);
if (vclcmp != 0)
return (vclcmp);
return (strcmp(v1->vdp_name, v2->vdp_name));
}
VRBT_HEAD(vdp_tree, vdp_map);
VRBT_PROTOTYPE_STATIC(vdp_tree, vdp_map, entry, map_cmp);
VRBT_GENERATE_STATIC(vdp_tree, vdp_map, entry, map_cmp);
static struct vdp_tree tree_h;
struct vdp_state {
unsigned magic;
#define PIPE_VDP_STATE_MAGIC 0xaeb87f5f
struct VPFX(pipe_vdp) *obj;
pid_t chldpid;
int chldin;
int chldout;
int chlderr;
};
/* VDP */
static inline int
mk_pipe(int fds[2], char *name, struct vsl_log *vsl)
{
errno = 0;
if (pipe(fds) != 0) {
VSLb(vsl, SLT_Error, "vdfp_pipe: vdp %s: pipe(2) failed: %s",
name, vstrerror(errno));
return (-1);
}
return (0);
}
static inline int
mk_dup(int oldfd, int newfd, char *name, struct vsl_log *vsl)
{
errno = 0;
if (dup2(oldfd, newfd) == -1) {
VSLb(vsl, SLT_Error, "vdfp_pipe: vdp %s: dup2(2) failed: %s",
name, vstrerror(errno));
return (-1);
}
return (0);
}
static int v_matchproto_(vdp_init_f)
vdp_init(struct req *req, void **priv)
{
struct vdp_state *state;
struct vdp_entry *vdpe;
struct vdp_map map_entry, *map;
struct VPFX(pipe_vdp) *obj;
int in[2], out[2], err[2];
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(req->vdc, VDP_CTX_MAGIC);
AN(req->vcl);
AN(priv);
AZ(*priv);
vdpe = VTAILQ_LAST(&req->vdc->vdp, vdp_entry_s);
CHECK_OBJ_NOTNULL(vdpe, VDP_ENTRY_MAGIC);
AN(vdpe->vdp);
AN(vdpe->vdp->name);
map_entry.vdp_name = vdpe->vdp->name;
map_entry.vcl_name = VCL_Name(req->vcl);
map = VRBT_FIND(vdp_tree, &tree_h, &map_entry);
CHECK_OBJ_NOTNULL(map, PIPE_VDP_MAP_MAGIC);
CHECK_OBJ_NOTNULL(map->obj, PIPE_VDP_MAGIC);
obj = map->obj;
errno = 0;
ALLOC_OBJ(state, PIPE_VDP_STATE_MAGIC);
if (state == NULL) {
VSLb(req->vsl, SLT_Error, "vdfp_pipe: vdp %s: cannot allocate "
"state: %s", obj->name, vstrerror(errno));
return (-1);
}
*priv = state;
state->obj = obj;
state->chldpid = -1;
state->chldin = -1;
state->chldout = -1;
state->chlderr = -1;
if (mk_pipe(in, obj->name, req->vsl) != 0)
return (-1);
if (mk_pipe(out, obj->name, req->vsl) != 0)
return (-1);
if (mk_pipe(err, obj->name, req->vsl) != 0)
return (-1);
state->chldpid = fork();
if (state->chldpid < 0) {
VSLb(req->vsl, SLT_Error, "vdfp_pipe: vdp %s: fork failed for "
"%s: %s", obj->name, obj->path, vstrerror(errno));
return (-1);
}
if (state->chldpid == 0) {
char *envp[] = { NULL };
char *argv[2];
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
closefd(&in[1]);
closefd(&out[0]);
closefd(&err[0]);
if (mk_dup(in[0], STDIN_FILENO, obj->name, req->vsl) != 0)
exit(EXIT_FAILURE);
if (mk_dup(out[1], STDOUT_FILENO, obj->name, req->vsl) != 0)
exit(EXIT_FAILURE);
if (mk_dup(err[1], STDERR_FILENO, obj->name, req->vsl) != 0)
exit(EXIT_FAILURE);
argv[0] = obj->path;
argv[1] = NULL;
errno = 0;
if (execve(obj->path, argv, envp) == -1) {
VSLb(req->vsl, SLT_Error, "vdfp_pipe: vdp %s: cannot "
"exec %s: %s", obj->name, obj->path,
vstrerror(errno));
exit(EXIT_FAILURE);
}
}
VSLb(req->vsl, SLT_Notice, "vdfp_pipe: vdp %s: exec'd %s as pid %jd",
obj->name, obj->path, (intmax_t)state->chldpid);
state->chldin = in[1];
state->chldout = out[0];
state->chlderr = err[0];
http_Unset(req->resp, H_Content_Length);
return (0);
}
static inline void
close_all(struct vdp_state *state)
{
if (state->chldin != -1)
closefd(&state->chldin);
if (state->chldout != -1)
closefd(&state->chldout);
if (state->chlderr != -1)
closefd(&state->chlderr);
}
static inline const char *
stream_name(int fd, struct vdp_state *state)
{
if (fd == state->chldin)
return "stdin";
if (fd == state->chldout)
return "stdout";
if (fd == state->chlderr)
return "stderr";
WRONG("fd matches no child stream");
return (NULL);
}
/* Assumes CHECK_OBJ* for state called by caller. */
static int
check_pid(struct vdp_state *state, struct vsl_log *vsl, int options)
{
int status;
pid_t pid;
AN(vsl);
assert(state->chldpid > 0);
errno = 0;
while ((pid = waitpid(state->chldpid, &status, options)) < 0) {
assert(errno == EINTR);
continue;
}
if (pid == 0) {
AN(options & WNOHANG);
return (0);
}
assert(pid == state->chldpid);
state->chldpid = -1;
close_all(state);
if (WIFEXITED(status)) {
if (WEXITSTATUS(status) == 0) {
VSLb(vsl, SLT_Notice, "vdfp_pipe: vdp %s: %s exited "
"with status 0", state->obj->name,
state->obj->path);
return (1);
}
VSLb(vsl, SLT_Error, "vdfp_pipe: vdp %s: %s exited with status "
"%d", state->obj->name, state->obj->path,
WEXITSTATUS(status));
return (-1);
}
if (WIFSIGNALED(status)) {
VSLb(vsl, SLT_Error, "vdfp_pipe: vdp %s: %s terminated with "
"signal %d (%s)", state->obj->name, state->obj->path,
WTERMSIG(status), strsignal(WTERMSIG(status)));
#ifdef WCOREDUMP
if (WCOREDUMP(status))
VSLb(vsl, SLT_Error, "vdfp_pipe: vdp %s: %s dumped "
"core", state->obj->name, state->obj->path);
#endif
return (-1);
}
/* XXX assuming no ptrace/WIFSTOPPED/WIFCONTINUED etc */
return (-1);
}
/* Assumes CHECK_OBJ* for req and state called by caller. */
static int
rw_child(struct req *req, struct vdp_state *state, enum vdp_action act,
const void *ptr, ssize_t len)
{
struct pollfd fds[3];
int retval, nfds;
ssize_t nbytes;
// XXX allocate buf in state
char buf[4096];
CHECK_OBJ_NOTNULL(state->obj, PIPE_VDP_MAGIC);
AN(state->obj->name);
AN(state->obj->path);
for (;;) {
if ((retval = check_pid(state, req->vsl, WNOHANG)) != 0)
return (retval);
nfds = 0;
if (ptr != NULL && len > 0 && state->chldin != -1) {
fds[0].fd = state->chldin;
fds[0].events = POLLOUT;
fds[0].revents = 0;
nfds++;
}
if (state->chldout != -1) {
fds[nfds].fd = state->chldout;
fds[nfds].events = POLLIN;
fds[nfds].revents = 0;
nfds++;
}
if (state->chlderr != -1) {
fds[nfds].fd = state->chlderr;
fds[nfds].events = POLLIN;
fds[nfds].revents = 0;
nfds++;
}
if (nfds == 0)
break;
errno = 0;
retval = poll(fds, nfds, TIMEOUT_MS);
if (retval < 0) {
assert(errno == EINTR);
continue;
}
if (retval == 0 && len > 0) {
VSLb(req->vsl, SLT_Error, "vdfp_pipe: vdp %s: timeout "
"waiting for %s", state->obj->name,
state->obj->path);
return (-1);
}
else if (retval == 0)
continue;
for (int i = 0; i < retval; i++) {
if (fds[i].revents == 0)
continue;
AZ(fds[i].revents & POLLNVAL);
if (fds[i].revents & POLLERR) {
VSLb(req->vsl, SLT_Error, "vdfp_pipe: vdp %s: "
"error polling %s %s", state->obj->name,
state->obj->path,
stream_name(fds[i].fd, state));
close_all(state);
return (-1);
}
if (fds[i].revents & POLLHUP
&& fds[i].fd == state->chldin) {
closefd(&state->chldin);
continue;
}
if (fds[i].revents & POLLOUT) {
assert(fds[i].fd == state->chldin);
AN(ptr);
assert(len > 0);
errno = 0;
nbytes = write(state->chldin, ptr, len);
if (nbytes < 0) {
VSLb(req->vsl, SLT_Error,
"vdfp_pipe: vdp %s: error writing "
"to %s stdin: %s",
state->obj->name, state->obj->path,
vstrerror(errno));
close_all(state);
return (-1);
}
AN(nbytes);
assert(nbytes <= len);
len -= nbytes;
ptr += nbytes;
continue;
}
AN(fds[i].revents & POLLIN);
assert(fds[i].fd == state->chldout
|| fds[i].fd == state->chlderr);
errno = 0;
nbytes = read(fds[i].fd, buf, 4096);
if (nbytes < 0) {
VSLb(req->vsl, SLT_Error, "vdfp_pipe: vdp %s:"
" error reading %s from %s: %s",
state->obj->name,
stream_name(fds[i].fd, state),
state->obj->path,
vstrerror(errno));
close_all(state);
return (-1);
}
if (nbytes == 0) {
closefd(&fds[i].fd);
break;
}
if (fds[i].fd == state->chldout) {
retval = VDP_bytes(req, VDP_FLUSH, buf, nbytes);
if (retval < 0) {
close_all(state);
return (-1);
}
continue;
}
// XXX write one line at a time
VSLb(req->vsl, SLT_Error, "vdfp_pipe: vdp %s: %s "
"stderr ...", state->obj->name, state->obj->path);
VSLb_bin(req->vsl, SLT_Error, nbytes, buf);
}
if (len == 0)
break;
}
AZ(len);
if (act == VDP_END && state->chldin != -1)
closefd(&state->chldin);
return (0);
}
static int v_matchproto_(vdp_bytes_f)
vdp_bytes(struct req *req, enum vdp_action act, void **priv, const void *ptr,
ssize_t len)
{
struct vdp_state *state;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
assert(len >= 0);
AN(priv);
CAST_OBJ_NOTNULL(state, *priv, PIPE_VDP_STATE_MAGIC);
return (rw_child(req, state, act, ptr, len));
}
static int v_matchproto_(vdp_fini_f)
vdp_fini(struct req *req, void **priv)
{
struct vdp_state *state;
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
AN(priv);
if (*priv == NULL)
return (0);
CAST_OBJ(state, *priv, PIPE_VDP_STATE_MAGIC);
CHECK_OBJ_NOTNULL(state->obj, PIPE_VDP_MAGIC);
AN(state->obj->name);
AN(state->obj->path);
close_all(state);
if (state->chldpid != -1)
(void)check_pid(state, req->vsl, 0);
FREE_OBJ(state);
*priv = NULL;
return (0);
}
/* vdp object */
VCL_VOID
vmod_vdp__init(VRT_CTX, struct VPFX(pipe_vdp) **vdpp, const char *obj_name,
VCL_STRING vdp_name, VCL_STRING path)
{
struct VPFX(pipe_vdp) *vdp_obj;
struct vdp *vdp;
struct vdp_map *map;
CHECK_OBJ_NOTNULL(ctx, VRT_CTX_MAGIC);
AN(vdpp);
AZ(*vdpp);
AN(obj_name);
if (vdp_name == NULL || *vdp_name == '\0') {
VDPFAIL(ctx, "new %s: filter name is empty", obj_name);
return;
}
#define CHK_NAME(nm) do { \
if (strcmp(vdp_name, (nm)) == 0) { \
VDPFAIL(ctx, "new %s: filter name %s already in use " \
"by another VDP", obj_name, vdp_name); \
return; \
} \
} while(0)
CHK_NAME("esi");
CHK_NAME("gunzip");
CHK_NAME("range");
CHK_NAME("VED");
CHK_NAME("PGZ");
CHK_NAME("VZZ");
CHK_NAME("V1B");
CHK_NAME("H2B");
if (path == NULL || *path == '\0') {
VDPFAIL(ctx, "new %s: path is empty", path);
return;
}
errno = 0;
if (access(path, X_OK) != 0) {
VDPFAIL(ctx, "new %s: cannot execute %s: %s", obj_name, path,
vstrerror(errno));
return;
}
errno = 0;
ALLOC_OBJ(vdp_obj, PIPE_VDP_MAGIC);
if (vdp_obj == NULL) {
VDPFAIL(ctx, "new %s: cannot allocate space for object: %s",
obj_name, vstrerror(errno));
return;
}
*vdpp = vdp_obj;
vdp_obj->name = strdup(obj_name);
vdp_obj->path = strdup(path);
errno = 0;
ALLOC_OBJ(map, PIPE_VDP_MAP_MAGIC);
if (map == NULL) {
VDPFAIL(ctx, "new %s: cannot allocate space for map entry: %s",
obj_name, vstrerror(errno));
return;
}
map->vdp_name = strdup(vdp_name);
map->vcl_name = strdup(VCL_Name(ctx->vcl));
map->obj = vdp_obj;
AZ(VRBT_INSERT(vdp_tree, &tree_h, map));
errno = 0;
vdp = malloc(sizeof(*vdp));
if (vdp == NULL) {
VDPFAIL(ctx, "new %s: cannot allocate space for VDP: %s",
obj_name, vstrerror(errno));
return;
}
vdp->name = strdup(vdp_name);
vdp->init = vdp_init;
vdp->bytes = vdp_bytes;
vdp->fini = vdp_fini;
VRT_AddVDP(ctx, vdp);
vdp_obj->vdp = vdp;
return;
}
/* XXX Event function calls VRT_RemoveVDP(), since it needs a VRT_CTX. */
VCL_VOID
vmod_vdp__fini(struct VPFX(pipe_vdp) **vdpp)
{
struct VPFX(pipe_vdp) *vdp_obj;
/* XXX cleanup RB tree entry */
if (vdpp == NULL || *vdpp == NULL)
return;
TAKE_OBJ_NOTNULL(vdp_obj, vdpp, PIPE_VDP_MAGIC);
if (vdp_obj->name != NULL)
free(vdp_obj->name);
if (vdp_obj->path != NULL)
free(vdp_obj->path);
FREE_OBJ(vdp_obj);
}
VCL_STRING
vmod_version(VRT_CTX)
{
......
......@@ -45,7 +45,12 @@ DESCRIPTION
.. _Varnish: http://www.varnish-cache.org/
The pipe library provides `Varnish`_ delivery and fetch processors
that pipe client and backend responses through external commands.
(VDPs and VFPs) that pipe client and backend responses through
external commands.
XXX ...
$Object vdp(STRING name, STRING path)
XXX ...
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment