Commit fab57f10 authored by Geoff Simmons's avatar Geoff Simmons

First working version of the reloader.

parent cf281ed5
*~
# Build artifacts
/k8s-vcl-reloader
/cmd/main_version.go
See LICENSE for details.
Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
# Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
# All rights reserved
#
# Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
all: k8s-vcl-reloader
CODE_SUBDIRS=./pkg/... ./cmd/...
build:
go fmt $(CODE_SUBDIRS)
go generate $(CODE_SUBDIRS)
go build $(CODE_SUBDIRS)
k8s-vcl-reloader: build
GOOS=linux go build -ldflags="-w -s" -o k8s-vcl-reloader cmd/*.go
check: build
golint ./pkg/apiclient/...
golint ./pkg/interfaces/...
golint ./pkg/varnish/...
golint ./cmd/...
go test -v ./pkg/apiclient/... ./pkg/varnish/...
test: check
clean:
go clean $(CODE_SUBDIRS)
rm -f cmd/main_version.go
rm -f k8s-vcl-reloader
NOTE: WORK IN PROGRESS
/*
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package main
//go:generate gogitversion -p main
import (
"flag"
"fmt"
"math"
"os"
"os/signal"
"strings"
"syscall"
"time"
"code.uplex.de/uplex-varnish/k8s-vcl-reloader/pkg/apiclient"
"code.uplex.de/uplex-varnish/k8s-vcl-reloader/pkg/varnish"
"github.com/sirupsen/logrus"
api_v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
var (
versionF = flag.Bool("version", false, "print version and exit")
loglvlF = flag.String("log-level", "INFO",
"log level: one of PANIC, FATAL, ERROR, WARN, INFO, DEBUG, \n"+
"or TRACE")
namespaceF = flag.String("namespace", api_v1.NamespaceAll,
"namespace in which to listen for resources (default all)")
masterURLF = flag.String("masterurl", "", "cluster master URL, for "+
"out-of-cluster runs")
kubeconfigF = flag.String("kubeconfig", "", "config path for the "+
"cluster master URL, for out-of-cluster runs")
monIntvlF = flag.Duration("monitorintvl", 30*time.Second,
"interval at which the monitor thread checks and updates\n"+
"the Varnish instance.\n"+
"Monitor deactivated when <= 0s")
metricsPortF = flag.Uint("metricsport", 8080,
"port at which to listen for the /metrics endpoint")
resyncPeriodF = flag.Duration("resyncPeriod", 30*time.Second,
"if non-zero, re-update the controller with the state of\n"+
"the cluster this often, even if nothing has changed,\n"+
"to synchronize state that may have been missed")
admTimeoutF = flag.Duration("admTimeout", 10*time.Second,
"timeout for connections to the Varnish instance\n"+
"administrative port (0s for no timeout)")
homeF = flag.String("n", "", "name of the Varnish instance,\n"+
"the same value set for varnishd -n")
mainF = flag.String("main", "default.vcl",
"the VCL source to be loaded, must include all other sources")
cfgMapF = flag.String("configmap", "",
"k8s ConfigMap representing Varnish sources (required)")
logFormat = logrus.TextFormatter{
DisableColors: true,
FullTimestamp: true,
}
log = &logrus.Logger{
Out: os.Stdout,
Formatter: &logFormat,
Level: logrus.InfoLevel,
}
informerStop = make(chan struct{})
)
type tmplPaths []string
func (t *tmplPaths) String() string {
return strings.Join(*t, " ")
}
func (t *tmplPaths) Set(s string) error {
*t = append(*t, s)
return nil
}
type svcs []string
func (s *svcs) String() string {
return strings.Join(*s, " ")
}
func (s *svcs) Set(svc string) error {
*s = append(*s, svc)
return nil
}
func main() {
var templatePaths tmplPaths
var services svcs
flag.Var(&templatePaths, "template",
"tuple of the form tmplPath:vclPath, where tmplPath is the\n"+
"path of a template, and vclPath is the path of the\n"+
"VCL source to be generated from the template")
flag.Var(&services, "svc",
"k8s Service whose Endpoints are to be configured as Varnish\n"+
"backends")
flag.Parse()
if *versionF {
fmt.Printf("%s version %s\n", os.Args[0], version)
os.Exit(0)
}
lvl := strings.ToLower(*loglvlF)
switch lvl {
case "panic":
log.Level = logrus.PanicLevel
case "fatal":
log.Level = logrus.FatalLevel
case "error":
log.Level = logrus.ErrorLevel
case "warn":
log.Level = logrus.WarnLevel
case "debug":
log.Level = logrus.DebugLevel
case "trace":
log.Level = logrus.TraceLevel
case "info":
break
default:
fmt.Printf("Unknown log level %s, exiting", *loglvlF)
os.Exit(-1)
}
log.Info("Starting VCL reloader version: ", version)
podNs, podName := os.Getenv("POD_NAMESPACE"), os.Getenv("POD_NAME")
if podNs == "" || podName == "" {
log.Fatalf("Env variable POD_NAMESPACE and POD_NAME required")
os.Exit(-1)
}
log.Infof("Running in Pod: %s/%s", podNs, podName)
if *cfgMapF == "" {
log.Fatal("configmap arg is required")
os.Exit(-1)
}
if *metricsPortF > math.MaxUint16 {
log.Fatalf("metricsport %d out of range (max %d)",
*metricsPortF, math.MaxUint16)
os.Exit(-1)
}
tmplMap := make(map[string]string, len(templatePaths))
for _, tmplPath := range templatePaths {
tmplArgs := strings.Split(tmplPath, ":")
if len(tmplArgs) != 2 {
log.Fatal("illegal template arg: ", tmplPath)
os.Exit(-1)
}
tmplMap[tmplArgs[0]] = tmplArgs[1]
}
fqSvcs := make([]string, len(services))
for i, svc := range services {
svcArg := strings.Split(svc, "/")
if len(svcArg) > 2 {
log.Fatal("illegal svc arg: ", svc)
os.Exit(-1)
}
if len(svcArg) == 2 {
if *namespaceF != api_v1.NamespaceAll &&
svcArg[0] != *namespaceF {
log.Fatalf("conflicting namespaces: "+
"namespace=%s svc=%s", *namespaceF, svc)
os.Exit(-1)
}
fqSvcs[i] = svc
continue
}
if *namespaceF != api_v1.NamespaceAll {
fqSvcs[i] = *namespaceF + "/" + svc
} else {
fqSvcs[i] = "default/" + svc
}
}
var cfgMapNamespace, cfgMapName string
cfgMapSlice := strings.Split(*cfgMapF, "/")
if len(cfgMapSlice) > 2 {
log.Fatal("illegal configmap arg: ", *cfgMapF)
os.Exit(-1)
}
if len(cfgMapSlice) == 2 {
if *namespaceF != api_v1.NamespaceAll &&
cfgMapSlice[0] != *namespaceF {
log.Fatalf("conflicting namespaces: "+
"namespace=%s configmap=%s", *namespaceF,
*cfgMapF)
os.Exit(-1)
}
cfgMapNamespace = cfgMapSlice[0]
cfgMapName = cfgMapSlice[1]
} else if *namespaceF != api_v1.NamespaceAll {
cfgMapNamespace = *namespaceF
cfgMapName = *cfgMapF
} else {
cfgMapNamespace = "default"
cfgMapName = *cfgMapF
}
updaterCfg := apiclient.SpecUpdaterConfig{
SvcKeys: fqSvcs,
Main: *mainF,
CfgMapNamespace: cfgMapNamespace,
CfgMapName: cfgMapName,
}
vController, err := varnish.NewController(log, *homeF, tmplMap, *mainF,
*admTimeoutF, *monIntvlF)
if err != nil {
log.Fatal("Cannot initialize Varnish controller: ", err)
os.Exit(-1)
}
config, err := clientcmd.BuildConfigFromFlags(*masterURLF, *kubeconfigF)
if err != nil {
log.Fatalf("error creating client configuration: %v", err)
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal("Failed to create client:", err)
}
var informerFactory informers.SharedInformerFactory
if *namespaceF == api_v1.NamespaceAll {
informerFactory = informers.NewSharedInformerFactory(
kubeClient, *resyncPeriodF)
} else {
informerFactory = informers.NewSharedInformerFactoryWithOptions(
kubeClient, *resyncPeriodF,
informers.WithNamespace(*namespaceF))
}
client, err := apiclient.NewClient(log, kubeClient, updaterCfg,
vController, informerFactory, fqSvcs,
cfgMapNamespace+"/"+cfgMapName, podNs, podName)
if err != nil {
log.Fatalf("Could not initialize API client: %v", err)
os.Exit(-1)
}
vController.EvtGenerator(client)
go handleTermination(log, client, vController)
vController.Start()
informerFactory.Start(informerStop)
client.Run("", uint16(*metricsPortF))
}
func handleTermination(
log *logrus.Logger,
client *apiclient.Client,
vc *varnish.Controller) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
sig := <-signalChan
log.Infof("Received signal (%s), shutting down", sig.String())
log.Info("Shutting down the API client")
client.Stop()
log.Info("Shutting down informers")
informerStop <- struct{}{}
log.Info("Shutting down the Varnish controller")
vc.Quit()
log.Info("Exiting")
os.Exit(0)
}
ARG varnish_version=6.3.1
# Build container
FROM debian:stretch-20191014-slim AS builder
RUN apt-get update -q && \
apt-get install -y -q debian-archive-keyring && \
apt-get install -y -q curl gnupg apt-transport-https pandoc && \
curl -L https://packagecloud.io/varnishcache/varnish63/gpgkey | apt-key add -
ARG varnish_version
ARG go_version=1.11
COPY ./container/varnishcache_varnish63.list /etc/apt/sources.list.d/
COPY ./container/stretch-backports.list /etc/apt/sources.list.d/
RUN apt-get update -q && \
apt-get install -y -q varnish-dev=${varnish_version}-1~stretch && \
apt-get -t stretch-backports install -y -q golang-${go_version}-go && \
apt-get -t stretch-backports install -y -q golang-golang-x-tools && \
apt-get install -y -q git make && \
apt-get remove -y -q curl gnupg apt-transport-https && \
apt-get autoremove -y -q && apt-get clean && rm -rf /var/lib/apt/lists/*
ENV GOPATH=/go
ENV GOSRC=${GOPATH}/src
ENV PATH=${PATH}:/usr/lib/go-${go_version}/bin:${GOPATH}/bin
RUN mkdir -p ${GOPATH}
RUN go get -d -v github.com/slimhazard/gogitversion && \
cd ${GOSRC}/github.com/slimhazard/gogitversion && \
make install
ENV SRC=${GOSRC}/code.uplex.de/uplex-varnish/k8s-vcl-reloader
RUN mkdir -p ${SRC}/cmd
RUN mkdir -p ${SRC}/pkg
RUN mkdir -p ${SRC}/.git
WORKDIR ${SRC}
COPY go.mod .
COPY go.sum .
ENV GO111MODULE=on
RUN go mod download
COPY ./pkg/ ${SRC}/pkg/
COPY ./cmd/ ${SRC}/cmd/
COPY ./.git/ ${SRC}/.git/
RUN go generate ./cmd/... && go build ./pkg/... ./cmd/... && \
GOOS=linux go build -ldflags="-w -s" -o k8s-vcl-reloader cmd/*.go
# Runtime container
FROM varnish:${varnish_version}
# All we need is libvarnishapi
RUN /bin/rm /lib/systemd/system/varnish* \
&& /bin/rm /usr/sbin/varnish* \
&& /bin/rm /usr/bin/varnish* \
&& /bin/rm -rf /usr/lib/varnish/ \
&& /bin/rm /etc/varnish/* \
&& apt-get autoremove -y -q && apt-get clean && rm -rf /var/lib/apt/lists/*
COPY ./container/reloader_exec.sh /usr/bin
RUN /bin/chmod 755 /usr/bin/reloader_exec.sh
ENV SRC=/go/src/code.uplex.de/uplex-varnish/k8s-vcl-reloader
COPY --from=builder ${SRC}/k8s-vcl-reloader /usr/bin
ENTRYPOINT ["/usr/bin/k8s-vcl-reloader"]
# Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
# All rights reserved
#
# Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
all: container
DOCKER_BUILD_OPTIONS =
VER ?= latest
container: Dockerfile
docker build $(DOCKER_BUILD_OPTIONS) -t uplex/k8s-vcl-reloader:$(VER) \
-f Dockerfile ..
#!/bin/bash
set -e
set -u
exec /usr/bin/k8s-vcl-reloader "$@"
deb http://deb.debian.org/debian stretch-backports main
deb https://packagecloud.io/varnishcache/varnish63/debian/ stretch main
deb-src https://packagecloud.io/varnishcache/varnish63/debian/ stretch main
module code.uplex.de/uplex-varnish/k8s-vcl-reloader
require (
code.uplex.de/uplex-varnish/k8s-ingress v0.0.0-20190626000336-792efa9710c9
code.uplex.de/uplex-varnish/varnishapi v0.0.0-20191122093045-3a7d46d81ba1
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/prometheus/client_golang v1.2.1
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0 // indirect
golang.org/x/crypto v0.0.0-20191117063200-497ca9f6d64f // indirect
golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
k8s.io/api v0.0.0-20191004102255-dacd7df5a50b
k8s.io/apimachinery v0.0.0-20191004074956-01f8b7d1121a
k8s.io/client-go v10.0.0+incompatible
sigs.k8s.io/yaml v1.1.0 // indirect
)
This diff is collapsed.
This diff is collapsed.
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package apiclient
import api_v1 "k8s.io/api/core/v1"
func (worker *NamespaceWorker) syncCfgMap(key string) error {
worker.log.Infof("Syncing ConfigMap: %s/%s", worker.namespace, key)
return worker.updater.Update()
}
func (worker *NamespaceWorker) addCfgMap(key string) error {
return worker.syncCfgMap(key)
}
func (worker *NamespaceWorker) updateCfgMap(key string) error {
return worker.syncCfgMap(key)
}
func (worker *NamespaceWorker) deleteCfgMap(obj interface{}) error {
cfgMap, ok := obj.(*api_v1.ConfigMap)
if !ok || cfgMap == nil {
worker.log.Warnf("Delete ConfigMap: not found: %v", obj)
return nil
}
return worker.syncCfgMap(cfgMap.Name)
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package apiclient
import api_v1 "k8s.io/api/core/v1"
func (worker *NamespaceWorker) syncEndp(key string) error {
worker.log.Infof("Syncing Endpoints: %s/%s", worker.namespace, key)
svc, err := worker.svc.Get(key)
if err != nil {
worker.log.Errorf("Cannot get service for endpoints %s/%s: %s",
worker.namespace, key, err)
return err
}
worker.log.Infof("Endpoints changed for Service %s/%s, enqueuing "+
"service sync", svc.Namespace, svc.Name)
worker.queue.Add(&SyncObj{Type: Update, Obj: svc})
return nil
}
func (worker *NamespaceWorker) addEndp(key string) error {
return worker.syncEndp(key)
}
func (worker *NamespaceWorker) updateEndp(key string) error {
return worker.syncEndp(key)
}
func (worker *NamespaceWorker) deleteEndp(obj interface{}) error {
endp, ok := obj.(*api_v1.Endpoints)
if !ok || endp == nil {
worker.log.Warnf("Delete Endpoints: not found: %v", obj)
return nil
}
return worker.syncEndp(endp.Name)
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package apiclient
import (
"fmt"
"net/http"
"k8s.io/client-go/util/workqueue"
"github.com/sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
namespace = "vclreloader"
workqSubsystem = "workqueue"
)
type promProvider struct{}
func (promProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
label := make(map[string]string)
label["namespace"] = name
depth := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: workqSubsystem,
Namespace: namespace,
Name: "depth",
Help: "Current depth of the workqueue",
ConstLabels: label,
})
prometheus.Register(depth)
return depth
}
func (promProvider) NewAddsMetric(name string) workqueue.CounterMetric {
label := make(map[string]string)
label["namespace"] = name
adds := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: workqSubsystem,
Namespace: namespace,
Name: "adds_total",
Help: "Total number of adds handled by the workqueue",
ConstLabels: label,
})
prometheus.Register(adds)
return adds
}
func (promProvider) NewLatencyMetric(name string) workqueue.SummaryMetric {
label := make(map[string]string)
label["namespace"] = name
latency := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: workqSubsystem,
Namespace: namespace,
Name: "latency_useconds",
Help: "Time spent (in µsecs) by items waiting in the " +
"workqueue",
ConstLabels: label,
})
prometheus.Register(latency)
return latency
}
func (promProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
label := make(map[string]string)
label["namespace"] = name
workDuration := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: workqSubsystem,
Namespace: namespace,
Name: "work_duration_useconds",
Help: "Time needed (in µsecs) to process items from the " +
"workqueue",
ConstLabels: label,
})
prometheus.Register(workDuration)
return workDuration
}
func (promProvider) NewUnfinishedWorkSecondsMetric(
name string) workqueue.SettableGaugeMetric {
label := make(map[string]string)
label["namespace"] = name
unfinishedWorkDuration := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: workqSubsystem,
Namespace: namespace,
Name: "unfinished_work_seconds",
Help: "Time spent (in secs) on unfinished work still in" +
"the workqueue",
ConstLabels: label,
})
prometheus.Register(unfinishedWorkDuration)
return unfinishedWorkDuration
}
func (promProvider) NewLongestRunningProcessorMicrosecondsMetric(
name string) workqueue.SettableGaugeMetric {
label := make(map[string]string)
label["namespace"] = name
maxProc := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: workqSubsystem,
Namespace: namespace,
Name: "max_processor_seconds",
Help: "Time spent (in µsecs) by the longest running " +
"processor in the workqueue",
ConstLabels: label,
})
prometheus.Register(maxProc)
return maxProc
}
func (promProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
label := make(map[string]string)
label["namespace"] = name
retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: workqSubsystem,
Namespace: namespace,
Name: "retries_total",
Help: "Total number of retries handled by workqueue",
ConstLabels: label,
})
prometheus.Register(retries)
return retries
}
var (
watchCounters = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: "watcher",
Namespace: namespace,
Name: "events_total",
Help: "Total number of watcher API events",
}, []string{"kind", "event"})
syncCounters = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "sync",
Name: "result_total",
Help: "Total number of synchronization results",
}, []string{"namespace", "kind", "result"})
)
// InitMetrics sets the prometheus provider for the workqueue metrics,
// and registers the counter vectors.
func InitMetrics() {
workqueue.SetProvider(promProvider{})
prometheus.Register(watchCounters)
prometheus.Register(syncCounters)
}
// ServeMetrics executes the HTTP Handler for the /metrics endpoint.
func ServeMetrics(log *logrus.Logger, port uint16) {
addr := fmt.Sprintf(":%d", port)
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(addr, nil))
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package apiclient
import (
"fmt"
api_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
func (client *Client) podEvent(evtType, reason, msgFmt string,
args ...interface{}) {
if client.pod == nil {
noop := meta_v1.GetOptions{}
pod, err := client.client.CoreV1().Pods(client.podNs).
Get(client.podName, noop)
if err != nil {
e := fmt.Errorf("Cannot retrieve Pod object for %s/%s,"+
" will not generate event(%s, %s): %v",
client.podNs, client.podName, evtType, reason,
err)
utilruntime.HandleError(e)
return
}
client.pod = pod
}
client.recorder.Eventf(client.pod, evtType, reason, msgFmt, args...)
}
// PodInfoEvent generates an Event with type "Normal" for the current Pod.
func (client *Client) PodInfoEvent(reason, msgFmt string, args ...interface{}) {
client.podEvent(api_v1.EventTypeNormal, reason, msgFmt, args...)
}
// PodWarnEvent generates an Event with type "Warning" for the current Pod.
func (client *Client) PodWarnEvent(reason, msgFmt string, args ...interface{}) {
client.podEvent(api_v1.EventTypeWarning, reason, msgFmt, args...)
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package apiclient
import api_v1 "k8s.io/api/core/v1"
func (worker *NamespaceWorker) syncSvc(key string) error {
worker.log.Infof("Syncing Service: %s/%s", worker.namespace, key)
return worker.updater.Update()
}
func (worker *NamespaceWorker) addSvc(key string) error {
return worker.syncSvc(key)
}
func (worker *NamespaceWorker) updateSvc(key string) error {
return worker.syncSvc(key)
}
func (worker *NamespaceWorker) deleteSvc(obj interface{}) error {
svc, ok := obj.(*api_v1.Service)
if !ok || svc == nil {
worker.log.Warnf("Delete Service: not found: %v", obj)
return nil
}
return worker.syncSvc(svc.Name)
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package apiclient
import (
"fmt"
"time"
"code.uplex.de/uplex-varnish/k8s-vcl-reloader/pkg/varnish"
"code.uplex.de/uplex-varnish/k8s-vcl-reloader/pkg/varnish/vcl"
"github.com/sirupsen/logrus"
api_v1 "k8s.io/api/core/v1"
core_v1_listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
// SpecUpdaterConfig represents the user configuration for k8s
// resources that are considered for a VCL update.
type SpecUpdaterConfig struct {
SvcKeys []string
Main string
CfgMapName string
CfgMapNamespace string
}
// SpecUpdater encapsulates updating the VCL spec from the current
// state of the k8s cluster.
type SpecUpdater struct {
log *logrus.Logger
cfg SpecUpdaterConfig
vc *varnish.Controller
svc core_v1_listers.ServiceLister
endp core_v1_listers.EndpointsLister
cfgMap core_v1_listers.ConfigMapLister
}
// NewSpecUpdater creates a SpecUpdater.
func NewSpecUpdater(
log *logrus.Logger,
cfg SpecUpdaterConfig,
vc *varnish.Controller,
listers *Listers) *SpecUpdater {
return &SpecUpdater{
log: log,
cfg: cfg,
vc: vc,
endp: listers.endp,
svc: listers.svc,
cfgMap: listers.cfgMap,
}
}
func getEndpSpec(addr api_v1.EndpointAddress, port int32) vcl.Endpoint {
ep := vcl.Endpoint{
IP: addr.IP,
Port: port,
}
ep.Namespace = addr.TargetRef.Namespace
ep.UID = string(addr.TargetRef.UID)
ep.ResourceVersion = addr.TargetRef.ResourceVersion
return ep
}
func (updater *SpecUpdater) getSvcSpec(
svc *api_v1.Service) (vclSvc vcl.Service, err error) {
updater.log.Debugf("Updating Service %s/%s spec from cluster config",
svc.Namespace, svc.Name)
vclSvc = vcl.Service{}
eps, err := updater.endp.Endpoints(svc.Namespace).Get(svc.Name)
if err != nil {
return
}
vclSvc.Namespace = svc.Namespace
vclSvc.UID = string(svc.UID)
vclSvc.ResourceVersion = svc.ResourceVersion
vclSvc.Endpoints = make(map[string]vcl.Endpoint)
for _, subset := range eps.Subsets {
if len(subset.Ports) != 1 {
// XXX ???
err = fmt.Errorf("Endpoint %s/%s has %d ports",
eps.Namespace, eps.Name, len(subset.Ports))
return
}
port := subset.Ports[0].Port
for _, addr := range subset.Addresses {
vclSvc.Endpoints[addr.TargetRef.Name] =
getEndpSpec(addr, port)
}
}
return
}
func (updater *SpecUpdater) getSpec(version string) (vcl.Spec, error) {
updater.log.Info("Updating VCL spec from cluster config")
spec := vcl.Spec{
Version: version,
Main: updater.cfg.Main,
Created: time.Now(),
}
svcsSpec := make(map[string]vcl.Service)
for _, svcKey := range updater.cfg.SvcKeys {
ns, name, err := cache.SplitMetaNamespaceKey(svcKey)
if err != nil {
return spec, err
}
svc, err := updater.svc.Services(ns).Get(name)
if err != nil {
return spec, err
}
// XXX same name in different namespaces
svcsSpec[svc.Name], err = updater.getSvcSpec(svc)
if err != nil {
return spec, err
}
}
if len(svcsSpec) != len(updater.cfg.SvcKeys) {
var svcsNotFound []string
for _, key := range updater.cfg.SvcKeys {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return spec, err
}
if _, exists := svcsSpec[name]; !exists {
svcsNotFound = append(svcsNotFound, name)
}
}
return spec, fmt.Errorf("Services not found, no update: %s",
svcsNotFound)
}
spec.Services = svcsSpec
cfgMap, err := updater.cfgMap.ConfigMaps(updater.cfg.CfgMapNamespace).
Get(updater.cfg.CfgMapName)
if err != nil {
return spec, err
}
spec.CfgMap.Name = cfgMap.Name
spec.CfgMap.Namespace = cfgMap.Namespace
spec.CfgMap.Version = version
spec.CfgMap.UID = string(cfgMap.UID)
spec.CfgMap.ResourceVersion = cfgMap.ResourceVersion
return spec, nil
}
// Update generates a new configuration from the current state of the
// k8s cluster, and loads it as a new instance of VCL.
func (updater *SpecUpdater) Update() error {
updater.log.Info("Update signal received")
spec, err := updater.getSpec("")
if err != nil {
// XXX generate Warn event
updater.log.Errorf("Cannot create spec: %s", err)
return err
}
if updater.vc.HasConfig(spec) {
updater.log.Infof("Varnish VCL config is up to date")
return nil
}
if err = updater.vc.Update(&spec); err != nil {
// XXX generate Warn event, inc counter
updater.log.Errorf("Update failed: %s", err)
return err
}
// XXX generate Info event, inc counter(?)
updater.log.Info("Varnish update succeeded")
return nil
}
/*
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package apiclient
import (
"fmt"
"sync"
api_v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
core_v1_listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"github.com/sirupsen/logrus"
)
const (
// syncSuccess and syncFailure are reasons for Events
syncSuccess = "SyncSuccess"
syncFailure = "SyncFailure"
)
// NamespaceWorker serves fanout of work items to workers for each
// namespace for which the controller is notified about a resource
// update. The NamespaceQueues object creates a new instance when it
// reads an item from a new namespace from its main queue. Each worker
// has its own queue and listers filtered for its namespace. Thus each
// namespace is synced separately and sequentially, and all of the
// namespaces are synced in parallel.
type NamespaceWorker struct {
namespace string
log *logrus.Logger
updater *SpecUpdater
queue workqueue.RateLimitingInterface
listers *Listers
svc core_v1_listers.ServiceNamespaceLister
endp core_v1_listers.EndpointsNamespaceLister
cfgMap core_v1_listers.ConfigMapNamespaceLister
recorder record.EventRecorder
wg *sync.WaitGroup
}
func (worker *NamespaceWorker) event(obj interface{}, evtType, reason,
msgFmt string, args ...interface{}) {
eventObj := obj
if syncObj, ok := obj.(*SyncObj); ok {
eventObj = syncObj.Obj
}
kind := "Unknown"
switch eventObj.(type) {
case *api_v1.Service:
svc, _ := eventObj.(*api_v1.Service)
worker.recorder.Eventf(svc, evtType, reason, msgFmt, args...)
kind = "Service"
case *api_v1.Endpoints:
endp, _ := eventObj.(*api_v1.Endpoints)
worker.recorder.Eventf(endp, evtType, reason, msgFmt, args...)
kind = "Endpoints"
case *api_v1.ConfigMap:
cfgMap, _ := eventObj.(*api_v1.ConfigMap)
worker.recorder.Eventf(cfgMap, evtType, reason, msgFmt, args...)
kind = "ConfigMap"
default:
worker.log.Warnf("Unhandled type %T, no event generated",
eventObj)
}
syncCounters.WithLabelValues(worker.namespace, kind, reason).Inc()
}
func (worker *NamespaceWorker) infoEvent(obj interface{}, reason, msgFmt string,
args ...interface{}) {
worker.event(obj, api_v1.EventTypeNormal, reason, msgFmt, args...)
}
func (worker *NamespaceWorker) warnEvent(obj interface{}, reason, msgFmt string,
args ...interface{}) {
worker.event(obj, api_v1.EventTypeWarning, reason, msgFmt, args...)
}
func (worker *NamespaceWorker) syncSuccess(obj interface{}, msgFmt string,
args ...interface{}) {
worker.log.Infof(msgFmt, args...)
worker.infoEvent(obj, syncSuccess, msgFmt, args...)
}
func (worker *NamespaceWorker) syncFailure(obj interface{}, msgFmt string,
args ...interface{}) {
worker.log.Errorf(msgFmt, args...)
worker.warnEvent(obj, syncFailure, msgFmt, args...)
}
func (worker *NamespaceWorker) dispatch(obj interface{}) error {
syncObj, ok := obj.(*SyncObj)
if !ok {
worker.syncFailure(obj, "Unhandled type %T", obj)
return nil
}
_, key, err := getNameSpace(syncObj.Obj)
if err != nil {
worker.syncFailure(syncObj.Obj,
"Cannot get key for object %v: %v", syncObj.Obj, err)
return nil
}
switch syncObj.Type {
case Add:
switch syncObj.Obj.(type) {
case *api_v1.Service:
return worker.addSvc(key)
case *api_v1.Endpoints:
return worker.addEndp(key)
case *api_v1.ConfigMap:
return worker.addCfgMap(key)
default:
worker.syncFailure(syncObj.Obj,
"Unhandled object type: %T", syncObj.Obj)
return nil
}
case Update:
switch syncObj.Obj.(type) {
case *api_v1.Service:
return worker.updateSvc(key)
case *api_v1.Endpoints:
return worker.updateEndp(key)
case *api_v1.ConfigMap:
return worker.updateCfgMap(key)
default:
worker.syncFailure(syncObj.Obj,
"Unhandled object type: %T", syncObj.Obj)
return nil
}
case Delete:
deletedObj := syncObj.Obj
deleted, ok := obj.(cache.DeletedFinalStateUnknown)
if ok {
deletedObj = deleted.Obj
}
switch deletedObj.(type) {
case *api_v1.Service:
return worker.deleteSvc(deletedObj)
case *api_v1.Endpoints:
return worker.deleteEndp(deletedObj)
case *api_v1.ConfigMap:
return worker.deleteCfgMap(deletedObj)
default:
worker.syncFailure(deletedObj,
"Unhandled object type: %T", deletedObj)
return nil
}
default:
worker.syncFailure(syncObj.Obj, "Unhandled sync type: %v",
syncObj.Type)
return nil
}
}
func (worker *NamespaceWorker) next() {
obj, quit := worker.queue.Get()
if quit {
return
}
defer worker.queue.Done(obj)
if err := worker.dispatch(obj); err == nil {
if ns, name, err := getNameSpace(obj); err == nil {
worker.syncSuccess(obj, "Successfully synced: %s/%s",
ns, name)
} else {
worker.syncSuccess(obj, "Successfully synced")
}
worker.queue.Forget(obj)
} else {
worker.syncFailure(obj, "Error, requeueing: %v", err)
worker.queue.AddRateLimited(obj)
}
}
func (worker *NamespaceWorker) work() {
defer worker.wg.Done()
worker.log.Info("Starting worker for namespace:", worker.namespace)
for !worker.queue.ShuttingDown() {
worker.next()
}
worker.log.Info("Shutting down worker for namespace:", worker.namespace)
}
// NamespaceQueues reads from the main queue to which informers add
// new work items from all namespaces. The worker reads items from the
// queue and places them on separate queues for NamespaceWorkers
// responsible for each namespace.
type NamespaceQueues struct {
Queue workqueue.RateLimitingInterface
DoneChan chan struct{}
log *logrus.Logger
updater *SpecUpdater
workers map[string]*NamespaceWorker
listers *Listers
recorder record.EventRecorder
wg *sync.WaitGroup
}
// NewNamespaceQueues creates a NamespaceQueues object.
//
// log: logger initialized at startup
// vController: Varnish controller initialied at startup
// listers: client-go/lister instance for each resource type
// recorder: Event broadcaster initialized at startup
func NewNamespaceQueues(
log *logrus.Logger,
updater *SpecUpdater,
listers *Listers,
recorder record.EventRecorder) *NamespaceQueues {
q := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "_ALL_")
return &NamespaceQueues{
Queue: q,
DoneChan: make(chan struct{}),
log: log,
updater: updater,
workers: make(map[string]*NamespaceWorker),
listers: listers,
recorder: recorder,
wg: new(sync.WaitGroup),
}
}
func getNameSpace(obj interface{}) (namespace, name string, err error) {
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
namespace, name, err = cache.SplitMetaNamespaceKey(k)
return
}
func (qs *NamespaceQueues) next() {
obj, quit := qs.Queue.Get()
if quit {
return
}
defer qs.Queue.Done(obj)
syncObj, ok := obj.(*SyncObj)
if !ok {
utilruntime.HandleError(fmt.Errorf("Unhandled type %T", obj))
return
}
ns, _, err := getNameSpace(syncObj.Obj)
if err != nil {
utilruntime.HandleError(err)
return
}
worker, exists := qs.workers[ns]
if !exists {
q := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), ns)
worker = &NamespaceWorker{
namespace: ns,
log: qs.log,
updater: qs.updater,
queue: q,
listers: qs.listers,
svc: qs.listers.svc.Services(ns),
endp: qs.listers.endp.Endpoints(ns),
cfgMap: qs.listers.cfgMap.ConfigMaps(ns),
recorder: qs.recorder,
wg: qs.wg,
}
qs.workers[ns] = worker
go worker.work()
qs.wg.Add(1)
}
worker.queue.Add(obj)
qs.Queue.Forget(obj)
}
// Run comprises the main loop of the API client, reading from the
// main queue of work items and handing them off to workers for each
// namespace.
func (qs *NamespaceQueues) Run() {
qs.log.Info("Starting dispatcher worker")
for !qs.Queue.ShuttingDown() {
qs.next()
}
}
// Stop shuts down the main queue loop initiated by Run(), and in turn
// shuts down all of the NamespaceWorkers.
func (qs *NamespaceQueues) Stop() {
qs.log.Info("Shutting down dispatcher worker")
qs.Queue.ShutDown()
for _, worker := range qs.workers {
qs.log.Infof("Shutting down queue for namespace: %s",
worker.namespace)
worker.queue.ShutDown()
}
qs.log.Info("Waiting for workers to shut down")
qs.wg.Wait()
close(qs.DoneChan)
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
// Package interfaces defines interfaces that allow pkg/apiclient and
// pkg/varnish to access code from one another without introducing
// import cycles.
package interfaces
// A PodEventGenerator defines methods to generate Events for the
// Varnish instance.
type PodEventGenerator interface {
PodInfoEvent(reason, msgFmt string, args ...interface{})
PodWarnEvent(reason, msgFmt string, args ...interface{})
}
// An Updater defines the method that generates a new configuration
// from the state of k8s cluster, and loads it as the new VCL config.
type Updater interface {
Update() error
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package varnish
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
const (
namespace = "vclreloader"
subsystem = "varnish"
)
type instanceMetrics struct {
updates prometheus.Counter
updateErrs prometheus.Counter
tmplFails prometheus.Counter
vsmFails prometheus.Counter
connectFails prometheus.Counter
vclLoads prometheus.Counter
vclLoadErrs prometheus.Counter
templateGens prometheus.Counter
templateErrs prometheus.Counter
connectLatency prometheus.Summary
vclLoadLatency prometheus.Summary
pings prometheus.Counter
pingFails prometheus.Counter
panics prometheus.Counter
childRunning prometheus.Counter
childNotRunning prometheus.Counter
vclDiscards prometheus.Counter
monitorChecks prometheus.Counter
}
var (
beEndpsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "backend_endpoints",
Help: "Current number of Service endpoints configured " +
"as Varnish backends",
})
monResultCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "monitor_result_total",
Help: "Total number of monitor results",
}, []string{"status", "result"})
metrics = &instanceMetrics{
updates: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "updates_total",
Help: "Total number of attempted updates",
}),
updateErrs: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "update_errors_total",
Help: "Total number of update errors",
}),
tmplFails: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "template_errors_total",
Help: "Total number of template parse failures",
}),
vsmFails: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "vsm_errors_total",
Help: "Total number of shared memory attach failures",
}),
connectFails: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "admin_connect_fails_total",
Help: "Total number of admin connection failures",
}),
vclLoads: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "vcl_loads_total",
Help: "Total number of successful VCL loads",
}),
vclLoadErrs: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "vcl_load_errors_total",
Help: "Total number of VCL load errors",
}),
templateGens: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "vcl_gens_total",
Help: "Total number of successful VCL generations",
}),
templateErrs: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "vcl_gen_errors_total",
Help: "Total number of VCL generation errors",
}),
connectLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "admin_connect_latency_seconds",
Help: "Admin connection latency",
Objectives: latencyObjectives,
}),
vclLoadLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "vcl_load_latency_seconds",
Help: "VCL load latency",
Objectives: latencyObjectives,
}),
pings: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "pings_total",
Help: "Total number of successful pings",
}),
pingFails: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "ping_errors_total",
Help: "Total number of ping errors",
}),
panics: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "panics_total",
Help: "Total number of panics detected",
}),
childRunning: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "child_running_total",
Help: "Total number of monitor runs with the " +
"child process in the running state",
}),
childNotRunning: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "child_not_running_total",
Help: "Total number of monitor runs with the " +
"child process not in the running state",
}),
vclDiscards: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "vcl_discards_total",
Help: "Total number of VCL discards",
}),
monitorChecks: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "monitor_checks_total",
Help: "Total number of monitor checks",
}),
}
instMetricsMtx = &sync.Mutex{}
latencyObjectives = map[float64]float64{
0.5: 0.001,
0.9: 0.001,
0.95: 0.001,
0.99: 0.001,
0.999: 0.001,
}
)
func initMetrics() {
prometheus.Register(monResultCtr)
prometheus.Register(beEndpsGauge)
prometheus.Register(metrics.updates)
prometheus.Register(metrics.updateErrs)
prometheus.Register(metrics.vsmFails)
prometheus.Register(metrics.tmplFails)
prometheus.Register(metrics.connectFails)
prometheus.Register(metrics.vclLoads)
prometheus.Register(metrics.vclLoadErrs)
prometheus.Register(metrics.templateErrs)
prometheus.Register(metrics.templateGens)
prometheus.Register(metrics.connectLatency)
prometheus.Register(metrics.vclLoadLatency)
prometheus.Register(metrics.pings)
prometheus.Register(metrics.pingFails)
prometheus.Register(metrics.panics)
prometheus.Register(metrics.childRunning)
prometheus.Register(metrics.childNotRunning)
prometheus.Register(metrics.vclDiscards)
prometheus.Register(metrics.monitorChecks)
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package varnish
import (
"time"
"code.uplex.de/uplex-varnish/varnishapi/pkg/admin"
"github.com/prometheus/client_golang/prometheus"
)
const (
connectErr = "ConnectFailure"
vsmErr = "SharedMemoryFailure"
pingErr = "PingFailure"
statusErr = "StatusFailure"
statusNotRun = "StatusNotRunning"
panicErr = "PanicFailure"
panic = "Panic"
vclListErr = "VCLListFailure"
discardErr = "VCLDiscardFailure"
updateErr = "UpdateFailure"
monitorGood = "MonitorGood"
)
func (vc *Controller) infoEvt(reason, msgFmt string, args ...interface{}) {
vc.log.Infof(msgFmt, args...)
vc.evt.PodInfoEvent(reason, msgFmt, args...)
monResultCtr.WithLabelValues("info", reason).Inc()
}
func (vc *Controller) warnEvt(reason, msgFmt string, args ...interface{}) {
vc.log.Warnf(msgFmt, args...)
vc.evt.PodWarnEvent(reason, msgFmt, args...)
monResultCtr.WithLabelValues("warning", reason).Inc()
}
func (vc *Controller) errorEvt(reason, msgFmt string, args ...interface{}) {
vc.log.Errorf(msgFmt, args...)
vc.evt.PodWarnEvent(reason, msgFmt, args...)
monResultCtr.WithLabelValues("error", reason).Inc()
}
func (vc *Controller) checkInst() bool {
metrics.monitorChecks.Inc()
vc.log.Infof("Monitoring Varnish")
vc.admMtx.Lock()
defer vc.admMtx.Unlock()
vc.wg.Add(1)
defer vc.wg.Done()
vc.log.Tracef("Get address and secret from %s", vc.home)
addr, secret, err := vc.getAddrSecret()
if err != nil {
metrics.vsmFails.Inc()
vc.errorEvt(vsmErr, "Error reading admin address and secret: "+
"%v", err)
return false
}
timer := prometheus.NewTimer(metrics.connectLatency)
adm, err := admin.Dial(addr, secret, vc.admTimeout)
timer.ObserveDuration()
if err != nil {
metrics.connectFails.Inc()
vc.errorEvt(connectErr, "Error connecting: %v", err)
return false
}
defer adm.Close()
vc.Banner = adm.Banner
vc.log.Debugf("Connected to Varnish instance, banner: %s", adm.Banner)
pong, err := adm.Ping()
if err != nil {
metrics.pingFails.Inc()
vc.errorEvt(pingErr, "Error pinging: %v", err)
return false
}
metrics.pings.Inc()
vc.log.Debugf("Succesfully pinged instance: %+v", pong)
state, err := adm.Status()
if err != nil {
vc.errorEvt(statusErr, "Error getting status: %v", err)
return false
}
if state == admin.Running {
metrics.childRunning.Inc()
vc.log.Debugf("Status: %s", state)
} else {
metrics.childNotRunning.Inc()
vc.warnEvt(statusNotRun, "Status: %s", state)
}
panic, err := adm.GetPanic()
if err != nil {
vc.errorEvt(panicErr, "Error getting panic: %v", err)
return false
}
if panic == "" {
vc.log.Debug("No panic")
} else {
metrics.panics.Inc()
vc.errorEvt(panic, "Panic: %s", panic)
// XXX clear the panic? Should be configurable
}
vcls, err := adm.VCLList()
if err != nil {
vc.errorEvt(vclListErr,
"Error getting VCL list: %v", err)
return false
}
for _, vcl := range vcls {
if vcl.State == admin.ColdState {
if err = adm.VCLDiscard(vcl.Name); err != nil {
vc.errorEvt(discardErr,
"Error discarding VCL %s: %v",
vcl.Name, err)
return false
}
metrics.vclDiscards.Inc()
vc.log.Infof("Discarded VCL %s", vcl.Name)
}
}
return true
}
func (vc *Controller) monitor(monitorIntvl time.Duration) {
if monitorIntvl <= 0 {
vc.log.Infof("Varnish monitor interval=%v, monitor not running",
monitorIntvl)
return
}
vc.log.Info("Varnish monitor starting, interval: ", monitorIntvl)
for {
time.Sleep(monitorIntvl)
vc.log.Infof("Monitoring Varnish instance")
good := true
if !vc.checkInst() {
good = false
}
// if vc.spec == nil {
// vc.warnEvt(updateErr, "VCL spec not initialized, "+
// "not updating Varnish")
// metrics.updateErrs.Inc()
// good = false
// } else if err := vc.Update(vc.spec); err != nil {
// vc.errorEvt(updateErr, "Errors updating Varnish: %+v",
// err)
// metrics.updateErrs.Inc()
// good = false
// }
if err := vc.updater.Update(); err != nil {
vc.errorEvt(updateErr, "Errors updating Varnish: %+v",
err)
metrics.updateErrs.Inc()
good = false
}
if good {
vc.infoEvt(monitorGood, "Monitor check good")
}
}
}
/*
* Copyright (c) 2019 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
// Package varnish encapsulates interaction with a Varnish instance
// to transform desired states from k8s configs to the actual state of
// its VCL configuration. Only this package imports
// varnishapi/pkg/admin to interact with the CLI of the Varnish
// instance.
package varnish
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"sync"
"text/template"
"time"
"code.uplex.de/uplex-varnish/k8s-vcl-reloader/pkg/interfaces"
"code.uplex.de/uplex-varnish/k8s-vcl-reloader/pkg/varnish/vcl"
"code.uplex.de/uplex-varnish/varnishapi/pkg/admin"
"code.uplex.de/uplex-varnish/varnishapi/pkg/vsm"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// Controller encapsulates information about the Varnish instance that
// it configures.
type Controller struct {
log *logrus.Logger
updater interfaces.Updater
evt interfaces.PodEventGenerator
spec *vcl.Spec
cfgLoaded bool
home string
Banner string
tmplMap map[string]string
tmpl2file map[*template.Template]string
loadFile string
admTimeout time.Duration
admMtx *sync.Mutex
wg *sync.WaitGroup
monIntvl time.Duration
}
// NewController returns an instance of Controller.
//
// log: logger object initialized at startup
// home: Varnish home directory
// tmplMap: map of template paths to file paths
// loadFile: file to be named in a VCL load command
// admTimeout: connection timeout for the Varnish admin port
// monIntvl: interval at which to run the monitor
func NewController(
log *logrus.Logger,
home string,
tmplMap map[string]string,
loadFile string,
admTimeout time.Duration,
monIntvl time.Duration) (*Controller, error) {
tmpl2file, err := vcl.InitTemplates(tmplMap)
if err != nil {
metrics.tmplFails.Inc()
return nil, err
}
initMetrics()
return &Controller{
log: log,
home: home,
loadFile: loadFile,
tmplMap: tmplMap,
tmpl2file: tmpl2file,
admTimeout: admTimeout,
monIntvl: monIntvl,
admMtx: &sync.Mutex{},
wg: new(sync.WaitGroup),
}, nil
}
// EvtGenerator sets the object that implements interface
// PodEventGenerator, and will be used by the monitor goroutine to
// generate Events.
func (vc *Controller) EvtGenerator(podEvt interfaces.PodEventGenerator) {
vc.evt = podEvt
}
// SetUpdater sets the object that implements interface Updater, and
// will be used by generate a VCL spec from the current state of the
// k8s cluster.
func (vc *Controller) SetUpdater(updater interfaces.Updater) {
vc.updater = updater
}
// Start initiates the Varnish controller and starts the monitor
// goroutine.
func (vc *Controller) Start() {
fmt.Printf("Varnish controller logging at level: %s\n", vc.log.Level)
go vc.monitor(vc.monIntvl)
}
func (vc *Controller) getAddrSecret() (string, []byte, error) {
vsm := vsm.New()
if vsm == nil {
return "", nil, fmt.Errorf("Cannot initialize shared memory " +
"attach (out of memory?)")
}
defer vsm.Destroy()
if err := vsm.Attach(vc.home); err != nil {
return "", nil, fmt.Errorf("Cannot attach to shared memory "+
"for %s: %s", vc.home, vsm.Error())
}
addr, err := vsm.GetMgmtAddr()
if err != nil {
return "", nil, fmt.Errorf("Cannot get management address: %s",
vsm.Error())
}
secretPath, err := vsm.GetSecretPath()
if err != nil {
return addr, nil, fmt.Errorf("Cannot get secret path: %s",
vsm.Error())
}
file, err := os.Open(secretPath)
if err != nil {
return addr, nil, fmt.Errorf("Cannot open secret at %s: %s",
secretPath, err)
}
secret, err := ioutil.ReadAll(file)
if err != nil {
return addr, nil, fmt.Errorf("Cannot read secret at %s: %s",
secretPath, err)
}
return addr, secret, nil
}
// Update a Varnish Service to implement an configuration.
//
// spec: VCL spec corresponding to the configuration
func (vc *Controller) Update(spec *vcl.Spec) error {
var err error
metrics.updates.Inc()
cfgName := spec.ConfigName()
vc.log.Infof("Update Varnish to VCL config %s: %+v", cfgName, spec)
vc.admMtx.Lock()
defer vc.admMtx.Unlock()
vc.wg.Add(1)
defer vc.wg.Done()
vc.spec = spec
vc.cfgLoaded = false
vc.log.Trace("Re-parsing templates: ", vc.tmplMap)
vc.tmpl2file, err = vcl.InitTemplates(vc.tmplMap)
if err != nil {
metrics.tmplFails.Inc()
return err
}
vc.log.Tracef("Get address and secret from %s", vc.home)
addr, secret, err := vc.getAddrSecret()
if err != nil {
metrics.vsmFails.Inc()
return err
}
vc.log.Tracef("Connect to %s, timeout=%v", addr, vc.admTimeout)
timer := prometheus.NewTimer(metrics.connectLatency)
adm, err := admin.Dial(addr, secret, vc.admTimeout)
timer.ObserveDuration()
if err != nil {
metrics.connectFails.Inc()
return err
}
defer adm.Close()
vc.Banner = adm.Banner
vc.log.Info("Connected to Varnish admin endpoint")
loaded := false
vc.log.Trace("List VCLs")
vcls, err := adm.VCLList()
if err != nil {
return err
}
vc.log.Tracef("VCL List: %+v", vcls)
for _, vcl := range vcls {
if vcl.Name == cfgName {
loaded = true
break
}
}
if loaded {
vc.log.Infof("Config %s already loaded", cfgName)
vc.cfgLoaded = true
return nil
}
vc.log.Trace("Generating VCL files from templates")
for tmpl, file := range vc.tmpl2file {
vc.log.Trace("Template", tmpl.Name())
if err = vc.spec.WriteTemplate(tmpl, file); err != nil {
metrics.templateErrs.Inc()
return fmt.Errorf("Cannot generate %s from template "+
"%s: %s", file, tmpl.Name(), err)
}
}
vc.log.Info("VCL files generated from templates")
metrics.templateGens.Inc()
vc.log.Tracef("Load config %s", cfgName)
timer = prometheus.NewTimer(metrics.vclLoadLatency)
err = adm.VCLLoad(cfgName, vc.loadFile)
timer.ObserveDuration()
if err != nil {
metrics.vclLoadErrs.Inc()
return fmt.Errorf("Error loading config %s: %v", cfgName, err)
}
if err = adm.VCLUse(cfgName); err != nil {
metrics.vclLoadErrs.Inc()
return fmt.Errorf("Error using config %s after load: %v",
cfgName, err)
}
vc.cfgLoaded = true
metrics.vclLoads.Inc()
vc.log.Infof("Loaded VCL config %s", cfgName)
return nil
}
// HasConfig returns true iff a configuration is already loaded for a
// Varnish Service (so a new sync attempt is not necessary).
//
// spec: VCL specification derived from the configuration
func (vc *Controller) HasConfig(spec vcl.Spec) bool {
if vc.spec == nil {
return false
}
if !vc.cfgLoaded {
return false
}
return bytes.Equal(vc.spec.DeepHash(), spec.DeepHash())
}
// Quit stops the Varnish controller.
func (vc *Controller) Quit() {
vc.log.Info("Wait for admin interactions with Varnish instances to " +
"finish")
vc.wg.Wait()
}
......@@ -37,6 +37,9 @@ import (
)
const (
// RFC3339Micro is date/time format with µs precision,
// suitable for use with time.Format() and time.Parse() (cf
// go's time.RFC3339Nano)
RFC3339Micro = "2006-01-02T15:04:05.999999Z07:00"
)
......@@ -140,13 +143,19 @@ func (spec Spec) DeepHash() []byte {
return (hash.Sum(nil))
}
// FQVersion returns a fully qualified version -- the version string
// appended with the creation time of the Spec (since the same VCL
// version may be used with different backend configurations).
// FQVersion returns a fully qualified version: the fully qualified
// name of the ConfigMap appended with the version string and the
// creation time of the Spec (since the same VCL version may be used
// with different backend configurations).
func (spec Spec) FQVersion() string {
return spec.Version + "-" + spec.Created.Format(RFC3339Micro)
return spec.CfgMap.Namespace + "/" + spec.CfgMap.Name + "-" +
spec.Version + "-" + spec.Created.Format(RFC3339Micro)
}
// ConfigName generates a configuration name that is safe to use in a
// VCL load command. Equivalent to the result of FQVersion(), with
// strings of characters that are illegal for a VCL name replaced with
// '_'.
func (spec Spec) ConfigName() string {
name := spec.FQVersion()
return vclIllegal.ReplaceAllLiteralString(name, "_")
......
......@@ -49,12 +49,12 @@ func InitTemplates(
tmpls2files := make(map[*template.Template]string, len(tmplMap))
for tmplPath, filePath := range tmplMap {
if tmpl, err := template.New(filepath.Base(tmplPath)).
Funcs(fMap).ParseFiles(tmplPath); err != nil {
tmpl, err := template.New(filepath.Base(tmplPath)).Funcs(fMap).
ParseFiles(tmplPath)
if err != nil {
return nil, err
} else {
tmpls2files[tmpl] = filePath
}
tmpls2files[tmpl] = filePath
}
return tmpls2files, nil
}
......
......@@ -203,16 +203,41 @@ func TestFQVersion(t *testing.T) {
if testing.Verbose() {
t.Log("FQVersion():", fqVersion)
}
if !strings.HasPrefix(fqVersion, testSpec.Version+"-") {
t.Fatal("FQVersion(): does not begin with spec.Version + "+
if !strings.HasPrefix(fqVersion,
testSpec.CfgMap.Namespace+"/"+testSpec.CfgMap.Name+"-") {
t.Fatal("FQVersion(): does not begin with ConfigMap name + "+
"\"-\":", fqVersion)
}
versionSlice := strings.SplitN(fqVersion, "-", 2)
if created, err := time.Parse(RFC3339Micro, versionSlice[1]); err != nil {
t.Fatal("FQVersion: cannot parse timestamp", err)
versionSlice := strings.SplitN(fqVersion, "-", 4)
if versionSlice[2] != testSpec.Version {
t.Fatalf("FQVersion(): does not include spec.Version=%s (%s): "+
"%s", testSpec.Version, versionSlice[2], fqVersion)
}
if created, err := time.Parse(RFC3339Micro, versionSlice[3]); err != nil {
t.Fatal("FQVersion(): cannot parse timestamp", err)
} else if testSpec.Created != created {
t.Fatal("FQVersion(): timestamp != Spec.Created", fqVersion)
}
noversionSpec := Spec{
Created: time.Unix(1136239445, 123456000),
CfgMap: ConfigMap{
Name: "vcl-cfgmap",
Meta: Meta{
Namespace: "default",
},
},
}
fqVersion = noversionSpec.FQVersion()
if testing.Verbose() {
t.Log("FQVersion():", fqVersion)
}
versionSlice = strings.SplitN(fqVersion, "-", 4)
if created, err := time.Parse(RFC3339Micro, versionSlice[3]); err != nil {
t.Fatal("FQVersion(): cannot parse timestamp", err)
} else if noversionSpec.Created != created {
t.Fatal("FQVersion(): timestamp != Spec.Created", fqVersion)
}
}
func TestConfigName(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment