Commit 3f307b14 authored by Geoff Simmons's avatar Geoff Simmons

Refactor querying for addresses of nodes in a self-sharding cluster.

This had been done by querying Pods. It's enough just to query
Endpoints.
parent ca3b99d8
......@@ -277,68 +277,53 @@ func (worker *NamespaceWorker) configSharding(spec *vcl.Spec,
svc.Namespace, svc.Name)
endps, err := worker.getAllServiceEndpoints(svc)
if endps == nil || len(endps) == 0 || err != nil {
if endps == nil || len(endps) == 0 {
return update.MakeIncomplete(
"could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
}
if err != nil {
return IncompleteIfNotFound(
err, "Error getting endpoints for service %s/%s: %v",
svc.Namespace, svc.Name, err)
}
if endps == nil || len(endps) == 0 {
return update.MakeIncomplete(
"could not find endpoints for service: %s/%s",
svc.Namespace, svc.Name)
}
worker.log.Tracef("Endpoints for shard configuration: %+v", endps)
var nAddrs int
var httpPort int32
// Populate spec.ShardCluster.Nodes with Pod names and the http endpoint
for _, eps := range endps {
for _, endpSubset := range eps.Subsets {
nAddrs += len(endpSubset.Addresses)
nAddrs += len(endpSubset.NotReadyAddresses)
httpPort := int32(0)
for _, port := range endpSubset.Ports {
if httpPort == 0 && port.Name == "http" {
if port.Name == "http" {
httpPort = port.Port
break
}
}
if httpPort == 0 {
continue
}
addresses := append(endpSubset.Addresses,
endpSubset.NotReadyAddresses...)
for _, addr := range addresses {
node := vcl.Service{
Addresses: make([]vcl.Address, 1),
}
ns, name := getTargetPod(addr)
if ns != "" && name != "" {
node.Name = ns + "_" + name
}
node.Addresses[0].IP = addr.IP
node.Addresses[0].Port = httpPort
spec.ShardCluster.Nodes =
append(spec.ShardCluster.Nodes, node)
}
}
}
if httpPort == 0 {
return update.MakeFatal(
"No http port found in the endpoints for service %s/%s",
svc.Namespace, svc.Name)
}
if nAddrs <= 1 {
if len(spec.ShardCluster.Nodes) <= 1 {
return update.MakeFatal(
"Sharding requested, but %d endpoint addresses found "+
"for service %s/%s", nAddrs, svc.Namespace,
svc.Name)
}
pods, err := worker.getPods(svc)
if err != nil {
return IncompleteIfNotFound(err,
"Error getting pod information for service %s/%s: %v",
svc.Namespace, svc.Name, err)
}
if len(pods.Items) <= 1 {
return update.MakeFatal(
"Sharding requested, but %d pods found for service "+
"%s/%s", len(pods.Items), svc.Namespace,
svc.Name)
}
worker.log.Tracef("Pods for shard configuration: %+v", pods.Items)
// Populate spec.ShardCluster.Nodes with Pod names and the http endpoint
for _, pod := range pods.Items {
node := vcl.Service{Addresses: make([]vcl.Address, 1)}
if pod.Spec.Hostname != "" {
node.Name = pod.Spec.Hostname
} else {
node.Name = pod.Name
}
node.Addresses[0].IP = pod.Status.PodIP
node.Addresses[0].Port = httpPort
spec.ShardCluster.Nodes = append(spec.ShardCluster.Nodes, node)
"for service %s/%s",
len(spec.ShardCluster.Nodes), svc.Namespace, svc.Name)
}
worker.log.Tracef("Node configuration for self-sharding in Service "+
"%s/%s: %+v", svc.Namespace, svc.Name, spec.ShardCluster.Nodes)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment