Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
k8s-ingress
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
3
Merge Requests
3
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Open sidebar
uplex-varnish
k8s-ingress
Commits
7395fe0a
Commit
7395fe0a
authored
May 09, 2020
by
Tim Leers
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix `close a closed channel` panic on shutdown
parent
fd51f64f
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
17 additions
and
18 deletions
+17
-18
controller.go
pkg/controller/controller.go
+17
-15
worker.go
pkg/controller/worker.go
+0
-3
No files found.
pkg/controller/controller.go
View file @
7395fe0a
...
@@ -29,6 +29,7 @@
...
@@ -29,6 +29,7 @@
package
controller
package
controller
import
(
import
(
"context"
"fmt"
"fmt"
"os"
"os"
"time"
"time"
...
@@ -109,7 +110,8 @@ type IngressController struct {
...
@@ -109,7 +110,8 @@ type IngressController struct {
informers
*
infrmrs
informers
*
infrmrs
listers
*
Listers
listers
*
Listers
nsQs
*
NamespaceQueues
nsQs
*
NamespaceQueues
stopCh
chan
struct
{}
ctx
context
.
Context
cancel
context
.
CancelFunc
recorder
record
.
EventRecorder
recorder
record
.
EventRecorder
}
}
...
@@ -135,7 +137,6 @@ func NewIngressController(
...
@@ -135,7 +137,6 @@ func NewIngressController(
ingc
:=
IngressController
{
ingc
:=
IngressController
{
log
:
log
,
log
:
log
,
client
:
kubeClient
,
client
:
kubeClient
,
stopCh
:
make
(
chan
struct
{}),
}
}
InitMetrics
()
InitMetrics
()
...
@@ -323,15 +324,16 @@ func (ingc *IngressController) updateObj(old, new interface{}) {
...
@@ -323,15 +324,16 @@ func (ingc *IngressController) updateObj(old, new interface{}) {
// the controller is ready (after informers have launched).
// the controller is ready (after informers have launched).
func
(
ingc
*
IngressController
)
Run
(
readyFile
string
,
metricsPort
uint16
)
{
func
(
ingc
*
IngressController
)
Run
(
readyFile
string
,
metricsPort
uint16
)
{
defer
utilruntime
.
HandleCrash
()
defer
utilruntime
.
HandleCrash
()
defer
ingc
.
nsQs
.
Stop
()
ingc
.
ctx
,
ingc
.
cancel
=
context
.
WithCancel
(
context
.
Background
())
ingc
.
log
.
Info
(
"Launching informers"
)
ingc
.
log
.
Info
(
"Launching informers"
)
go
ingc
.
informers
.
ing
.
Run
(
ingc
.
stopCh
)
go
ingc
.
informers
.
ing
.
Run
(
ingc
.
ctx
.
Done
()
)
go
ingc
.
informers
.
svc
.
Run
(
ingc
.
stopCh
)
go
ingc
.
informers
.
svc
.
Run
(
ingc
.
ctx
.
Done
()
)
go
ingc
.
informers
.
endp
.
Run
(
ingc
.
stopCh
)
go
ingc
.
informers
.
endp
.
Run
(
ingc
.
ctx
.
Done
()
)
go
ingc
.
informers
.
secr
.
Run
(
ingc
.
stopCh
)
go
ingc
.
informers
.
secr
.
Run
(
ingc
.
ctx
.
Done
()
)
go
ingc
.
informers
.
vcfg
.
Run
(
ingc
.
stopCh
)
go
ingc
.
informers
.
vcfg
.
Run
(
ingc
.
ctx
.
Done
()
)
go
ingc
.
informers
.
bcfg
.
Run
(
ingc
.
stopCh
)
go
ingc
.
informers
.
bcfg
.
Run
(
ingc
.
ctx
.
Done
()
)
ingc
.
log
.
Infof
(
"Starting metrics listener at port %d"
,
metricsPort
)
ingc
.
log
.
Infof
(
"Starting metrics listener at port %d"
,
metricsPort
)
go
ServeMetrics
(
ingc
.
log
,
metricsPort
)
go
ServeMetrics
(
ingc
.
log
,
metricsPort
)
...
@@ -355,7 +357,7 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
...
@@ -355,7 +357,7 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
}
}
ingc
.
log
.
Info
(
"Waiting for caches to sync"
)
ingc
.
log
.
Info
(
"Waiting for caches to sync"
)
if
ok
:=
cache
.
WaitForCacheSync
(
ingc
.
stopCh
,
if
ok
:=
cache
.
WaitForCacheSync
(
ingc
.
ctx
.
Done
()
,
ingc
.
informers
.
ing
.
HasSynced
,
ingc
.
informers
.
ing
.
HasSynced
,
ingc
.
informers
.
svc
.
HasSynced
,
ingc
.
informers
.
svc
.
HasSynced
,
ingc
.
informers
.
endp
.
HasSynced
,
ingc
.
informers
.
endp
.
HasSynced
,
...
@@ -369,16 +371,16 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
...
@@ -369,16 +371,16 @@ func (ingc *IngressController) Run(readyFile string, metricsPort uint16) {
}
}
ingc
.
log
.
Info
(
"Caches synced, running workers"
)
ingc
.
log
.
Info
(
"Caches synced, running workers"
)
go
wait
.
Until
(
ingc
.
nsQs
.
Run
,
time
.
Second
,
ingc
.
stopCh
)
go
wait
.
Until
(
ingc
.
nsQs
.
Run
,
time
.
Second
,
ingc
.
ctx
.
Done
()
)
<-
ingc
.
stopCh
<-
ingc
.
ctx
.
Done
()
ingc
.
log
.
Info
(
"Stop received for Controller"
)
}
}
// Stop the Ingress controller -- signal the workers to stop.
// Stop the Ingress controller -- signal the workers to stop.
func
(
ingc
*
IngressController
)
Stop
()
{
func
(
ingc
*
IngressController
)
Stop
()
{
ingc
.
stopCh
<-
struct
{}{}
ingc
.
log
.
Info
(
"Shutting down workers"
)
ingc
.
log
.
Info
(
"Shutting down workers"
)
close
(
ingc
.
stopCh
)
ingc
.
cancel
(
)
<-
ingc
.
nsQs
.
DoneChan
ingc
.
nsQs
.
Stop
()
ingc
.
log
.
Info
(
"Controller exiting"
)
ingc
.
log
.
Info
(
"Controller exiting"
)
}
}
pkg/controller/worker.go
View file @
7395fe0a
...
@@ -269,7 +269,6 @@ func (worker *NamespaceWorker) work() {
...
@@ -269,7 +269,6 @@ func (worker *NamespaceWorker) work() {
// responsible for each namespace.
// responsible for each namespace.
type
NamespaceQueues
struct
{
type
NamespaceQueues
struct
{
Queue
workqueue
.
RateLimitingInterface
Queue
workqueue
.
RateLimitingInterface
DoneChan
chan
struct
{}
ingClass
string
ingClass
string
log
*
logrus
.
Logger
log
*
logrus
.
Logger
vController
*
varnish
.
Controller
vController
*
varnish
.
Controller
...
@@ -302,7 +301,6 @@ func NewNamespaceQueues(
...
@@ -302,7 +301,6 @@ func NewNamespaceQueues(
workqueue
.
DefaultControllerRateLimiter
(),
"_ALL_"
)
workqueue
.
DefaultControllerRateLimiter
(),
"_ALL_"
)
return
&
NamespaceQueues
{
return
&
NamespaceQueues
{
Queue
:
q
,
Queue
:
q
,
DoneChan
:
make
(
chan
struct
{}),
log
:
log
,
log
:
log
,
ingClass
:
ingClass
,
ingClass
:
ingClass
,
vController
:
vController
,
vController
:
vController
,
...
@@ -393,5 +391,4 @@ func (qs *NamespaceQueues) Stop() {
...
@@ -393,5 +391,4 @@ func (qs *NamespaceQueues) Stop() {
}
}
qs
.
log
.
Info
(
"Waiting for workers to shut down"
)
qs
.
log
.
Info
(
"Waiting for workers to shut down"
)
qs
.
wg
.
Wait
()
qs
.
wg
.
Wait
()
close
(
qs
.
DoneChan
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment