Commit d4d73dfe authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: unified indentation style, some code cleanup

parent e2163244
......@@ -95,7 +95,7 @@ struct hashentry_s {
float insert_time;
VTAILQ_ENTRY(hashentry_s) insert_list;
dataentry *de;
dataentry *de;
};
typedef struct hashentry_s hashentry;
......@@ -144,7 +144,7 @@ typedef struct hashtable_s hashtable;
static hashtable htbl;
#ifdef WITHOUT_ASSERTS
#define entry_assert(e, cond) do { (void)(e);(void)(cond);} while(0)
#define entry_assert(e, cond) do { (void)(e);(void)(cond);} while(0)
#else /* WITH_ASSERTS */
#define entry_assert(e, cond) \
do { \
......@@ -317,50 +317,50 @@ jenkmulvey2(uint32_t n)
static uint32_t
wang(uint32_t n)
{
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
n = ~n + (n << 15); // n = (n << 15) - n - 1;
n ^= rotr(n,12);
n += (n << 2);
n ^= rotr(n,4);
n = (n + (n << 3)) + (n << 11);
n ^= rotr(n,16);
return n;
}
void
HASH_Stats(void)
{
LOG_Log(LOG_INFO,
"Hash table: "
"Hash table: "
"len=%u "
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
"seen=%u "
"drop_reqstart=%u "
"drop_vcl_log=%u "
"drop_reqend=%u "
"expired=%u "
"evacuated=%u "
"open=%u "
"load=%.2f "
"collisions=%u "
"insert_probes=%u "
"find_probes=%u "
"fail=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
htbl.len,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
htbl.seen,
htbl.drop_reqstart,
htbl.drop_vcl_log,
htbl.drop_reqend,
htbl.expired,
htbl.evacuated,
htbl.open,
100.0 * htbl.open / htbl.len,
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.collisions,
htbl.insert_probes,
htbl.find_probes,
htbl.fail,
htbl.occ_hi,
htbl.occ_hi_this);
htbl.occ_hi_this = 0;
}
......@@ -400,7 +400,7 @@ hash_init(void)
/* entries init */
for (int i = 0; i < entries; i++) {
htbl.entry[i].magic = HASH_MAGIC;
htbl.entry[i].magic = HASH_MAGIC;
htbl.entry[i].state = HASH_EMPTY;
}
atexit(hash_cleanup);
......@@ -500,7 +500,7 @@ static hashentry
he = &htbl.entry[INDEX(h)];
if (he->state == HASH_EMPTY)
goto ok;
goto ok;
htbl.collisions++;
oldest = he;
......@@ -618,7 +618,7 @@ static inline dataentry
sprintf(de->data, "XID=%d", xid);
de->end = strlen(de->data);
if (de->end > dtbl.w_stats.data_hi)
dtbl.w_stats.data_hi = de->end;
dtbl.w_stats.data_hi = de->end;
MON_StatsUpdate(STATS_OCCUPANCY);
return (de);
......@@ -714,21 +714,21 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
AZ(err);
LOG_Log(LOG_DEBUG, "%s: XID=%u", VSL_tags[tag], xid);
if (xid > last_start_xid)
if (xid > last_start_xid)
last_start_xid = xid;
tim = TIM_mono();
tim = TIM_mono();
if (! insert(xid, fd, tim)) {
htbl.drop_reqstart++;
break;
}
htbl.drop_reqstart++;
break;
}
/* configurable ? */
if ((tim - tim_exp_check) > 10) {
hash_exp(tim - htbl.ttl);
tim_exp_check = tim;
}
break;
/* configurable ? */
if ((tim - tim_exp_check) > 10) {
hash_exp(tim - htbl.ttl);
tim_exp_check = tim;
}
break;
case SLT_VCL_Log:
/* Skip VCL_Log entries without the "track " prefix. */
......@@ -741,15 +741,15 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
LOG_Log(LOG_DEBUG, "%s: XID=%u, data=[%.*s]", VSL_tags[tag],
xid, datalen, data);
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_vcl_log++;
break;
}
check_entry(he, xid, fd);
de = he->de;
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_vcl_log++;
break;
}
check_entry(he, xid, fd);
de = he->de;
append(de, tag, xid, data, datalen);
de->hasdata = true;
break;
......@@ -761,21 +761,21 @@ OSL_Track(void *priv, enum VSL_tag_e tag, unsigned fd, unsigned len,
LOG_Log(LOG_DEBUG, "%s: XID=%u req_endt=%u.%09lu", VSL_tags[tag], xid,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
if (xid > last_end_xid)
if (xid > last_end_xid)
last_end_xid = xid;
xid_spread_sum += (last_end_xid - last_start_xid);
xid_spread_count++;
xid_spread_sum += (last_end_xid - last_start_xid);
xid_spread_count++;
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_reqend++;
break;
}
check_entry(he, xid, fd);
de = he->de;
he = hash_find(xid);
if (! he) {
LOG_Log(LOG_WARNING, "%s: XID %d not found",
VSL_tags[tag], xid);
htbl.drop_reqend++;
break;
}
check_entry(he, xid, fd);
de = he->de;
sprintf(reqend_str, "%s=%u.%09lu", REQEND_T_VAR,
(unsigned) reqend_t.tv_sec, reqend_t.tv_nsec);
......@@ -965,7 +965,6 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
/* Main loop */
term = 0;
/* XXX: Varnish restart? */
/* XXX: TERM not noticed until request received */
while (VSL_Dispatch(vd, OSL_Track, NULL) > 0)
if (term || !endless)
......@@ -984,7 +983,7 @@ CHILD_Main(struct VSM_data *vd, int endless, int readconfig)
WRK_Shutdown();
AZ(MQ_GlobalShutdown());
if (config.monitor_interval > 0.0)
MON_StatusShutdown(monitor);
MON_StatusShutdown(monitor);
LOG_Log0(LOG_INFO, "Worker process exiting");
LOG_Close();
exit(EXIT_SUCCESS);
......
......@@ -83,12 +83,12 @@ DATA_Init(void)
dtbl.nfree = 0;
for (int i = 0; i < entries; i++) {
dtbl.entry[i].magic = DATA_MAGIC;
dtbl.entry[i].magic = DATA_MAGIC;
dtbl.entry[i].state = DATA_EMPTY;
dtbl.entry[i].hasdata = false;
dtbl.entry[i].data = &dtbl.buf[i * bufsize];
VSTAILQ_INSERT_TAIL(&dtbl.freehead, &dtbl.entry[i], freelist);
dtbl.nfree++;
VSTAILQ_INSERT_TAIL(&dtbl.freehead, &dtbl.entry[i], freelist);
dtbl.nfree++;
}
assert(dtbl.nfree == entries);
assert(VSTAILQ_FIRST(&dtbl.freehead));
......@@ -105,10 +105,10 @@ DATA_Init(void)
void
DATA_Take_Freelist(struct freehead_s *dst)
{
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(dst, &dtbl.freehead);
dtbl.nfree = 0;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(dst, &dtbl.freehead);
dtbl.nfree = 0;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
/*
......@@ -119,25 +119,25 @@ DATA_Take_Freelist(struct freehead_s *dst)
void
DATA_Return_Freelist(struct freehead_s *returned, unsigned nreturned)
{
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(&dtbl.freehead, returned);
dtbl.nfree += nreturned;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
AZ(pthread_mutex_lock(&dtbl.freelist_lock));
VSTAILQ_CONCAT(&dtbl.freehead, returned);
dtbl.nfree += nreturned;
AZ(pthread_mutex_unlock(&dtbl.freelist_lock));
}
void
DATA_Dump1(dataentry *entry, int i)
{
if (entry->state == DATA_EMPTY)
return;
LOG_Log(LOG_INFO, "Data entry %d: XID=%d tid=%d state=%s data=[%.*s]",
i, entry->xid, entry->tid, statename[entry->state], entry->end,
entry->data);
if (entry->state == DATA_EMPTY)
return;
LOG_Log(LOG_INFO, "Data entry %d: XID=%d tid=%d state=%s data=[%.*s]",
i, entry->xid, entry->tid, statename[entry->state], entry->end,
entry->data);
}
void
DATA_Dump(void)
{
for (int i = 0; i < dtbl.len; i++)
DATA_Dump1(&dtbl.entry[i], i);
for (int i = 0; i < dtbl.len; i++)
DATA_Dump1(&dtbl.entry[i], i);
}
......@@ -73,17 +73,17 @@ stacktrace(void)
depth = backtrace (buf, MAX_STACK_DEPTH);
if (depth == 0) {
LOG_Log0(LOG_ERR, "Stacktrace empty");
return;
LOG_Log0(LOG_ERR, "Stacktrace empty");
return;
}
strings = backtrace_symbols(buf, depth);
if (strings == NULL) {
LOG_Log0(LOG_ERR, "Cannot retrieve symbols for stacktrace");
return;
LOG_Log0(LOG_ERR, "Cannot retrieve symbols for stacktrace");
return;
}
/* XXX: get symbol names from nm? cf. cache_panic.c/pan_backtrace */
for (i = 0; i < depth; i++)
LOG_Log(LOG_ERR, "%s", strings[i]);
LOG_Log(LOG_ERR, "%s", strings[i]);
free(strings);
}
......@@ -92,7 +92,7 @@ void
HNDL_Abort(int sig)
{
LOG_Log(LOG_ALERT, "Received signal %d (%s), stacktrace follows", sig,
strsignal(sig));
strsignal(sig));
stacktrace();
AZ(sigaction(SIGABRT, &default_action, NULL));
LOG_Log0(LOG_ALERT, "Aborting");
......
......@@ -51,38 +51,38 @@ log_output(void)
LOG_Log(LOG_INFO,
"Data table writer: "
"len=%u "
"nodata=%u "
"submitted=%u "
"wait_qfull=%u "
"wait_room=%u "
"data_hi=%u "
"data_overflows=%u ",
dtbl.len,
dtbl.w_stats.nodata,
dtbl.w_stats.submitted,
dtbl.w_stats.wait_qfull,
dtbl.w_stats.wait_room,
dtbl.w_stats.data_hi,
dtbl.w_stats.data_overflows);
"len=%u "
"nodata=%u "
"submitted=%u "
"wait_qfull=%u "
"wait_room=%u "
"data_hi=%u "
"data_overflows=%u ",
dtbl.len,
dtbl.w_stats.nodata,
dtbl.w_stats.submitted,
dtbl.w_stats.wait_qfull,
dtbl.w_stats.wait_room,
dtbl.w_stats.data_hi,
dtbl.w_stats.data_overflows);
LOG_Log(LOG_INFO,
"Data table reader: "
"done=%u "
"open=%u "
"load=%.2f "
"sent=%u "
"failed=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
dtbl.r_stats.done,
dtbl.r_stats.open,
(100.0 * (1.0 * dtbl.r_stats.done + 1.0 * dtbl.r_stats.open) / dtbl.len),
dtbl.r_stats.sent,
dtbl.r_stats.failed,
dtbl.r_stats.occ_hi,
dtbl.r_stats.occ_hi_this
);
"done=%u "
"open=%u "
"load=%.2f "
"sent=%u "
"failed=%u "
"occ_hi=%u "
"occ_hi_this=%u ",
dtbl.r_stats.done,
dtbl.r_stats.open,
(100.0 * (1.0 * dtbl.r_stats.done + 1.0 * dtbl.r_stats.open) / dtbl.len),
dtbl.r_stats.sent,
dtbl.r_stats.failed,
dtbl.r_stats.occ_hi,
dtbl.r_stats.occ_hi_this
);
/* locking would be overkill */
dtbl.r_stats.occ_hi_this = 0;
......@@ -118,7 +118,7 @@ void
if (nanosleep(&t, NULL) != 0) {
if (errno == EINTR) {
if (run == 0)
break;
break;
LOG_Log0(LOG_INFO, "Monitoring thread interrupted");
continue;
}
......@@ -182,7 +182,7 @@ MON_StatsUpdate(stats_update_t update)
break;
case STATS_NODATA:
dtbl.w_stats.nodata++;
dtbl.w_stats.nodata++;
dtbl.r_stats.done--;
break;
......
......@@ -50,26 +50,6 @@ spmcq_len(void)
return UINT_MAX - spmcq.head + 1 + spmcq.tail;
}
#if 0
/*
* this is only approximately correct and could even become negative when values
* get updated while we read them!
*
*/
int SPMCQ_Len(void) {
unsigned l;
do {
l = spmcq_len();
if (l <= spmcq.mask + 1)
break;
VRMB();
} while (1);
return (l);
}
#endif
static void
spmcq_cleanup(void)
{
......
......@@ -90,7 +90,7 @@ static const char
{
const char *err;
printf("... test worker init (including connect to ActiveMQ)\n");
printf("... test worker init\n");
err = MQ_WorkerInit(&worker);
sprintf(errmsg, "MQ_WorkerInit: %s", err);
......
......@@ -117,32 +117,32 @@ static void
unsigned *xid;
while (run) {
/* run may be stale at this point */
/* run may be stale at this point */
debug_print("Consumer %d: attempt dequeue\n", id);
xid = (unsigned *) SPMCQ_Deq();
if (xid == NULL) {
/* grab the CV lock, which also constitutes an implicit memory
xid = (unsigned *) SPMCQ_Deq();
if (xid == NULL) {
/* grab the CV lock, which also constitutes an implicit memory
barrier */
debug_print("Consumer %d: mutex\n", id);
if (pthread_mutex_lock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX);
/* run is guaranteed to be fresh here */
if (run) {
debug_print("Consumer %d: wait, run = %d\n", id, run);
if (pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_WAIT);
}
debug_print("Consumer %d: unlock\n", id);
if (pthread_mutex_unlock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX);
if (! run) {
debug_print("Consumer %d: quit signaled, run = %d\n", id, run);
break;
}
} else {
/* xid != NULL */
debug_print("Consumer %d: dequeue %d (xid = %u)\n", id, ++deqs,
debug_print("Consumer %d: mutex\n", id);
if (pthread_mutex_lock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX);
/* run is guaranteed to be fresh here */
if (run) {
debug_print("Consumer %d: wait, run = %d\n", id, run);
if (pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_WAIT);
}
debug_print("Consumer %d: unlock\n", id);
if (pthread_mutex_unlock(&spmcq_datawaiter_lock) != 0)
consumer_exit(pcdata, CONSUMER_MUTEX);
if (! run) {
debug_print("Consumer %d: quit signaled, run = %d\n", id, run);
break;
}
} else {
/* xid != NULL */
debug_print("Consumer %d: dequeue %d (xid = %u)\n", id, ++deqs,
*xid);
pcdata->sum += *xid;
}
......
......@@ -184,7 +184,7 @@ parent_main(pid_t child_pid, struct VSM_data *vd, int endless)
wpid, WEXITSTATUS(status));
if (WIFSIGNALED(status))
LOG_Log(LOG_WARNING,
"Worker process %d exited due to signal %d (%s)",
"Worker process %d exited due to signal %d (%s)",
wpid, WTERMSIG(status), strsignal(WTERMSIG(status)));
if (wpid != child_pid)
......@@ -210,169 +210,169 @@ usage(int status)
int
main(int argc, char * const *argv)
{
int c, d_flag = 0, D_flag = 0, endless = 1, err;
const char *P_arg = NULL, *l_arg = NULL, *n_arg = NULL, *f_arg = NULL,
*y_arg = NULL, *c_arg = NULL, *u_arg = NULL;
struct VSM_data *vd;
pid_t child_pid;
vd = VSM_New();
VSL_Setup(vd);
CONF_Init();
if ((err = CONF_ReadDefault()) != 0) {
if (err != -1)
LOG_Log(LOG_ALERT, "Cannot read %s: %s", DEFAULT_CONFIG,
strerror(err));
exit(EXIT_FAILURE);
}
cli_config_filename[0] = '\0';
while ((c = getopt(argc, argv, "u:P:Vn:hl:df:y:c:D")) != -1) {
switch (c) {
case 'P':
P_arg = optarg;
break;
case 'V':
printf(PACKAGE_STRING " revision " REVISION "\n");
exit(EXIT_SUCCESS);
case 'n':
n_arg = optarg;
break;
case 'l':
l_arg = optarg;
break;
case 'd':
d_flag = 1;
break;
case 'f':
f_arg = optarg;
break;
case 'y':
y_arg = optarg;
break;
case 'c':
c_arg = optarg;
break;
case 'D':
D_flag = 1;
break;
case 'u':
u_arg = optarg;
break;
case 'h':
usage(EXIT_SUCCESS);
default:
usage(EXIT_FAILURE);
}
}
if ((argc - optind) > 0)
int c, d_flag = 0, D_flag = 0, endless = 1, err;
const char *P_arg = NULL, *l_arg = NULL, *n_arg = NULL, *f_arg = NULL,
*y_arg = NULL, *c_arg = NULL, *u_arg = NULL;
struct VSM_data *vd;
pid_t child_pid;
vd = VSM_New();
VSL_Setup(vd);
CONF_Init();
if ((err = CONF_ReadDefault()) != 0) {
if (err != -1)
LOG_Log(LOG_ALERT, "Cannot read %s: %s", DEFAULT_CONFIG,
strerror(err));
exit(EXIT_FAILURE);
}
cli_config_filename[0] = '\0';
while ((c = getopt(argc, argv, "u:P:Vn:hl:df:y:c:D")) != -1) {
switch (c) {
case 'P':
P_arg = optarg;
break;
case 'V':
printf(PACKAGE_STRING " revision " REVISION "\n");
exit(EXIT_SUCCESS);
case 'n':
n_arg = optarg;
break;
case 'l':
l_arg = optarg;
break;
case 'd':
d_flag = 1;
break;
case 'f':
f_arg = optarg;
break;
case 'y':
y_arg = optarg;
break;
case 'c':
c_arg = optarg;
break;
case 'D':
D_flag = 1;
break;
case 'u':
u_arg = optarg;
break;
case 'h':
usage(EXIT_SUCCESS);
default:
usage(EXIT_FAILURE);
if (c_arg) {
strcpy(cli_config_filename, c_arg);
printf("Reading config from %s\n", c_arg);
if (CONF_ReadFile(c_arg) != 0)
exit(EXIT_FAILURE);
}
}
if ((argc - optind) > 0)
usage(EXIT_FAILURE);
if (c_arg) {
strcpy(cli_config_filename, c_arg);
printf("Reading config from %s\n", c_arg);
if (CONF_ReadFile(c_arg) != 0)
exit(EXIT_FAILURE);
}
if (f_arg && n_arg)
usage(EXIT_FAILURE);
if (l_arg && y_arg)
usage(EXIT_FAILURE);
if (f_arg && n_arg)
usage(EXIT_FAILURE);
if (l_arg && y_arg)
usage(EXIT_FAILURE);
if (u_arg) {
err = CONF_Add("user", u_arg);
if (err) {
fprintf(stderr, "Unknown user: %s\n", u_arg);
exit(EXIT_FAILURE);
}
if (u_arg) {
err = CONF_Add("user", u_arg);
if (err) {
fprintf(stderr, "Unknown user: %s\n", u_arg);
exit(EXIT_FAILURE);
}
}
if (y_arg) {
err = CONF_Add("syslog.facility", y_arg);
if (err) {
fprintf(stderr, "Unknown syslog facility: %s\n", y_arg);
exit(EXIT_FAILURE);
}
if (y_arg) {
err = CONF_Add("syslog.facility", y_arg);
if (err) {
fprintf(stderr, "Unknown syslog facility: %s\n", y_arg);
exit(EXIT_FAILURE);
}
}
if (P_arg)
strcpy(config.pid_file, P_arg);
if (n_arg)
strcpy(config.varnish_name, n_arg);
if (l_arg)
strcpy(config.log_file, l_arg);
if (f_arg) {
strcpy(config.varnish_bindump, f_arg);
endless = 0;
}
if (P_arg)
strcpy(config.pid_file, P_arg);
if (n_arg)
strcpy(config.varnish_name, n_arg);
if (l_arg)
strcpy(config.log_file, l_arg);
if (f_arg) {
strcpy(config.varnish_bindump, f_arg);
endless = 0;
}
if (f_arg && VSL_Arg(vd, 'r', f_arg) <= 0)
exit(EXIT_FAILURE);
else if (!EMPTY(config.varnish_name)
&& VSL_Arg(vd, 'n', config.varnish_name) <= 0)
exit(EXIT_FAILURE);
if (f_arg && VSL_Arg(vd, 'r', f_arg) <= 0)
exit(EXIT_FAILURE);
else if (!EMPTY(config.varnish_name)
&& VSL_Arg(vd, 'n', config.varnish_name) <= 0)
exit(EXIT_FAILURE);
if (LOG_Open(PACKAGE_NAME) != 0) {
exit(EXIT_FAILURE);
}
if (LOG_Open(PACKAGE_NAME) != 0) {
exit(EXIT_FAILURE);
}
VAS_Fail = ASRT_Fail;
VAS_Fail = ASRT_Fail;
if (d_flag)
LOG_SetLevel(LOG_DEBUG);
LOG_Log0(LOG_INFO,
"initializing (v" PACKAGE_VERSION " revision " REVISION ")");
if (d_flag)
LOG_SetLevel(LOG_DEBUG);
LOG_Log0(LOG_INFO,
"initializing (v" PACKAGE_VERSION " revision " REVISION ")");
CONF_Dump();
CONF_Dump();
if (!EMPTY(config.pid_file)
&& (pfh = VPF_Open(config.pid_file, 0644, NULL)) == NULL) {
LOG_Log(LOG_ERR, "Cannot write pid file %s: %s\n",
config.pid_file, strerror(errno));
exit(EXIT_FAILURE);
}
if (!D_flag && varnish_daemon(0, 0) == -1) {
perror("daemon()");
if (pfh != NULL)
VPF_Remove(pfh);
exit(EXIT_FAILURE);
}
if (pfh != NULL)
VPF_Write(pfh);
terminate_action.sa_handler = HNDL_Terminate;
AZ(sigemptyset(&terminate_action.sa_mask));
terminate_action.sa_flags &= ~SA_RESTART;
stacktrace_action.sa_handler = HNDL_Abort;
ignore_action.sa_handler = SIG_IGN;
default_action.sa_handler = SIG_DFL;
if (!D_flag) {
child_pid = fork();
switch(child_pid) {
case -1:
LOG_Log(LOG_ALERT,
"Cannot fork (%s), running as single process",
strerror(errno));
CHILD_Main(vd, endless, 0);
break;
case 0:
CHILD_Main(vd, endless, 0);
break;
default:
parent_main(child_pid, vd, endless);
break;
}
}
else {
LOG_Log0(LOG_INFO, "Running as non-demon single process");
if (!EMPTY(config.pid_file)
&& (pfh = VPF_Open(config.pid_file, 0644, NULL)) == NULL) {
LOG_Log(LOG_ERR, "Cannot write pid file %s: %s\n",
config.pid_file, strerror(errno));
exit(EXIT_FAILURE);
}
if (!D_flag && varnish_daemon(0, 0) == -1) {
perror("daemon()");
if (pfh != NULL)
VPF_Remove(pfh);
exit(EXIT_FAILURE);
}
if (pfh != NULL)
VPF_Write(pfh);
terminate_action.sa_handler = HNDL_Terminate;
AZ(sigemptyset(&terminate_action.sa_mask));
terminate_action.sa_flags &= ~SA_RESTART;
stacktrace_action.sa_handler = HNDL_Abort;
ignore_action.sa_handler = SIG_IGN;
default_action.sa_handler = SIG_DFL;
if (!D_flag) {
child_pid = fork();
switch(child_pid) {
case -1:
LOG_Log(LOG_ALERT,
"Cannot fork (%s), running as single process",
strerror(errno));
CHILD_Main(vd, endless, 0);
break;
case 0:
CHILD_Main(vd, endless, 0);
break;
default:
parent_main(child_pid, vd, endless);
break;
}
}
else {
LOG_Log0(LOG_INFO, "Running as non-demon single process");
CHILD_Main(vd, endless, 0);
}
}
......@@ -53,10 +53,10 @@ void ASRT_Fail(const char *func, const char *file, int line, const char *cond,
#define SIGDISP(SIG, action) \
do { if (UNDEFINED(SIG)) break; \
if (sigaction((SIG), (&action), NULL) != 0) \
LOG_Log(LOG_ALERT, \
"Cannot install handler for " #SIG ": %s", \
strerror(errno)); \
if (sigaction((SIG), (&action), NULL) != 0) \
LOG_Log(LOG_ALERT, \
"Cannot install handler for " #SIG ": %s", \
strerror(errno)); \
} while(0)
volatile sig_atomic_t term;
......@@ -108,7 +108,7 @@ bool SPMCQ_StopWorker(int running);
#define spmcq_wait(what) \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
......
......@@ -54,12 +54,13 @@ typedef enum {
} wrk_state_e;
static const char* statename[WRK_STATE_E_LIMIT] = {
[WRK_NOTSTARTED] = "not started",
[WRK_INITIALIZING] = "initializing",
[WRK_RUNNING] = "running",
[WRK_WAITING] = "waiting",
[WRK_SHUTTINGDOWN] = "shutting down",
[WRK_EXITED] = "exited"};
[WRK_NOTSTARTED] = "not started",
[WRK_INITIALIZING] = "initializing",
[WRK_RUNNING] = "running",
[WRK_WAITING] = "waiting",
[WRK_SHUTTINGDOWN] = "shutting down",
[WRK_EXITED] = "exited"
};
struct worker_data_s {
unsigned magic;
......@@ -121,9 +122,9 @@ wrk_send(void *amq_worker, dataentry *entry, worker_data_t *wrk)
wrk->wrk_nfree++;
if (dtbl.nfree == 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
}
spmcq_signal(room);
......@@ -168,48 +169,43 @@ static void
LOG_Log(LOG_INFO, "Worker %d: running (%s)", wrk->id, version);
while (run) {
entry = (dataentry *) SPMCQ_Deq();
if (entry != NULL) {
wrk->deqs++;
entry = (dataentry *) SPMCQ_Deq();
if (entry != NULL) {
wrk->deqs++;
wrk_send(amq_worker, entry, wrk);
/* should we go to sleep ? */
if (SPMCQ_StopWorker(running))
goto sleep;
continue;
if (!SPMCQ_StopWorker(running))
continue;
}
sleep:
/* return space before sleeping */
if (wrk->wrk_nfree > 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
}
/* return space before sleeping */
if (wrk->wrk_nfree > 0) {
DATA_Return_Freelist(&wrk->wrk_freelist, wrk->wrk_nfree);
wrk->wrk_nfree = 0;
}
/*
* Queue is empty or we should backoff
*
* wait until data are available, or quit is signaled.
*
* Grab the CV lock, which also constitutes an implicit memory
* Queue is empty or we should backoff
*
* wait until data are available, or quit is signaled.
*
* Grab the CV lock, which also constitutes an implicit memory
* barrier
*/
*/
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
/*
* run is guaranteed to be fresh here
*
* also re-check the stop condition under the lock
*/
* run is guaranteed to be fresh here
*
* also re-check the stop condition under the lock
*/
if (run && ((! entry) || SPMCQ_StopWorker(running))) {
wrk->waits++;
spmcq_datawaiter++;
wrk->state = WRK_WAITING;
AZ(pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_datawaiter_lock));
spmcq_datawaiter--;
wrk->state = WRK_RUNNING;
wrk->waits++;
spmcq_datawaiter++;
wrk->state = WRK_WAITING;
AZ(pthread_cond_wait(&spmcq_datawaiter_cond,
&spmcq_datawaiter_lock));
spmcq_datawaiter--;
wrk->state = WRK_RUNNING;
}
AZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment