Commit 6566cc8e authored by Geoff Simmons's avatar Geoff Simmons

Controller watches namespaces, and stops the queue/worker on ns delete.

parent f2885268
......@@ -14,14 +14,8 @@ rules:
resources:
- services
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
- namespaces
verbs:
- get
- list
......
......@@ -17,14 +17,8 @@ rules:
resources:
- services
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
- namespaces
verbs:
- get
- list
......
......@@ -8,14 +8,8 @@ rules:
resources:
- services
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
- namespaces
verbs:
- get
- list
......
......@@ -9,14 +9,8 @@ rules:
resources:
- services
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
- namespaces
verbs:
- get
- list
......
......@@ -67,6 +67,7 @@ type infrmrs struct {
tsecr cache.SharedIndexInformer
vcfg cache.SharedIndexInformer
bcfg cache.SharedIndexInformer
ns cache.SharedIndexInformer
}
// SyncType classifies the sync event, passed through to workers.
......@@ -181,6 +182,7 @@ func NewIngressController(
ing: infFactory.Extensions().V1beta1().Ingresses().Informer(),
svc: infFactory.Core().V1().Services().Informer(),
endp: infFactory.Core().V1().Endpoints().Informer(),
ns: infFactory.Core().V1().Namespaces().Informer(),
vsecr: vsecrInfFactory.Core().V1().Secrets().Informer(),
tsecr: tsecrInfFactory.Core().V1().Secrets().Informer(),
vcfg: vcrInfFactory.Ingress().V1alpha1().VarnishConfigs().
......@@ -203,6 +205,11 @@ func NewIngressController(
ingc.informers.vcfg.AddEventHandler(evtFuncs)
ingc.informers.bcfg.AddEventHandler(evtFuncs)
nsDeleteFunc := cache.ResourceEventHandlerFuncs{
DeleteFunc: ingc.deleteNs,
}
ingc.informers.ns.AddEventHandler(nsDeleteFunc)
ingc.listers = &Listers{
ing: infFactory.Extensions().V1beta1().Ingresses().Lister(),
svc: infFactory.Core().V1().Services().Lister(),
......@@ -263,6 +270,8 @@ func incWatchCounter(obj interface{}, sync string) {
watchCounters.WithLabelValues("Endpoints", sync).Inc()
case *api_v1.Secret:
watchCounters.WithLabelValues("Secret", sync).Inc()
case *api_v1.Namespace:
watchCounters.WithLabelValues("Namespace", sync).Inc()
case *vcr_v1alpha1.VarnishConfig:
watchCounters.WithLabelValues("VarnishConfig", sync).Inc()
case *vcr_v1alpha1.BackendConfig:
......@@ -354,6 +363,17 @@ func (ingc *IngressController) updateObj(old, new interface{}) {
ingc.nsQs.Queue.Add(&SyncObj{Type: Update, Obj: new})
}
func (ingc *IngressController) deleteNs(obj interface{}) {
ingc.logObj("Delete", obj)
incWatchCounter(obj, "Delete")
ns, ok := obj.(*api_v1.Namespace)
if !ok {
ingc.log.Errorf("Delete Namespace, got type %T: %v", obj, obj)
return
}
ingc.nsQs.StopNS(ns.Name)
}
// Run the Ingress controller -- start the informers in goroutines,
// wait for the caches to sync, and call Run() for the
// NamespaceQueues. Then block until Stop() is invoked.
......@@ -373,6 +393,7 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
go ingc.informers.vsecr.Run(ingc.ctx.Done())
go ingc.informers.vcfg.Run(ingc.ctx.Done())
go ingc.informers.bcfg.Run(ingc.ctx.Done())
go ingc.informers.ns.Run(ingc.ctx.Done())
ingc.log.Infof("Starting metrics listener at port %d", metricsPort)
go ServeMetrics(ingc.log, metricsPort)
......@@ -403,7 +424,8 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
ingc.informers.tsecr.HasSynced,
ingc.informers.vsecr.HasSynced,
ingc.informers.vcfg.HasSynced,
ingc.informers.bcfg.HasSynced); !ok {
ingc.informers.bcfg.HasSynced,
ingc.informers.ns.HasSynced); !ok {
err := fmt.Errorf("Failed waiting for caches to sync")
utilruntime.HandleError(err)
......
......@@ -382,6 +382,20 @@ func (qs *NamespaceQueues) next() {
qs.Queue.Forget(obj)
}
// StopNS stops the queue for a namespace and its associated
// worker. The NamespaceWorker object is de-referenced, becoming
// available for garbage collection.
func (qs *NamespaceQueues) StopNS(ns string) {
worker, exists := qs.workers[ns]
if !exists {
return
}
qs.log.Infof("Shutting down queue for namespace: %s", worker.namespace)
worker.queue.ShutDown()
// XXX sync the map
delete(qs.workers, ns)
}
// Run comprises the main loop of the controller, reading from the
// main queue of work items and handing them off to workers for each
// namespace.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment