Commit 30f73f01 authored by Geoff Simmons's avatar Geoff Simmons

Wait for queues and workers to shut down before exiting.

This has exposed k8s issue #59822 when we use client-go v6.0.0 for
compatibility with k8s 1.9, when shared informers are stopped as
part of queue shutdown:

https://github.com/kubernetes/kubernetes/issues/59822

This upgrades to use of client-go v6.0.1, but evidently only has the
effect that the problem is intermittent. Occasionally there are panics
due to double channel close. The guarantee of sequential delivery
by a shared informer may also not be honored.

We go forward with these issues, since 1.9 is already out of support,
and will probably start supporting newer k8s versions in the near
term.

Ref #6
parent a2b2ffc8
......@@ -235,12 +235,12 @@ func handleTermination(
log.Infof("Received signal (%s), shutting down", sig.String())
}
log.Info("Shutting down informers")
informerStop <- struct{}{}
log.Info("Shutting down the ingress controller")
ingc.Stop()
log.Info("Shutting down informers")
informerStop <- struct{}{}
if !exited {
log.Info("Shutting down the Varnish controller")
vc.Quit()
......
......@@ -25,16 +25,12 @@ require (
github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.2.0
github.com/spf13/pflag v1.0.3 // indirect
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 // indirect
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
golang.org/x/tools v0.0.0-20181221235234-d00ac6d27372 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
k8s.io/api v0.0.0-20181221193117-173ce66c1e39
k8s.io/apimachinery v0.0.0-20180925215425-1926e7bb5c13
k8s.io/client-go v6.0.0+incompatible
k8s.io/code-generator v0.0.0-20180510141822-0ab89e584187 // indirect
k8s.io/gengo v0.0.0-20181113154421-fd15ee9cc2f7 // indirect
k8s.io/klog v0.1.0 // indirect
k8s.io/client-go v6.0.1-0.20180515144434-1692bdde78a6+incompatible
k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be // indirect
)
......@@ -84,25 +84,24 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20181221235234-d00ac6d27372 h1:zWPUEY/PjVHT+zO3L8OfkjrtIjf55joTxn/RQP/AjOI=
golang.org/x/tools v0.0.0-20181221235234-d00ac6d27372/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
......@@ -118,14 +117,7 @@ k8s.io/api v0.0.0-20181221193117-173ce66c1e39 h1:iGq7zEPXFb0IeXAQK5RiYT1SVKX/af9
k8s.io/api v0.0.0-20181221193117-173ce66c1e39/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA=
k8s.io/apimachinery v0.0.0-20180925215425-1926e7bb5c13 h1:XhNQCG3WplXZqDfte+QiEYqa6BrV3j81XdFy3jxNI1k=
k8s.io/apimachinery v0.0.0-20180925215425-1926e7bb5c13/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0=
k8s.io/apimachinery v0.0.0-20181222072933-b814ad55d7c5 h1:96AWA6L4KjeldDMhG9639vpZ8VBQll1CgSAjCfEdFGI=
k8s.io/client-go v6.0.0+incompatible h1:kskJ2eeKafKUmOhH9xa02lGTs96aoQi3ZKyzIo62sJg=
k8s.io/client-go v6.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s=
k8s.io/code-generator v0.0.0-20180510141822-0ab89e584187 h1:9okhnIBEqdPBTimd5LOenYh+TLynwjIZ/znXMKXM3RA=
k8s.io/code-generator v0.0.0-20180510141822-0ab89e584187/go.mod h1:MYiN+ZJZ9HkETbgVZdWw2AsuAi9PZ4V80cwfuf2axe8=
k8s.io/gengo v0.0.0-20181113154421-fd15ee9cc2f7 h1:zjNgw2qqBQmKd0S59lGZBQqFxJqUZroVbDphfnVm5do=
k8s.io/gengo v0.0.0-20181113154421-fd15ee9cc2f7/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.1.0 h1:I5HMfc/DtuVaGR1KPwUrTc476K8NCqNBldC7H4dYEzk=
k8s.io/klog v0.1.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/client-go v6.0.1-0.20180515144434-1692bdde78a6+incompatible h1:QUglh8NubUsokcy6HJuwxOLPL60N77/it79BA+qRJRk=
k8s.io/client-go v6.0.1-0.20180515144434-1692bdde78a6+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s=
k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be h1:aWEq4nbj7HRJ0mtKYjNSk/7X28Tl6TI6FeG8gKF+r7Q=
k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
......@@ -371,10 +371,13 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
go wait.Until(ingc.nsQs.Run, time.Second, ingc.stopCh)
<-ingc.stopCh
ingc.log.Info("Shutting down workers")
}
// Stop the Ingress controller -- signal the workers to stop.
func (ingc *IngressController) Stop() {
ingc.stopCh <- struct{}{}
ingc.log.Info("Shutting down workers")
close(ingc.stopCh)
<-ingc.nsQs.DoneChan
ingc.log.Info("Controller exiting")
}
......@@ -30,6 +30,7 @@ package controller
import (
"fmt"
"sync"
api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
......@@ -67,7 +68,6 @@ type NamespaceWorker struct {
log *logrus.Logger
vController *varnish.VarnishController
queue workqueue.RateLimitingInterface
stopChan chan struct{}
listers *Listers
ing ext_listers.IngressNamespaceLister
svc core_v1_listers.ServiceNamespaceLister
......@@ -77,6 +77,7 @@ type NamespaceWorker struct {
bcfg vcr_listers.BackendConfigNamespaceLister
client kubernetes.Interface
recorder record.EventRecorder
wg *sync.WaitGroup
}
func (worker *NamespaceWorker) event(obj interface{}, evtType, reason,
......@@ -228,14 +229,6 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) error {
}
func (worker *NamespaceWorker) next() {
select {
case <-worker.stopChan:
worker.queue.ShutDown()
return
default:
break
}
obj, quit := worker.queue.Get()
if quit {
return
......@@ -257,6 +250,8 @@ func (worker *NamespaceWorker) next() {
}
func (worker *NamespaceWorker) work() {
defer worker.wg.Done()
worker.log.Info("Starting worker for namespace:", worker.namespace)
for !worker.queue.ShuttingDown() {
......@@ -272,6 +267,7 @@ func (worker *NamespaceWorker) work() {
// responsible for each namespace.
type NamespaceQueues struct {
Queue workqueue.RateLimitingInterface
DoneChan chan struct{}
ingClass string
log *logrus.Logger
vController *varnish.VarnishController
......@@ -279,6 +275,7 @@ type NamespaceQueues struct {
listers *Listers
client kubernetes.Interface
recorder record.EventRecorder
wg *sync.WaitGroup
}
// NewNamespaceQueues creates a NamespaceQueues object.
......@@ -301,6 +298,7 @@ func NewNamespaceQueues(
workqueue.DefaultControllerRateLimiter(), "_ALL_")
return &NamespaceQueues{
Queue: q,
DoneChan: make(chan struct{}),
log: log,
ingClass: ingClass,
vController: vController,
......@@ -308,6 +306,7 @@ func NewNamespaceQueues(
listers: listers,
client: client,
recorder: recorder,
wg: new(sync.WaitGroup),
}
}
......@@ -347,7 +346,6 @@ func (qs *NamespaceQueues) next() {
log: qs.log,
vController: qs.vController,
queue: q,
stopChan: make(chan struct{}),
listers: qs.listers,
ing: qs.listers.ing.Ingresses(ns),
svc: qs.listers.svc.Services(ns),
......@@ -357,9 +355,11 @@ func (qs *NamespaceQueues) next() {
bcfg: qs.listers.bcfg.BackendConfigs(ns),
client: qs.client,
recorder: qs.recorder,
wg: qs.wg,
}
qs.workers[ns] = worker
go worker.work()
qs.wg.Add(1)
}
worker.queue.Add(obj)
qs.Queue.Forget(obj)
......@@ -373,16 +373,19 @@ func (qs *NamespaceQueues) Run() {
for !qs.Queue.ShuttingDown() {
qs.next()
}
qs.log.Info("Shutting down dispatcher worker")
}
// Stop shuts down the main queue loop initiated by Run(), and in turn
// shuts down all of the NamespaceWorkers.
func (qs *NamespaceQueues) Stop() {
qs.log.Info("Shutting down dispatcher worker")
qs.Queue.ShutDown()
for _, worker := range qs.workers {
qs.log.Infof("Shutting down queue for namespace: %s",
worker.namespace)
worker.queue.ShutDown()
close(worker.stopChan)
}
// XXX wait for WaitGroup
qs.log.Info("Waiting for workers to shut down")
qs.wg.Wait()
close(qs.DoneChan)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment