Commit 694ed696 authored by Geoff Simmons's avatar Geoff Simmons

Update the implementation of self-sharding.

Since the controller now interacts with the headless Service that
defines the admin ports, it can no longer find the http port in
that Service definition. This is needed to configure the Varnishen
as backends for one another.

Search for all Services in the same namespace that define the same
selector as the admin Service (and hence are configured for the
same Pods). We then search for the http port in the Endpoints of
those Services.
parent 8af27d70
......@@ -28,7 +28,12 @@
package controller
import api_v1 "k8s.io/api/core/v1"
import (
"reflect"
api_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
func (worker *NamespaceWorker) syncEndp(key string) error {
worker.log.Infof("Syncing Endpoints: %s/%s", worker.namespace, key)
......@@ -94,3 +99,27 @@ func (worker *NamespaceWorker) deleteEndp(obj interface{}) error {
}
return worker.syncEndp(endp.Name)
}
// getAllServiceEndpoints returns the Endpoints of all Services in the
// same namespace that specify the same selectors.
func (worker *NamespaceWorker) getAllServiceEndpoints(
svc *api_v1.Service,
) (epSlice []*api_v1.Endpoints, err error) {
selector := svc.Spec.Selector
nsLister := worker.listers.svc.Services(svc.Namespace)
svcs, err := nsLister.List(labels.Everything())
if err != nil {
return
}
for _, nsSvc := range svcs {
if !reflect.DeepEqual(selector, nsSvc.Spec.Selector) {
continue
}
if eps, err := worker.getServiceEndpoints(nsSvc); err != nil {
return epSlice, err
} else {
epSlice = append(epSlice, eps)
}
}
return
}
......@@ -417,9 +417,9 @@ func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
worker.log.Tracef("Set cluster shard configuration for Service %s/%s",
svc.Namespace, svc.Name)
endps, err := worker.getServiceEndpoints(svc)
if endps == nil || err != nil {
if endps == nil {
endps, err := worker.getAllServiceEndpoints(svc)
if endps == nil || len(endps) == 0 || err != nil {
if endps == nil || len(endps) == 0 {
err = fmt.Errorf("could not find endpoints for "+
"service: %s/%s", svc.Namespace, svc.Name)
}
......@@ -430,12 +430,14 @@ func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
var nAddrs int
var httpPort int32
for _, endpSubset := range endps.Subsets {
nAddrs += len(endpSubset.Addresses)
nAddrs += len(endpSubset.NotReadyAddresses)
for _, port := range endpSubset.Ports {
if httpPort == 0 && port.Name == "http" {
httpPort = port.Port
for _, eps := range endps {
for _, endpSubset := range eps.Subsets {
nAddrs += len(endpSubset.Addresses)
nAddrs += len(endpSubset.NotReadyAddresses)
for _, port := range endpSubset.Ports {
if httpPort == 0 && port.Name == "http" {
httpPort = port.Port
}
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment