Commit ec36b39c authored by Geoff Simmons's avatar Geoff Simmons

Fix Service deletion.

- The varnish controller deleted a service from its
map under the wrong conditions.

- Not an error if there are no Endpoints for the
Service when the Delete event is synced -- they
may already be gone.

- Varnish and haproxy controllers delete the service
from their maps before any other sync actions are
taken -- otherwise error returns may prevent the
deletion from ever happening.

- Permanent network errors on attempts to communicate
with the admin interfaces are ignored (do not cause
re-queue), since the instance may be gone.
parent 816cf8dc
......@@ -28,7 +28,10 @@
package controller
import api_v1 "k8s.io/api/core/v1"
import (
"fmt"
api_v1 "k8s.io/api/core/v1"
)
func (worker *NamespaceWorker) syncEndp(key string) error {
worker.log.Infof("Syncing Endpoints: %s/%s", worker.namespace, key)
......@@ -41,7 +44,14 @@ func (worker *NamespaceWorker) syncEndp(key string) error {
return nil
}
if worker.isVarnishIngSvc(svc) {
if eps, err := worker.getServiceEndpoints(svc); err != nil {
return err
} else if eps == nil {
return fmt.Errorf("could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
} else if isIngress, err := worker.isVarnishIngSvc(svc, eps); err != nil {
return err
} else if isIngress {
worker.log.Infof("Endpoints changed for Varnish Ingress "+
"service %s/%s, enqueuing service sync", svc.Namespace,
svc.Name)
......
......@@ -187,7 +187,11 @@ func (worker *NamespaceWorker) ingBackend2Addrs(namespace string,
}
endps, err := worker.getServiceEndpoints(svc)
if err != nil {
if endps == nil || err != nil {
if endps == nil {
err = fmt.Errorf("could not find endpoints for "+
"service: %s/%s", svc.Namespace, svc.Name)
}
return addrs, fmt.Errorf("Error getting endpoints for service "+
"%v: %v", svc, err)
}
......@@ -389,7 +393,11 @@ func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
svc.Namespace, svc.Name)
endps, err := worker.getServiceEndpoints(svc)
if err != nil {
if endps == nil || err != nil {
if endps == nil {
err = fmt.Errorf("could not find endpoints for "+
"service: %s/%s", svc.Namespace, svc.Name)
}
return fmt.Errorf("Error getting endpoints for service %s/%s: "+
"%v", svc.Namespace, svc.Name, err)
}
......
......@@ -44,9 +44,16 @@ const admPortName = "varnishadm"
// isVarnishIngSvc determines if a Service represents a Varnish that
// can implement Ingress, for which this controller is responsible.
// Currently the app label must point to a hardwired name.
func (worker *NamespaceWorker) isVarnishIngSvc(svc *api_v1.Service) bool {
func (worker *NamespaceWorker) isVarnishIngSvc(
svc *api_v1.Service,
endps *api_v1.Endpoints,
) (bool, error) {
if endps == nil {
panic("isVarnishIngSvc(svc, endps == nil")
}
app, exists := svc.Labels[labelKey]
return exists && app == labelVal
isVingSvc := exists && app == labelVal
return isVingSvc, nil
}
func (worker *NamespaceWorker) getIngsForSvc(
......@@ -129,7 +136,14 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
if err != nil {
return err
}
if !worker.isVarnishIngSvc(svc) {
if eps, err := worker.getServiceEndpoints(svc); err != nil {
return err
} else if eps == nil {
return fmt.Errorf("could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
} else if isIngress, err := worker.isVarnishIngSvc(svc, eps); err != nil {
return err
} else if !isIngress {
return worker.enqueueIngressForService(svc)
}
......@@ -178,6 +192,10 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
}
worker.log.Tracef("Varnish service %s/%s endpoints: %+v", svc.Namespace,
svc.Name, endps)
if endps == nil {
return fmt.Errorf("could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
}
// Get the secret name and admin port for the service. We have
// to retrieve a Pod spec for the service, then look for the
......@@ -263,10 +281,24 @@ func (worker *NamespaceWorker) deleteSvc(obj interface{}) error {
return nil
}
nsKey := svc.Namespace + "/" + svc.Name
worker.log.Info("Deleting Service:", nsKey)
if !worker.isVarnishIngSvc(svc) {
worker.log.Info("Deleting Service: ", nsKey)
if worker.vController.HasVarnishSvc(nsKey) {
if err := worker.vController.DeleteVarnishSvc(nsKey); err != nil {
return err
}
}
if endps, err := worker.getServiceEndpoints(svc); err != nil {
return err
} else if endps == nil {
worker.log.Warnf("Service %s: Endpoints already deleted", nsKey)
return nil
} else if isIngress, err := worker.isVarnishIngSvc(svc, endps); err != nil {
return err
} else if !isIngress {
return worker.enqueueIngressForService(svc)
}
return worker.vController.DeleteVarnishSvc(nsKey)
return nil
}
......@@ -69,8 +69,6 @@ func (worker *NamespaceWorker) getServiceEndpoints(
return ep, nil
}
}
err = fmt.Errorf("could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
return
}
......
......@@ -172,6 +172,7 @@ func (vc *Controller) monitor(monitorIntvl time.Duration) {
for {
time.Sleep(monitorIntvl)
vc.log.Tracef("monitor: vc.svcs = %+v", vc.svcs)
for svcName, svc := range vc.svcs {
vc.log.Infof("Monitoring Varnish instances in %s",
svcName)
......
......@@ -36,6 +36,7 @@ package varnish
import (
"fmt"
"io"
"net"
"os"
"reflect"
"regexp"
......@@ -192,6 +193,11 @@ func (vc *Controller) EvtGenerator(svcEvt interfaces.SvcEventGenerator) {
vc.svcEvt = svcEvt
}
func (vc *Controller) HasVarnishSvc(svcKey string) bool {
_, ok := vc.svcs[svcKey]
return ok
}
// Start initiates the Varnish controller and starts the monitor
// goroutine.
func (vc *Controller) Start() {
......@@ -392,6 +398,18 @@ func (vc *Controller) setCfgLabel(inst *varnishInst, cfg, lbl string,
return nil
}
// Ignore permanent network errors on syncs for deletion -- the
// Varnish instance may already be gone.
func (vc *Controller) ignorePermNetErr(err error) bool {
vc.log.Debugf("checking error type %T: %+v", err, err)
neterr, ok := err.(net.Error)
if !ok || !neterr.Temporary() {
return false
}
vc.log.Warnf("Ignoring permanent network error: %+v", err)
return true
}
// On Delete for a Varnish instance, we set it to the unready state.
func (vc *Controller) removeVarnishInstances(insts []*varnishInst) error {
var errs AdmErrors
......@@ -401,6 +419,9 @@ func (vc *Controller) removeVarnishInstances(insts []*varnishInst) error {
if err := vc.setCfgLabel(inst, notAvailCfg, readinessLabel,
true); err != nil {
if vc.ignorePermNetErr(err) {
continue
}
admErr := AdmError{addr: inst.addr, err: err}
errs = append(errs, admErr)
continue
......@@ -553,15 +574,19 @@ func (vc *Controller) AddOrUpdateVarnishSvc(key string, addrs []vcl.Address,
// is set to the unready state, and no further action is taken (other
// resources in the cluster may shut down the Varnish instances).
func (vc *Controller) DeleteVarnishSvc(key string) error {
vc.log.Trace("DeleteVarnishSvc: key=", key)
svc, ok := vc.svcs[key]
if !ok {
return nil
}
vc.log.Tracef("DeleteVarnishSvc: svc=%+v", svc)
err := vc.removeVarnishInstances(svc.instances)
if err != nil {
vc.log.Tracef("DeleteVarnishSvc map before delete = %+v", vc.svcs)
if err == nil {
delete(vc.svcs, key)
svcsGauge.Dec()
}
vc.log.Tracef("DeleteVarnishSvc map after delete = %+v", vc.svcs)
return err
}
......@@ -633,6 +658,9 @@ func (vc *Controller) SetNotReady(svcKey string) error {
if err := vc.setCfgLabel(inst, notAvailCfg, readinessLabel,
false); err != nil {
if vc.ignorePermNetErr(err) {
continue
}
admErr := AdmError{
addr: inst.addr,
err: err,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment