Commit e22718ba authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: first demonized version

parent 8acf4395
......@@ -43,7 +43,20 @@ PKG_PROG_PKG_CONFIG
# Checks for header files.
AC_HEADER_STDC
#AC_CHECK_HEADERS([sys/stdlib.h])
## From Varnish configure.ac
# white lie - we don't actually test it
AC_MSG_CHECKING([whether daemon() works])
case $target in
*-*-darwin*)
# present but not functional
AC_MSG_RESULT([no])
ac_cv_func_daemon=no
;;
*)
AC_CHECK_FUNCS([daemon])
;;
esac
# Varnish source tree
AC_ARG_VAR([VARNISHSRC], [path to Varnish source tree (mandatory)])
......@@ -107,6 +120,7 @@ AC_ARG_ENABLE(werror,
CXXFLAGS="${CXXFLAGS} -Werror"
)
## From activemq-cpp configure.ac
## Execute Doxygen macros
DX_HTML_FEATURE(ON)
DX_CHM_FEATURE(OFF)
......
......@@ -112,6 +112,7 @@ CONF_Add(const char *lval, const char *rval)
confUnsigned("maxopen.scale", maxopen_scale);
confUnsigned("maxdata.scale", maxdata_scale);
confUnsigned("nworkers", nworkers);
confUnsigned("restarts", restarts);
if (strcmp(lval, "syslog.facility") == 0) {
if ((ret = conf_getFacility(rval)) < 0)
......@@ -177,6 +178,7 @@ CONF_Init(void)
config.mq_uri[0] = '\0';
config.mq_qname[0] = '\0';
config.nworkers = 1;
config.restarts = 1;
}
int
......@@ -248,4 +250,5 @@ CONF_Dump(void)
confdump("mq.uri = %s", config.mq_uri);
confdump("mq.qname = %s", config.mq_qname);
confdump("nworkers = %d", config.nworkers);
confdump("restarts = %d", config.restarts);
}
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
PARENT(SIGTERM, terminate);
PARENT(SIGINT, terminate);
PARENT(SIGUSR1, SIG_IGN);
PARENT(SIGUSR2, SIG_IGN);
CHILD(SIGTERM, terminate);
CHILD(SIGINT, terminate);
CHILD(SIGUSR1, dump);
CHILD(SIGUSR2, SIG_IGN);
......@@ -14,8 +14,8 @@
echo
echo "TEST: $0"
echo "... testing log output at debug level against a known checksum"
CKSUM=$(../trackrdrd -f varnish.binlog -l - -d -c test.conf | cksum)
if [ "$CKSUM" != '1094437405 229102' ]; then
CKSUM=$(../trackrdrd -D -f varnish.binlog -l - -d -c test.conf | cksum)
if [ "$CKSUM" != '772720543 229221' ]; then
echo "ERROR: Regression test incorrect cksum: $CKSUM"
exit 1
fi
......
......@@ -46,6 +46,7 @@
#include <syslog.h>
#include <sys/fcntl.h>
#include <pthread.h>
#include <stdarg.h>
#include "compat/daemon.h"
......@@ -67,6 +68,22 @@
#define DEFAULT_CONFIG "/etc/trackrdrd.conf"
/* Hack, because we cannot have #ifdef in the macro definition SIGDISP */
#define _UNDEFINED(SIG) ((#SIG)[0] == 0)
#define UNDEFINED(SIG) _UNDEFINED(SIG)
#define SIGDISP(SIG, disp) \
do { if (UNDEFINED(SIG)) break; \
if (signal((SIG), (disp)) == SIG_ERR) \
LOG_Log(LOG_ALERT, \
"Cannot install handler for " #SIG ": %s", \
strerror(errno)); \
} while(0)
static void child_main(struct VSM_data *vd, int endless);
static volatile sig_atomic_t term;
/*--------------------------------------------------------------------*/
static void
......@@ -88,14 +105,7 @@ submit(unsigned xid)
tbl.submitted++;
}
static void
sigusr1(int sig)
{
DATA_Dump();
signal(sig, sigusr1);
}
static dataentry
static inline dataentry
*insert(unsigned xid, enum VSL_tag_e tag, unsigned fd)
{
dataentry *entry;
......@@ -122,6 +132,31 @@ static dataentry
return entry;
}
static inline dataentry
*find_or_insert(unsigned xid, enum VSL_tag_e tag, unsigned fd)
{
dataentry *entry;
entry = DATA_Find(xid);
if (entry == NULL) {
if (term)
return NULL;
LOG_Log(LOG_WARNING, "%s: XID %d not found, attempting insert",
VSL_tags[tag], xid);
entry = insert(xid, tag, fd);
if (entry == NULL)
return NULL;
}
else {
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->xid == xid);
assert(entry->tid == fd);
assert(entry->state == DATA_OPEN);
}
return entry;
}
static int
OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
unsigned spec, const char *ptr, uint64_t bitmap)
......@@ -134,6 +169,9 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
(void) priv;
(void) bitmap;
if (term && tbl.open == 0)
return 1;
/* spec != 'c' */
if ((spec & VSL_S_CLIENT) == 0)
LOG_Log(LOG_WARNING, "%s: Client bit ('c') not set", VSL_tags[tag]);
......@@ -141,6 +179,8 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
switch (tag) {
case SLT_ReqStart:
if (term) return(0);
err = Parse_ReqStart(ptr, len, &xid);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%d", VSL_tags[tag], xid);
......@@ -154,29 +194,15 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
if (strncmp(ptr, TRACKLOG_PREFIX, TRACKLOG_PREFIX_LEN) != 0)
break;
/* assert(regex captures XID and data); */
err = Parse_VCL_Log(&ptr[TRACKLOG_PREFIX_LEN], len-TRACKLOG_PREFIX_LEN,
&xid, &data, &datalen);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%d, data=[%.*s]", VSL_tags[tag],
xid, datalen, data);
/* assert((hash(XID) exists) && hash(XID).tid == fd
&& !hash(XID).done); */
entry = DATA_Find(xid);
if (entry == NULL) {
LOG_Log(LOG_WARNING, "%s: XID %d not found, attempting insert",
VSL_tags[tag], xid);
entry = insert(xid, tag, fd);
entry = find_or_insert(xid, tag, fd);
if (entry == NULL)
break;
}
else {
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->xid == xid);
assert(entry->tid == fd);
assert(entry->state == DATA_OPEN);
}
/* Data overflow */
/* XXX: Encapsulate (1 << (config.maxdata_scale+10)) */
......@@ -197,19 +223,14 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
break;
case SLT_ReqEnd:
/* assert(regex.match() && (hash(XID) exists) && hash(XID).tid == fd
&& !hash(XID).done); */
err = Parse_ReqEnd(ptr, len, &xid);
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%d", VSL_tags[tag], xid);
entry = DATA_Find(xid);
CHECK_OBJ_NOTNULL(entry, DATA_MAGIC);
assert(entry->xid == xid);
assert(entry->tid == fd);
assert(entry->state == DATA_OPEN);
/*hash(XID).done = TRUE;*/
entry = find_or_insert(xid, tag, fd);
if (entry == NULL)
break;
entry->state = DATA_DONE;
MON_StatsUpdate(STATS_DONE);
submit(xid);
......@@ -225,6 +246,202 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
/*--------------------------------------------------------------------*/
/* Handle for the PID file */
struct vpf_fh *pfh = NULL;
static void
dump(int sig)
{
DATA_Dump();
signal(sig, dump);
}
static void
terminate(int sig)
{
LOG_Log0(LOG_DEBUG, "Signal handler terminate called");
(void) sig;
term = 1;
}
static void
parent_shutdown(int status, pid_t child_pid)
{
if (child_pid && kill(child_pid, SIGTERM) != 0) {
LOG_Log(LOG_ERR, "Cannot kill child process %d: %s", child_pid,
strerror(errno));
exit(EXIT_FAILURE);
}
/* Remove PID file if necessary */
if (pfh != NULL)
VPF_Remove(pfh);
LOG_Log0(LOG_INFO, "Management process exiting");
LOG_Close();
exit(status);
}
static void
parent_main(pid_t child_pid, struct VSM_data *vd, int endless)
{
int restarts = 0, status;
pid_t wpid;
LOG_Log0(LOG_INFO, "Management process starting");
term = 0;
/* install signal handlers */
#define PARENT(SIG,disp) SIGDISP(SIG,disp)
#define CHILD(SIG,disp) ((void) 0)
#include "signals.h"
#undef PARENT
#undef CHILD
while (!term) {
wpid = waitpid(child_pid, &status, 0);
if (wpid == -1) {
if (errno == EINTR) {
if (term)
parent_shutdown(EXIT_SUCCESS, child_pid);
else {
LOG_Log0(LOG_WARNING,
"Interrupted while waiting for worker process, "
"continuing");
continue;
}
}
LOG_Log(LOG_ERR, "Cannot wait for worker process %d: %s", child_pid,
strerror(errno));
parent_shutdown(EXIT_FAILURE, child_pid);
}
assert(wpid == child_pid);
AZ(WIFSTOPPED(status));
AZ(WIFCONTINUED(status));
if (WIFEXITED(status))
LOG_Log(LOG_INFO, "Worker process %d exited normally", child_pid);
else
LOG_Log(LOG_WARNING, "Worker process %d exited with status %d",
child_pid, WEXITSTATUS(status));
if (WIFSIGNALED(status))
LOG_Log(LOG_WARNING, "Worker process %d exited due to signal %d",
child_pid, WTERMSIG(status));
if (config.restarts && restarts > config.restarts) {
LOG_Log(LOG_ERR, "Too many restarts: %d", restarts);
parent_shutdown(EXIT_FAILURE, 0);
}
LOG_Log0(LOG_INFO, "Restarting child process");
child_pid = fork();
switch(child_pid) {
case -1:
LOG_Log(LOG_ALERT, "Cannot fork: %s", strerror(errno));
parent_shutdown(EXIT_FAILURE, child_pid);
break;
case 0:
child_main(vd, endless);
break;
default:
restarts++;
}
}
}
/* Matches typedef VSM_diag_f in include/varnishapi.h
Log error messages from VSL_Open and VSL_Arg */
static void
vsl_diag(void *priv, const char *fmt, ...)
{
(void) priv;
va_list ap;
va_start(ap, fmt);
logconf.log(LOG_ERR, fmt, ap);
va_end(ap);
}
static void
child_main(struct VSM_data *vd, int endless)
{
/* XXX: privilege separation */
int errnum;
const char *errmsg;
pthread_t monitor;
LOG_Log0(LOG_INFO, "Worker process starting");
/* install signal handlers */
#define CHILD(SIG,disp) SIGDISP(SIG,disp)
#define PARENT(SIG,disp) ((void) 0)
#include "signals.h"
#undef PARENT
#undef CHILD
if (DATA_Init() != 0) {
LOG_Log(LOG_ERR, "Cannot init data table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
VSM_Diag(vd, vsl_diag, NULL);
if (VSL_Open(vd, 1))
exit(EXIT_FAILURE);
/* Only read the VSL tags relevant to tracking */
assert(VSL_Arg(vd, 'i', TRACK_TAGS) > 0);
/* Start the monitor thread */
if (config.monitor_interval > 0.0) {
if (pthread_create(&monitor, NULL, MON_StatusThread,
(void *) &config.monitor_interval) != 0) {
LOG_Log(LOG_ERR, "Cannot start monitoring thread: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
}
else
LOG_Log0(LOG_INFO, "Monitoring thread not running");
errmsg = MQ_GlobalInit();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize message broker access: %s", errmsg);
exit(EXIT_FAILURE);
}
errnum = WRK_Init();
if (errnum != NULL) {
LOG_Log(LOG_ERR, "Cannot prepare worker threads: %s",
strerror(errnum));
exit(EXIT_FAILURE);
}
if ((errnum = SPMCQ_Init()) != 0) {
LOG_Log(LOG_ERR, "Cannot initialize internal worker queue: %s",
strerror(errnum));
exit(EXIT_FAILURE);
}
MON_StatsInit();
/* Start worker threads */
WRK_Start();
/* Main loop */
term = 0;
/* XXX: Varnish restart? */
while (VSL_Dispatch(vd, OSL_Track, NULL))
if (term || !endless)
break;
WRK_Halt();
WRK_Shutdown();
AZ(MQ_GlobalShutdown());
LOG_Log0(LOG_INFO, "Worker process exiting");
LOG_Close();
exit(EXIT_SUCCESS);
}
static void
usage(int status)
{
......@@ -235,12 +452,11 @@ usage(int status)
int
main(int argc, char * const *argv)
{
int c, d_flag = 0, err;
int c, d_flag = 0, D_flag = 0, endless = 1;
const char *P_arg = NULL, *l_arg = NULL, *n_arg = NULL, *f_arg = NULL,
*y_arg = NULL, *c_arg = NULL, *errmsg;
struct vpf_fh *pfh = NULL;
*y_arg = NULL, *c_arg = NULL;
struct VSM_data *vd;
pthread_t monitor;
pid_t child_pid;
vd = VSM_New();
VSL_Setup(vd);
......@@ -255,8 +471,7 @@ main(int argc, char * const *argv)
exit(EXIT_FAILURE);
}
/* XXX: When we can demonize, add an option to run as non-demon */
while ((c = getopt(argc, argv, "P:Vn:hl:df:y:c:")) != -1) {
while ((c = getopt(argc, argv, "P:Vn:hl:df:y:c:D")) != -1) {
switch (c) {
case 'P':
P_arg = optarg;
......@@ -282,6 +497,9 @@ main(int argc, char * const *argv)
case 'c':
c_arg = optarg;
break;
case 'D':
D_flag = 1;
break;
case 'h':
usage(EXIT_SUCCESS);
default:
......@@ -311,8 +529,10 @@ main(int argc, char * const *argv)
strcpy(config.log_file, l_arg);
if (y_arg)
CONF_Add("syslog.facility", y_arg);
if (f_arg)
if (f_arg) {
strcpy(config.varnish_bindump, f_arg);
endless = 0;
}
if (f_arg && VSL_Arg(vd, 'r', f_arg) <= 0)
exit(EXIT_FAILURE);
......@@ -326,99 +546,46 @@ main(int argc, char * const *argv)
if (d_flag)
LOG_SetLevel(LOG_DEBUG);
LOG_Log0(LOG_INFO, "starting");
LOG_Log0(LOG_INFO, "initializing");
CONF_Dump();
/* XXX: Parent/child setup
Write the pid in the parent, open VSL in the child
*/
if (!EMPTY(config.pid_file)
&& (pfh = VPF_Open(config.pid_file, 0644, NULL)) == NULL) {
LOG_Log(LOG_ERR, "Cannot write pid file %s: %s\n",
config.pid_file, strerror(errno));
exit(EXIT_FAILURE);
}
if (pfh != NULL)
VPF_Write(pfh);
/*
if (!D_flag && varnish_daemon(0, 0) == -1) {
perror("daemon()");
if (pfh != NULL)
VPF_Remove(pfh);
exit(1);
}
*/
/* XXX: child inits data table, opens and reads VSL */
if (DATA_Init() != 0) {
LOG_Log(LOG_ALERT, "Cannot init data table: %s", strerror(errno));
exit(EXIT_FAILURE);
}
/* XXX: Install this signal handler in the child */
if (signal(SIGUSR1, sigusr1) == SIG_ERR) {
LOG_Log(LOG_ERR, "Cannot install signal handler for USR1: %s\n",
strerror(errno));
exit(EXIT_FAILURE);
}
if (VSL_Open(vd, 1))
exit(EXIT_FAILURE);
/* Only read the VSL tags relevant to tracking */
assert(VSL_Arg(vd, 'i', TRACK_TAGS) > 0);
if (pfh != NULL)
VPF_Write(pfh);
/* Start the monitor thread */
if (config.monitor_interval > 0.0) {
if (pthread_create(&monitor, NULL, MON_StatusThread,
(void *) &config.monitor_interval) != 0) {
LOG_Log(LOG_ERR, "Cannot start monitoring thread: %s\n",
if (!D_flag) {
child_pid = fork();
switch(child_pid) {
case -1:
LOG_Log(LOG_ALERT,
"Cannot fork (%s), running as single process",
strerror(errno));
exit(EXIT_FAILURE);
}
}
else
LOG_Log0(LOG_INFO, "Monitoring thread not running");
errmsg = MQ_GlobalInit();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot initialize queueing: %s", errmsg);
exit(EXIT_FAILURE);
child_main(vd, endless);
break;
case 0:
child_main(vd, endless);
break;
default:
parent_main(child_pid, vd, endless);
break;
}
err = WRK_Init();
if (errmsg != NULL) {
LOG_Log(LOG_ERR, "Cannot prepare worker threads: %s",
strerror(err));
exit(EXIT_FAILURE);
}
if ((err = SPMCQ_Init()) != 0) {
LOG_Log(LOG_ERR, "Cannot initialize internal worker queue: %s",
strerror(err));
exit(EXIT_FAILURE);
else {
LOG_Log0(LOG_INFO, "Running as non-demon single process");
child_main(vd, endless);
}
MON_StatsInit();
/* Start worker threads */
WRK_Start();
/* Main loop */
while (VSL_Dispatch(vd, OSL_Track, NULL) >= 0)
;
WRK_Halt();
WRK_Shutdown();
AZ(MQ_GlobalShutdown());
/* XXX: Parent removes PID */
if (pfh != NULL)
VPF_Remove(pfh);
LOG_Log0(LOG_INFO, "exiting");
LOG_Close();
exit(EXIT_SUCCESS);
}
......@@ -147,6 +147,7 @@ struct config {
char mq_uri[BUFSIZ];
char mq_qname[BUFSIZ];
unsigned nworkers;
unsigned restarts;
} config;
void CONF_Init(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment