Commit be563360 authored by Geoff Simmons's avatar Geoff Simmons

Self-sharding Varnish cluster is configurable.

parent 93910c64
...@@ -62,7 +62,15 @@ const ( ...@@ -62,7 +62,15 @@ const (
admSecretKey = "admin" admSecretKey = "admin"
admSvcName = "varnish-ingress-admin" admSvcName = "varnish-ingress-admin"
admPortName = "varnishadm" admPortName = "varnishadm"
selfShardKey = "custom.varnish-cache.org/self-sharding"
annotationPrefix = "ingress.varnish-cache.org/"
selfShardKey = "self-sharding"
shardProbeTimeoutKey = "self-sharding-probe-timeout"
shardProbeIntervalKey = "self-sharding-probe-interval"
shardProbeInitialKey = "self-sharding-probe-initial"
shardProbeWindowKey = "self-sharding-probe-window"
shardProbeThresholdKey = "self-sharding-probe-threshold"
shardMax2ndTTL = "self-sharding-max-secondary-ttl"
// resyncPeriod = 30 * time.Second // resyncPeriod = 30 * time.Second
) )
...@@ -432,11 +440,12 @@ func (ingc *IngressController) Stop() { ...@@ -432,11 +440,12 @@ func (ingc *IngressController) Stop() {
func (ingc *IngressController) configSharding(spec *vcl.Spec, func (ingc *IngressController) configSharding(spec *vcl.Spec,
ing *extensions.Ingress) error { ing *extensions.Ingress) error {
ann, exists := ing.Annotations[selfShardKey] ann, exists := ing.Annotations[annotationPrefix+selfShardKey]
if !exists { if !exists ||
return nil (!strings.EqualFold(ann, "on") &&
} !strings.EqualFold(ann, "true")) {
if !strings.EqualFold(ann, "on") && !strings.EqualFold(ann, "true") { ingc.log.Debugf("No cluster shard configuration for Ingress "+
"%s/%s", ing.Namespace, ing.Name)
return nil return nil
} }
...@@ -475,7 +484,7 @@ func (ingc *IngressController) configSharding(spec *vcl.Spec, ...@@ -475,7 +484,7 @@ func (ingc *IngressController) configSharding(spec *vcl.Spec,
ingc.log.Debug("Pods for shard configuration:", pods.Items) ingc.log.Debug("Pods for shard configuration:", pods.Items)
// Populate spec.ClusterNodes with Pod names and the http endpoint // Populate spec.ShardCluster.Nodes with Pod names and the http endpoint
for _, pod := range pods.Items { for _, pod := range pods.Items {
var varnishCntnr api_v1.Container var varnishCntnr api_v1.Container
var httpPort int32 var httpPort int32
...@@ -507,10 +516,38 @@ func (ingc *IngressController) configSharding(spec *vcl.Spec, ...@@ -507,10 +516,38 @@ func (ingc *IngressController) configSharding(spec *vcl.Spec,
} }
node.Addresses[0].IP = pod.Status.PodIP node.Addresses[0].IP = pod.Status.PodIP
node.Addresses[0].Port = httpPort node.Addresses[0].Port = httpPort
spec.ClusterNodes = append(spec.ClusterNodes, node) spec.ShardCluster.Nodes = append(spec.ShardCluster.Nodes, node)
}
ingc.log.Debugf("Node configuration for self-sharding in Ingress "+
"%s/%s: %+v", ing.Namespace, ing.Name, spec.ShardCluster.Nodes)
anns := ing.Annotations
ann, exists = anns[annotationPrefix+shardProbeTimeoutKey]
if exists {
spec.ShardCluster.Probe.Timeout = ann
}
ann, exists = anns[annotationPrefix+shardProbeIntervalKey]
if exists {
spec.ShardCluster.Probe.Interval = ann
}
ann, exists = anns[annotationPrefix+shardProbeInitialKey]
if exists {
spec.ShardCluster.Probe.Initial = ann
}
ann, exists = anns[annotationPrefix+shardProbeWindowKey]
if exists {
spec.ShardCluster.Probe.Window = ann
}
ann, exists = anns[annotationPrefix+shardProbeThresholdKey]
if exists {
spec.ShardCluster.Probe.Threshold = ann
}
ann, exists = anns[annotationPrefix+shardMax2ndTTL]
if exists {
spec.ShardCluster.MaxSecondaryTTL = ann
} }
ingc.log.Debugf("Spec configuration for self-sharding in Ingress "+ ingc.log.Debugf("Spec configuration for self-sharding in Ingress "+
"%s/%s: %+v", ing.Namespace, ing.Name, spec.ClusterNodes) "%s/%s: %+v", ing.Namespace, ing.Name, spec.ShardCluster)
return nil return nil
} }
......
...@@ -5,11 +5,24 @@ import directors; ...@@ -5,11 +5,24 @@ import directors;
probe vk8s_probe_varnish { probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1" .request = "HEAD /vk8s_cluster_health HTTP/1.1"
"Host: vk8s_cluster"; "Host: vk8s_cluster";
.timeout = 5s; {{- if .Probe.Timeout}}
.interval = 5s; .timeout = {{.Probe.Timeout}};
{{- end}}
{{- if .Probe.Interval}}
.interval = {{.Probe.Interval}};
{{- end}}
{{- if .Probe.Initial}}
.initial = {{.Probe.Initial}};
{{- end}}
{{- if .Probe.Window}}
.window = {{.Probe.Window}};
{{- end}}
{{- if .Probe.Threshold}}
.threshold = {{.Probe.Threshold}};
{{- end}}
} }
{{range $node := .ClusterNodes -}} {{range $node := .Nodes -}}
backend {{$node.Name}} { backend {{$node.Name}} {
.host = "{{(index $node.Addresses 0).IP}}"; .host = "{{(index $node.Addresses 0).IP}}";
.port = "{{(index $node.Addresses 0).Port}}"; .port = "{{(index $node.Addresses 0).Port}}";
...@@ -19,14 +32,14 @@ backend {{$node.Name}} { ...@@ -19,14 +32,14 @@ backend {{$node.Name}} {
{{end -}} {{end -}}
acl vk8s_cluster_acl { acl vk8s_cluster_acl {
{{- range $node := .ClusterNodes}} {{- range $node := .Nodes}}
"{{(index $node.Addresses 0).IP}}"; "{{(index $node.Addresses 0).IP}}";
{{- end}} {{- end}}
} }
sub vcl_init { sub vcl_init {
new vk8s_cluster = directors.shard(); new vk8s_cluster = directors.shard();
{{range $node := .ClusterNodes -}} {{range $node := .Nodes -}}
vk8s_cluster.add_backend({{$node.Name}}); vk8s_cluster.add_backend({{$node.Name}});
{{end -}} {{end -}}
vk8s_cluster.reconfigure(); vk8s_cluster.reconfigure();
...@@ -69,10 +82,8 @@ sub vcl_backend_response { ...@@ -69,10 +82,8 @@ sub vcl_backend_response {
if (beresp.http.VK8S-Cluster-TTL) { if (beresp.http.VK8S-Cluster-TTL) {
set beresp.ttl = std.duration( set beresp.ttl = std.duration(
beresp.http.VK8S-Cluster-TTL + "s", 1s); beresp.http.VK8S-Cluster-TTL + "s", 1s);
if (beresp.ttl > {{.MaxSecondaryTTL}}) {
{{- /* XXX make this TTL configurable */}} set beresp.ttl = {{.MaxSecondaryTTL}};
if (beresp.ttl > 5m) {
set beresp.ttl = 5m;
} }
unset beresp.http.VK8S-Cluster-TTL; unset beresp.http.VK8S-Cluster-TTL;
} }
......
...@@ -90,8 +90,11 @@ import directors; ...@@ -90,8 +90,11 @@ import directors;
probe vk8s_probe_varnish { probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1" .request = "HEAD /vk8s_cluster_health HTTP/1.1"
"Host: vk8s_cluster"; "Host: vk8s_cluster";
.timeout = 5s; .timeout = 2s;
.interval = 5s; .interval = 5s;
.initial = 2;
.window = 8;
.threshold = 3;
} }
backend varnish-8445d4f7f-z2b9p { backend varnish-8445d4f7f-z2b9p {
......
...@@ -5,8 +5,11 @@ import directors; ...@@ -5,8 +5,11 @@ import directors;
probe vk8s_probe_varnish { probe vk8s_probe_varnish {
.request = "HEAD /vk8s_cluster_health HTTP/1.1" .request = "HEAD /vk8s_cluster_health HTTP/1.1"
"Host: vk8s_cluster"; "Host: vk8s_cluster";
.timeout = 5s; .timeout = 2s;
.interval = 5s; .interval = 5s;
.initial = 2;
.window = 8;
.threshold = 3;
} }
backend varnish-8445d4f7f-z2b9p { backend varnish-8445d4f7f-z2b9p {
......
...@@ -109,11 +109,41 @@ func (a ByHost) Len() int { return len(a) } ...@@ -109,11 +109,41 @@ func (a ByHost) Len() int { return len(a) }
func (a ByHost) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a ByHost) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByHost) Less(i, j int) bool { return a[i].Host < a[j].Host } func (a ByHost) Less(i, j int) bool { return a[i].Host < a[j].Host }
type Probe struct {
Timeout string
Interval string
Initial string
Window string
Threshold string
}
func (probe Probe) hash(hash hash.Hash) {
hash.Write([]byte(probe.Timeout))
hash.Write([]byte(probe.Interval))
hash.Write([]byte(probe.Initial))
hash.Write([]byte(probe.Window))
hash.Write([]byte(probe.Threshold))
}
type ShardCluster struct {
Nodes []Service
Probe Probe
MaxSecondaryTTL string
}
func (shard ShardCluster) hash(hash hash.Hash) {
for _, node := range shard.Nodes {
node.hash(hash)
}
shard.Probe.hash(hash)
hash.Write([]byte(shard.MaxSecondaryTTL))
}
type Spec struct { type Spec struct {
DefaultService Service DefaultService Service
Rules []Rule Rules []Rule
AllServices map[string]Service AllServices map[string]Service
ClusterNodes []Service ShardCluster ShardCluster
} }
func (spec Spec) DeepHash() uint64 { func (spec Spec) DeepHash() uint64 {
...@@ -133,9 +163,7 @@ func (spec Spec) DeepHash() uint64 { ...@@ -133,9 +163,7 @@ func (spec Spec) DeepHash() uint64 {
hash.Write([]byte(svc)) hash.Write([]byte(svc))
spec.AllServices[svc].hash(hash) spec.AllServices[svc].hash(hash)
} }
for _, node := range spec.ClusterNodes { spec.ShardCluster.hash(hash)
node.hash(hash)
}
return hash.Sum64() return hash.Sum64()
} }
...@@ -144,7 +172,7 @@ func (spec Spec) Canonical() Spec { ...@@ -144,7 +172,7 @@ func (spec Spec) Canonical() Spec {
DefaultService: Service{Name: spec.DefaultService.Name}, DefaultService: Service{Name: spec.DefaultService.Name},
Rules: make([]Rule, len(spec.Rules)), Rules: make([]Rule, len(spec.Rules)),
AllServices: make(map[string]Service, len(spec.AllServices)), AllServices: make(map[string]Service, len(spec.AllServices)),
ClusterNodes: make([]Service, len(spec.ClusterNodes)), ShardCluster: spec.ShardCluster,
} }
copy(canon.DefaultService.Addresses, spec.DefaultService.Addresses) copy(canon.DefaultService.Addresses, spec.DefaultService.Addresses)
sort.Stable(ByIPPort(canon.DefaultService.Addresses)) sort.Stable(ByIPPort(canon.DefaultService.Addresses))
...@@ -159,9 +187,8 @@ func (spec Spec) Canonical() Spec { ...@@ -159,9 +187,8 @@ func (spec Spec) Canonical() Spec {
canon.AllServices[name] = svcs canon.AllServices[name] = svcs
sort.Stable(ByIPPort(canon.AllServices[name].Addresses)) sort.Stable(ByIPPort(canon.AllServices[name].Addresses))
} }
copy(canon.ClusterNodes, spec.ClusterNodes) sort.Stable(ByName(canon.ShardCluster.Nodes))
sort.Stable(ByName(canon.ClusterNodes)) for _, node := range canon.ShardCluster.Nodes {
for _, node := range canon.ClusterNodes {
sort.Stable(ByIPPort(node.Addresses)) sort.Stable(ByIPPort(node.Addresses))
} }
return canon return canon
...@@ -208,8 +235,8 @@ func (spec Spec) GetSrc() (string, error) { ...@@ -208,8 +235,8 @@ func (spec Spec) GetSrc() (string, error) {
if err := IngressTmpl.Execute(&buf, spec); err != nil { if err := IngressTmpl.Execute(&buf, spec); err != nil {
return "", err return "", err
} }
if len(spec.ClusterNodes) > 0 { if len(spec.ShardCluster.Nodes) > 0 {
if err := ShardTmpl.Execute(&buf, spec); err != nil { if err := ShardTmpl.Execute(&buf, spec.ShardCluster); err != nil {
return "", err return "", err
} }
} }
......
...@@ -106,6 +106,9 @@ func TestIngressTemplate(t *testing.T) { ...@@ -106,6 +106,9 @@ func TestIngressTemplate(t *testing.T) {
if !ok { if !ok {
t.Errorf("Generated VCL for IngressSpec does not match gold "+ t.Errorf("Generated VCL for IngressSpec does not match gold "+
"file: %s", gold) "file: %s", gold)
if testing.Verbose() {
t.Logf("Generated: %s", buf.String())
}
} }
} }
...@@ -221,25 +224,29 @@ func TestCanoncial(t *testing.T) { ...@@ -221,25 +224,29 @@ func TestCanoncial(t *testing.T) {
} }
} }
var varnishCluster = []Service{ var varnishCluster = ShardCluster{
Service{ Nodes: []Service{
Name: "varnish-8445d4f7f-z2b9p", Service{
Addresses: []Address{ Name: "varnish-8445d4f7f-z2b9p",
{"172.17.0.12", 80}, Addresses: []Address{{"172.17.0.12", 80}},
}, },
}, Service{
Service{ Name: "varnish-8445d4f7f-k22dn",
Name: "varnish-8445d4f7f-k22dn", Addresses: []Address{{"172.17.0.13", 80}},
Addresses: []Address{
{"172.17.0.13", 80},
}, },
}, Service{
Service{ Name: "varnish-8445d4f7f-ldljf",
Name: "varnish-8445d4f7f-ldljf", Addresses: []Address{{"172.17.0.14", 80}},
Addresses: []Address{
{"172.17.0.14", 80},
}, },
}, },
Probe: Probe{
Timeout: "2s",
Interval: "5s",
Initial: "2",
Window: "8",
Threshold: "3",
},
MaxSecondaryTTL: "5m",
} }
func TestShardTemplate(t *testing.T) { func TestShardTemplate(t *testing.T) {
...@@ -252,8 +259,7 @@ func TestShardTemplate(t *testing.T) { ...@@ -252,8 +259,7 @@ func TestShardTemplate(t *testing.T) {
t.Error("Cannot parse shard template:", err) t.Error("Cannot parse shard template:", err)
return return
} }
cafeSpec.ClusterNodes = varnishCluster if err := tmpl.Execute(&buf, varnishCluster); err != nil {
if err := tmpl.Execute(&buf, cafeSpec); err != nil {
t.Error("cluster template Execute():", err) t.Error("cluster template Execute():", err)
return return
} }
...@@ -264,12 +270,15 @@ func TestShardTemplate(t *testing.T) { ...@@ -264,12 +270,15 @@ func TestShardTemplate(t *testing.T) {
if !ok { if !ok {
t.Errorf("Generated VCL for self-sharding does not match gold "+ t.Errorf("Generated VCL for self-sharding does not match gold "+
"file: %s", gold) "file: %s", gold)
if testing.Verbose() {
t.Logf("Generated: %s", buf.String())
}
} }
} }
func TestGetSrc(t *testing.T) { func TestGetSrc(t *testing.T) {
gold := "ingress_shard.golden" gold := "ingress_shard.golden"
cafeSpec.ClusterNodes = varnishCluster cafeSpec.ShardCluster = varnishCluster
src, err := cafeSpec.GetSrc() src, err := cafeSpec.GetSrc()
if err != nil { if err != nil {
t.Error("Spec.GetSrc():", err) t.Error("Spec.GetSrc():", err)
...@@ -282,5 +291,8 @@ func TestGetSrc(t *testing.T) { ...@@ -282,5 +291,8 @@ func TestGetSrc(t *testing.T) {
if !ok { if !ok {
t.Errorf("Generated VCL from GetSrc() does not match gold "+ t.Errorf("Generated VCL from GetSrc() does not match gold "+
"file: %s", gold) "file: %s", gold)
if testing.Verbose() {
t.Logf("Generated: %s", src)
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment