Commit 540a36ce authored by Geoff Simmons's avatar Geoff Simmons

Add the maxSyncRetries CLI option.

$ vikingctrl --help
[...]
  -maxSyncRetries uint
    	maximum number of retires for cluster synchronizations
    	that fail due to recoverable errors, or because
    	necessary information is missing. 0 for unlimited
    	retries (default 0)
[...]

IOW set a maximum number of re-queues for SyncIncomplete and
SyncRecoverable type failures. By default unlimited.

While here, update some missing function parameter docs.
parent a7defb40
...@@ -99,6 +99,11 @@ var ( ...@@ -99,6 +99,11 @@ var (
varnishImplF = flag.String("varnishImpl", "", varnishImplF = flag.String("varnishImpl", "",
"set to 'klarlack' to enable features only implemented by\n"+ "set to 'klarlack' to enable features only implemented by\n"+
"the klarlack image for Varnish Ingress") "the klarlack image for Varnish Ingress")
maxSyncRetriesF = flag.Uint("maxSyncRetries", 0,
"maximum number of retires for cluster synchronizations\n"+
"that fail due to recoverable errors, or because\n"+
"necessary information is missing. 0 for unlimited\n"+
"retries (default 0)")
logFormat = logrus.TextFormatter{ logFormat = logrus.TextFormatter{
DisableColors: true, DisableColors: true,
...@@ -190,6 +195,9 @@ func main() { ...@@ -190,6 +195,9 @@ func main() {
if *devModeF { if *devModeF {
log.Info("devmode enabled") log.Info("devmode enabled")
} }
if *maxSyncRetriesF > 0 {
log.Infof("maxSyncRetries = %d", *maxSyncRetriesF)
}
log.Info("Die Würde des Menschen ist unantastbar") log.Info("Die Würde des Menschen ist unantastbar")
log.Info("Black Lives Matter") log.Info("Black Lives Matter")
...@@ -245,7 +253,7 @@ func main() { ...@@ -245,7 +253,7 @@ func main() {
*ingressClassF, *namespaceF, *devModeF, *varnishImplF, *ingressClassF, *namespaceF, *devModeF, *varnishImplF,
kubeClient, vController, hController, informerFactory, kubeClient, vController, hController, informerFactory,
vcrInformerFactory, vsecrInformerFactory, tsecrInformerFactory, vcrInformerFactory, vsecrInformerFactory, tsecrInformerFactory,
*incomplRetryDelayF) *incomplRetryDelayF, *maxSyncRetriesF)
if err != nil { if err != nil {
log.Fatalf("Could not initialize controller: %v", err) log.Fatalf("Could not initialize controller: %v", err)
os.Exit(-1) os.Exit(-1)
......
...@@ -85,6 +85,10 @@ ifndef LOG_LEVEL ...@@ -85,6 +85,10 @@ ifndef LOG_LEVEL
LOG_LEVEL=INFO LOG_LEVEL=INFO
endif endif
ifndef MAX_RETRIES
MAX_RETRIES=0
endif
all: deploy all: deploy
deploy-controller-helm: deploy-controller-helm:
...@@ -93,7 +97,7 @@ deploy-controller-helm: ...@@ -93,7 +97,7 @@ deploy-controller-helm:
--set vikingController.image.repository=$(CONTROLLER_IMAGE) \ --set vikingController.image.repository=$(CONTROLLER_IMAGE) \
--set vikingController.image.tag=$(CONTROLLER_TAG) \ --set vikingController.image.tag=$(CONTROLLER_TAG) \
--set vikingController.varnishImpl=$(VARNISH) \ --set vikingController.varnishImpl=$(VARNISH) \
--set vikingController.extraArgs="{-log-level=$(LOG_LEVEL)}" --set vikingController.extraArgs="{-log-level=$(LOG_LEVEL),-maxSyncRetries=$(MAX_RETRIES)}"
deploy-controller-kubectl: deploy-controller-kubectl:
@kubectl apply -f serviceaccount-controller.yaml @kubectl apply -f serviceaccount-controller.yaml
......
...@@ -140,11 +140,19 @@ type IngressController struct { ...@@ -140,11 +140,19 @@ type IngressController struct {
// //
// log: logger initialized at startup // log: logger initialized at startup
// ingClass: value of the ingress.class Ingress annotation // ingClass: value of the ingress.class Ingress annotation
// namespace: namespace to which the controller is restricted
// devMode: true if development mode was specfied at invocation
// varnishImpl: name of the varnish implementation (varnish, klarlack)
// kubeClient: k8s client initialized at startup // kubeClient: k8s client initialized at startup
// vc: Varnish controller // vc: Varnish controller
// hc: haproxy controller
// infFactory: SharedInformerFactory to create informers & listers for // infFactory: SharedInformerFactory to create informers & listers for
// the k8s standard client APIs // the k8s standard client APIs
// vcrInfFactory: SharedInformerFactory for the project's own client APIs // vcrInfFactory: SharedInformerFactory for the project's own client APIs
// vsecrInfFactory: SharedInformerFactory for Secrets annotated for viking
// tsecrInfFactory: SharedInformerFactory for TLS Secrets
// incomplRetryDelay: duration until SyncIncomplete errors are re-queued
// maxSyncRetries: max re-queues for SyncIncomplete or SyncRecoverable
func NewIngressController( func NewIngressController(
log *logrus.Logger, log *logrus.Logger,
ingClass string, ingClass string,
...@@ -159,6 +167,7 @@ func NewIngressController( ...@@ -159,6 +167,7 @@ func NewIngressController(
vsecrInfFactory informers.SharedInformerFactory, vsecrInfFactory informers.SharedInformerFactory,
tsecrInfFactory informers.SharedInformerFactory, tsecrInfFactory informers.SharedInformerFactory,
incomplRetryDelay time.Duration, incomplRetryDelay time.Duration,
maxSyncRetries uint,
) (*IngressController, error) { ) (*IngressController, error) {
ingc := IngressController{ ingc := IngressController{
...@@ -250,7 +259,7 @@ func NewIngressController( ...@@ -250,7 +259,7 @@ func NewIngressController(
ingc.nsQs = NewNamespaceQueues(ingc.log, ingClass, vc, hc, ingc.listers, ingc.nsQs = NewNamespaceQueues(ingc.log, ingClass, vc, hc, ingc.listers,
ingc.client, ingc.recorder, incomplRetryDelay, devMode, ingc.client, ingc.recorder, incomplRetryDelay, devMode,
varnishImpl) varnishImpl, maxSyncRetries)
return &ingc, nil return &ingc, nil
} }
......
...@@ -60,6 +60,7 @@ import ( ...@@ -60,6 +60,7 @@ import (
// namespace is synced separately and sequentially, and all of the // namespace is synced separately and sequentially, and all of the
// namespaces are synced in parallel. // namespaces are synced in parallel.
type NamespaceWorker struct { type NamespaceWorker struct {
maxSyncRetries uint
namespace string namespace string
ingClass string ingClass string
varnishImpl string varnishImpl string
...@@ -254,6 +255,20 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) update.Status { ...@@ -254,6 +255,20 @@ func (worker *NamespaceWorker) dispatch(obj interface{}) update.Status {
} }
} }
func (worker *NamespaceWorker) tooManyRetries(obj interface{}) bool {
if worker.maxSyncRetries == 0 {
return false
}
return uint(worker.queue.NumRequeues(obj)) >= worker.maxSyncRetries
}
func (worker *NamespaceWorker) fatalRetryStatus(
status update.Status,
) update.Status {
return update.MakeFatal("max sync retries %d exceeded, was: (%s) %s",
worker.maxSyncRetries, status.Reason(), status)
}
func (worker *NamespaceWorker) next() { func (worker *NamespaceWorker) next() {
obj, quit := worker.queue.Get() obj, quit := worker.queue.Get()
if quit { if quit {
...@@ -283,9 +298,21 @@ func (worker *NamespaceWorker) next() { ...@@ -283,9 +298,21 @@ func (worker *NamespaceWorker) next() {
worker.syncFailure(obj, msgPfx, status) worker.syncFailure(obj, msgPfx, status)
worker.queue.Forget(obj) worker.queue.Forget(obj)
case update.Recoverable: case update.Recoverable:
if worker.tooManyRetries(obj) {
worker.syncFailure(obj, msgPfx,
worker.fatalRetryStatus(status))
worker.queue.Forget(obj)
return
}
worker.syncFailure(obj, msgPfx, status) worker.syncFailure(obj, msgPfx, status)
worker.queue.AddRateLimited(obj) worker.queue.AddRateLimited(obj)
case update.Incomplete: case update.Incomplete:
if worker.tooManyRetries(obj) {
worker.syncFailure(obj, msgPfx,
worker.fatalRetryStatus(status))
worker.queue.Forget(obj)
return
}
worker.syncFailure(obj, msgPfx, status) worker.syncFailure(obj, msgPfx, status)
worker.queue.AddAfter(obj, worker.incomplRetryDelay) worker.queue.AddAfter(obj, worker.incomplRetryDelay)
} }
...@@ -321,16 +348,22 @@ type NamespaceQueues struct { ...@@ -321,16 +348,22 @@ type NamespaceQueues struct {
wg *sync.WaitGroup wg *sync.WaitGroup
incomplRetryDelay time.Duration incomplRetryDelay time.Duration
devMode bool devMode bool
maxSyncRetries uint
} }
// NewNamespaceQueues creates a NamespaceQueues object. // NewNamespaceQueues creates a NamespaceQueues object.
// //
// log: logger initialized at startup // log: logger initialized at startup
// ingClass: value of the ingress.class Ingress annotation // ingClass: value of the ingress.class Ingress annotation
// vController: Varnish controller initialied at startup // vController: Varnish controller initialized at startup
// hController: haproxy controller initialized at startup
// listers: client-go/lister instance for each resource type // listers: client-go/lister instance for each resource type
// client: k8s API client initialized at startup // client: k8s API client initialized at startup
// recorder: Event broadcaster initialized at startup // recorder: Event broadcaster initialized at startup
// incomplRetryDelay: duration until SyncIncomplete errors are re-queued
// devMode: true if development mode was specified at invocation
// varnishImpl: name of the varnish implementation (varnish, klarlack)
// maxSyncRetries: max re-queues for SyncIncomplete or SyncRecoverable
func NewNamespaceQueues( func NewNamespaceQueues(
log *logrus.Logger, log *logrus.Logger,
ingClass string, ingClass string,
...@@ -342,6 +375,7 @@ func NewNamespaceQueues( ...@@ -342,6 +375,7 @@ func NewNamespaceQueues(
incomplRetryDelay time.Duration, incomplRetryDelay time.Duration,
devMode bool, devMode bool,
varnishImpl string, varnishImpl string,
maxSyncRetries uint,
) *NamespaceQueues { ) *NamespaceQueues {
q := workqueue.NewNamedRateLimitingQueue( q := workqueue.NewNamedRateLimitingQueue(
...@@ -360,6 +394,7 @@ func NewNamespaceQueues( ...@@ -360,6 +394,7 @@ func NewNamespaceQueues(
wg: new(sync.WaitGroup), wg: new(sync.WaitGroup),
incomplRetryDelay: incomplRetryDelay, incomplRetryDelay: incomplRetryDelay,
devMode: devMode, devMode: devMode,
maxSyncRetries: maxSyncRetries,
} }
} }
...@@ -413,6 +448,7 @@ func (qs *NamespaceQueues) next() { ...@@ -413,6 +448,7 @@ func (qs *NamespaceQueues) next() {
recorder: qs.recorder, recorder: qs.recorder,
wg: qs.wg, wg: qs.wg,
incomplRetryDelay: qs.incomplRetryDelay, incomplRetryDelay: qs.incomplRetryDelay,
maxSyncRetries: qs.maxSyncRetries,
} }
if qs.devMode { if qs.devMode {
worker.tcfg = qs.listers.tcfg.TemplateConfigs(ns) worker.tcfg = qs.listers.tcfg.TemplateConfigs(ns)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment