Commit db395f8d authored by Dridi Boukelmoune's avatar Dridi Boukelmoune

Implement socket barriers (shared between processes)

When varnishtest creates a socket barrier, it will bind a socket and
listen to incoming connections. Once the number of expected connections
is open, connections are closed.

Barrier users only need to connect to the socket, read "nothing" and
block until the connection is closed. It allows virtually any process
to sync with varnishtest. The barrier will provide macros with its
socket information.
parent 08998615
varnishtest "Barrier operations"
# same as a00008.vtc, with socket barriers instead
# bs -> server, bc -> client, bb -> both
barrier bs sock 4
barrier bc sock 4
barrier bb sock 4 -cyclic
server s1 {
rxreq
barrier bs sync
barrier bb sync
delay .9
txresp
} -start
server s2 {
rxreq
barrier bs sync
barrier bb sync
delay .6
txresp
} -start
server s3 {
rxreq
barrier bs sync
barrier bb sync
delay .2
txresp
} -start
client c1 -connect ${s1_sock} {
delay .2
txreq
rxresp
barrier bc sync
barrier bb sync
} -start
client c2 -connect ${s2_sock} {
delay .6
txreq
rxresp
barrier bc sync
barrier bb sync
} -start
client c3 -connect ${s3_sock} {
delay .9
txreq
rxresp
barrier bc sync
barrier bb sync
} -start
# Wait for all servers to have received requests
barrier bs sync
barrier bb sync
# Wait for all clients to have received responses
barrier bc sync
barrier bb sync
......@@ -28,13 +28,19 @@
#include "config.h"
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "vtc.h"
#include "vtcp.h"
enum barrier_e {
BARRIER_NONE = 0,
......@@ -55,6 +61,9 @@ struct barrier {
unsigned cyclic;
enum barrier_e type;
/* fields below are only for BARRIER_SOCK */
pthread_t thread;
volatile unsigned active;
};
static pthread_mutex_t barrier_mtx;
......@@ -116,6 +125,108 @@ barrier_cond(struct barrier *b, const char *av, struct vtclog *vl)
b->type = BARRIER_COND;
}
static void *
barrier_sock_thread(void *priv)
{
struct barrier *b;
struct vtclog *vl;
struct timeval tmo;
const char *err;
char abuf[16], pbuf[6];
int i, sock, *conns;
fd_set rfds;
CAST_OBJ_NOTNULL(b, priv, BARRIER_MAGIC);
assert(b->type == BARRIER_SOCK);
AZ(pthread_mutex_lock(&b->mtx));
vl = vtc_logopen(b->name);
AN(vl);
sock = VTCP_listen_on("127.0.0.1:0", NULL, b->expected, &err);
if (sock < 0) {
pthread_cond_signal(&b->cond);
AZ(pthread_mutex_unlock(&b->mtx));
vtc_log(vl, 0, "Barrier(%s) %s fails: %s (errno=%d)",
b->name, err, strerror(errno), errno);
}
assert(sock > 0);
(void)VTCP_nonblocking(sock);
VTCP_myname(sock, abuf, sizeof abuf, pbuf, sizeof pbuf);
macro_def(vl, b->name, "addr", "%s", abuf);
macro_def(vl, b->name, "port", "%s", pbuf);
macro_def(vl, b->name, "sock", "%s:%s", abuf, pbuf);
pthread_cond_signal(&b->cond);
AZ(pthread_mutex_unlock(&b->mtx));
conns = calloc(b->expected, sizeof *conns);
AN(conns);
while (b->active) {
FD_ZERO(&rfds);
FD_SET(sock, &rfds);
tmo.tv_sec = 1;
tmo.tv_usec = 0;
i = select(sock + 1, &rfds, NULL, NULL, &tmo);
if (i == 0)
continue;
if (i < 0) {
if (errno == EINTR)
continue;
AZ(close(sock));
vtc_log(vl, 0,
"Barrier(%s) select fails: %s (errno=%d)",
b->name, strerror(errno), errno);
}
assert(i == 1);
assert(b->waiters <= b->expected);
if (b->waiters == b->expected)
vtc_log(vl, 0,
"Barrier(%s) use error: "
"more waiters than the %u expected",
b->name, b->expected);
i = accept(sock, NULL, NULL);
if (i < 0) {
AZ(close(sock));
vtc_log(vl, 0,
"Barrier(%s) accept fails: %s (errno=%d)",
b->name, strerror(errno), errno);
}
/* NB. We don't keep track of the established connections, only
* that connections were made to the barrier's socket.
*/
conns[b->waiters] = i;
if (++b->waiters < b->expected) {
vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
b->name, b->waiters, b->expected);
continue;
}
vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
for (i = 0; i < b->expected; i++)
AZ(close(conns[i]));
if (b->cyclic)
b->waiters = 0;
else
b->active = 0;
}
macro_undef(vl, b->name, "addr");
macro_undef(vl, b->name, "port");
macro_undef(vl, b->name, "sock");
AZ(close(sock));
return (NULL);
}
static void
barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
{
......@@ -123,7 +234,13 @@ barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
barrier_expect(b, av, vl);
b->type = BARRIER_SOCK;
INCOMPL();
b->active = 1;
/* NB. We can use the BARRIER_COND's pthread_cond_t to wait until the
* socket is ready for convenience.
*/
AZ(pthread_create(&b->thread, NULL, barrier_sock_thread, b));
AZ(pthread_cond_wait(&b->cond, &b->mtx));
}
static void
......@@ -174,6 +291,46 @@ barrier_cond_sync(struct barrier *b, struct vtclog *vl)
b->waiters = 0;
}
static void
barrier_sock_sync(struct barrier *b, struct vtclog *vl)
{
struct vsb *vsb;
const char *err;
char buf[32];
int i, sock;
ssize_t sz;
CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
assert(b->type == BARRIER_SOCK);
i = snprintf(buf, sizeof buf, "${%s_sock}", b->name);
assert(i > 0 && i < sizeof buf);
vsb = macro_expand(vl, buf);
vtc_log(vl, 4, "Barrier(%s) sync with socket", b->name);
sock = VTCP_open(VSB_data(vsb), NULL, 0., &err);
if (sock < 0)
vtc_log(vl, 0, "Barrier(%s) connection failed: %s",
b->name, err);
VSB_delete(vsb);
/* emulate pthread_cond_wait's behavior */
AZ(pthread_mutex_unlock(&b->mtx));
sz = read(sock, buf, sizeof buf); /* XXX loop with timeout? */
AZ(pthread_mutex_lock(&b->mtx));
i = errno;
AZ(close(sock));
if (sz < 0)
vtc_log(vl, 0, "Barrier(%s) connection failed: %s (errno=%d)",
b->name, strerror(i), i);
if (sz > 0)
vtc_log(vl, 0, "Barrier(%s) unexpected data (%ldB)",
b->name, sz);
}
static void
barrier_sync(struct barrier *b, struct vtclog *vl)
{
......@@ -188,7 +345,7 @@ barrier_sync(struct barrier *b, struct vtclog *vl)
barrier_cond_sync(b, vl);
break;
case BARRIER_SOCK:
INCOMPL();
barrier_sock_sync(b, vl);
break;
default:
WRONG("Wrong barrier type");
......@@ -212,11 +369,20 @@ cmd_barrier(CMD_ARGS)
/* Reset and free */
VTAILQ_FOREACH_SAFE(b, &barriers, list, b2) {
AZ(pthread_mutex_lock(&b->mtx));
assert(b->type != BARRIER_NONE);
if (b->cyclic)
AZ(b->waiters);
else
assert(b->waiters == b->expected);
switch (b->type) {
case BARRIER_COND:
if (b->cyclic)
AZ(b->waiters);
else
assert(b->waiters == b->expected);
break;
case BARRIER_SOCK:
b->active = 0;
AZ(pthread_join(b->thread, NULL));
break;
default:
WRONG("Wrong barrier type");
}
AZ(pthread_mutex_unlock(&b->mtx));
}
AZ(pthread_mutex_unlock(&barrier_mtx));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment