Commit 609c7767 authored by Geoff Simmons's avatar Geoff Simmons

Bugfix: update Varnish services when the endpoints change.

parent e401ed85
......@@ -241,6 +241,20 @@ func NewIngressController(log *logrus.Logger, kubeClient kubernetes.Interface,
addEndp := obj.(*api_v1.Endpoints)
ingc.log.Debug("endpHandler.AddFunc:", addEndp)
ingc.log.Info("Adding endpoints:", addEndp.Name)
// If this is an Endpoint for a Varnish admin
// service, then handle the service instead.
svc, ok, _, err := ingc.getSvcForEndp(addEndp)
if err != nil {
ingc.log.Errorf("Error getting service for "+
"endpoint %s: %v", addEndp.Name)
return
}
if ok && ingc.isVarnishAdmSvc(svc, namespace) {
ingc.syncQueue.enqueue(svc)
return
}
ingc.syncQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
......@@ -266,6 +280,18 @@ func NewIngressController(log *logrus.Logger, kubeClient kubernetes.Interface,
return
}
}
svc, ok, _, err := ingc.getSvcForEndp(remEndp)
if err != nil {
ingc.log.Errorf("Error getting service for "+
"endpoint %s: %v", remEndp.Name)
return
}
if ok && ingc.isVarnishAdmSvc(svc, namespace) {
ingc.syncQueue.enqueue(svc)
return
}
ingc.log.Info("Removing endpoints:", remEndp.Name)
ingc.syncQueue.enqueue(obj)
},
......@@ -276,10 +302,22 @@ func NewIngressController(log *logrus.Logger, kubeClient kubernetes.Interface,
if !reflect.DeepEqual(oldEps.Subsets, curEps.Subsets) {
ingc.log.Infof("Endpoints %v changed, syncing",
cur.(*api_v1.Endpoints).Name)
svc, ok, _, err := ingc.getSvcForEndp(curEps)
if err != nil {
ingc.log.Errorf("Error getting "+
"service for endpoint %s: %v",
curEps.Name)
return
}
if ok && ingc.isVarnishAdmSvc(svc, namespace) {
ingc.syncQueue.enqueue(svc)
return
}
ingc.syncQueue.enqueue(cur)
} else {
ingc.log.Info("Update Endpoints: No change")
return
}
ingc.log.Info("Update Endpoints: No change")
},
}
ingc.endpLister.Store, ingc.endpController = cache.NewInformer(
......@@ -439,6 +477,7 @@ func (ingc *IngressController) sync(task Task) {
switch task.Kind {
case Ingress:
ingc.syncIng(task)
return
case Endpoints:
ingc.syncEndp(task)
return
......@@ -494,18 +533,29 @@ func (ingc *IngressController) getIngForSvc(svc *api_v1.Service) []extensions.In
return ings
}
func (ingc *IngressController) getSvcForEndp(endp *api_v1.Endpoints) (*api_v1.Service, bool, string, error) {
svcKey := endp.GetNamespace() + "/" + endp.GetName()
svcObj, svcExists, err := ingc.svcLister.GetByKey(svcKey)
if err != nil || !svcExists {
return nil, svcExists, svcKey, err
}
svc, ok := svcObj.(*api_v1.Service)
if !ok {
return nil, svcExists, svcKey, err
}
return svc, svcExists, svcKey, nil
}
func (ingc *IngressController) getIngForEndp(obj interface{}) []extensions.Ingress {
var ings []extensions.Ingress
endp := obj.(*api_v1.Endpoints)
svcKey := endp.GetNamespace() + "/" + endp.GetName()
svcObj, svcExists, err := ingc.svcLister.GetByKey(svcKey)
svc, svcExists, svcKey, err := ingc.getSvcForEndp(endp)
if err != nil {
ingc.log.Errorf("Getting service %v from the cache: %v", svcKey,
err)
} else {
if svcExists {
ings = append(ings,
ingc.getIngForSvc(svcObj.(*api_v1.Service))...)
ings = append(ings, ingc.getIngForSvc(svc)...)
}
}
return ings
......
......@@ -303,8 +303,8 @@ func (vc *VarnishController) updateVarnishSvc(key string,
var errs VarnishAdmErrors
var newSvcs, remSvcs, keepSvcs []*varnishSvc
var updateAddrs map[string]struct{}
var prevAddrs map[string]*varnishSvc
updateAddrs := make(map[string]struct{})
prevAddrs := make(map[string]*varnishSvc)
for _, addr := range addrs {
key := addr.IP + ":" + strconv.Itoa(int(addr.Port))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment