Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
K
k8s-crt-dnldr
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
k8s
k8s-crt-dnldr
Commits
75123476
Commit
75123476
authored
Jul 22, 2020
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add k8s client code -- pkg/k8s and pkg/update.
parent
ae5ea8db
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
731 additions
and
0 deletions
+731
-0
Makefile
Makefile
+2
-0
client.go
pkg/k8s/client.go
+272
-0
worker.go
pkg/k8s/worker.go
+287
-0
status.go
pkg/update/status.go
+170
-0
No files found.
Makefile
View file @
75123476
...
@@ -36,6 +36,8 @@ check: build
...
@@ -36,6 +36,8 @@ check: build
golint ./pkg/crt/...
golint ./pkg/crt/...
golint ./pkg/pem/...
golint ./pkg/pem/...
golint ./pkg/rest/...
golint ./pkg/rest/...
golint ./pkg/update/...
golint ./pkg/k8s/...
go
test
-v
./pkg/crt/... ./pkg/pem/... ./pkg/rest/...
go
test
-v
./pkg/crt/... ./pkg/pem/... ./pkg/rest/...
test
:
check
test
:
check
...
...
pkg/k8s/client.go
0 → 100644
View file @
75123476
/*
* Copyright (c) 2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
// Package k8s encapsulates interaction with the Kubernetes API. It
// implements the client-go watcher that obtains information about
// TLS Secrets, and updates any PEM files maintained on the current
// system.
package
k8s
import
(
"context"
"fmt"
"time"
"code.uplex.de/k8s/k8s-crt-dnldr/pkg/crt"
"code.uplex.de/k8s/k8s-crt-dnldr/pkg/pem"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
core_v1
"k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
api_v1
"k8s.io/api/core/v1"
meta_v1
"k8s.io/apimachinery/pkg/apis/meta/v1"
)
// SyncType classifies the sync event, passed through to workers.
type
SyncType
uint8
const
(
// Add event
Add
SyncType
=
iota
// Update event
Update
// Delete event
Delete
)
func
(
t
SyncType
)
String
()
string
{
switch
t
{
case
Add
:
return
"add"
case
Update
:
return
"update"
case
Delete
:
return
"delete"
default
:
return
"unknown sync type"
}
}
// SyncObj wraps the Secret for which event handlers are notified, and
// encodes the sync event. These are the objects passed into the
// queues for workers.
type
SyncObj
struct
{
Type
SyncType
Secret
*
api_v1
.
Secret
}
// Filters Secrets with the type field that identify a Secret to be
// used for TLS certificates, and named in an Ingress resource.
func
ingressTLSSecrets
(
opts
*
meta_v1
.
ListOptions
)
{
opts
.
FieldSelector
=
fields
.
OneTermEqualSelector
(
"type"
,
string
(
api_v1
.
SecretTypeTLS
))
.
String
()
}
// Watcher watches Kubernetes API for TLS Secrets, and updates or
// deletes PEM files when needed.
type
Watcher
struct
{
log
*
logrus
.
Logger
informer
cache
.
SharedIndexInformer
nsQs
*
NamespaceQueues
ctx
context
.
Context
cancel
context
.
CancelFunc
getter
*
crt
.
Getter
}
// NewWatcher creates an API client.
//
// log: logger initialized at startup
// kubeClient: k8s client initialized at startup
// ns: namespace in which to watch Secrets (empty for all)
// base: directory into which PEM files are stored
// gid: group ID to be set in PEM file permissions, if >= 0
// resyncPeriod: for the k8s informer (0s for no resync)
func
NewWatcher
(
log
*
logrus
.
Logger
,
kubeClient
kubernetes
.
Interface
,
ns
string
,
base
string
,
gid
int
,
resyncPeriod
time
.
Duration
,
)
(
*
Watcher
,
error
)
{
watcher
:=
Watcher
{
log
:
log
,
}
infFactory
:=
informers
.
NewSharedInformerFactoryWithOptions
(
kubeClient
,
resyncPeriod
,
informers
.
WithNamespace
(
ns
),
informers
.
WithTweakListOptions
(
ingressTLSSecrets
))
watcher
.
informer
=
infFactory
.
Core
()
.
V1
()
.
Secrets
()
.
Informer
()
lister
:=
infFactory
.
Core
()
.
V1
()
.
Secrets
()
.
Lister
()
eventBroadcaster
:=
record
.
NewBroadcaster
()
eventBroadcaster
.
StartLogging
(
watcher
.
log
.
Printf
)
eventBroadcaster
.
StartRecordingToSink
(
&
core_v1
.
EventSinkImpl
{
Interface
:
kubeClient
.
CoreV1
()
.
Events
(
ns
),
})
evtScheme
:=
runtime
.
NewScheme
()
if
err
:=
api_v1
.
AddToScheme
(
evtScheme
);
err
!=
nil
{
return
nil
,
err
}
recorder
:=
eventBroadcaster
.
NewRecorder
(
evtScheme
,
api_v1
.
EventSource
{
Component
:
"crt-dnldr"
})
evtFuncs
:=
cache
.
ResourceEventHandlerFuncs
{
AddFunc
:
watcher
.
addObj
,
DeleteFunc
:
watcher
.
deleteObj
,
UpdateFunc
:
watcher
.
updateObj
,
}
watcher
.
informer
.
AddEventHandler
(
evtFuncs
)
watcher
.
getter
=
crt
.
NewGetter
(
lister
)
files
:=
pem
.
NewFiles
(
base
,
gid
,
watcher
.
getter
)
watcher
.
nsQs
=
NewNamespaceQueues
(
watcher
.
log
,
files
,
recorder
)
// InitMetrics()
return
&
watcher
,
nil
}
// GetCrtGetter returns a crt.Getter object from the SecretLister
// created by the Watcher. This object should be used exclusively to
// access Secret contents.
func
(
watcher
*
Watcher
)
GetCrtGetter
()
*
crt
.
Getter
{
return
watcher
.
getter
}
func
(
watcher
*
Watcher
)
getSecret
(
obj
interface
{})
(
*
api_v1
.
Secret
,
bool
)
{
secret
,
ok
:=
obj
.
(
*
api_v1
.
Secret
)
if
!
ok
{
// XXX error counter
m
,
mErr
:=
meta
.
Accessor
(
obj
)
t
,
tErr
:=
meta
.
TypeAccessor
(
obj
)
if
mErr
==
nil
&&
tErr
==
nil
{
if
t
.
GetKind
()
!=
""
{
watcher
.
log
.
Errorf
(
"Not a Secret: %s %s/%s"
,
t
.
GetKind
(),
m
.
GetNamespace
(),
m
.
GetName
())
}
else
{
watcher
.
log
.
Errorf
(
"Not a Secret: %s/%s"
,
m
.
GetNamespace
(),
m
.
GetName
())
}
}
else
{
watcher
.
log
.
Errorf
(
"Got a non-Secret, unknown type"
)
}
return
nil
,
false
}
if
secret
.
Type
!=
api_v1
.
SecretTypeTLS
{
watcher
.
log
.
Errorf
(
"Not a TLS Secret: %s/%s"
,
secret
.
Namespace
,
secret
.
Name
)
return
nil
,
false
}
return
secret
,
true
}
func
(
watcher
*
Watcher
)
logObj
(
action
string
,
secret
*
api_v1
.
Secret
)
{
watcher
.
log
.
Debugf
(
"%s Secret: %s/%s %s %s"
,
action
,
secret
.
Namespace
,
secret
.
Name
,
string
(
secret
.
UID
),
secret
.
ResourceVersion
)
}
func
(
watcher
*
Watcher
)
addObj
(
obj
interface
{})
{
secret
,
ok
:=
watcher
.
getSecret
(
obj
)
if
!
ok
{
return
}
watcher
.
logObj
(
"Add"
,
secret
)
// watchCounters.WithLabelValues("Add").Inc()
// watcher.nsQs.Queue.Add(&SyncObj{Type: Add, Secret: secret})
}
func
(
watcher
*
Watcher
)
deleteObj
(
obj
interface
{})
{
secret
,
ok
:=
watcher
.
getSecret
(
obj
)
if
!
ok
{
return
}
watcher
.
logObj
(
"Delete"
,
secret
)
// watchCounters.WithLabelValues("Delete").Inc()
watcher
.
nsQs
.
Queue
.
Add
(
&
SyncObj
{
Type
:
Delete
,
Secret
:
secret
})
}
func
(
watcher
*
Watcher
)
updateObj
(
old
,
new
interface
{})
{
newSecret
,
ok
:=
watcher
.
getSecret
(
new
)
if
!
ok
{
return
}
watcher
.
logObj
(
"Update:"
,
newSecret
)
// watchCounters.WithLabelValues("Update").Inc()
watcher
.
nsQs
.
Queue
.
Add
(
&
SyncObj
{
Type
:
Update
,
Secret
:
newSecret
})
}
// Run the API client -- start the informers in goroutines, wait for
// the caches to sync, and call Run() for the NamespaceQueues. Then
// block until Stop() is invoked.
func
(
watcher
*
Watcher
)
Run
(
metricsPort
uint16
)
{
defer
utilruntime
.
HandleCrash
()
watcher
.
ctx
,
watcher
.
cancel
=
context
.
WithCancel
(
context
.
Background
())
watcher
.
log
.
Info
(
"Launching the Secret informer"
)
go
watcher
.
informer
.
Run
(
watcher
.
ctx
.
Done
())
// watcher.log.Infof("Starting metrics listener at port %d", metricsPort)
// go ServeMetrics(watcher.log, metricsPort)
watcher
.
log
.
Info
(
"Waiting for cache to sync"
)
if
ok
:=
cache
.
WaitForCacheSync
(
watcher
.
ctx
.
Done
(),
watcher
.
informer
.
HasSynced
);
!
ok
{
err
:=
fmt
.
Errorf
(
"Failed waiting for caches to sync"
)
utilruntime
.
HandleError
(
err
)
return
}
watcher
.
log
.
Info
(
"Cache synced, running workers"
)
// XXX how to get initial state?
// XXX start http listener
go
wait
.
Until
(
watcher
.
nsQs
.
Run
,
time
.
Second
,
watcher
.
ctx
.
Done
())
}
// Stop the k8s watcher -- signal the workers to stop.
func
(
watcher
*
Watcher
)
Stop
()
{
watcher
.
log
.
Info
(
"Stopping k8s API watcher"
)
watcher
.
cancel
()
<-
watcher
.
ctx
.
Done
()
watcher
.
log
.
Info
(
"Shutting down workers"
)
watcher
.
nsQs
.
Stop
()
watcher
.
log
.
Info
(
"k8s API client exiting"
)
}
pkg/k8s/worker.go
0 → 100644
View file @
75123476
/*
* Copyright (c) 2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package
k8s
import
(
"fmt"
"os"
"sync"
"code.uplex.de/k8s/k8s-crt-dnldr/pkg/pem"
"code.uplex.de/k8s/k8s-crt-dnldr/pkg/update"
api_v1
"k8s.io/api/core/v1"
utilruntime
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"github.com/sirupsen/logrus"
)
// NamespaceWorker serves fanout of work items to workers for each
// namespace for which the controller is notified about a resource
// update. The NamespaceQueues object creates a new instance when it
// reads an item from a new namespace from its main queue. Thus each
// namespace is synced separately and sequentially, and all of the
// namespaces are synced in parallel.
type
NamespaceWorker
struct
{
namespace
string
log
*
logrus
.
Logger
queue
workqueue
.
RateLimitingInterface
files
*
pem
.
Files
recorder
record
.
EventRecorder
wg
*
sync
.
WaitGroup
}
func
(
worker
*
NamespaceWorker
)
update
(
secret
*
api_v1
.
Secret
)
update
.
Status
{
if
!
worker
.
files
.
Have
(
secret
.
Namespace
,
secret
.
Name
,
string
(
secret
.
UID
),
secret
.
ResourceVersion
)
{
return
update
.
MakeNoop
(
"pem file currently not stored"
)
}
if
worker
.
files
.
Check
(
secret
.
Namespace
,
secret
.
Name
,
string
(
secret
.
UID
),
secret
.
ResourceVersion
)
{
return
update
.
MakeNoop
(
"pem file already up to date"
)
}
if
found
,
valid
,
err
:=
worker
.
files
.
Write
(
secret
.
Namespace
,
secret
.
Name
,
string
(
secret
.
UID
),
secret
.
ResourceVersion
);
!
found
{
return
update
.
MakeRecoverable
(
"pem file not found"
)
}
else
if
!
valid
{
return
update
.
MakeFatal
(
"%v"
,
err
)
}
else
if
err
!=
nil
{
if
os
.
IsPermission
(
err
)
{
return
update
.
MakeFatal
(
"%s"
,
err
)
}
return
update
.
MakeRecoverable
(
"%v"
,
err
)
}
return
update
.
MakeSuccess
(
"pem file updated"
)
}
func
(
worker
*
NamespaceWorker
)
delete
(
secret
*
api_v1
.
Secret
)
update
.
Status
{
if
!
worker
.
files
.
Have
(
secret
.
Namespace
,
secret
.
Name
,
string
(
secret
.
UID
),
secret
.
ResourceVersion
)
{
return
update
.
MakeNoop
(
"pem file currently not stored"
)
}
exist
,
err
:=
worker
.
files
.
Delete
(
secret
.
Namespace
,
secret
.
Name
)
if
!
exist
{
return
update
.
MakeNoop
(
"pem file not found"
)
}
if
err
!=
nil
{
if
os
.
IsPermission
(
err
)
{
return
update
.
MakeFatal
(
"%s"
,
err
)
}
return
update
.
MakeRecoverable
(
"%v"
,
err
)
}
return
update
.
MakeSuccess
(
"pem file deleted"
)
}
func
(
worker
*
NamespaceWorker
)
syncSuccess
(
syncType
SyncType
,
secret
*
api_v1
.
Secret
,
status
update
.
Status
)
{
worker
.
log
.
Infof
(
"%s Secret %s/%s: %s"
,
syncType
,
secret
.
Namespace
,
secret
.
Name
,
status
)
worker
.
recorder
.
Eventf
(
secret
,
api_v1
.
EventTypeNormal
,
status
.
Reason
(),
"%s: %s"
,
syncType
,
status
)
}
func
(
worker
*
NamespaceWorker
)
syncFailure
(
syncType
SyncType
,
secret
*
api_v1
.
Secret
,
status
update
.
Status
)
{
worker
.
log
.
Errorf
(
"%s Secret %s/%s: %s"
,
syncType
,
secret
.
Namespace
,
secret
.
Name
,
status
)
worker
.
recorder
.
Eventf
(
secret
,
api_v1
.
EventTypeWarning
,
status
.
Reason
(),
"%s: %s"
,
syncType
,
status
)
}
func
(
worker
*
NamespaceWorker
)
next
()
{
obj
,
quit
:=
worker
.
queue
.
Get
()
if
quit
{
return
}
defer
worker
.
queue
.
Done
(
obj
)
syncObj
,
ok
:=
obj
.
(
*
SyncObj
)
if
!
ok
{
worker
.
log
.
Errorf
(
"Unhandled type %T"
,
obj
)
worker
.
queue
.
Forget
(
obj
)
}
secret
:=
syncObj
.
Secret
if
secret
.
Type
!=
api_v1
.
SecretTypeTLS
{
worker
.
log
.
Panicf
(
"Secret %s/%s: type is %s, should be %s"
,
secret
.
Namespace
,
secret
.
Name
,
secret
.
Type
,
api_v1
.
SecretTypeTLS
)
}
syncType
:=
syncObj
.
Type
var
status
update
.
Status
switch
syncObj
.
Type
{
case
Update
:
status
=
worker
.
update
(
secret
)
case
Delete
:
status
=
worker
.
delete
(
secret
)
default
:
status
=
update
.
MakeFatal
(
"Unhandled sync: %s %s/%s"
,
syncType
,
secret
.
Namespace
,
secret
.
Name
)
}
switch
status
.
Type
{
case
update
.
Noop
:
worker
.
log
.
Infof
(
"%s Secret %s/%s: %s"
,
syncType
,
secret
.
Namespace
,
secret
.
Name
,
status
)
worker
.
queue
.
Forget
(
obj
)
case
update
.
Success
:
worker
.
syncSuccess
(
syncType
,
secret
,
status
)
worker
.
queue
.
Forget
(
obj
)
case
update
.
Fatal
:
worker
.
syncFailure
(
syncType
,
secret
,
status
)
worker
.
queue
.
Forget
(
obj
)
case
update
.
Recoverable
:
worker
.
syncFailure
(
syncType
,
secret
,
status
)
worker
.
queue
.
AddRateLimited
(
obj
)
}
}
func
(
worker
*
NamespaceWorker
)
work
()
{
defer
worker
.
wg
.
Done
()
worker
.
log
.
Info
(
"Starting worker for namespace:"
,
worker
.
namespace
)
for
!
worker
.
queue
.
ShuttingDown
()
{
worker
.
next
()
}
worker
.
log
.
Info
(
"Shutting down worker for namespace:"
,
worker
.
namespace
)
}
// NamespaceQueues reads from the main queue to which informers add
// new work items from all namespaces. The worker reads items from the
// queue and places them on separate queues for NamespaceWorkers
// responsible for each namespace.
type
NamespaceQueues
struct
{
Queue
workqueue
.
RateLimitingInterface
log
*
logrus
.
Logger
workers
map
[
string
]
*
NamespaceWorker
files
*
pem
.
Files
recorder
record
.
EventRecorder
wg
*
sync
.
WaitGroup
}
// NewNamespaceQueues creates a NamespaceQueues object.
//
// log: logger initialized at startup
// files: for managing PEM files
// recorder: Event broadcaster initialized at startup
func
NewNamespaceQueues
(
log
*
logrus
.
Logger
,
files
*
pem
.
Files
,
recorder
record
.
EventRecorder
,
)
*
NamespaceQueues
{
q
:=
workqueue
.
NewRateLimitingQueue
(
workqueue
.
DefaultControllerRateLimiter
())
return
&
NamespaceQueues
{
Queue
:
q
,
log
:
log
,
workers
:
make
(
map
[
string
]
*
NamespaceWorker
),
files
:
files
,
recorder
:
recorder
,
wg
:
new
(
sync
.
WaitGroup
),
}
}
func
getNameSpace
(
obj
interface
{})
(
namespace
,
name
string
,
err
error
)
{
k
,
err
:=
cache
.
DeletionHandlingMetaNamespaceKeyFunc
(
obj
)
if
err
!=
nil
{
return
}
namespace
,
name
,
err
=
cache
.
SplitMetaNamespaceKey
(
k
)
return
}
func
(
qs
*
NamespaceQueues
)
next
()
{
obj
,
quit
:=
qs
.
Queue
.
Get
()
if
quit
{
return
}
defer
qs
.
Queue
.
Done
(
obj
)
syncObj
,
ok
:=
obj
.
(
*
SyncObj
)
if
!
ok
{
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"Unhandled type %T"
,
obj
))
return
}
secret
:=
syncObj
.
Secret
worker
,
exists
:=
qs
.
workers
[
secret
.
Namespace
]
if
!
exists
{
q
:=
workqueue
.
NewNamedRateLimitingQueue
(
workqueue
.
DefaultControllerRateLimiter
(),
secret
.
Namespace
)
worker
=
&
NamespaceWorker
{
namespace
:
secret
.
Namespace
,
log
:
qs
.
log
,
queue
:
q
,
files
:
qs
.
files
,
recorder
:
qs
.
recorder
,
wg
:
qs
.
wg
,
}
qs
.
workers
[
secret
.
Namespace
]
=
worker
qs
.
wg
.
Add
(
1
)
go
worker
.
work
()
}
worker
.
queue
.
Add
(
obj
)
qs
.
Queue
.
Forget
(
obj
)
}
// Run comprises the main loop of the controller, reading from the
// main queue of work items and handing them off to workers for each
// namespace.
func
(
qs
*
NamespaceQueues
)
Run
()
{
qs
.
log
.
Info
(
"Starting dispatcher worker"
)
for
!
qs
.
Queue
.
ShuttingDown
()
{
qs
.
next
()
}
}
// Stop shuts down the main queue loop initiated by Run(), and in turn
// shuts down all of the NamespaceWorkers.
func
(
qs
*
NamespaceQueues
)
Stop
()
{
qs
.
log
.
Info
(
"Shutting down dispatcher worker"
)
qs
.
Queue
.
ShutDown
()
for
_
,
worker
:=
range
qs
.
workers
{
qs
.
log
.
Infof
(
"Shutting down queue for namespace: %s"
,
worker
.
namespace
)
worker
.
queue
.
ShutDown
()
}
qs
.
log
.
Info
(
"Waiting for workers to shut down"
)
qs
.
wg
.
Wait
()
}
pkg/update/status.go
0 → 100644
View file @
75123476
/*-
* Copyright (c) 2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
// Package update defines the Status type, which classifies the result
// of a synchronization operation. This distinguishes forms of success
// or failure after an Add/Update/Delete notification for Secrets, and
// determeins furher actions such as error handling, logging and event
// generation.
package
update
import
"fmt"
// StatusType is the classifier for the result of a synchronization.
type
StatusType
uint8
const
(
// Noop indicates that no change was necessary. The result is
// logged, but no further action is taken.
Noop
StatusType
=
iota
// Success indicates that a change was successfully
// executed. The result is logged, and an event is generated
// for the Secret that was synchronized.
Success
// Fatal indicates an unrecoverable error, which cannot be
// compensated by retrying the same synchronization. This
// result is logged, a warning event is generated, and the
// operation is never retried.
Fatal
// Recoverable indicates an error that might be compensated
// with a retry. This result is logged, a warning event is
// generated, and the sync operation is re-queued with a rate
// limiting delay (likely an exponential backoff, as scheduled
// by the client-go workqueue).
Recoverable
)
func
(
t
StatusType
)
String
()
string
{
switch
t
{
case
Noop
:
return
"no cluster change necessary"
case
Success
:
return
"successfully synced"
case
Fatal
:
return
"unrecoverable error"
case
Recoverable
:
return
"recoverable error"
default
:
return
"UNKNOWN SYNC STATUS"
}
}
// Reason returns a string suitable as the reason string for an event
// after a sync returns the given status type.
//
// Success: SyncSuccess
// Fatal: SyncFatalError
// Recoverable: SyncRecoverableError
//
// Not valid for the Noop type, since no event is generated for Noop.
func
(
t
StatusType
)
Reason
()
string
{
switch
t
{
case
Success
:
return
"SyncSuccess"
case
Fatal
:
return
"SyncFatalError"
case
Recoverable
:
return
"SyncRecoverableError"
default
:
panic
(
"illegal StatusType for Reason()"
)
}
}
// Status encapsulates the result of a synchronization operation,
// including its type and an optional detail message (an error message
// for the error types).
type
Status
struct
{
Msg
string
Type
StatusType
}
func
(
status
Status
)
String
()
string
{
if
status
.
Msg
==
""
{
return
status
.
Type
.
String
()
}
return
status
.
Type
.
String
()
+
": "
+
status
.
Msg
}
// Reason returns a string suitable as the reason string for an event
// after a sync returns. Identical to Reason() for the given type.
func
(
status
Status
)
Reason
()
string
{
return
status
.
Type
.
Reason
()
}
// IsError returns true iff the Status has one of the error types:
// Fatal, Recoverable or Incomplete.
func
(
status
Status
)
IsError
()
bool
{
switch
status
.
Type
{
case
Noop
:
return
false
case
Success
:
return
false
case
Fatal
:
return
true
case
Recoverable
:
return
true
default
:
return
true
}
}
func
(
status
Status
)
Error
()
string
{
return
status
.
String
()
}
// Make is a convenience function to create a Status of the given
// type, setting the Msg to a formatted string.
func
Make
(
t
StatusType
,
format
string
,
args
...
interface
{})
Status
{
return
Status
{
Msg
:
fmt
.
Sprintf
(
format
,
args
...
),
Type
:
t
,
}
}
// MakeNoop creates a Noop Status with a formatted string for the Msg.
func
MakeNoop
(
format
string
,
args
...
interface
{})
Status
{
return
Make
(
Noop
,
format
,
args
...
)
}
// MakeSuccess creates a Success Status with a formatted string for
// the Msg.
func
MakeSuccess
(
format
string
,
args
...
interface
{})
Status
{
return
Make
(
Success
,
format
,
args
...
)
}
// MakeFatal creates a Fatal Status with a formatted string for the
// Msg.
func
MakeFatal
(
format
string
,
args
...
interface
{})
Status
{
return
Make
(
Fatal
,
format
,
args
...
)
}
// MakeRecoverable creates a Recoverable Status with a formatted
// string for the Msg.
func
MakeRecoverable
(
format
string
,
args
...
interface
{})
Status
{
return
Make
(
Recoverable
,
format
,
args
...
)
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment