Commit d733fb1d authored by Geoff Simmons's avatar Geoff Simmons

Add some source code docs, quiets golint a bit.

Some elements were changed to be unexported, since they didn't need
to be exported or publicly documented.
parent 07e73578
......@@ -26,7 +26,7 @@
* SUCH DAMAGE.
*/
// varnishingress is the client API for the API group
// Package varnishingress is the client API for the API group
// ingress.varnish-cache.org, used to define Custom Resources for the
// Varnish ingress project.
//
......@@ -34,5 +34,7 @@
package varnishingress
const (
// GroupName is the API group name for Custom Resources
// defined for this project.
GroupName = "ingress.varnish-cache.org"
)
......@@ -26,7 +26,7 @@
* SUCH DAMAGE.
*/
// varnishingress is the client API for the API group
// Package varnishingress is the client API for the API group
// ingress.varnish-cache.org, used to define Custom Resources for the
// Varnish ingress project.
//
......@@ -34,5 +34,7 @@
package varnishingress
const (
// GroupName is the API group name for Custom Resources
// defined for this project.
GroupName = "ingress.varnish-cache.org"
)
......@@ -29,9 +29,10 @@
// +k8s:deepcopy-gen=package
// +groupName=ingress.varnish-cache.org
// v1alpha1 is a version of the client API for CustomResource types
// defined for the Varnish Ingress project. Most of the code in the API
// is generated by k8s code generators (see: k8s.io/code-generator).
// Package v1alpha1 is a version of the client API for CustomResource
// types defined for the Varnish Ingress project. Most of the code in
// the API is generated by k8s code generators (see:
// k8s.io/code-generator).
//
// See: https://code.uplex.de/uplex-varnish/k8s-ingress
//
......
......@@ -53,8 +53,11 @@ func Resource(resource string) schema.GroupResource {
}
var (
// SchemeBuilder adds Go types for Custom Resources to the
// client API.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
// AddToScheme specifies k8s.io/apimachinery/pkg/runtime.AddToScheme
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
......
......@@ -62,6 +62,10 @@ type infrmrs struct {
vcfg cache.SharedIndexInformer
}
// Listers aggregates listers from k8s.io/client-go/listers for the
// various resource types of interested. These are initialized by
// IngressController, and handed off to NamespaceWorker workers to
// read data from the client-go cache.
type Listers struct {
ing ext_listers.IngressLister
svc core_v1_listers.ServiceLister
......@@ -83,7 +87,14 @@ type IngressController struct {
recorder record.EventRecorder
}
// NewIngressController creates a controller
// NewIngressController creates a controller.
//
// log: logger initialized at startup
// kubeClient: k8s client initialized at startup
// vc: Varnish controller
// infFactory: SharedInformerFactory to create informers & listers for
// the k8s standard client APIs
// vcrInfFactory: SharedInformerFactory for the project's own client APIs
func NewIngressController(
log *logrus.Logger,
kubeClient kubernetes.Interface,
......@@ -203,7 +214,9 @@ func (ingc *IngressController) updateObj(old, new interface{}) {
ingc.nsQs.Queue.Add(new)
}
// Run the Ingress controller
// Run the Ingress controller -- start the informers in goroutines,
// wait for the caches to sync, and call Run() for the
// NamespaceQueues. Then block until Stop() is invoked.
func (ingc *IngressController) Run() {
defer utilruntime.HandleCrash()
defer ingc.nsQs.Stop()
......@@ -235,7 +248,7 @@ func (ingc *IngressController) Run() {
ingc.log.Info("Shutting down workers")
}
// Stop the Ingress controller
// Stop the Ingress controller -- signal the workers to stop.
func (ingc *IngressController) Stop() {
close(ingc.stopCh)
}
/*
* Copyright (c) 2018 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
// Package controller provides access to the k8s server API and drives
// the main loops of the Ingress controller. It sets up informers,
// lister, event handlers, queues and workers, and it starts and stops
// the workers that sync resources.
//
// Only this package imports packages from k8s.io and from the
// project's own client APIs (in pkg/client).
package controller
......@@ -325,7 +325,7 @@ func (worker *NamespaceWorker) addOrUpdateIng(ing *extensions.Ingress) error {
if err != nil {
return err
}
worker.log.Debugf("Updated Ingress key=%s uuid=%s svc=%s: %+v", ingKey,
worker.log.Debugf("Updated Ingress key=%s uuid=%s svc=%s: %+v", ingKey,
string(ing.ObjectMeta.UID), svcKey, vclSpec)
return nil
}
......
......@@ -52,6 +52,13 @@ const (
syncFailure = "SyncFailure"
)
// NamespaceWorker serves fanout of work items to workers for each
// namespace for which the controller is notified about a resource
// update. The NamespaceQueues object creates a new instance when it
// reads an item from a new namespace from its main queue. Each worker
// has its own queue and listers filtered for its namespace. Thus each
// namespace is synced separately and sequentially, and all of the
// namespaces are synced in parallel.
type NamespaceWorker struct {
namespace string
log *logrus.Logger
......@@ -223,6 +230,10 @@ func (worker *NamespaceWorker) work() {
worker.log.Info("Shutting down worker for namespace:", worker.namespace)
}
// NamespaceQueues reads from the main queue to which informers add
// new work items from all namespaces. The worker reads items from the
// queue and places them on separate queues for NamespaceWorkers
// responsible for each namespace.
type NamespaceQueues struct {
Queue workqueue.RateLimitingInterface
log *logrus.Logger
......@@ -233,6 +244,13 @@ type NamespaceQueues struct {
recorder record.EventRecorder
}
// NewNamespaceQueues creates a NamespaceQueues object.
//
// log: logger initialized at startup
// vController: Varnish controller initialied at startup
// listers: client-go/lister instance for each resource type
// client: k8s API client initialized at startup
// recorder: Event broadcaster initialized at startup
func NewNamespaceQueues(
log *logrus.Logger,
vController *varnish.VarnishController,
......@@ -302,6 +320,9 @@ func (qs *NamespaceQueues) next() {
qs.Queue.Forget(obj)
}
// Run comprises the main loop of the controller, reading from the
// main queue of work items and handing them off to workers for each
// namespace.
func (qs *NamespaceQueues) Run() {
qs.log.Info("Starting dispatcher worker")
for !qs.Queue.ShuttingDown() {
......@@ -310,6 +331,8 @@ func (qs *NamespaceQueues) Run() {
qs.log.Info("Shutting down dispatcher worker")
}
// Stop shuts down the main queue loop initiated by Run(), and in turn
// shuts down all of the NamespaceWorkers.
func (qs *NamespaceQueues) Stop() {
qs.Queue.ShutDown()
for _, worker := range qs.workers {
......
......@@ -26,6 +26,11 @@
* SUCH DAMAGE.
*/
// Package varnish encapsulates interaction with Varnish instances to
// transform desired states from Ingress and VarnishConfig configs to
// the actual state of the cluster. Only this package imports
// varnishapi/pkg/admin to interact with the CLI of each Varnish
// instance.
package varnish
import (
......@@ -59,17 +64,33 @@ var (
admTimeout = time.Second * 10
)
// VarnishAdmError encapsulates an error encountered for an individual
// Varnish instance, and satisfies the Error interface.
type VarnishAdmError struct {
addr string
err error
}
// Error returns an error meesage for an error encountered at a
// Varnish instance, identifying the instance by its Endpoint address
// (internal IP) and admin port.
func (vadmErr VarnishAdmError) Error() string {
return fmt.Sprintf("%s: %v", vadmErr.addr, vadmErr.err)
}
// VarnishAdmErrors is a collection of errors encountered at Varnish
// instances. Most attempts to sync the state of Varnish instances do
// not break off at the first error; the attempt is repeated for each
// instance in a cluster, collecting error information along the way.
// This object contains error information for each instance in a
// cluster that failed to sync. The type satisifies the Error
// interface.
type VarnishAdmErrors []VarnishAdmError
// Error returns an error message that includes errors for each
// instance in a Varnish cluster that failed a sync operation, where
// each instance is identified by it Endpoint (internal IP) and admin
// port.
func (vadmErrs VarnishAdmErrors) Error() string {
var sb strings.Builder
sb.WriteRune('[')
......@@ -108,6 +129,9 @@ type varnishSvc struct {
cfgLoaded bool
}
// VarnishController encapsulates information about each Varnish
// cluster deployed as Ingress implementations in the cluster, and
// their current states.
type VarnishController struct {
log *logrus.Logger
svcs map[string]*varnishSvc
......@@ -115,6 +139,14 @@ type VarnishController struct {
errChan chan error
}
// NewVarnishController returns an instance of VarnishController.
//
// log: logger object initialized at startup
// tmplDir: directory containing templates for VCL generation
//
// If tmplDir is the empty string, use the environment variable
// TEMPLATE_DIR. If the env variable does not exist, use the current
// working directory.
func NewVarnishController(
log *logrus.Logger, tmplDir string) (*VarnishController, error) {
......@@ -134,6 +166,8 @@ func NewVarnishController(
}, nil
}
// Start initiates the Varnish controller and starts the monitor
// goroutine.
func (vc *VarnishController) Start(errChan chan error) {
vc.errChan = errChan
vc.log.Info("Starting Varnish controller")
......@@ -415,6 +449,15 @@ func (vc *VarnishController) updateVarnishSvcAddrs(key string,
return errs
}
// AddOrUpdateVarnishSvc causes a sync for the Varnish Service
// identified by namespace/name key.
//
// addrs: list of admin addresses for instances in the Service
// (internal IPs and admin ports)
// secrName: namespace/name of the admin secret to use for the
// Service
// loadVCL: true if the VCL config for the Service should be
// reloaded
func (vc *VarnishController) AddOrUpdateVarnishSvc(key string,
addrs []vcl.Address, secrName string, loadVCL bool) error {
......@@ -456,6 +499,10 @@ func (vc *VarnishController) AddOrUpdateVarnishSvc(key string,
return vc.updateVarnishSvcAddrs(key, addrs, secrPtr, loadVCL)
}
// DeleteVarnishSvc is called on the Delete event for the Varnish
// Service identified by the namespace/name key. The Varnish instance
// is set to the unready state, and no further action is taken (other
// resources in the cluster may shut down the Varnish instances).
func (vc *VarnishController) DeleteVarnishSvc(key string) error {
svc, ok := vc.svcs[key]
if !ok {
......@@ -468,6 +515,12 @@ func (vc *VarnishController) DeleteVarnishSvc(key string) error {
return err
}
// Update a Varnish Service to implement an Ingress.
//
// svcKey: namespace/name key for the Service
// ingKey: namespace/name key for the Ingress
// uid: UID field from the Ingress
// spec: VCL spec corresponding to the Ingress definition
func (vc *VarnishController) Update(
svcKey, ingKey, uid string, spec vcl.Spec) error {
......@@ -493,6 +546,12 @@ func (vc *VarnishController) Update(
return vc.updateVarnishSvc(svcKey)
}
// DeleteIngress is called for the Delete event on an Ingress, and
// syncs its effect for a Varnish Service.
//
// svcKey: namespace/name key for the Varnish Service
// ingKey: namespace/name key for the Ingress
//
// We currently only support one Ingress definition at a time for a
// Varnish Service, so deleting the Ingress means that we set Varnish
// instances to the not ready state.
......@@ -527,7 +586,13 @@ func (vc *VarnishController) DeleteIngress(svcKey, ingKey string) error {
return errs
}
// Currently only one Ingress at a time for a Varnish Service.
// HasIngress returns true iff an Ingress definition is already loaded
// for a Varnish Service (so a new sync attempt is not necessary).
//
// svcKey: namespace/name key for the Varnish Service
// ingKey: namespace/name key for the Ingress
// uid: UID field from the Ingress
// spec: VCL specification derived from the Ingress
func (vc *VarnishController) HasIngress(svcKey, ingKey, uid string,
spec vcl.Spec) bool {
......@@ -541,6 +606,8 @@ func (vc *VarnishController) HasIngress(svcKey, ingKey, uid string,
reflect.DeepEqual(svc.spec.spec.Canonical(), spec.Canonical())
}
// SetAdmSecret stores the Secret data identified by the
// namespace/name key.
func (vc *VarnishController) SetAdmSecret(key string, secret []byte) {
secr, exists := vc.secrets[key]
if !exists {
......@@ -551,6 +618,9 @@ func (vc *VarnishController) SetAdmSecret(key string, secret []byte) {
copy(*vc.secrets[key], secret)
}
// UpdateSvcForSecret associates the Secret identified by the
// namespace/name secretKey with the Varnish Service identified by the
// namespace/name svcKey. The Service is newly synced if necessary.
func (vc *VarnishController) UpdateSvcForSecret(svcKey, secretKey string) error {
secret, exists := vc.secrets[secretKey]
if !exists {
......@@ -581,6 +651,8 @@ func (vc *VarnishController) UpdateSvcForSecret(svcKey, secretKey string) error
return vc.updateVarnishSvc(svcKey)
}
// DeleteAdmSecret removes the secret identified by the namespace/name
// key.
func (vc *VarnishController) DeleteAdmSecret(name string) {
_, exists := vc.secrets[name]
if exists {
......@@ -588,6 +660,7 @@ func (vc *VarnishController) DeleteAdmSecret(name string) {
}
}
// Quit stops the Varnish controller.
func (vc *VarnishController) Quit() {
vc.errChan <- nil
}
......@@ -26,6 +26,11 @@
* SUCH DAMAGE.
*/
// Package vcl encapsulates representations of a VCL configuration
// derived from Ingress and VarnishConfig specifications, and
// checking the representations for equivalence (to check if new
// syncs are necessary). It drives the templating that generates
// VCL source code.
package vcl
import (
......@@ -40,6 +45,9 @@ import (
"text/template"
)
// Address represents an endpoint for either a backend instance
// (Endpoint of a Service to which requests are routed) or a Varnish
// instance (where the port is the admin port).
type Address struct {
IP string
Port int32
......@@ -53,17 +61,20 @@ func (addr Address) hash(hash hash.Hash) {
}
// interface for sorting []Address
type ByIPPort []Address
type byIPPort []Address
func (a ByIPPort) Len() int { return len(a) }
func (a ByIPPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByIPPort) Less(i, j int) bool {
func (a byIPPort) Len() int { return len(a) }
func (a byIPPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byIPPort) Less(i, j int) bool {
if a[i].IP < a[j].IP {
return true
}
return a[i].Port < a[j].Port
}
// Service represents either a backend Service (Endpoints to which
// requests are routed) or a Varnish Service (with addresses for the
// admin ports).
type Service struct {
Name string
Addresses []Address
......@@ -77,12 +88,14 @@ func (svc Service) hash(hash hash.Hash) {
}
// interface for sorting []Service
type ByName []Service
type byName []Service
func (a ByName) Len() int { return len(a) }
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a byName) Len() int { return len(a) }
func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
// Rule represents an IngressRule: a Host name (possibly empty) and a
// map from URL paths to Services.
type Rule struct {
Host string
PathMap map[string]Service
......@@ -104,12 +117,14 @@ func (rule Rule) hash(hash hash.Hash) {
}
// interface for sorting []Rule
type ByHost []Rule
type byHost []Rule
func (a ByHost) Len() int { return len(a) }
func (a ByHost) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByHost) Less(i, j int) bool { return a[i].Host < a[j].Host }
func (a byHost) Len() int { return len(a) }
func (a byHost) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byHost) Less(i, j int) bool { return a[i].Host < a[j].Host }
// Probe represents the configuration of health probes derived from
// the VarnishConfig Custom Resource.
type Probe struct {
Timeout string
Interval string
......@@ -126,6 +141,8 @@ func (probe Probe) hash(hash hash.Hash) {
hash.Write([]byte(probe.Threshold))
}
// ShardCluster represents the configuration for self-sharding derived
// from the VarnishConfig Custom Resource.
type ShardCluster struct {
Nodes []Service
Probe Probe
......@@ -140,13 +157,27 @@ func (shard ShardCluster) hash(hash hash.Hash) {
hash.Write([]byte(shard.MaxSecondaryTTL))
}
// Spec is the specification for a VCL configuration derived from
// Ingresses and VarnishConfig Custom Resources. This abstracts the
// VCL to be loaded by all instances of a Varnish Service.
type Spec struct {
// DefaultService corresponds to the default IngressBackend in
// an Ingress, if present.
DefaultService Service
Rules []Rule
AllServices map[string]Service
ShardCluster ShardCluster
// Rules corresponds to the IngressRules in an Ingress.
Rules []Rule
// AllServices is a map of Service names to Service
// configurations for all IngressBackends mentioned in an
// Ingress, including the default Backend, and all Backends to
// which requests are to be routed.
AllServices map[string]Service
// ShardCluster is derived from the self-sharding
// specification in a VarnishConfig resource.
ShardCluster ShardCluster
}
// DeepHash computes a 64-bit hash value from a Spec such that if two
// Specs are deeply equal, then their hash values are equal.
func (spec Spec) DeepHash() uint64 {
hash := fnv.New64a()
spec.DefaultService.hash(hash)
......@@ -168,6 +199,10 @@ func (spec Spec) DeepHash() uint64 {
return hash.Sum64()
}
// Canonical returns a canonical form of a Spec, in which all of its
// fields are ordered. This ensures that reflect.DeepEqual and
// DeepHash return values consistent with the equivalence of two
// Specs.
func (spec Spec) Canonical() Spec {
canon := Spec{
DefaultService: Service{Name: spec.DefaultService.Name},
......@@ -176,36 +211,36 @@ func (spec Spec) Canonical() Spec {
ShardCluster: spec.ShardCluster,
}
copy(canon.DefaultService.Addresses, spec.DefaultService.Addresses)
sort.Stable(ByIPPort(canon.DefaultService.Addresses))
sort.Stable(byIPPort(canon.DefaultService.Addresses))
copy(canon.Rules, spec.Rules)
sort.Stable(ByHost(canon.Rules))
sort.Stable(byHost(canon.Rules))
for _, rule := range canon.Rules {
for _, svc := range rule.PathMap {
sort.Stable(ByIPPort(svc.Addresses))
sort.Stable(byIPPort(svc.Addresses))
}
}
for name, svcs := range spec.AllServices {
canon.AllServices[name] = svcs
sort.Stable(ByIPPort(canon.AllServices[name].Addresses))
sort.Stable(byIPPort(canon.AllServices[name].Addresses))
}
sort.Stable(ByName(canon.ShardCluster.Nodes))
sort.Stable(byName(canon.ShardCluster.Nodes))
for _, node := range canon.ShardCluster.Nodes {
sort.Stable(ByIPPort(node.Addresses))
sort.Stable(byIPPort(node.Addresses))
}
return canon
}
var fMap = template.FuncMap{
"plusOne": func(i int) int { return i + 1 },
"vclMangle": func(s string) string { return Mangle(s) },
"vclMangle": func(s string) string { return mangle(s) },
"backendName": func(svc Service, addr string) string {
return BackendName(svc, addr)
return backendName(svc, addr)
},
"dirName": func(svc Service) string {
return DirectorName(svc)
return directorName(svc)
},
"urlMatcher": func(rule Rule) string {
return URLMatcher(rule)
return urlMatcher(rule)
},
}
......@@ -215,23 +250,24 @@ const (
)
var (
IngressTmpl *template.Template
ShardTmpl *template.Template
ingressTmpl *template.Template
shardTmpl *template.Template
symPattern = regexp.MustCompile("^[[:alpha:]][[:word:]-]*$")
first = regexp.MustCompile("[[:alpha:]]")
restIllegal = regexp.MustCompile("[^[:word:]-]+")
)
// InitTemplates initializes templates for VCL generation.
func InitTemplates(tmplDir string) error {
var err error
ingTmplPath := path.Join(tmplDir, ingTmplSrc)
shardTmplPath := path.Join(tmplDir, shardTmplSrc)
IngressTmpl, err = template.New(ingTmplSrc).
ingressTmpl, err = template.New(ingTmplSrc).
Funcs(fMap).ParseFiles(ingTmplPath)
if err != nil {
return err
}
ShardTmpl, err = template.New(shardTmplSrc).
shardTmpl, err = template.New(shardTmplSrc).
Funcs(fMap).ParseFiles(shardTmplPath)
if err != nil {
return err
......@@ -248,20 +284,21 @@ func replIllegal(ill []byte) []byte {
return repl
}
// GetSrc returns the VCL generated to implement a Spec.
func (spec Spec) GetSrc() (string, error) {
var buf bytes.Buffer
if err := IngressTmpl.Execute(&buf, spec); err != nil {
if err := ingressTmpl.Execute(&buf, spec); err != nil {
return "", err
}
if len(spec.ShardCluster.Nodes) > 0 {
if err := ShardTmpl.Execute(&buf, spec.ShardCluster); err != nil {
if err := shardTmpl.Execute(&buf, spec.ShardCluster); err != nil {
return "", err
}
}
return buf.String(), nil
}
func Mangle(s string) string {
func mangle(s string) string {
var mangled string
bytes := []byte(s)
if s == "" || symPattern.Match(bytes) {
......@@ -276,14 +313,14 @@ func Mangle(s string) string {
return mangled
}
func BackendName(svc Service, addr string) string {
return Mangle(svc.Name + "_" + addr)
func backendName(svc Service, addr string) string {
return mangle(svc.Name + "_" + addr)
}
func DirectorName(svc Service) string {
return Mangle(svc.Name + "_director")
func directorName(svc Service) string {
return mangle(svc.Name + "_director")
}
func URLMatcher(rule Rule) string {
return Mangle(rule.Host + "_url")
func urlMatcher(rule Rule) string {
return mangle(rule.Host + "_url")
}
......@@ -112,7 +112,7 @@ var cafeSpec = Spec{
func TestIngressTemplate(t *testing.T) {
var buf bytes.Buffer
gold := "ingressrule.golden"
if err := IngressTmpl.Execute(&buf, cafeSpec); err != nil {
if err := ingressTmpl.Execute(&buf, cafeSpec); err != nil {
t.Fatal("Execute():", err)
}
ok, err := cmpGold(buf.Bytes(), gold)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment