Commit dd288af6 authored by Poul-Henning Kamp's avatar Poul-Henning Kamp

Rework the binary heap, we use for expiry processing, to deal more

gracefully with large number of objects.

Previously we kept all objects in a single array which resultined
in increasingly infrequent but increasingly demanding calls to
calloc(3) with the consequent massive memory copies.  We also did
not release memory again if unused.

Now we stripe the array into rows of 64k objects each.

This number is a compromise between space wastage, max 1MB on a
64bit machine, and the desire to not add and delete rows all the
time.  With 64k objects in a row, even on a very busy server would
only add a new row every 5...10 seconds during ramp up.

Delete unused rows, but keep a hysteresis of an entire empty row
to avoid silly add-delete-add-delete-add-delete behaviour at row
boundaries.

Streamline some of the functions a bit.

Fixes #210




git-svn-id: http://www.varnish-cache.org/svn/trunk/varnish-cache@3390 d4fa192b-c00b-0410-8231-f00ffab90ce4
parent 5a070b13
......@@ -32,8 +32,6 @@
*
* We use a malloc(3)/realloc(3) array to store the pointers using the
* classical FORTRAN strategy.
*
* XXX: the array is not scaled back when items are deleted.
*/
#include "config.h"
......@@ -44,38 +42,71 @@
#include "binary_heap.h"
#include "libvarnish.h"
/* Private definitions -----------------------------------------------*/
/* Paramters ---------------------------------------------------------*/
/*
* The number of elements in a row has to be a compromise between
* wasted space and number of memory allocations.
* With 64k objects per row, there will be at least 5...10 seconds
* between row additions on a very busy server.
* At the same time, the worst case amount of wasted memory is kept
* at a reasonable 1 MB -- two rows on 64bit system.
* Finally, but without practical significance: 16 bits should be
* easier for the compiler to optimize.
*/
#define ROW_SHIFT 16
#define MIN_LENGTH 16
/* Private definitions -----------------------------------------------*/
#define ROOT_IDX 1
#define ROW_WIDTH (1 << ROW_SHIFT)
/*lint -emacro(572, ROW) shift 0 >> by 16 */
/*lint -emacro(835, ROW) 0 left of >> */
/*lint -emacro(778, ROW) const >> evaluates to zero */
#define ROW(b, n) ((b)->array[(n) >> ROW_SHIFT])
/*lint -emacro(835, A) 0 left of & */
#define A(b, n) ROW(b, n)[(n) & (ROW_WIDTH - 1)]
struct binheap {
unsigned magic;
#define BINHEAP_MAGIC 0xf581581aU /* from /dev/random */
void *priv;
binheap_cmp_t *cmp;
binheap_update_t *update;
void **array;
void ***array;
unsigned rows;
unsigned length;
unsigned next;
unsigned granularity;
};
#define PARENT(u) ((u) / 2)
/*lint -emacro(835, CHILD) 0 right of + */
#define CHILD(u,n) ((u) * 2 + (n))
/* Implementation ----------------------------------------------------*/
static void
binheap_update(const struct binheap *bh, unsigned u)
binheap_addrow(struct binheap *bh)
{
assert(bh->magic == BINHEAP_MAGIC);
assert(u < bh->next);
assert(bh->array[u] != NULL);
if (bh->update == NULL)
return;
bh->update(bh->priv, bh->array[u], u);
unsigned u;
/* First make sure we have space for another row */
if (&ROW(bh, bh->length) >= bh->array + bh->rows) {
u = bh->rows * 2;
bh->array = realloc(bh->array, sizeof(*bh->array) * u);
assert(bh->array != NULL);
/* NULL out new pointers */
while (bh->rows < u)
bh->array[bh->rows++] = NULL;
}
assert(ROW(bh, bh->length) == NULL);
ROW(bh, bh->length) = malloc(sizeof(**bh->array) * ROW_WIDTH);
assert(ROW(bh, bh->length));
bh->length += ROW_WIDTH;
}
struct binheap *
......@@ -90,14 +121,26 @@ binheap_new(void *priv, binheap_cmp_t *cmp_f, binheap_update_t *update_f)
bh->cmp = cmp_f;
bh->update = update_f;
bh->next = ROOT_IDX;
bh->length = MIN_LENGTH;
bh->array = calloc(sizeof *bh->array, bh->length);
bh->rows = 16; /* A tiny-ish number */
bh->array = calloc(sizeof *bh->array, bh->rows);
assert(bh->array != NULL);
bh->granularity = getpagesize() / sizeof *bh->array;
binheap_addrow(bh);
A(bh, ROOT_IDX) = NULL;
bh->magic = BINHEAP_MAGIC;
return (bh);
}
static void
binheap_update(const struct binheap *bh, unsigned u)
{
assert(bh->magic == BINHEAP_MAGIC);
assert(u < bh->next);
assert(A(bh, u) != NULL);
if (bh->update == NULL)
return;
bh->update(bh->priv, A(bh, u), u);
}
static void
binhead_swap(const struct binheap *bh, unsigned u, unsigned v)
{
......@@ -106,9 +149,9 @@ binhead_swap(const struct binheap *bh, unsigned u, unsigned v)
assert(bh->magic == BINHEAP_MAGIC);
assert(u < bh->next);
assert(v < bh->next);
p = bh->array[u];
bh->array[u] = bh->array[v];
bh->array[v] = p;
p = A(bh, u);
A(bh, u) = A(bh, v);
A(bh, v) = p;
binheap_update(bh, u);
binheap_update(bh, v);
}
......@@ -121,11 +164,10 @@ binheap_trickleup(const struct binheap *bh, unsigned u)
assert(bh->magic == BINHEAP_MAGIC);
while (u > ROOT_IDX) {
v = PARENT(u);
if (bh->cmp(bh->priv, bh->array[u], bh->array[v])) {
binhead_swap(bh, u, v);
u = v;
} else
if (!bh->cmp(bh->priv, A(bh, u), A(bh, v)))
break;
binhead_swap(bh, u, v);
u = v;
}
return (u);
}
......@@ -138,28 +180,15 @@ binheap_trickledown(const struct binheap *bh, unsigned u)
assert(bh->magic == BINHEAP_MAGIC);
while (1) {
v1 = CHILD(u, 0);
v2 = CHILD(u, 1);
if (v1 >= bh->next)
return;
if (v2 >= bh->next) {
if (!bh->cmp(bh->priv, bh->array[u], bh->array[v1]))
binhead_swap(bh, u, v1);
v2 = CHILD(u, 1);
if (v2 < bh->next && bh->cmp(bh->priv, A(bh, v2), A(bh, v1)))
v1 = v2;
if (bh->cmp(bh->priv, A(bh, u), A(bh, v1)))
return;
}
if (bh->cmp(bh->priv, bh->array[v1], bh->array[v2])) {
if (!bh->cmp(bh->priv, bh->array[u], bh->array[v1])) {
binhead_swap(bh, u, v1);
u = v1;
continue;
}
} else {
if (!bh->cmp(bh->priv, bh->array[u], bh->array[v2])) {
binhead_swap(bh, u, v2);
u = v2;
continue;
}
}
return;
binhead_swap(bh, u, v1);
u = v1;
}
}
......@@ -171,18 +200,10 @@ binheap_insert(struct binheap *bh, void *p)
assert(bh != NULL);
assert(bh->magic == BINHEAP_MAGIC);
assert(bh->length >= bh->next);
if (bh->length == bh->next) {
if (bh->length >= bh->granularity * 32)
bh->length += bh->granularity * 32;
else if (bh->length > bh->granularity)
bh->length += bh->granularity;
else
bh->length += bh->length;
bh->array = realloc(bh->array, bh->length * sizeof *bh->array);
assert(bh->array != NULL);
}
if (bh->length == bh->next)
binheap_addrow(bh);
u = bh->next++;
bh->array[u] = p;
A(bh, u) = p;
binheap_update(bh, u);
(void)binheap_trickleup(bh, u);
}
......@@ -193,11 +214,33 @@ binheap_root(const struct binheap *bh)
assert(bh != NULL);
assert(bh->magic == BINHEAP_MAGIC);
if (bh->next == ROOT_IDX)
return (NULL);
return (bh->array[ROOT_IDX]);
return (A(bh, ROOT_IDX));
}
/*
* It may seem counter-intuitive that we delete by replacement with
* the tail object. "That's almost certain to not belong there, in
* particular when we delete the root ?" is the typical reaction.
*
* If we tried to trickle up into the empty position, we would,
* eventually, end up with a hole in the bottom row, at which point
* we would move the tail object there.
* But there is no guarantee that the tail object would not need to
* trickle up from that position, in fact, it might be the new root
* of this half of the subtree.
* The total number of operations is guaranteed to be at least
* N{height} downward selections, because we have to get the hole
* all the way down, but in addition to that, we may get up to
* N{height}-1 upward trickles.
*
* When we fill the hole with the tail object, the worst case is
* that it trickles all the way down to become the tail object
* again.
* In other words worst case is N{height} downward trickles.
* But there is a pretty decent chance that it does not make
* it all the way down.
*/
void
binheap_delete(struct binheap *bh, unsigned idx)
{
......@@ -207,18 +250,28 @@ binheap_delete(struct binheap *bh, unsigned idx)
assert(bh->next > ROOT_IDX);
assert(idx < bh->next);
assert(idx > 0);
assert(bh->array[idx] != NULL);
bh->update(bh->priv, bh->array[idx], 0);
assert(A(bh, idx) != NULL);
bh->update(bh->priv, A(bh, idx), 0);
if (idx == --bh->next) {
bh->array[bh->next] = NULL;
A(bh, bh->next) = NULL;
return;
}
bh->array[idx] = bh->array[bh->next];
bh->array[bh->next] = NULL;
A(bh, idx) = A(bh, bh->next);
A(bh, bh->next) = NULL;
binheap_update(bh, idx);
idx = binheap_trickleup(bh, idx);
binheap_trickledown(bh, idx);
/* XXX: free part of array ? */
/*
* We keep a hysteresis of one full row before we start to
* return space to the OS to avoid silly behaviour around
* row boundaries.
*/
if (bh->next + 2 * ROW_WIDTH <= bh->length) {
free(ROW(bh, bh->length - 1));
ROW(bh, bh->length - 1) = NULL;
bh->length -= ROW_WIDTH;
}
}
......@@ -226,7 +279,10 @@ binheap_delete(struct binheap *bh, unsigned idx)
/* Test driver -------------------------------------------------------*/
#include <stdio.h>
#if 0
#if 1
#define N 23
static int
cmp(void *priv, void *a, void *b)
{
......@@ -247,14 +303,14 @@ dump(struct binheap *bh, const char *what, unsigned ptr)
unsigned u, *up;
printf("dump\n");
f = popen("dot -Tps >> /tmp/_.ps", "w");
f = popen("dot -Tps >> /tmp/_.ps 2>/dev/null", "w");
assert(f != NULL);
fprintf(f, "digraph binheap {\n");
fprintf(f, "size=\"7,10\"\n");
fprintf(f, "ptr [label=\"%s\"]\n", what);
fprintf(f, "ptr -> node_%u\n", ptr);
for (u = 1; u < bh->next; u++) {
up = bh->array[u];
up = A(bh, u);
fprintf(f, "node_%u [label=\"%u\"];\n", u, *up);
if (u > 0)
fprintf(f, "node_%u -> node_%u\n", PARENT(u), u);
......@@ -263,7 +319,6 @@ dump(struct binheap *bh, const char *what, unsigned ptr)
pclose(f);
}
#define N 31
int
main(int argc, char **argv)
{
......@@ -286,7 +341,7 @@ main(int argc, char **argv)
break;
assert(*up >= lu);
lu = *up;
u = random() % bh->next;
u = (random() % (bh->next - 1)) + 1;
binheap_delete(bh, u);
if (1)
dump(bh, "Delete", u);
......@@ -301,7 +356,7 @@ struct foo {
};
#define M 1311191
#define N 131
#define N 1311
struct foo ff[N];
......@@ -332,8 +387,8 @@ chk(struct binheap *bh)
for (u = 2; u < bh->next; u++) {
v = PARENT(u);
fa = bh->array[u];
fb = bh->array[v];
fa = A(bh, u);
fb = A(bh, v);
assert(fa->key > fb->key);
continue;
printf("[%2u/%2u] %10u > [%2u/%2u] %10u %s\n",
......@@ -363,11 +418,17 @@ main(int argc, char **argv)
for (u = 0; u < M; u++) {
v = random() % N;
if (ff[v].idx > 0) {
printf("Delete [%u] %'u\n", v, ff[v].key);
if (0)
printf("Delete [%u] %'u\n", v, ff[v].key);
else
printf("-%u", v);
binheap_delete(bh, ff[v].idx);
} else {
ff[v].key = random();
printf("Insert [%u] %'u\n", v, ff[v].key);
if (0)
printf("Insert [%u] %'u\n", v, ff[v].key);
else
printf("+%u", v);
binheap_insert(bh, &ff[v]);
}
chk(bh);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment