Commit 71ffe607 authored by Geoff Simmons's avatar Geoff Simmons

trackrdrd: added SPMC queue, tested for running with worker threads

parent a9ffe469
......@@ -20,6 +20,13 @@ fi
AC_PROG_CPP
AC_PROG_CXXCPP
AX_PTHREAD(,[AC_MSG_ERROR([Could not configure pthreads support])])
LIBS="$PTHREAD_LIBS $LIBS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
CXXFLAGS="$CXXFLAGS $PTHREAD_CFLAGS"
CC="$PTHREAD_CC"
AC_PROG_INSTALL
AC_PROG_LIBTOOL
AC_PROG_MAKE_SET
......
......@@ -14,7 +14,8 @@ trackrdrd_SOURCES = \
monitor.c \
mq.c \
activemq/amq.h \
activemq/amq.cpp
activemq/amq.cpp \
spmcq.c
trackrdrd_LDADD = \
$(VARNISHSRC)/lib/libvarnishcompat/libvarnishcompat.la \
......
......@@ -40,7 +40,6 @@
#include "trackrdrd.h"
#define MIN_TABLE_SCALE 10
#define MIN_DATA_SCALE 10
#define INDEX(u) ((u) & (tbl.len - 1))
......
/*-
* Copyright (c) 2012 UPLEX Nils Goroll Systemoptimierung
* Copyright (c) 2012 Otto Gmbh & Co KG
* All rights reserved
* Use only with permission
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*/
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
#include <errno.h>
#include <string.h>
#include "trackrdrd.h"
#include "vas.h"
static pthread_mutex_t spmcq_deq_lock;
static inline unsigned
spmcq_len(void)
{
if (spmcq.tail < spmcq.head)
return UINT_MAX - spmcq.head - 1 - spmcq.tail;
else
return spmcq.tail - spmcq.head;
}
static void
spmcq_cleanup(void)
{
free(spmcq.data);
pthread_mutex_destroy(&spmcq_deq_lock);
}
int
SPMCQ_Init(void)
{
void *buf;
size_t n = 1 << (MIN_TABLE_SCALE + config.maxopen_scale);
buf = calloc(n, sizeof(void *));
if (buf == NULL)
return(errno);
if (pthread_mutex_init(&spmcq_deq_lock, NULL) != 0)
return(errno);
spmcq_t q =
{ .magic = SPMCQ_MAGIC, .mask = n - 1, .data = buf, .head = 0,
.tail = 0 };
memcpy(&spmcq, &q, sizeof(spmcq_t));
atexit(spmcq_cleanup);
return(0);
}
bool
SPMCQ_Enq(void *ptr)
{
if (spmcq_len() > spmcq.mask)
return false;
spmcq.data[spmcq.tail++ & spmcq.mask] = ptr;
return true;
}
void
*SPMCQ_Deq(void)
{
void *ptr;
AZ(pthread_mutex_lock(&spmcq_deq_lock));
if (spmcq_len() == 0)
ptr = NULL;
else
ptr = spmcq.data[spmcq.head++ & spmcq.mask];
AZ(pthread_mutex_unlock(&spmcq_deq_lock));
return ptr;
}
INCLUDES = -I$(VARNISHSRC)/include -I$(VARNISHSRC) @AMQ_CFLAGS@
TESTS = test_parse test_data test_mq regress.sh
TESTS = test_parse test_data test_mq test_spmcq test_spmcq_loop.sh regress.sh
check_PROGRAMS = test_parse test_data test_mq
check_PROGRAMS = test_parse test_data test_mq test_spmcq
test_parse_SOURCES = \
minunit.h \
......@@ -31,3 +31,13 @@ test_mq_LDADD = \
../mq.$(OBJEXT) \
../amq.$(OBJEXT) \
@AMQ_LIBS@
test_spmcq_SOURCES = \
minunit.h \
test_spmcq.c \
../trackrdrd.h
test_spmcq_LDADD = \
$(VARNISHSRC)/lib/libvarnish/libvarnish.la \
../spmcq.$(OBJEXT)
......@@ -44,6 +44,20 @@
#define mu_assert(msg, test) do { if (!(test)) return msg; } while (0)
#define mu_run_test(test) do { const char *msg = test(); tests_run++; \
if (msg) return msg; } while (0)
/* phk-ish mu_assert */
#define mu_assert_errno(c) \
do { \
if (!(c)) { \
sprintf(errmsg, "%s failed in %s at %s:%d: errno %d (%s)", \
#c, __func__, __FILE__, __LINE__, errno, strerror(errno)); \
mu_assert(errmsg, 0); \
} \
} while(0)
/* short for MU Assert Zero / Non-Zero */
#define MAZ(c) do { mu_assert_errno((c) == 0); } while(0)
#define MAN(c) do { mu_assert_errno((c) != 0); } while(0)
extern int tests_run;
#define TEST_RUNNER \
......
This diff is collapsed.
#! /bin/bash
N=1000
echo
echo "TEST: $0"
echo "... running test_spmcq $N times"
I=1
while [[ $I -le $N ]]
do
# echo -en "Test $N\r"
./test_spmcq > /dev/null
if [ $? -ne 0 ]; then
echo "Test $I FAILED"
exit 1
fi
((I++))
done
exit 0
\ No newline at end of file
......@@ -30,6 +30,38 @@
*/
#include <stdio.h>
#include <stdbool.h>
#include <pthread.h>
#define MIN_TABLE_SCALE 10
/* spmcq.c */
/* Single producer multiple consumer bounded FIFO queue */
typedef struct {
unsigned magic;
#define SPMCQ_MAGIC 0xe9a5d0a8
const unsigned mask;
void **data;
volatile unsigned head;
volatile unsigned tail;
} spmcq_t;
spmcq_t spmcq;
int SPMCQ_Init(void);
bool SPMCQ_Enq(void *ptr);
void *SPMCQ_Deq(void);
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
pthread_cond_t spmcq_nonfull_cond;
pthread_mutex_t spmcq_nonfull_lock;
/* Consumers wait for this condition when the spmc queue is empty.
Producer signals this condition after enqueue. */
pthread_cond_t spmcq_nonempty_cond;
pthread_mutex_t spmcq_nonempty_lock;
/* mq.c */
const char *MQ_GlobalInit(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment