Commit 0e21a5d0 authored by Geoff Simmons's avatar Geoff Simmons

Fundamental refactoring: controller & Varnish in different containers.

Docs are not yet updated in this commit.
parent af716613
*~
# Build artifacts
/k8s-ingress
/main_version.go
/cmd/k8s-ingress
/cmd/main_version.go
......@@ -24,13 +24,7 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
all: push
IMAGE = varnish-ingress
DOCKER_BUILD_OPTIONS =
MINIKUBE =
all: k8s-ingress
PACKAGES = \
k8s.io/client-go/kubernetes \
......@@ -62,18 +56,7 @@ check: k8s-ingress
test: check
# Point the docker CLI at the minikube docker daemon when MINIKUBE=1,
# so that images are built inside the minikube VM.
# BUGFIX: the conditional tested a misspelled variable (MINKUBE), so it
# could never fire; and $(minikube docker-env) was expanded by make
# (to nothing) rather than by the shell — escape the $.
docker-minikube:
ifeq ($(MINIKUBE),1)
	eval $$(minikube docker-env)
endif
container: docker-minikube
docker build --build-arg PACKAGES="$(PACKAGES)" \
$(DOCKER_BUILD_OPTIONS) -t $(IMAGE) .
push: docker-minikube container
docker push $(IMAGE)
clean:
go clean ./...
rm -f main_version.go
rm -f k8s-ingress
......@@ -34,8 +34,8 @@ import (
"reflect"
"time"
"code.uplex.de/uplex-varnish/k8s-ingress/varnish"
"code.uplex.de/uplex-varnish/k8s-ingress/varnish/vcl"
"code.uplex.de/uplex-varnish/k8s-ingress/cmd/varnish"
"code.uplex.de/uplex-varnish/k8s-ingress/cmd/varnish/vcl"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
......@@ -56,6 +56,10 @@ const (
ingressClassKey = "kubernetes.io/ingress.class"
resyncPeriod = 0
watchNamespace = api_v1.NamespaceAll
admSecretName = "adm-secret"
admSecretKey = "admin"
admSvcName = "varnish-ingress-admin"
admPortName = "varnishadm"
// resyncPeriod = 30 * time.Second
)
......@@ -68,9 +72,11 @@ type IngressController struct {
ingController cache.Controller
svcController cache.Controller
endpController cache.Controller
secrController cache.Controller
ingLister StoreToIngressLister
svcLister cache.Store
endpLister StoreToEndpointLister
secrLister StoreToSecretLister
syncQueue *taskQueue
stopCh chan struct{}
recorder record.EventRecorder
......@@ -166,7 +172,7 @@ func NewIngressController(kubeClient kubernetes.Interface,
AddFunc: func(obj interface{}) {
addSvc := obj.(*api_v1.Service)
log.Print("svcHandler.AddFunc:", addSvc)
if ingc.isExternalServiceForStatus(addSvc) {
if ingc.isVarnishAdmSvc(addSvc, namespace) {
ingc.syncQueue.enqueue(addSvc)
return
}
......@@ -195,6 +201,10 @@ func NewIngressController(kubeClient kubernetes.Interface,
}
log.Print("Removing service:", remSvc.Name)
if ingc.isVarnishAdmSvc(remSvc, namespace) {
ingc.syncQueue.enqueue(remSvc)
return
}
ingc.enqueueIngressForService(remSvc)
},
......@@ -202,12 +212,13 @@ func NewIngressController(kubeClient kubernetes.Interface,
if !reflect.DeepEqual(old, cur) {
curSvc := cur.(*api_v1.Service)
log.Print("svcHandler.UpdateFunc:", old, curSvc)
if ingc.isExternalServiceForStatus(curSvc) {
log.Printf("Service %v changed, syncing",
curSvc.Name)
if ingc.isVarnishAdmSvc(curSvc, namespace) {
ingc.syncQueue.enqueue(curSvc)
return
}
log.Printf("Service %v changed, syncing",
curSvc.Name)
ingc.enqueueIngressForService(curSvc)
}
},
......@@ -222,7 +233,7 @@ func NewIngressController(kubeClient kubernetes.Interface,
endpHandlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addEndp := obj.(*api_v1.Endpoints)
log.Print("endpHandler.UpdateFunc:", addEndp)
log.Print("endpHandler.AddFunc:", addEndp)
log.Print("Adding endpoints:", addEndp.Name)
ingc.syncQueue.enqueue(obj)
},
......@@ -252,7 +263,9 @@ func NewIngressController(kubeClient kubernetes.Interface,
},
UpdateFunc: func(old, cur interface{}) {
log.Print("endpHandler.UpdateFunc:", old, cur)
if !reflect.DeepEqual(old, cur) {
oldEps := old.(*api_v1.Endpoints)
curEps := cur.(*api_v1.Endpoints)
if !reflect.DeepEqual(oldEps.Subsets, curEps.Subsets) {
log.Printf("Endpoints %v changed, syncing",
cur.(*api_v1.Endpoints).Name)
ingc.syncQueue.enqueue(cur)
......@@ -266,6 +279,65 @@ func NewIngressController(kubeClient kubernetes.Interface,
"endpoints", namespace, fields.Everything()),
&api_v1.Endpoints{}, resyncPeriod, endpHandlers)
secrHandlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
secr := obj.(*api_v1.Secret)
log.Print("secrHandler.AddFunc:", secr)
if !ingc.isAdminSecret(secr) {
log.Printf("Ignoring Secret %v", secr.Name)
return
}
log.Printf("Adding Secret: %v", secr.Name)
ingc.syncQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
remSecr, isSecr := obj.(*api_v1.Secret)
log.Print("secrHandler.DeleteFunc:", remSecr, isSecr)
if !isSecr {
deletedState, ok := obj.(cache.
DeletedFinalStateUnknown)
if !ok {
log.Printf("Error received unexpected "+
"object: %v", obj)
return
}
remSecr, ok = deletedState.Obj.(*api_v1.Secret)
if !ok {
log.Printf("Error "+
"DeletedFinalStateUnknown "+
"contained non-Secret object: "+
"%v", deletedState.Obj)
return
}
}
if !ingc.isAdminSecret(remSecr) {
log.Printf("Ignoring Secret %v", remSecr.Name)
return
}
log.Printf("Removing Secret: %v", remSecr.Name)
ingc.syncQueue.enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
log.Print("endpHandler.UpdateFunc:", old, cur)
curSecr := cur.(*api_v1.Secret)
if !ingc.isAdminSecret(curSecr) {
log.Printf("Ignoring Secret %v", curSecr.Name)
return
}
if !reflect.DeepEqual(old, cur) {
log.Printf("Secret %v changed, syncing",
cur.(*api_v1.Secret).Name)
ingc.syncQueue.enqueue(cur)
}
},
}
ingc.secrLister.Store, ingc.secrController = cache.NewInformer(
cache.NewListWatchFromClient(ingc.client.Core().RESTClient(),
"secrets", namespace, fields.Everything()),
&api_v1.Secret{}, resyncPeriod, secrHandlers)
return &ingc
}
......@@ -281,6 +353,7 @@ func (ingc *IngressController) Run() {
go ingc.svcController.Run(ingc.stopCh)
go ingc.endpController.Run(ingc.stopCh)
go ingc.ingController.Run(ingc.stopCh)
go ingc.secrController.Run(ingc.stopCh)
go ingc.syncQueue.run(time.Second, ingc.stopCh)
<-ingc.stopCh
}
......@@ -307,7 +380,7 @@ func (ingc *IngressController) addOrUpdateIng(task Task,
return
}
err = ingc.vController.Update(key, vclSpec)
err = ingc.vController.Update(key, string(ing.ObjectMeta.UID), vclSpec)
if err != nil {
// XXX as above
ingc.syncQueue.requeueAfter(task, err, 5*time.Second)
......@@ -352,6 +425,12 @@ func (ingc *IngressController) sync(task Task) {
case Endpoints:
ingc.syncEndp(task)
return
case Service:
ingc.syncSvc(task)
return
case Secret:
ingc.syncSecret(task)
return
}
}
......@@ -464,6 +543,28 @@ func (ingc *IngressController) ing2VCLSpec(ing *extensions.Ingress) (vcl.Spec, e
return vclSpec, nil
}
// endpsTargetPort2Addrs collects the endpoint addresses in endps for
// the first subset that exposes targetPort. svc is only used to name
// the Service in the error when no subset matches.
func (ingc *IngressController) endpsTargetPort2Addrs(svc *api_v1.Service,
	endps api_v1.Endpoints, targetPort int32) ([]vcl.Address, error) {

	for _, subset := range endps.Subsets {
		for _, port := range subset.Ports {
			if port.Port != targetPort {
				continue
			}
			var addrs []vcl.Address
			for _, address := range subset.Addresses {
				addrs = append(addrs, vcl.Address{
					IP:   address.IP,
					Port: port.Port,
				})
			}
			return addrs, nil
		}
	}
	return nil, fmt.Errorf("No endpoints for target port %v in service "+
		"%s", targetPort, svc.Name)
}
func (ingc *IngressController) ingBackend2Addrs(backend extensions.IngressBackend,
namespace string) ([]vcl.Address, error) {
......@@ -506,25 +607,12 @@ func (ingc *IngressController) ingBackend2Addrs(backend extensions.IngressBacken
svc.Name)
}
for _, subset := range endps.Subsets {
for _, port := range subset.Ports {
if port.Port == targetPort {
for _, address := range subset.Addresses {
addr := vcl.Address{
IP: address.IP,
Port: port.Port,
}
addrs = append(addrs, addr)
}
return addrs, nil
}
}
}
return addrs, fmt.Errorf("No endpoints for target port %v in service "+
"%s", targetPort, svc.Name)
return ingc.endpsTargetPort2Addrs(svc, endps, targetPort)
}
func (ingc *IngressController) getTargetPort(svcPort *api_v1.ServicePort, svc *api_v1.Service) (int32, error) {
func (ingc *IngressController) getTargetPort(svcPort *api_v1.ServicePort,
svc *api_v1.Service) (int32, error) {
if (svcPort.TargetPort == intstr.IntOrString{}) {
return svcPort.Port, nil
}
......@@ -556,6 +644,97 @@ func (ingc *IngressController) getTargetPort(svcPort *api_v1.ServicePort, svc *a
return portNum, nil
}
// syncSvc processes a queued Task for the Varnish admin Service.
// If the Service no longer exists, the corresponding Varnish
// instances are removed from the Varnish controller. Otherwise the
// admin endpoint addresses are resolved from the Service's Endpoints
// and handed to the Varnish controller. Failures requeue the task.
func (ingc *IngressController) syncSvc(task Task) {
	var addrs []vcl.Address
	key := task.Key
	svcObj, exists, err := ingc.svcLister.GetByKey(key)
	if err != nil {
		ingc.syncQueue.requeue(task, err)
		return
	}
	if !exists {
		log.Print("Deleting Service:", key)
		err := ingc.vController.DeleteVarnishSvc(key)
		if err != nil {
			log.Printf("Error deleting configuration for %v: %v",
				key, err)
		}
		return
	}
	svc := svcObj.(*api_v1.Service)
	endps, err := ingc.endpLister.GetServiceEndpoints(svc)
	if err != nil {
		// No endpoints yet; retry after a delay.
		ingc.syncQueue.requeueAfter(task, err, 5*time.Second)
		ingc.recorder.Eventf(svc, api_v1.EventTypeWarning, "Rejected",
			"%v was rejected: %v", key, err)
		return
	}
	// XXX hard-wired Port name
	// Find the target port for the admin port, identified by its
	// well-known name among the Service's ports.
	targetPort := int32(0)
	for _, port := range svc.Spec.Ports {
		if port.Name == admPortName {
			// &port aliases the loop variable, but it is used
			// immediately and we break out of the loop.
			targetPort, err = ingc.getTargetPort(&port, svc)
			if err != nil {
				ingc.syncQueue.requeueAfter(task, err,
					5*time.Second)
				ingc.recorder.Eventf(svc,
					api_v1.EventTypeWarning, "Rejected",
					"%v was rejected: %v", key, err)
				return
			}
			break
		}
	}
	if targetPort == 0 {
		err = fmt.Errorf("No target port for port %s in service %s",
			admPortName, svc.Name)
		ingc.syncQueue.requeueAfter(task, err, 5*time.Second)
		ingc.recorder.Eventf(svc, api_v1.EventTypeWarning, "Rejected",
			"%v was rejected: %v", key, err)
		return
	}
	addrs, err = ingc.endpsTargetPort2Addrs(svc, endps, targetPort)
	if err != nil {
		ingc.syncQueue.requeueAfter(task, err, 5*time.Second)
		ingc.recorder.Eventf(svc, api_v1.EventTypeWarning, "Rejected",
			"%v was rejected: %v", key, err)
		return
	}
	// NOTE(review): the error return is ignored here — consider
	// requeueing on failure, as above.
	ingc.vController.AddOrUpdateVarnishSvc(key, addrs)
}
// syncSecret processes a queued Task for the admin Secret. On
// deletion the Varnish controller's stored secret is discarded;
// otherwise the data under the admin key is passed to the Varnish
// controller.
func (ingc *IngressController) syncSecret(task Task) {
	key := task.Key
	obj, exists, err := ingc.secrLister.GetByKey(key)
	if err != nil {
		ingc.syncQueue.requeue(task, err)
		return
	}
	if !exists {
		log.Print("Deleting Secret:", key)
		ingc.vController.DeleteAdmSecret()
		return
	}
	// Type assertion guard: the lister should only hold Secrets.
	secret, exists := obj.(*api_v1.Secret)
	if !exists {
		log.Printf("Error: Not a Secret: %v", obj)
		return
	}
	secretData, exists := secret.Data[admSecretKey]
	if !exists {
		log.Printf("Error: Secret %v does not have key %s", secret.Name,
			admSecretKey)
		return
	}
	ingc.vController.SetAdmSecret(secretData)
}
// Check if resource ingress class annotation (if exists) matches
// ingress controller class
func (ingc *IngressController) isVarnishIngress(ing *extensions.Ingress) bool {
......@@ -570,10 +749,17 @@ func (ingc *IngressController) hasIngress(ing *extensions.Ingress) bool {
return ingc.vController.HasIngress(name)
}
// isExternalServiceForStatus matches the service specified by the
// external-service arg
func (ingc *IngressController) isExternalServiceForStatus(svc *api_v1.Service) bool {
// return ingc.statusUpdater.namespace == svc.Namespace &&
// ingc.statusUpdater.externalServiceName == svc.Name
return false
// isVarnishAdmSvc determines if a Service represents the admin
// connection of a Varnish instance for which this controller is
// responsible.
// Currently we match the namespace and a hardwired Name.
func (ingc *IngressController) isVarnishAdmSvc(svc *api_v1.Service,
	namespace string) bool {

	if svc.ObjectMeta.Namespace != namespace {
		return false
	}
	return svc.ObjectMeta.Name == admSvcName
}
// isAdminSecret determines whether a Secret holds the Varnish admin
// secret, by matching the hardwired Secret name.
func (ingc *IngressController) isAdminSecret(secr *api_v1.Secret) bool {
	return secr.Name == admSecretName
}
......@@ -130,6 +130,8 @@ const (
Endpoints
// Service resource
Service
// Secret resource
Secret
)
// Task is an element of a taskQueue
......@@ -149,6 +151,8 @@ func NewTask(key string, obj interface{}) (Task, error) {
k = Endpoints
case *api_v1.Service:
k = Service
case *api_v1.Secret:
k = Secret
default:
return Task{}, fmt.Errorf("Unknown type: %v", t)
}
......@@ -156,20 +160,14 @@ func NewTask(key string, obj interface{}) (Task, error) {
return Task{k, key}, nil
}
// compareLinks returns true if the 2 self links are equal.
// func compareLinks(l1, l2 string) bool {
// // TODO: These can be partial links
// return l1 == l2 && l1 != ""
// }
// StoreToIngressLister makes a Store that lists Ingress.
// TODO: Move this to cache/listers post 1.1.
type StoreToIngressLister struct {
cache.Store
}
// GetByKeySafe calls Store.GetByKeySafe and returns a copy of the ingress so it is
// safe to modify.
// GetByKeySafe calls Store.GetByKeySafe and returns a copy of the
// ingress so it is safe to modify.
func (s *StoreToIngressLister) GetByKeySafe(key string) (ing *extensions.Ingress, exists bool, err error) {
item, exists, err := s.Store.GetByKey(key)
if !exists || err != nil {
......@@ -257,3 +255,8 @@ func FindPort(pod *api_v1.Pod, svcPort *api_v1.ServicePort) (int32, error) {
return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}
// StoreToSecretLister makes a Store that lists Secrets
// (parallel to StoreToIngressLister and StoreToEndpointLister; it
// only adds the type name over the embedded cache.Store).
type StoreToSecretLister struct {
	cache.Store
}
......@@ -38,8 +38,8 @@ import (
"os/signal"
"syscall"
"code.uplex.de/uplex-varnish/k8s-ingress/controller"
"code.uplex.de/uplex-varnish/k8s-ingress/varnish"
"code.uplex.de/uplex-varnish/k8s-ingress/cmd/controller"
"code.uplex.de/uplex-varnish/k8s-ingress/cmd/varnish"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
......
/*
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/*
// TODO
* VCL housekeeping
* either discard the previously active VCL immediately on new vcl.use
* or periodically clean up
* monitoring
* periodically call ping, status, panic.show when otherwise idle
*/
package varnish
import (
"bytes"
"fmt"
"io"
"log"
"regexp"
"strconv"
"strings"
"time"
"code.uplex.de/uplex-varnish/k8s-ingress/cmd/varnish/vcl"
"code.uplex.de/uplex-varnish/varnishapi/pkg/admin"
)
const (
regularLabel = "vk8s_regular"
readinessLabel = "vk8s_readiness"
readyCfg = "vk8s_ready"
notAvailCfg = "vk8s_notavailable"
)
// XXX make admTimeout configurable
var (
	// Matches runs of characters that may not appear in a VCL
	// config name; each run is collapsed to one underscore.
	nonAlNum = regexp.MustCompile("[^[:alnum:]]+")
	// Timeout for dialing a Varnish admin endpoint.
	admTimeout = time.Second * 10
)

// vclConfigName derives a legal VCL configuration name from an
// Ingress key and UID: "vk8sing_<key>_<uid>", with all runs of
// non-alphanumeric characters replaced by "_".
func vclConfigName(key string, uid string) string {
	raw := strings.Join([]string{"vk8sing", key, uid}, "_")
	return nonAlNum.ReplaceAllLiteralString(raw, "_")
}
type VarnishAdmError struct {
addr string
err error
}
func (vadmErr VarnishAdmError) Error() string {
return fmt.Sprintf("%s: %v", vadmErr.addr, vadmErr.err)
}
type VarnishAdmErrors []VarnishAdmError
func (vadmErrs VarnishAdmErrors) Error() string {
var sb strings.Builder
sb.WriteRune('[')
for _, err := range vadmErrs {
sb.WriteRune('{')
sb.WriteString(err.Error())
sb.WriteRune('}')
}
sb.WriteRune(']')
return sb.String()
}
// vclSpec ties the VCL Spec generated from an Ingress to the identity
// (key and UID) of that Ingress.
type vclSpec struct {
	spec vcl.Spec
	key  string
	uid  string
}

// varnishSvc is one Varnish instance: the address of its admin
// endpoint and, once connected, the admin client (nil until a
// connection is established, and reset to nil after a failed dial).
type varnishSvc struct {
	addr string
	adm  *admin.Admin
}

// VarnishController manages the Varnish instances that realize the
// Ingress, loading VCL configurations over their admin interfaces.
type VarnishController struct {
	errChan     chan error               // fatal errors reported here (set in Start)
	admSecret   []byte                   // secret for the Varnish admin protocol
	varnishSvcs map[string][]*varnishSvc // Service key -> its Varnish instances
	spec        *vclSpec                 // currently configured Ingress, nil if none
}

// NewVarnishController returns an initialized VarnishController with
// no Varnish instances and no Ingress configured.
func NewVarnishController() *VarnishController {
	vc := VarnishController{}
	vc.varnishSvcs = make(map[string][]*varnishSvc)
	return &vc
}
// Start makes the Varnish controller operational; errors the
// controller cannot handle itself are sent to errChan.
func (vc *VarnishController) Start(errChan chan error) {
	// XXX start a goroutine to ping etc and discard old VCL instances
	vc.errChan = errChan
	log.Print("Starting Varnish controller")
}
// getSrc renders the VCL template for the currently stored spec and
// returns the resulting VCL source.
func (vc *VarnishController) getSrc() (string, error) {
	var b bytes.Buffer
	err := vcl.Tmpl.Execute(&b, vc.spec.spec)
	if err != nil {
		return "", err
	}
	return b.String(), nil
}
// updateVarnishInstances brings the given Varnish instances up to
// date: it dials the admin endpoint of any instance not yet
// connected, and, if an Ingress is currently defined, loads its VCL
// config at each connected instance, labels it as the regular config,
// and marks the instance ready.
// Returns nil on complete success, otherwise an error collecting the
// per-instance failures.
func (vc *VarnishController) updateVarnishInstances(svcs []*varnishSvc) error {
	var errs VarnishAdmErrors
	for _, svc := range svcs {
		var err error
		if svc.adm != nil {
			// Admin connection already established.
			continue
		}
		svc.adm, err = admin.Dial(svc.addr, vc.admSecret, admTimeout)
		if err != nil {
			admErr := VarnishAdmError{addr: svc.addr, err: err}
			errs = append(errs, admErr)
			svc.adm = nil
			continue
		}
		log.Printf("Connected to Varnish admin endpoint at %s",
			svc.addr)
	}
	if vc.spec == nil {
		log.Print("Update Varnish instances: Currently no Ingress " +
			"defined")
		if len(errs) == 0 {
			return nil
		}
		return errs
	}
	vclSrc, err := vc.getSrc()
	if err != nil {
		return err
	}
	cfgName := vclConfigName(vc.spec.key, vc.spec.uid)
	log.Printf("Update Varnish instances: load config %s", cfgName)
	for _, svc := range svcs {
		// BUGFIX: skip instances whose dial failed above
		// (svc.adm was reset to nil); the error has already been
		// collected, and invoking the admin client through a nil
		// pointer would panic.
		if svc.adm == nil {
			continue
		}
		err = svc.adm.VCLInline(cfgName, vclSrc)
		if err != nil {
			admErr := VarnishAdmError{addr: svc.addr, err: err}
			errs = append(errs, admErr)
			continue
		}
		log.Printf("Loaded config %s at Varnish endpoint %s", cfgName,
			svc.addr)
		err = svc.adm.VCLLabel(regularLabel, cfgName)
		if err != nil {
			admErr := VarnishAdmError{addr: svc.addr, err: err}
			errs = append(errs, admErr)
			continue
		}
		log.Printf("Labeled config %s as %s at Varnish endpoint %s",
			cfgName, regularLabel, svc.addr)
		err = svc.adm.VCLLabel(readinessLabel, readyCfg)
		if err != nil {
			admErr := VarnishAdmError{addr: svc.addr, err: err}
			errs = append(errs, admErr)
			continue
		}
		log.Printf("Labeled config %s as %s at Varnish endpoint %s",
			readyCfg, readinessLabel, svc.addr)
	}
	if len(errs) == 0 {
		return nil
	}
	return errs
}
// addVarnishSvc registers a newly discovered Varnish admin Service
// under key and brings all of its instances up to date.
func (vc *VarnishController) addVarnishSvc(key string,
	addrs []vcl.Address) error {

	svcs := make([]*varnishSvc, len(addrs))
	for i, addr := range addrs {
		svcs[i] = &varnishSvc{
			addr: addr.IP + ":" + strconv.Itoa(int(addr.Port)),
		}
	}
	vc.varnishSvcs[key] = svcs
	return vc.updateVarnishInstances(svcs)
}
// removeVarnishInstances labels every connected instance in svcs as
// not available (readiness label -> notAvailCfg) and closes its admin
// connection. Returns the accumulated errors, or nil.
func (vc *VarnishController) removeVarnishInstances(svcs []*varnishSvc) error {
	var errs VarnishAdmErrors
	for _, svc := range svcs {
		if svc.adm == nil {
			continue
		}
		err := svc.adm.VCLLabel(readinessLabel, notAvailCfg)
		if err != nil {
			if err == io.EOF {
				// Connection already gone; nothing to do.
				continue
			}
			errs = append(errs,
				VarnishAdmError{addr: svc.addr, err: err})
			continue
		}
		if err = svc.adm.Close(); err != nil {
			errs = append(errs,
				VarnishAdmError{addr: svc.addr, err: err})
		}
	}
	if len(errs) != 0 {
		return errs
	}
	return nil
}
// updateVarnishSvc reconciles the Varnish instances stored under key
// with the given admin endpoint addresses: instances at unchanged
// addresses keep their admin connections, instances at new addresses
// are added, and instances at addresses that have gone away are
// labeled not available. Finally all current instances are updated.
func (vc *VarnishController) updateVarnishSvc(key string,
	addrs []vcl.Address) error {

	var errs VarnishAdmErrors
	var newSvcs, remSvcs, keepSvcs []*varnishSvc
	// BUGFIX: these maps were previously only declared (nil), and
	// writing to a nil map panics; they must be allocated.
	updateAddrs := make(map[string]struct{}, len(addrs))
	prevAddrs := make(map[string]*varnishSvc)
	for _, addr := range addrs {
		// Renamed from "key", which shadowed the parameter.
		admAddr := addr.IP + ":" + strconv.Itoa(int(addr.Port))
		updateAddrs[admAddr] = struct{}{}
	}
	for _, svc := range vc.varnishSvcs[key] {
		prevAddrs[svc.addr] = svc
	}
	for addr := range updateAddrs {
		svc, exists := prevAddrs[addr]
		if exists {
			keepSvcs = append(keepSvcs, svc)
			continue
		}
		newSvcs = append(newSvcs, &varnishSvc{addr: addr})
	}
	for addr, svc := range prevAddrs {
		if _, exists := updateAddrs[addr]; !exists {
			remSvcs = append(remSvcs, svc)
		}
	}
	vc.varnishSvcs[key] = append(keepSvcs, newSvcs...)
	// Mark instances that have gone away as not available.
	for _, svc := range remSvcs {
		if svc.adm == nil {
			continue
		}
		if err := svc.adm.VCLLabel(readinessLabel, notAvailCfg); err != nil {
			if err == io.EOF {
				continue
			}
			errs = append(errs,
				VarnishAdmError{addr: svc.addr, err: err})
		}
	}
	updateErrs := vc.updateVarnishInstances(vc.varnishSvcs[key])
	if updateErrs != nil {
		vadmErrs, ok := updateErrs.(VarnishAdmErrors)
		if !ok {
			return updateErrs
		}
		errs = append(errs, vadmErrs...)
	}
	if len(errs) == 0 {
		return nil
	}
	return errs
}
// AddOrUpdateVarnishSvc registers or reconciles the Varnish admin
// Service identified by key with the given admin endpoint addresses.
// The admin secret must have been set first.
func (vc *VarnishController) AddOrUpdateVarnishSvc(key string,
	addrs []vcl.Address) error {

	if vc.admSecret == nil {
		return fmt.Errorf("Cannot add or update Varnish service %s: "+
			"no admin secret has been set", key)
	}
	if _, known := vc.varnishSvcs[key]; known {
		return vc.updateVarnishSvc(key, addrs)
	}
	return vc.addVarnishSvc(key, addrs)
}
// DeleteVarnishSvc forgets the Varnish admin Service identified by
// key and disconnects its instances. Unknown keys are a no-op.
func (vc *VarnishController) DeleteVarnishSvc(key string) error {
	svcs, known := vc.varnishSvcs[key]
	if !known {
		return nil
	}
	delete(vc.varnishSvcs, key)
	return vc.removeVarnishInstances(svcs)
}
// Update stores the VCL spec for the Ingress identified by key/uid
// and pushes it to all known Varnish instances. Only one Ingress is
// currently supported; an Update for a different Ingress is rejected.
func (vc *VarnishController) Update(key, uid string, spec vcl.Spec) error {
	if vc.spec != nil && vc.spec.key != "" && vc.spec.key != key {
		return fmt.Errorf("Multiple Ingress definitions currently not "+
			"supported: current=%s new=%s", vc.spec.key, key)
	}
	if vc.spec == nil {
		vc.spec = &vclSpec{}
	}
	vc.spec.key = key
	vc.spec.uid = uid
	vc.spec.spec = spec

	var errs VarnishAdmErrors
	for _, svcs := range vc.varnishSvcs {
		updateErrs := vc.updateVarnishInstances(svcs)
		if updateErrs == nil {
			// BUGFIX: a nil result previously failed the type
			// assertion below, so the "!ok" path returned nil
			// early and the remaining services were skipped.
			continue
		}
		vadmErrs, ok := updateErrs.(VarnishAdmErrors)
		if !ok {
			return updateErrs
		}
		errs = append(errs, vadmErrs...)
	}
	if len(errs) == 0 {
		return nil
	}
	return errs
}
// We currently only support one Ingress definition at a time, so
// deleting the Ingress means that we revert to the "notfound" config,
// which returns synthetic 404 Not Found for all requests.
// XXX set to the notready state?
func (vc *VarnishController) DeleteIngress(key string) error {
	if vc.spec != nil && vc.spec.key != "" && vc.spec.key != key {
		return fmt.Errorf("Unknown Ingress %s", key)
	}
	vc.spec = nil
	var errs VarnishAdmErrors
	for _, svcs := range vc.varnishSvcs {
		for _, svc := range svcs {
			if svc.adm == nil {
				continue
			}
			if err := svc.adm.VCLLabel(regularLabel, notAvailCfg); err != nil {
				admErr := VarnishAdmError{
					addr: svc.addr,
					err:  err,
				}
				errs = append(errs, admErr)
				continue
			}
			// BUGFIX: log message misspelled "Varnish".
			log.Printf("Labeled config %s as %s at Varnish "+
				"endpoint %s", notAvailCfg, regularLabel,
				svc.addr)
		}
	}
	if len(errs) == 0 {
		return nil
	}
	return errs
}
// HasIngress reports whether the Ingress identified by key is the one
// currently configured. (Currently only one Ingress at a time.)
func (vc *VarnishController) HasIngress(key string) bool {
	return vc.spec != nil && vc.spec.key == key
}
// SetAdmSecret stores a private copy of the admin secret, so that
// later changes to the caller's slice cannot affect the controller.
func (vc *VarnishController) SetAdmSecret(secret []byte) {
	buf := make([]byte, len(secret))
	copy(buf, secret)
	vc.admSecret = buf
}
// XXX Controller becomes not ready
// DeleteAdmSecret discards the stored admin secret; no new Varnish
// services can be added until a secret is set again.
func (vc *VarnishController) DeleteAdmSecret() {
	vc.admSecret = nil
}
// Quit closes the admin connections of all known Varnish instances;
// failures are logged but not returned.
func (vc *VarnishController) Quit() {
	for _, svcs := range vc.varnishSvcs {
		for _, svc := range svcs {
			if svc.adm == nil {
				continue
			}
			if err := svc.adm.Close(); err != nil {
				log.Printf("Failed to close admin connection "+
					"at Varnish endpoint %s: %v", svc.addr,
					err)
				continue
			}
			// BUGFIX: log message misspelled "Varnish".
			log.Printf("Closed admin connection at Varnish "+
				"endpoint %s", svc.addr)
		}
	}
}
# Build stage: fetch gogitversion (used by "go generate") and the
# client libraries, then compile the ingress controller.
FROM golang:1.10.3 as builder
ARG PACKAGES
RUN go get -d -v github.com/slimhazard/gogitversion && \
    cd /go/src/github.com/slimhazard/gogitversion && \
    make install
# PACKAGES is the list of dependencies, passed in from the Makefile.
RUN go get -u -v $PACKAGES
COPY . /go/src/code.uplex.de/uplex-varnish/k8s-ingress
WORKDIR /go/src/code.uplex.de/uplex-varnish/k8s-ingress/cmd
# CGO disabled so the static binary runs on the alpine base image.
RUN go generate && \
    CGO_ENABLED=0 GOOS=linux go build -o k8s-ingress *.go
# Runtime stage: only the controller binary and the VCL template.
FROM alpine:3.8
COPY --from=builder /go/src/code.uplex.de/uplex-varnish/k8s-ingress/cmd/k8s-ingress /k8s-ingress
COPY --from=builder /go/src/code.uplex.de/uplex-varnish/k8s-ingress/cmd/varnish/vcl/vcl.tmpl /vcl.tmpl
ENTRYPOINT ["/k8s-ingress"]
FROM golang:1.10.3 as builder
ARG PACKAGES
RUN go get -d -v github.com/slimhazard/gogitversion && \
cd /go/src/github.com/slimhazard/gogitversion && \
make install
RUN go get -v $PACKAGES
COPY . /go/src/code.uplex.de/uplex-varnish/k8s-ingress
WORKDIR /go/src/code.uplex.de/uplex-varnish/k8s-ingress
RUN go generate && \
CGO_ENABLED=0 GOOS=linux go build -o k8s-ingress *.go
FROM centos:centos7
COPY varnishcache_varnish60.repo /etc/yum.repos.d/
RUN yum install -y epel-release && yum update -y -q && \
......@@ -17,6 +6,13 @@ RUN yum install -y epel-release && yum update -y -q && \
yum install -y -q varnish-6.0.1 && \
yum install -y -q --nogpgcheck vmod-re2-1.5.1 && \
yum clean all && rm -rf /var/cache/yum
COPY varnish/vcl/vcl.tmpl /
COPY --from=builder /go/src/code.uplex.de/uplex-varnish/k8s-ingress/k8s-ingress /k8s-ingress
ENTRYPOINT ["/k8s-ingress"]
COPY bogo_backend.vcl /etc/varnish/
COPY ready.vcl /etc/varnish/
COPY notavailable.vcl /etc/varnish
COPY boot.vcl /etc/varnish
COPY start.cli /etc/varnish
RUN mkdir /var/run/varnish
ENTRYPOINT ["/usr/sbin/varnishd", "-F", "-a", ":80", "-a", "k8s=:8080", \
"-S", "/var/run/varnish/_.secret", "-T", "0.0.0.0:6081", \
"-p", "vcl_path=/etc/varnish", "-I", "/etc/varnish/start.cli", \
"-f", ""]
# Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
# All rights reserved
#
# Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
all: controller varnish
DOCKER_BUILD_OPTIONS =
MINIKUBE =
PACKAGES = \
k8s.io/client-go/kubernetes \
k8s.io/client-go/kubernetes/scheme \
k8s.io/client-go/kubernetes/typed/core/v1 \
k8s.io/client-go/rest \
k8s.io/client-go/tools/cache \
k8s.io/client-go/tools/record \
k8s.io/client-go/util/workqueue \
k8s.io/api/core/v1 \
k8s.io/api/extensions/v1beta1 \
k8s.io/apimachinery/pkg/apis/meta/v1 \
k8s.io/apimachinery/pkg/fields \
k8s.io/apimachinery/pkg/labels \
k8s.io/apimachinery/pkg/util/intstr \
k8s.io/apimachinery/pkg/util/wait \
code.uplex.de/uplex-varnish/varnishapi/pkg/admin
# Point the docker CLI at the minikube docker daemon when MINIKUBE=1,
# so that images are built inside the minikube VM.
# BUGFIX: the conditional tested a misspelled variable (MINKUBE), so it
# could never fire; and $(minikube docker-env) was expanded by make
# (to nothing) rather than by the shell — escape the $.
docker-minikube:
ifeq ($(MINIKUBE),1)
	eval $$(minikube docker-env)
endif
controller: Dockerfile.controller docker-minikube
docker build --build-arg PACKAGES="$(PACKAGES)" \
$(DOCKER_BUILD_OPTIONS) -t varnish-ingress/controller \
-f Dockerfile.controller ..
varnish: Dockerfile.varnish docker-minikube
docker build $(DOCKER_BUILD_OPTIONS) -t varnish-ingress/varnish \
-f Dockerfile.varnish .
# Bogus backend, to which no request is ever sent.
# - Sentinel that no backend was determined after a request has been
# evaluated according to IngressRules.
# - Defined when the Ingress is not ready (not currently implementing
# any IngressSpec), so that Varnish doesn't complain about no
# backend definition.
backend notfound {
# 192.0.2.0/24 reserved for docs & examples (RFC5737).
.host = "192.0.2.255";
.port = "80";
}
vcl 4.1;
include "bogo_backend.vcl";
# Entry point for all client requests.
sub vcl_recv {
	# Requests arriving on the "k8s" listener socket are probes
	# from Kubernetes, not regular traffic.
	if (local.socket == "k8s") {
		if (req.url == "/ready") {
			# Readiness probe: hand off to whatever config
			# is currently labeled vk8s_readiness.
			return (vcl(vk8s_readiness));
		}
		return (synth(404));
	}
	# Regular traffic is handled by the config labeled vk8s_regular.
	return (vcl(vk8s_regular));
}
vcl 4.0;
include "bogo_backend.vcl";
# Send a synthetic response with status 503 to every request.
# Used for the readiness check and regular traffic when Varnish is not ready.
sub vcl_recv {
return(synth(503));
}
vcl 4.0;
include "bogo_backend.vcl";
# Send a synthetic response with status 200 to every request.
# Used for the readiness check when Varnish is ready.
sub vcl_recv {
return(synth(200));
}
vcl.load vk8s_ready /etc/varnish/ready.vcl
vcl.load vk8s_notavailable /etc/varnish/notavailable.vcl
vcl.label vk8s_readiness vk8s_notavailable
vcl.label vk8s_regular vk8s_notavailable
vcl.load boot /etc/varnish/boot.vcl
vcl.use boot
start
apiVersion: v1
kind: Secret
metadata:
name: adm-secret
namespace: varnish-ingress
type: Opaque
data:
admin: XGASQn0dd/oEsWh5WMXUpmKAKNZYnQGsHSmO/nHkv1w=
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: varnish-ingress-controller
namespace: varnish-ingress
spec:
replicas: 1
selector:
matchLabels:
app: varnish-ingress-controller
template:
metadata:
labels:
app: varnish-ingress-controller
spec:
serviceAccountName: varnish-ingress
containers:
- image: varnish-ingress/controller
imagePullPolicy: IfNotPresent
name: varnish-ingress-controller
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
......@@ -12,6 +12,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
......
apiVersion: v1
kind: Service
metadata:
name: varnish-ingress-admin
namespace: varnish-ingress
annotations:
service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
spec:
clusterIP: None
ports:
- port: 6081
name: varnishadm
selector:
app: varnish-ingress
publishNotReadyAddresses: true
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: varnish
namespace: varnish-ingress
spec:
replicas: 1
selector:
matchLabels:
app: varnish-ingress
template:
metadata:
labels:
app: varnish-ingress
spec:
serviceAccountName: varnish-ingress
containers:
- image: varnish-ingress/varnish
imagePullPolicy: IfNotPresent
name: varnish-ingress
ports:
- name: http
containerPort: 80
- name: k8sport
containerPort: 8080
- name: admport
containerPort: 6081
volumeMounts:
- name: adm-secret
mountPath: "/var/run/varnish"
readOnly: true
livenessProbe:
exec:
command:
- /usr/bin/pgrep
- -P
- "0"
- varnishd
readinessProbe:
httpGet:
path: /ready
port: k8sport
volumes:
- name: adm-secret
secret:
secretName: adm-secret
items:
- key: admin
path: _.secret
/*
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/*
// TODO
* VCL housekeeping
* either discard the previously active VCL immediately on new vcl.use
* or periodically clean up
* monitoring
* periodically call ping, status, panic.show when otherwise idle
*/
package varnish
import (
"bufio"
"crypto/rand"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"os/user"
"path/filepath"
"strconv"
"sync/atomic"
"syscall"
"time"
"code.uplex.de/uplex-varnish/k8s-ingress/varnish/vcl"
"code.uplex.de/uplex-varnish/varnishapi/pkg/admin"
)
// XXX timeout for getting Admin connection (waiting for varnishd start)
// timeout waiting for child process to stop
const (
	// Directory and filename of the VCL config that varnishd
	// loads at boot (written by Start, overwritten by Update).
	vclDir  = "/etc/varnish"
	vclFile = "ingress.vcl"
	// Client-facing listen address passed to varnishd -a.
	varnishLsn = ":80"
	varnishdPath = "/usr/sbin/varnishd"
	// Address on which this controller listens for varnishd's
	// reverse admin connection (varnishd is started with -M).
	admConn = "localhost:6081"
	// Name of the admin secret file written into vclDir.
	secretFile = "_.secret"
	// Bootstrap VCL: answer every request with a synthetic 404
	// until an Ingress is defined. The backend address is a
	// TEST-NET-1 placeholder that is never contacted.
	notFoundVCL = `vcl 4.0;
backend default { .host = "192.0.2.255"; .port = "80"; }
sub vcl_recv {
	return (synth(404));
}
`
)
var (
	// Full paths derived from the constants above; tmpPath is a
	// scratch location for rendering VCL before it is loaded.
	vclPath    = filepath.Join(vclDir, vclFile)
	tmpPath    = filepath.Join(os.TempDir(), vclFile)
	secretPath = filepath.Join(vclDir, secretFile)
	// Arguments for launching varnishd: foreground (-F), with our
	// secret file, dialing back to the controller's admin listener.
	varnishArgs = []string{
		"-a", varnishLsn, "-f", vclPath, "-F", "-S", secretPath,
		"-M", admConn,
	}
	// uid/gid of the "varnish" user/group, resolved in Start and
	// used to chown the secret and VCL files for the child process.
	vcacheUID  int
	varnishGID int
	// Key of the single currently-installed Ingress ("" if none).
	// NOTE(review): package-level mutable state shared by Update,
	// DeleteIngress and HasIngress; assumes single-threaded use by
	// the sync queue — confirm against the caller.
	currentIng string
	// Monotonic counter used to generate unique VCL config names.
	configCtr = uint64(0)
)
// VarnishController manages a varnishd child process: it launches it,
// holds the admin connection to it, and loads/activates VCL configs
// generated from Ingress definitions.
type VarnishController struct {
	varnishdCmd *exec.Cmd    // the varnishd child process
	adm         *admin.Admin // CLI admin connection to varnishd
	errChan     chan error   // reports async startup errors to the caller
}

// NewVarnishController returns an empty controller; call Start to
// launch varnishd and establish the admin connection.
func NewVarnishController() *VarnishController {
	return &VarnishController{}
}
// Start launches varnishd and establishes the admin connection to it.
// Errors during setup are reported on errChan rather than returned;
// the method simply returns after sending, so callers must monitor
// the channel. The setup sequence is order-dependent:
//
//  1. resolve the uid/gid of the "varnish" user and group, so files
//     written here can be chowned for the varnishd child
//  2. generate a random admin secret and write it to secretPath
//  3. write the bootstrap "404 for everything" VCL to vclPath
//  4. open the listener for varnishd's reverse admin connection
//     (varnishd is started with -M, so it dials back to us)
//  5. start varnishd and accept its admin connection, authenticated
//     with the generated secret
func (vc *VarnishController) Start(errChan chan error) {
	vc.errChan = errChan
	log.Print("Starting Varnish controller")
	// Resolve the varnish user/group for the chown calls below.
	vcacheUser, err := user.Lookup("varnish")
	if err != nil {
		vc.errChan <- err
		return
	}
	varnishGrp, err := user.LookupGroup("varnish")
	if err != nil {
		vc.errChan <- err
		return
	}
	vcacheUID, err = strconv.Atoi(vcacheUser.Uid)
	if err != nil {
		vc.errChan <- err
		return
	}
	varnishGID, err = strconv.Atoi(varnishGrp.Gid)
	if err != nil {
		vc.errChan <- err
		return
	}
	// 32 bytes of crypto-random data as the CLI admin secret,
	// readable only by owner (0400) and owned by the varnish user.
	secret := make([]byte, 32)
	_, err = rand.Read(secret)
	if err != nil {
		vc.errChan <- err
		return
	}
	if err := ioutil.WriteFile(secretPath, secret, 0400); err != nil {
		vc.errChan <- err
		return
	}
	if err := os.Chown(secretPath, vcacheUID, varnishGID); err != nil {
		vc.errChan <- err
		return
	}
	log.Print("Wrote secret file")
	// Bootstrap VCL must exist before varnishd starts (-f vclPath).
	notFoundBytes := []byte(notFoundVCL)
	if err := ioutil.WriteFile(vclPath, notFoundBytes, 0644); err != nil {
		vc.errChan <- err
		return
	}
	if err := os.Chown(vclPath, vcacheUID, varnishGID); err != nil {
		vc.errChan <- err
		return
	}
	log.Print("Wrote initial VCL file")
	// Listen first, then start varnishd, so its -M dial-back finds
	// the port open.
	if vc.adm, err = admin.Listen(admConn); err != nil {
		vc.errChan <- err
		return
	}
	log.Print("Opened port to listen for Varnish adm connection")
	vc.varnishdCmd = exec.Command(varnishdPath, varnishArgs...)
	if err := vc.varnishdCmd.Start(); err != nil {
		vc.errChan <- err
		return
	}
	log.Print("Launched varnishd")
	// Accept authenticates the incoming connection with the secret.
	if err := vc.adm.Accept(secret); err != nil {
		vc.errChan <- err
		return
	}
	log.Print("Accepted varnish admin connection")
}
// Update renders the VCL config generated from spec, loads it into
// varnishd over the admin connection, persists it to vclPath (so a
// restarted varnishd boots with it), and activates it. key identifies
// the Ingress the spec came from; since only one Ingress is currently
// supported, an error is returned if a different one is installed.
func (vc *VarnishController) Update(key string, spec vcl.Spec) error {
	if currentIng != "" && currentIng != key {
		return fmt.Errorf("Multiple Ingress definitions currently not "+
			"supported: current=%s new=%s", currentIng, key)
	}
	currentIng = key

	// Render the template to a temp file first, so a rendering
	// failure never clobbers the active config at vclPath.
	f, err := os.Create(tmpPath)
	if err != nil {
		return err
	}
	wr := bufio.NewWriter(f)
	if err := vcl.Tmpl.Execute(wr, spec); err != nil {
		// The original leaked this descriptor on template errors.
		f.Close()
		return err
	}
	// Flush/Close errors must be checked: a silently short write
	// here would make varnishd load a truncated config below.
	if err := wr.Flush(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	log.Printf("Wrote new VCL config to %s", tmpPath)

	// Unique config name per load, so repeated updates don't clash.
	ctr := atomic.AddUint64(&configCtr, 1)
	configName := fmt.Sprintf("ingress-%d", ctr)
	if err := vc.adm.VCLLoad(configName, tmpPath); err != nil {
		log.Print("Failed to load VCL: ", err)
		return err
	}
	log.Printf("Loaded VCL config %s", configName)

	// Only after a successful load, persist the config to the
	// standard path read by varnishd at boot.
	newVCL, err := ioutil.ReadFile(tmpPath)
	if err != nil {
		log.Print(err)
		return err
	}
	if err = ioutil.WriteFile(vclPath, newVCL, 0644); err != nil {
		log.Print(err)
		return err
	}
	log.Printf("Wrote VCL config to %s", vclPath)

	if err = vc.adm.VCLUse(configName); err != nil {
		log.Print("Failed to activate VCL: ", err)
		return err
	}
	log.Printf("Activated VCL config %s", configName)
	// XXX discard previously active VCL
	return nil
}
// We currently only support one Ingress definition at a time, so
// deleting the Ingress means that we revert to the "boot" config,
// which returns synthetic 404 Not Found for all requests.
func (vc *VarnishController) DeleteIngress(key string) error {
	// Reject keys that don't match the installed Ingress; an empty
	// currentIng (nothing installed) is accepted, matching the
	// original behavior.
	mismatch := currentIng != "" && currentIng != key
	if mismatch {
		return fmt.Errorf("Unknown Ingress %s", key)
	}
	err := vc.adm.VCLUse("boot")
	if err != nil {
		log.Print("Failed to activate VCL: ", err)
		return err
	}
	log.Printf("Activated VCL config boot")
	currentIng = ""
	// XXX discard previously active VCL
	return nil
}
// Currently only one Ingress at a time
func (vc *VarnishController) HasIngress(key string) bool {
	// True iff an Ingress is installed and key names it.
	return currentIng != "" && currentIng == key
}
// Quit shuts the whole stack down: it asks varnishd to stop its child
// (cache) process, waits up to a minute for it to reach the Stopped
// state, closes the admin connection, and finally sends SIGTERM to
// varnishd itself.
//
// Bug fixes vs. the original: the timeout channel was re-created on
// every poll iteration, so the one-minute deadline could never fire;
// and no branch left the loop once the child was Stopped, so the
// method spun forever instead of proceeding to Close/SIGTERM.
func (vc *VarnishController) Quit() {
	if err := vc.adm.Stop(); err != nil {
		log.Print("Failed to stop Varnish child process:", err)
	} else {
		// XXX config the timeout
		// One deadline for the entire wait.
		tmoChan := time.After(time.Minute)
	wait:
		for {
			select {
			case <-tmoChan:
				log.Print("timeout waiting for Varnish child " +
					"process to finish")
				return
			default:
				state, err := vc.adm.Status()
				if err != nil {
					log.Print("Can't get Varnish child "+
						"process status:", err)
					return
				}
				if state == admin.Stopped {
					break wait
				}
				// Brief pause between polls so we don't
				// busy-spin on the admin connection.
				time.Sleep(100 * time.Millisecond)
			}
		}
	}
	vc.adm.Close()
	if err := vc.varnishdCmd.Process.Signal(syscall.SIGTERM); err != nil {
		log.Print("Failed to stop Varnish:", err)
		return
	}
	log.Print("Stopped Varnish")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment