Commit 433e86f0 authored by Tollef Fog Heen's avatar Tollef Fog Heen

Set the waiter pipe as non-blocking and record overflows

Fixes #1285
parent 2d53dfe4
......@@ -1039,6 +1039,7 @@ void VMOD_Init(void);
void WAIT_Enter(struct sess *sp);
void WAIT_Init(void);
const char *WAIT_GetName(void);
void WAIT_Write_Session(struct sess *sp, int fd);
/* cache_wrk.c */
......
......@@ -32,6 +32,7 @@
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <math.h>
#include "cache/cache.h"
......@@ -77,3 +78,16 @@ WAIT_Enter(struct sess *sp)
SES_Close(sp, SC_REM_CLOSE);
waiter->pass(waiter_priv, sp);
}
void
WAIT_Write_Session(struct sess *sp, int fd)
{
ssize_t written;
written = write(fd, &sp, sizeof sp);
if (written != sizeof sp && (errno == EAGAIN || errno == EWOULDBLOCK)) {
VSC_C_main->sess_pipe_overflow++;
SES_Delete(sp, SC_SESS_PIPE_OVERFLOW, NAN);
return;
}
assert (written == sizeof sp);
}
......@@ -37,19 +37,19 @@
#include <sys/epoll.h>
#include <fcntl.h>
#include <stdlib.h>
#include "cache/cache.h"
#include "waiter/waiter.h"
#include "vtim.h"
#include "vfil.h"
#ifndef EPOLLRDHUP
# define EPOLLRDHUP 0
#endif
#define NEEV 100
#define NEEV 8192
struct vwe {
unsigned magic;
......@@ -220,12 +220,13 @@ vwe_timeout_idle_ticker(void *priv)
/*--------------------------------------------------------------------*/
static void
vwe_pass(void *priv, const struct sess *sp)
vwe_pass(void *priv, struct sess *sp)
{
struct vwe *vwe;
CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
assert(sizeof sp == write(vwe->pipes[1], &sp, sizeof sp));
WAIT_Write_Session(sp, vwe->pipes[1]);
}
/*--------------------------------------------------------------------*/
......@@ -233,7 +234,6 @@ vwe_pass(void *priv, const struct sess *sp)
static void *
vwe_init(void)
{
int i;
struct vwe *vwe;
ALLOC_OBJ(vwe, VWE_MAGIC);
......@@ -242,17 +242,9 @@ vwe_init(void)
AZ(pipe(vwe->pipes));
AZ(pipe(vwe->timer_pipes));
i = fcntl(vwe->pipes[0], F_GETFL);
assert(i != -1);
i |= O_NONBLOCK;
i = fcntl(vwe->pipes[0], F_SETFL, i);
assert(i != -1);
i = fcntl(vwe->timer_pipes[0], F_GETFL);
assert(i != -1);
i |= O_NONBLOCK;
i = fcntl(vwe->timer_pipes[0], F_SETFL, i);
assert(i != -1);
AZ(VFIL_nonblocking(vwe->pipes[0]));
AZ(VFIL_nonblocking(vwe->pipes[1]));
AZ(VFIL_nonblocking(vwe->timer_pipes[0]));
AZ(pthread_create(&vwe->timer_thread,
NULL, vwe_timeout_idle_ticker, vwe));
......
......@@ -38,7 +38,6 @@
#include <sys/types.h>
#include <sys/event.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
......@@ -46,6 +45,7 @@
#include "waiter/waiter.h"
#include "vtim.h"
#include "vfil.h"
#define NKEV 100
......@@ -210,12 +210,13 @@ vwk_thread(void *priv)
/*--------------------------------------------------------------------*/
static void
vwk_pass(void *priv, const struct sess *sp)
vwk_pass(void *priv, struct sess *sp)
{
struct vwk *vwk;
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
assert(sizeof sp == write(vwk->pipes[1], &sp, sizeof sp));
WAIT_Write_Session(sp, vwk->pipes[1]);
}
/*--------------------------------------------------------------------*/
......@@ -223,7 +224,6 @@ vwk_pass(void *priv, const struct sess *sp)
static void *
vwk_init(void)
{
int i;
struct vwk *vwk;
ALLOC_OBJ(vwk, VWK_MAGIC);
......@@ -232,11 +232,8 @@ vwk_init(void)
VTAILQ_INIT(&vwk->sesshead);
AZ(pipe(vwk->pipes));
i = fcntl(vwk->pipes[0], F_GETFL);
assert(i != -1);
i |= O_NONBLOCK;
i = fcntl(vwk->pipes[0], F_SETFL, i);
assert(i != -1);
AZ(VFIL_nonblocking(vwk->pipes[0]));
AZ(VFIL_nonblocking(vwk->pipes[1]));
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
return (vwk);
......
......@@ -30,6 +30,7 @@
#include "config.h"
#include <fcntl.h>
#include <poll.h>
#include <stdlib.h>
......@@ -37,6 +38,7 @@
#include "waiter/waiter.h"
#include "vtim.h"
#include "vfil.h"
#define NEEV 128
......@@ -191,13 +193,13 @@ vwp_main(void *priv)
/*--------------------------------------------------------------------*/
static void
vwp_poll_pass(void *priv, const struct sess *sp)
vwp_poll_pass(void *priv, struct sess *sp)
{
struct vwp *vwp;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
assert(sizeof sp == write(vwp->pipes[1], &sp, sizeof sp));
WAIT_Write_Session(sp, vwp->pipes[1]);
}
/*--------------------------------------------------------------------*/
......@@ -211,6 +213,9 @@ vwp_poll_init(void)
AN(vwp);
VTAILQ_INIT(&vwp->sesshead);
AZ(pipe(vwp->pipes));
AZ(VFIL_nonblocking(vwp->pipes[1]));
vwp_pollspace(vwp, 256);
AZ(pthread_create(&vwp->poll_thread, NULL, vwp_main, vwp));
return (vwp);
......
......@@ -240,14 +240,18 @@ vws_thread(void *priv)
/*--------------------------------------------------------------------*/
static void
vws_pass(void *priv, const struct sess *sp)
vws_pass(void *priv, struct sess *sp)
{
int r;
struct vws *vws;
CAST_OBJ_NOTNULL(vws, priv, VWS_MAGIC);
while((r = port_send(vws->dport, 0, TRUST_ME(sp))) == -1 &&
errno == EAGAIN);
r = port_send(vws->dport, 0, TRUST_ME(sp));
if (r == -1 && errno == EAGAIN) {
VSC_C_main->sess_pipe_overflow++;
SES_Delete(sp, SC_SESS_PIPE_OVERFLOW, NAN);
return;
}
AZ(r);
}
......
......@@ -31,7 +31,7 @@
struct sess;
typedef void* waiter_init_f(void);
typedef void waiter_pass_f(void *priv, const struct sess *);
typedef void waiter_pass_f(void *priv, struct sess *);
#define WAITER_DEFAULT "platform dependent"
......
......@@ -39,5 +39,6 @@ SESS_CLOSE(TX_PIPE, "Piped transaction")
SESS_CLOSE(TX_ERROR, "Error transaction")
SESS_CLOSE(TX_EOF, "EOF transmission")
SESS_CLOSE(OVERLOAD, "Out of some resource")
SESS_CLOSE(SESS_PIPE_OVERFLOW, "Session pipe overflow")
/*lint -restore */
......@@ -88,6 +88,11 @@ VSC_F(sess_fail, uint64_t, 1, 'c',
" some resource like filedescriptors."
)
VSC_F(sess_pipe_overflow, uint64_t, 1, 'c',
"Session pipe overflow",
"Count of sessions dropped due to the session pipe overflowing."
)
/*---------------------------------------------------------------------*/
VSC_F(client_req_400, uint64_t, 1, 'a',
......
......@@ -33,3 +33,4 @@ int seed_random(void);
int VFIL_tmpfile(char *);
char *VFIL_readfile(const char *pfx, const char *fn, ssize_t *sz);
char *VFIL_readfd(int fd, ssize_t *sz);
int VFIL_nonblocking(int fd);
......@@ -123,3 +123,16 @@ VFIL_readfile(const char *pfx, const char *fn, ssize_t *sz)
errno = err;
return (r);
}
int
VFIL_nonblocking(int fd)
{
int i;
i = fcntl(fd, F_GETFL);
assert(i != -1);
i |= O_NONBLOCK;
i = fcntl(fd, F_SETFL, i);
assert(i != -1);
return (i);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment