Commit d5c1528b authored by Geoff Simmons's avatar Geoff Simmons

Refactor the return status for sync operations in the main loop.

Previously all sync operations (add/update/delete for the resource
types that the controller watches, and the cluster changes brought
about for them if necessary) were only characterized by a Go error
variable. The only conditions that mattered were nil or not-nil.

On a nil return, success was logged, and a SyncSuccess event was
generated for the resource that was synced. But this was done even
if no change was required in the cluster, and the resource had
nothing to do with Ingress or the viking application. This led
to many superfluous Events.

On a non-nil return, a warning Event was generated the sync
operation was re-queued, using the workqueue's rate-limiting delay.
This was done regardless of the type of error. Since the initial delay
is quite rapid (subsequent re-queues begin to back off the delay), it
led to many re-queues, and many Events. But it is very common that a
brief delay is predictable, when not all necessary information is
available to the controller, so that the rapid retries just
generated a lot of noise. In other cases, retries will not improve
the situation -- an invalid config, for example, will still be
invalid on the next attempt.

This commit introduces pkg/update and the type Status, which classifies
the result of a sync operation. All of the sync methods now return
an object of this type, which in turn determines how the controller
handles errors, logs results, and generates Events. The Status types
are:

Success: a cluster change was necessary and was executed successfully.
The result is logged, and an Event with Reason SyncSuccess is generated
(as before).

Noop: no cluster change was necessary. The result is logged, but no
Event is generated. This reduces Event generation considerably.

Fatal: an unrecoverable error (retries won't help). The result is
logged, a SyncFatalError warning Event is generated, but no retries
are attempted.

Recoverable: an error that might do better on retry. The result is
logged, a SyncRecoverableError is generated, and the operation is
re-queued with the rate-limiting delay (as before).

Incomplete: a cluster change is necessary, but some information is
missing. The result is logged, a SyncIncomplete warning Event is
generated, and the operation is re-queued with a delay. The delay
is currently hard-wired to 5s, but will be made configurable.

We'll probably tweak some of the decisions about which status types
are chosen for which results. But this has already improved the
controller's error handling, and has considerably reduced its
verbosity, with respect to both logging and event generation.
parent 0f1ea9c5
...@@ -69,6 +69,7 @@ check: build ...@@ -69,6 +69,7 @@ check: build
golint ./pkg/interfaces/... golint ./pkg/interfaces/...
golint ./pkg/varnish/... golint ./pkg/varnish/...
golint ./pkg/haproxy/... golint ./pkg/haproxy/...
golint ./pkg/update/...
golint ./pkg/apis/varnishingress/v1alpha1/... golint ./pkg/apis/varnishingress/v1alpha1/...
golint ./cmd/... golint ./cmd/...
go test -v ./pkg/controller/... ./pkg/interfaces/... ./pkg/varnish/... go test -v ./pkg/controller/... ./pkg/interfaces/... ./pkg/varnish/...
......
...@@ -29,27 +29,27 @@ ...@@ -29,27 +29,27 @@
package controller package controller
import ( import (
"fmt"
vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1" vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
extensions "k8s.io/api/extensions/v1beta1"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
) )
func (worker *NamespaceWorker) enqueueIngsForBackendSvcs(svcs []string, func (worker *NamespaceWorker) enqueueIngsForBackendSvcs(svcs []string,
namespace, name string) error { namespace, name string) update.Status {
svc2ing := make(map[string]*extensions.Ingress) svc2ing := make(map[string]*extensions.Ingress)
ings, err := worker.ing.List(labels.Everything()) ings, err := worker.ing.List(labels.Everything())
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
worker.log.Infof("BackendConfig %s/%s: no Ingresses found in "+ return update.MakeNoop(
"workspace %s", namespace, name, worker.namespace) "BackendConfig %s/%s: no Ingresses found in workspace %s",
return nil namespace, name, worker.namespace)
} }
if err != nil { if err != nil {
return err return update.MakeRecoverable("%v", err)
} }
for _, ing := range ings { for _, ing := range ings {
if ing.Spec.Backend != nil { if ing.Spec.Backend != nil {
...@@ -79,29 +79,30 @@ func (worker *NamespaceWorker) enqueueIngsForBackendSvcs(svcs []string, ...@@ -79,29 +79,30 @@ func (worker *NamespaceWorker) enqueueIngsForBackendSvcs(svcs []string,
worker.queue.Add(&SyncObj{Type: Update, Obj: ing}) worker.queue.Add(&SyncObj{Type: Update, Obj: ing})
} }
} }
return nil return update.MakeSuccess(
"BackendConfig %s/%s: re-queued Ingress(es)", namespace, name)
} }
func (worker *NamespaceWorker) syncBcfg(key string) error { func (worker *NamespaceWorker) syncBcfg(key string) update.Status {
worker.log.Infof("Syncing BackendConfig: %s/%s", worker.namespace, key) worker.log.Infof("Syncing BackendConfig: %s/%s", worker.namespace, key)
bcfg, err := worker.bcfg.Get(key) bcfg, err := worker.bcfg.Get(key)
if err != nil { if err != nil {
return err return IncompleteIfNotFound(err, "%v", err)
} }
worker.log.Tracef("BackendConfig %s/%s: %+v", bcfg.Namespace, worker.log.Tracef("BackendConfig %s/%s: %+v", bcfg.Namespace,
bcfg.Name, bcfg) bcfg.Name, bcfg)
if len(bcfg.Spec.Services) == 0 { if len(bcfg.Spec.Services) == 0 {
// CRD validation should prevent this. // CRD validation should prevent this.
worker.log.Warnf("BackendConfig %s/%s: no services defined, "+
"ignoring", bcfg.Namespace, bcfg.Name)
syncCounters.WithLabelValues(worker.namespace, "BackendConfig", syncCounters.WithLabelValues(worker.namespace, "BackendConfig",
"Ignore").Inc() "Ignore").Inc()
return nil return update.MakeNoop(
"BackendConfig %s/%s: no services defined, ignoring",
bcfg.Namespace, bcfg.Name)
} }
if err = validateProbe(bcfg.Spec.Probe); err != nil { if err = validateProbe(bcfg.Spec.Probe); err != nil {
return fmt.Errorf("BackendConfig %s/%s invalid probe "+ return update.MakeFatal("BackendConfig %s/%s invalid probe "+
"spec: %v", bcfg.Namespace, bcfg.Name, err) "spec: %v", bcfg.Namespace, bcfg.Name, err)
} }
...@@ -111,19 +112,19 @@ func (worker *NamespaceWorker) syncBcfg(key string) error { ...@@ -111,19 +112,19 @@ func (worker *NamespaceWorker) syncBcfg(key string) error {
bcfg.Namespace, bcfg.Name) bcfg.Namespace, bcfg.Name)
} }
func (worker *NamespaceWorker) addBcfg(key string) error { func (worker *NamespaceWorker) addBcfg(key string) update.Status {
return worker.syncBcfg(key) return worker.syncBcfg(key)
} }
func (worker *NamespaceWorker) updateBcfg(key string) error { func (worker *NamespaceWorker) updateBcfg(key string) update.Status {
return worker.syncBcfg(key) return worker.syncBcfg(key)
} }
func (worker *NamespaceWorker) deleteBcfg(obj interface{}) error { func (worker *NamespaceWorker) deleteBcfg(obj interface{}) update.Status {
bcfg, ok := obj.(*vcr_v1alpha1.BackendConfig) bcfg, ok := obj.(*vcr_v1alpha1.BackendConfig)
if !ok || bcfg == nil { if !ok || bcfg == nil {
worker.log.Warnf("Delete BackendConfig: not found: %v", obj) return update.MakeNoop("Delete BackendConfig: not found: %v",
return nil obj)
} }
worker.log.Infof("Deleting BackendConfig: %s/%s", bcfg.Namespace, worker.log.Infof("Deleting BackendConfig: %s/%s", bcfg.Namespace,
bcfg.Name) bcfg.Name)
......
...@@ -33,39 +33,39 @@ import ( ...@@ -33,39 +33,39 @@ import (
api_v1 "k8s.io/api/core/v1" api_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
) )
func (worker *NamespaceWorker) syncEndp(key string) error { func (worker *NamespaceWorker) syncEndp(key string) update.Status {
worker.log.Infof("Syncing Endpoints: %s/%s", worker.namespace, key) worker.log.Infof("Syncing Endpoints: %s/%s", worker.namespace, key)
svc, err := worker.svc.Get(key) svc, err := worker.svc.Get(key)
if err != nil { if err != nil {
worker.log.Warnf("Cannot get service for endpoints %s/%s, "+
"ignoring", worker.namespace, key)
syncCounters.WithLabelValues(worker.namespace, "Endpoints", syncCounters.WithLabelValues(worker.namespace, "Endpoints",
"Ignore").Inc() "Ignore").Inc()
return nil return update.MakeNoop(
"Cannot get service for endpoints %s/%s, ignoring",
worker.namespace, key)
} }
if worker.isVarnishIngSvc(svc) { if worker.isVarnishIngSvc(svc) {
worker.log.Infof("Endpoints changed for Varnish Ingress "+ worker.queue.Add(&SyncObj{Type: Update, Obj: svc})
return update.MakeNoop("Endpoints changed for Varnish Ingress "+
"service %s/%s, enqueuing service sync", svc.Namespace, "service %s/%s, enqueuing service sync", svc.Namespace,
svc.Name) svc.Name)
worker.queue.Add(&SyncObj{Type: Update, Obj: svc})
return nil
} }
worker.log.Tracef("Checking ingresses for endpoints: %s/%s", worker.log.Tracef("Checking ingresses for endpoints: %s/%s",
worker.namespace, key) worker.namespace, key)
ings, err := worker.getIngsForSvc(svc) ings, status := worker.getIngsForSvc(svc)
if err != nil { if status.Type != update.Success {
return err return status
} }
if len(ings) == 0 { if len(ings) == 0 {
worker.log.Tracef("No ingresses for endpoints: %s/%s",
worker.namespace, key)
syncCounters.WithLabelValues(worker.namespace, "Endpoints", syncCounters.WithLabelValues(worker.namespace, "Endpoints",
"Ignore").Inc() "Ignore").Inc()
return nil return update.MakeNoop("No ingresses for endpoints: %s/%s",
worker.namespace, key)
} }
worker.log.Tracef("Update ingresses for endpoints %s", key) worker.log.Tracef("Update ingresses for endpoints %s", key)
...@@ -75,27 +75,25 @@ func (worker *NamespaceWorker) syncEndp(key string) error { ...@@ -75,27 +75,25 @@ func (worker *NamespaceWorker) syncEndp(key string) error {
ing.Namespace, ing.Name) ing.Namespace, ing.Name)
continue continue
} }
err = worker.addOrUpdateIng(ing) if status = worker.addOrUpdateIng(ing); status.IsError() {
if err != nil { return status
return err
} }
} }
return nil return update.MakeSuccess("")
} }
func (worker *NamespaceWorker) addEndp(key string) error { func (worker *NamespaceWorker) addEndp(key string) update.Status {
return worker.syncEndp(key) return worker.syncEndp(key)
} }
func (worker *NamespaceWorker) updateEndp(key string) error { func (worker *NamespaceWorker) updateEndp(key string) update.Status {
return worker.syncEndp(key) return worker.syncEndp(key)
} }
func (worker *NamespaceWorker) deleteEndp(obj interface{}) error { func (worker *NamespaceWorker) deleteEndp(obj interface{}) update.Status {
endp, ok := obj.(*api_v1.Endpoints) endp, ok := obj.(*api_v1.Endpoints)
if !ok || endp == nil { if !ok || endp == nil {
worker.log.Warnf("Delete Endpoints: not found: %v", obj) return update.MakeNoop("Delete Endpoints: not found: %v", obj)
return nil
} }
return worker.syncEndp(endp.Name) return worker.syncEndp(endp.Name)
} }
......
This diff is collapsed.
This diff is collapsed.
...@@ -43,9 +43,9 @@ import ( ...@@ -43,9 +43,9 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
logtest "github.com/sirupsen/logrus/hooks/test"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
) )
func setupIngLister( func setupIngLister(
...@@ -142,9 +142,9 @@ func TestIngsForTLSSecret(t *testing.T) { ...@@ -142,9 +142,9 @@ func TestIngsForTLSSecret(t *testing.T) {
}, },
Type: api_v1.SecretTypeTLS, Type: api_v1.SecretTypeTLS,
} }
ings, err := worker.getIngsForTLSSecret(secret) ings, status := worker.getIngsForTLSSecret(secret)
if err != nil { if status.IsError() {
t.Fatal("getIngsForTLSSecret(): ", err) t.Fatal("getIngsForTLSSecret(): ", status)
} }
if len(ings) != 1 { if len(ings) != 1 {
t.Errorf("getIngsForTLSSecret(): wanted 1 Ingress, got %d", t.Errorf("getIngsForTLSSecret(): wanted 1 Ingress, got %d",
...@@ -154,8 +154,9 @@ func TestIngsForTLSSecret(t *testing.T) { ...@@ -154,8 +154,9 @@ func TestIngsForTLSSecret(t *testing.T) {
t.Errorf("getIngsForTLSSecret(): Ingress name wanted %s, "+ t.Errorf("getIngsForTLSSecret(): Ingress name wanted %s, "+
"got %s", ingName, ings[0].Name) "got %s", ingName, ings[0].Name)
} }
if is, err := worker.isVikingIngressTLSSecret(secret); err != nil { if is, status := worker.
t.Fatal("isVikingIngressTLSSecret(): ", err) isVikingIngressTLSSecret(secret); status.IsError() {
t.Fatal("isVikingIngressTLSSecret(): ", status)
} else if !is { } else if !is {
t.Error("isVikingIngressTLSSecret(): wanted true, got false") t.Error("isVikingIngressTLSSecret(): wanted true, got false")
} }
...@@ -167,15 +168,16 @@ func TestIngsForTLSSecret(t *testing.T) { ...@@ -167,15 +168,16 @@ func TestIngsForTLSSecret(t *testing.T) {
}, },
Type: api_v1.SecretTypeTLS, Type: api_v1.SecretTypeTLS,
} }
ings, err = worker.getIngsForTLSSecret(secret) ings, status = worker.getIngsForTLSSecret(secret)
if err != nil { if status.IsError() {
t.Fatal("getIngsForTLSSecret(): ", err) t.Fatal("getIngsForTLSSecret(): ", status)
} }
if ings != nil || len(ings) != 0 { if ings != nil || len(ings) != 0 {
t.Error("getIngsForTLSSecret(): wanted no Ingresses, got >0") t.Error("getIngsForTLSSecret(): wanted no Ingresses, got >0")
} }
if is, err := worker.isVikingIngressTLSSecret(secret); err != nil { if is, status := worker.
t.Fatal("isVikingIngressTLSSecret(): ", err) isVikingIngressTLSSecret(secret); status.IsError() {
t.Fatal("isVikingIngressTLSSecret(): ", status)
} else if is { } else if is {
t.Error("isVikingIngressTLSSecret(): wanted false, got true") t.Error("isVikingIngressTLSSecret(): wanted false, got true")
} }
...@@ -187,15 +189,16 @@ func TestIngsForTLSSecret(t *testing.T) { ...@@ -187,15 +189,16 @@ func TestIngsForTLSSecret(t *testing.T) {
}, },
Type: api_v1.SecretTypeTLS, Type: api_v1.SecretTypeTLS,
} }
ings, err = worker.getIngsForTLSSecret(secret) ings, status = worker.getIngsForTLSSecret(secret)
if err != nil { if status.IsError() {
t.Fatal("getIngsForTLSSecret(): ", err) t.Fatal("getIngsForTLSSecret(): ", status)
} }
if ings != nil || len(ings) != 0 { if ings != nil || len(ings) != 0 {
t.Error("getIngsForTLSSecret(): wanted no Ingresses, got >0") t.Error("getIngsForTLSSecret(): wanted no Ingresses, got >0")
} }
if is, err := worker.isVikingIngressTLSSecret(secret); err != nil { if is, status := worker.
t.Fatal("isVikingIngressTLSSecret(): ", err) isVikingIngressTLSSecret(secret); status.IsError() {
t.Fatal("isVikingIngressTLSSecret(): ", status)
} else if is { } else if is {
t.Error("isVikingIngressTLSSecret(): wanted false, got true") t.Error("isVikingIngressTLSSecret(): wanted false, got true")
} }
...@@ -207,15 +210,16 @@ func TestIngsForTLSSecret(t *testing.T) { ...@@ -207,15 +210,16 @@ func TestIngsForTLSSecret(t *testing.T) {
}, },
Type: api_v1.SecretTypeTLS, Type: api_v1.SecretTypeTLS,
} }
ings, err = worker.getIngsForTLSSecret(secret) ings, status = worker.getIngsForTLSSecret(secret)
if err != nil { if status.IsError() {
t.Fatal("getIngsForTLSSecret(): ", err) t.Fatal("getIngsForTLSSecret(): ", status)
} }
if ings != nil || len(ings) != 0 { if ings != nil || len(ings) != 0 {
t.Error("getIngsForTLSSecret(): wanted no Ingresses, got >0") t.Error("getIngsForTLSSecret(): wanted no Ingresses, got >0")
} }
if is, err := worker.isVikingIngressTLSSecret(secret); err != nil { if is, status := worker.
t.Fatal("isVikingIngressTLSSecret(): ", err) isVikingIngressTLSSecret(secret); status.IsError() {
t.Fatal("isVikingIngressTLSSecret(): ", status)
} else if is { } else if is {
t.Error("isVikingIngressTLSSecret(): wanted false, got true") t.Error("isVikingIngressTLSSecret(): wanted false, got true")
} }
...@@ -239,16 +243,17 @@ func TestNoIngsForTLSSecret(t *testing.T) { ...@@ -239,16 +243,17 @@ func TestNoIngsForTLSSecret(t *testing.T) {
}, },
Type: api_v1.SecretTypeTLS, Type: api_v1.SecretTypeTLS,
} }
ings, err := worker.getIngsForTLSSecret(secret) ings, status := worker.getIngsForTLSSecret(secret)
if err != nil { if status.IsError() {
t.Fatal("getIngsForTLSSecret(): ", err) t.Fatal("getIngsForTLSSecret(): ", status)
} }
if len(ings) != 0 { if len(ings) != 0 {
t.Errorf("getIngsForTLSSecret(): wanted 0 Ingresses, got %d", t.Errorf("getIngsForTLSSecret(): wanted 0 Ingresses, got %d",
len(ings)) len(ings))
} }
if is, err := worker.isVikingIngressTLSSecret(secret); err != nil { if is, status := worker.
t.Fatal("isVikingIngressTLSSecret(): ", err) isVikingIngressTLSSecret(secret); status.IsError() {
t.Fatal("isVikingIngressTLSSecret(): ", status)
} else if is { } else if is {
t.Error("isVikingIngressTLSSecret(): wanted false, got true") t.Error("isVikingIngressTLSSecret(): wanted false, got true")
} }
...@@ -302,9 +307,9 @@ func TestDeletePEMSecret(t *testing.T) { ...@@ -302,9 +307,9 @@ func TestDeletePEMSecret(t *testing.T) {
log: &logrus.Logger{Out: ioutil.Discard}, log: &logrus.Logger{Out: ioutil.Discard},
vsecr: secrNsLister, vsecr: secrNsLister,
} }
err := worker.deleteTLSSecret(ingTLSSecret) status := worker.deleteTLSSecret(ingTLSSecret)
if err != nil { if status.IsError() {
t.Fatal("deleteTLSSecret(): ", err) t.Fatal("deleteTLSSecret(): ", status)
} }
updSecret, err := worker.vsecr.Get(certSecretName) updSecret, err := worker.vsecr.Get(certSecretName)
if err != nil { if err != nil {
...@@ -328,30 +333,20 @@ func TestDeleteNoPEMSecret(t *testing.T) { ...@@ -328,30 +333,20 @@ func TestDeleteNoPEMSecret(t *testing.T) {
defer cancel() defer cancel()
secrNsLister := setupSecrLister(ctx, client, ns) secrNsLister := setupSecrLister(ctx, client, ns)
logger, hook := logtest.NewNullLogger()
worker := &NamespaceWorker{ worker := &NamespaceWorker{
client: client, client: client,
namespace: ns, namespace: ns,
log: logger, log: &logrus.Logger{Out: ioutil.Discard},
vsecr: secrNsLister, vsecr: secrNsLister,
} }
worker.log.Level = logrus.TraceLevel status := worker.deleteTLSSecret(ingTLSSecret)
err := worker.deleteTLSSecret(ingTLSSecret) if status.Type != update.Fatal {
if err != nil { t.Fatal("deleteTLSSecret(): ", status)
t.Fatal("deleteTLSSecret(): ", err)
}
logEntry := hook.LastEntry()
if logEntry == nil {
t.Fatal("deleteTLSSecret(): no log entry")
}
if logEntry.Level != logrus.ErrorLevel {
t.Errorf("deleteTLSSecret() log level wanted Error got %s",
logEntry.Level)
} }
msg := "PEM Secret " + ns + "/" + certSecretName + msg := "PEM Secret " + ns + "/" + certSecretName +
" not found, not requeuing" " not found, not requeuing"
if logEntry.Message != msg { if status.Msg != msg {
t.Errorf("deleteTLSSecret() log entry wanted [%s] got [%s]", t.Errorf("deleteTLSSecret() error message wanted [%s] got [%s]",
msg, logEntry.Message) msg, status.Msg)
} }
} }
...@@ -29,13 +29,13 @@ ...@@ -29,13 +29,13 @@
package controller package controller
import ( import (
"fmt"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl"
api_v1 "k8s.io/api/core/v1" api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1" extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
) )
...@@ -67,10 +67,19 @@ func (worker *NamespaceWorker) isVarnishIngSvc(svc *api_v1.Service) bool { ...@@ -67,10 +67,19 @@ func (worker *NamespaceWorker) isVarnishIngSvc(svc *api_v1.Service) bool {
} }
func (worker *NamespaceWorker) getIngsForSvc( func (worker *NamespaceWorker) getIngsForSvc(
svc *api_v1.Service) (ings []*extensions.Ingress, err error) { svc *api_v1.Service,
) (ings []*extensions.Ingress, status update.Status) {
allIngs, err := worker.ing.List(labels.Everything()) allIngs, err := worker.ing.List(labels.Everything())
if err != nil { if err != nil {
if errors.IsNotFound(err) {
status = update.MakeNoop(
"No Ingress defined in namespace %s",
svc.Namespace)
syncCounters.WithLabelValues(worker.namespace,
"Service", "Ignore").Inc()
} else {
status = update.MakeRecoverable("%v", err)
}
return return
} }
...@@ -98,20 +107,23 @@ func (worker *NamespaceWorker) getIngsForSvc( ...@@ -98,20 +107,23 @@ func (worker *NamespaceWorker) getIngsForSvc(
} }
if len(ings) == 0 { if len(ings) == 0 {
worker.log.Infof("No Varnish Ingresses defined for service %s/%s", status = update.MakeNoop(
"No Varnish Ingresses defined for service %s/%s",
svc.Namespace, svc.Name) svc.Namespace, svc.Name)
syncCounters.WithLabelValues(worker.namespace, "Service", syncCounters.WithLabelValues(worker.namespace, "Service",
"Ignore").Inc() "Ignore").Inc()
return
} }
return ings, nil status = update.MakeSuccess("")
return
} }
func (worker *NamespaceWorker) enqueueIngressForService( func (worker *NamespaceWorker) enqueueIngressForService(
svc *api_v1.Service) error { svc *api_v1.Service) update.Status {
ings, err := worker.getIngsForSvc(svc) ings, status := worker.getIngsForSvc(svc)
if err != nil { if status.Type != update.Success {
return err return status
} }
for _, ing := range ings { for _, ing := range ings {
if !worker.isVarnishIngress(ing) { if !worker.isVarnishIngress(ing) {
...@@ -119,7 +131,9 @@ func (worker *NamespaceWorker) enqueueIngressForService( ...@@ -119,7 +131,9 @@ func (worker *NamespaceWorker) enqueueIngressForService(
} }
worker.queue.Add(&SyncObj{Type: Update, Obj: ing}) worker.queue.Add(&SyncObj{Type: Update, Obj: ing})
} }
return nil return update.MakeSuccess(
"Service %s/%s: re-syncing Ingresses for IngressBackend",
svc.Namespace, svc.Name)
} }
// Return true if changes in Varnish services may lead to changes in // Return true if changes in Varnish services may lead to changes in
...@@ -176,16 +190,19 @@ func epAddrs2OffldAddrs( ...@@ -176,16 +190,19 @@ func epAddrs2OffldAddrs(
func (worker *NamespaceWorker) svc2Addrs( func (worker *NamespaceWorker) svc2Addrs(
svc *api_v1.Service, svc *api_v1.Service,
) (vaddrs []vcl.Address, offldAddrs []haproxy.OffldAddr, err error) { ) (vaddrs []vcl.Address, offldAddrs []haproxy.OffldAddr, status update.Status) {
endps, err := worker.getServiceEndpoints(svc) endps, err := worker.getServiceEndpoints(svc)
if err != nil { if err != nil {
status = IncompleteIfNotFound(err, "%v", err)
return return
} }
worker.log.Tracef("Varnish service %s/%s endpoints: %+v", svc.Namespace, worker.log.Tracef("Varnish service %s/%s endpoints: %+v", svc.Namespace,
svc.Name, endps) svc.Name, endps)
if endps == nil { if endps == nil {
return vaddrs, offldAddrs, fmt.Errorf("could not find "+ status = update.MakeIncomplete(
"endpoints for service: %s/%s", svc.Namespace, svc.Name) "could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
return
} }
// XXX hard-wired Port names // XXX hard-wired Port names
...@@ -205,10 +222,11 @@ func (worker *NamespaceWorker) svc2Addrs( ...@@ -205,10 +222,11 @@ func (worker *NamespaceWorker) svc2Addrs(
} }
} }
if admPort == 0 { if admPort == 0 {
return vaddrs, offldAddrs, status = update.MakeFatal(
fmt.Errorf("No Varnish admin port %s found "+ "No Varnish admin port %s found for Service "+
"for Service %s/%s endpoint", "%s/%s endpoint",
admPortName, svc.Namespace, svc.Name) admPortName, svc.Namespace, svc.Name)
return
} }
vaddrs = epAddrs2VCLAddrs(subset.Addresses, vaddrs, admPort) vaddrs = epAddrs2VCLAddrs(subset.Addresses, vaddrs, admPort)
vaddrs = epAddrs2VCLAddrs(subset.NotReadyAddresses, vaddrs, vaddrs = epAddrs2VCLAddrs(subset.NotReadyAddresses, vaddrs,
...@@ -221,14 +239,15 @@ func (worker *NamespaceWorker) svc2Addrs( ...@@ -221,14 +239,15 @@ func (worker *NamespaceWorker) svc2Addrs(
dplanePort, faccessPort) dplanePort, faccessPort)
} }
} }
status = update.MakeSuccess("")
return return
} }
func (worker *NamespaceWorker) syncSvc(key string) error { func (worker *NamespaceWorker) syncSvc(key string) update.Status {
worker.log.Infof("Syncing Service: %s/%s", worker.namespace, key) worker.log.Infof("Syncing Service: %s/%s", worker.namespace, key)
svc, err := worker.svc.Get(key) svc, err := worker.svc.Get(key)
if err != nil { if err != nil {
return err return IncompleteIfNotFound(err, "%v", err)
} }
if !worker.isVarnishIngSvc(svc) { if !worker.isVarnishIngSvc(svc) {
...@@ -238,8 +257,9 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -238,8 +257,9 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
worker.log.Infof("Syncing Varnish Ingress Service %s/%s:", worker.log.Infof("Syncing Varnish Ingress Service %s/%s:",
svc.Namespace, svc.Name) svc.Namespace, svc.Name)
if svc.Spec.Type == api_v1.ServiceTypeExternalName { if svc.Spec.Type == api_v1.ServiceTypeExternalName {
return fmt.Errorf("Viking Service %s/%s: type ExternalName "+ return update.MakeFatal(
"is illegal", svc.Namespace, svc.Name) "Viking Service %s/%s: type ExternalName is illegal",
svc.Namespace, svc.Name)
} }
// Check if there are Ingresses for which the VCL spec may // Check if there are Ingresses for which the VCL spec may
...@@ -247,7 +267,7 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -247,7 +267,7 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
updateVCL := false updateVCL := false
ings, err := worker.ing.List(labels.Everything()) ings, err := worker.ing.List(labels.Everything())
if err != nil { if err != nil {
return err return IncompleteIfNotFound(err, "%v", err)
} }
for _, ing := range ings { for _, ing := range ings {
if ing.Namespace != svc.Namespace { if ing.Namespace != svc.Namespace {
...@@ -255,7 +275,7 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -255,7 +275,7 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
} }
ingSvc, err := worker.getVarnishSvcForIng(ing) ingSvc, err := worker.getVarnishSvcForIng(ing)
if err != nil { if err != nil {
return err return IncompleteIfNotFound(err, "%v", err)
} }
if ingSvc == nil { if ingSvc == nil {
continue continue
...@@ -284,9 +304,9 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -284,9 +304,9 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
"service %s/%s", svc.Namespace, svc.Name) "service %s/%s", svc.Namespace, svc.Name)
} }
addrs, offldAddrs, err := worker.svc2Addrs(svc) addrs, offldAddrs, status := worker.svc2Addrs(svc)
if err != nil { if status.IsError() {
return err return status
} }
secrName := "" secrName := ""
...@@ -294,12 +314,13 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -294,12 +314,13 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
svc.Namespace, svc.Name) svc.Namespace, svc.Name)
pods, err := worker.getPods(svc) pods, err := worker.getPods(svc)
if err != nil { if err != nil {
return fmt.Errorf("Cannot get a Pod for service %s/%s: %v", return IncompleteIfNotFound(err,
"Cannot get a Pod for service %s/%s: %v",
svc.Namespace, svc.Name, err) svc.Namespace, svc.Name, err)
} }
if len(pods.Items) == 0 { if len(pods.Items) == 0 {
return fmt.Errorf("No Pods for Service: %s/%s", svc.Namespace, return update.MakeIncomplete(
svc.Name) "No Pods for Service: %s/%s", svc.Namespace, svc.Name)
} }
pod := &pods.Items[0] pod := &pods.Items[0]
for _, vol := range pod.Spec.Volumes { for _, vol := range pod.Spec.Volumes {
...@@ -315,7 +336,7 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -315,7 +336,7 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
if secret, err := worker.vsecr.Get(secrName); err == nil { if secret, err := worker.vsecr.Get(secrName); err == nil {
err = worker.setSecret(secret) err = worker.setSecret(secret)
if err != nil { if err != nil {
return err return update.MakeIncomplete("%v", err)
} }
} else { } else {
worker.log.Warnf("Cannot get Secret %s: %v", secrName, worker.log.Warnf("Cannot get Secret %s: %v", secrName,
...@@ -329,11 +350,11 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -329,11 +350,11 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
if len(offldAddrs) > 0 { if len(offldAddrs) > 0 {
worker.log.Tracef("Varnish service %s/%s offloader addresses: "+ worker.log.Tracef("Varnish service %s/%s offloader addresses: "+
"%+v", svc.Namespace, svc.Name, offldAddrs) "%+v", svc.Namespace, svc.Name, offldAddrs)
err = worker.hController.AddOrUpdateOffloader( status := worker.hController.AddOrUpdateOffloader(
svc.Namespace+"/"+svc.Name, offldAddrs, svc.Namespace+"/"+svc.Name, offldAddrs,
worker.namespace+"/"+secrName) worker.namespace+"/"+secrName)
if err != nil { if status.IsError() {
return err return status
} }
} }
worker.log.Tracef("Varnish service %s/%s addresses: %+v", svc.Namespace, worker.log.Tracef("Varnish service %s/%s addresses: %+v", svc.Namespace,
...@@ -343,37 +364,46 @@ func (worker *NamespaceWorker) syncSvc(key string) error { ...@@ -343,37 +364,46 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
worker.namespace+"/"+secrName, !updateVCL) worker.namespace+"/"+secrName, !updateVCL)
} }
func (worker *NamespaceWorker) addSvc(key string) error { func (worker *NamespaceWorker) addSvc(key string) update.Status {
return worker.syncSvc(key) return worker.syncSvc(key)
} }
func (worker *NamespaceWorker) updateSvc(key string) error { func (worker *NamespaceWorker) updateSvc(key string) update.Status {
return worker.syncSvc(key) return worker.syncSvc(key)
} }
func (worker *NamespaceWorker) deleteSvc(obj interface{}) error { func (worker *NamespaceWorker) deleteSvc(obj interface{}) update.Status {
svc, ok := obj.(*api_v1.Service) svc, ok := obj.(*api_v1.Service)
if !ok || svc == nil { if !ok || svc == nil {
worker.log.Warnf("Delete Service: not found: %v", obj) return update.MakeNoop("Delete Service: not found: %v", obj)
return nil
} }
nsKey := svc.Namespace + "/" + svc.Name nsKey := svc.Namespace + "/" + svc.Name
worker.log.Info("Deleting Service: ", nsKey) worker.log.Info("Deleting Service: ", nsKey)
deletedVarnish := false
deletedOffldr := false
if worker.vController.HasVarnishSvc(nsKey) { if worker.vController.HasVarnishSvc(nsKey) {
if err := worker.vController.DeleteVarnishSvc(nsKey); err != nil { if status := worker.vController.
return err DeleteVarnishSvc(nsKey); status.IsError() {
return status
} }
deletedVarnish = true
} }
if worker.hController.HasOffloader(nsKey) { if worker.hController.HasOffloader(nsKey) {
if err := worker.hController.DeleteOffldSvc(nsKey); err != nil { if status := worker.hController.
return err DeleteOffldSvc(nsKey); status.IsError() {
return status
} }
deletedOffldr = true
} }
if !worker.isVarnishIngSvc(svc) { if !worker.isVarnishIngSvc(svc) {
return worker.enqueueIngressForService(svc) return worker.enqueueIngressForService(svc)
} }
return nil if deletedVarnish || deletedOffldr {
return update.MakeSuccess("")
}
return update.MakeNoop("Service %s/%s: neither a viking admin Service"+
" nor an Ingress Backend", svc.Namespace, svc.Name)
} }
...@@ -32,11 +32,13 @@ import ( ...@@ -32,11 +32,13 @@ import (
"fmt" "fmt"
api_v1 "k8s.io/api/core/v1" api_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1" vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl"
) )
...@@ -180,3 +182,16 @@ func validateProbe(probe *vcr_v1alpha1.ProbeSpec) error { ...@@ -180,3 +182,16 @@ func validateProbe(probe *vcr_v1alpha1.ProbeSpec) error {
} }
return nil return nil
} }
// IncompleteIfNotFound returns an update.Status of type Incomplete if
// err is a k8s NotFound error, as determined by errors.IsNotFound()
// in k8s.io/apimachinery/pkg/api/errors. Otherwise, a Recoverable
// Status is returned.
func IncompleteIfNotFound(
err error, format string, args ...interface{},
) update.Status {
if errors.IsNotFound(err) {
return update.MakeIncomplete(format, args...)
}
return update.MakeRecoverable(format, args...)
}
...@@ -37,6 +37,8 @@ import ( ...@@ -37,6 +37,8 @@ import (
vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1" vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
api_v1 "k8s.io/api/core/v1" api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1" extensions "k8s.io/api/extensions/v1beta1"
) )
...@@ -45,18 +47,17 @@ import ( ...@@ -45,18 +47,17 @@ import (
// Services are not found -- they will sync as needed when and if they // Services are not found -- they will sync as needed when and if they
// are discovered. // are discovered.
func (worker *NamespaceWorker) enqueueIngsForVcfg( func (worker *NamespaceWorker) enqueueIngsForVcfg(
vcfg *vcr_v1alpha1.VarnishConfig) error { vcfg *vcr_v1alpha1.VarnishConfig) update.Status {
svc2ing := make(map[*api_v1.Service]*extensions.Ingress) svc2ing := make(map[*api_v1.Service]*extensions.Ingress)
ings, err := worker.ing.List(labels.Everything()) ings, err := worker.ing.List(labels.Everything())
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
worker.log.Infof("VarnishConfig %s/%s: no Ingresses found in "+ return update.MakeNoop("VarnishConfig %s/%s: no Ingresses "+
"workspace %s", vcfg.Namespace, vcfg.Name, "found in workspace %s", vcfg.Namespace, vcfg.Name,
worker.namespace) worker.namespace)
return nil
} }
if err != nil { if err != nil {
return err return update.MakeRecoverable("%v", err)
} }
for _, ing := range ings { for _, ing := range ings {
if !worker.isVarnishIngress(ing) { if !worker.isVarnishIngress(ing) {
...@@ -64,13 +65,12 @@ func (worker *NamespaceWorker) enqueueIngsForVcfg( ...@@ -64,13 +65,12 @@ func (worker *NamespaceWorker) enqueueIngsForVcfg(
} }
vSvc, err := worker.getVarnishSvcForIng(ing) vSvc, err := worker.getVarnishSvcForIng(ing)
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
worker.log.Infof("VarnishConfig %s/%s: no Varnish "+ return update.MakeNoop("VarnishConfig %s/%s: no "+
"Services found in workspace %s", "Varnish Services found in workspace %s",
vcfg.Namespace, vcfg.Name, worker.namespace) vcfg.Namespace, vcfg.Name, worker.namespace)
return nil
} }
if err != nil { if err != nil {
return err return update.MakeRecoverable("%v", err)
} }
if vSvc != nil { if vSvc != nil {
svc2ing[vSvc] = ing svc2ing[vSvc] = ing
...@@ -86,7 +86,7 @@ func (worker *NamespaceWorker) enqueueIngsForVcfg( ...@@ -86,7 +86,7 @@ func (worker *NamespaceWorker) enqueueIngsForVcfg(
svcObj, err := worker.svc.Get(svc) svcObj, err := worker.svc.Get(svc)
if err != nil { if err != nil {
return err return IncompleteIfNotFound(err, "%v", err)
} }
if ing, exists := svc2ing[svcObj]; exists { if ing, exists := svc2ing[svcObj]; exists {
worker.log.Infof("VarnishConfig %s/%s: enqueuing "+ worker.log.Infof("VarnishConfig %s/%s: enqueuing "+
...@@ -95,7 +95,9 @@ func (worker *NamespaceWorker) enqueueIngsForVcfg( ...@@ -95,7 +95,9 @@ func (worker *NamespaceWorker) enqueueIngsForVcfg(
worker.queue.Add(&SyncObj{Type: Update, Obj: ing}) worker.queue.Add(&SyncObj{Type: Update, Obj: ing})
} }
} }
return nil return update.MakeSuccess(
"VarnishConfig %s/%s: re-queued Ingress(es)", vcfg.Namespace,
vcfg.Name)
} }
// XXX a validating webhook should do this // XXX a validating webhook should do this
...@@ -232,54 +234,54 @@ func validateReqDisps(reqDisps []vcr_v1alpha1.RequestDispSpec) error { ...@@ -232,54 +234,54 @@ func validateReqDisps(reqDisps []vcr_v1alpha1.RequestDispSpec) error {
return nil return nil
} }
func (worker *NamespaceWorker) syncVcfg(key string) error { func (worker *NamespaceWorker) syncVcfg(key string) update.Status {
worker.log.Infof("Syncing VarnishConfig: %s/%s", worker.namespace, key) worker.log.Infof("Syncing VarnishConfig: %s/%s", worker.namespace, key)
vcfg, err := worker.vcfg.Get(key) vcfg, err := worker.vcfg.Get(key)
if err != nil { if err != nil {
return err return IncompleteIfNotFound(err, "%v", err)
} }
worker.log.Tracef("VarnishConfig %s/%s: %+v", vcfg.Namespace, worker.log.Tracef("VarnishConfig %s/%s: %+v", vcfg.Namespace,
vcfg.Name, vcfg) vcfg.Name, vcfg)
if len(vcfg.Spec.Services) == 0 { if len(vcfg.Spec.Services) == 0 {
// CRD validation should prevent this. // CRD validation should prevent this.
worker.log.Infof("VarnishConfig %s/%s: no services defined, "+
"ignoring", vcfg.Namespace, vcfg.Name)
syncCounters.WithLabelValues(worker.namespace, "VarnishConfig", syncCounters.WithLabelValues(worker.namespace, "VarnishConfig",
"Ignore").Inc() "Ignore").Inc()
return nil return update.MakeFatal(
"VarnishConfig %s/%s: no services defined",
vcfg.Namespace, vcfg.Name)
} }
if vcfg.Spec.SelfSharding != nil { if vcfg.Spec.SelfSharding != nil {
if err = validateProbe(&vcfg.Spec.SelfSharding.Probe); err != nil { if err = validateProbe(&vcfg.Spec.SelfSharding.Probe); err != nil {
return fmt.Errorf("VarnishConfig %s/%s invalid "+ return update.MakeFatal(
"sharding spec: %v", vcfg.Namespace, vcfg.Name, "VarnishConfig %s/%s invalid sharding spec: %v",
err) vcfg.Namespace, vcfg.Name, err)
} }
} }
if err = validateRewrites(vcfg.Spec.Rewrites); err != nil { if err = validateRewrites(vcfg.Spec.Rewrites); err != nil {
return err return update.MakeFatal("%v", err)
} }
if err = validateReqDisps(vcfg.Spec.ReqDispositions); err != nil { if err = validateReqDisps(vcfg.Spec.ReqDispositions); err != nil {
return err return update.MakeFatal("%v", err)
} }
return worker.enqueueIngsForVcfg(vcfg) return worker.enqueueIngsForVcfg(vcfg)
} }
func (worker *NamespaceWorker) addVcfg(key string) error { func (worker *NamespaceWorker) addVcfg(key string) update.Status {
return worker.syncVcfg(key) return worker.syncVcfg(key)
} }
func (worker *NamespaceWorker) updateVcfg(key string) error { func (worker *NamespaceWorker) updateVcfg(key string) update.Status {
return worker.syncVcfg(key) return worker.syncVcfg(key)
} }
func (worker *NamespaceWorker) deleteVcfg(obj interface{}) error { func (worker *NamespaceWorker) deleteVcfg(obj interface{}) update.Status {
vcfg, ok := obj.(*vcr_v1alpha1.VarnishConfig) vcfg, ok := obj.(*vcr_v1alpha1.VarnishConfig)
if !ok || vcfg == nil { if !ok || vcfg == nil {
worker.log.Warnf("Delete VarnishConfig: not found: %v", obj) return update.MakeNoop(
return nil "Delete VarnishConfig: not found: %v", obj)
} }
worker.log.Infof("Deleting VarnishConfig: %s/%s", vcfg.Namespace, worker.log.Infof("Deleting VarnishConfig: %s/%s", vcfg.Namespace,
vcfg.Name) vcfg.Name)
......
...@@ -31,6 +31,7 @@ package controller ...@@ -31,6 +31,7 @@ package controller
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
api_v1 "k8s.io/api/core/v1" api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1" extensions "k8s.io/api/extensions/v1beta1"
...@@ -47,6 +48,7 @@ import ( ...@@ -47,6 +48,7 @@ import (
ving_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1" ving_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
vcr_listers "code.uplex.de/uplex-varnish/k8s-ingress/pkg/client/listers/varnishingress/v1alpha1" vcr_listers "code.uplex.de/uplex-varnish/k8s-ingress/pkg/client/listers/varnishingress/v1alpha1"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish"
) )
...@@ -56,6 +58,8 @@ const ( ...@@ -56,6 +58,8 @@ const (
syncFailure = "SyncFailure" syncFailure = "SyncFailure"
) )
const incompleteRetryDelay = 5 * time.Second
// NamespaceWorker serves fanout of work items to workers for each // NamespaceWorker serves fanout of work items to workers for each
// namespace for which the controller is notified about a resource // namespace for which the controller is notified about a resource
// update. The NamespaceQueues object creates a new instance when it // update. The NamespaceQueues object creates a new instance when it
...@@ -135,31 +139,29 @@ func (worker *NamespaceWorker) warnEvent(obj interface{}, reason, msgFmt string, ...@@ -135,31 +139,29 @@ func (worker *NamespaceWorker) warnEvent(obj interface{}, reason, msgFmt string,
worker.event(obj, api_v1.EventTypeWarning, reason, msgFmt, args...) worker.event(obj, api_v1.EventTypeWarning, reason, msgFmt, args...)
} }
func (worker *NamespaceWorker) syncSuccess(obj interface{}, msgFmt string, func (worker *NamespaceWorker) syncSuccess(obj interface{}, msgPfx string,
args ...interface{}) { status update.Status) {
worker.log.Infof(msgFmt, args...) worker.log.Infof("%s%s", msgPfx, status)
worker.infoEvent(obj, syncSuccess, msgFmt, args...) worker.infoEvent(obj, status.Reason(), "%s%s", msgPfx, status)
} }
func (worker *NamespaceWorker) syncFailure(obj interface{}, msgFmt string, func (worker *NamespaceWorker) syncFailure(obj interface{}, msgPfx string,
args ...interface{}) { status update.Status) {
worker.log.Errorf(msgFmt, args...) worker.log.Errorf("%s%s", msgPfx, status)
worker.warnEvent(obj, syncFailure, msgFmt, args...) worker.warnEvent(obj, status.Reason(), "%s%s", msgPfx, status)
} }
func (worker *NamespaceWorker) dispatch(obj interface{}) error { func (worker *NamespaceWorker) dispatch(obj interface{}) update.Status {
syncObj, ok := obj.(*SyncObj) syncObj, ok := obj.(*SyncObj)
if !ok { if !ok {
worker.syncFailure(obj, "Unhandled type %T", obj) return update.MakeFatal("Unhandled type %T", obj)
return nil
} }
_, key, err := getNameSpace(syncObj.Obj) _, key, err := getNameSpace(syncObj.Obj)
if err != nil { if err != nil {
worker.syncFailure(syncObj.Obj, return update.MakeFatal("Cannot get key for object %v: %v",
"Cannot get key for object %v: %v", syncObj.Obj, err) syncObj.Obj, err)
return nil
} }
switch syncObj.Type { switch syncObj.Type {
case Add: case Add:
...@@ -177,9 +179,8 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) error { ...@@ -177,9 +179,8 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) error {
case *ving_v1alpha1.BackendConfig: case *ving_v1alpha1.BackendConfig:
return worker.addBcfg(key) return worker.addBcfg(key)
default: default:
worker.syncFailure(syncObj.Obj, return update.MakeFatal("Unhandled object type: %T",
"Unhandled object type: %T", syncObj.Obj) syncObj.Obj)
return nil
} }
case Update: case Update:
switch syncObj.Obj.(type) { switch syncObj.Obj.(type) {
...@@ -196,9 +197,8 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) error { ...@@ -196,9 +197,8 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) error {
case *ving_v1alpha1.BackendConfig: case *ving_v1alpha1.BackendConfig:
return worker.updateBcfg(key) return worker.updateBcfg(key)
default: default:
worker.syncFailure(syncObj.Obj, return update.MakeFatal("Unhandled object type: %T",
"Unhandled object type: %T", syncObj.Obj) syncObj.Obj)
return nil
} }
case Delete: case Delete:
deletedObj := syncObj.Obj deletedObj := syncObj.Obj
...@@ -220,14 +220,11 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) error { ...@@ -220,14 +220,11 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) error {
case *ving_v1alpha1.BackendConfig: case *ving_v1alpha1.BackendConfig:
return worker.deleteBcfg(deletedObj) return worker.deleteBcfg(deletedObj)
default: default:
worker.syncFailure(deletedObj, return update.MakeFatal("Unhandled object type: %T",
"Unhandled object type: %T", deletedObj) syncObj.Obj)
return nil
} }
default: default:
worker.syncFailure(syncObj.Obj, "Unhandled sync type: %v", return update.MakeFatal("Unhandled sync type: %v", syncObj.Type)
syncObj.Type)
return nil
} }
} }
...@@ -238,22 +235,33 @@ func (worker *NamespaceWorker) next() { ...@@ -238,22 +235,33 @@ func (worker *NamespaceWorker) next() {
} }
defer worker.queue.Done(obj) defer worker.queue.Done(obj)
if err := worker.dispatch(obj); err == nil { status := worker.dispatch(obj)
syncObj, ok := obj.(*SyncObj) msgPfx := ""
if ok { if syncObj, ok := obj.(*SyncObj); ok {
if ns, name, err := getNameSpace(syncObj.Obj); err == nil { if ns, name, err := getNameSpace(syncObj.Obj); err == nil {
worker.syncSuccess(obj, msgPfx = syncObj.Type.String() + " " + ns + "/" +
"Successfully synced: %s %s/%s", name + ": "
syncObj.Type, ns, name)
}
}
if !ok || err != nil {
worker.syncSuccess(obj, "Successfully synced")
} }
}
switch status.Type {
case update.Noop:
// XXX configure a verbose mode that also generates
// Events for the Noop case
worker.log.Info(msgPfx, status.String())
worker.queue.Forget(obj)
case update.Success:
worker.syncSuccess(obj, msgPfx, status)
worker.queue.Forget(obj)
case update.Fatal:
worker.syncFailure(obj, msgPfx, status)
worker.queue.Forget(obj) worker.queue.Forget(obj)
} else { case update.Recoverable:
worker.syncFailure(obj, "Error, requeueing: %v", err) worker.syncFailure(obj, msgPfx, status)
worker.queue.AddRateLimited(obj) worker.queue.AddRateLimited(obj)
case update.Incomplete:
worker.syncFailure(obj, msgPfx, status)
worker.queue.AddAfter(obj, incompleteRetryDelay)
} }
} }
......
...@@ -43,6 +43,7 @@ import ( ...@@ -43,6 +43,7 @@ import (
"time" "time"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/interfaces" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/interfaces"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
...@@ -152,6 +153,52 @@ func (offldrErrs OffldrErrors) Error() string { ...@@ -152,6 +153,52 @@ func (offldrErrs OffldrErrors) Error() string {
return sb.String() return sb.String()
} }
func (offldrErrs OffldrErrors) status() update.Status {
if offldrErrs == nil {
return update.MakeSuccess("")
}
hasstatus, success, recoverable := false, false, false
for _, e := range offldrErrs {
status, ok := e.err.(update.Status)
if !ok {
continue
}
hasstatus = true
switch status.Type {
case update.Fatal:
return update.MakeFatal("%v", offldrErrs)
case update.Incomplete:
return update.MakeIncomplete("%v", offldrErrs)
case update.Recoverable:
recoverable = true
case update.Success:
success = true
}
}
if !hasstatus || recoverable {
return update.MakeRecoverable("%v", offldrErrs)
}
if success {
return update.MakeSuccess("%v", offldrErrs)
}
return update.MakeNoop("%v", offldrErrs)
}
func errStatus(err error) update.Status {
status, ok := err.(update.Status)
if ok {
return status
}
errs, ok := err.(OffldrErrors)
if !ok {
if err != nil {
return update.MakeRecoverable("%v", err)
}
return update.MakeSuccess("")
}
return errs.status()
}
type configStatus struct { type configStatus struct {
dplaneState ReloadState dplaneState ReloadState
version int64 version int64
...@@ -249,9 +296,10 @@ func (hc *Controller) updateLoadStatus(inst *haproxyInst) error { ...@@ -249,9 +296,10 @@ func (hc *Controller) updateLoadStatus(inst *haproxyInst) error {
} }
inst.status.pemExists[s] = pemExists inst.status.pemExists[s] = pemExists
if !pemExists { if !pemExists {
// XXX SyncIncomplete return update.MakeIncomplete(
return fmt.Errorf("Offloader instance %s: certificate "+ "Offloader instance %s: certificate "+
"PEM %s not found", inst.name, s.CertName()) "PEM %s not found",
inst.name, s.CertName())
} }
hc.log.Infof("Offloader instance %s: PEM file %s exists", hc.log.Infof("Offloader instance %s: PEM file %s exists",
inst.name, s.CertName()) inst.name, s.CertName())
...@@ -272,9 +320,9 @@ func (hc *Controller) updateLoadStatus(inst *haproxyInst) error { ...@@ -272,9 +320,9 @@ func (hc *Controller) updateLoadStatus(inst *haproxyInst) error {
} }
inst.status.dplaneState = state inst.status.dplaneState = state
if !reloaded { if !reloaded {
// XXX SyncIncomplete return update.MakeIncomplete(
return fmt.Errorf("Offloader instance %s: TLS config %s not "+ "Offloader instance %s: TLS config %s not "+
"loaded, status: %s", inst.name, inst.spec, "loaded, status: %s", inst.name, inst.spec,
state.Status) state.Status)
} }
hc.log.Infof("Offloader instance %s: TLS config %s successfully "+ hc.log.Infof("Offloader instance %s: TLS config %s successfully "+
...@@ -303,9 +351,9 @@ func (hc *Controller) updateInstance(inst *haproxyInst, spec *Spec) error { ...@@ -303,9 +351,9 @@ func (hc *Controller) updateInstance(inst *haproxyInst, spec *Spec) error {
defer hc.wg.Done() defer hc.wg.Done()
if len(spec.Secrets) == 0 { if len(spec.Secrets) == 0 {
hc.log.Warnf("Offloader instance %s: no certificates specified", return update.MakeIncomplete(
"Offloader instance %s: no certificates specified",
inst.name) inst.name)
return nil
} }
for _, s := range spec.Secrets { for _, s := range spec.Secrets {
hc.log.Debugf("Offloader instance %s, checking for PEM file %s", hc.log.Debugf("Offloader instance %s, checking for PEM file %s",
...@@ -317,9 +365,10 @@ func (hc *Controller) updateInstance(inst *haproxyInst, spec *Spec) error { ...@@ -317,9 +365,10 @@ func (hc *Controller) updateInstance(inst *haproxyInst, spec *Spec) error {
} }
inst.status.pemExists[s] = pemExists inst.status.pemExists[s] = pemExists
if !pemExists { if !pemExists {
// XXX SyncIncomplete return update.MakeIncomplete(
return fmt.Errorf("Offloader instance %s: certificate "+ "Offloader instance %s: certificate "+
"PEM %s not found", inst.name, s.CertName()) "PEM %s not found",
inst.name, s.CertName())
} }
hc.log.Infof("Offloader instance %s: PEM file %s exists", hc.log.Infof("Offloader instance %s: PEM file %s exists",
inst.name, s.CertName()) inst.name, s.CertName())
...@@ -404,9 +453,10 @@ func (hc *Controller) updateInstance(inst *haproxyInst, spec *Spec) error { ...@@ -404,9 +453,10 @@ func (hc *Controller) updateInstance(inst *haproxyInst, spec *Spec) error {
inst.status.loaded = true inst.status.loaded = true
return nil return nil
} }
// XXX SyncIncomplete return update.MakeIncomplete(
return fmt.Errorf("Offloader instance %s: TLS config %s not "+ "Offloader instance %s: TLS config %s not "+
"loaded, status: %s", inst.name, spec, state.Status) "loaded, status: %s",
inst.name, spec, state.Status)
default: default:
panic("Illegal reload status") panic("Illegal reload status")
} }
...@@ -424,15 +474,17 @@ func (hc *Controller) updateOffldSvc(svcKey string) error { ...@@ -424,15 +474,17 @@ func (hc *Controller) updateOffldSvc(svcKey string) error {
var errs OffldrErrors var errs OffldrErrors
svc, exists := hc.svcs[svcKey] svc, exists := hc.svcs[svcKey]
if !exists || svc == nil { if !exists || svc == nil {
return fmt.Errorf("No known offloader service for %s", svcKey) return update.MakeIncomplete(
"No known offloader service for %s", svcKey)
} }
if svc.secrName == "" { if svc.secrName == "" {
return fmt.Errorf("No known admin secret for offloader service"+ return update.MakeIncomplete(
" %s", svcKey) "No known admin secret for offloader service %s",
svcKey)
} }
if svc.spec == nil { if svc.spec == nil {
hc.log.Infof("Offloader service %s: no current spec", svcKey) return update.MakeNoop(
return nil "Offloader service %s: no current spec", svcKey)
} }
hc.log.Info("Updating offloader instances for ", svcKey) hc.log.Info("Updating offloader instances for ", svcKey)
...@@ -526,9 +578,9 @@ func (hc *Controller) removeOffldrInstances( ...@@ -526,9 +578,9 @@ func (hc *Controller) removeOffldrInstances(
// instsGauge.Dec() // instsGauge.Dec()
continue continue
} }
// XXX SyncIncomplete err = update.MakeIncomplete(
err = fmt.Errorf("Offloader instance %s: TLS config "+ "Offloader instance %s: TLS config "+
"not deleted, status: %s", inst.name, "not deleted, status: %s", inst.name,
state.Status) state.Status)
errs = append(errs, inst.mkError(err)) errs = append(errs, inst.mkError(err))
continue continue
...@@ -661,8 +713,11 @@ func (hc *Controller) getOffldStatus(inst *haproxyInst) error { ...@@ -661,8 +713,11 @@ func (hc *Controller) getOffldStatus(inst *haproxyInst) error {
// designated by key, using the given addresses for remote admin, and // designated by key, using the given addresses for remote admin, and
// the Secret designated by secrName as the password for Basic Auth in // the Secret designated by secrName as the password for Basic Auth in
// requests to the dataplane API. // requests to the dataplane API.
func (hc *Controller) AddOrUpdateOffloader(key string, addrs []OffldAddr, func (hc *Controller) AddOrUpdateOffloader(
secrName string) error { key string,
addrs []OffldAddr,
secrName string,
) update.Status {
var passwdPtr *string var passwdPtr *string
svc, exists := hc.svcs[key] svc, exists := hc.svcs[key]
...@@ -694,28 +749,32 @@ func (hc *Controller) AddOrUpdateOffloader(key string, addrs []OffldAddr, ...@@ -694,28 +749,32 @@ func (hc *Controller) AddOrUpdateOffloader(key string, addrs []OffldAddr,
hc.log.Debugf("Update offloader svc %s: addrs=%+v secret=%s", key, hc.log.Debugf("Update offloader svc %s: addrs=%+v secret=%s", key,
addrs, secrName) addrs, secrName)
return hc.updateOffldrAddrs(key, addrs, passwdPtr) return errStatus(hc.updateOffldrAddrs(key, addrs, passwdPtr))
} }
// DeleteOffldSvc removes the TLS offloader service designated by // DeleteOffldSvc removes the TLS offloader service designated by
// svcKey -- the haproxy configuration is deleted, and the // svcKey -- the haproxy configuration is deleted, and the
// specification is removed from the controller's configuration. // specification is removed from the controller's configuration.
func (hc *Controller) DeleteOffldSvc(svcKey string) error { func (hc *Controller) DeleteOffldSvc(svcKey string) update.Status {
svc, exists := hc.svcs[svcKey] svc, exists := hc.svcs[svcKey]
if !exists { if !exists {
return nil return update.MakeNoop("No offloader service %s", svcKey)
} }
err := hc.removeOffldrInstances(svc.instances) err := hc.removeOffldrInstances(svc.instances)
if err != nil { if err != nil {
delete(hc.svcs, svcKey) delete(hc.svcs, svcKey)
// svcsGauge.Dec() // svcsGauge.Dec()
} }
return err return errStatus(err)
} }
// Update the TLS offloader designated by svcKey to the configuration // Update the TLS offloader designated by svcKey to the configuration
// given by spec. // given by spec.
func (hc *Controller) Update(svcKey string, addrs []OffldAddr, spec Spec) error { func (hc *Controller) Update(
svcKey string,
addrs []OffldAddr,
spec Spec,
) update.Status {
svc, exists := hc.svcs[svcKey] svc, exists := hc.svcs[svcKey]
if !exists { if !exists {
svc = &offldrSvc{instances: make([]*haproxyInst, len(addrs))} svc = &offldrSvc{instances: make([]*haproxyInst, len(addrs))}
...@@ -725,14 +784,15 @@ func (hc *Controller) Update(svcKey string, addrs []OffldAddr, spec Spec) error ...@@ -725,14 +784,15 @@ func (hc *Controller) Update(svcKey string, addrs []OffldAddr, spec Spec) error
} }
svc.spec = &spec svc.spec = &spec
if len(svc.instances) == 0 { if len(svc.instances) == 0 {
return fmt.Errorf("Currently no known offloader endpoints for "+ return update.MakeIncomplete(
"Service %s", svcKey) "Currently no known offloader endpoints for Service %s",
svcKey)
} }
passwdPtr := hc.secrets[svc.secrName] passwdPtr := hc.secrets[svc.secrName]
for _, inst := range svc.instances { for _, inst := range svc.instances {
inst.dplanePasswd = passwdPtr inst.dplanePasswd = passwdPtr
} }
return hc.updateOffldrAddrs(svcKey, addrs, passwdPtr) return errStatus(hc.updateOffldrAddrs(svcKey, addrs, passwdPtr))
} }
// SetDataplaneSecret stores the secret to be used as the Basic Auth // SetDataplaneSecret stores the secret to be used as the Basic Auth
...@@ -753,12 +813,12 @@ func (hc *Controller) SetDataplaneSecret(key string, secret []byte) { ...@@ -753,12 +813,12 @@ func (hc *Controller) SetDataplaneSecret(key string, secret []byte) {
// used to authorize use of the dataplane API for the TLS offloader // used to authorize use of the dataplane API for the TLS offloader
// designated by SetOffldSecret. SetDataplaneSecret(), in turns, sets // designated by SetOffldSecret. SetDataplaneSecret(), in turns, sets
// the secret contents for secretKey. // the secret contents for secretKey.
func (hc *Controller) SetOffldSecret(svcKey, secretKey string) error { func (hc *Controller) SetOffldSecret(svcKey, secretKey string) {
svc, ok := hc.svcs[svcKey] svc, ok := hc.svcs[svcKey]
if !ok { if !ok {
hc.log.Warnf("Cannot set secret %s for offloader %s: "+ hc.log.Warnf("Cannot set secret %s for offloader %s: "+
"offloader not found", secretKey, svcKey) "offloader not found", secretKey, svcKey)
return nil return
} }
svc.secrName = secretKey svc.secrName = secretKey
if secret, ok := hc.secrets[secretKey]; ok { if secret, ok := hc.secrets[secretKey]; ok {
...@@ -766,7 +826,7 @@ func (hc *Controller) SetOffldSecret(svcKey, secretKey string) error { ...@@ -766,7 +826,7 @@ func (hc *Controller) SetOffldSecret(svcKey, secretKey string) error {
inst.dplane.password = *secret inst.dplane.password = *secret
} }
} }
return nil return
} }
// DeleteDataplaneSecret removes the Secret designated by name. // DeleteDataplaneSecret removes the Secret designated by name.
......
/*-
* Copyright (c) 2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
// Package update defines the Status type, which classifies the result
// of a synchronization operation by the controller. This
// distinguishes forms of success or failure after an
// Add/Update/Delete notification for the resource types that the
// controller watches, and determeins furher actions such as error
// handling, logging and event generation.
package update
import "fmt"
// StatusType is the classifier for the result of a synchronization.
type StatusType uint8
const (
// Noop indicates that no change in the cluster was
// necessary. The result is logged, but no further action is
// taken.
Noop StatusType = iota
// Success indicates that a change in the cluster was
// successfully executed. The result is logged, and an event
// is generated for the resource that was synchronized.
Success
// Fatal indicates an unrecoverable error, which cannot be
// compensated by retrying the same synchronization. For
// example, an invalid configuration is unrecoverable, if it
// can never become valid until it is changed. This result is
// logged, a warning event is generated, and the operation is
// never retried.
Fatal
// Recoverable indicates an error that might be compensated
// with a retry. For example, network errors in the cluster
// such as a timeouts may be temporary, and hence may succeed
// on retry. This result is logged, a warning event is
// generated, and the sync operation is re-queued with a rate
// limiting delay (likely an exponential backoff, as scheduled
// by the client-go workqueue).
Recoverable
// Incomplete indicates that a cluster change is necessary,
// but the controller does not currently have all of the
// information that it requires. For example, a Service
// configuration may have been added, but the Endpoints are
// not yet known (the Service's Pods might still be starting
// up). This result is logged, a warning event is generated,
// and the sync operation is scheduled for retry after a brief
// delay (default 5s), since the missing info commonly becomes
// available within a brief time.
Incomplete
)
func (t StatusType) String() string {
switch t {
case Noop:
return "no cluster change necessary"
case Success:
return "successfully synced"
case Fatal:
return "unrecoverable error"
case Recoverable:
return "recoverable error"
case Incomplete:
return "insufficient info for necessary cluster change"
default:
return "UNKNOWN SYNC STATUS"
}
}
// Reason returns a string suitable as the reason string for an event
// after a sync returns the given status type.
//
// Success: SyncSuccess
// Fatal: SyncFatalError
// Recoverable: SyncRecoverableError
// Incomplete: SyncIncomplete
//
// Not valid for the Noop type, since no event is generated for Noop.
func (t StatusType) Reason() string {
switch t {
case Success:
return "SyncSuccess"
case Fatal:
return "SyncFatalError"
case Recoverable:
return "SyncRecoverableError"
case Incomplete:
return "SyncIncomplete"
default:
panic("illegal StatusType for Reason()")
}
}
// Status encapsulates the result of a synchronization operation,
// including its type and an optional detail message (an error message
// for the error types).
type Status struct {
Msg string
Type StatusType
}
func (status Status) String() string {
if status.Msg == "" {
return status.Type.String()
}
return status.Type.String() + ": " + status.Msg
}
// Reason returns a string suitable as the reason string for an event
// after a sync returns. Identical to Reason() for the given type.
func (status Status) Reason() string {
return status.Type.Reason()
}
// IsError returns true iff the Status has one of the error types:
// Fatal, Recoverable or Incomplete.
func (status Status) IsError() bool {
switch status.Type {
case Noop:
return false
case Success:
return false
case Fatal:
return true
case Recoverable:
return true
case Incomplete:
return true
default:
return true
}
}
func (status Status) Error() string {
return status.String()
}
// Make is a convenience function to create a Status of the given
// type, setting the Msg to a formatted string.
func Make(t StatusType, format string, args ...interface{}) Status {
return Status{
Msg: fmt.Sprintf(format, args...),
Type: t,
}
}
// MakeNoop creates a Noop Status with a formatted string for the Msg.
func MakeNoop(format string, args ...interface{}) Status {
return Make(Noop, format, args...)
}
// MakeSuccess creates a Success Status with a formatted string for
// the Msg.
func MakeSuccess(format string, args ...interface{}) Status {
return Make(Success, format, args...)
}
// MakeFatal creates a Fatal Status with a formatted string for the
// Msg.
func MakeFatal(format string, args ...interface{}) Status {
return Make(Fatal, format, args...)
}
// MakeRecoverable creates a Recoverable Status with a formatted
// string for the Msg.
func MakeRecoverable(format string, args ...interface{}) Status {
return Make(Recoverable, format, args...)
}
// MakeIncomplete creates an Incomplete Status with a formatted string
// for the Msg.
func MakeIncomplete(format string, args ...interface{}) Status {
return Make(Incomplete, format, args...)
}
...@@ -185,10 +185,16 @@ func (vc *Controller) monitor(monitorIntvl time.Duration) { ...@@ -185,10 +185,16 @@ func (vc *Controller) monitor(monitorIntvl time.Duration) {
} }
if err := vc.updateVarnishSvc(svcName); err != nil { if err := vc.updateVarnishSvc(svcName); err != nil {
vc.errorEvt(svcName, updateErr, if status := errStatus(err); status.IsError() {
"Errors updating Varnish "+ vc.errorEvt(svcName, updateErr,
"Service %s: %+v", svcName, err) "Errors updating Varnish "+
good = false "Service %s: %+v",
svcName, err)
good = false
} else {
vc.log.Infof("Varnish Service %s: %s",
svcName, status)
}
} }
if good { if good {
vc.infoEvt(svcName, monitorGood, vc.infoEvt(svcName, monitorGood,
......
...@@ -45,6 +45,7 @@ import ( ...@@ -45,6 +45,7 @@ import (
"time" "time"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/interfaces" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/interfaces"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/update"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl"
"code.uplex.de/uplex-varnish/varnishapi/pkg/admin" "code.uplex.de/uplex-varnish/varnishapi/pkg/admin"
...@@ -105,6 +106,52 @@ func (vadmErrs AdmErrors) Error() string { ...@@ -105,6 +106,52 @@ func (vadmErrs AdmErrors) Error() string {
return sb.String() return sb.String()
} }
func (vadmErrs AdmErrors) status() update.Status {
if vadmErrs == nil {
return update.MakeSuccess("")
}
hasstatus, success, recoverable := false, false, false
for _, e := range vadmErrs {
status, ok := e.err.(update.Status)
if !ok {
continue
}
hasstatus = true
switch status.Type {
case update.Fatal:
return update.MakeFatal("%v", vadmErrs)
case update.Incomplete:
return update.MakeIncomplete("%v", vadmErrs)
case update.Recoverable:
recoverable = true
case update.Success:
success = true
}
}
if !hasstatus || recoverable {
return update.MakeRecoverable("%v", vadmErrs)
}
if success {
return update.MakeSuccess("%v", vadmErrs)
}
return update.MakeNoop("%v", vadmErrs)
}
func errStatus(err error) update.Status {
status, ok := err.(update.Status)
if ok {
return status
}
errs, ok := err.(AdmErrors)
if !ok {
if err != nil {
return update.MakeRecoverable("%v", err)
}
return update.MakeSuccess("")
}
return errs.status()
}
// Meta encapsulates meta-data for the resource types that enter into // Meta encapsulates meta-data for the resource types that enter into
// a Varnish configuration: Ingress, VarnishConfig and BackendConfig. // a Varnish configuration: Ingress, VarnishConfig and BackendConfig.
// //
...@@ -264,6 +311,11 @@ func (vc *Controller) updateVarnishInstance(inst *varnishInst, cfgName string, ...@@ -264,6 +311,11 @@ func (vc *Controller) updateVarnishInstance(inst *varnishInst, cfgName string,
vc.log.Tracef("Error loading config %s at %s: %v", vc.log.Tracef("Error loading config %s at %s: %v",
cfgName, inst.addr, err) cfgName, inst.addr, err)
metrics.vclLoadErrs.Inc() metrics.vclLoadErrs.Inc()
if resp, ok := err.(admin.UnexpectedResponse); ok {
if resp.Response.Code < admin.OK {
return update.MakeFatal("%v", resp)
}
}
return err return err
} }
metrics.vclLoads.Inc() metrics.vclLoads.Inc()
...@@ -309,18 +361,18 @@ func (vc *Controller) updateVarnishSvc(name string) error { ...@@ -309,18 +361,18 @@ func (vc *Controller) updateVarnishSvc(name string) error {
vc.log.Tracef("Update Varnish svc %s: config=%+v", name, *svc) vc.log.Tracef("Update Varnish svc %s: config=%+v", name, *svc)
svc.cfgLoaded = false svc.cfgLoaded = false
if svc.secrName == "" { if svc.secrName == "" {
return fmt.Errorf("No known admin secret for Varnish Service "+ return update.MakeIncomplete(
"%s", name) "No known admin secret for Varnish Service %s", name)
} }
if svc.spec == nil { if svc.spec == nil {
vc.log.Infof("Update Varnish Service %s: Currently no Ingress"+ return update.MakeNoop(
" defined", name) "Update Varnish Service %s: Currently no Ingress "+
return nil "defined", name)
} }
vclSrc, err := svc.spec.spec.GetSrc() vclSrc, err := svc.spec.spec.GetSrc()
if err != nil { if err != nil {
return err return update.MakeFatal("%v", err)
} }
cfgName := svc.spec.configName() cfgName := svc.spec.configName()
...@@ -522,9 +574,12 @@ func (vc *Controller) updateVarnishSvcAddrs(key string, addrs []vcl.Address, ...@@ -522,9 +574,12 @@ func (vc *Controller) updateVarnishSvcAddrs(key string, addrs []vcl.Address,
// Service // Service
// loadVCL: true if the VCL config for the Service should be // loadVCL: true if the VCL config for the Service should be
// reloaded // reloaded
func (vc *Controller) AddOrUpdateVarnishSvc(key string, addrs []vcl.Address, func (vc *Controller) AddOrUpdateVarnishSvc(
secrName string, loadVCL bool) error { key string,
addrs []vcl.Address,
secrName string,
loadVCL bool,
) update.Status {
var secrPtr *[]byte var secrPtr *[]byte
svc, svcExists := vc.svcs[key] svc, svcExists := vc.svcs[key]
if !svcExists { if !svcExists {
...@@ -567,18 +622,18 @@ func (vc *Controller) AddOrUpdateVarnishSvc(key string, addrs []vcl.Address, ...@@ -567,18 +622,18 @@ func (vc *Controller) AddOrUpdateVarnishSvc(key string, addrs []vcl.Address,
} else { } else {
vc.log.Trace("secret is nil") vc.log.Trace("secret is nil")
} }
return vc.updateVarnishSvcAddrs(key, addrs, secrPtr, loadVCL) return errStatus(vc.updateVarnishSvcAddrs(key, addrs, secrPtr, loadVCL))
} }
// DeleteVarnishSvc is called on the Delete event for the Varnish // DeleteVarnishSvc is called on the Delete event for the Varnish
// Service identified by the namespace/name key. The Varnish instance // Service identified by the namespace/name key. The Varnish instance
// is set to the unready state, and no further action is taken (other // is set to the unready state, and no further action is taken (other
// resources in the cluster may shut down the Varnish instances). // resources in the cluster may shut down the Varnish instances).
func (vc *Controller) DeleteVarnishSvc(key string) error { func (vc *Controller) DeleteVarnishSvc(key string) update.Status {
vc.log.Trace("DeleteVarnishSvc: key=", key) vc.log.Trace("DeleteVarnishSvc: key=", key)
svc, ok := vc.svcs[key] svc, ok := vc.svcs[key]
if !ok { if !ok {
return nil return update.MakeNoop("No Varnish Service %s", key)
} }
vc.log.Tracef("DeleteVarnishSvc: svc=%+v", svc) vc.log.Tracef("DeleteVarnishSvc: svc=%+v", svc)
err := vc.removeVarnishInstances(svc.instances) err := vc.removeVarnishInstances(svc.instances)
...@@ -588,7 +643,7 @@ func (vc *Controller) DeleteVarnishSvc(key string) error { ...@@ -588,7 +643,7 @@ func (vc *Controller) DeleteVarnishSvc(key string) error {
svcsGauge.Dec() svcsGauge.Dec()
} }
vc.log.Tracef("DeleteVarnishSvc map after delete = %+v", vc.svcs) vc.log.Tracef("DeleteVarnishSvc map after delete = %+v", vc.svcs)
return err return errStatus(err)
} }
func (vc *Controller) updateBeGauges() { func (vc *Controller) updateBeGauges() {
...@@ -616,10 +671,14 @@ func (vc *Controller) updateBeGauges() { ...@@ -616,10 +671,14 @@ func (vc *Controller) updateBeGauges() {
// ingsMeta: Ingress meta-data // ingsMeta: Ingress meta-data
// vcfgMeta: VarnishConfig meta-data // vcfgMeta: VarnishConfig meta-data
// bcfgMeta: BackendConfig meta-data // bcfgMeta: BackendConfig meta-data
func (vc *Controller) Update(svcKey string, spec vcl.Spec, addrs []vcl.Address, func (vc *Controller) Update(
ingsMeta map[string]Meta, vcfgMeta Meta, svcKey string,
bcfgMeta map[string]Meta) error { spec vcl.Spec,
addrs []vcl.Address,
ingsMeta map[string]Meta,
vcfgMeta Meta,
bcfgMeta map[string]Meta,
) update.Status {
var secrPtr *[]byte var secrPtr *[]byte
svc, exists := vc.svcs[svcKey] svc, exists := vc.svcs[svcKey]
if !exists { if !exists {
...@@ -647,8 +706,9 @@ func (vc *Controller) Update(svcKey string, spec vcl.Spec, addrs []vcl.Address, ...@@ -647,8 +706,9 @@ func (vc *Controller) Update(svcKey string, spec vcl.Spec, addrs []vcl.Address,
vc.updateBeGauges() vc.updateBeGauges()
if len(svc.instances) == 0 { if len(svc.instances) == 0 {
return fmt.Errorf("Currently no known endpoints for Varnish "+ return update.MakeIncomplete(
"service %s", svcKey) "Currently no known endpoints for Varnish service %s",
svcKey)
} }
if _, exists := vc.secrets[svc.secrName]; exists { if _, exists := vc.secrets[svc.secrName]; exists {
...@@ -659,7 +719,7 @@ func (vc *Controller) Update(svcKey string, spec vcl.Spec, addrs []vcl.Address, ...@@ -659,7 +719,7 @@ func (vc *Controller) Update(svcKey string, spec vcl.Spec, addrs []vcl.Address,
} else { } else {
secrPtr = nil secrPtr = nil
} }
return vc.updateVarnishSvcAddrs(svcKey, addrs, secrPtr, true) return errStatus(vc.updateVarnishSvcAddrs(svcKey, addrs, secrPtr, true))
} }
// SetNotReady may be called on the Delete event on an Ingress, if no // SetNotReady may be called on the Delete event on an Ingress, if no
...@@ -794,7 +854,9 @@ func (vc *Controller) SetAdmSecret(key string, secret []byte) { ...@@ -794,7 +854,9 @@ func (vc *Controller) SetAdmSecret(key string, secret []byte) {
// UpdateSvcForSecret associates the Secret identified by the // UpdateSvcForSecret associates the Secret identified by the
// namespace/name secretKey with the Varnish Service identified by the // namespace/name secretKey with the Varnish Service identified by the
// namespace/name svcKey. The Service is newly synced if necessary. // namespace/name svcKey. The Service is newly synced if necessary.
func (vc *Controller) UpdateSvcForSecret(svcKey, secretKey string) error { func (vc *Controller) UpdateSvcForSecret(
svcKey, secretKey string,
) update.Status {
secret, exists := vc.secrets[secretKey] secret, exists := vc.secrets[secretKey]
if !exists { if !exists {
secretKey = "" secretKey = ""
...@@ -803,9 +865,9 @@ func (vc *Controller) UpdateSvcForSecret(svcKey, secretKey string) error { ...@@ -803,9 +865,9 @@ func (vc *Controller) UpdateSvcForSecret(svcKey, secretKey string) error {
svc, exists := vc.svcs[svcKey] svc, exists := vc.svcs[svcKey]
if !exists { if !exists {
if secret == nil { if secret == nil {
vc.log.Infof("Neither Varnish Service %s nor secret "+ return update.MakeNoop(
"%s found", svcKey, secretKey) "Neither Varnish Service %s nor secret %s found",
return nil svcKey, secretKey)
} }
vc.log.Infof("Creating Varnish Service %s to set secret %s", vc.log.Infof("Creating Varnish Service %s to set secret %s",
svcKey, secretKey) svcKey, secretKey)
...@@ -822,7 +884,7 @@ func (vc *Controller) UpdateSvcForSecret(svcKey, secretKey string) error { ...@@ -822,7 +884,7 @@ func (vc *Controller) UpdateSvcForSecret(svcKey, secretKey string) error {
vc.log.Infof("Updating Service %s after setting secret %s", svcKey, vc.log.Infof("Updating Service %s after setting secret %s", svcKey,
secretKey) secretKey)
return vc.updateVarnishSvc(svcKey) return errStatus(vc.updateVarnishSvc(svcKey))
} }
// DeleteAdmSecret removes the secret identified by the namespace/name // DeleteAdmSecret removes the secret identified by the namespace/name
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment