Commit 397aa1d1 authored by Geoff Simmons's avatar Geoff Simmons

Add haproxy controller code.

WIP -- testing currently incomplete.
parent e670a7bb
/*
* Copyright (c) 2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package haproxy
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"github.com/go-openapi/strfmt"
"github.com/haproxytech/models"
)
var fmts = strfmt.NewFormats()
const (
dataplaneUser = "dataplaneapi"
txPath = "/v1/services/haproxy/transactions"
sitesPath = "/v1/services/haproxy/sites"
reloadsPath = "/v1/services/haproxy/reloads"
versionHdr = "Configuration-Version"
reloadIdHdr = "Reload-ID"
varnishSock = "unix@/varnish.sock"
frontend = "offloader"
backend = "varnish"
server = backend
frontendSitePath = sitesPath + "/" + frontend
)
var port = int64(443)
type ReloadStatus uint8
const (
Unknown ReloadStatus = iota
Failed
InProgress
Succeeded
)
func status(reloadStr string) ReloadStatus {
switch reloadStr {
case "failed":
return Failed
case "in_progress":
return InProgress
case "succeeded":
return Succeeded
default:
return Unknown
}
}
func (status ReloadStatus) String() string {
switch status {
case Failed:
return "failed"
case InProgress:
return "in_progress"
case Succeeded:
return "succeeded"
default:
return "UNKNOWN"
}
}
type ReloadState struct {
ID string
Response string
Timestamp time.Time
Status ReloadStatus
}
type DataplaneError struct {
Err *models.Error
Status int
Version int
}
func (err *DataplaneError) Error() string {
return *err.Err.Message
}
type DataplaneClient struct {
baseURL *url.URL
user string
password string
client *http.Client
}
func NewDataplaneClient(host, pass string) *DataplaneClient {
return &DataplaneClient{
baseURL: &url.URL{
Scheme: "http",
Host: host,
},
user: dataplaneUser,
password: pass,
client: http.DefaultClient,
}
}
func (client *DataplaneClient) getHost() string {
return client.baseURL.Host
}
func getSite(certName string) ([]byte, error) {
site := &models.Site{
Name: frontend,
Service: &models.SiteService{
Listeners: []*models.Bind{
&models.Bind{
Name: frontend,
Port: &port,
Ssl: true,
SslCertificate: certName,
},
},
},
Farms: []*models.SiteFarm{
&models.SiteFarm{
Name: backend,
UseAs: "default",
Servers: []*models.Server{
&models.Server{
Name: server,
Address: varnishSock,
Check: "enabled",
SendProxyV2: "enabled",
},
},
},
},
}
return site.MarshalBinary()
}
func drainAndClose(body io.ReadCloser) {
io.Copy(ioutil.Discard, body)
body.Close()
}
func (client *DataplaneClient) getReq(
path, method string, body io.Reader) (*http.Request, error) {
relPath := &url.URL{Path: path}
url := client.baseURL.ResolveReference(relPath)
req, err := http.NewRequest(method, url.String(), body)
if err != nil {
return req, err
}
req.SetBasicAuth(client.user, client.password)
req.Header.Set("Accept", "application/json")
return req, nil
}
func setVersion(req *http.Request, version int64) {
query := req.URL.Query()
query.Add("version", strconv.FormatInt(version, 10))
req.URL.RawQuery = query.Encode()
}
func setTx(req *http.Request, tx *models.Transaction) {
query := req.URL.Query()
query.Add("transaction_id", tx.ID)
req.URL.RawQuery = query.Encode()
}
func getBody(resp *http.Response) ([]byte, error) {
defer drainAndClose(resp.Body)
// XXX read ContentLength bytes
return ioutil.ReadAll(resp.Body)
}
func getTx(body []byte) (tx *models.Transaction, err error) {
tx = &models.Transaction{
Version: tx.Version,
ID: tx.ID,
Status: tx.Status,
}
if err = tx.UnmarshalBinary(body); err != nil {
err = tx.Validate(fmts)
}
return tx, err
}
func getError(resp *http.Response, body []byte) error {
dplaneErr := &DataplaneError{
Err: &models.Error{},
Status: resp.StatusCode,
}
if dplaneVer, err := strconv.Atoi(
resp.Header.Get(versionHdr)); err != nil {
dplaneErr.Version = dplaneVer
}
dplaneErr.Err.UnmarshalJSON(body)
return dplaneErr
}
func (client *DataplaneClient) StartTx(
version int64) (tx *models.Transaction, err error) {
// path := &url.URL{Path: "/v1/services/haproxy/transactions"}
// url := client.baseURL.ResolveReference(path)
// req, err := http.NewRequest("POST", url.String(), nil)
// if err != nil {
// return
// }
// req.SetBasicAuth(client.user, client.password)
// query := req.URL.Query()
// query.Add("version", strconv.FormatInt(version, 10))
// req.URL.RawQuery = query.Encode()
// req.Header.Set("Accept", "application/json")
req, err := client.getReq(txPath, http.MethodPost, nil)
if err != nil {
return
}
setVersion(req, version)
resp, err := client.client.Do(req)
if err != nil {
return
}
// defer drainAndClose(resp.Body)
// // XXX read ContentLength bytes
// body, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// return
// }
body, err := getBody(resp)
if err != nil {
return
}
if resp.StatusCode == http.StatusCreated {
// tx = &models.Transaction{}
// if err = tx.UnmarshalBinary(body); err != nil {
// err = tx.Validate(fmts)
// }
// return
return getTx(body)
}
// dplaneErr := &DataplaneError{
// Err: &models.Error{},
// Status: resp.StatusCode,
// }
// if dplaneVer, err := strconv.Atoi(
// resp.Header.Get("Configuration-Version")); err != nil {
// dplaneErr.Version = dplaneVer
// }
// dplaneErr.Err.UnmarshalJSON(body)
// return nil, dplaneErr
return nil, getError(resp, body)
}
func (client *DataplaneClient) FinishTx(
tx *models.Transaction) (ReloadState, error) {
// path := &url.URL{Path: "/v1/services/haproxy/transactions/" + tx.ID}
// url := client.baseURL.ResolveReference(path)
// req, err := http.NewRequest("PUT", url.String(), nil)
// if err != nil {
// return false, err
// }
// req.SetBasicAuth(client.user, client.password)
// req.Header.Set("Accept", "application/json")
state := ReloadState{Status: Unknown}
req, err := client.getReq(txPath+"/"+tx.ID, http.MethodPut, nil)
if err != nil {
return state, err
}
setVersion(req, tx.Version)
resp, err := client.client.Do(req)
if err != nil {
return state, err
}
// defer drainAndClose(resp.Body)
// // XXX read ContentLength bytes
// body, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// return false, err
// }
body, err := getBody(resp)
if err != nil {
return state, err
}
switch resp.StatusCode {
case http.StatusOK, http.StatusAccepted:
if tx, err = getTx(body); err != nil {
return state, err
}
if resp.StatusCode == http.StatusOK {
state.Status = Succeeded
state.Timestamp = time.Now()
return state, nil
}
state.ID = resp.Header.Get(reloadIdHdr)
state.Status = status(tx.Status)
return state, nil
default:
return state, getError(resp, body)
}
// if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted {
// if tx, err = body.getTx(); err != nil {
// return false, err
// }
// return resp.StatusCode == http.StatusOK, nil
// }
// dplaneErr := &DataplaneError{
// Err: &models.Error{},
// Status: resp.StatusCode,
// }
// if dplaneVer, err := strconv.Atoi(
// resp.Header.Get("Configuration-Version")); err != nil {
// dplaneErr.Version = dplaneVer
// }
// dplaneErr.Err.UnmarshalJSON(body)
// return false, dplaneErr
// return false, resp.getError(body)
}
func (client *DataplaneClient) configTLS(
tx *models.Transaction, spec Spec, path, method string) error {
// port := int64(443)
// site := &models.Site{
// Name: "offloader",
// Service: &models.SiteService{
// Listeners: []*models.Bind{
// &models.Bind{
// Name: "offloader",
// Port: &port,
// Ssl: true,
// SslCertificate: "ssl-cert-snakeoil.pem",
// },
// },
// },
// Farms: []*models.SiteFarm{
// &models.SiteFarm{
// Name: "varnish",
// UseAs: "default",
// Servers: []*models.Server{
// &models.Server{
// Name: "varnish",
// Address: varnishSock,
// Check: "enabled",
// SendProxyV2: "enabled",
// },
// },
// },
// },
// }
// siteBytes, err := site.MarshalBinary()
// if err != nil {
// return err
// }
var rdr *bytes.Reader
if method != http.MethodDelete {
site, err := getSite(spec.CertName())
if err != nil {
return err
}
rdr = bytes.NewReader(site)
}
// path := &url.URL{Path: "/v1/services/haproxy/sites"}
// url := client.baseURL.ResolveReference(path)
// req, err := http.NewRequest("POST", url.String(),
// bytes.NewReader(siteBytes))
// if err != nil {
// return err
// }
// query := req.URL.Query()
// query.Add("transaction_id", tx.ID)
// req.URL.RawQuery = query.Encode()
// req.SetBasicAuth(client.user, client.password)
// req.Header.Set("Accept", "application/json")
req, err := client.getReq(path, method, rdr)
if err != nil {
return err
}
setTx(req, tx)
if method != http.MethodDelete {
req.Header.Set("Content-Type", "application/json")
}
resp, err := client.client.Do(req)
if err != nil {
return err
}
// defer drainAndClose(resp.Body)
// // XXX read ContentLength bytes
// body, err := ioutil.ReadAll(resp.Body)
// if err != nil {
// return err
// }
body, err := getBody(resp)
if err != nil {
return err
}
switch resp.StatusCode {
case http.StatusAccepted:
// XXX return the Reload-ID?
if method != http.MethodDelete {
// Validate the response body
site := &models.Site{}
if err = site.UnmarshalBinary(body); err != nil {
return err
}
if err = site.Validate(fmts); err != nil {
return err
}
}
return nil
case http.StatusOK, http.StatusCreated, http.StatusNoContent:
panic("Got " + http.StatusText(resp.StatusCode) +
" but did not set force_reload")
default:
return getError(resp, body)
}
// if resp.StatusCode == http.StatusCreated {
// // XXX ????
// panic("Got Created but did not set force_reload")
// }
// if resp.StatusCode != http.StatusAccepted {
// dplaneErr := &DataplaneError{
// Err: &models.Error{},
// Status: resp.StatusCode,
// }
// if dplaneVer, err := strconv.Atoi(
// resp.Header.Get("Configuration-Version")); err != nil {
// dplaneErr.Version = dplaneVer
// }
// dplaneErr.Err.UnmarshalJSON(body)
// return dplaneErr
// }
// site = &models.Site{}
// if err = site.UnmarshalBinary(body); err != nil {
// return err
// }
// if err = site.Validate(fmts); err != nil {
// return err
// }
// return nil
}
func (client *DataplaneClient) AddTLS(
tx *models.Transaction, spec Spec) error {
return client.configTLS(tx, spec, sitesPath, http.MethodPost)
}
func (client *DataplaneClient) UpdateTLS(
tx *models.Transaction, spec Spec) error {
return client.configTLS(tx, spec, frontendSitePath, http.MethodPut)
}
func (client *DataplaneClient) DeleteTLS(tx *models.Transaction) error {
return client.configTLS(tx, Spec{}, frontendSitePath, http.MethodDelete)
}
func (client *DataplaneClient) DeleteTx(tx *models.Transaction) error {
return nil
}
func (client *DataplaneClient) Reloaded(id string) (bool, ReloadState, error) {
state := ReloadState{ID: id}
req, err := client.getReq(reloadsPath+"/"+id, http.MethodGet, nil)
if err != nil {
return false, state, err
}
resp, err := client.client.Do(req)
if err != nil {
return false, state, err
}
body, err := getBody(resp)
if err != nil {
return false, state, err
}
switch resp.StatusCode {
case http.StatusOK:
reload := &models.Reload{}
if err = reload.UnmarshalBinary(body); err != nil {
return false, state, err
}
if err = reload.Validate(fmts); err != nil {
return false, state, err
}
state.ID = reload.ID
state.Response = reload.Response
state.Timestamp = time.Unix(reload.ReloadTimestamp, 0)
state.Status = status(reload.Status)
return true, state, nil
default:
dplaneErr := getError(resp, body)
if resp.StatusCode == http.StatusNotFound {
return false, state, nil
}
return false, state, dplaneErr
}
}
/*
* Copyright (c) 2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package haproxy
import (
"net/http"
"net/url"
)
type FaccessError struct {
Status int
PEM string
}
func (err FaccessError) Error() string {
return err.PEM + ": " + http.StatusText(err.Status)
}
type FaccessClient struct {
baseURL *url.URL
client *http.Client
}
func NewFaccessClient(host string) *FaccessClient {
return &FaccessClient{
baseURL: &url.URL{
Scheme: "http",
Host: host,
},
client: http.DefaultClient,
}
}
func (client *FaccessClient) getHost() string {
return client.baseURL.Host
}
func (client *FaccessClient) PEMExists(spec Spec) (bool, error) {
relPath := &url.URL{Path: spec.CertName()}
url := client.baseURL.ResolveReference(relPath)
req, err := http.NewRequest(http.MethodHead, url.String(), nil)
if err != nil {
return false, err
}
resp, err := client.client.Do(req)
if err != nil {
return false, err
}
defer drainAndClose(resp.Body)
switch resp.StatusCode {
case http.StatusNoContent:
return true, nil
case http.StatusNotFound:
return false, nil
default:
return false, &FaccessError{
Status: resp.StatusCode,
PEM: spec.CertName(),
}
}
}
/*
* Copyright (c) 2020 UPLEX Nils Goroll Systemoptimierung
* All rights reserved
*
* Author: Geoffrey Simmons <geoffrey.simmons@uplex.de>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
package haproxy
// XXX:
// - monitor
// - metrics
import (
"crypto/sha512"
"encoding/base64"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/interfaces"
"github.com/sirupsen/logrus"
)
const linux_name_max = 255
type Spec struct {
Namespace string
Name string
UID string
ResourceVersion string
}
func (spec Spec) String() string {
return spec.Namespace + "/" + spec.Name
}
func (spec Spec) hashedName() string {
hash := sha512.New512_256()
hash.Write([]byte(spec.Namespace))
hash.Write([]byte(spec.Name))
bytes := hash.Sum(nil)
return base64.RawURLEncoding.EncodeToString(bytes)
}
// CertName returns a name for a PEM file to be used in the haproxy
// ssl configuration (concatenated from crt and key), formed from
// the Namespace and Name of the TLS Secret. The name has the .pem
// extension.
//
// Guaranteed to be shorter than NAME_MAX on recent common Linux
// distributions; so it may be an opaque hash string, if the Namespace
// and/or Name are too long.
func (spec Spec) CertName() string {
name := spec.Namespace + "_" + spec.Name
if len(name) > linux_name_max {
name = spec.hashedName()
}
return name + ".pem"
}
type OffldAddr struct {
PodNamespace string
PodName string
IP string
DataplanePort int32
FaccessPort int32
}
type OffldrError struct {
addr string
name string
err error
}
func (offldrErr OffldrError) Error() string {
return fmt.Sprintf("name=%s addr=%s: %v", offldrErr.name,
offldrErr.addr, offldrErr.err)
}
type OffldrErrors []OffldrError
func (offldrErrs OffldrErrors) Error() string {
var sb strings.Builder
sb.WriteRune('[')
for _, err := range offldrErrs {
sb.WriteRune('{')
sb.WriteString(err.Error())
sb.WriteRune('}')
}
sb.WriteRune(']')
return sb.String()
}
type configStatus struct {
dplaneState ReloadState
version int64
pemExists bool
}
type haproxyInst struct {
spec *Spec
dplane *DataplaneClient
faccess *FaccessClient
dplanePasswd *string
admMtx *sync.Mutex
status configStatus
name string
}
type offldrSvc struct {
instances []*haproxyInst
spec *Spec
secrName string
}
type Controller struct {
log *logrus.Logger
svcEvt interfaces.SvcEventGenerator
svcs map[string]*offldrSvc
secrets map[string]*string
wg *sync.WaitGroup
monIntvl time.Duration
}
func NewOffloaderController(
log *logrus.Logger, monIntvl time.Duration) *Controller {
// XXX initMetrics()
return &Controller{
svcs: make(map[string]*offldrSvc),
secrets: make(map[string]*string),
log: log,
monIntvl: monIntvl,
wg: new(sync.WaitGroup),
}
}
func (hc *Controller) Start() {
fmt.Printf("Offloader controller logging at level: %s\n", hc.log.Level)
// go hc.monitor(hc.monIntvl)
}
func (hc *Controller) HasOffloader(svcKey string) bool {
_, exists := hc.svcs[svcKey]
return exists
}
// XXX eliminate repetition in updateLoadStatus() & updateInstance()
func (hc *Controller) updateLoadStatus(inst *haproxyInst) error {
hc.log.Debugf("Offloader instance %s, checking load status, spec: %+v",
inst.name, inst.spec)
inst.admMtx.Lock()
defer inst.admMtx.Unlock()
hc.wg.Add(1)
defer hc.wg.Done()
if !inst.status.pemExists {
hc.log.Debugf("Offloader instance %s, checking for PEM file %s",
inst.name, inst.spec.CertName())
pemExists, err := inst.faccess.PEMExists(*inst.spec)
if err != nil {
// XXX decode?
return err
}
inst.status.pemExists = pemExists
if !pemExists {
// XXX SyncIncomplete
return fmt.Errorf("Offloader instance %s: certificate "+
"PEM %s not found", inst.name,
inst.spec.CertName())
}
hc.log.Infof("Offloader instance %s: PEM file %s exists",
inst.name, inst.spec.CertName())
// XXX initiate load
}
hc.log.Debugf("Offloader instance %s: checking reload state: %s",
inst.name, inst.status.dplaneState.ID)
reloaded, state, err := inst.dplane.Reloaded(inst.status.dplaneState.ID)
if err != nil {
return err
}
inst.status.dplaneState = state
if !reloaded {
// XXX SyncIncomplete
return fmt.Errorf("Offloader instance %s: TLS config %s not "+
"loaded, status: %s", inst.name, inst.spec,
state.Status)
}
hc.log.Infof("Offloader instance %s: TLS config %s successfully "+
"loaded at %s", inst.name, inst.spec,
state.Timestamp.Format(time.RFC3339))
return nil
}
func (hc *Controller) updateInstance(inst *haproxyInst, spec *Spec) error {
var err error
hc.log.Info("Update offloader instance %s to TLS config %s: %s",
inst.name, spec)
hc.log.Debugf("Offloader instance: %+v", inst)
if reflect.DeepEqual(spec, inst.spec) {
hc.log.Infof("Offloader instance %s: TLS config %s was "+
"accepted for load", inst.name, spec)
return hc.updateLoadStatus(inst)
}
hc.log.Debugf("Offloader instance %s, old spec: %+v, new spec: %+v",
inst.name, inst.spec, spec)
inst.admMtx.Lock()
defer inst.admMtx.Unlock()
hc.wg.Add(1)
defer hc.wg.Done()
hc.log.Debugf("Offloader instance %s, checking for PEM file %s",
inst.name, spec.CertName())
pemExists, err := inst.faccess.PEMExists(*spec)
if err != nil {
// XXX decode?
return err
}
inst.status.pemExists = pemExists
if !pemExists {
// XXX SyncIncomplete
return fmt.Errorf("Offloader instance %s: certificate PEM %s "+
"not found", inst.name, spec.CertName())
}
hc.log.Infof("Offloader instance %s: PEM file %s exists", inst.name,
spec.CertName())
init := inst.status.version == 0
version := inst.status.version + 1
hc.log.Debugf("Offloader instance %s: starting tx for version %d",
inst.name, version)
tx, err := inst.dplane.StartTx(version)
if err != nil {
return err
}
hc.log.Debugf("Offloader instance %s: started transaction: %+v",
inst.name, tx)
if init {
hc.log.Debugf("Offloader instance %s: adding TLS config %s",
inst.name, spec)
err = inst.dplane.AddTLS(tx, *spec)
} else {
hc.log.Debugf("Offloader instance %s: updating TLS config %s",
inst.name, spec)
err = inst.dplane.UpdateTLS(tx, *spec)
}
if err != nil {
return err
}
hc.log.Debugf("Offloader instance %s: finishing tx for version %d: %+v",
inst.name, tx.Version, tx)
state, err := inst.dplane.FinishTx(tx)
if err != nil {
return err
}
hc.log.Infof("Offloader instance %s: TLS config %s accepted for load",
inst.name, spec)
inst.status.version = tx.Version
inst.status.dplaneState = state
inst.spec = &Spec{
Namespace: spec.Namespace,
Name: spec.Name,
UID: spec.UID,
ResourceVersion: spec.ResourceVersion,
}
// XXX where does this go?
// defer hc.dataplane.DeleteTx(tx)
switch state.Status {
case Succeeded:
inst.status.dplaneState = state
hc.log.Infof("Offloader instance %s: TLS config %s sucessfully"+
"loaded at %s", inst.name, spec,
state.Timestamp.Format(time.RFC3339))
return nil
case Failed, Unknown:
return fmt.Errorf("Offloader instance %s: TLS config %s load "+
"failed or status unknown: %v", inst.name, spec, state)
case InProgress:
hc.log.Debugf("Offloader instance %s: checking reload state: "+
"%s", inst.name, inst.status.dplaneState.ID)
reloaded, state, err := inst.dplane.Reloaded(
inst.status.dplaneState.ID)
if err != nil {
return err
}
inst.status.dplaneState = state
if reloaded {
hc.log.Infof("Offloader instance %s: TLS config %s "+
"successfully loaded at %s", inst.name, spec,
state.Timestamp.Format(time.RFC3339))
return nil
}
// XXX SyncIncomplete
return fmt.Errorf("Offloader instance %s: TLS config %s not "+
"loaded, status: %s", inst.name, spec, state.Status)
default:
panic("Illegal reload status")
}
}
// updateVarnishSvc(name string) error
// updates each instance in vc.svcs[name]
func (inst *haproxyInst) mkError(err error) OffldrError {
return OffldrError{
addr: inst.dplane.getHost(),
name: inst.name,
err: err,
}
}
func (hc *Controller) updateOffldSvc(svcKey string) error {
var errs OffldrErrors
svc, exists := hc.svcs[svcKey]
if !exists || svc == nil {
return fmt.Errorf("No known offloader service for %s", svcKey)
}
if svc.secrName == "" {
return fmt.Errorf("No known admin secret for offloader service"+
" %s", svcKey)
}
if svc.spec == nil {
hc.log.Infof("Offloader service %s: no current spec", svcKey)
return nil
}
hc.log.Info("Updating offloader instances for ", svcKey)
for _, inst := range svc.instances {
if inst == nil {
hc.log.Errorf("Instance object is nil")
continue
}
// XXX metrics
if err := hc.updateInstance(inst, svc.spec); err != nil {
offldrErr := inst.mkError(err)
errs = append(errs, offldrErr)
}
}
if len(errs) != 0 {
return errs
}
return nil
}
// setCfgLabel(inst *varnishInst, cfg, lbl string, mayClose bool)
// Label cfg as lbl at Varnish instance inst. If mayClose is true, then
// losing the admin connection is not an error (Varnish may be
// shutting down).
// Changes readiness & regular
// removeVarnishInstances(insts []*varnishInst) error
// On Delete for a Varnish instance, we set it to the unready state.
// For haproxy delete the site.
func (hc *Controller) removeOffldrInstances(
insts []*haproxyInst) (errs OffldrErrors) {
for _, inst := range insts {
version := inst.status.version + 1
hc.log.Debugf("Offloader instance %s: starting tx for version "+
"%d", inst.name, version)
tx, err := inst.dplane.StartTx(version)
if err != nil {
errs = append(errs, inst.mkError(err))
continue
}
hc.log.Debugf("Offloader instance %s: started transaction: %+v",
inst.name, tx)
hc.log.Debugf("Offloader instance %s: deleting TLS config",
inst.name)
err = inst.dplane.DeleteTLS(tx)
if err != nil {
errs = append(errs, inst.mkError(err))
continue
}
hc.log.Debugf("Offloader instance %s: finishing tx for "+
"version %d: %+v", inst.name, tx.Version, tx)
state, err := inst.dplane.FinishTx(tx)
if err != nil {
errs = append(errs, inst.mkError(err))
continue
}
defer inst.dplane.DeleteTx(tx)
hc.log.Infof("Offloader instance %s: transaction accepted for "+
"delete", inst.name)
inst.status.version = tx.Version
inst.status.dplaneState = state
switch state.Status {
case Succeeded:
inst.status.dplaneState = state
hc.log.Infof("Offloader instance %s: TLS config "+
"sucessfully deleted at %s", inst.name,
state.Timestamp.Format(time.RFC3339))
continue
case Failed, Unknown:
err = fmt.Errorf("Offloader instance %s: TLS config "+
"delete failed or status unknown: %v",
inst.name, state)
errs = append(errs, inst.mkError(err))
continue
case InProgress:
hc.log.Debugf("Offloader instance %s: checking reload "+
"state: %s", inst.name,
inst.status.dplaneState.ID)
reloaded, state, err := inst.dplane.Reloaded(
inst.status.dplaneState.ID)
if err != nil {
errs = append(errs, inst.mkError(err))
continue
}
inst.status.dplaneState = state
if reloaded {
hc.log.Infof("Offloader instance %s: TLS "+
"config successfully deleted at %s",
inst.name,
state.Timestamp.Format(time.RFC3339))
// instsGauge.Dec()
continue
}
// XXX SyncIncomplete
err = fmt.Errorf("Offloader instance %s: TLS config "+
"not deleted, status: %s", inst.name,
state.Status)
errs = append(errs, inst.mkError(err))
continue
default:
panic("Illegal reload status")
}
}
if len(errs) == 0 {
return nil
}
return errs
}
// updateVarnishSvcAddrs(key string, addrs []vcl.Address, secrPtr *[]byte,
// loadVCL bool)
// New Endpoints for the Service, constructs the new, remove and keep lists.
func offldAddr2haproxyInst(addr OffldAddr, dplanePasswd *string) *haproxyInst {
var passwd string
if dplanePasswd != nil {
passwd = *dplanePasswd
}
dplaneAddr := addr.IP + ":" + strconv.Itoa(int(addr.DataplanePort))
dplaneClient := NewDataplaneClient(dplaneAddr, passwd)
faccessAddr := addr.IP + ":" + strconv.Itoa(int(addr.FaccessPort))
faccessClient := NewFaccessClient(faccessAddr)
return &haproxyInst{
dplane: dplaneClient,
faccess: faccessClient,
name: addr.PodNamespace + "/" + addr.PodName,
dplanePasswd: dplanePasswd,
admMtx: &sync.Mutex{},
}
}
// Used as a map key in updateOffldrAddrs().
type offldaddrs struct {
dplaneAddr, faccessAddr string
}
func (hc *Controller) updateOffldrAddrs(key string, addrs []OffldAddr,
passwdPtr *string) error {
var newInsts, remInsts, keepInsts []*haproxyInst
svc, exists := hc.svcs[key]
if !exists {
panic("No known offloader service " + key)
}
updateAddrs := make(map[offldaddrs]OffldAddr)
prevAddrs := make(map[offldaddrs]*haproxyInst)
for _, addr := range addrs {
key := offldaddrs{
dplaneAddr: addr.IP + ":" +
strconv.Itoa(int(addr.DataplanePort)),
faccessAddr: addr.IP + ":" +
strconv.Itoa(int(addr.FaccessPort)),
}
updateAddrs[key] = addr
}
for _, inst := range svc.instances {
key := offldaddrs{
dplaneAddr: inst.dplane.getHost(),
faccessAddr: inst.faccess.getHost(),
}
prevAddrs[key] = inst
}
for addr := range updateAddrs {
inst, exists := prevAddrs[addr]
if exists {
keepInsts = append(keepInsts, inst)
continue
}
newInst := offldAddr2haproxyInst(updateAddrs[addr], passwdPtr)
newInsts = append(newInsts, newInst)
}
for addr, inst := range prevAddrs {
_, exists := updateAddrs[addr]
if !exists {
remInsts = append(remInsts, inst)
}
}
hc.log.Debugf("Varnish offloader svc %s: keeping instances=%+v, "+
"new instances=%+v, removing instances=%+v", key, keepInsts,
newInsts, remInsts)
svc.instances = append(keepInsts, newInsts...)
errs := hc.removeOffldrInstances(remInsts)
hc.log.Tracef("Offloader svc %s config: %+v", key, *svc)
updateErrs := hc.updateOffldSvc(key)
if updateErrs != nil {
offldrErrs, ok := updateErrs.(OffldrErrors)
if ok {
errs = append(errs, offldrErrs...)
} else {
return updateErrs
}
}
if len(errs) == 0 {
return nil
}
return errs
}
// AddOrUpdateVarnishSvc(key string, addrs []vcl.Address, secrName string,
// loadVCL bool) error
// AddOrUpdateVarnishSvc causes a sync for the Varnish Service
// identified by namespace/name key.
//
// addrs: list of admin addresses for instances in the Service
// (internal IPs and admin ports)
// secrName: namespace/name of the admin secret to use for the
// Service
// loadVCL: true if the VCL config for the Service should be
// reloaded
//
// - if vc.svcs[key] does not exist, create it, and its instances from addrs
// - set svc.secrName = secrName.
// - if vc.secrets[secrName] exists, set that secret for all instances
// - call updateVarnishSvcAddrs() -- updates each instance
//
// => called solely by worker.syncSvc() (Service updates)
// call *before* updating Varnish, so that haproxy is only ready when
// Varnish becomes ready
func (hc *Controller) AddOrUpdateOffloader(key string, addrs []OffldAddr,
secrName string) error {
var passwdPtr *string
svc, exists := hc.svcs[key]
if !exists {
var instances []*haproxyInst
svc = &offldrSvc{}
for _, addr := range addrs {
instance := offldAddr2haproxyInst(addr, nil)
hc.log.Debugf("offloader svc %s: creating instance %+v",
key, *instance)
instances = append(instances, instance)
// instsGauge.Inc()
}
svc.instances = instances
hc.svcs[key] = svc
// svcsGauge.Inc()
hc.log.Debugf("offloader svc %s: created config", key)
}
hc.log.Debugf("offloader svc %s config: %+v", key, svc)
svc.secrName = secrName
if _, exists := hc.secrets[secrName]; exists {
passwdPtr = hc.secrets[secrName]
}
for _, inst := range svc.instances {
inst.dplanePasswd = passwdPtr
}
hc.log.Debugf("offloader svc %s: updated with secret %s", key, secrName)
hc.log.Debugf("Update offloader svc %s: addrs=%+v secret=%s", key,
addrs, secrName)
return hc.updateOffldrAddrs(key, addrs, passwdPtr)
}
// DeleteVarnishSvc(key string) error
// DeleteVarnishSvc is called on the Delete event for the Varnish
// Service identified by the namespace/name key. The Varnish instance
// is set to the unready state, and no further action is taken (other
// resources in the cluster may shut down the Varnish instances).
//
// - dataplane.Delete() the Site
// - currently no way to set haproxy healthz to not ready
func (hc *Controller) DeleteOffldSvc(svcKey string) error {
svc, exists := hc.svcs[svcKey]
if !exists {
return nil
}
err := hc.removeOffldrInstances(svc.instances)
if err != nil {
delete(hc.svcs, svcKey)
// svcsGauge.Dec()
}
return err
}
// Update(svcKey string, spec vcl.Spec, ingsMeta map[string]Meta, vcfgMeta Meta,
// bcfgMeta map[string]Meta) error
// Update a Varnish Service to implement an configuration.
//
// svcKey: namespace/name key for the Service
// spec: VCL spec corresponding to the configuration
// ingsMeta: Ingress meta-data
// vcfgMeta: VarnishConfig meta-data
// bcfgMeta: BackendConfig meta-data
//
// - if vc.svcs[key] does not exist, create it
// - call updateVarnishSvc(svcKey)
func (hc *Controller) Update(svcKey string, spec Spec) error {
svc, exists := hc.svcs[svcKey]
if !exists {
svc = &offldrSvc{instances: make([]*haproxyInst, 0)}
hc.svcs[svcKey] = svc
// svcsGauge.Inc()
hc.log.Infof("Added offloader service definition %s", svcKey)
}
svc.spec = &spec
if len(svc.instances) == 0 {
return fmt.Errorf("Currently no known offloader endpoints for "+
"Service %s", svcKey)
}
return hc.updateOffldSvc(svcKey)
}
// SetNotReady(svcKey string) error
// SetNotReady may be called on the Delete event on an Ingress, if no
// Ingresses remain that are to be implemented by a Varnish Service.
// The Service is set to the not ready state, by relabelling VCL so
// that readiness checks are not answered with status 200.
// HasConfig(svcKey string, spec vcl.Spec, ingsMeta map[string]Meta,
// vcfgMeta Meta,bcfgMeta map[string]Meta)
// HasConfig returns true iff a configuration is already loaded for a
// Varnish Service (so a new sync attempt is not necessary).
//
// svcKey: namespace/name key for the Varnish Service
// spec: VCL specification derived from the configuration
// ingsMeta: Ingress meta-data
// vcfgMeta: VarnishConfig meta-data
// bcfgMeta: BackendConfig meta-data
// SetAdmSecret(key string, secret []byte)
// SetAdmSecret stores the Secret data identified by the
// namespace/name key.
func (hc *Controller) SetDataplaneSecret(key string, secret []byte) {
_, exists := hc.secrets[key]
if !exists {
s := string(secret)
hc.secrets[key] = &s
// secretsGauge.Inc()
return
}
*hc.secrets[key] = string(secret)
}
// UpdateSvcForSecret(svcKey, secretKey string) error
// UpdateSvcForSecret associates the Secret identified by the
// namespace/name secretKey with the Varnish Service identified by the
// namespace/name svcKey. The Service is newly synced if necessary.
// DeleteAdmSecret(name string)
// DeleteAdmSecret removes the secret identified by the namespace/name
// key.
func (hc *Controller) DeleteDataplaneSecret(name string) {
_, exists := hc.secrets[name]
if !exists {
return
}
delete(hc.secrets, name)
// secretsGauge.Dec()
}
// Quit stops the offloader controller.
func (hc *Controller) Quit() {
hc.log.Info("Wait for admin interactions with offloader instances to " +
"finish")
hc.wg.Wait()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment