Commit 295cadee authored by Geoff Simmons's avatar Geoff Simmons

varnishevent - added monitoring and stats

	     - fd table is static in varnishevent.c
	     - mutexing SPSCQ enqueue and dequeue
	     - writer flushes only when waiting
parent ab39048c
......@@ -16,6 +16,7 @@ varnishevent_SOURCES = \
writer.c \
config.c \
log.c \
monitor.c \
$(top_builddir)/lib/libvarnish/assert.c \
$(top_builddir)/lib/libvarnish/flopen.c \
$(top_builddir)/lib/libvarnish/version.c \
......
......@@ -94,10 +94,6 @@ DATA_Init(void)
if (logline == NULL)
return errno;
fd_tbl = (fd_t *) calloc(config.max_fd, sizeof(fd_t));
if (fd_tbl == NULL)
return errno;
VSTAILQ_INIT(&freehead);
for (int i = 0; i < config.max_data; i++) {
INIT_LOG_RECORDS(logline[i].rx_headers);
......@@ -114,11 +110,6 @@ DATA_Init(void)
VSTAILQ_INSERT_TAIL(&freehead, &logline[i], freelist);
}
for (int k = 0; k < config.max_fd; k++) {
fd_tbl[k].ll = NULL;
fd_tbl[k].state = FD_EMPTY;
}
AZ(pthread_mutex_init(&freelist_lock, &attr_lock));
global_nfree = 0;
......@@ -128,13 +119,17 @@ DATA_Init(void)
/*
* take all free entries from the datatable for lockless allocation
*/
void
unsigned
DATA_Take_Freelist(struct freehead_s *dst)
{
unsigned nfree;
AZ(pthread_mutex_lock(&freelist_lock));
VSTAILQ_CONCAT(dst, &freehead);
nfree = global_nfree;
global_nfree = 0;
AZ(pthread_mutex_unlock(&freelist_lock));
return nfree;
}
/*
......
/*-
* Copyright (c) 2013 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2013 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <syslog.h>
#include <pthread.h>
#include "varnishevent.h"
#include "libvarnish.h"
#include "vas.h"
static int run;
static pthread_t monitor;
static pthread_mutex_t stats_lock;
static void
log_output(void)
{
LOG_Log(LOG_INFO, "Data table: len=%u open=%u done=%u load=%.2f occ_hi=%u "
"global_free=%u", config.max_data, logline->open, logline->done,
100.0 * (logline->open + logline->done) / config.max_data,
logline->occ_hi, global_nfree);
RDR_Stats();
WRT_Stats();
SPSCQ_Stats();
}
static void
monitor_cleanup(void *arg)
{
(void) arg;
log_output();
LOG_Log0(LOG_INFO, "Monitoring thread exiting");
}
static void *
monitor_main(void *arg)
{
LOG_Log(LOG_INFO, "Monitor thread running every %u secs",
config.monitor_interval);
run = 1;
pthread_cleanup_push(monitor_cleanup, arg);
while (run) {
TIM_sleep(config.monitor_interval);
log_output();
}
pthread_cleanup_pop(0);
LOG_Log0(LOG_INFO, "Monitoring thread exiting");
pthread_exit((void *) NULL);
}
void
MON_Shutdown(void)
{
run = 0;
AZ(pthread_cancel(monitor));
AZ(pthread_join(monitor, NULL));
AZ(pthread_mutex_destroy(&stats_lock));
}
void
MON_Start(void)
{
AZ(pthread_mutex_init(&stats_lock, &attr_lock));
AZ(pthread_create(&monitor, NULL, monitor_main, NULL));
}
void
MON_StatsUpdate(stats_update_t update)
{
AZ(pthread_mutex_lock(&stats_lock));
switch(update) {
case STATS_WRITTEN:
logline->done--;
break;
case STATS_DONE:
logline->done++;
break;
default:
/* Unreachable */
AN(NULL);
}
AZ(pthread_mutex_unlock(&stats_lock));
}
......@@ -29,26 +29,29 @@
*
*/
#include <syslog.h>
#include <pthread.h>
#include "varnishevent.h"
#include "vqueue.h"
#include "vas.h"
static volatile unsigned long enqs = 0, deqs = 0;
static volatile unsigned long enqs = 0, deqs = 0, occ_hi = 0;
VSTAILQ_HEAD(spscq_s, logline_t);
struct spscq_s spscq_head = VSTAILQ_HEAD_INITIALIZER(spscq_head);
/* XXX: enqs & deqs are not synced */
unsigned SPSCQ_Len(void) {
return enqs - deqs;
}
static pthread_mutex_t head_lock = PTHREAD_MUTEX_INITIALIZER;
void
SPSCQ_Enq(logline_t *ptr)
{
/* XXX: is the assertion accurate if enqs & deqs are not synced? */
AZ(pthread_mutex_lock(&head_lock));
assert(enqs - deqs < config.max_data);
enqs++;
if (enqs - deqs > occ_hi)
occ_hi = enqs - deqs;
VSTAILQ_INSERT_TAIL(&spscq_head, ptr, spscq);
AZ(pthread_mutex_unlock(&head_lock));
}
logline_t
......@@ -56,10 +59,23 @@ logline_t
{
void *ptr;
if (VSTAILQ_EMPTY(&spscq_head))
AZ(pthread_mutex_lock(&head_lock));
if (VSTAILQ_EMPTY(&spscq_head)) {
AZ(pthread_mutex_unlock(&head_lock));
return NULL;
}
ptr = VSTAILQ_FIRST(&spscq_head);
VSTAILQ_REMOVE_HEAD(&spscq_head, spscq);
deqs++;
AZ(pthread_mutex_unlock(&head_lock));
return ptr;
}
void
SPSCQ_Stats(void)
{
unsigned len = enqs - deqs;
LOG_Log(LOG_INFO, "Queue: max=%u len=%u load=%.2f occ_hi=%u",
config.max_data, len, 100.0 * len / config.max_data, occ_hi);
}
......@@ -88,6 +88,24 @@
#include "varnishevent.h"
typedef enum {
FD_EMPTY = 0,
FD_OPEN
} fd_state_e;
typedef struct fd_t {
logline_t *ll;
fd_state_e state;
} fd_t;
static fd_t *fd_tbl;
static unsigned seen = 0;
static unsigned open = 0;
static unsigned submitted = 0;
static unsigned occ_hi = 0;
static unsigned waits = 0;
static volatile sig_atomic_t reopen;
struct VSM_data *vd;
......@@ -97,6 +115,7 @@ static int m_flag = 0;
/* Local freelist */
static struct freehead_s reader_freelist =
VSTAILQ_HEAD_INITIALIZER(reader_freelist);
static unsigned rdr_free = 0;
static int waiting = 0;
......@@ -106,26 +125,38 @@ RDR_Waiting(void)
return waiting;
}
void
RDR_Stats(void)
{
LOG_Log(LOG_INFO, "Reader (%s): fd_max=%u seen=%u open=%u load=%.2f "
"submitted=%u occ_hi=%u waits=%u free=%u",
waiting ? "waiting" : "running", config.max_fd, seen, open,
100.0 * open / config.max_fd, submitted, occ_hi, waits, rdr_free);
}
static inline logline_t
*take(void)
{
struct logline_t *data;
while (VSTAILQ_EMPTY(&reader_freelist)) {
DATA_Take_Freelist(&reader_freelist);
rdr_free = DATA_Take_Freelist(&reader_freelist);
if (VSTAILQ_EMPTY(&reader_freelist)) {
/* XXX: bump stats */
waits++;
LOG_Log0(LOG_DEBUG, "Reader: waiting for free space");
waiting = 1;
AZ(pthread_mutex_lock(&data_ready_lock));
AZ(pthread_cond_wait(&data_ready_cond, &data_ready_lock));
DATA_Take_Freelist(&reader_freelist);
rdr_free = DATA_Take_Freelist(&reader_freelist);
waiting = 0;
AZ(pthread_mutex_unlock(&data_ready_lock));
LOG_Log0(LOG_DEBUG, "Reader: done waiting for free space");
}
}
data = VSTAILQ_FIRST(&reader_freelist);
VSTAILQ_REMOVE_HEAD(&reader_freelist, freelist);
rdr_free--;
return (data);
}
......@@ -135,11 +166,10 @@ submit(int fd)
assert(fd_tbl[fd].state == FD_OPEN);
assert(fd_tbl[fd].ll->state == DATA_DONE);
SPSCQ_Enq((void *) fd_tbl[fd].ll);
/* XXX: bump stats */
/* XXX: wait for a goal queue length, as in trackrdrd? */
if (WRT_Waiting()) {
if (WRT_Waiting())
AZ(pthread_cond_signal(&spscq_ready_cond));
}
submitted++;
}
static inline void
......@@ -229,6 +259,12 @@ h_ncsa(void *priv, enum VSL_tag_e tag, unsigned fd,
(void) priv;
if (fd > config.max_fd) {
LOG_Log(LOG_ALERT, "fd %u exceeds max %u: %s [%.*s]", fd, config.max_fd,
VSL_tags[tag], len, ptr);
return 1;
}
/* XXX: handle these, esp. for Backend_health */
if (fd == 0)
return 1;
......@@ -246,7 +282,14 @@ h_ncsa(void *priv, enum VSL_tag_e tag, unsigned fd,
/* XXX: assert that all lp->tag[i].len == 0 ? */
fd_tbl[fd].ll = lp;
lp->state = DATA_OPEN;
logline->open++;
if (logline->open + logline->done > logline->occ_hi)
logline->occ_hi = logline->open + logline->done;
fd_tbl[fd].state = FD_OPEN;
seen++;
open++;
if (open > occ_hi)
occ_hi = open;
}
else {
lp = fd_tbl[fd].ll;
......@@ -259,10 +302,13 @@ h_ncsa(void *priv, enum VSL_tag_e tag, unsigned fd,
lp->bitmap |= bitmap;
if (lp->state == DATA_DONE) {
logline->open--;
MON_StatsUpdate(STATS_DONE);
if (!m_flag || VSL_Matched(vd, lp->bitmap))
submit(fd);
fd_tbl[fd].state = FD_EMPTY;
fd_tbl[fd].ll = NULL;
open--;
}
return 1;
......@@ -402,7 +448,7 @@ main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
LOG_Log(LOG_INFO, "intializing (%s)", VCS_version);
LOG_Log(LOG_INFO, "initializing (%s)", VCS_version);
AZ(pthread_mutexattr_init(&attr_lock));
AZ(pthread_condattr_init(&attr_cond));
......@@ -410,20 +456,42 @@ main(int argc, char *argv[])
AZ(pthread_mutexattr_setpshared(&attr_lock, PTHREAD_PROCESS_PRIVATE));
AZ(pthread_condattr_setpshared(&attr_cond, PTHREAD_PROCESS_PRIVATE));
/* XXX: log errors */
if ((errnum = DATA_Init()) != 0) {
fprintf(stderr, "Cannot init data structures: %s\n", strerror(errnum));
exit(1);
LOG_Log(LOG_ALERT, "Cannot init data table: %s\n",
strerror(errnum));
exit(EXIT_FAILURE);
}
fd_tbl = (fd_t *) calloc(config.max_fd, sizeof(fd_t));
if (fd_tbl == NULL) {
LOG_Log(LOG_ALERT, "Cannot init fd table: %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
for (int k = 0; k < config.max_fd; k++) {
fd_tbl[k].ll = NULL;
fd_tbl[k].state = FD_EMPTY;
}
if (config.monitor_interval > 0)
MON_Start();
else
LOG_Log0(LOG_INFO, "Monitoring thread not running");
if ((errnum = WRT_Init(of)) != 0) {
fprintf(stderr, "Cannot init writer thread: %s\n", strerror(errnum));
exit(1);
LOG_Log(LOG_ALERT, "Cannot init writer thread: %s\n", strerror(errnum));
exit(EXIT_FAILURE);
}
WRT_Start();
while (!WRT_Running())
;
/* XXX: configure wrt_waits and sleep interval? */
int wrt_waits = 0;
while (!WRT_Running()) {
if (wrt_waits++ > 10) {
LOG_Log0(LOG_ALERT, "Writer thread not running, giving up");
exit(EXIT_FAILURE);
}
TIM_sleep(1);
}
while (VSL_Dispatch(vd, h_ncsa, of) >= 0) {
if (fflush(of) != 0) {
......@@ -438,6 +506,7 @@ main(int argc, char *argv[])
}
WRT_Halt();
MON_Shutdown();
exit(0);
}
......@@ -41,7 +41,7 @@
#define DEFAULT_MAX_HEADERS 64 /* http_max_hdr */
#define DEFAULT_MAX_FD 1024
#define DEFAULT_MAX_DATA 3072
#define DEFAULT_MAX_DATA 4096
#define DEFAULT_PID_FILE "/var/run/varnishevent.pid"
#if 0
......@@ -88,22 +88,13 @@ typedef struct logline_t {
hdr_t vcl_log; /* VCL_Log entries */
VSTAILQ_ENTRY(logline_t) freelist;
VSTAILQ_ENTRY(logline_t) spscq;
unsigned open;
unsigned done;
unsigned occ_hi;
} logline_t;
logline_t *logline;
typedef enum {
FD_EMPTY = 0,
FD_OPEN
} fd_state_e;
typedef struct fd_t {
logline_t *ll;
fd_state_e state;
} fd_t;
fd_t *fd_tbl;
VSTAILQ_HEAD(freehead_s, logline_t);
struct freehead_s freehead;
......@@ -164,6 +155,7 @@ struct config {
/* varnishevent.c */
int RDR_Waiting(void);
void RDR_Stats(void);
/* config.c */
void CONF_Init(void);
......@@ -198,7 +190,7 @@ int LOG_Open(const char *progname);
/* data.c */
int DATA_Init(void);
void DATA_Clear_Logline(logline_t *ll);
void DATA_Take_Freelist(struct freehead_s *dst);
unsigned DATA_Take_Freelist(struct freehead_s *dst);
void DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned);
/* writer.c */
......@@ -211,6 +203,19 @@ void WRT_Halt(void);
void WRT_Shutdown(void);
/* spscq.c */
unsigned SPSCQ_Len(void);
void SPSCQ_Enq(logline_t *ptr);
logline_t *SPSCQ_Deq(void);
unsigned SPSCQ_Len(void);
void SPSCQ_Stats(void);
/* monitor.c */
typedef enum {
/* "Ending" VSL tag seen */
STATS_DONE,
/* Log line written */
STATS_WRITTEN,
} stats_update_t;
void MON_Start(void);
void MON_Shutdown(void);
void MON_StatsUpdate(stats_update_t update);
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* Copyright (c) 2013 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2013 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
......@@ -63,7 +63,6 @@ typedef enum {
WRT_STATE_E_LIMIT
} wrt_state_e;
#if 0
static const char* statename[WRT_STATE_E_LIMIT] = {
[WRT_NOTSTARTED] = "not started",
[WRT_INITIALIZING] = "initializing",
......@@ -72,7 +71,6 @@ static const char* statename[WRT_STATE_E_LIMIT] = {
[WRT_SHUTTINGDOWN] = "shutting down",
[WRT_EXITED] = "exited"
};
#endif
/* Single writer thread, consumer for the SPSC queue. */
pthread_t writer;
......@@ -81,7 +79,6 @@ pthread_t writer;
static struct freehead_s wrt_freelist;
static unsigned wrt_nfree;
static struct vsb *os;
static FILE *fo;
......@@ -704,9 +701,9 @@ wrt_write(logline_t *ll)
/* flush the stream */
VSB_finish(os);
fprintf(fo, "%s", VSB_data(os));
/* XXX: if autoflush */
fflush(fo);
/* XXX: configurable autoflush */
writes++;
MON_StatsUpdate(STATS_WRITTEN);
/* clean up */
DATA_Clear_Logline(ll);
......@@ -715,7 +712,7 @@ wrt_write(logline_t *ll)
/* XXX: shouldn't we return free space sooner?
e.g. nfree < (max >> 1), i.e. less than half */
if (global_nfree == 0)
if (global_nfree == 0 || RDR_Waiting())
wrt_return_freelist();
}
......@@ -735,7 +732,7 @@ static void
wrt->state = WRT_RUNNING;
while (run) {
ll = (logline_t *) SPSCQ_Deq();
ll = SPSCQ_Deq();
if (ll != NULL) {
deqs++;
CHECK_OBJ(ll, LOGLINE_MAGIC);
......@@ -746,27 +743,30 @@ static void
/*
* wait until data are available, or quit is signaled.
* XXX: no backoff condition
* return space before sleeping */
* flush ouput and return space before sleeping
*/
wrt->state = WRT_WAITING;
if (wrt_nfree > 0)
wrt_return_freelist();
fflush(fo);
AZ(pthread_mutex_lock(&spscq_ready_lock));
/*
* run is guaranteed to be fresh after the lock
*/
if (run) {
wrt->state = WRT_WAITING;
waits++;
AZ(pthread_cond_wait(&spscq_ready_cond, &spscq_ready_lock));
wrt->state = WRT_RUNNING;
}
wrt->state = WRT_RUNNING;
AZ(pthread_mutex_unlock(&spscq_ready_lock));
}
wrt->state = WRT_SHUTTINGDOWN;
/* Prepare to exit, drain the queue */
while ((ll = (logline_t *) SPSCQ_Deq()) != NULL) {
while ((ll = SPSCQ_Deq()) != NULL) {
deqs++;
CHECK_OBJ(ll, LOGLINE_MAGIC);
wrt_write(ll);
......@@ -781,12 +781,8 @@ static void
static void wrt_cleanup(void)
{
if (cleaned) return;
#if 0
AZ(pthread_mutex_destroy(&spmcq_datawaiter_lock));
AZ(pthread_cond_destroy(&spmcq_datawaiter_cond));
AZ(pthread_mutex_destroy(&spmcq_roomwaiter_lock));
AZ(pthread_cond_destroy(&spmcq_roomwaiter_cond));
#endif
free(scratch);
cleaned = 1;
}
......@@ -823,10 +819,9 @@ void
WRT_Stats(void)
{
if (!run) return;
#if 0
LOG_Log(LOG_INFO, "Writer thread (%s): seen=%d waits=%d writes=%d",
statename[wrt_data.state], deqs, waits, writes);
#endif
LOG_Log(LOG_INFO, "Writer (%s): seen=%d writes=%d waits=%d free=%d",
statename[wrt_data.state], deqs, writes, waits, wrt_nfree);
}
int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment