Commit a00536cb authored by Geoff Simmons's avatar Geoff Simmons

Add the CLI option incompleteRetryDelay, default 5s, must be > 0s.

parent d5c1528b
...@@ -56,6 +56,8 @@ import ( ...@@ -56,6 +56,8 @@ import (
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
) )
const defIncomplRetryDelay = 5 * time.Second
var ( var (
versionF = flag.Bool("version", false, "print version and exit") versionF = flag.Bool("version", false, "print version and exit")
loglvlF = flag.String("log-level", "INFO", loglvlF = flag.String("log-level", "INFO",
...@@ -87,6 +89,11 @@ var ( ...@@ -87,6 +89,11 @@ var (
"if non-zero, re-update the controller with the state of\n"+ "if non-zero, re-update the controller with the state of\n"+
"the cluster this often, even if nothing has changed,\n"+ "the cluster this often, even if nothing has changed,\n"+
"to synchronize state that may have been missed") "to synchronize state that may have been missed")
incomplRetryDelayF = flag.Duration("incompleteRetryDelay",
defIncomplRetryDelay,
"re-queue delay when the controller does not have all of the\n"+
"information required for a necessary cluster change\n"+
"must be > 0s")
logFormat = logrus.TextFormatter{ logFormat = logrus.TextFormatter{
DisableColors: true, DisableColors: true,
FullTimestamp: true, FullTimestamp: true,
...@@ -167,6 +174,11 @@ func main() { ...@@ -167,6 +174,11 @@ func main() {
os.Exit(-1) os.Exit(-1)
} }
if *incomplRetryDelayF == 0 {
log.Fatal("incompleteRetryDelay must be > 0s")
os.Exit(-1)
}
log.Info("Starting Varnish Ingress controller version:", version) log.Info("Starting Varnish Ingress controller version:", version)
log.Info("Ingress class:", *ingressClassF) log.Info("Ingress class:", *ingressClassF)
...@@ -220,7 +232,7 @@ func main() { ...@@ -220,7 +232,7 @@ func main() {
ingController, err := controller.NewIngressController(log, ingController, err := controller.NewIngressController(log,
*ingressClassF, kubeClient, vController, hController, *ingressClassF, kubeClient, vController, hController,
informerFactory, vcrInformerFactory, vsecrInformerFactory, informerFactory, vcrInformerFactory, vsecrInformerFactory,
tsecrInformerFactory) tsecrInformerFactory, *incomplRetryDelayF)
if err != nil { if err != nil {
log.Fatalf("Could not initialize controller: %v") log.Fatalf("Could not initialize controller: %v")
os.Exit(-1) os.Exit(-1)
......
...@@ -149,6 +149,7 @@ func NewIngressController( ...@@ -149,6 +149,7 @@ func NewIngressController(
vcrInfFactory vcr_informers.SharedInformerFactory, vcrInfFactory vcr_informers.SharedInformerFactory,
vsecrInfFactory informers.SharedInformerFactory, vsecrInfFactory informers.SharedInformerFactory,
tsecrInfFactory informers.SharedInformerFactory, tsecrInfFactory informers.SharedInformerFactory,
incomplRetryDelay time.Duration,
) (*IngressController, error) { ) (*IngressController, error) {
ingc := IngressController{ ingc := IngressController{
...@@ -215,7 +216,7 @@ func NewIngressController( ...@@ -215,7 +216,7 @@ func NewIngressController(
} }
ingc.nsQs = NewNamespaceQueues(ingc.log, ingClass, vc, hc, ingc.listers, ingc.nsQs = NewNamespaceQueues(ingc.log, ingClass, vc, hc, ingc.listers,
ingc.client, ingc.recorder) ingc.client, ingc.recorder, incomplRetryDelay)
return &ingc, nil return &ingc, nil
} }
......
...@@ -52,14 +52,6 @@ import ( ...@@ -52,14 +52,6 @@ import (
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish" "code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish"
) )
const (
// syncSuccess and syncFailure are reasons for Events
syncSuccess = "SyncSuccess"
syncFailure = "SyncFailure"
)
const incompleteRetryDelay = 5 * time.Second
// NamespaceWorker serves fanout of work items to workers for each // NamespaceWorker serves fanout of work items to workers for each
// namespace for which the controller is notified about a resource // namespace for which the controller is notified about a resource
// update. The NamespaceQueues object creates a new instance when it // update. The NamespaceQueues object creates a new instance when it
...@@ -68,23 +60,24 @@ const incompleteRetryDelay = 5 * time.Second ...@@ -68,23 +60,24 @@ const incompleteRetryDelay = 5 * time.Second
// namespace is synced separately and sequentially, and all of the // namespace is synced separately and sequentially, and all of the
// namespaces are synced in parallel. // namespaces are synced in parallel.
type NamespaceWorker struct { type NamespaceWorker struct {
namespace string namespace string
ingClass string ingClass string
log *logrus.Logger log *logrus.Logger
vController *varnish.Controller vController *varnish.Controller
hController *haproxy.Controller hController *haproxy.Controller
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
listers *Listers listers *Listers
ing ext_listers.IngressNamespaceLister ing ext_listers.IngressNamespaceLister
svc core_v1_listers.ServiceNamespaceLister svc core_v1_listers.ServiceNamespaceLister
endp core_v1_listers.EndpointsNamespaceLister endp core_v1_listers.EndpointsNamespaceLister
tsecr core_v1_listers.SecretNamespaceLister tsecr core_v1_listers.SecretNamespaceLister
vsecr core_v1_listers.SecretNamespaceLister vsecr core_v1_listers.SecretNamespaceLister
vcfg vcr_listers.VarnishConfigNamespaceLister vcfg vcr_listers.VarnishConfigNamespaceLister
bcfg vcr_listers.BackendConfigNamespaceLister bcfg vcr_listers.BackendConfigNamespaceLister
client kubernetes.Interface client kubernetes.Interface
recorder record.EventRecorder recorder record.EventRecorder
wg *sync.WaitGroup wg *sync.WaitGroup
incomplRetryDelay time.Duration
} }
func (worker *NamespaceWorker) event(obj interface{}, evtType, reason, func (worker *NamespaceWorker) event(obj interface{}, evtType, reason,
...@@ -261,7 +254,7 @@ func (worker *NamespaceWorker) next() { ...@@ -261,7 +254,7 @@ func (worker *NamespaceWorker) next() {
worker.queue.AddRateLimited(obj) worker.queue.AddRateLimited(obj)
case update.Incomplete: case update.Incomplete:
worker.syncFailure(obj, msgPfx, status) worker.syncFailure(obj, msgPfx, status)
worker.queue.AddAfter(obj, incompleteRetryDelay) worker.queue.AddAfter(obj, worker.incomplRetryDelay)
} }
} }
...@@ -282,16 +275,17 @@ func (worker *NamespaceWorker) work() { ...@@ -282,16 +275,17 @@ func (worker *NamespaceWorker) work() {
// queue and places them on separate queues for NamespaceWorkers // queue and places them on separate queues for NamespaceWorkers
// responsible for each namespace. // responsible for each namespace.
type NamespaceQueues struct { type NamespaceQueues struct {
Queue workqueue.RateLimitingInterface Queue workqueue.RateLimitingInterface
ingClass string ingClass string
log *logrus.Logger log *logrus.Logger
vController *varnish.Controller vController *varnish.Controller
hController *haproxy.Controller hController *haproxy.Controller
workers map[string]*NamespaceWorker workers map[string]*NamespaceWorker
listers *Listers listers *Listers
client kubernetes.Interface client kubernetes.Interface
recorder record.EventRecorder recorder record.EventRecorder
wg *sync.WaitGroup wg *sync.WaitGroup
incomplRetryDelay time.Duration
} }
// NewNamespaceQueues creates a NamespaceQueues object. // NewNamespaceQueues creates a NamespaceQueues object.
...@@ -309,21 +303,24 @@ func NewNamespaceQueues( ...@@ -309,21 +303,24 @@ func NewNamespaceQueues(
hController *haproxy.Controller, hController *haproxy.Controller,
listers *Listers, listers *Listers,
client kubernetes.Interface, client kubernetes.Interface,
recorder record.EventRecorder) *NamespaceQueues { recorder record.EventRecorder,
incomplRetryDelay time.Duration,
) *NamespaceQueues {
q := workqueue.NewNamedRateLimitingQueue( q := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "_ALL_") workqueue.DefaultControllerRateLimiter(), "_ALL_")
return &NamespaceQueues{ return &NamespaceQueues{
Queue: q, Queue: q,
log: log, log: log,
ingClass: ingClass, ingClass: ingClass,
vController: vController, vController: vController,
hController: hController, hController: hController,
workers: make(map[string]*NamespaceWorker), workers: make(map[string]*NamespaceWorker),
listers: listers, listers: listers,
client: client, client: client,
recorder: recorder, recorder: recorder,
wg: new(sync.WaitGroup), wg: new(sync.WaitGroup),
incomplRetryDelay: incomplRetryDelay,
} }
} }
...@@ -358,23 +355,24 @@ func (qs *NamespaceQueues) next() { ...@@ -358,23 +355,24 @@ func (qs *NamespaceQueues) next() {
q := workqueue.NewNamedRateLimitingQueue( q := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), ns) workqueue.DefaultControllerRateLimiter(), ns)
worker = &NamespaceWorker{ worker = &NamespaceWorker{
namespace: ns, namespace: ns,
ingClass: qs.ingClass, ingClass: qs.ingClass,
log: qs.log, log: qs.log,
vController: qs.vController, vController: qs.vController,
hController: qs.hController, hController: qs.hController,
queue: q, queue: q,
listers: qs.listers, listers: qs.listers,
ing: qs.listers.ing.Ingresses(ns), ing: qs.listers.ing.Ingresses(ns),
svc: qs.listers.svc.Services(ns), svc: qs.listers.svc.Services(ns),
endp: qs.listers.endp.Endpoints(ns), endp: qs.listers.endp.Endpoints(ns),
vsecr: qs.listers.vsecr.Secrets(ns), vsecr: qs.listers.vsecr.Secrets(ns),
tsecr: qs.listers.tsecr.Secrets(ns), tsecr: qs.listers.tsecr.Secrets(ns),
vcfg: qs.listers.vcfg.VarnishConfigs(ns), vcfg: qs.listers.vcfg.VarnishConfigs(ns),
bcfg: qs.listers.bcfg.BackendConfigs(ns), bcfg: qs.listers.bcfg.BackendConfigs(ns),
client: qs.client, client: qs.client,
recorder: qs.recorder, recorder: qs.recorder,
wg: qs.wg, wg: qs.wg,
incomplRetryDelay: qs.incomplRetryDelay,
} }
qs.workers[ns] = worker qs.workers[ns] = worker
qs.wg.Add(1) qs.wg.Add(1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment