Commit dd400975 authored by Geoff Simmons's avatar Geoff Simmons

Update controller code for TLS offloader support.

WIP: Service deletion currently not working, and testing is otherwise
incomplete.
parent 397aa1d1
......@@ -43,6 +43,7 @@ import (
clientset "code.uplex.de/uplex-varnish/k8s-ingress/pkg/client/clientset/versioned"
vcr_informers "code.uplex.de/uplex-varnish/k8s-ingress/pkg/client/informers/externalversions"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/controller"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish"
"github.com/sirupsen/logrus"
......@@ -159,6 +160,7 @@ func main() {
log.Fatal("Cannot initialize Varnish controller: ", err)
os.Exit(-1)
}
hController := haproxy.NewOffloaderController(log, *monIntvlF)
config, err := clientcmd.BuildConfigFromFlags(*masterURLF, *kubeconfigF)
if err != nil {
......@@ -195,15 +197,16 @@ func main() {
}
ingController, err := controller.NewIngressController(log,
*ingressClassF, kubeClient, vController, informerFactory,
vcrInformerFactory)
*ingressClassF, kubeClient, vController, hController,
informerFactory, vcrInformerFactory)
if err != nil {
log.Fatalf("Could not initialize controller: %v")
os.Exit(-1)
}
vController.EvtGenerator(ingController)
go handleTermination(log, ingController, vController)
go handleTermination(log, ingController, vController, hController)
vController.Start()
hController.Start()
informerFactory.Start(informerStop)
ingController.Run(*readyfileF, uint16(*metricsPortF))
}
......@@ -211,7 +214,8 @@ func main() {
func handleTermination(
log *logrus.Logger,
ingc *controller.IngressController,
vc *varnish.Controller) {
vc *varnish.Controller,
hc *haproxy.Controller) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
......@@ -225,6 +229,9 @@ func handleTermination(
log.Info("Shutting down informers")
informerStop <- struct{}{}
log.Info("Shutting down the haproxy controller")
hc.Quit()
log.Info("Shutting down the Varnish controller")
vc.Quit()
......
......@@ -2,9 +2,11 @@ module code.uplex.de/uplex-varnish/k8s-ingress
require (
code.uplex.de/uplex-varnish/varnishapi v0.0.0-20191205154529-31e610a4139d
github.com/go-openapi/strfmt v0.19.0
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/google/go-cmp v0.3.0
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/haproxytech/models v1.2.4
github.com/imdario/mergo v0.3.6 // indirect
github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.2.0
......
......@@ -13,7 +13,13 @@ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbt
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/purell v1.1.0 h1:rmGxhojJlM0tuKtfdvliR84CFHljx9ag64t2xmVkjK4=
github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf h1:eg0MeVzsP1G42dRafH3vf+al2vQIJU0YHX+1Tw87oco=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
......@@ -30,11 +36,37 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb h1:D4uzjWwKYQ5XnAvUbuvHW93esHg7F8N/OYeBBcJoTr0=
github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI=
github.com/go-openapi/analysis v0.17.0 h1:8JV+dzJJiK46XqGLqqLav8ZfEiJECp8jlOFhpiCdZ+0=
github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
github.com/go-openapi/errors v0.17.0/go.mod h1:LcZQpmvG4wyF5j4IhA73wkLFQg+QJXOQHVjmcZxhka0=
github.com/go-openapi/errors v0.19.0 h1:guf3T2lnCBKlODmERt4T9GtMWRpJOikgKGyIvi0xcb8=
github.com/go-openapi/errors v0.19.0/go.mod h1:LcZQpmvG4wyF5j4IhA73wkLFQg+QJXOQHVjmcZxhka0=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonpointer v0.17.0 h1:nH6xp8XdXHx8dqveo0ZuJBluCO2qGrPbDNZ0dwoRHP0=
github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M=
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
github.com/go-openapi/jsonreference v0.17.0 h1:yJW3HCkTHg7NOA+gZ83IPHzUSnUzGXhGmsdiCcMexbA=
github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I=
github.com/go-openapi/loads v0.17.0 h1:H22nMs3GDQk4SwAaFQ+jLNw+0xoFeCueawhZlv8MBYs=
github.com/go-openapi/loads v0.17.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU=
github.com/go-openapi/runtime v0.0.0-20180920151709-4f900dc2ade9 h1:zXd+rkzHwMIYVTJ/j/v8zUQ9j3Ir32gC5Dn9DzZVvCk=
github.com/go-openapi/runtime v0.0.0-20180920151709-4f900dc2ade9/go.mod h1:6v9a6LTXWQCdL8k1AO3cvqx5OtZY/Y9wKTgaoP6YRfA=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
github.com/go-openapi/spec v0.17.0 h1:XNvrt8FlSVP8T1WuhbAFF6QDhJc0zsoWzX4wXARhhpE=
github.com/go-openapi/spec v0.17.0/go.mod h1:XkF/MOi14NmjsfZ8VtAKf8pIlbZzyoTvZsdfssdxcBI=
github.com/go-openapi/strfmt v0.17.0/go.mod h1:P82hnJI0CXkErkXi8IKjPbNBM6lV6+5pLP5l494TcyU=
github.com/go-openapi/strfmt v0.19.0 h1:0Dn9qy1G9+UJfRU7TR8bmdGxb4uifB7HNrJjOnV0yPk=
github.com/go-openapi/strfmt v0.19.0/go.mod h1:+uW+93UVvGGq2qGaZxdDeJqSAqBqBdl+ZPMF/cC8nDY=
github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I=
github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg=
github.com/go-openapi/swag v0.19.0 h1:Kg7Wl7LkTPlmc393QZQ/5rQadPhi7pBVEMZxyTi0Ii8=
github.com/go-openapi/swag v0.19.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/kXLo40Tg=
github.com/go-openapi/validate v0.19.0 h1:SF5vyj6PBFM6D1cw2NJIFrlS8Su2YKk6ADPPjAH70Bw=
github.com/go-openapi/validate v0.19.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I=
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
......@@ -60,6 +92,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
......@@ -67,6 +100,8 @@ github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhp
github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/haproxytech/models v1.2.4 h1:KYWMzEVVRxAPI6F64xF6oISY10TGm6VWvo/T2rXp4x4=
github.com/haproxytech/models v1.2.4/go.mod h1:UXZVErm/XN6z10sM/enmxrdeEnwo7vz1JI+a8ycEvOQ=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
......@@ -90,8 +125,12 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 h1:2gxZ0XQIU/5z3Z3bUBu+FXuk2pFbkN6tcwi/pjyaDic=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
......@@ -108,6 +147,7 @@ github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
......@@ -149,6 +189,7 @@ golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
......
......@@ -36,6 +36,7 @@ import (
vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
vcr_informers "code.uplex.de/uplex-varnish/k8s-ingress/pkg/client/informers/externalversions"
vcr_listers "code.uplex.de/uplex-varnish/k8s-ingress/pkg/client/listers/varnishingress/v1alpha1"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish"
"github.com/sirupsen/logrus"
......@@ -126,15 +127,15 @@ func NewIngressController(
ingClass string,
kubeClient kubernetes.Interface,
vc *varnish.Controller,
hc *haproxy.Controller,
infFactory informers.SharedInformerFactory,
vcrInfFactory vcr_informers.SharedInformerFactory,
) (*IngressController, error) {
ingc := IngressController{
log: log,
client: kubeClient,
stopCh: make(chan struct{}),
vController: vc,
log: log,
client: kubeClient,
stopCh: make(chan struct{}),
}
InitMetrics()
......@@ -192,8 +193,8 @@ func NewIngressController(
Lister(),
}
ingc.nsQs = NewNamespaceQueues(ingc.log, ingClass, ingc.vController,
ingc.listers, ingc.client, ingc.recorder)
ingc.nsQs = NewNamespaceQueues(ingc.log, ingClass, vc, hc, ingc.listers,
ingc.client, ingc.recorder)
return &ingc, nil
}
......
......@@ -44,12 +44,7 @@ func (worker *NamespaceWorker) syncEndp(key string) error {
return nil
}
if eps, err := worker.getServiceEndpoints(svc); err != nil {
return err
} else if eps == nil {
return fmt.Errorf("could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
} else if isIngress, err := worker.isVarnishIngSvc(svc, eps); err != nil {
if isIngress, err := worker.isVarnishIngSvc(svc); err != nil {
return err
} else if isIngress {
worker.log.Infof("Endpoints changed for Varnish Ingress "+
......
......@@ -36,6 +36,7 @@ import (
"strconv"
vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl"
......@@ -57,6 +58,22 @@ const (
defACLfailStatus = uint16(403)
)
func (worker *NamespaceWorker) filterVarnishIngSvcs(
svcs []*api_v1.Service,
) ([]*api_v1.Service, error) {
n := 0
for _, svc := range svcs {
if isIngress, err := worker.isVarnishIngSvc(svc); err != nil {
return nil, err
} else if isIngress {
svcs[n] = svc
n++
}
}
svcs = svcs[:n]
return svcs, nil
}
func (worker *NamespaceWorker) getVarnishSvcForIng(
ing *extensions.Ingress) (*api_v1.Service, error) {
......@@ -64,6 +81,22 @@ func (worker *NamespaceWorker) getVarnishSvcForIng(
if err != nil {
return nil, err
}
worker.log.Debugf("Cluster Services: %+v", svcs)
// n := 0
// for _, svc := range svcs {
// if isIngress, err := worker.isVarnishIngSvc(svc); err != nil {
// return nil, err
// } else if isIngress {
// svcs[n] = svc
// n++
// }
// }
// svcs = svcs[:n]
if svcs, err = worker.filterVarnishIngSvcs(svcs); err != nil {
return nil, err
}
worker.log.Debugf("Cluster Varnish Services for Ingress: %+v", svcs)
if varnishSvc, exists := ing.Annotations[varnishSvcKey]; exists {
worker.log.Tracef("Ingress %s/%s has annotation %s:%s",
ing.Namespace, ing.Name, varnishSvcKey, varnishSvc)
......@@ -95,6 +128,21 @@ func (worker *NamespaceWorker) getVarnishSvcForIng(
if err != nil {
return nil, err
}
worker.log.Debugf("Namespace Services: %+v", svcs)
// n = 0
// for _, svc := range svcs {
// if isIngress, err := worker.isVarnishIngSvc(svc); err != nil {
// return nil, err
// } else if isIngress {
// svcs[n] = svc
// n++
// }
// }
// svcs = svcs[:n]
if svcs, err = worker.filterVarnishIngSvcs(svcs); err != nil {
return nil, err
}
worker.log.Debugf("Namespace Varnish Services for Ingress: %+v", svcs)
if len(svcs) == 1 {
worker.log.Tracef("Exactly one Varnish Ingress Service "+
"in namespace %s: %s", worker.namespace, svcs[0])
......@@ -114,10 +162,16 @@ func (worker *NamespaceWorker) getIngsForVarnishSvc(
if err != nil {
return nil, err
}
if allVarnishSvcs, err = worker.filterVarnishIngSvcs(allVarnishSvcs); err != nil {
return nil, err
}
nsVarnishSvcs, err := worker.svc.List(varnishIngressSelector)
if err != nil {
return nil, err
}
if nsVarnishSvcs, err = worker.filterVarnishIngSvcs(nsVarnishSvcs); err != nil {
return nil, err
}
ings4Svc := make([]*extensions.Ingress, 0)
for _, ing := range ings {
if !worker.isVarnishIngress(ing) {
......@@ -319,10 +373,10 @@ func (worker *NamespaceWorker) ings2VCLSpec(
if namespace == "" {
namespace = "default"
}
if ing.Spec.TLS != nil && len(ing.Spec.TLS) > 0 {
worker.log.Warnf("TLS config currently ignored in "+
"Ingress %s/%s", namespace, ing.Name)
}
// if ing.Spec.TLS != nil && len(ing.Spec.TLS) > 0 {
// worker.log.Warnf("TLS config currently ignored in "+
// "Ingress %s/%s", namespace, ing.Name)
// }
if ing.Spec.Backend != nil {
if vclSpec.DefaultService.Name != "" {
panic("More than one Ingress default backend")
......@@ -827,6 +881,44 @@ func (worker *NamespaceWorker) configReqDisps(spec *vcl.Spec,
}
}
func (worker *NamespaceWorker) ings2OffloaderSpec(
ings []*extensions.Ingress) (haproxy.Spec, error) {
offldrSpec := haproxy.Spec{}
for _, ing := range ings {
for _, tls := range ing.Spec.TLS {
if len(tls.Hosts) > 0 {
worker.log.Warnf("Ingress %s/%s: ignoring "+
"Hosts config %v",
ing.ObjectMeta.Namespace,
ing.ObjectMeta.Name, tls.Hosts)
}
if tls.SecretName == "" {
return offldrSpec, fmt.Errorf("Ingress %s/%s: "+
"TLS without Secret not supported",
ing.ObjectMeta.Namespace,
ing.ObjectMeta.Name)
}
if offldrSpec.Name != "" {
if offldrSpec.Namespace == ing.ObjectMeta.Namespace &&
offldrSpec.Name == tls.SecretName {
continue
}
return offldrSpec, fmt.Errorf("Multiple TLS "+
"Secrets not supported: Secret %s in "+
"Ingress %s/%s, Secret %s/%s specified"+
" in another Ingress", tls.SecretName,
ing.ObjectMeta.Namespace,
ing.ObjectMeta.Name,
offldrSpec.Namespace, offldrSpec.Name)
}
offldrSpec.Name = tls.SecretName
offldrSpec.Namespace = ing.ObjectMeta.Namespace
}
}
return offldrSpec, nil
}
func (worker *NamespaceWorker) addOrUpdateIng(ing *extensions.Ingress) error {
ingKey := ing.ObjectMeta.Namespace + "/" + ing.ObjectMeta.Name
worker.log.Infof("Adding or Updating Ingress: %s", ingKey)
......@@ -850,8 +942,15 @@ func (worker *NamespaceWorker) addOrUpdateIng(ing *extensions.Ingress) error {
}
if len(ings) == 0 {
worker.log.Infof("No Ingresses to be implemented by Varnish "+
"Service %s, setting to not ready", svcKey)
return worker.vController.SetNotReady(svcKey)
"Service %s", svcKey)
worker.log.Infof("Setting Service %s to not ready", svcKey)
if err = worker.vController.SetNotReady(svcKey); err != nil {
return err
}
if worker.hController.HasOffloader(svcKey) {
return worker.hController.DeleteOffldSvc(svcKey)
}
return nil
}
ingNames := make([]string, len(ings))
......@@ -907,6 +1006,31 @@ func (worker *NamespaceWorker) addOrUpdateIng(ing *extensions.Ingress) error {
"%s/%s", svc.Namespace, svc.Name)
}
// Update any TLS offloader first. Offloaders forward their
// readiness check to the Varnish readiness check, so if we're
// adding a new service, the offloader becomes ready when
// Varnish is ready.
offldrSpec, err := worker.ings2OffloaderSpec(ings)
if err != nil {
return err
}
if offldrSpec.Name == "" {
worker.log.Infof("No TLS config found for Ingresses: %v",
ingNames)
} else {
// XXX first check if the PEM Secret is already set
if err = worker.updateCertSecret(&offldrSpec); err != nil {
return err
}
worker.log.Infof("Ingress TLS Secret %s/%s: added certificate "+
"%s", offldrSpec.Namespace, offldrSpec.Name,
offldrSpec.CertName())
// XXX check if already loaded
if err = worker.hController.Update(svcKey, offldrSpec); err != nil {
return err
}
}
ingsMeta := make(map[string]varnish.Meta)
for _, ing := range ings {
metaDatum := varnish.Meta{
......@@ -940,19 +1064,19 @@ func (worker *NamespaceWorker) addOrUpdateIng(ing *extensions.Ingress) error {
worker.log.Infof("Varnish Service %s: config already "+
"loaded: hash=%s", svcKey,
vclSpec.Canonical().DeepHash())
return nil
}
worker.log.Tracef("Update config svc=%s ingressMetaData=%+v "+
"vcfgMetaData=%+v bcfgMetaData=%+v: %+v", svcKey, ingsMeta,
vcfgMeta, bcfgMeta, vclSpec)
err = worker.vController.Update(svcKey, vclSpec, ingsMeta, vcfgMeta,
bcfgMeta)
if err != nil {
return err
} else {
worker.log.Tracef("Update config svc=%s ingressMetaData=%+v "+
"vcfgMetaData=%+v bcfgMetaData=%+v: %+v", svcKey,
ingsMeta, vcfgMeta, bcfgMeta, vclSpec)
err = worker.vController.Update(svcKey, vclSpec, ingsMeta,
vcfgMeta, bcfgMeta)
if err != nil {
return err
}
worker.log.Tracef("Updated config svc=%s ingressMetaData=%+v "+
"vcfgMetaData=%+v bcfgMetaData=%+v: %+v", svcKey,
ingsMeta, vcfgMeta, bcfgMeta, vclSpec)
}
worker.log.Tracef("Updated config svc=%s ingressMetaData=%+v "+
"vcfgMetaData=%+v bcfgMetaData=%+v: %+v", svcKey, ingsMeta,
vcfgMeta, bcfgMeta, vclSpec)
return nil
}
......
......@@ -33,12 +33,99 @@ import (
vcr_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
)
// XXX make these configurable
const (
admSecretKey = "admin"
dplaneSecretKey = "dataplaneapi"
certSecretNs = "kube-system"
certSecretName = "tls-cert"
tlsSecretType = "kubernetes.io/tls"
)
// XXX make this configurable
const admSecretKey = "admin"
// XXX client...Update(secret) returns Secret
// XXX client...Create(secret) if it's new?
func (worker *NamespaceWorker) updateCertSecret(spec *haproxy.Spec) error {
nsLister := worker.listers.secr.Secrets(spec.Namespace)
tlsSecret, err := nsLister.Get(spec.Name)
if err != nil {
return err
}
if tlsSecret.Type != tlsSecretType {
return fmt.Errorf("Ingress TLS Secret %s/%s: type is %s, "+
"should be %s",
tlsSecret.ObjectMeta.Namespace,
tlsSecret.ObjectMeta.Name, tlsSecret.Type,
tlsSecretType)
}
crt, ok := tlsSecret.Data["tls.crt"]
if !ok {
return fmt.Errorf("Ingress TLS Secret %s/%s: key tls.crt not "+
"found", tlsSecret.ObjectMeta.Namespace,
tlsSecret.ObjectMeta.Name)
}
key, ok := tlsSecret.Data["tls.key"]
if !ok {
return fmt.Errorf("Ingress TLS Secret %s/%s: key tls.key not "+
"found", tlsSecret.ObjectMeta.Namespace,
tlsSecret.ObjectMeta.Name)
}
nsLister = worker.listers.secr.Secrets(certSecretNs)
certSecret, err := nsLister.Get(certSecretName)
if err != nil {
return err
}
pem := string(crt)
if crt[len(crt)+1] != byte('\n') {
pem += "\n"
}
pem += string(key)
certName := spec.CertName()
certSecret.Data[certName] = []byte(pem)
worker.log.Infof("Updating Secret %s/%s with %s to add contents from "+
"Ingress TLS Secret %s/%s", certSecret.ObjectMeta.Namespace,
certSecret.ObjectMeta.Name, certName,
tlsSecret.ObjectMeta.Namespace, tlsSecret.ObjectMeta.Name)
_, err = worker.client.CoreV1().Secrets("kube-system").
Update(certSecret)
if err != nil {
return err
}
spec.UID = string(tlsSecret.ObjectMeta.UID)
spec.ResourceVersion = tlsSecret.ObjectMeta.ResourceVersion
return nil
}
func (worker *NamespaceWorker) deleteTLSSecret(secret *api_v1.Secret) error {
nsLister := worker.listers.secr.Secrets(certSecretNs)
certSecret, err := nsLister.Get(certSecretName)
if err != nil {
return err
}
spec := haproxy.Spec{
Namespace: secret.ObjectMeta.Namespace,
Name: secret.ObjectMeta.Name,
}
certName := spec.CertName()
delete(certSecret.Data, certName)
worker.log.Infof("Updating Secret %s/%s to delete %s and remove "+
"contents from Ingress TLS Secret %s/%s",
certSecret.ObjectMeta.Namespace, certSecret.ObjectMeta.Name,
certName, secret.ObjectMeta.Namespace, secret.ObjectMeta.Name)
_, err = worker.client.CoreV1().Secrets("kube-system").
Update(certSecret)
return err
}
func (worker *NamespaceWorker) getVarnishSvcsForSecret(
secretName string) ([]*api_v1.Service, error) {
......@@ -117,6 +204,43 @@ func (worker *NamespaceWorker) updateVarnishSvcsForSecret(
return nil
}
func (worker *NamespaceWorker) enqueueIngsForTLSSecret(
secret *api_v1.Secret) error {
var ings []*extensions.Ingress
nsIngs, err := worker.ing.List(varnishIngressSelector)
if err != nil {
return err
}
for _, ing := range nsIngs {
if !worker.isVarnishIngress(ing) {
continue
}
for _, tls := range ing.Spec.TLS {
if tls.SecretName == secret.Name {
ings = append(ings, ing)
}
}
}
if len(ings) == 0 {
worker.log.Infof("No Varnish Ingresses defined for TLS Secret "+
"%s/%s", secret.Namespace, secret.Name)
// syncCounters.WithLabelValues(worker.namespace, "Service",
// "Ignore").Inc()
return nil
}
for _, ing := range ings {
worker.log.Infof("Varnish Ingress %s/%s uses TLS Secret %s/%s,"+
" re-queueing", ing.Namespace, ing.Name,
secret.Namespace, secret.Name)
worker.queue.Add(&SyncObj{Type: Update, Obj: ing})
}
return nil
}
func (worker *NamespaceWorker) setSecret(secret *api_v1.Secret) error {
secretData, exists := secret.Data[admSecretKey]
if !exists {
......@@ -126,6 +250,12 @@ func (worker *NamespaceWorker) setSecret(secret *api_v1.Secret) error {
secretKey := secret.Namespace + "/" + secret.Name
worker.log.Tracef("Setting secret %s", secretKey)
worker.vController.SetAdmSecret(secretKey, secretData)
secretData, exists = secret.Data[dplaneSecretKey]
if !exists {
return nil
}
worker.hController.SetDataplaneSecret(secretKey, secretData)
return nil
}
......@@ -136,6 +266,10 @@ func (worker *NamespaceWorker) syncSecret(key string) error {
return err
}
if secret.Type == tlsSecretType {
return worker.enqueueIngsForTLSSecret(secret)
}
app, ok := secret.Labels[labelKey]
if !ok || app != labelVal {
worker.log.Infof("Not a Varnish secret: %s/%s",
......@@ -179,6 +313,13 @@ func (worker *NamespaceWorker) deleteSecret(obj interface{}) error {
worker.log.Warnf("Delete Secret: not found: %v", obj)
return nil
}
if secr.Type == tlsSecretType {
worker.log.Infof("Deleting TLS Secret: %s/%s", secr.Namespace,
secr.Name)
return worker.deleteTLSSecret(secr)
}
worker.log.Infof("Deleting Secret: %s/%s", secr.Namespace, secr.Name)
svcs, err := worker.getVarnishSvcsForSecret(secr.Name)
if err != nil {
......@@ -194,6 +335,7 @@ func (worker *NamespaceWorker) deleteSecret(obj interface{}) error {
secretKey := secr.Namespace + "/" + secr.Name
worker.vController.DeleteAdmSecret(secretKey)
worker.hController.DeleteDataplaneSecret(secretKey)
return worker.updateVarnishSvcsForSecret(svcs, secretKey)
}
......@@ -31,6 +31,7 @@ package controller
import (
"fmt"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish/vcl"
api_v1 "k8s.io/api/core/v1"
......@@ -39,21 +40,40 @@ import (
)
// XXX make this configurable
const admPortName = "varnishadm"
const (
admPortName = "varnishadm"
tlsPortName = "tls"
dplanePortName = "dataplane"
faccessPortName = "faccess"
)
// isVarnishIngSvc determines if a Service represents a Varnish that
// can implement Ingress, for which this controller is responsible.
// Currently the app label must point to a hardwired name.
func (worker *NamespaceWorker) isVarnishIngSvc(
svc *api_v1.Service,
endps *api_v1.Endpoints,
) (bool, error) {
if endps == nil {
panic("isVarnishIngSvc(svc, endps == nil")
}
// Currently the app label must point to a hardwired name, and a
// hardwired admin port name must be defined for one of the Endpoints.
func (worker *NamespaceWorker) isVarnishIngSvc(svc *api_v1.Service) (bool, error) {
app, exists := svc.Labels[labelKey]
isVingSvc := exists && app == labelVal
return isVingSvc, nil
if !exists || app != labelVal {
return false, nil
}
endps, err := worker.getServiceEndpoints(svc)
if err != nil {
return false, err
}
for _, subset := range endps.Subsets {
worker.log.Debugf("Service %s/%s Endpoint subset: %+v",
svc.Namespace, svc.Name, subset)
worker.log.Debugf("Service %s/%s Endpoint Ports: %+v",
svc.Namespace, svc.Name, subset.Ports)
for _, port := range subset.Ports {
worker.log.Debugf("Service %s/%s port: %+v",
svc.Namespace, svc.Name, port)
if port.Name == admPortName {
return true, nil
}
}
}
return false, nil
}
func (worker *NamespaceWorker) getIngsForSvc(
......@@ -129,19 +149,51 @@ func (worker *NamespaceWorker) isVarnishInVCLSpec(ing *extensions.Ingress) bool
return false
}
func epAddrs2VCLAddrs(
epAddrs []api_v1.EndpointAddress,
vclAddrs []vcl.Address,
admPort int32,
) []vcl.Address {
for _, epAddr := range epAddrs {
vclAddr := vcl.Address{
IP: epAddr.IP,
Port: admPort,
}
vclAddrs = append(vclAddrs, vclAddr)
}
return vclAddrs
}
func epAddrs2OffldAddrs(
epAddrs []api_v1.EndpointAddress,
offldAddrs []haproxy.OffldAddr,
dplanePort, faccessPort int32,
) []haproxy.OffldAddr {
for _, epAddr := range epAddrs {
offldAddr := haproxy.OffldAddr{
IP: epAddr.IP,
DataplanePort: dplanePort,
FaccessPort: faccessPort,
}
if epAddr.TargetRef != nil {
offldAddr.PodNamespace = epAddr.TargetRef.Namespace
offldAddr.PodName = epAddr.TargetRef.Name
}
offldAddrs = append(offldAddrs, offldAddr)
}
return offldAddrs
}
func (worker *NamespaceWorker) syncSvc(key string) error {
var addrs []vcl.Address
var offldAddrs []haproxy.OffldAddr
worker.log.Infof("Syncing Service: %s/%s", worker.namespace, key)
svc, err := worker.svc.Get(key)
if err != nil {
return err
}
if eps, err := worker.getServiceEndpoints(svc); err != nil {
return err
} else if eps == nil {
return fmt.Errorf("could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
} else if isIngress, err := worker.isVarnishIngSvc(svc, eps); err != nil {
if isIngress, err := worker.isVarnishIngSvc(svc); err != nil {
return err
} else if !isIngress {
return worker.enqueueIngressForService(svc)
......@@ -237,13 +289,20 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
svc.Namespace, svc.Name)
}
// XXX hard-wired Port name
// XXX hard-wired Port names
for _, subset := range endps.Subsets {
admPort := int32(0)
admPort, dplanePort, faccessPort := int32(0), int32(0), int32(0)
hasTLS := false
for _, port := range subset.Ports {
if port.Name == admPortName {
switch port.Name {
case admPortName:
admPort = port.Port
break
case dplanePortName:
hasTLS = true
dplanePort = port.Port
case faccessPortName:
hasTLS = true
faccessPort = port.Port
}
}
if admPort == 0 {
......@@ -251,12 +310,45 @@ func (worker *NamespaceWorker) syncSvc(key string) error {
"Service %s/%s endpoint", admPortName,
svc.Namespace, svc.Name)
}
for _, address := range subset.Addresses {
addr := vcl.Address{
IP: address.IP,
Port: admPort,
}
addrs = append(addrs, addr)
addrs = epAddrs2VCLAddrs(subset.Addresses, addrs, admPort)
addrs = epAddrs2VCLAddrs(subset.NotReadyAddresses, addrs,
admPort)
if hasTLS {
offldAddrs = epAddrs2OffldAddrs(subset.Addresses,
offldAddrs, dplanePort, faccessPort)
offldAddrs = epAddrs2OffldAddrs(
subset.NotReadyAddresses, offldAddrs,
dplanePort, faccessPort)
}
// for _, address := range subset.Addresses {
// addr := vcl.Address{
// IP: address.IP,
// Port: admPort,
// }
// addrs = append(addrs, addr)
// if hasTLS {
// addr := haproxy.OffldAddr{
// IP: address.IP,
// DataplanePort: dplanePort,
// FaccessPort: faccessPort,
// }
// if address.TargetRef != nil {
// addr.PodNamespace =
// address.TargetRef.Namespace
// addr.PodName = address.TargetRef.Name
// }
// offldAddrs = append(offldAddrs, addr)
// }
// }
}
if len(offldAddrs) > 0 {
worker.log.Tracef("Varnish service %s/%s offloader addresses: "+
"%+v", svc.Namespace, svc.Name, offldAddrs)
err = worker.hController.AddOrUpdateOffloader(
svc.Namespace+"/"+svc.Name, offldAddrs,
worker.namespace+"/"+secrName)
if err != nil {
return err
}
}
worker.log.Tracef("Varnish service %s/%s addresses: %+v", svc.Namespace,
......@@ -281,24 +373,18 @@ func (worker *NamespaceWorker) deleteSvc(obj interface{}) error {
return nil
}
nsKey := svc.Namespace + "/" + svc.Name
worker.log.Info("Deleting Service: ", nsKey)
if worker.vController.HasVarnishSvc(nsKey) {
if err := worker.vController.DeleteVarnishSvc(nsKey); err != nil {
return err
}
}
if endps, err := worker.getServiceEndpoints(svc); err != nil {
return err
} else if endps == nil {
worker.log.Warnf("Service %s: Endpoints already deleted", nsKey)
return nil
} else if isIngress, err := worker.isVarnishIngSvc(svc, endps); err != nil {
worker.log.Info("Deleting Service:", nsKey)
if isIngress, err := worker.isVarnishIngSvc(svc); err != nil {
return err
} else if !isIngress {
return worker.enqueueIngressForService(svc)
}
if err := worker.vController.DeleteVarnishSvc(nsKey); err != nil {
return err
}
if worker.hController.HasOffloader(nsKey) {
return worker.hController.DeleteOffldSvc(nsKey)
}
return nil
}
......@@ -46,6 +46,7 @@ import (
ving_v1alpha1 "code.uplex.de/uplex-varnish/k8s-ingress/pkg/apis/varnishingress/v1alpha1"
vcr_listers "code.uplex.de/uplex-varnish/k8s-ingress/pkg/client/listers/varnishingress/v1alpha1"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/haproxy"
"code.uplex.de/uplex-varnish/k8s-ingress/pkg/varnish"
)
......@@ -67,6 +68,7 @@ type NamespaceWorker struct {
ingClass string
log *logrus.Logger
vController *varnish.Controller
hController *haproxy.Controller
queue workqueue.RateLimitingInterface
listers *Listers
ing ext_listers.IngressNamespaceLister
......@@ -271,6 +273,7 @@ type NamespaceQueues struct {
ingClass string
log *logrus.Logger
vController *varnish.Controller
hController *haproxy.Controller
workers map[string]*NamespaceWorker
listers *Listers
client kubernetes.Interface
......@@ -290,6 +293,7 @@ func NewNamespaceQueues(
log *logrus.Logger,
ingClass string,
vController *varnish.Controller,
hController *haproxy.Controller,
listers *Listers,
client kubernetes.Interface,
recorder record.EventRecorder) *NamespaceQueues {
......@@ -302,6 +306,7 @@ func NewNamespaceQueues(
log: log,
ingClass: ingClass,
vController: vController,
hController: hController,
workers: make(map[string]*NamespaceWorker),
listers: listers,
client: client,
......@@ -345,6 +350,7 @@ func (qs *NamespaceQueues) next() {
ingClass: qs.ingClass,
log: qs.log,
vController: qs.vController,
hController: qs.hController,
queue: q,
listers: qs.listers,
ing: qs.listers.ing.Ingresses(ns),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment