Specify hostname, subdomain via annotation on podspec.

The hostname is a DNS A record, if the subdomain maps to a service name
in the same namespace
This commit is contained in:
Abhishek Shah
2016-02-02 10:59:54 -08:00
parent 4e00333f9b
commit a3c00aadd5
13 changed files with 261 additions and 54 deletions

View File

@@ -22,10 +22,13 @@ import (
"reflect"
"time"
"encoding/json"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors"
podutil "k8s.io/kubernetes/pkg/api/pod"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
@@ -37,8 +40,6 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (
@@ -187,7 +188,8 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
oldPod := cur.(*api.Pod)
// Only need to get the old services if the labels changed.
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
!hostNameAndDomainAnnotationsAreEqual(newPod.Annotations, oldPod.Annotations) {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
@@ -200,6 +202,17 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
}
}
func hostNameAndDomainAnnotationsAreEqual(annotation1, annotation2 map[string]string) bool {
if annotation1 == nil {
annotation1 = map[string]string{}
}
if annotation2 == nil {
annotation2 = map[string]string{}
}
return annotation1[utilpod.PodHostnameAnnotation] == annotation2[utilpod.PodHostnameAnnotation] &&
annotation1[utilpod.PodSubdomainAnnotation] == annotation2[utilpod.PodSubdomainAnnotation]
}
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) {
@@ -294,6 +307,8 @@ func (e *EndpointController) syncService(key string) {
}
subsets := []api.EndpointSubset{}
podHostNames := map[string]endpoints.HostRecord{}
for i := range pods.Items {
pod := &pods.Items[i]
@@ -316,14 +331,26 @@ func (e *EndpointController) syncService(key string) {
continue
}
hostname := pod.Annotations[utilpod.PodHostnameAnnotation]
if len(hostname) > 0 &&
pod.Annotations[utilpod.PodSubdomainAnnotation] == service.Name &&
service.Namespace == pod.Namespace {
hostRecord := endpoints.HostRecord{
HostName: hostname,
}
podHostNames[string(pod.Status.PodIP)] = hostRecord
}
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
epa := api.EndpointAddress{
IP: pod.Status.PodIP,
TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
if api.IsPodReady(pod) {
subsets = append(subsets, api.EndpointSubset{
Addresses: []api.EndpointAddress{epa},
@@ -356,14 +383,38 @@ func (e *EndpointController) syncService(key string) {
return
}
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
serializedPodHostNames := ""
if len(podHostNames) > 0 {
b, err := json.Marshal(podHostNames)
if err != nil {
glog.Errorf("Error updating endpoints. Marshalling of hostnames failed.: %v", err)
e.queue.Add(key) // Retry
return
}
serializedPodHostNames = string(b)
}
podHostNamesAreEqual := verifyPodHostNamesAreEqual(serializedPodHostNames, currentEndpoints.Annotations)
newAnnotations := make(map[string]string)
newAnnotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) &&
reflect.DeepEqual(currentEndpoints.Labels, service.Labels) && podHostNamesAreEqual {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
newEndpoints.Annotations = make(map[string]string)
}
if len(serializedPodHostNames) == 0 {
delete(newEndpoints.Annotations, endpoints.PodHostnamesAnnotation)
} else {
newEndpoints.Annotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
}
if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
@@ -377,6 +428,14 @@ func (e *EndpointController) syncService(key string) {
}
}
func verifyPodHostNamesAreEqual(newPodHostNames string, oldAnnotations map[string]string) bool {
oldPodHostNames := ""
if oldAnnotations != nil {
oldPodHostNames = oldAnnotations[endpoints.PodHostnamesAnnotation]
}
return oldPodHostNames == newPodHostNames
}
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to