diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index cd900f3580d..4fdc5eb8f0a 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -607,7 +607,9 @@ func (s *ProxyServer) Run(ctx context.Context) error { nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{ HealthServer: s.HealthzServer, }) - nodeConfig.RegisterEventHandler(s.Proxier) + + nodeTopologyConfig := config.NewNodeTopologyConfig(ctx, currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration) + nodeTopologyConfig.RegisterEventHandler(s.Proxier) go nodeConfig.Run(wait.NeverStop) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 1544ad5a7d7..1f24196e90c 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -19,6 +19,7 @@ package config import ( "context" "fmt" + "reflect" "sync" "time" @@ -273,24 +274,6 @@ type NodeHandler interface { OnNodeSynced() } -// NoopNodeHandler is a noop handler for proxiers that have not yet -// implemented a full NodeHandler. -type NoopNodeHandler struct{} - -// OnNodeAdd is a noop handler for Node creates. -func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {} - -// OnNodeUpdate is a noop handler for Node updates. -func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {} - -// OnNodeDelete is a noop handler for Node deletes. -func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {} - -// OnNodeSynced is a noop handler for Node syncs. -func (*NoopNodeHandler) OnNodeSynced() {} - -var _ NodeHandler = &NoopNodeHandler{} - // NodeConfig tracks a set of node configurations. // It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change. type NodeConfig struct { @@ -483,3 +466,86 @@ func (c *ServiceCIDRConfig) handleServiceCIDREvent(oldObj, newObj interface{}) { c.eventHandlers[i].OnServiceCIDRsChanged(c.cidrs.UnsortedList()) } } + +// NodeTopologyHandler is an abstract interface for objects which receive +// notifications about changes in proxy relevant node topology labels. +type NodeTopologyHandler interface { + // OnTopologyChange is called whenever a change is observed in proxy + // relevant node topology labels, and provides the observed change. + OnTopologyChange(topologyLabels map[string]string) +} + +// NodeTopologyConfig tracks node topology labels. +type NodeTopologyConfig struct { + listerSynced cache.InformerSynced + eventHandlers []NodeTopologyHandler + topologyLabels map[string]string + logger klog.Logger +} + +// NewNodeTopologyConfig creates a new NodeTopologyConfig. +func NewNodeTopologyConfig(ctx context.Context, nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration) *NodeTopologyConfig { + return newNodeTopologyConfig(ctx, nodeInformer, resyncPeriod, nil) +} + +// newNodeTopologyConfig implements NewNodeTopologyConfig by additionally consuming a callback function which is invoked when +// event handler completes processing and is only used for testing. +func newNodeTopologyConfig(ctx context.Context, nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration, callback func()) *NodeTopologyConfig { + result := &NodeTopologyConfig{ + logger: klog.FromContext(ctx), + topologyLabels: make(map[string]string), + } + + handlerRegistration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + result.handleNodeEvent(obj) + if callback != nil { + callback() + } + }, + UpdateFunc: func(_, newObj interface{}) { + result.handleNodeEvent(newObj) + if callback != nil { + callback() + } + }, + DeleteFunc: func(_ interface{}) {}, + }, + resyncPeriod, + ) + result.listerSynced = handlerRegistration.HasSynced + + return result +} + +// RegisterEventHandler registers a handler which is called on Node object change. +func (n *NodeTopologyConfig) RegisterEventHandler(handler NodeTopologyHandler) { + n.eventHandlers = append(n.eventHandlers, handler) +} + +// handleNodeEvent is a helper function to handle Add, Update and Delete +// events on Node objects and call downstream event handlers. +func (n *NodeTopologyConfig) handleNodeEvent(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + + topologyLabels := make(map[string]string) + if _, ok = node.Labels[v1.LabelTopologyZone]; ok { + topologyLabels[v1.LabelTopologyZone] = node.Labels[v1.LabelTopologyZone] + } + + // skip calling event handlers when no change in topology labels + if reflect.DeepEqual(n.topologyLabels, topologyLabels) { + return + } + + n.topologyLabels = topologyLabels + for i := range n.eventHandlers { + n.logger.V(4).Info("Calling handler.OnTopologyChange") + n.eventHandlers[i].OnTopologyChange(n.topologyLabels) + } +} diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 88802c3667a..c2f6511b345 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -17,12 +17,15 @@ limitations under the License. package config import ( + "fmt" "reflect" "sort" "sync" "testing" "time" + "github.com/stretchr/testify/require" + "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -457,3 +460,137 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { // Currently this module has a circular dependency with config, and so it's // named config_test, which means even test methods need to be public. This // is refactoring that we can avoid by resolving the dependency. + +type nodeTopologyHandlerMock struct { + topologyLabels map[string]string +} + +func (n *nodeTopologyHandlerMock) OnTopologyChange(topologyLabels map[string]string) { + n.topologyLabels = topologyLabels +} + +// waitForInvocation waits for event handler to complete processing of the invocation. +func waitForInvocation(invoked <-chan struct{}) error { + for { + select { + // unit tests will hard timeout in 5m with a stack trace, prevent that + // and surface a clearer reason for failure. + case <-time.After(wait.ForeverTestTimeout): + return fmt.Errorf("timed out waiting for event handler to process update") + case <-invoked: + return nil + } + } +} + +func TestNewNodeTopologyConfig(t *testing.T) { + _, ctx := klogtesting.NewTestContext(t) + client := fake.NewClientset() + fakeWatch := watch.NewFake() + client.PrependWatchReactor("nodes", ktesting.DefaultWatchReactor(fakeWatch, nil)) + + stopCh := make(chan struct{}) + defer close(stopCh) + + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + nodeInformer := sharedInformers.Core().V1().Nodes() + invoked := make(chan struct{}) + config := newNodeTopologyConfig(ctx, nodeInformer, time.Minute, func() { + // The callback is invoked after the event has been processed by the + // handlers. For this unit test, we write to the channel here and wait + // for it in waitForInvocation() which is called before doing assertions. + invoked <- struct{}{} + }) + + handler := &nodeTopologyHandlerMock{ + topologyLabels: make(map[string]string), + } + config.RegisterEventHandler(handler) + sharedInformers.Start(stopCh) + + testNodeName := "test-node" + + // add non-topology labels, handle should receive no notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m4.large", + v1.LabelOSStable: "linux", + }, + }, + }) + err := waitForInvocation(invoked) + require.NoError(t, err) + require.Empty(t, handler.topologyLabels) + + // add topology label not relevant to kube-proxy, handle should receive no notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m4.large", + v1.LabelOSStable: "linux", + v1.LabelTopologyRegion: "us-east-1", + }, + }, + }) + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Empty(t, handler.topologyLabels) + + // add relevant zone topology label, handle should receive notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "c6.large", + v1.LabelOSStable: "windows", + v1.LabelTopologyZone: "us-west-2a", + }, + }, + }) + + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Len(t, handler.topologyLabels, 1) + require.Equal(t, map[string]string{ + v1.LabelTopologyZone: "us-west-2a", + }, handler.topologyLabels) + + // add region topology label, handle should not receive notification + // because kube-proxy doesn't do any region-based topology. + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m3.medium", + v1.LabelOSStable: "windows", + v1.LabelTopologyRegion: "us-east-1", + v1.LabelTopologyZone: "us-east-1b", + }, + }, + }) + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Len(t, handler.topologyLabels, 1) + require.Equal(t, map[string]string{ + v1.LabelTopologyZone: "us-east-1b", + }, handler.topologyLabels) + + // update non-topology label, handle should not receive notification + fakeWatch.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNodeName, + Labels: map[string]string{ + v1.LabelInstanceType: "m3.large", + v1.LabelOSStable: "windows", + v1.LabelTopologyRegion: "us-east-1", + v1.LabelTopologyZone: "us-east-1b", + }, + }, + }) + err = waitForInvocation(invoked) + require.NoError(t, err) + require.Len(t, handler.topologyLabels, 1) +} diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 4d1b257b24c..c599c2b0015 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -26,7 +26,6 @@ import ( "encoding/base32" "fmt" "net" - "reflect" "strconv" "strings" "sync" @@ -143,10 +142,10 @@ type Proxier struct { endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker - mu sync.Mutex // protects the following fields - svcPortMap proxy.ServicePortMap - endpointsMap proxy.EndpointsMap - nodeLabels map[string]string + mu sync.Mutex // protects the following fields + svcPortMap proxy.ServicePortMap + endpointsMap proxy.EndpointsMap + topologyLabels map[string]string // endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid // updating iptables with some partial data after kube-proxy restart. @@ -623,78 +622,16 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } -// OnNodeAdd is called whenever creation of new node object -// is observed. -func (proxier *Proxier) OnNodeAdd(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - +// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change. +func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) { proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } + proxier.topologyLabels = topologyLabels proxier.needFullSync = true proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - + proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels) proxier.Sync() } -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. -func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - - proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } - proxier.needFullSync = true - proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - - proxier.Sync() -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. -func (proxier *Proxier) OnNodeDelete(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - proxier.mu.Lock() - proxier.nodeLabels = nil - proxier.needFullSync = true - proxier.mu.Unlock() - - proxier.Sync() -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. -func (proxier *Proxier) OnNodeSynced() { -} - // OnServiceCIDRsChanged is called whenever a change is observed // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {} @@ -998,7 +935,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) { // from this node, given the service's traffic policies. hasEndpoints is true // if the service has any usable endpoints on any node, not just this one. allEndpoints := proxier.endpointsMap[svcName] - clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.topologyLabels) // clusterPolicyChain contains the endpoints used with "Cluster" traffic policy clusterPolicyChain := svcInfo.clusterPolicyChainName diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 82846e3c1f4..8fc01b29798 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -27,7 +27,6 @@ import ( "io" "net" "os/exec" - "reflect" "strconv" "strings" "sync" @@ -171,10 +170,10 @@ type Proxier struct { endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker - mu sync.Mutex // protects the following fields - svcPortMap proxy.ServicePortMap - endpointsMap proxy.EndpointsMap - nodeLabels map[string]string + mu sync.Mutex // protects the following fields + svcPortMap proxy.ServicePortMap + endpointsMap proxy.EndpointsMap + topologyLabels map[string]string // initialSync is a bool indicating if the proxier is syncing for the first time. // It is set to true when a new proxier is initialized and then set to false on all // future syncs. @@ -850,72 +849,15 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } -// OnNodeAdd is called whenever creation of new node object -// is observed. -func (proxier *Proxier) OnNodeAdd(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - +// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change. +func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) { proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } + proxier.topologyLabels = topologyLabels proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - + proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels) proxier.Sync() } -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. -func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - - proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } - proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - - proxier.Sync() -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. -func (proxier *Proxier) OnNodeDelete(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - proxier.mu.Lock() - proxier.nodeLabels = nil - proxier.mu.Unlock() - - proxier.Sync() -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. -func (proxier *Proxier) OnNodeSynced() { -} - // OnServiceCIDRsChanged is called whenever a change is observed // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {} @@ -1871,7 +1813,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !ok { proxier.logger.Info("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { - clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) + clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.topologyLabels) if onlyNodeLocalEndpoints { if len(localEndpoints) > 0 { endpoints = localEndpoints diff --git a/pkg/proxy/kubemark/hollow_proxy.go b/pkg/proxy/kubemark/hollow_proxy.go index 36bdcf6f6c4..34fdc94896c 100644 --- a/pkg/proxy/kubemark/hollow_proxy.go +++ b/pkg/proxy/kubemark/hollow_proxy.go @@ -30,7 +30,6 @@ import ( "k8s.io/client-go/tools/events" proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/utils/ptr" ) @@ -38,9 +37,7 @@ type HollowProxy struct { ProxyServer *proxyapp.ProxyServer } -type FakeProxier struct { - proxyconfig.NoopNodeHandler -} +type FakeProxier struct{} func (*FakeProxier) Sync() {} func (*FakeProxier) SyncLoop() { @@ -55,6 +52,7 @@ func (*FakeProxier) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointS func (*FakeProxier) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {} func (*FakeProxier) OnEndpointSlicesSynced() {} func (*FakeProxier) OnServiceCIDRsChanged(_ []string) {} +func (*FakeProxier) OnTopologyChange(_ map[string]string) {} func NewHollowProxy( nodeName string, diff --git a/pkg/proxy/metaproxier/meta_proxier.go b/pkg/proxy/metaproxier/meta_proxier.go index df56bd8f8a6..1df3e9ddb86 100644 --- a/pkg/proxy/metaproxier/meta_proxier.go +++ b/pkg/proxy/metaproxier/meta_proxier.go @@ -128,32 +128,10 @@ func (proxier *metaProxier) OnEndpointSlicesSynced() { proxier.ipv6Proxier.OnEndpointSlicesSynced() } -// OnNodeAdd is called whenever creation of new node object is observed. -func (proxier *metaProxier) OnNodeAdd(node *v1.Node) { - proxier.ipv4Proxier.OnNodeAdd(node) - proxier.ipv6Proxier.OnNodeAdd(node) -} - -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. -func (proxier *metaProxier) OnNodeUpdate(oldNode, node *v1.Node) { - proxier.ipv4Proxier.OnNodeUpdate(oldNode, node) - proxier.ipv6Proxier.OnNodeUpdate(oldNode, node) -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. -func (proxier *metaProxier) OnNodeDelete(node *v1.Node) { - proxier.ipv4Proxier.OnNodeDelete(node) - proxier.ipv6Proxier.OnNodeDelete(node) - -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. -func (proxier *metaProxier) OnNodeSynced() { - proxier.ipv4Proxier.OnNodeSynced() - proxier.ipv6Proxier.OnNodeSynced() +// OnTopologyChange is called whenever change in proxy relevant topology labels is observed. +func (proxier *metaProxier) OnTopologyChange(topologyLabels map[string]string) { + proxier.ipv4Proxier.OnTopologyChange(topologyLabels) + proxier.ipv6Proxier.OnTopologyChange(topologyLabels) } // OnServiceCIDRsChanged is called whenever a change is observed diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 6b9ff52d43c..4f88ec0cf26 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -27,7 +27,6 @@ import ( "net" "os" "os/exec" - "reflect" "strconv" "strings" "sync" @@ -151,10 +150,10 @@ type Proxier struct { endpointsChanges *proxy.EndpointsChangeTracker serviceChanges *proxy.ServiceChangeTracker - mu sync.Mutex // protects the following fields - svcPortMap proxy.ServicePortMap - endpointsMap proxy.EndpointsMap - nodeLabels map[string]string + mu sync.Mutex // protects the following fields + svcPortMap proxy.ServicePortMap + endpointsMap proxy.EndpointsMap + topologyLabels map[string]string // endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid // updating nftables with some partial data after kube-proxy restart. @@ -841,78 +840,16 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { proxier.syncProxyRules() } -// OnNodeAdd is called whenever creation of new node object -// is observed. -func (proxier *Proxier) OnNodeAdd(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - +// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change. +func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) { proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } + proxier.topologyLabels = topologyLabels proxier.needFullSync = true proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - + proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels) proxier.Sync() } -// OnNodeUpdate is called whenever modification of an existing -// node object is observed. -func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - if reflect.DeepEqual(proxier.nodeLabels, node.Labels) { - return - } - - proxier.mu.Lock() - proxier.nodeLabels = map[string]string{} - for k, v := range node.Labels { - proxier.nodeLabels[k] = v - } - proxier.needFullSync = true - proxier.mu.Unlock() - proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels) - - proxier.Sync() -} - -// OnNodeDelete is called whenever deletion of an existing node -// object is observed. -func (proxier *Proxier) OnNodeDelete(node *v1.Node) { - if node.Name != proxier.nodeName { - proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", - "eventNode", node.Name, "currentNode", proxier.nodeName) - return - } - - proxier.mu.Lock() - proxier.nodeLabels = nil - proxier.needFullSync = true - proxier.mu.Unlock() - - proxier.Sync() -} - -// OnNodeSynced is called once all the initial event handlers were -// called and the state is fully propagated to local cache. -func (proxier *Proxier) OnNodeSynced() { -} - // OnServiceCIDRsChanged is called whenever a change is observed // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(cidrs []string) { @@ -1312,7 +1249,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) { // from this node, given the service's traffic policies. hasEndpoints is true // if the service has any usable endpoints on any node, not just this one. allEndpoints := proxier.endpointsMap[svcName] - clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.topologyLabels) // skipServiceUpdate is used for all service-related chains and their elements. // If no changes were done to the service or its endpoints, these objects may be skipped. diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index e30b3d5233e..c8b447971ed 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -41,7 +41,11 @@ import ( // "Usable endpoints" means Ready endpoints by default, but will fall back to // Serving-Terminating endpoints (independently for Cluster and Local) if no Ready // endpoints are available. -func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { +// +// Note: NodeTopologyConfig.handleNodeEvent (pkg/proxy/config) filters topology labels +// before notifying proxiers. If you modify the logic over here to watch other endpoint +// types or labels, ensure the filtering logic in NodeTopologyConfig is updated accordingly. +func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, topologyLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { if len(endpoints) == 0 { // If there are no endpoints, we have nothing to categorize return @@ -51,7 +55,7 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName str var useServingTerminatingEndpoints bool if svcInfo.UsesClusterEndpoints() { - zone := nodeLabels[v1.LabelTopologyZone] + zone := topologyLabels[v1.LabelTopologyZone] topologyMode = topologyModeFromHints(svcInfo, endpoints, nodeName, zone) clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { if !ep.IsReady() { diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 0065da6b960..5c042c17f9e 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -28,7 +28,7 @@ import ( type Provider interface { config.EndpointSliceHandler config.ServiceHandler - config.NodeHandler + config.NodeTopologyHandler config.ServiceCIDRHandler // Sync immediately synchronizes the Provider's current state to proxy rules. diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index bd7d5ae760c..f32a5cb4fc6 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -44,7 +44,6 @@ import ( kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metrics" @@ -641,8 +640,6 @@ type endPointsReferenceCountMap map[string]*uint16 type Proxier struct { // ipFamily defines the IP family which this proxier is tracking. ipFamily v1.IPFamily - // TODO(imroc): implement node handler for winkernel proxier. - proxyconfig.NoopNodeHandler // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since policies were synced. For a single object, @@ -1098,6 +1095,13 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { // in any of the ServiceCIDRs, and provides complete list of service cidrs. func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {} +// TODO(imroc): implement OnTopologyChanged for winkernel proxier. +// OnTopologyChange is called whenever node topology labels are changed. +// The informer is tweaked to listen for updates of the node where this +// instance of kube-proxy is running, this guarantees the changed labels +// are for this node. +func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) {} + func (proxier *Proxier) cleanupAllPolicies() { for svcName, svc := range proxier.svcPortMap { svcInfo, ok := svc.(*serviceInfo)