From af7abde0e55aa4643bee31e16997ea65831d9b95 Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Wed, 7 May 2025 21:45:59 +0530 Subject: [PATCH] kube-proxy: add NodeTopologyConfig for tracking topology labels This simplifies how the proxier receives update for change in node labels. Instead of passing the complete Node object we just pass the proxy relevant topology labels extracted from the complete list of labels, and the downstream event handlers will only be notified when there are changes in topology labels. Signed-off-by: Daman Arora --- cmd/kube-proxy/app/server.go | 4 +- pkg/proxy/config/config.go | 102 +++++++++++++++---- pkg/proxy/config/config_test.go | 137 ++++++++++++++++++++++++++ pkg/proxy/iptables/proxier.go | 81 ++------------- pkg/proxy/ipvs/proxier.go | 76 ++------------ pkg/proxy/kubemark/hollow_proxy.go | 6 +- pkg/proxy/metaproxier/meta_proxier.go | 30 +----- pkg/proxy/nftables/proxier.go | 81 ++------------- pkg/proxy/topology.go | 8 +- pkg/proxy/types.go | 2 +- pkg/proxy/winkernel/proxier.go | 10 +- 11 files changed, 271 insertions(+), 266 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index cd900f3580d..4fdc5eb8f0a 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -607,7 +607,9 @@ func (s *ProxyServer) Run(ctx context.Context) error { nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{ HealthServer: s.HealthzServer, }) - nodeConfig.RegisterEventHandler(s.Proxier) + + nodeTopologyConfig := config.NewNodeTopologyConfig(ctx, currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration) + nodeTopologyConfig.RegisterEventHandler(s.Proxier) go nodeConfig.Run(wait.NeverStop) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 1544ad5a7d7..1f24196e90c 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -19,6 +19,7 @@ package config import ( "context" "fmt" + "reflect" "sync" "time" @@ -273,24 +274,6 @@ type 
NodeHandler interface { OnNodeSynced() } -// NoopNodeHandler is a noop handler for proxiers that have not yet -// implemented a full NodeHandler. -type NoopNodeHandler struct{} - -// OnNodeAdd is a noop handler for Node creates. -func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {} - -// OnNodeUpdate is a noop handler for Node updates. -func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {} - -// OnNodeDelete is a noop handler for Node deletes. -func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {} - -// OnNodeSynced is a noop handler for Node syncs. -func (*NoopNodeHandler) OnNodeSynced() {} - -var _ NodeHandler = &NoopNodeHandler{} - // NodeConfig tracks a set of node configurations. // It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change. type NodeConfig struct { @@ -483,3 +466,86 @@ func (c *ServiceCIDRConfig) handleServiceCIDREvent(oldObj, newObj interface{}) { c.eventHandlers[i].OnServiceCIDRsChanged(c.cidrs.UnsortedList()) } } + +// NodeTopologyHandler is an abstract interface for objects which receive +// notifications about changes in proxy relevant node topology labels. +type NodeTopologyHandler interface { + // OnTopologyChange is called whenever a change is observed in proxy + // relevant node topology labels, and provides the observed change. + OnTopologyChange(topologyLabels map[string]string) +} + +// NodeTopologyConfig tracks node topology labels. +type NodeTopologyConfig struct { + listerSynced cache.InformerSynced + eventHandlers []NodeTopologyHandler + topologyLabels map[string]string + logger klog.Logger +} + +// NewNodeTopologyConfig creates a new NodeTopologyConfig. 
+func NewNodeTopologyConfig(ctx context.Context, nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration) *NodeTopologyConfig { + return newNodeTopologyConfig(ctx, nodeInformer, resyncPeriod, nil) +} + +// newNodeTopologyConfig implements NewNodeTopologyConfig by additionally consuming a callback function which is invoked after +// the event handler completes processing; the callback is only used for testing. +func newNodeTopologyConfig(ctx context.Context, nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration, callback func()) *NodeTopologyConfig { + result := &NodeTopologyConfig{ + logger: klog.FromContext(ctx), + topologyLabels: make(map[string]string), + } + + handlerRegistration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + result.handleNodeEvent(obj) + if callback != nil { + callback() + } + }, + UpdateFunc: func(_, newObj interface{}) { + result.handleNodeEvent(newObj) + if callback != nil { + callback() + } + }, + DeleteFunc: func(_ interface{}) {}, + }, + resyncPeriod, + ) + result.listerSynced = handlerRegistration.HasSynced + + return result +} + +// RegisterEventHandler registers a handler which is called when the node's topology labels change. +func (n *NodeTopologyConfig) RegisterEventHandler(handler NodeTopologyHandler) { + n.eventHandlers = append(n.eventHandlers, handler) +} + +// handleNodeEvent is a helper function to handle Add and Update +// events on Node objects and call downstream event handlers. 
+func (n *NodeTopologyConfig) handleNodeEvent(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + + topologyLabels := make(map[string]string) + if _, ok = node.Labels[v1.LabelTopologyZone]; ok { + topologyLabels[v1.LabelTopologyZone] = node.Labels[v1.LabelTopologyZone] + } + + // skip calling event handlers when no change in topology labels + if reflect.DeepEqual(n.topologyLabels, topologyLabels) { + return + } + + n.topologyLabels = topologyLabels + for i := range n.eventHandlers { + n.logger.V(4).Info("Calling handler.OnTopologyChange") + n.eventHandlers[i].OnTopologyChange(n.topologyLabels) + } +} diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 88802c3667a..c2f6511b345 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -17,12 +17,15 @@ limitations under the License. package config import ( + "fmt" "reflect" "sort" "sync" "testing" "time" + "github.com/stretchr/testify/require" + "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -457,3 +460,137 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { // Currently this module has a circular dependency with config, and so it's // named config_test, which means even test methods need to be public. This // is refactoring that we can avoid by resolving the dependency. + +type nodeTopologyHandlerMock struct { + topologyLabels map[string]string +} + +func (n *nodeTopologyHandlerMock) OnTopologyChange(topologyLabels map[string]string) { + n.topologyLabels = topologyLabels +} + +// waitForInvocation waits for event handler to complete processing of the invocation. +func waitForInvocation(invoked <-chan struct{}) error { + for { + select { + // unit tests will hard timeout in 5m with a stack trace, prevent that + // and surface a clearer reason for failure. 
+ case <-time.After(wait.ForeverTestTimeout): + return fmt.Errorf("timed out waiting for event handler to process update") + case <-invoked: + return nil + } + } +} + +func TestNewNodeTopologyConfig(t *testing.T) { + _, ctx := klogtesting.NewTestContext(t) + client := fake.NewClientset() + fakeWatch := watch.NewFake() + client.PrependWatchReactor("nodes", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + stopCh := make(chan struct{}) + defer close(stopCh) + + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + nodeInformer := sharedInformers.Core().V1().Nodes() + invoked := make(chan struct{}) + config := newNodeTopologyConfig(ctx, nodeInformer, time.Minute, func() { + // The callback is invoked after the event has been processed by the + // handlers. For this unit test, we write to the channel here and wait + // for it in waitForInvocation() which is called before doing assertions. + invoked <- struct{}{} + }) + + handler := &nodeTopologyHandlerMock{ + topologyLabels: make(map[string]string), + } + config.RegisterEventHandler(handler) + sharedInformers.Start(stopCh) + + testNodeName := "test-node" + + // add non-topology labels, handler should receive no notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m4.large", + v1.LabelOSStable: "linux", + }, + }, + }) + err := waitForInvocation(invoked) + require.NoError(t, err) + require.Empty(t, handler.topologyLabels) + + // add topology label not relevant to kube-proxy, handler should receive no notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m4.large", + v1.LabelOSStable: "linux", + v1.LabelTopologyRegion: "us-east-1", + }, + }, + }) + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Empty(t, handler.topologyLabels) + + // add relevant zone topology label, handler should receive 
notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "c6.large", + v1.LabelOSStable: "windows", + v1.LabelTopologyZone: "us-west-2a", + }, + }, + }) + + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Len(t, handler.topologyLabels, 1) + require.Equal(t, map[string]string{ + v1.LabelTopologyZone: "us-west-2a", + }, handler.topologyLabels) + + // add region topology label and change the zone, handler should be notified of the new zone + // only, because kube-proxy doesn't do any region-based topology. + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m3.medium", + v1.LabelOSStable: "windows", + v1.LabelTopologyRegion: "us-east-1", + v1.LabelTopologyZone: "us-east-1b", + }, + }, + }) + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Len(t, handler.topologyLabels, 1) + require.Equal(t, map[string]string{ + v1.LabelTopologyZone: "us-east-1b", + }, handler.topologyLabels) + + // update non-topology label, handler should not receive notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m3.large", + v1.LabelOSStable: "windows", + v1.LabelTopologyRegion: "us-east-1", + v1.LabelTopologyZone: "us-east-1b", + }, + }, + }) + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Len(t, handler.topologyLabels, 1) +} diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 4d1b257b24c..c599c2b0015 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -26,7 +26,6 @@ import ( "encoding/base32" "fmt" "net" - "reflect" "strconv" "strings" "sync" @@ -143,10 +142,10 @@ type Proxier struct { endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker - mu sync.Mutex // protects the following fields - svcPortMap 
proxy.ServicePortMap - endpointsMap proxy.EndpointsMap - nodeLabels map[string]string + mu sync.Mutex // protects the following fields + svcPortMap proxy.ServicePortMap + endpointsMap proxy.EndpointsMap + topologyLabels map[string]string // endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid // updating iptables with some partial data after kube-proxy restart. @@ -623,78 +622,16 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } -// OnNodeAdd is called whenever creation of new node object -// is observed. -func (proxier *Proxier) OnNodeAdd(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - +// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change. +func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) { proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } + proxier.topologyLabels = topologyLabels proxier.needFullSync = true proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - + proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels) proxier.Sync() } -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. 
-func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - - proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } - proxier.needFullSync = true - proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - - proxier.Sync() -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. -func (proxier *Proxier) OnNodeDelete(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - proxier.mu.Lock() - proxier.nodeLabels = nil - proxier.needFullSync = true - proxier.mu.Unlock() - - proxier.Sync() -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. -func (proxier *Proxier) OnNodeSynced() { -} - // OnServiceCIDRsChanged is called whenever a change is observed // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {} @@ -998,7 +935,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) { // from this node, given the service's traffic policies. hasEndpoints is true // if the service has any usable endpoints on any node, not just this one. 
allEndpoints := proxier.endpointsMap[svcName] - clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.topologyLabels) // clusterPolicyChain contains the endpoints used with "Cluster" traffic policy clusterPolicyChain := svcInfo.clusterPolicyChainName diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 82846e3c1f4..8fc01b29798 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -27,7 +27,6 @@ import ( "io" "net" "os/exec" - "reflect" "strconv" "strings" "sync" @@ -171,10 +170,10 @@ type Proxier struct { endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker - mu sync.Mutex // protects the following fields - svcPortMap proxy.ServicePortMap - endpointsMap proxy.EndpointsMap - nodeLabels map[string]string + mu sync.Mutex // protects the following fields + svcPortMap proxy.ServicePortMap + endpointsMap proxy.EndpointsMap + topologyLabels map[string]string // initialSync is a bool indicating if the proxier is syncing for the first time. // It is set to true when a new proxier is initialized and then set to false on all // future syncs. @@ -850,72 +849,15 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } -// OnNodeAdd is called whenever creation of new node object -// is observed. 
-func (proxier *Proxier) OnNodeAdd(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - +// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change. +func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) { proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } + proxier.topologyLabels = topologyLabels proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - + proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels) proxier.Sync() } -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. -func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - - proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } - proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - - proxier.Sync() -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. 
-func (proxier *Proxier) OnNodeDelete(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - proxier.mu.Lock() - proxier.nodeLabels = nil - proxier.mu.Unlock() - - proxier.Sync() -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. -func (proxier *Proxier) OnNodeSynced() { -} - // OnServiceCIDRsChanged is called whenever a change is observed // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {} @@ -1871,7 +1813,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !ok { proxier.logger.Info("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { - clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) + clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.topologyLabels) if onlyNodeLocalEndpoints { if len(localEndpoints) > 0 { endpoints = localEndpoints diff --git a/pkg/proxy/kubemark/hollow_proxy.go b/pkg/proxy/kubemark/hollow_proxy.go index 36bdcf6f6c4..34fdc94896c 100644 --- a/pkg/proxy/kubemark/hollow_proxy.go +++ b/pkg/proxy/kubemark/hollow_proxy.go @@ -30,7 +30,6 @@ import ( "k8s.io/client-go/tools/events" proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/utils/ptr" ) @@ -38,9 +37,7 @@ type HollowProxy struct { ProxyServer *proxyapp.ProxyServer } -type FakeProxier struct { - proxyconfig.NoopNodeHandler -} +type FakeProxier struct{} func (*FakeProxier) Sync() {} func (*FakeProxier) 
SyncLoop() { @@ -55,6 +52,7 @@ func (*FakeProxier) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointS func (*FakeProxier) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {} func (*FakeProxier) OnEndpointSlicesSynced() {} func (*FakeProxier) OnServiceCIDRsChanged(_ []string) {} +func (*FakeProxier) OnTopologyChange(_ map[string]string) {} func NewHollowProxy( nodeName string, diff --git a/pkg/proxy/metaproxier/meta_proxier.go b/pkg/proxy/metaproxier/meta_proxier.go index df56bd8f8a6..1df3e9ddb86 100644 --- a/pkg/proxy/metaproxier/meta_proxier.go +++ b/pkg/proxy/metaproxier/meta_proxier.go @@ -128,32 +128,10 @@ func (proxier *metaProxier) OnEndpointSlicesSynced() { proxier.ipv6Proxier.OnEndpointSlicesSynced() } -// OnNodeAdd is called whenever creation of new node object is observed. -func (proxier *metaProxier) OnNodeAdd(node *v1.Node) { - proxier.ipv4Proxier.OnNodeAdd(node) - proxier.ipv6Proxier.OnNodeAdd(node) -} - -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. -func (proxier *metaProxier) OnNodeUpdate(oldNode, node *v1.Node) { - proxier.ipv4Proxier.OnNodeUpdate(oldNode, node) - proxier.ipv6Proxier.OnNodeUpdate(oldNode, node) -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. -func (proxier *metaProxier) OnNodeDelete(node *v1.Node) { - proxier.ipv4Proxier.OnNodeDelete(node) - proxier.ipv6Proxier.OnNodeDelete(node) - -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. -func (proxier *metaProxier) OnNodeSynced() { - proxier.ipv4Proxier.OnNodeSynced() - proxier.ipv6Proxier.OnNodeSynced() +// OnTopologyChange is called whenever change in proxy relevant topology labels is observed. 
+func (proxier *metaProxier) OnTopologyChange(topologyLabels map[string]string) { + proxier.ipv4Proxier.OnTopologyChange(topologyLabels) + proxier.ipv6Proxier.OnTopologyChange(topologyLabels) } // OnServiceCIDRsChanged is called whenever a change is observed diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 6b9ff52d43c..4f88ec0cf26 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -27,7 +27,6 @@ import ( "net" "os" "os/exec" - "reflect" "strconv" "strings" "sync" @@ -151,10 +150,10 @@ type Proxier struct { endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker - mu sync.Mutex // protects the following fields - svcPortMap proxy.ServicePortMap - endpointsMap proxy.EndpointsMap - nodeLabels map[string]string + mu sync.Mutex // protects the following fields + svcPortMap proxy.ServicePortMap + endpointsMap proxy.EndpointsMap + topologyLabels map[string]string // endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid // updating nftables with some partial data after kube-proxy restart. @@ -841,78 +840,16 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } -// OnNodeAdd is called whenever creation of new node object -// is observed. -func (proxier *Proxier) OnNodeAdd(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - +// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change. 
+func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) { proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } + proxier.topologyLabels = topologyLabels proxier.needFullSync = true proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - + proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels) proxier.Sync() } -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. -func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - - proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } - proxier.needFullSync = true - proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - - proxier.Sync() -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. -func (proxier *Proxier) OnNodeDelete(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - proxier.mu.Lock() - proxier.nodeLabels = nil - proxier.needFullSync = true - proxier.mu.Unlock() - - proxier.Sync() -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. 
-func (proxier *Proxier) OnNodeSynced() { -} - // OnServiceCIDRsChanged is called whenever a change is observed // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(cidrs []string) { @@ -1312,7 +1249,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) { // from this node, given the service's traffic policies. hasEndpoints is true // if the service has any usable endpoints on any node, not just this one. allEndpoints := proxier.endpointsMap[svcName] - clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.topologyLabels) // skipServiceUpdate is used for all service-related chains and their elements. // If no changes were done to the service or its endpoints, these objects may be skipped. diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index e30b3d5233e..c8b447971ed 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -41,7 +41,11 @@ import ( // "Usable endpoints" means Ready endpoints by default, but will fall back to // Serving-Terminating endpoints (independently for Cluster and Local) if no Ready // endpoints are available. -func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { +// +// Note: NodeTopologyConfig.handleNodeEvent (pkg/proxy/config) filters topology labels +// before notifying proxiers. If you modify the logic over here to watch other endpoint +// types or labels, ensure the filtering logic in NodeTopologyConfig is updated accordingly. 
+func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, topologyLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { if len(endpoints) == 0 { // If there are no endpoints, we have nothing to categorize return @@ -51,7 +55,7 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName str var useServingTerminatingEndpoints bool if svcInfo.UsesClusterEndpoints() { - zone := nodeLabels[v1.LabelTopologyZone] + zone := topologyLabels[v1.LabelTopologyZone] topologyMode = topologyModeFromHints(svcInfo, endpoints, nodeName, zone) clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { if !ep.IsReady() { diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 0065da6b960..5c042c17f9e 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -28,7 +28,7 @@ import ( type Provider interface { config.EndpointSliceHandler config.ServiceHandler - config.NodeHandler + config.NodeTopologyHandler config.ServiceCIDRHandler // Sync immediately synchronizes the Provider's current state to proxy rules. diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index bd7d5ae760c..f32a5cb4fc6 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -44,7 +44,6 @@ import ( kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metrics" @@ -641,8 +640,6 @@ type endPointsReferenceCountMap map[string]*uint16 type Proxier struct { // ipFamily defines the IP family which this proxier is tracking. ipFamily v1.IPFamily - // TODO(imroc): implement node handler for winkernel proxier. 
- proxyconfig.NoopNodeHandler // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since policies were synced. For a single object, @@ -1098,6 +1095,13 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {} +// TODO(imroc): implement OnTopologyChange for winkernel proxier. +// OnTopologyChange is called whenever node topology labels are changed. +// The informer is tweaked to listen for updates of the node where this +// instance of kube-proxy is running; this guarantees the changed labels +// are for this node. +func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) {} + func (proxier *Proxier) cleanupAllPolicies() { for svcName, svc := range proxier.svcPortMap { svcInfo, ok := svc.(*serviceInfo)