Commit 9d6e4837 authored by Tim Leers's avatar Tim Leers Committed by Geoff Simmons

Fix `close a closed channel` panic on shutdown

parent 05b6cabc
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
package controller package controller
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"time" "time"
...@@ -109,7 +110,8 @@ type IngressController struct { ...@@ -109,7 +110,8 @@ type IngressController struct {
informers *infrmrs informers *infrmrs
listers *Listers listers *Listers
nsQs *NamespaceQueues nsQs *NamespaceQueues
stopCh chan struct{} ctx context.Context
cancel context.CancelFunc
recorder record.EventRecorder recorder record.EventRecorder
} }
...@@ -135,7 +137,6 @@ func NewIngressController( ...@@ -135,7 +137,6 @@ func NewIngressController(
ingc := IngressController{ ingc := IngressController{
log: log, log: log,
client: kubeClient, client: kubeClient,
stopCh: make(chan struct{}),
} }
InitMetrics() InitMetrics()
...@@ -323,15 +324,16 @@ func (ingc *IngressController) updateObj(old, new interface{}) { ...@@ -323,15 +324,16 @@ func (ingc *IngressController) updateObj(old, new interface{}) {
// the controller is ready (after informers have launched). // the controller is ready (after informers have launched).
func (ingc *IngressController) Run(readyFile string, metricsPort uint16) { func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer ingc.nsQs.Stop()
ingc.ctx, ingc.cancel = context.WithCancel(context.Background())
ingc.log.Info("Launching informers") ingc.log.Info("Launching informers")
go ingc.informers.ing.Run(ingc.stopCh) go ingc.informers.ing.Run(ingc.ctx.Done())
go ingc.informers.svc.Run(ingc.stopCh) go ingc.informers.svc.Run(ingc.ctx.Done())
go ingc.informers.endp.Run(ingc.stopCh) go ingc.informers.endp.Run(ingc.ctx.Done())
go ingc.informers.secr.Run(ingc.stopCh) go ingc.informers.secr.Run(ingc.ctx.Done())
go ingc.informers.vcfg.Run(ingc.stopCh) go ingc.informers.vcfg.Run(ingc.ctx.Done())
go ingc.informers.bcfg.Run(ingc.stopCh) go ingc.informers.bcfg.Run(ingc.ctx.Done())
ingc.log.Infof("Starting metrics listener at port %d", metricsPort) ingc.log.Infof("Starting metrics listener at port %d", metricsPort)
go ServeMetrics(ingc.log, metricsPort) go ServeMetrics(ingc.log, metricsPort)
...@@ -355,7 +357,7 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) { ...@@ -355,7 +357,7 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
} }
ingc.log.Info("Waiting for caches to sync") ingc.log.Info("Waiting for caches to sync")
if ok := cache.WaitForCacheSync(ingc.stopCh, if ok := cache.WaitForCacheSync(ingc.ctx.Done(),
ingc.informers.ing.HasSynced, ingc.informers.ing.HasSynced,
ingc.informers.svc.HasSynced, ingc.informers.svc.HasSynced,
ingc.informers.endp.HasSynced, ingc.informers.endp.HasSynced,
...@@ -369,16 +371,16 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) { ...@@ -369,16 +371,16 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
} }
ingc.log.Info("Caches synced, running workers") ingc.log.Info("Caches synced, running workers")
go wait.Until(ingc.nsQs.Run, time.Second, ingc.stopCh) go wait.Until(ingc.nsQs.Run, time.Second, ingc.ctx.Done())
<-ingc.stopCh <-ingc.ctx.Done()
ingc.log.Info("Stop received for Controller")
} }
// Stop the Ingress controller -- signal the workers to stop. // Stop the Ingress controller -- signal the workers to stop.
func (ingc *IngressController) Stop() { func (ingc *IngressController) Stop() {
ingc.stopCh <- struct{}{}
ingc.log.Info("Shutting down workers") ingc.log.Info("Shutting down workers")
close(ingc.stopCh) ingc.cancel()
<-ingc.nsQs.DoneChan ingc.nsQs.Stop()
ingc.log.Info("Controller exiting") ingc.log.Info("Controller exiting")
} }
...@@ -269,7 +269,6 @@ func (worker *NamespaceWorker) work() { ...@@ -269,7 +269,6 @@ func (worker *NamespaceWorker) work() {
// responsible for each namespace. // responsible for each namespace.
type NamespaceQueues struct { type NamespaceQueues struct {
Queue workqueue.RateLimitingInterface Queue workqueue.RateLimitingInterface
DoneChan chan struct{}
ingClass string ingClass string
log *logrus.Logger log *logrus.Logger
vController *varnish.Controller vController *varnish.Controller
...@@ -302,7 +301,6 @@ func NewNamespaceQueues( ...@@ -302,7 +301,6 @@ func NewNamespaceQueues(
workqueue.DefaultControllerRateLimiter(), "_ALL_") workqueue.DefaultControllerRateLimiter(), "_ALL_")
return &NamespaceQueues{ return &NamespaceQueues{
Queue: q, Queue: q,
DoneChan: make(chan struct{}),
log: log, log: log,
ingClass: ingClass, ingClass: ingClass,
vController: vController, vController: vController,
...@@ -393,5 +391,4 @@ func (qs *NamespaceQueues) Stop() { ...@@ -393,5 +391,4 @@ func (qs *NamespaceQueues) Stop() {
} }
qs.log.Info("Waiting for workers to shut down") qs.log.Info("Waiting for workers to shut down")
qs.wg.Wait() qs.wg.Wait()
close(qs.DoneChan)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment