Commit 6cdb8cdd authored by Geoff Simmons

Add self-sharding by cookie value.

To ensure that health checks from other Varnish instances succeed
in the self-sharding configuration, move the clause in vcl_recv
that is executed when remote.ip matches the ACL for cluster members
to the start of the subroutine.

XXX: currently returns a synthetic 400 response if the configured
cookie name is not found in the Cookie header (or if there is no
Cookie header). In a further iteration, we will add an optional
field defaultKey, which, if present, sets a string value to be used
as the sharding key when the cookie is not found.

XXX: testing this feature has revealed bugs in sharding by key
(using by=KEY) unless primaryOnly is also set to true. These will
be fixed in a further iteration.
parent a16103c6
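
As an illustration of the mechanism (a minimal Go sketch, not part of this
commit): the generated VCL matches the configured cookie name in the Cookie
header with the re2 regex \b<name>\s*=\s*([^,;[:space:]]+) and uses the first
capture group as the shard key; no match corresponds to the synthetic 400
described above.

package main

import (
	"fmt"
	"regexp"
)

// shardKeyFromCookie mimics the extraction done by the generated VCL:
// the named cookie's value (first capture group) becomes the shard key.
// A missing cookie corresponds to the synthetic 400 response.
func shardKeyFromCookie(cookieHdr, name string) (string, bool) {
	re := regexp.MustCompile(`\b` + regexp.QuoteMeta(name) +
		`\s*=\s*([^,;[:space:]]+)`)
	m := re.FindStringSubmatch(cookieHdr)
	if m == nil {
		return "", false
	}
	return m[1], true
}

func main() {
	fmt.Println(shardKeyFromCookie("baz=quux; foo=abcdef; 47=11", "foo"))
	// abcdef true
}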
......@@ -75,7 +75,7 @@ spec:
type: boolean
key:
type: string
pattern: "^client\\.identity$|^req\\.(url|http\\.[a-zA-Z0-9!#$%&'*+.^_`|~-]+)$"
pattern: "^client\\.identity$|^req\\.(url|http\\.[a-zA-Z0-9!#$%&'*+.^_`|~-]+)$|^cookie=[a-zA-Z0-9!#$%&'*+.^_`|~-]+$"
digest:
type: string
enum:
......
......@@ -54,6 +54,10 @@ deploy-shard-by-key-helm:
@helm install viking-ingress-shard-by-key $(CHARTDIR)/viking-test-app \
--values values-shard-by-key.yaml
deploy-shard-by-cookie-helm:
@helm install viking-ingress-shard-by-cookie \
$(CHARTDIR)/viking-test-app --values values-shard-by-cookie.yaml
deploy-primary-only-by-clientid-helm:
@helm install viking-ingress-primary-only-by-clientid $(CHARTDIR)/viking-test-app \
--values values-primary-only-by-clientid.yaml
......@@ -91,6 +95,9 @@ deploy-shard-conditions-kubectl: deploy-cafe-kubectl
verify:
$(mkdir)/verify.sh
verify-cookie:
$(mkdir)/verify-cookie.sh
wait:
@echo Waiting until varnish-ingress Pods are not configured for Ingress
$(TESTDIR)/wait.sh app=varnish-ingress
......@@ -115,6 +122,10 @@ undeploy-shard-by-key-helm:
@helm uninstall viking-ingress-shard-by-key
$(MAKE) wait
undeploy-shard-by-cookie-helm:
@helm uninstall viking-ingress-shard-by-cookie
$(MAKE) wait
undeploy-primary-only-by-clientid-helm:
@helm uninstall viking-ingress-primary-only-by-clientid
$(MAKE) wait
......@@ -188,6 +199,8 @@ deploy-shard-by-url: deploy-shard-by-url-helm
undeploy-shard-by-url: undeploy-shard-by-url-helm
deploy-shard-by-key: deploy-shard-by-key-helm
undeploy-shard-by-key: undeploy-shard-by-key-helm
deploy-shard-by-cookie: deploy-shard-by-cookie-helm
undeploy-shard-by-cookie: undeploy-shard-by-cookie-helm
deploy-primary-only-by-clientid: deploy-primary-only-by-clientid-helm
undeploy-primary-only-by-clientid: undeploy-primary-only-by-clientid-helm
deploy-shard-conditions: deploy-shard-conditions-helm
......@@ -209,6 +222,10 @@ undeploy: undeploy-shard-by-url
else ifeq ($(EXAMPLE),shard-by-key)
deploy: deploy-shard-by-key
undeploy: undeploy-shard-by-key
else ifeq ($(EXAMPLE),shard-by-cookie)
deploy: deploy-shard-by-cookie
undeploy: undeploy-shard-by-cookie
verify: verify-cookie
else ifeq ($(EXAMPLE),primary-only-by-clientid)
deploy: deploy-primary-only-by-clientid
undeploy: undeploy-primary-only-by-clientid
......@@ -217,7 +234,7 @@ deploy: deploy-shard-conditions
undeploy: undeploy-shard-conditions
else
deploy undeploy:
$(error EXAMPLE must be set to self-sharding, shard-conditions, primary-only[-by-clientid], or shard-by-[digest|url|key])
$(error EXAMPLE must be set to self-sharding, shard-conditions, primary-only[-by-clientid], or shard-by-[digest|url|key|cookie])
endif
.PHONY: all $(MAKECMDGOALS)
# looks like -*- vcl -*-
varnishtest "cafe example with self-sharding by Cookie"
# The response may have Connection: close, if Varnish went to pipe due to
# primary-only. So we run each test in a separate connection.
client c1 -connect "${localhost} ${localport}" {
txreq -url /coffee/foo -hdr "Host: cafe.example.com" \
-hdr "Cookie: baz=quux; foo=abcdefghijklmnopqrstuvwxyz; 47=11"
rxresp
expect resp.status == 200
expect resp.body ~ "(?m)^URI: /coffee/foo$"
expect resp.body ~ "(?m)^Server name: coffee-[a-z0-9]+-[a-z0-9]+$"
} -run
client c1 -connect "${localhost} ${localport}" {
txreq -url /tea/bar -hdr "Host: cafe.example.com" \
-hdr "Cookie: baz=quux; foo=foobar"
rxresp
expect resp.status == 200
expect resp.body ~ "(?m)^URI: /tea/bar"
expect resp.body ~ "(?m)^Server name: tea-[a-z0-9]+-[a-z0-9]+$"
} -run
client c1 -connect "${localhost} ${localport}" {
txreq -url /coffee/baz -hdr "Host: cafe.example.com" \
-hdr "Cookie: foo=47; baz=quux"
rxresp
expect resp.status == 200
expect resp.body ~ "(?m)^URI: /coffee/baz"
expect resp.body ~ "(?m)^Server name: coffee-[a-z0-9]+-[a-z0-9]+$"
} -run
client c1 -connect "${localhost} ${localport}" {
txreq -url /tea/quux -hdr "Host: cafe.example.com" \
-hdr "Cookie: foo=fighter"
rxresp
expect resp.status == 200
expect resp.body ~ "(?m)^URI: /tea/quux"
expect resp.body ~ "(?m)^Server name: tea-[a-z0-9]+-[a-z0-9]+$"
} -run
client c1 -connect "${localhost} ${localport}" {
txreq -url /coffee/foo -hdr "Host: cafe.example.com" \
-hdr "Cookie: baz=quux; 47=11"
rxresp
expect resp.status == 400
} -run
client c1 -connect "${localhost} ${localport}" {
txreq -url /coffee/foo -hdr "Host: cafe.example.com"
rxresp
expect resp.status == 400
} -run
apps:
coffee:
image: nginxdemos/hello:plain-text
replicas: 2
tea:
image: nginxdemos/hello:plain-text
replicas: 3
ingress:
name: cafe-ingress
rules:
- host: cafe.example.com
paths:
- path: /tea
type: Prefix
app: tea
- path: /coffee
type: Prefix
app: coffee
vikingAdmSvc: varnish-ingress-admin
selfSharding:
rules:
- shard:
key: cookie=foo
primaryOnly: true
# Use reqDisposition to bypass builtin vcl_recv so that responses to
# requests with the Cookie header may be cacheable.
# cf. "cookie pass" in examples/req-disposition
reqDisposition:
- conditions:
- comparand: req.http.Host
compare: not-exists
- comparand: req.esi_level
count: 0
- comparand: req.proto
compare: prefix
values:
- HTTP/1.1
match-flags:
case-sensitive: false
disposition:
action: synth
status: 400
- conditions:
- comparand: req.method
compare: not-equal
values:
- GET
- HEAD
- PUT
- POST
- TRACE
- OPTIONS
- DELETE
- PATCH
- CONNECT
disposition:
action: synth
status: 405
- conditions:
- comparand: req.method
compare: not-equal
values:
- GET
- HEAD
disposition:
action: pass
#! /bin/bash -ex
MYDIR=$(dirname ${BASH_SOURCE[0]})
source ${MYDIR}/../../test/utils.sh
LOCALPORT=${LOCALPORT:-8888}
wait_until_ready app=varnish-ingress
wait_until_configured app=varnish-ingress
kubectl port-forward svc/varnish-ingress ${LOCALPORT}:80 >/dev/null &
trap 'kill $(jobs -p)' EXIT
wait_for_port ${LOCALPORT}
# XXX hackish, see the comments in verify.sh
sleep 10
varnishtest ${TESTOPTS} -Dlocalport=${LOCALPORT} cafe-cookie.vtc
......@@ -45,6 +45,8 @@ import (
net_v1 "k8s.io/api/networking/v1"
)
const cookieKeyPfx = "cookie="
// Don't return error (requeuing the vcfg) if either of Ingresses or
// Services are not found -- they will sync as needed when and if they
// are discovered.
......@@ -459,6 +461,10 @@ func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
vclRule.Key = rule.Sharding.Key
if rule.Sharding.Key == "req.url" {
vclRule.By = vcl.URL
} else if strings.HasPrefix(vclRule.Key, cookieKeyPfx) {
vclRule.By = vcl.Cookie
vclRule.Key = strings.TrimPrefix(
vclRule.Key, cookieKeyPfx)
} else {
vclRule.By = vcl.Key
}
......
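
The dispatch just above reads: a spec key of the form cookie=<name> selects
sharding by the named cookie's value and keeps only the cookie name; req.url
shards by URL; anything else is used as the key expression directly. A
standalone Go sketch of that logic (classifyKey and the keyBy constants are
hypothetical stand-ins for the diff's vcl.By values):

package main

import (
	"fmt"
	"strings"
)

type keyBy uint8

const (
	byURL keyBy = iota // vcl.URL
	byKey              // vcl.Key
	byCookie           // vcl.Cookie
)

const cookieKeyPfx = "cookie="

// classifyKey mirrors the branch in configSharding shown above.
func classifyKey(key string) (keyBy, string) {
	switch {
	case key == "req.url":
		return byURL, key
	case strings.HasPrefix(key, cookieKeyPfx):
		return byCookie, strings.TrimPrefix(key, cookieKeyPfx)
	default:
		return byKey, key
	}
}

func main() {
	fmt.Println(classifyKey("cookie=foo")) // 2 foo
}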
......@@ -31,6 +31,7 @@ package vcl
import (
"fmt"
"regexp"
"strconv"
"text/template"
)
......@@ -95,6 +96,10 @@ sub vcl_init {
{{- end }}
{{- range $ridx, $rule := .Rules -}}
{{- digest_init $rule }}
{{- if isCookieKey $rule }}
new vk8s_shard_cookie_{{$ridx}} =
re2.regex("\b{{$rule.Key}}\s*=\s*([^,;[:space:]]+)");
{{- end }}
{{- range $cidx, $c := $rule.Conditions -}}
{{- if condNeedsMatcher $c }}
new {{condMatcher $ridx $cidx}} = {{vmod $c.Compare}}.set({{flags $c}});
......@@ -107,6 +112,24 @@ sub vcl_init {
}
sub vcl_recv {
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
}
return (synth(404));
}
# prevent deadlock for accidental cyclic requests
set req.hash_ignore_busy = true;
# if we're async, don't deliver stale
if (req.http.VK8S-Is-Bgfetch == "true") {
set req.grace = 0s;
}
return (hash);
}
{{ range $ridx, $rule := .Rules -}}
{{ if $rule.PrimaryOnly -}}
{{ if $rule.Conditions -}}
......@@ -126,35 +149,21 @@ sub vcl_recv {
) {
{{ end -}}
{{- digest_update 'c' $rule }}
{{- if isCookieKey $rule }}
if (!vk8s_shard_cookie_{{$ridx}}.match(req.http.Cookie)) {
return (synth(400));
}
{{- end }}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW
{{- key 'c' $rule }}));
{{- key 'c' $rule $ridx }}));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
{{ if $rule.Conditions }}} else
{{ end -}}
{{ if $rule.Conditions -}}}{{ end -}}
{{ end -}}
{{ end -}}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
}
return (synth(404));
}
# prevent deadlock for accidental cyclic requests
set req.hash_ignore_busy = true;
# if we're async, don't deliver stale
if (req.http.VK8S-Is-Bgfetch == "true") {
set req.grace = 0s;
}
return (hash);
}
}
sub vk8s_cluster_fetch {
......@@ -179,7 +188,12 @@ sub vk8s_cluster_fetch {
{{- end}}
{{ closeIf $rule }}
{{- digest_update 'b' $rule }}
vk8s_cluster_param.set({{ key 'b' $rule }});
{{- if isCookieKey $rule }}
if (!vk8s_shard_cookie_{{$ridx}}.match(bereq.http.Cookie)) {
return (error(400));
}
{{- end }}
vk8s_cluster_param.set({{ key 'b' $rule $ridx }});
vk8s_cluster_forward.set(true);
{{ closeRule $rule }}
{{- end }}{{/* if not PrimaryOnly */}}
......@@ -241,7 +255,7 @@ func context(ctx rune, key string) string {
return reqMatch.ReplaceAllLiteralString(key, "bereq")
}
func keyParams(ctx rune, rule ShardRule) string {
func keyParams(ctx rune, rule ShardRule, idx int) string {
pfx := ""
if ctx == 'c' {
pfx = ", "
......@@ -254,10 +268,17 @@ func keyParams(ctx rune, rule ShardRule) string {
case Key:
return pfx + "by=KEY, key=vk8s_cluster.key(" +
context(ctx, rule.Key) + ")"
case Cookie:
return pfx + "by=KEY, key=vk8s_cluster.key(vk8s_shard_cookie_" +
strconv.Itoa(idx) + ".backref(1))"
}
return pfx + "by=BLOB, key_blob=vk8s_shard_digest.final()"
}
func isCookieKey(rule ShardRule) bool {
return rule.By == Cookie
}
func digestInit(rule ShardRule) string {
if rule.By != Blob {
return ""
......@@ -314,6 +335,7 @@ const selfShardName = "self-sharding"
var shardFuncMap = template.FuncMap{
"key": keyParams,
"isCookieKey": isCookieKey,
"digest_init": digestInit,
"digest_update": digestUpdate,
"hasPrimary": hasPrimary,
......
......@@ -129,6 +129,10 @@ func TestShardByDigest(t *testing.T) {
}
func TestShardConditions(t *testing.T) {
varnishCluster.Rules[0].PrimaryOnly = true
varnishCluster.Rules[0].By = Blob
varnishCluster.Rules[0].Key = "req.http.Host"
varnishCluster.Rules[0].Algo = Sha3_512
varnishCluster.Rules[0].Conditions = []Condition{
{
Comparand: "req.url",
......@@ -139,3 +143,10 @@ func TestShardConditions(t *testing.T) {
varnishCluster.Rules = append(varnishCluster.Rules, ShardRule{})
templateTest(t, shardTmpl, varnishCluster, "shard_conditions.golden")
}
func TestShardByCookie(t *testing.T) {
varnishCluster.Rules[0].PrimaryOnly = true
varnishCluster.Rules[0].By = Cookie
varnishCluster.Rules[0].Key = "foobar"
templateTest(t, shardTmpl, varnishCluster, "shard_by_cookie.golden")
}
......@@ -412,7 +412,7 @@ func (algo HashAlgo) String() string {
}
}
// KeyBy classifies the shard key methid
// KeyBy classifies the shard key method
type KeyBy uint8
const (
......@@ -424,6 +424,8 @@ const (
Key
// Blob by=BLOB
Blob
// Cookie by=KEY with a cookie value
Cookie
)
// ShardRule represents a sharding configuration, and optional
......
......@@ -152,7 +152,7 @@ sub vcl_recv {
return (hash);
}
}
}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
......
......@@ -55,13 +55,6 @@ sub vcl_init {
}
sub vcl_recv {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
......@@ -80,7 +73,14 @@ sub vcl_recv {
return (hash);
}
}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
......
......@@ -73,7 +73,7 @@ sub vcl_recv {
return (hash);
}
}
}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
......
import std;
import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
"Host: vk8s_cluster"
"Connection: close";
.timeout = 2s;
.interval = 5s;
.initial = 2;
.window = 8;
.threshold = 3;
}
backend vk8s_default_varnish-8445d4f7f-z2b9p {
.host = "172.17.0.12";
.port = "80";
.probe = vk8s_probe_varnish;
}
backend vk8s__ {
.host = "172.17.0.13";
.port = "80";
.probe = vk8s_probe_varnish;
}
backend vk8s_default_varnish-8445d4f7f-ldljf {
.host = "172.17.0.14";
.port = "80";
.probe = vk8s_probe_varnish;
}
acl vk8s_cluster_acl {
"172.17.0.12";
"172.17.0.13";
"172.17.0.14";
}
sub vcl_init {
new vk8s_cluster_param = directors.shard_param();
new vk8s_cluster = directors.shard();
vk8s_cluster.associate(vk8s_cluster_param.use());
vk8s_cluster.add_backend(vk8s_default_varnish-8445d4f7f-z2b9p);
vk8s_cluster.add_backend(vk8s__);
vk8s_cluster.add_backend(vk8s_default_varnish-8445d4f7f-ldljf);
vk8s_cluster.reconfigure();
new vk8s_cluster_forward = taskvar.bool();
new vk8s_cluster_primary = taskvar.backend();
new vk8s_shard_cookie_0 =
re2.regex("\bfoobar\s*=\s*([^,;[:space:]]+)");
new vk8s_selfshard_cond_0_0 = selector.set(case_sensitive=false);
vk8s_selfshard_cond_0_0.add("/foo/");
}
sub vcl_recv {
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
}
return (synth(404));
}
# prevent deadlock for accidental cyclic requests
set req.hash_ignore_busy = true;
# if we're async, don't deliver stale
if (req.http.VK8S-Is-Bgfetch == "true") {
set req.grace = 0s;
}
return (hash);
}
if (vk8s_selfshard_cond_0_0.hasprefix(req.url)) {
if (!vk8s_shard_cookie_0.match(req.http.Cookie)) {
return (synth(400));
}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=KEY, key=vk8s_cluster.key(vk8s_shard_cookie_0.backref(1))));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
}}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
|| bereq.uncacheable
|| remote.ip ~ vk8s_cluster_acl
|| "" + vk8s_cluster.backend(resolve=NOW) == server.identity) {
return;
}
vk8s_cluster_param.set();
vk8s_cluster_forward.set(true);
if (vk8s_cluster_forward.get(fallback=false)) {
set bereq.backend = vk8s_cluster.backend(resolve=LAZY);
set bereq.http.VK8S-Is-Bgfetch = bereq.is_bgfetch;
return (fetch);
}
}
sub vcl_backend_fetch {
call vk8s_cluster_fetch;
}
sub vcl_backend_response {
if (bereq.backend == vk8s_cluster.backend(resolve=LAZY)) {
if (beresp.http.VK8S-Cluster-TTL) {
set beresp.ttl = std.duration(
beresp.http.VK8S-Cluster-TTL + "s", 1s);
if (beresp.ttl > 5m) {
set beresp.ttl = 5m;
}
unset beresp.http.VK8S-Cluster-TTL;
}
else {
set beresp.uncacheable = true;
}
return (deliver);
}
}
sub vcl_backend_error {
if (bereq.backend == vk8s_cluster.backend(resolve=LAZY)) {
return (deliver);
}
}
sub vcl_deliver {
unset resp.http.VK8S-Cluster-TTL;
if (remote.ip ~ vk8s_cluster_acl && ! vk8s_cluster_primary.defined()) {
if (! obj.uncacheable) {
set resp.http.VK8S-Cluster-TTL = obj.ttl;
}
return (deliver);
}
}
......@@ -56,14 +56,6 @@ sub vcl_init {
}
sub vcl_recv {
vk8s_shard_digest.update(blob.decode(encoded=req.http.Host));
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=BLOB, key_blob=vk8s_shard_digest.final()));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
......@@ -82,7 +74,15 @@ sub vcl_recv {
return (hash);
}
}
vk8s_shard_digest.update(blob.decode(encoded=req.http.Host));
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=BLOB, key_blob=vk8s_shard_digest.final()));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
......
......@@ -55,13 +55,6 @@ sub vcl_init {
}
sub vcl_recv {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=KEY, key=vk8s_cluster.key(req.http.Host)));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
......@@ -80,7 +73,14 @@ sub vcl_recv {
return (hash);
}
}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=KEY, key=vk8s_cluster.key(req.http.Host)));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
......
......@@ -55,13 +55,6 @@ sub vcl_init {
}
sub vcl_recv {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=URL));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
......@@ -80,7 +73,14 @@ sub vcl_recv {
return (hash);
}
}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=URL));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
......
......@@ -58,16 +58,6 @@ sub vcl_init {
}
sub vcl_recv {
if (vk8s_selfshard_cond_0_0.hasprefix(req.url)) {
vk8s_shard_digest.update(blob.decode(encoded=req.http.Host));
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=BLOB, key_blob=vk8s_shard_digest.final()));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
} else
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
......@@ -86,7 +76,16 @@ sub vcl_recv {
return (hash);
}
}
if (vk8s_selfshard_cond_0_0.hasprefix(req.url)) {
vk8s_shard_digest.update(blob.decode(encoded=req.http.Host));
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=BLOB, key_blob=vk8s_shard_digest.final()));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
}}
sub vk8s_cluster_fetch {
if (bereq.retries > 0
......
......@@ -84,6 +84,9 @@ make EXAMPLE=shard-by-url deploy verify undeploy
echo Self-sharding by key example
make EXAMPLE=shard-by-key deploy verify undeploy
echo Self-sharding by cookie example
make EXAMPLE=shard-by-cookie deploy verify-cookie undeploy
echo Primary-only self-sharding by client.identity as key
make EXAMPLE=primary-only-by-clientid deploy verify undeploy
......
......@@ -648,6 +648,10 @@ templates:
{{- end }}
{{- range $ridx, $rule := .Rules -}}
{{- digest_init $rule }}
{{- if isCookieKey $rule }}
new vk8s_shard_cookie_{{$ridx}} =
re2.regex("\b{{$rule.Key}}\s*=\s*([^,;[:space:]]+)");
{{- end }}
{{- range $cidx, $c := $rule.Conditions -}}
{{- if condNeedsMatcher $c }}
new {{condMatcher $ridx $cidx}} = {{vmod $c.Compare}}.set({{flags $c}});
......@@ -660,6 +664,24 @@ templates:
}
sub vcl_recv {
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
}
return (synth(404));
}
# prevent deadlock for accidental cyclic requests
set req.hash_ignore_busy = true;
# if we're async, don't deliver stale
if (req.http.VK8S-Is-Bgfetch == "true") {
set req.grace = 0s;
}
return (hash);
}
{{ range $ridx, $rule := .Rules -}}
{{ if $rule.PrimaryOnly -}}
{{ if $rule.Conditions -}}
......@@ -679,35 +701,21 @@ templates:
) {
{{ end -}}
{{- digest_update 'c' $rule }}
{{- if isCookieKey $rule }}
if (!vk8s_shard_cookie_{{$ridx}}.match(req.http.Cookie)) {
return (synth(400));
}
{{- end }}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW
{{- key 'c' $rule }}));
{{- key 'c' $rule $ridx }}));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
{{ if $rule.Conditions }}}
{{ if $rule.Conditions -}}}{{ end -}}
{{ end -}}
{{ end -}}
{{ end -}}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
}
return (synth(404));
}
# prevent deadlock for accidental cyclic requests
set req.hash_ignore_busy = true;
# if we're async, don't deliver stale
if (req.http.VK8S-Is-Bgfetch == "true") {
set req.grace = 0s;
}
return (hash);
}
}
sub vk8s_cluster_fetch {
......@@ -732,7 +740,12 @@ templates:
{{- end}}
{{ closeIf $rule }}
{{- digest_update 'b' $rule }}
vk8s_cluster_param.set({{ key 'b' $rule }});
{{- if isCookieKey $rule }}
if (!vk8s_shard_cookie_{{$ridx}}.match(bereq.http.Cookie)) {
return (error(400));
}
{{- end }}
vk8s_cluster_param.set({{ key 'b' $rule $ridx }});
vk8s_cluster_forward.set(true);
{{ closeRule $rule }}
{{- end }}{{/* if not PrimaryOnly */}}
......