Commit 075c5b26 authored by Geoff Simmons

Add the conditions field to VarnishConfig.self-sharding.

Currently only implemented for kubectl-style deployments.
Helm support to be added in the next commits.

This is a breaking change for the self-sharding configuration.
Configs previously expressed in the self-sharding object are
now moved to self-sharding.shard. In addition, the optional
array self-sharding.conditions may express conditions under
which self-sharding logic is executed.

The conditions array has the same syntax and semantics as
for req-disposition.

XXX: there is some DRY that needs to be refactored away.

- We currently have two internal means of expressing
  conditions under which something happens (translated
  to if-clauses with matching operations and the like in
  VCL): Condition and the legacy MatchTerm, in both the
  k8s VarnishConfig class and the spec type for VCL
  templating. These should be unified to Condition.

- The code for interpreting Conditions for the k8s object
  in pkg/controller is in part repeated for self-sharding
  and req-disposition. These should be encapsulated in a
  common function.

- Template generation for the if-clauses implementing
  conditions is repeated in req-disposition and currently
  in two places for self-sharding. This should be a
  common text/template object.

While here, move some of the code about self-sharding and
conditions from ingress.go to varnishconfig.go, because
it's actually about the VarnishConfig Custom Resource, and
in a continuing effort to reduce the oversized ingress.go
source.
parent 5c7ef404
......@@ -34,49 +34,107 @@ spec:
self-sharding:
type: object
properties:
max-secondary-ttl:
type: string
pattern: '^\d+(\.\d+)?(ms|[smhdwy])$'
primaryOnly:
type: boolean
key:
type: string
pattern: "^client\\.identity$|^req\\.(url|http\\.[a-zA-Z0-9!#$%&'*+.^_`|~-]+)$"
digest:
type: string
enum:
- CRC32
- ICRC32
- MD5
- RS
- SHA1
- SHA224
- SHA256
- SHA384
- SHA512
- SHA3_224
- SHA3_256
- SHA3_512
probe:
shard:
type: object
properties:
timeout:
max-secondary-ttl:
type: string
pattern: '^\d+(\.\d+)?(ms|[smhdwy])$'
interval:
primaryOnly:
type: boolean
key:
type: string
pattern: '^\d+(\.\d+)?(ms|[smhdwy])$'
initial:
type: integer
minimum: 0
window:
type: integer
minimum: 0
maximum: 64
threshold:
type: integer
minimum: 0
maximum: 64
pattern: "^client\\.identity$|^req\\.(url|http\\.[a-zA-Z0-9!#$%&'*+.^_`|~-]+)$"
digest:
type: string
enum:
- CRC32
- ICRC32
- MD5
- RS
- SHA1
- SHA224
- SHA256
- SHA384
- SHA512
- SHA3_224
- SHA3_256
- SHA3_512
probe:
type: object
properties:
timeout:
type: string
pattern: '^\d+(\.\d+)?(ms|[smhdwy])$'
interval:
type: string
pattern: '^\d+(\.\d+)?(ms|[smhdwy])$'
initial:
type: integer
minimum: 0
window:
type: integer
minimum: 0
maximum: 64
threshold:
type: integer
minimum: 0
maximum: 64
conditions:
type: array
minItems: 1
items:
type: object
required:
- comparand
properties:
comparand:
type: string
pattern: "^req\\.(url|method|proto|http\\.[a-zA-Z0-9!#$%&'*+.^_`|~-]+)$"
compare:
type: string
enum:
- equal
- not-equal
- match
- not-match
- prefix
- not-prefix
- exists
- not-exists
values:
type: array
minItems: 1
items:
type: string
match-flags:
type: object
properties:
max-mem:
type: integer
minimum: 0
anchor:
type: string
enum:
- none
- start
- both
utf8:
type: boolean
posix-syntax:
type: boolean
longest-match:
type: boolean
literal:
type: boolean
never-capture:
type: boolean
case-sensitive:
type: boolean
perl-classes:
type: boolean
word-boundary:
type: boolean
auth:
type: array
minItems: 1
......
......@@ -80,6 +80,9 @@ deploy-shard-by-url-kubectl: deploy-cafe-kubectl
deploy-shard-by-key-kubectl: deploy-cafe-kubectl
@kubectl apply -f shard-by-key.yaml
deploy-shard-conditions-kubectl: deploy-cafe-kubectl
@kubectl apply -f shard-conditions.yaml
# TESTOPTS are passed to varnishtest, e.g.: make TESTOPTS=-v verify
verify:
$(mkdir)/verify.sh
......@@ -146,6 +149,11 @@ undeploy-shard-by-key-kubectl:
$(MAKE) undeploy-cafe-kubectl
$(MAKE) wait
undeploy-shard-conditions-kubectl:
@kubectl delete -f shard-conditions.yaml
$(MAKE) undeploy-cafe-kubectl
$(MAKE) wait
ifeq ($(DEPLOY),kubectl)
deploy-self-sharding: deploy-self-sharding-kubectl
undeploy-self-sharding: undeploy-self-sharding-kubectl
......@@ -159,6 +167,8 @@ deploy-shard-by-key: deploy-shard-by-key-kubectl
undeploy-shard-by-key: undeploy-shard-by-key-kubectl
deploy-primary-only-by-clientid: deploy-primary-only-by-clientid-kubectl
undeploy-primary-only-by-clientid: undeploy-primary-only-by-clientid-kubectl
deploy-shard-conditions: deploy-shard-conditions-kubectl
undeploy-shard-conditions: undeploy-shard-conditions-kubectl
else
deploy-self-sharding: deploy-self-sharding-helm
undeploy-self-sharding: undeploy-self-sharding-helm
......@@ -192,9 +202,12 @@ undeploy: undeploy-shard-by-key
else ifeq ($(EXAMPLE),primary-only-by-clientid)
deploy: deploy-primary-only-by-clientid
undeploy: undeploy-primary-only-by-clientid
else ifeq ($(EXAMPLE),shard-conditions)
deploy: deploy-shard-conditions
undeploy: undeploy-shard-conditions
else
deploy undeploy:
$(error EXAMPLE must be set to self-sharding, primary-only[-by-clientid], or shard-by-[digest|url|key])
$(error EXAMPLE must be set to self-sharding, shard-conditions, primary-only[-by-clientid], or shard-by-[digest|url|key])
endif
.PHONY: all $(MAKECMDGOALS)
......@@ -19,11 +19,12 @@ spec:
# self-sharding: {}
#
self-sharding:
primaryOnly: true
key: client.identity
probe:
timeout: 6s
interval: 6s
initial: 2
window: 4
threshold: 3
shard:
primaryOnly: true
key: client.identity
probe:
timeout: 6s
interval: 6s
initial: 2
window: 4
threshold: 3
......@@ -19,10 +19,11 @@ spec:
# self-sharding: {}
#
self-sharding:
primaryOnly: true
probe:
timeout: 6s
interval: 6s
initial: 2
window: 4
threshold: 3
shard:
primaryOnly: true
probe:
timeout: 6s
interval: 6s
initial: 2
window: 4
threshold: 3
......@@ -19,10 +19,11 @@ spec:
# self-sharding: {}
#
self-sharding:
max-secondary-ttl: 2m
probe:
timeout: 6s
interval: 6s
initial: 2
window: 4
threshold: 3
shard:
max-secondary-ttl: 2m
probe:
timeout: 6s
interval: 6s
initial: 2
window: 4
threshold: 3
......@@ -19,6 +19,7 @@ spec:
# self-sharding: {}
#
self-sharding:
primaryOnly: true
key: req.http.Host
digest: SHA3_512
shard:
primaryOnly: true
key: req.http.Host
digest: SHA3_512
......@@ -19,5 +19,6 @@ spec:
# self-sharding: {}
#
self-sharding:
primaryOnly: true
key: req.http.Host
shard:
primaryOnly: true
key: req.http.Host
......@@ -19,5 +19,6 @@ spec:
# self-sharding: {}
#
self-sharding:
primaryOnly: true
key: req.url
shard:
primaryOnly: true
key: req.url
# Sample configuration for a self-sharding Varnish cluster
apiVersion: "ingress.varnish-cache.org/v1alpha1"
kind: VarnishConfig
metadata:
name: sharding-conditions-cfg
spec:
# The services array is required and must have at least one element.
# Lists the Service names of Varnish services in the same namespace
# to which this config is to be applied.
services:
- varnish-ingress-admin
# If the self-sharding object is present, then self-sharding will be
# implemented for the named service. All of its properties are
# optional, and defaults hold if they are left out. To just apply
# self-sharding with all default values, specify an empty object:
#
# self-sharding: {}
#
self-sharding:
conditions:
- comparand: req.url
compare: prefix
values:
- /foo/
shard:
primaryOnly: true
key: client.identity
probe:
timeout: 6s
interval: 6s
initial: 2
window: 4
threshold: 3
......@@ -61,11 +61,17 @@ type VarnishConfigSpec struct {
// SelfShardSpec specifies self-sharding in a Varnish cluster.
// see: https://code.uplex.de/uplex-varnish/k8s-ingress/blob/master/docs/self-sharding.md
type SelfShardSpec struct {
Key string `json:"key,omitempty"`
Digest string `json:"digest,omitempty"`
Max2ndTTL string `json:"max-secondary-ttl,omitempty"`
Probe ProbeSpec `json:"probe,omitempty"`
PrimaryOnly bool `json:"primaryOnly,omitempty"`
Conditions []ReqCondition `json:"conditions,omitempty"`
Sharding ShardSpec `json:"shard"`
}
// ShardSpec specifies the configuration details for sharding.
// It holds the fields formerly expressed directly in the
// self-sharding object, now nested under self-sharding.shard.
type ShardSpec struct {
	// Probe optionally configures the health probe for cluster
	// nodes; nil means the probe defaults apply.
	Probe *ProbeSpec `json:"probe,omitempty"`
	// Key is the request property used as the sharding key; the
	// CRD restricts it to client.identity, req.url, or a
	// req.http.* header.
	Key string `json:"key,omitempty"`
	// Digest names the hash algorithm applied to the key (one of
	// the CRD's digest enum, e.g. SHA256, SHA3_512).
	Digest string `json:"digest,omitempty"`
	// Max2ndTTL is a duration string (e.g. "2m") bounding the
	// TTL of objects cached at secondary nodes.
	Max2ndTTL string `json:"max-secondary-ttl,omitempty"`
	// PrimaryOnly enables the primary-only sharding mode.
	PrimaryOnly bool `json:"primaryOnly,omitempty"`
}
// ProbeSpec specifies health probes for self-sharding and BackendConfig.
......
......@@ -466,7 +466,14 @@ func (in *RewriteSpec) DeepCopy() *RewriteSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SelfShardSpec) DeepCopyInto(out *SelfShardSpec) {
*out = *in
in.Probe.DeepCopyInto(&out.Probe)
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]ReqCondition, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
in.Sharding.DeepCopyInto(&out.Sharding)
return
}
......@@ -480,6 +487,27 @@ func (in *SelfShardSpec) DeepCopy() *SelfShardSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ShardSpec) DeepCopyInto(out *ShardSpec) {
	*out = *in
	// Probe is a pointer; allocate a fresh ProbeSpec so the copy
	// shares no state with the receiver.
	if in.Probe != nil {
		in, out := &in.Probe, &out.Probe
		*out = new(ProbeSpec)
		(*in).DeepCopyInto(*out)
	}
	return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShardSpec.
func (in *ShardSpec) DeepCopy() *ShardSpec {
	// nil receiver yields nil, so callers may chain without
	// checking first.
	if in == nil {
		return nil
	}
	out := new(ShardSpec)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TemplateConfig) DeepCopyInto(out *TemplateConfig) {
*out = *in
......
......@@ -506,37 +506,6 @@ func (worker *NamespaceWorker) ings2VCLSpec(
return vclSpec, bcfgs, update.MakeSuccess("")
}
// configComparison maps a comparison operator from the VarnishConfig
// Custom Resource to the internal VCL comparison, together with a
// bool that is true when the VCL comparison must be negated (the
// Not* variants). Unknown operators fall back to (vcl.Equal, false).
func configComparison(cmp vcr_v1alpha1.CompareType) (vcl.CompareType, bool) {
	switch cmp {
	case vcr_v1alpha1.Equal:
		return vcl.Equal, false
	case vcr_v1alpha1.NotEqual:
		return vcl.Equal, true
	case vcr_v1alpha1.Match:
		return vcl.Match, false
	case vcr_v1alpha1.NotMatch:
		return vcl.Match, true
	case vcr_v1alpha1.Prefix:
		return vcl.Prefix, false
	case vcr_v1alpha1.NotPrefix:
		return vcl.Prefix, true
	case vcr_v1alpha1.Exists:
		return vcl.Exists, false
	case vcr_v1alpha1.NotExists:
		return vcl.Exists, true
	case vcr_v1alpha1.Greater:
		return vcl.Greater, false
	case vcr_v1alpha1.GreaterEqual:
		return vcl.GreaterEqual, false
	case vcr_v1alpha1.Less:
		return vcl.Less, false
	case vcr_v1alpha1.LessEqual:
		return vcl.LessEqual, false
	default:
		return vcl.Equal, false
	}
}
func configConditions(vclConds []vcl.MatchTerm,
vcfgConds []vcr_v1alpha1.Condition) {
......@@ -666,37 +635,6 @@ func (worker *NamespaceWorker) configACL(spec *vcl.Spec,
}
}
// configMatchFlags converts the match-flags object from the
// VarnishConfig Custom Resource to the internal VCL representation.
// CaseSensitive defaults to true when left unset, MaxMem is copied
// only when set and non-zero, and an unrecognized Anchor falls back
// to None.
func configMatchFlags(flags vcr_v1alpha1.MatchFlagsType) vcl.MatchFlagsType {
	vclFlags := vcl.MatchFlagsType{
		UTF8:         flags.UTF8,
		PosixSyntax:  flags.PosixSyntax,
		LongestMatch: flags.LongestMatch,
		Literal:      flags.Literal,
		NeverCapture: flags.NeverCapture,
		PerlClasses:  flags.PerlClasses,
		WordBoundary: flags.WordBoundary,
	}
	// MaxMem is a pointer in the CR type so that "unset" and "0"
	// can be distinguished; both leave the VCL default in place.
	if flags.MaxMem != nil && *flags.MaxMem != 0 {
		vclFlags.MaxMem = *flags.MaxMem
	}
	// Case-sensitive matching is the default unless explicitly
	// disabled.
	if flags.CaseSensitive == nil {
		vclFlags.CaseSensitive = true
	} else {
		vclFlags.CaseSensitive = *flags.CaseSensitive
	}
	switch flags.Anchor {
	case vcr_v1alpha1.None:
		vclFlags.Anchor = vcl.None
	case vcr_v1alpha1.Start:
		vclFlags.Anchor = vcl.Start
	case vcr_v1alpha1.Both:
		vclFlags.Anchor = vcl.Both
	default:
		vclFlags.Anchor = vcl.None
	}
	return vclFlags
}
func (worker *NamespaceWorker) configRewrites(spec *vcl.Spec,
vcfg *vcr_v1alpha1.VarnishConfig) error {
......@@ -835,7 +773,7 @@ func (worker *NamespaceWorker) configReqDisps(spec *vcl.Spec,
for i, disp := range reqDisps {
worker.log.Tracef("ReqDisposition: %+v", disp)
vclDisp := vcl.DispositionSpec{
Conditions: make([]vcl.Condition, len(disp.Conditions)),
Conditions: reqConds2vclConds(disp.Conditions),
}
for j, cond := range disp.Conditions {
vclCond := vcl.Condition{
......
......@@ -235,6 +235,95 @@ func validateReqDisps(reqDisps []vcr_v1alpha1.RequestDispSpec) error {
return nil
}
// configComparison maps a comparison operator from the VarnishConfig
// Custom Resource to the internal VCL comparison, together with a
// bool that is true when the VCL comparison must be negated (the
// Not* variants). Unknown operators fall back to (vcl.Equal, false).
func configComparison(cmp vcr_v1alpha1.CompareType) (vcl.CompareType, bool) {
	switch cmp {
	case vcr_v1alpha1.Equal, vcr_v1alpha1.NotEqual:
		return vcl.Equal, cmp == vcr_v1alpha1.NotEqual
	case vcr_v1alpha1.Match, vcr_v1alpha1.NotMatch:
		return vcl.Match, cmp == vcr_v1alpha1.NotMatch
	case vcr_v1alpha1.Prefix, vcr_v1alpha1.NotPrefix:
		return vcl.Prefix, cmp == vcr_v1alpha1.NotPrefix
	case vcr_v1alpha1.Exists, vcr_v1alpha1.NotExists:
		return vcl.Exists, cmp == vcr_v1alpha1.NotExists
	case vcr_v1alpha1.Greater:
		return vcl.Greater, false
	case vcr_v1alpha1.GreaterEqual:
		return vcl.GreaterEqual, false
	case vcr_v1alpha1.Less:
		return vcl.Less, false
	case vcr_v1alpha1.LessEqual:
		return vcl.LessEqual, false
	default:
		return vcl.Equal, false
	}
}
// configMatchFlags converts the match-flags object from the
// VarnishConfig Custom Resource to the internal VCL representation.
// CaseSensitive defaults to true when left unset, MaxMem is copied
// only when set and non-zero, and an unrecognized Anchor falls back
// to None.
func configMatchFlags(flags vcr_v1alpha1.MatchFlagsType) vcl.MatchFlagsType {
	out := vcl.MatchFlagsType{
		UTF8:         flags.UTF8,
		PosixSyntax:  flags.PosixSyntax,
		LongestMatch: flags.LongestMatch,
		Literal:      flags.Literal,
		NeverCapture: flags.NeverCapture,
		PerlClasses:  flags.PerlClasses,
		WordBoundary: flags.WordBoundary,
		// Case-sensitive matching is the default unless the CR
		// explicitly disables it below.
		CaseSensitive: true,
	}
	if flags.CaseSensitive != nil {
		out.CaseSensitive = *flags.CaseSensitive
	}
	// MaxMem is a pointer so "unset" and "0" are distinguishable;
	// both leave the VCL default in place.
	if flags.MaxMem != nil && *flags.MaxMem != 0 {
		out.MaxMem = *flags.MaxMem
	}
	switch flags.Anchor {
	case vcr_v1alpha1.Start:
		out.Anchor = vcl.Start
	case vcr_v1alpha1.Both:
		out.Anchor = vcl.Both
	default:
		// Covers vcr_v1alpha1.None and any unknown value.
		out.Anchor = vcl.None
	}
	return out
}
// reqConds2vclConds translates the ReqConditions of a VarnishConfig
// Custom Resource (used by self-sharding and req-disposition) into
// the internal VCL Condition representation used for template
// generation.
func reqConds2vclConds(reqConds []vcr_v1alpha1.ReqCondition) []vcl.Condition {
	vclConds := make([]vcl.Condition, 0, len(reqConds))
	for _, src := range reqConds {
		cond := vcl.Condition{
			Comparand: src.Comparand,
		}
		// Copy the values so the VCL spec does not alias the CR.
		if len(src.Values) > 0 {
			cond.Values = append([]string(nil), src.Values...)
		}
		if src.Count != nil {
			n := uint(*src.Count)
			cond.Count = &n
		}
		cond.Compare, cond.Negate = configComparison(src.Compare)
		if src.MatchFlags == nil {
			// Match case-sensitively by default.
			cond.MatchFlags.CaseSensitive = true
		} else {
			cond.MatchFlags = configMatchFlags(*src.MatchFlags)
		}
		vclConds = append(vclConds, cond)
	}
	return vclConds
}
func getHashAlgo(digest string) vcl.HashAlgo {
switch digest {
case "CRC32":
......@@ -337,27 +426,27 @@ func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
"%s/%s: %+v", svc.Namespace, svc.Name, spec.ShardCluster.Nodes)
cfgSpec := vcfg.Spec.SelfSharding
probe := getVCLProbe(&cfgSpec.Probe)
spec.ShardCluster.Probe = *probe
spec.ShardCluster.PrimaryOnly = cfgSpec.PrimaryOnly
if cfgSpec.Max2ndTTL != "" {
spec.ShardCluster.MaxSecondaryTTL = cfgSpec.Max2ndTTL
} else {
spec.ShardCluster.MaxSecondaryTTL = defMax2ndTTL
}
spec.ShardCluster.Conditions = reqConds2vclConds(cfgSpec.Conditions)
spec.ShardCluster.Probe = getVCLProbe(cfgSpec.Sharding.Probe)
spec.ShardCluster.PrimaryOnly = cfgSpec.Sharding.PrimaryOnly
spec.ShardCluster.MaxSecondaryTTL = defMax2ndTTL
spec.ShardCluster.By = vcl.ByHash
if cfgSpec.Digest != "" && cfgSpec.Digest != "SHA256" {
if cfgSpec.Sharding.Max2ndTTL != "" {
spec.ShardCluster.MaxSecondaryTTL = cfgSpec.Sharding.Max2ndTTL
}
if cfgSpec.Sharding.Digest != "" &&
cfgSpec.Sharding.Digest != "SHA256" {
spec.ShardCluster.By = vcl.Blob
spec.ShardCluster.Algo = getHashAlgo(cfgSpec.Digest)
if cfgSpec.Key != "" {
spec.ShardCluster.Key = cfgSpec.Key
spec.ShardCluster.Algo = getHashAlgo(cfgSpec.Sharding.Digest)
if cfgSpec.Sharding.Key != "" {
spec.ShardCluster.Key = cfgSpec.Sharding.Key
} else {
spec.ShardCluster.Key = "req.url"
}
} else if cfgSpec.Key != "" {
spec.ShardCluster.Key = cfgSpec.Key
if cfgSpec.Key == "req.url" {
} else if cfgSpec.Sharding.Key != "" {
spec.ShardCluster.Key = cfgSpec.Sharding.Key
if cfgSpec.Sharding.Key == "req.url" {
spec.ShardCluster.By = vcl.URL
} else {
spec.ShardCluster.By = vcl.Key
......@@ -388,7 +477,7 @@ func (worker *NamespaceWorker) syncVcfg(key string) update.Status {
}
if vcfg.Spec.SelfSharding != nil {
if err = validateProbe(&vcfg.Spec.SelfSharding.Probe); err != nil {
if err = validateProbe(vcfg.Spec.SelfSharding.Sharding.Probe); err != nil {
return update.MakeFatal(
"VarnishConfig %s/%s invalid sharding spec: %v",
vcfg.Namespace, vcfg.Name, err)
......
......@@ -29,6 +29,7 @@
package vcl
import (
"fmt"
"regexp"
"text/template"
)
......@@ -39,11 +40,14 @@ import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
"Host: vk8s_cluster"
"Connection: close";
{{- if .Probe }}
{{- if .Probe.Timeout}}
.timeout = {{.Probe.Timeout}};
{{- end}}
......@@ -59,6 +63,7 @@ probe vk8s_probe_varnish {
{{- if .Probe.Threshold}}
.threshold = {{.Probe.Threshold}};
{{- end}}
{{- end}}
}
{{range $node := .Nodes -}}
......@@ -88,19 +93,46 @@ sub vcl_init {
{{ if .PrimaryOnly -}}
new vk8s_cluster_primary = taskvar.backend();
{{- end }}
{{- range $cidx, $c := .Conditions -}}
{{- if condNeedsMatcher $c }}
new {{condMatcher $cidx}} = {{vmod $c.Compare}}.set({{flags $c}});
{{- range $val := $c.Values}}
{{condMatcher $cidx}}.add("{{$val}}");
{{- end -}}
{{end -}}
{{- end }}
}
sub vcl_recv {
{{ if .PrimaryOnly -}}
{{- digest_update 'c' . }}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW
{{- key 'c' .}}));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
else {{ end }}if (remote.ip ~ vk8s_cluster_acl) {
{{ if .Conditions -}}
if (
{{- range $cidx, $cond := .Conditions}}
{{- if ne $cidx 0}} &&
{{end}}
{{- if .Negate}}! {{end}}
{{- if condNeedsMatcher $cond}}
{{- condMatcher $cidx}}.{{match .Compare}}({{.Comparand}})
{{- else if exists .Compare}}
{{- .Comparand}}
{{- else}}
{{- .Comparand}} {{cmpRelation .Compare .Negate}} {{value $cond}}
{{- end}}
{{- end -}}
) {
{{ end -}}
{{- digest_update 'c' . }}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW
{{- key 'c' .}}));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
{{ if .Conditions }}}
{{ end -}}
{{ end -}}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
......@@ -123,7 +155,19 @@ sub vcl_recv {
sub vcl_backend_fetch {
{{- digest_update 'b' . }}
vk8s_cluster_param.set({{ key 'b' .}});
if (bereq.retries == 0
if (
{{- range $cidx, $cond := .Conditions}}
{{- if .Negate}}! {{end}}
{{- if condNeedsMatcher $cond}}
{{- condMatcher $cidx}}.{{match .Compare}}({{ctx 'b' .Comparand}})
{{- else if exists .Compare}}
{{- ctx 'b' .Comparand}}
{{- else}}
{{- ctx 'b' .Comparand}} {{cmpRelation .Compare .Negate}} {{value $cond}}
{{- end}}
&&
{{ end -}}
bereq.retries == 0
&& !bereq.uncacheable
&& remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster.backend(resolve=NOW) != server.identity) {
......@@ -223,6 +267,21 @@ var shardFuncMap = template.FuncMap{
addr := svc.Addresses[0]
return "vk8s_" + addr.PodNamespace + "_" + addr.PodName
},
"exists": func(cmp CompareType) bool { return cmp == Exists },
"match": func(cmp CompareType) string { return match(cmp) },
"value": func(cond Condition) string { return reqValue(cond) },
"vmod": func(cmp CompareType) string { return vmod(cmp) },
"flags": func(cond Condition) string { return reqFlags(cond) },
"cmpRelation": func(cmp CompareType, negate bool) string {
return cmpRelation(cmp, negate)
},
"condNeedsMatcher": func(cond Condition) bool {
return reqNeedsMatcher(cond)
},
"condMatcher": func(cidx int) string {
return fmt.Sprintf("vk8s_selfshard_cond_%d", cidx)
},
"ctx": context,
}
var shardTmpl = template.Must(template.New(selfShardName).Funcs(shardFuncMap).
......
......@@ -61,7 +61,7 @@ var varnishCluster = ShardCluster{
}},
},
},
Probe: Probe{
Probe: &Probe{
Timeout: "2s",
Interval: "5s",
Initial: "2",
......@@ -126,3 +126,14 @@ func TestShardByDigest(t *testing.T) {
varnishCluster.Algo = Sha3_512
templateTest(t, shardTmpl, varnishCluster, "shard_by_digest.golden")
}
// TestShardConditions exercises template generation for a shard
// cluster with a Conditions array: the sharding logic is wrapped in
// an if-clause matching req.url against the prefix "/foo/". Output
// is compared against the shard_conditions.golden file.
// NOTE(review): this mutates the package-level varnishCluster
// fixture shared with the other shard tests, so test order matters —
// TODO confirm this is intended.
func TestShardConditions(t *testing.T) {
	varnishCluster.Conditions = []Condition{
		{
			Comparand: "req.url",
			Compare:   Prefix,
			Values:    []string{"/foo/"},
		},
	}
	templateTest(t, shardTmpl, varnishCluster, "shard_conditions.golden")
}
......@@ -340,7 +340,8 @@ const (
// from the VarnishConfig Custom Resource.
type ShardCluster struct {
Nodes []Service
Probe Probe
Conditions []Condition
Probe *Probe
MaxSecondaryTTL string
Key string
By KeyBy
......@@ -352,14 +353,19 @@ func (shard ShardCluster) hash(hash hash.Hash) {
for _, node := range shard.Nodes {
node.hash(hash)
}
shard.Probe.hash(hash)
hash.Write([]byte(shard.MaxSecondaryTTL))
if shard.PrimaryOnly {
hash.Write([]byte{1})
for _, cond := range shard.Conditions {
cond.hash(hash)
}
if shard.Probe != nil {
shard.Probe.hash(hash)
}
hash.Write([]byte(shard.MaxSecondaryTTL))
hash.Write([]byte(shard.Key))
hash.Write([]byte{byte(shard.By)})
hash.Write([]byte{byte(shard.Algo)})
if shard.PrimaryOnly {
hash.Write([]byte{1})
}
}
// AuthStatus is the response code to be sent for authentication
......@@ -795,6 +801,26 @@ type Condition struct {
Negate bool
}
// hash folds the Condition's contents into hash, as part of the VCL
// configuration fingerprint. The write order (Values, MatchFlags,
// Count, Comparand, Compare, Negate) is part of the hash definition
// and must not change, or identical configs would fingerprint
// differently across versions.
// NOTE(review): Values are concatenated without separators, so e.g.
// ["ab","c"] and ["a","bc"] produce the same hash input — consider
// length-prefixing each value.
func (cond Condition) hash(hash hash.Hash) {
	for _, val := range cond.Values {
		hash.Write([]byte(val))
	}
	cond.MatchFlags.hash(hash)
	if cond.Count != nil {
		// Encode the count as fixed-width big-endian so the
		// hash input is unambiguous.
		countBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(countBytes,
			uint64(*cond.Count))
		hash.Write(countBytes)
	}
	hash.Write([]byte(cond.Comparand))
	hash.Write([]byte{byte(cond.Compare)})
	// A single distinguishing byte for negation.
	if cond.Negate {
		hash.Write([]byte{1})
	} else {
		hash.Write([]byte{0})
	}
}
// RecvReturn is a name for the disposition of a client request.
// See: https://varnish-cache.org/docs/6.3/reference/states.html
type RecvReturn string
......@@ -839,23 +865,7 @@ type DispositionSpec struct {
func (reqDisp DispositionSpec) hash(hash hash.Hash) {
for _, cond := range reqDisp.Conditions {
for _, val := range cond.Values {
hash.Write([]byte(val))
}
cond.MatchFlags.hash(hash)
if cond.Count != nil {
countBytes := make([]byte, 8)
binary.BigEndian.PutUint64(countBytes,
uint64(*cond.Count))
hash.Write(countBytes)
}
hash.Write([]byte(cond.Comparand))
hash.Write([]byte{byte(cond.Compare)})
if cond.Negate {
hash.Write([]byte{1})
} else {
hash.Write([]byte{0})
}
cond.hash(hash)
}
hash.Write([]byte(reqDisp.Disposition.Action))
if reqDisp.Disposition.Action == RecvSynth {
......
......@@ -76,6 +76,8 @@ import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
......
......@@ -4,6 +4,8 @@ import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
......@@ -53,13 +55,13 @@ sub vcl_init {
sub vcl_recv {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
else if (remote.ip ~ vk8s_cluster_acl) {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
......
......@@ -4,6 +4,8 @@ import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
......
......@@ -4,6 +4,8 @@ import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
......@@ -55,13 +57,13 @@ sub vcl_init {
sub vcl_recv {
vk8s_shard_digest.update(blob.decode(encoded=req.http.Host));
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=BLOB, key_blob=vk8s_shard_digest.final()));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
else if (remote.ip ~ vk8s_cluster_acl) {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=BLOB, key_blob=vk8s_shard_digest.final()));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
......
......@@ -4,6 +4,8 @@ import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
......@@ -53,13 +55,13 @@ sub vcl_init {
sub vcl_recv {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=KEY, key=vk8s_cluster.key(req.http.Host)));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
else if (remote.ip ~ vk8s_cluster_acl) {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=KEY, key=vk8s_cluster.key(req.http.Host)));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
......
......@@ -4,6 +4,8 @@ import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
......@@ -53,13 +55,13 @@ sub vcl_init {
sub vcl_recv {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=URL));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
else if (remote.ip ~ vk8s_cluster_acl) {
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=URL));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
......
import std;
import directors;
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
"Host: vk8s_cluster"
"Connection: close";
.timeout = 2s;
.interval = 5s;
.initial = 2;
.window = 8;
.threshold = 3;
}
backend vk8s_default_varnish-8445d4f7f-z2b9p {
.host = "172.17.0.12";
.port = "80";
.probe = vk8s_probe_varnish;
}
backend vk8s__ {
.host = "172.17.0.13";
.port = "80";
.probe = vk8s_probe_varnish;
}
backend vk8s_default_varnish-8445d4f7f-ldljf {
.host = "172.17.0.14";
.port = "80";
.probe = vk8s_probe_varnish;
}
acl vk8s_cluster_acl {
"172.17.0.12";
"172.17.0.13";
"172.17.0.14";
}
sub vcl_init {
new vk8s_cluster_param = directors.shard_param();
new vk8s_cluster = directors.shard();
vk8s_cluster.associate(vk8s_cluster_param.use());
vk8s_cluster.add_backend(vk8s_default_varnish-8445d4f7f-z2b9p);
vk8s_cluster.add_backend(vk8s__);
vk8s_cluster.add_backend(vk8s_default_varnish-8445d4f7f-ldljf);
vk8s_cluster.reconfigure();
new vk8s_shard_digest = blobdigest.digest(SHA3_512);
new vk8s_cluster_primary = taskvar.backend();
new vk8s_selfshard_cond_0 = selector.set(case_sensitive=false);
vk8s_selfshard_cond_0.add("/foo/");
}
sub vcl_recv {
if (vk8s_selfshard_cond_0.hasprefix(req.url)) {
vk8s_shard_digest.update(blob.decode(encoded=req.http.Host));
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW, by=BLOB, key_blob=vk8s_shard_digest.final()));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
}
return (synth(404));
}
# prevent deadlock for accidental cyclic requests
set req.hash_ignore_busy = true;
# if we're async, don't deliver stale
if (req.http.VK8S-Is-Bgfetch == "true") {
set req.grace = 0s;
}
return (hash);
}
}
sub vcl_backend_fetch {
vk8s_shard_digest.update(blob.decode(encoded=bereq.http.Host));
vk8s_cluster_param.set(by=BLOB, key_blob=vk8s_shard_digest.final());
if (vk8s_selfshard_cond_0.hasprefix(bereq.url)
&&
bereq.retries == 0
&& !bereq.uncacheable
&& remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster.backend(resolve=NOW) != server.identity) {
set bereq.backend = vk8s_cluster.backend(resolve=LAZY);
set bereq.http.VK8S-Is-Bgfetch = bereq.is_bgfetch;
return (fetch);
}
}
sub vcl_backend_response {
if (bereq.backend == vk8s_cluster.backend(resolve=LAZY)) {
if (beresp.http.VK8S-Cluster-TTL) {
set beresp.ttl = std.duration(
beresp.http.VK8S-Cluster-TTL + "s", 1s);
if (beresp.ttl > 5m) {
set beresp.ttl = 5m;
}
unset beresp.http.VK8S-Cluster-TTL;
}
else {
set beresp.uncacheable = true;
}
return (deliver);
}
}
sub vcl_backend_error {
if (bereq.backend == vk8s_cluster.backend(resolve=LAZY)) {
return (deliver);
}
}
sub vcl_deliver {
unset resp.http.VK8S-Cluster-TTL;
if (remote.ip ~ vk8s_cluster_acl && ! vk8s_cluster_primary.defined()) {
if (! obj.uncacheable) {
set resp.http.VK8S-Cluster-TTL = obj.ttl;
}
return (deliver);
}
}
......@@ -83,6 +83,9 @@ make EXAMPLE=shard-by-key deploy verify undeploy
echo Primary-only self-sharding by client.identity as key
make EXAMPLE=primary-only-by-clientid deploy verify undeploy
echo Self-sharding under conditions example
make EXAMPLE=shard-conditions deploy verify undeploy
echo Basic Authentication example
cd ${MYPATH}/../examples/authentication/
make EXAMPLE=basic-auth deploy verify undeploy
......
......@@ -427,11 +427,14 @@ spec:
import blob;
import blobdigest;
import taskvar;
import re2;
import selector;
probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1"
"Host: vk8s_cluster"
"Connection: close";
{{- if .Probe }}
{{- if .Probe.Timeout}}
.timeout = {{.Probe.Timeout}};
{{- end}}
......@@ -447,6 +450,7 @@ spec:
{{- if .Probe.Threshold}}
.threshold = {{.Probe.Threshold}};
{{- end}}
{{- end}}
}
{{range $node := .Nodes -}}
......@@ -476,19 +480,45 @@ spec:
{{ if .PrimaryOnly -}}
new vk8s_cluster_primary = taskvar.backend();
{{- end }}
{{range $cidx, $c := .Conditions -}}
{{if condNeedsMatcher $c -}}
new {{condMatcher $cidx}} = {{vmod $c.Compare}}.set({{flags $c}});
{{- range $val := $c.Values}}
{{condMatcher $cidx}}.add("{{$val}}");
{{- end -}}
{{end -}}
{{end -}}
}
sub vcl_recv {
{{ if .PrimaryOnly -}}
{{- digest_update 'c' . }}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW
{{- key 'c' .}}));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
else {{ end }}if (remote.ip ~ vk8s_cluster_acl) {
{{ if .Conditions -}}
if (
{{- range $cidx, $cond := .Conditions}}
{{- if ne $cidx 0}} &&
{{end}}
{{- if .Negate}}! {{end}}
{{- if condNeedsMatcher $cond}}
{{- condMatcher $cidx}}.{{match .Compare}}({{.Comparand}})
{{- else if exists .Compare}}
{{- .Comparand}}
{{- else}}
{{- .Comparand}} {{cmpRelation .Compare .Negate}} {{value $cond}}
{{- end}}
{{- end -}}
) {
{{ end -}}
{{- digest_update 'c' . }}
vk8s_cluster_primary.set(vk8s_cluster.backend(resolve=NOW
{{- key 'c' .}}));
if (remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster_primary.get() != server.identity) {
set req.backend_hint = vk8s_cluster_primary.get();
return (pipe);
}
{{ if .Conditions }}}{{ end -}}
{{ end -}}
if (remote.ip ~ vk8s_cluster_acl) {
if (req.http.Host == "vk8s_cluster") {
if (req.url == "/vk8s_cluster_health") {
return (synth(200));
......@@ -511,7 +541,19 @@ spec:
sub vcl_backend_fetch {
{{- digest_update 'b' . }}
vk8s_cluster_param.set({{ key 'b' .}});
if (bereq.retries == 0
if (
{{- range $cidx, $cond := .Conditions}}
{{- if .Negate}}! {{end}}
{{- if condNeedsMatcher $cond}}
{{- condMatcher $cidx}}.{{match .Compare}}({{.Comparand}})
{{- else if exists .Compare}}
{{- .Comparand}}
{{- else}}
{{- .Comparand}} {{cmpRelation .Compare .Negate}} {{value $cond}}
{{- end}}
&&
{{ end -}}
bereq.retries == 0
&& !bereq.uncacheable
&& remote.ip !~ vk8s_cluster_acl
&& "" + vk8s_cluster.backend(resolve=NOW) != server.identity) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment