mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Remove watching Endpoints of Headless Services
Signed-off-by: Ricardo Pchevuzinske Katz <ricardo.katz@serpro.gov.br>
This commit is contained in:
		@@ -45,7 +45,9 @@ go_library(
 | 
			
		||||
        "//pkg/version/verflag:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/selection:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -30,7 +30,9 @@ import (
 | 
			
		||||
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/selection"
 | 
			
		||||
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apiserver/pkg/server/healthz"
 | 
			
		||||
@@ -603,9 +605,22 @@ func (s *ProxyServer) Run() error {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	labelSelector := labels.NewSelector()
 | 
			
		||||
	labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
 | 
			
		||||
 | 
			
		||||
	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
 | 
			
		||||
		informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
 | 
			
		||||
			options.LabelSelector = "!" + apis.LabelServiceProxyName
 | 
			
		||||
			options.LabelSelector = labelSelector.String()
 | 
			
		||||
		}))
 | 
			
		||||
 | 
			
		||||
	// Create configs (i.e. Watches for Services and Endpoints)
 | 
			
		||||
 
 | 
			
		||||
@@ -18,6 +18,7 @@ go_library(
 | 
			
		||||
        "//pkg/api/v1/endpoints:go_default_library",
 | 
			
		||||
        "//pkg/api/v1/pod:go_default_library",
 | 
			
		||||
        "//pkg/apis/core:go_default_library",
 | 
			
		||||
        "//pkg/apis/core/v1/helper:go_default_library",
 | 
			
		||||
        "//pkg/controller:go_default_library",
 | 
			
		||||
        "//pkg/util/metrics:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/api/core/v1:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -43,6 +43,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/v1/endpoints"
 | 
			
		||||
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 | 
			
		||||
	api "k8s.io/kubernetes/pkg/apis/core"
 | 
			
		||||
	helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/metrics"
 | 
			
		||||
)
 | 
			
		||||
@@ -55,8 +56,8 @@ const (
 | 
			
		||||
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
 | 
			
		||||
	maxRetries = 15
 | 
			
		||||
 | 
			
		||||
	// An annotation on the Service denoting if the endpoints controller should
 | 
			
		||||
	// go ahead and create endpoints for unready pods. This annotation is
 | 
			
		||||
	// TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints
 | 
			
		||||
	// controller should go ahead and create endpoints for unready pods. This annotation is
 | 
			
		||||
	// currently only used by StatefulSets, where we need the pod to be DNS
 | 
			
		||||
	// resolvable during initialization and termination. In this situation we
 | 
			
		||||
	// create a headless Service just for the StatefulSet, and clients shouldn't
 | 
			
		||||
@@ -545,6 +546,16 @@ func (e *EndpointController) syncService(key string) error {
 | 
			
		||||
		delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if newEndpoints.Labels == nil {
 | 
			
		||||
		newEndpoints.Labels = make(map[string]string)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !helper.IsServiceIPSet(service) {
 | 
			
		||||
		newEndpoints.Labels[v1.IsHeadlessService] = ""
 | 
			
		||||
	} else {
 | 
			
		||||
		delete(newEndpoints.Labels, v1.IsHeadlessService)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
 | 
			
		||||
	if createEndpoints {
 | 
			
		||||
		// No previous endpoints, create them
 | 
			
		||||
 
 | 
			
		||||
@@ -305,6 +305,9 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -346,6 +349,9 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -387,6 +393,9 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -424,6 +433,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -461,6 +473,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -498,6 +513,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses:         []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -539,6 +557,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -610,6 +631,9 @@ func TestSyncEndpointsItems(t *testing.T) {
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			ResourceVersion: "",
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: endptspkg.SortSubsets(expectedSubsets),
 | 
			
		||||
	})
 | 
			
		||||
@@ -651,6 +675,8 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
 | 
			
		||||
			{Name: "port1", Port: 8088, Protocol: "TCP"},
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
 | 
			
		||||
	serviceLabels[v1.IsHeadlessService] = ""
 | 
			
		||||
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			ResourceVersion: "",
 | 
			
		||||
@@ -697,6 +723,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
 | 
			
		||||
	})
 | 
			
		||||
	endpoints.syncService(ns + "/foo")
 | 
			
		||||
 | 
			
		||||
	serviceLabels[v1.IsHeadlessService] = ""
 | 
			
		||||
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
@@ -797,6 +824,9 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -837,6 +867,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{},
 | 
			
		||||
	})
 | 
			
		||||
@@ -873,6 +906,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{},
 | 
			
		||||
	})
 | 
			
		||||
@@ -909,6 +945,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{},
 | 
			
		||||
	})
 | 
			
		||||
@@ -934,6 +973,9 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
 | 
			
		||||
	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "foo",
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -1222,6 +1264,9 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
 | 
			
		||||
			},
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -1269,6 +1314,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				v1.EndpointsLastChangeTriggerTime: triggerTimeString,
 | 
			
		||||
			},
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
@@ -1314,7 +1362,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
 | 
			
		||||
			Name:            "foo",
 | 
			
		||||
			Namespace:       ns,
 | 
			
		||||
			ResourceVersion: "1",
 | 
			
		||||
			Annotations:     map[string]string{}, // Annotation not set anymore.
 | 
			
		||||
			Labels: map[string]string{
 | 
			
		||||
				v1.IsHeadlessService: "",
 | 
			
		||||
			}, // Annotation not set anymore.
 | 
			
		||||
		},
 | 
			
		||||
		Subsets: []v1.EndpointSubset{{
 | 
			
		||||
			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,7 @@ import (
 | 
			
		||||
	"k8s.io/klog"
 | 
			
		||||
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
 
 | 
			
		||||
@@ -33,4 +33,10 @@ const (
 | 
			
		||||
 | 
			
		||||
	// LabelNamespaceNodeRestriction is a forbidden label namespace that kubelets may not self-set when the NodeRestriction admission plugin is enabled
 | 
			
		||||
	LabelNamespaceNodeRestriction = "node-restriction.kubernetes.io"
 | 
			
		||||
 | 
			
		||||
	// IsHeadlessService is added by Controller to an Endpoint denoting if its parent
 | 
			
		||||
	// Service is Headless. The existence of this label can be used further by other
 | 
			
		||||
	// controllers and kube-proxy to check if the Endpoint objects should be replicated when
 | 
			
		||||
	// using Headless Services
 | 
			
		||||
	IsHeadlessService = "service.kubernetes.io/headless"
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user