Commit 45f2e9aa authored by Geoff Simmons's avatar Geoff Simmons

get rid of obsolete mutexen and mutex/condvar attributes

parent 31a66a33
...@@ -150,6 +150,24 @@ sigflush(int sig) ...@@ -150,6 +150,24 @@ sigflush(int sig)
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
/*
* the first test is not synced, so we might enter the if body too late or
* unnecessarily
*
* * too late: doesn't matter, will come back next time
* * unnecessarily: we'll find out now
*/
static inline void
spmcq_signal(void)
{
if (spmcq_datawaiter) {
AZ(pthread_mutex_lock(&spmcq_datawaiter_lock));
if (spmcq_datawaiter)
AZ(pthread_cond_signal(&spmcq_datawaiter_cond));
AZ(pthread_mutex_unlock(&spmcq_datawaiter_lock));
}
}
/* efficiently retrieve a single data entry */ /* efficiently retrieve a single data entry */
static inline dataentry static inline dataentry
...@@ -158,7 +176,7 @@ static inline dataentry ...@@ -158,7 +176,7 @@ static inline dataentry
dataentry *data; dataentry *data;
while (VSTAILQ_EMPTY(&reader_freelist)) { while (VSTAILQ_EMPTY(&reader_freelist)) {
spmcq_signal(data); spmcq_signal();
rdr_data_free = DATA_Take_Freelist(&reader_freelist); rdr_data_free = DATA_Take_Freelist(&reader_freelist);
if (VSTAILQ_EMPTY(&reader_freelist)) { if (VSTAILQ_EMPTY(&reader_freelist)) {
data_exhausted = 1; data_exhausted = 1;
...@@ -200,19 +218,15 @@ data_submit(dataentry *de) ...@@ -200,19 +218,15 @@ data_submit(dataentry *de)
SPMCQ_Enq(de); SPMCQ_Enq(de);
submitted++; submitted++;
/* should we wake up another worker? */ /* should we wake up another worker?
wrk_running = WRK_Running();
if (SPMCQ_NeedWorker(wrk_running))
spmcq_signal(data);
/*
* base case: wake up a worker if all are sleeping * base case: wake up a worker if all are sleeping
* *
* this is an un-synced access to spmcq_data_waiter, but * this is an un-synced access to spmcq_data_waiter, but
* if we don't wake them up now, we will next time around * if we don't wake them up now, we will next time around
*/ */
if (wrk_running == spmcq_datawaiter) wrk_running = WRK_Running();
spmcq_signal(data); if (wrk_running == spmcq_datawaiter || SPMCQ_NeedWorker(wrk_running))
spmcq_signal();
} }
static inline void static inline void
......
...@@ -165,40 +165,6 @@ dataentry *SPMCQ_Deq(void); ...@@ -165,40 +165,6 @@ dataentry *SPMCQ_Deq(void);
void SPMCQ_Drain(void); void SPMCQ_Drain(void);
unsigned SPMCQ_NeedWorker(int running); unsigned SPMCQ_NeedWorker(int running);
#define spmcq_wait(what) \
do { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter++; \
AZ(pthread_cond_wait(&spmcq_##what##waiter_cond, \
&spmcq_##what##waiter_lock)); \
spmcq_##what##waiter--; \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} while (0)
/*
* the first test is not synced, so we might enter the if body too late or
* unnecessarily
*
* * too late: doesn't matter, will come back next time
* * unnecessarily: we'll find out now
*/
#define spmcq_signal(what) \
do { \
if (spmcq_##what##waiter) { \
AZ(pthread_mutex_lock(&spmcq_##what##waiter_lock)); \
if (spmcq_##what##waiter) \
AZ(pthread_cond_signal(&spmcq_##what##waiter_cond)); \
AZ(pthread_mutex_unlock(&spmcq_##what##waiter_lock)); \
} \
} while (0)
/* Producer waits for this condition when the spmc queue is full.
Consumers signal this condition after dequeue. */
pthread_cond_t spmcq_roomwaiter_cond;
pthread_mutex_t spmcq_roomwaiter_lock;
int spmcq_roomwaiter;
/* Consumers wait for this condition when the spmc queue is empty. /* Consumers wait for this condition when the spmc queue is empty.
Producer signals this condition after enqueue. */ Producer signals this condition after enqueue. */
pthread_cond_t spmcq_datawaiter_cond; pthread_cond_t spmcq_datawaiter_cond;
...@@ -322,7 +288,3 @@ typedef enum { VCL_LOG_DATA, VCL_LOG_KEY } vcl_log_t; ...@@ -322,7 +288,3 @@ typedef enum { VCL_LOG_DATA, VCL_LOG_KEY } vcl_log_t;
int Parse_VCL_Log(const char *ptr, int len, char **data, int *datalen, int Parse_VCL_Log(const char *ptr, int len, char **data, int *datalen,
vcl_log_t *type); vcl_log_t *type);
int Parse_Timestamp(const char *ptr, int len, struct timeval *t); int Parse_Timestamp(const char *ptr, int len, struct timeval *t);
/* generic init attributes */
pthread_mutexattr_t attr_lock;
pthread_condattr_t attr_cond;
...@@ -193,8 +193,6 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk) ...@@ -193,8 +193,6 @@ wrk_send(void **mq_worker, dataentry *entry, worker_data_t *wrk)
wrk->wrk_nfree = 0; wrk->wrk_nfree = 0;
assert(VSTAILQ_EMPTY(&wrk->wrk_freelist)); assert(VSTAILQ_EMPTY(&wrk->wrk_freelist));
} }
spmcq_signal(room);
} }
static void static void
...@@ -307,8 +305,6 @@ static void wrk_cleanup(void) ...@@ -307,8 +305,6 @@ static void wrk_cleanup(void)
free(thread_data); free(thread_data);
AZ(pthread_mutex_destroy(&spmcq_datawaiter_lock)); AZ(pthread_mutex_destroy(&spmcq_datawaiter_lock));
AZ(pthread_cond_destroy(&spmcq_datawaiter_cond)); AZ(pthread_cond_destroy(&spmcq_datawaiter_cond));
AZ(pthread_mutex_destroy(&spmcq_roomwaiter_lock));
AZ(pthread_cond_destroy(&spmcq_roomwaiter_cond));
cleaned = 1; cleaned = 1;
} }
...@@ -345,10 +341,6 @@ WRK_Init(void) ...@@ -345,10 +341,6 @@ WRK_Init(void)
AZ(pthread_mutex_init(&spmcq_datawaiter_lock, NULL)); AZ(pthread_mutex_init(&spmcq_datawaiter_lock, NULL));
AZ(pthread_cond_init(&spmcq_datawaiter_cond, NULL)); AZ(pthread_cond_init(&spmcq_datawaiter_cond, NULL));
spmcq_roomwaiter = 0;
AZ(pthread_mutex_init(&spmcq_roomwaiter_lock, NULL));
AZ(pthread_cond_init(&spmcq_roomwaiter_cond, NULL));
atexit(wrk_cleanup); atexit(wrk_cleanup);
return 0; return 0;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment