shard director: add optional weight parameter to .add_backend()

We implement weights by scaling the number of replicas of each backend.
The replicas parameter of .reconfigure() remains a minimum.
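
As an illustration, here is a minimal VCL sketch (the backend names and
the weight/replicas values are made up for this example). The weighted
backend gets weight * replicas points on the hash circle; unweighted
backends keep the replicas minimum:

    vcl 4.1;

    import directors;

    backend small { .host = "192.0.2.10"; }
    backend big   { .host = "192.0.2.11"; }

    sub vcl_init {
        new shard = directors.shard();
        # default weight 1.0: exactly "replicas" points on the circle
        shard.add_backend(small);
        # weight 2.0: scaled to replicas * weight = 134 points
        shard.add_backend(big, weight=2.0);
        # replicas remains the per-backend minimum (weight >= 1)
        shard.reconfigure(replicas=67);
    }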

For existing vtcs, the debug hashcircle output has been compared
before and after this change to ensure that the behaviour is exactly
equivalent.

For weighted backends, it has been checked that the number of
points per host on the hashcircle matches the expectation.

Also refactor and clean up some of the code:

- consistently make the number of ring points a uint32_t
- some constification

Ref #3276
@@ -55,6 +55,7 @@ struct shard_change_task {
#define SHARD_CHANGE_TASK_MAGIC 0x1e1168af
enum shard_change_task_e task;
void *priv;
VCL_REAL weight;
VSTAILQ_ENTRY(shard_change_task) list;
};
@@ -127,7 +128,7 @@ shard_change_finish(struct shard_change *change)
VSTAILQ_INIT(&change->tasks);
}
static void
static struct shard_change_task *
shard_change_task_add(VRT_CTX, struct shard_change *change,
enum shard_change_task_e task_e, void *priv)
{
@@ -139,15 +140,17 @@ shard_change_task_add(VRT_CTX, struct shard_change *change,
if (task == NULL) {
shard_err0(ctx, change->shardd,
"could not get workspace for task");
return;
return (NULL);
}
INIT_OBJ(task, SHARD_CHANGE_TASK_MAGIC);
task->task = task_e;
task->priv = priv;
VSTAILQ_INSERT_TAIL(&change->tasks, task, list);
return (task);
}
static inline VCL_BOOL
static inline struct shard_change_task *
shard_change_task_backend(VRT_CTX,
struct vmod_priv *priv, const struct sharddir *shardd,
enum shard_change_task_e task_e, VCL_BACKEND be, VCL_STRING ident,
@@ -161,22 +164,20 @@ shard_change_task_backend(VRT_CTX,
change = shard_change_get(ctx, priv, shardd);
if (change == NULL)
return (0);
return (NULL);
b = WS_Alloc(ctx->ws, sizeof(*b));
if (b == NULL) {
shard_err(ctx, shardd, ".%s_backend() WS_Alloc() failed",
task_e == ADD_BE ? "add" : "remove");
return (0);
return (NULL);
}
b->backend = be;
b->ident = ident != NULL && *ident != '\0' ? ident : NULL;
b->rampup = rampup;
shard_change_task_add(ctx, change, task_e, b);
return (1);
return (shard_change_task_add(ctx, change, task_e, b));
}
/*
@@ -186,11 +187,21 @@ shard_change_task_backend(VRT_CTX,
VCL_BOOL
shardcfg_add_backend(VRT_CTX, struct vmod_priv *priv,
const struct sharddir *shardd, VCL_BACKEND be, VCL_STRING ident,
VCL_DURATION rampup)
VCL_DURATION rampup, VCL_REAL weight)
{
struct shard_change_task *task;
assert (weight >= 1);
AN(be);
return (shard_change_task_backend(ctx, priv, shardd, ADD_BE,
be, ident, rampup));
task = shard_change_task_backend(ctx, priv, shardd, ADD_BE,
be, ident, rampup);
if (task == NULL)
return (0);
task->weight = weight;
return (1);
}
VCL_BOOL
@@ -198,7 +209,7 @@ shardcfg_remove_backend(VRT_CTX, struct vmod_priv *priv,
const struct sharddir *shardd, VCL_BACKEND be, VCL_STRING ident)
{
return (shard_change_task_backend(ctx, priv, shardd, REMOVE_BE,
be, ident, 0));
be, ident, 0) != NULL);
}
VCL_BOOL
@@ -212,9 +223,7 @@ shardcfg_clear(VRT_CTX, struct vmod_priv *priv, const struct sharddir *shardd)
if (change == NULL)
return (0);
shard_change_task_add(ctx, change, CLEAR, NULL);
return (1);
return (shard_change_task_add(ctx, change, CLEAR, NULL) != NULL);
}
/*
@@ -232,9 +241,11 @@ circlepoint_compare(const struct shard_circlepoint *a,
}
static void
shardcfg_hashcircle(struct sharddir *shardd, VCL_INT replicas)
shardcfg_hashcircle(struct sharddir *shardd)
{
int i, j;
const struct shard_backend *backends, *b;
int j, h;
uint32_t i, n_points, r, rmax;
const char *ident;
const int len = 12; // log10(UINT32_MAX) + 2;
char s[len];
@@ -245,49 +256,60 @@ shardcfg_hashcircle(struct sharddir *shardd, VCL_INT replicas)
AZ(shardd->hashcircle);
assert(shardd->n_backend > 0);
AN(shardd->backend);
shardd->hashcircle = calloc(shardd->n_backend * replicas,
sizeof(struct shard_circlepoint));
AN(shardd->hashcircle);
backends = shardd->backend;
AN(backends);
n_points = 0;
rmax = (UINT32_MAX - 1) / shardd->n_backend;
for (b = backends; b < backends + shardd->n_backend; b++) {
CHECK_OBJ_NOTNULL(b->backend, DIRECTOR_MAGIC);
r = b->replicas;
if (r > rmax)
r = rmax;
n_points += r;
}
shardd->replicas = replicas;
assert(n_points < UINT32_MAX);
for (i = 0; i < shardd->n_backend; i++) {
CHECK_OBJ_NOTNULL(shardd->backend[i].backend, DIRECTOR_MAGIC);
shardd->n_points = n_points;
shardd->hashcircle = calloc(n_points, sizeof(struct shard_circlepoint));
AN(shardd->hashcircle);
ident = shardd->backend[i].ident
? shardd->backend[i].ident
: VRT_BACKEND_string(shardd->backend[i].backend);
i = 0;
for (h = 0, b = backends; h < shardd->n_backend; h++, b++) {
ident = b->ident ? b->ident : VRT_BACKEND_string(b->backend);
AN(ident);
assert(ident[0] != '\0');
for (j = 0; j < replicas; j++) {
r = b->replicas;
if (r > rmax)
r = rmax;
for (j = 0; j < r; j++) {
assert(snprintf(s, len, "%d", j) < len);
ss->n = 2;
ssp[0] = ident;
ssp[1] = s;
ss->p = ssp;
shardd->hashcircle[i * replicas + j].point =
VRT_HashStrands32(ss);
shardd->hashcircle[i * replicas + j].host = i;
assert (i < n_points);
shardd->hashcircle[i].point = VRT_HashStrands32(ss);
shardd->hashcircle[i].host = h;
i++;
}
}
qsort( (void *) shardd->hashcircle, shardd->n_backend * replicas,
assert (i == n_points);
qsort( (void *) shardd->hashcircle, n_points,
sizeof (struct shard_circlepoint), (compar) circlepoint_compare);
if ((shardd->debug_flags & SHDBG_CIRCLE) == 0)
return;
for (i = 0; i < shardd->n_backend; i++)
for (j = 0; j < replicas; j++)
SHDBG(SHDBG_CIRCLE, shardd,
"hashcircle[%5jd] = "
"{point = %8x, host = %2u}\n",
(intmax_t)(i * replicas + j),
shardd->hashcircle[i * replicas + j].point,
shardd->hashcircle[i * replicas + j].host);
for (i = 0; i < n_points; i++)
SHDBG(SHDBG_CIRCLE, shardd,
"hashcircle[%5jd] = {point = %8x, host = %2u}\n",
(intmax_t)i, shardd->hashcircle[i].point,
shardd->hashcircle[i].host);
}
/*
@@ -394,7 +416,7 @@ shardcfg_backend_expand(const struct backend_reconfig *re)
static void
shardcfg_backend_add(struct backend_reconfig *re,
const struct shard_backend *b)
const struct shard_backend *b, uint32_t replicas)
{
unsigned i;
struct shard_backend *bb = re->shardd->backend;
@@ -419,6 +441,7 @@ shardcfg_backend_add(struct backend_reconfig *re,
re->shardd->n_backend++;
shardcfg_backend_copyin(&bb[i], b);
bb[i].replicas = replicas;
}
static void
@@ -499,10 +522,11 @@ shardcfg_backend_finalize(struct backend_reconfig *re)
static void
shardcfg_apply_change(VRT_CTX, struct sharddir *shardd,
const struct shard_change *change)
const struct shard_change *change, VCL_INT replicas)
{
struct shard_change_task *task, *clear;
const struct shard_backend *b;
uint32_t b_replicas;
struct backend_reconfig re = {
.shardd = shardd,
@@ -550,7 +574,14 @@ shardcfg_apply_change(VRT_CTX, struct sharddir *shardd,
b = shardcfg_backend_lookup(&re, task->priv);
if (b == NULL) {
shardcfg_backend_add(&re, task->priv);
assert (task->weight >= 1);
if (replicas * task->weight > UINT32_MAX)
b_replicas = UINT32_MAX;
else
b_replicas = replicas * task->weight;
shardcfg_backend_add(&re, task->priv,
b_replicas);
break;
}
@@ -599,7 +630,7 @@ shardcfg_reconfigure(VRT_CTX, struct vmod_priv *priv,
sharddir_wrlock(shardd);
shardcfg_apply_change(ctx, shardd, change);
shardcfg_apply_change(ctx, shardd, change, replicas);
shard_change_finish(change);
if (shardd->hashcircle)
@@ -612,7 +643,7 @@ shardcfg_reconfigure(VRT_CTX, struct vmod_priv *priv,
return (0);
}
shardcfg_hashcircle(shardd, replicas);
shardcfg_hashcircle(shardd);
sharddir_unlock(shardd);
return (1);
}
@@ -30,7 +30,7 @@
VCL_BOOL shardcfg_add_backend(VRT_CTX, struct vmod_priv *priv,
const struct sharddir *shardd, VCL_BACKEND be, VCL_STRING ident,
VCL_DURATION rampup);
VCL_DURATION rampup, VCL_REAL weight);
VCL_BOOL shardcfg_remove_backend(VRT_CTX, struct vmod_priv *priv,
const struct sharddir *shardd, VCL_BACKEND be, VCL_STRING ident);
VCL_BOOL shardcfg_clear(VRT_CTX, struct vmod_priv *priv,
@@ -60,7 +60,7 @@ struct shard_be_info {
struct shard_state {
const struct vrt_ctx *ctx;
struct sharddir *shardd;
int idx;
uint32_t idx;
struct vbitmap *picklist;
int pickcount;
@@ -94,8 +94,10 @@ shard_lookup(const struct sharddir *shardd, const uint32_t key)
{
CHECK_OBJ_NOTNULL(shardd, SHARDDIR_MAGIC);
const int n = shardd->n_backend * shardd->replicas;
int idx = -1, high = n, low = 0, i;
const uint32_t n = shardd->n_points;
uint32_t i, idx = UINT32_MAX, high = n, low = 0;
assert (n < idx);
do {
i = (high + low) / 2 ;
@@ -113,7 +115,7 @@ shard_lookup(const struct sharddir *shardd, const uint32_t key)
high = i;
else
low = i;
} while (idx == -1);
} while (idx == UINT32_MAX);
return (idx);
}
@@ -122,7 +124,6 @@ static int
shard_next(struct shard_state *state, VCL_INT skip, VCL_BOOL healthy)
{
int c, chosen = -1;
uint32_t ringsz;
VCL_BACKEND be;
vtim_real changed;
struct shard_be_info *sbe;
@@ -134,8 +135,6 @@ shard_next(struct shard_state *state, VCL_INT skip, VCL_BOOL healthy)
if (state->pickcount >= state->shardd->n_backend)
return (-1);
ringsz = state->shardd->n_backend * state->shardd->replicas;
while (state->pickcount < state->shardd->n_backend && skip >= 0) {
c = state->shardd->hashcircle[state->idx].host;
@@ -174,7 +173,7 @@ shard_next(struct shard_state *state, VCL_INT skip, VCL_BOOL healthy)
break;
}
if (++(state->idx) == ringsz)
if (++(state->idx) == state->shardd->n_points)
state->idx = 0;
}
return (chosen);
@@ -43,6 +43,7 @@ struct shard_backend {
void *freeptr;
};
VCL_DURATION rampup;
uint32_t replicas;
};
struct vmod_directors_shard_param;
@@ -68,7 +69,8 @@ struct sharddir {
VCL_DURATION rampup_duration;
VCL_REAL warmup;
VCL_INT replicas;
uint32_t n_points;
};
static inline VCL_BACKEND
@@ -373,7 +373,7 @@ The association can be changed per backend request using the *param*
argument of `xshard.backend()`_.
$Method BOOL .add_backend(PRIV_TASK, BACKEND backend,
[STRING ident], [DURATION rampup])
[STRING ident], [DURATION rampup], [REAL weight])
Add a backend *backend* to the director.
@@ -388,6 +388,12 @@ defaults to the backend name.
backend. Otherwise, the per-director rampup time is used (see
`xshard.set_rampup()`_).
*weight*: Optionally specify a weight to scale the
`xshard.reconfigure()`_ *replicas* parameter. *weight* is limited to
at least 1. Values above 10 probably do not make much sense. The
effect of *weight* is also capped such that the total number of
replicas does not exceed `UINT32_MAX`.
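
For illustration, a short sketch (the backend name ``b2`` and the
numbers are examples only)::

    shard.add_backend(b2, weight=3.0);
    shard.reconfigure(replicas=100);

With this configuration, ``b2`` is assigned 300 points on the hash
circle, while backends added without a *weight* get the 100-point
minimum.
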
NOTE: Backend changes need to be finalized with
`xshard.reconfigure()`_ and are only supported on one
shard director at a time.
@@ -305,6 +305,8 @@ VCL_BOOL v_matchproto_(td_directors_shard_add_backend)
vmod_shard_add_backend(VRT_CTX, struct vmod_directors_shard *vshard,
struct VARGS(shard_add_backend) *args)
{
VCL_REAL weight = 1;
CHECK_OBJ_NOTNULL(vshard, VMOD_SHARD_SHARD_MAGIC);
if (args->backend == NULL) {
@@ -313,10 +315,14 @@ vmod_shard_add_backend(VRT_CTX, struct vmod_directors_shard *vshard,
return (0);
}
if (args->valid_weight && args->weight > 1)
weight = args->weight;
return shardcfg_add_backend(ctx, args->arg1,
vshard->shardd, args->backend,
args->valid_ident ? args->ident : NULL,
args->valid_rampup ? args->rampup : nan(""));
args->valid_rampup ? args->rampup : nan(""),
weight);
}
VCL_BOOL v_matchproto_(td_directors_shard_remove_backend)