Add a wait table to serialize initial object read per object

parent 0a0229fd
......@@ -120,6 +120,154 @@ enum stve_strangelove {
STVELOVE_SIGNAL_DRAIN
};
/* Wait table --------------------------------------------------------
*
* We face a challenge in stvfe_dskoc_fco: while concurrent inserts are
* resolved via the fdb index, we want the memory allocation itself to be
* limited to a single thread, because otherwise many threads waiting for an
* _obj_get to complete would allocate fcos just to throw them away a moment
* later.
*
* We would like a "waitinglist" mechanism where only one thread goes to
* allocate the object and the others hang on to the waitinglist, but before we
* have allocated memory, we do not have the memory for this mechanism. Catch22.
* Our way out is a per-stvfe wait table with mtx/cond pairs, indexed via a
* hash. There will be spurious wakeups, but those are deemed better than
* useless allocations.
*/
/* One wait slot: a mutex/condvar pair plus the 64-bit id ("priv", taken
 * from oc->stobj->priv2 at the call site) of the object currently being
 * read through this slot. priv == 0 means the slot is free. */
struct stvfe_wait_entry {
uint64_t priv;
pthread_mutex_t mtx;
pthread_cond_t cond;
};
/* Per-stvfe wait table: a flexible array of (1 << bits) wait entries,
 * indexed by a hash of the object id (see stvfe_wait_fib()). */
struct stvfe_wait {
unsigned magic;
#define STVFE_WAIT_MAGIC 0x664ec959
uint8_t bits;
// (1 << bits) entries
struct stvfe_wait_entry e[];
};
/* Destroy a wait table: tear down every entry's mutex/cond pair and free
 * the table. Takes ownership of *swp and NULLs it (TAKE_OBJ_NOTNULL). */
static void
stvfe_wait_fini(struct stvfe_wait **swp)
{
	struct stvfe_wait *sw;
	size_t idx, nent;

	TAKE_OBJ_NOTNULL(sw, swp, STVFE_WAIT_MAGIC);
	AN(sw->bits);
	nent = (size_t)1 << sw->bits;
	for (idx = 0; idx < nent; idx++) {
		AZ(pthread_cond_destroy(&sw->e[idx].cond));
		AZ(pthread_mutex_destroy(&sw->e[idx].mtx));
	}
	FREE_OBJ(sw);
}
/* Allocate and initialize a wait table with (1 << pow2) entries.
 *
 * All entries start out free (priv == 0), each with its own mutex/cond
 * pair. Aborts (via AN/AZ asserts) on allocation or init failure.
 */
static struct stvfe_wait *
stvfe_wait_new(uint8_t pow2)
{
	struct stvfe_wait *sw;
	size_t i, l;

	AN(pow2);
	/* guard the size computation against size_t overflow; relevant on
	 * 32-bit platforms, where the tunable maximum of 32 would wrap the
	 * former (sizeof(entry) << pow2) expression */
	assert(pow2 < sizeof(size_t) * 8 - 8);
	l = (size_t)1 << pow2;
	/* calloc() zeroes the allocation, replacing malloc() + memset() */
	sw = calloc(1, sizeof(*sw) + sizeof(struct stvfe_wait_entry) * l);
	AN(sw);
	sw->magic = STVFE_WAIT_MAGIC;
	sw->bits = pow2;
	for (i = 0; i < l; i++) {
		AZ(pthread_mutex_init(&sw->e[i].mtx, NULL));
		AZ(pthread_cond_init(&sw->e[i].cond, NULL));
	}
	return (sw);
}
/* Resize the wait table to (1 << pow2) entries.
 *
 * A new table is installed in *swp first; the old one is then drained and
 * destroyed. Threads which loaded the old pointer before the swap may
 * still be using it, hence:
 * - the short sleep gives such threads a chance to get past their pointer
 *   load ("cool" the old pointer), and
 * - we then wait for every old entry's priv to return to 0, i.e. no
 *   leader thread still active on it (see stvfe_wait_signal()).
 *
 * NOTE(review): this handover is not watertight - a thread could in
 * principle still hold the old pointer across the sleep; a tune event is
 * noted elsewhere in this file as "might still race (by design)".
 */
static void
stvfe_wait_tune(struct stvfe_wait **swp, uint8_t pow2)
{
struct stvfe_wait_entry *e;
struct stvfe_wait *sw;
size_t i, l;
AN(swp);
sw = *swp;
CHECK_OBJ_NOTNULL(sw, STVFE_WAIT_MAGIC);
// nothing to do if the size is unchanged
if (pow2 == sw->bits)
return;
// install the new table before draining the old one
*swp = stvfe_wait_new(pow2);
// cool old pointer
(void) usleep(10 * 1000);
l = 1;
l <<= sw->bits;
// drain: wait until every entry of the old table is free again
for (i = 0; i < l; i++) {
e = &sw->e[i];
AZ(pthread_mutex_lock(&e->mtx));
while (e->priv != 0)
AZ(pthread_cond_wait(&e->cond, &e->mtx));
AZ(pthread_mutex_unlock(&e->mtx));
}
stvfe_wait_fini(&sw);
AZ(sw);
}
// https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
/* Map the 64-bit object id to a table index via Fibonacci hashing:
 * multiply by 2^64 / phi and keep the top sw->bits bits. */
static inline size_t
stvfe_wait_fib(const struct stvfe_wait *sw, uint64_t priv)
{
	/* 2^64 / golden ratio. Must be uint64_t, not size_t: on 32-bit
	 * platforms a size_t constant would silently truncate and the
	 * sizeof-based shift would disagree with the 64-bit multiply. */
	const uint64_t gr = 11400714819323198485ULL; //lint !e620
	uint64_t r;

	r = priv * gr;
	r >>= 64 - sw->bits;
	return ((size_t)r);
}
/* Acquire the wait slot for object id priv (must be != 0).
 *
 * Returns the entry (with e->priv set to priv) if the calling thread
 * becomes the "leader" which should go and allocate/read the object; the
 * leader must later release the slot via stvfe_wait_signal().
 *
 * Returns NULL if another thread already holds the slot for this id: we
 * have then blocked until woken and the caller is expected to re-check
 * and retry (see the again: loop in stvfe_dskoc_fco).
 */
static struct stvfe_wait_entry *
stvfe_wait(struct stvfe_wait *sw, uint64_t priv)
{
struct stvfe_wait_entry *e, *r = NULL;
size_t i;
CHECK_OBJ_NOTNULL(sw, STVFE_WAIT_MAGIC);
AN(priv);
i = stvfe_wait_fib(sw, priv);
assert(i < ((size_t)1 << sw->bits));
e = &sw->e[i];
AZ(pthread_mutex_lock(&e->mtx));
// wait while the slot is held for a *different* id (hash collision)
while (e->priv != priv && e->priv != 0)
AZ(pthread_cond_wait(&e->cond, &e->mtx));
if (e->priv != priv) {
// slot is free: claim it, we become the leader for this id
AZ(e->priv);
e->priv = priv;
r = e;
}
else
// another thread leads for the same id: wait once, then
// return NULL so the caller re-checks (a spurious wakeup
// merely causes an extra retry)
AZ(pthread_cond_wait(&e->cond, &e->mtx));
AZ(pthread_mutex_unlock(&e->mtx));
return (r);
}
/* Release a wait slot previously acquired via stvfe_wait(): clear the
 * stored id and wake all threads blocked on this entry. */
static void
stvfe_wait_signal(struct stvfe_wait_entry *we, uint64_t expect)
{

	AN(we);
	AZ(pthread_mutex_lock(&we->mtx));
	assert(we->priv == expect);
	we->priv = 0;
	AZ(pthread_cond_broadcast(&we->cond));
	AZ(pthread_mutex_unlock(&we->mtx));
}
/* Stevedore ---------------------------------------------------------*/
enum stvfe_scope {
......@@ -143,6 +291,7 @@ struct stvfe {
struct fellow_fd *ffd;
struct VSC_fellow *stats;
struct vsc_seg *vsc_seg;
struct stvfe_wait *wait;
buddy_t my_membuddy;
buddy_t *membuddy;
......@@ -279,25 +428,40 @@ stvfe_dskoc_fco(struct worker *wrk,
const struct stevedore *stv, const struct stvfe *stvfe,
struct objcore *oc)
{
struct stvfe_wait_entry *we;
struct fellow_cache_res fcr;
struct objcore *refoc;
struct fcoc fcoc = {0};
uint64_t *counter;
void *priv, *old;
CHECK_OBJ_NOTNULL(stvfe, STVFE_MAGIC);
AN(oc->stobj->priv2);
counter = &stvfe->stats->c_dsk_obj_get_present;
again:
// CST implies a memory barrier
priv = __atomic_load_n(&oc->stobj->priv, __ATOMIC_SEQ_CST);
if (priv != NULL) {
fcoc.fco = priv;
stvfe->stats->c_dsk_obj_get_present++;
(*counter)++;
return (fcoc);
}
we = stvfe_wait(stvfe->wait, oc->stobj->priv2);
if (we == NULL ) {
counter = &stvfe->stats->c_dsk_obj_get_coalesce;
goto again;
}
/* we race to get the fco and, if we win the race,
* point the oc to the fco
*
* XXX/TODO: since introduction of stvfe_wait, there
* should be no more race in the normal case, but
* only if the object gets removed immediately
*/
refoc = oc;
......@@ -313,7 +477,7 @@ stvfe_dskoc_fco(struct worker *wrk,
AZ(fcoc.fco);
stvfe->stats->c_dsk_obj_get_fail++;
return (fcoc);
goto out;
}
fcoc.fco = fcr.r.ptr;
......@@ -323,7 +487,7 @@ stvfe_dskoc_fco(struct worker *wrk,
assert(refoc == oc);
fcoc.fcoref = fcoc.fco;
stvfe->stats->c_dsk_obj_get_coalesce++;
return (fcoc);
goto out;
}
/* fellow_cache_obj_get() decides the race already */
......@@ -337,6 +501,8 @@ stvfe_dskoc_fco(struct worker *wrk,
stvfe->stats->c_dsk_obj_get++;
out:
stvfe_wait_signal(we, oc->stobj->priv2);
return (fcoc);
}
......@@ -527,6 +693,9 @@ stvfe_fcr_handle_iter(struct worker *wrk, struct objcore *oc,
* the basic assumption at the cache layer is that there always exists a segment
* if there is any data to be iterated over, so we always call ObjWaitExtend()
* via fellow_stream_wait() before returning.
*
*
* NOTE: for a tune event, this might still race (by design)
*/
struct fellow_stream {
......@@ -1193,6 +1362,9 @@ sfe_init(struct stevedore *memstv, enum stvfe_scope scope,
ALLOC_OBJ(dskstv, STEVEDORE_MAGIC);
XXXAN(dskstv);
stvfe->wait = stvfe_wait_new(tune.wait_table_exponent);
XXXAN(stvfe->wait);
stvfe->scope = scope;
stvfe->tune = tune;
stvfe->path = filename;
......@@ -1249,6 +1421,7 @@ sfe_tune_apply(const struct stevedore *stv, const struct stvfe_tune *tuna)
return (err);
stvfe->tune = tune;
fellow_log_discardctl(stvfe->ffd, tune.discard_immediate);
stvfe_wait_tune(&stvfe->wait, tune.wait_table_exponent);
return (NULL);
}
......@@ -2939,6 +3112,8 @@ sfe_fini(struct stevedore **stvp)
sfe_close_real(stvfe);
stv->priv = NULL;
stvfe_wait_fini(&stvfe->wait);
AZ(stvfe->wait);
FREE_OBJ(stvfe->dskstv);
FREE_OBJ(stvfe);
}
......
......@@ -41,6 +41,7 @@ TUNE(float, log_rewrite_ratio, 0.5, 0.001, FLT_MAX);
// reserve chunk is the larger of chunk_exponent and result from logbuffer size
TUNE(unsigned, chunk_exponent, 20 /* 1MB*/, 12 /* 4KB */, 28 /* 256MB */);
TUNE(uint8_t, wait_table_exponent, 10, 6, 32);
TUNE(unsigned, dsk_reserve_chunks, 4, 2, UINT_MAX);
TUNE(unsigned, mem_reserve_chunks, 1, 0, UINT_MAX);
TUNE(size_t, objsize_hint, 256 * 1024, 4096, SIZE_MAX);
......
......@@ -391,6 +391,7 @@ fellow_tune(VCL_STEVEDORE stv, struct VARGS(fellow_tune) *args)
#define CHK_size_t 0
#define CHK_int8_t 0
#define CHK_unsigned 1
#define CHK_uint8_t 1
#define TUNE(type, name, default, min, max) \
if (args->valid_ ## name) { \
if (/*lint --e(506,774,685,568)*/ \
......@@ -411,6 +412,7 @@ fellow_tune(VCL_STEVEDORE stv, struct VARGS(fellow_tune) *args)
#undef CHK_size_t
#undef CHK_int8_t
#undef CHK_unsigned
#undef CHK_uint8_t
//lint +e734 loss of precision
r = sfe_tune_apply(stv, &tune);
......
......@@ -479,8 +479,8 @@ will be used (which might fail of insufficient memory is available).
.. _xfellow.tune():
STRING xfellow.tune([INT logbuffer_size], [DURATION logbuffer_flush_interval], [REAL log_rewrite_ratio], [INT chunk_exponent], [BYTES chunk_bytes], [INT dsk_reserve_chunks], [INT mem_reserve_chunks], [BYTES objsize_hint], [INT cram], [INT readahead], [BYTES discard_immediate], [INT io_batch_min], [INT io_batch_max], [ENUM hash_obj], [ENUM hash_log], [ENUM ioerr_obj], [ENUM ioerr_log], [ENUM allocerr_obj], [ENUM allocerr_log])
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
STRING xfellow.tune([INT logbuffer_size], [DURATION logbuffer_flush_interval], [REAL log_rewrite_ratio], [INT chunk_exponent], [BYTES chunk_bytes], [INT wait_table_exponent], [INT dsk_reserve_chunks], [INT mem_reserve_chunks], [BYTES objsize_hint], [INT cram], [INT readahead], [BYTES discard_immediate], [INT io_batch_min], [INT io_batch_max], [ENUM hash_obj], [ENUM hash_log], [ENUM ioerr_obj], [ENUM ioerr_log], [ENUM allocerr_obj], [ENUM allocerr_log])
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
::
......@@ -490,6 +490,7 @@ STRING xfellow.tune([INT logbuffer_size], [DURATION logbuffer_flush_interval], [
[REAL log_rewrite_ratio],
[INT chunk_exponent],
[BYTES chunk_bytes],
[INT wait_table_exponent],
[INT dsk_reserve_chunks],
[INT mem_reserve_chunks],
[BYTES objsize_hint],
......@@ -555,6 +556,33 @@ fellow storage can be fine tuned:
See `xbuddy.tune()` for additional details.
* *wait_table_exponent*
TL;DR: 2-logarithm of concurrency for initial reads of objects from
disk.
- unit: wait table entries as a power of two
- default: 10
- minimum: 6
- maximum: 32
When objects are initially read from disk after a cold start or
eviction from memory, condition variables are used to serialize
parallel requests to the same object, similar in effect to the
waitinglist mechanism in Varnish-Cache.
These condition variables are organized in a hash table. This
parameter specifies the 2-logarithm of that table's size.
Two to the power of this value represents an upper limit to the
number of objects read from disk in parallel. The actual limit can
be lower when hash collisions occur. The amount of memory used is
roughly 128 bytes times two to the power of this value.
Note: The wait table only concerns objects initially read from
disk. Once an object is read, its body data is read in parallel
independent of this limit.
* *dsk_reserve_chunks*
- unit: scalar
......@@ -772,8 +800,8 @@ Can only be called from ``vcl_init {}``.
.. _slash.tune_fellow():
STRING tune_fellow(STEVEDORE storage, [INT logbuffer_size], [DURATION logbuffer_flush_interval], [REAL log_rewrite_ratio], [INT chunk_exponent], [BYTES chunk_bytes], [INT dsk_reserve_chunks], [INT mem_reserve_chunks], [BYTES objsize_hint], [INT cram], [INT readahead], [BYTES discard_immediate], [INT io_batch_min], [INT io_batch_max], [ENUM hash_obj], [ENUM hash_log], [ENUM ioerr_obj], [ENUM ioerr_log], [ENUM allocerr_obj], [ENUM allocerr_log])
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
STRING tune_fellow(STEVEDORE storage, [INT logbuffer_size], [DURATION logbuffer_flush_interval], [REAL log_rewrite_ratio], [INT chunk_exponent], [BYTES chunk_bytes], [INT wait_table_exponent], [INT dsk_reserve_chunks], [INT mem_reserve_chunks], [BYTES objsize_hint], [INT cram], [INT readahead], [BYTES discard_immediate], [INT io_batch_min], [INT io_batch_max], [ENUM hash_obj], [ENUM hash_log], [ENUM ioerr_obj], [ENUM ioerr_log], [ENUM allocerr_obj], [ENUM allocerr_log])
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
::
......@@ -784,6 +812,7 @@ STRING tune_fellow(STEVEDORE storage, [INT logbuffer_size], [DURATION logbuffer_
[REAL log_rewrite_ratio],
[INT chunk_exponent],
[BYTES chunk_bytes],
[INT wait_table_exponent],
[INT dsk_reserve_chunks],
[INT mem_reserve_chunks],
[BYTES objsize_hint],
......
......@@ -429,6 +429,7 @@ $Method STRING .tune(
[ REAL log_rewrite_ratio ],
[ INT chunk_exponent ],
[ BYTES chunk_bytes ],
[ INT wait_table_exponent ],
[ INT dsk_reserve_chunks ],
[ INT mem_reserve_chunks ],
[ BYTES objsize_hint ],
......@@ -493,6 +494,33 @@ fellow storage can be fine tuned:
See `xbuddy.tune()` for additional details.
* *wait_table_exponent*
TL;DR: 2-logarithm of concurrency for initial reads of objects from
disk.
- unit: wait table entries as a power of two
- default: 10
- minimum: 6
- maximum: 32
When objects are initially read from disk after a cold start or
eviction from memory, condition variables are used to serialize
parallel requests to the same object, similar in effect to the
waitinglist mechanism in Varnish-Cache.
These condition variables are organized in a hash table. This
parameter specifies the 2-logarithm of that table's size.
Two to the power of this value represents an upper limit to the
number of objects read from disk in parallel. The actual limit can
be lower when hash collisions occur. The amount of memory used is
roughly 128 bytes times two to the power of this value.
Note: The wait table only concerns objects initially read from
disk. Once an object is read, its body data is read in parallel
independent of this limit.
* *dsk_reserve_chunks*
- unit: scalar
......@@ -706,6 +734,7 @@ $Function STRING tune_fellow(
[ REAL log_rewrite_ratio ],
[ INT chunk_exponent ],
[ BYTES chunk_bytes ],
[ INT wait_table_exponent ],
[ INT dsk_reserve_chunks ],
[ INT mem_reserve_chunks ],
[ BYTES objsize_hint ],
......
......@@ -272,6 +272,11 @@ client c7 -wait
varnish v1 -vcl {
backend none none;
import slash;
sub vcl_init {
slash.tune_fellow(storage.fellow, wait_table_exponent = 6);
}
}
varnish v1 -cliok "vcl.discard vcl1"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment