Commit 1a78ad39 authored by Geoff Simmons

Add the key and digest fields for self-sharding to VarnishConfig.

parent c32047c2
@@ -39,6 +39,24 @@ spec:
              pattern: '^\d+(\.\d+)?(ms|[smhdwy])$'
            primaryOnly:
              type: boolean
            key:
              type: string
              pattern: "^req\\.(url|http\\.[a-zA-Z0-9!#$%&'*+.^_`|~-]+)$"
            digest:
              type: string
              enum:
              - CRC32
              - ICRC32
              - MD5
              - RS
              - SHA1
              - SHA224
              - SHA256
              - SHA384
              - SHA512
              - SHA3_224
              - SHA3_256
              - SHA3_512
            probe:
              type: object
              properties:
......
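For reference, the key pattern above admits req.url or a req.http.* header whose name is built from the HTTP token characters of RFC 7230. A minimal standalone Go sketch of the same check (the pattern string is copied from the CRD; the sample keys are illustrative, not from this commit):

package main

import (
	"fmt"
	"regexp"
)

// keyPattern mirrors the CRD validation for the selfSharding key field.
var keyPattern = regexp.MustCompile(
	"^req\\.(url|http\\.[a-zA-Z0-9!#$%&'*+.^_`|~-]+)$")

func main() {
	for _, key := range []string{
		"req.url",          // matches: shard by URL
		"req.http.Host",    // matches: shard by the Host header
		"req.http.X-Shard", // matches: '-' is a token character
		"req.method",       // rejected: only url and http.* are allowed
	} {
		fmt.Printf("%-18s %v\n", key, keyPattern.MatchString(key))
	}
}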
@@ -61,6 +61,8 @@ type VarnishConfigSpec struct {
// SelfShardSpec specifies self-sharding in a Varnish cluster.
// see: https://code.uplex.de/uplex-varnish/k8s-ingress/blob/master/docs/self-sharding.md
type SelfShardSpec struct {
	Key         string    `json:"key,omitempty"`
	Digest      string    `json:"digest,omitempty"`
	Max2ndTTL   string    `json:"max-secondary-ttl,omitempty"`
	Probe       ProbeSpec `json:"probe,omitempty"`
	PrimaryOnly bool      `json:"primaryOnly,omitempty"`
......
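A hypothetical populated spec, to illustrate the two new fields (a sketch only; the values are arbitrary examples, and the import path is the one shown in the diff below):

package main

import (
	"fmt"

	vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
)

func main() {
	// Example values: shard on the Host header using SHA3-256,
	// and cap secondary caching at five minutes.
	shard := vcr_v1alpha1.SelfShardSpec{
		Key:       "req.http.Host",
		Digest:    "SHA3_256",
		Max2ndTTL: "5m",
	}
	fmt.Printf("%+v\n", shard)
}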
@@ -501,99 +501,6 @@ func (worker *NamespaceWorker) ings2VCLSpec(
	return vclSpec, bcfgs, update.MakeSuccess("")
}

func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
	vcfg *vcr_v1alpha1.VarnishConfig, svc *api_v1.Service) update.Status {
	if vcfg.Spec.SelfSharding == nil {
		worker.log.Tracef("No cluster shard configuration for Service "+
			"%s/%s", svc.Namespace, svc.Name)
		return update.MakeNoop("")
	}
	worker.log.Tracef("Set cluster shard configuration for Service %s/%s",
		svc.Namespace, svc.Name)
	endps, err := worker.getAllServiceEndpoints(svc)
	if endps == nil || len(endps) == 0 || err != nil {
		if endps == nil || len(endps) == 0 {
			return update.MakeIncomplete(
				"could not find endpoints for service: %s/%s",
				svc.Namespace, svc.Name)
		}
		return IncompleteIfNotFound(
			err, "Error getting endpoints for service %s/%s: %v",
			svc.Namespace, svc.Name, err)
	}
	worker.log.Tracef("Endpoints for shard configuration: %+v", endps)
	var nAddrs int
	var httpPort int32
	for _, eps := range endps {
		for _, endpSubset := range eps.Subsets {
			nAddrs += len(endpSubset.Addresses)
			nAddrs += len(endpSubset.NotReadyAddresses)
			for _, port := range endpSubset.Ports {
				if httpPort == 0 && port.Name == "http" {
					httpPort = port.Port
				}
			}
		}
	}
	if httpPort == 0 {
		return update.MakeFatal(
			"No http port found in the endpoints for service %s/%s",
			svc.Namespace, svc.Name)
	}
	if nAddrs <= 1 {
		return update.MakeFatal(
			"Sharding requested, but %d endpoint addresses found "+
				"for service %s/%s", nAddrs, svc.Namespace,
			svc.Name)
	}
	pods, err := worker.getPods(svc)
	if err != nil {
		return IncompleteIfNotFound(err,
			"Error getting pod information for service %s/%s: %v",
			svc.Namespace, svc.Name, err)
	}
	if len(pods.Items) <= 1 {
		return update.MakeFatal(
			"Sharding requested, but %d pods found for service "+
				"%s/%s", len(pods.Items), svc.Namespace,
			svc.Name)
	}
	worker.log.Tracef("Pods for shard configuration: %+v", pods.Items)
	// Populate spec.ShardCluster.Nodes with Pod names and the http endpoint
	for _, pod := range pods.Items {
		node := vcl.Service{Addresses: make([]vcl.Address, 1)}
		if pod.Spec.Hostname != "" {
			node.Name = pod.Spec.Hostname
		} else {
			node.Name = pod.Name
		}
		node.Addresses[0].IP = pod.Status.PodIP
		node.Addresses[0].Port = httpPort
		spec.ShardCluster.Nodes = append(spec.ShardCluster.Nodes, node)
	}
	worker.log.Tracef("Node configuration for self-sharding in Service "+
		"%s/%s: %+v", svc.Namespace, svc.Name, spec.ShardCluster.Nodes)
	cfgSpec := vcfg.Spec.SelfSharding
	probe := getVCLProbe(&cfgSpec.Probe)
	spec.ShardCluster.Probe = *probe
	spec.ShardCluster.PrimaryOnly = cfgSpec.PrimaryOnly
	if cfgSpec.Max2ndTTL != "" {
		spec.ShardCluster.MaxSecondaryTTL = cfgSpec.Max2ndTTL
	} else {
		spec.ShardCluster.MaxSecondaryTTL = defMax2ndTTL
	}
	worker.log.Tracef("Spec configuration for self-sharding in Service "+
		"%s/%s: %+v", svc.Namespace, svc.Name, spec.ShardCluster)
	return update.MakeSuccess("")
}

func configComparison(cmp vcr_v1alpha1.CompareType) (vcl.CompareType, bool) {
	switch cmp {
	case vcr_v1alpha1.Equal:
......
@@ -38,6 +38,7 @@ import (
	vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
	"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
	"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl"

	api_v1 "k8s.io/api/core/v1"
	extensions "k8s.io/api/extensions/v1beta1"
@@ -234,6 +235,147 @@ func validateReqDisps(reqDisps []vcr_v1alpha1.RequestDispSpec) error {
	return nil
}
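// getHashAlgo maps the digest enum validated by the CRD onto the
// corresponding vcl.HashAlgo constant. There is no case for SHA256,
// since configSharding() below treats SHA256 as the default hash and
// never passes it to this function.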
func getHashAlgo(digest string) vcl.HashAlgo {
	switch digest {
	case "CRC32":
		return vcl.Crc32
	case "ICRC32":
		return vcl.ICrc32
	case "MD5":
		return vcl.MD5
	case "RS":
		return vcl.RS
	case "SHA1":
		return vcl.Sha1
	case "SHA224":
		return vcl.Sha224
	case "SHA384":
		return vcl.Sha384
	case "SHA512":
		return vcl.Sha512
	case "SHA3_224":
		return vcl.Sha3_224
	case "SHA3_256":
		return vcl.Sha3_256
	case "SHA3_512":
		return vcl.Sha3_512
	default:
		panic("Illegal digest enum")
	}
}
func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
	vcfg *vcr_v1alpha1.VarnishConfig, svc *api_v1.Service) update.Status {
	if vcfg.Spec.SelfSharding == nil {
		worker.log.Tracef("No cluster shard configuration for Service "+
			"%s/%s", svc.Namespace, svc.Name)
		return update.MakeNoop("")
	}
	worker.log.Tracef("Set cluster shard configuration for Service %s/%s",
		svc.Namespace, svc.Name)
	endps, err := worker.getAllServiceEndpoints(svc)
	if endps == nil || len(endps) == 0 || err != nil {
		if endps == nil || len(endps) == 0 {
			return update.MakeIncomplete(
				"could not find endpoints for service: %s/%s",
				svc.Namespace, svc.Name)
		}
		return IncompleteIfNotFound(
			err, "Error getting endpoints for service %s/%s: %v",
			svc.Namespace, svc.Name, err)
	}
	worker.log.Tracef("Endpoints for shard configuration: %+v", endps)
	var nAddrs int
	var httpPort int32
	for _, eps := range endps {
		for _, endpSubset := range eps.Subsets {
			nAddrs += len(endpSubset.Addresses)
			nAddrs += len(endpSubset.NotReadyAddresses)
			for _, port := range endpSubset.Ports {
				if httpPort == 0 && port.Name == "http" {
					httpPort = port.Port
				}
			}
		}
	}
	if httpPort == 0 {
		return update.MakeFatal(
			"No http port found in the endpoints for service %s/%s",
			svc.Namespace, svc.Name)
	}
	if nAddrs <= 1 {
		return update.MakeFatal(
			"Sharding requested, but %d endpoint addresses found "+
				"for service %s/%s", nAddrs, svc.Namespace,
			svc.Name)
	}
	pods, err := worker.getPods(svc)
	if err != nil {
		return IncompleteIfNotFound(err,
			"Error getting pod information for service %s/%s: %v",
			svc.Namespace, svc.Name, err)
	}
	if len(pods.Items) <= 1 {
		return update.MakeFatal(
			"Sharding requested, but %d pods found for service "+
				"%s/%s", len(pods.Items), svc.Namespace,
			svc.Name)
	}
	worker.log.Tracef("Pods for shard configuration: %+v", pods.Items)
	// Populate spec.ShardCluster.Nodes with Pod names and the http endpoint
	for _, pod := range pods.Items {
		node := vcl.Service{Addresses: make([]vcl.Address, 1)}
		if pod.Spec.Hostname != "" {
			node.Name = pod.Spec.Hostname
		} else {
			node.Name = pod.Name
		}
		node.Addresses[0].IP = pod.Status.PodIP
		node.Addresses[0].Port = httpPort
		spec.ShardCluster.Nodes = append(spec.ShardCluster.Nodes, node)
	}
	worker.log.Tracef("Node configuration for self-sharding in Service "+
		"%s/%s: %+v", svc.Namespace, svc.Name, spec.ShardCluster.Nodes)
	cfgSpec := vcfg.Spec.SelfSharding
	probe := getVCLProbe(&cfgSpec.Probe)
	spec.ShardCluster.Probe = *probe
	spec.ShardCluster.PrimaryOnly = cfgSpec.PrimaryOnly
	if cfgSpec.Max2ndTTL != "" {
		spec.ShardCluster.MaxSecondaryTTL = cfgSpec.Max2ndTTL
	} else {
		spec.ShardCluster.MaxSecondaryTTL = defMax2ndTTL
	}
	spec.ShardCluster.By = vcl.ByHash
	if cfgSpec.Digest != "" && cfgSpec.Digest != "SHA256" {
		spec.ShardCluster.By = vcl.Blob
		spec.ShardCluster.Algo = getHashAlgo(cfgSpec.Digest)
		if cfgSpec.Key != "" {
			spec.ShardCluster.Key = cfgSpec.Key
		} else {
			spec.ShardCluster.Key = "req.url"
		}
	} else if cfgSpec.Key != "" {
		spec.ShardCluster.Key = cfgSpec.Key
		if cfgSpec.Key == "req.url" {
			spec.ShardCluster.By = vcl.URL
		} else {
			spec.ShardCluster.By = vcl.Key
		}
	}
	worker.log.Tracef("Spec configuration for self-sharding in Service "+
		"%s/%s: %+v", svc.Namespace, svc.Name, spec.ShardCluster)
	return update.MakeSuccess("")
}
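The block added at the end of configSharding() selects the shard director's lookup strategy: no key and no digest (or SHA256) shards by request hash; key req.url shards by URL; any other key shards by key; a non-SHA256 digest forces by-blob with the named algorithm, defaulting the key to req.url. A standalone sketch of that decision table (simplified stand-in types, not the repository's code):

package main

import "fmt"

// choice is a simplified stand-in for the By, Key and Algo fields
// that configSharding() sets on spec.ShardCluster.
type choice struct {
	By   string // "HASH", "URL", "KEY" or "BLOB"
	Key  string
	Algo string // digest algorithm, only meaningful for "BLOB"
}

// chooseSharding mirrors the decision logic added in this commit.
func chooseSharding(key, digest string) choice {
	c := choice{By: "HASH"}
	if digest != "" && digest != "SHA256" {
		c.By = "BLOB"
		c.Algo = digest
		c.Key = "req.url"
		if key != "" {
			c.Key = key
		}
	} else if key != "" {
		c.Key = key
		if key == "req.url" {
			c.By = "URL"
		} else {
			c.By = "KEY"
		}
	}
	return c
}

func main() {
	fmt.Println(chooseSharding("", ""))              // {HASH  }
	fmt.Println(chooseSharding("req.url", ""))       // {URL req.url }
	fmt.Println(chooseSharding("req.http.Host", "")) // {KEY req.http.Host }
	fmt.Println(chooseSharding("", "MD5"))           // {BLOB req.url MD5}
}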
func (worker *NamespaceWorker) syncVcfg(key string) update.Status {
	worker.log.Infof("Syncing VarnishConfig: %s/%s", worker.namespace, key)
	vcfg, err := worker.vcfg.Get(key)
......