kube-proxy: add NodeTopologyConfig for tracking topology labels

This simplifies how the proxier receives update for change in node
labels. Instead of passing the complete Node object we just pass
the proxy relevant topology labels extracted from the complete list
of labels, and the downstream event handlers will only be notified
when there are changes in topology labels.

Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
Daman Arora
2025-05-07 21:45:59 +05:30
parent 9822e51403
commit af7abde0e5
11 changed files with 271 additions and 266 deletions

View File

@@ -607,7 +607,9 @@ func (s *ProxyServer) Run(ctx context.Context) error {
nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
HealthServer: s.HealthzServer,
})
nodeConfig.RegisterEventHandler(s.Proxier)
nodeTopologyConfig := config.NewNodeTopologyConfig(ctx, currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
nodeTopologyConfig.RegisterEventHandler(s.Proxier)
go nodeConfig.Run(wait.NeverStop)

View File

@@ -19,6 +19,7 @@ package config
import (
"context"
"fmt"
"reflect"
"sync"
"time"
@@ -273,24 +274,6 @@ type NodeHandler interface {
OnNodeSynced()
}
// NoopNodeHandler is a noop handler for proxiers that have not yet
// implemented a full NodeHandler.
type NoopNodeHandler struct{}
// OnNodeAdd is a noop handler for Node creates.
func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}
// OnNodeUpdate is a noop handler for Node updates.
func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}
// OnNodeDelete is a noop handler for Node deletes.
func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}
// OnNodeSynced is a noop handler for Node syncs.
func (*NoopNodeHandler) OnNodeSynced() {}
var _ NodeHandler = &NoopNodeHandler{}
// NodeConfig tracks a set of node configurations.
// It accepts "set", "add" and "remove" operations of node via channels, and invokes registered handlers on change.
type NodeConfig struct {
@@ -483,3 +466,86 @@ func (c *ServiceCIDRConfig) handleServiceCIDREvent(oldObj, newObj interface{}) {
c.eventHandlers[i].OnServiceCIDRsChanged(c.cidrs.UnsortedList())
}
}
// NodeTopologyHandler is an abstract interface for objects which receive
// notifications about changes in proxy relevant node topology labels.
type NodeTopologyHandler interface {
// OnTopologyChange is called whenever a change is observed in proxy
// relevant node topology labels, and provides the observed change.
OnTopologyChange(topologyLabels map[string]string)
}
// NodeTopologyConfig tracks node topology labels.
type NodeTopologyConfig struct {
listerSynced cache.InformerSynced
eventHandlers []NodeTopologyHandler
topologyLabels map[string]string
logger klog.Logger
}
// NewNodeTopologyConfig creates a new NodeTopologyConfig.
func NewNodeTopologyConfig(ctx context.Context, nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration) *NodeTopologyConfig {
return newNodeTopologyConfig(ctx, nodeInformer, resyncPeriod, nil)
}
// newNodeTopologyConfig implements NewNodeTopologyConfig by additionally consuming a callback function which is invoked when
// event handler completes processing and is only used for testing.
func newNodeTopologyConfig(ctx context.Context, nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration, callback func()) *NodeTopologyConfig {
result := &NodeTopologyConfig{
logger: klog.FromContext(ctx),
topologyLabels: make(map[string]string),
}
handlerRegistration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
result.handleNodeEvent(obj)
if callback != nil {
callback()
}
},
UpdateFunc: func(_, newObj interface{}) {
result.handleNodeEvent(newObj)
if callback != nil {
callback()
}
},
DeleteFunc: func(_ interface{}) {},
},
resyncPeriod,
)
result.listerSynced = handlerRegistration.HasSynced
return result
}
// RegisterEventHandler registers a handler which is called on Node object change.
func (n *NodeTopologyConfig) RegisterEventHandler(handler NodeTopologyHandler) {
n.eventHandlers = append(n.eventHandlers, handler)
}
// handleNodeEvent is a helper function to handle Add, Update and Delete
// events on Node objects and call downstream event handlers.
func (n *NodeTopologyConfig) handleNodeEvent(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
topologyLabels := make(map[string]string)
if _, ok = node.Labels[v1.LabelTopologyZone]; ok {
topologyLabels[v1.LabelTopologyZone] = node.Labels[v1.LabelTopologyZone]
}
// skip calling event handlers when no change in topology labels
if reflect.DeepEqual(n.topologyLabels, topologyLabels) {
return
}
n.topologyLabels = topologyLabels
for i := range n.eventHandlers {
n.logger.V(4).Info("Calling handler.OnTopologyChange")
n.eventHandlers[i].OnTopologyChange(n.topologyLabels)
}
}

View File

@@ -17,12 +17,15 @@ limitations under the License.
package config
import (
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -457,3 +460,137 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
// Currently this module has a circular dependency with config, and so it's
// named config_test, which means even test methods need to be public. This
// is refactoring that we can avoid by resolving the dependency.
type nodeTopologyHandlerMock struct {
topologyLabels map[string]string
}
func (n *nodeTopologyHandlerMock) OnTopologyChange(topologyLabels map[string]string) {
n.topologyLabels = topologyLabels
}
// waitForInvocation waits for event handler to complete processing of the invocation.
func waitForInvocation(invoked <-chan struct{}) error {
for {
select {
// unit tests will hard timeout in 5m with a stack trace, prevent that
// and surface a clearer reason for failure.
case <-time.After(wait.ForeverTestTimeout):
return fmt.Errorf("timed out waiting for event handler to process update")
case <-invoked:
return nil
}
}
}
func TestNewNodeTopologyConfig(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
client := fake.NewClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("nodes", ktesting.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
nodeInformer := sharedInformers.Core().V1().Nodes()
invoked := make(chan struct{})
config := newNodeTopologyConfig(ctx, nodeInformer, time.Minute, func() {
// The callback is invoked after the event has been processed by the
// handlers. For this unit test, we write to the channel here and wait
// for it in waitForInvocation() which is called before doing assertions.
invoked <- struct{}{}
})
handler := &nodeTopologyHandlerMock{
topologyLabels: make(map[string]string),
}
config.RegisterEventHandler(handler)
sharedInformers.Start(stopCh)
testNodeName := "test-node"
// add non-topology labels, handle should receive no notification
fakeWatch.Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
Labels: map[string]string{
v1.LabelInstanceType: "m4.large",
v1.LabelOSStable: "linux",
},
},
})
err := waitForInvocation(invoked)
require.NoError(t, err)
require.Empty(t, handler.topologyLabels)
// add topology label not relevant to kube-proxy, handle should receive no notification
fakeWatch.Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
Labels: map[string]string{
v1.LabelInstanceType: "m4.large",
v1.LabelOSStable: "linux",
v1.LabelTopologyRegion: "us-east-1",
},
},
})
err = waitForInvocation(invoked)
require.NoError(t, err)
require.Empty(t, handler.topologyLabels)
// add relevant zone topology label, handle should receive notification
fakeWatch.Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
Labels: map[string]string{
v1.LabelInstanceType: "c6.large",
v1.LabelOSStable: "windows",
v1.LabelTopologyZone: "us-west-2a",
},
},
})
err = waitForInvocation(invoked)
require.NoError(t, err)
require.Len(t, handler.topologyLabels, 1)
require.Equal(t, map[string]string{
v1.LabelTopologyZone: "us-west-2a",
}, handler.topologyLabels)
// add region topology label, handle should not receive notification
// because kube-proxy doesn't do any region-based topology.
fakeWatch.Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
Labels: map[string]string{
v1.LabelInstanceType: "m3.medium",
v1.LabelOSStable: "windows",
v1.LabelTopologyRegion: "us-east-1",
v1.LabelTopologyZone: "us-east-1b",
},
},
})
err = waitForInvocation(invoked)
require.NoError(t, err)
require.Len(t, handler.topologyLabels, 1)
require.Equal(t, map[string]string{
v1.LabelTopologyZone: "us-east-1b",
}, handler.topologyLabels)
// update non-topology label, handle should not receive notification
fakeWatch.Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
Labels: map[string]string{
v1.LabelInstanceType: "m3.large",
v1.LabelOSStable: "windows",
v1.LabelTopologyRegion: "us-east-1",
v1.LabelTopologyZone: "us-east-1b",
},
},
})
err = waitForInvocation(invoked)
require.NoError(t, err)
require.Len(t, handler.topologyLabels, 1)
}

View File

@@ -26,7 +26,6 @@ import (
"encoding/base32"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"sync"
@@ -143,10 +142,10 @@ type Proxier struct {
endpointsChanges *proxy.EndpointsChangeTracker
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
nodeLabels map[string]string
mu sync.Mutex // protects the following fields
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
topologyLabels map[string]string
// endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid
// updating iptables with some partial data after kube-proxy restart.
@@ -623,78 +622,16 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.syncProxyRules()
}
// OnNodeAdd is called whenever creation of new node object
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
return
}
// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change.
func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) {
proxier.mu.Lock()
proxier.nodeLabels = map[string]string{}
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.topologyLabels = topologyLabels
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels)
proxier.Sync()
}
// OnNodeUpdate is called whenever modification of an existing
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
return
}
proxier.mu.Lock()
proxier.nodeLabels = map[string]string{}
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
// OnNodeDelete is called whenever deletion of an existing node
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.Sync()
}
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnNodeSynced() {
}
// OnServiceCIDRsChanged is called whenever a change is observed
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
@@ -998,7 +935,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
allEndpoints := proxier.endpointsMap[svcName]
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.topologyLabels)
// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
clusterPolicyChain := svcInfo.clusterPolicyChainName

View File

@@ -27,7 +27,6 @@ import (
"io"
"net"
"os/exec"
"reflect"
"strconv"
"strings"
"sync"
@@ -171,10 +170,10 @@ type Proxier struct {
endpointsChanges *proxy.EndpointsChangeTracker
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
nodeLabels map[string]string
mu sync.Mutex // protects the following fields
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
topologyLabels map[string]string
// initialSync is a bool indicating if the proxier is syncing for the first time.
// It is set to true when a new proxier is initialized and then set to false on all
// future syncs.
@@ -850,72 +849,15 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.syncProxyRules()
}
// OnNodeAdd is called whenever creation of new node object
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
return
}
// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change.
func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) {
proxier.mu.Lock()
proxier.nodeLabels = map[string]string{}
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.topologyLabels = topologyLabels
proxier.mu.Unlock()
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels)
proxier.Sync()
}
// OnNodeUpdate is called whenever modification of an existing
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
return
}
proxier.mu.Lock()
proxier.nodeLabels = map[string]string{}
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.mu.Unlock()
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
// OnNodeDelete is called whenever deletion of an existing node
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.mu.Unlock()
proxier.Sync()
}
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnNodeSynced() {
}
// OnServiceCIDRsChanged is called whenever a change is observed
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
@@ -1871,7 +1813,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if !ok {
proxier.logger.Info("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
} else {
clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.topologyLabels)
if onlyNodeLocalEndpoints {
if len(localEndpoints) > 0 {
endpoints = localEndpoints

View File

@@ -30,7 +30,6 @@ import (
"k8s.io/client-go/tools/events"
proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/utils/ptr"
)
@@ -38,9 +37,7 @@ type HollowProxy struct {
ProxyServer *proxyapp.ProxyServer
}
type FakeProxier struct {
proxyconfig.NoopNodeHandler
}
type FakeProxier struct{}
func (*FakeProxier) Sync() {}
func (*FakeProxier) SyncLoop() {
@@ -55,6 +52,7 @@ func (*FakeProxier) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointS
func (*FakeProxier) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {}
func (*FakeProxier) OnEndpointSlicesSynced() {}
func (*FakeProxier) OnServiceCIDRsChanged(_ []string) {}
func (*FakeProxier) OnTopologyChange(_ map[string]string) {}
func NewHollowProxy(
nodeName string,

View File

@@ -128,32 +128,10 @@ func (proxier *metaProxier) OnEndpointSlicesSynced() {
proxier.ipv6Proxier.OnEndpointSlicesSynced()
}
// OnNodeAdd is called whenever creation of new node object is observed.
func (proxier *metaProxier) OnNodeAdd(node *v1.Node) {
proxier.ipv4Proxier.OnNodeAdd(node)
proxier.ipv6Proxier.OnNodeAdd(node)
}
// OnNodeUpdate is called whenever modification of an existing
// node object is observed.
func (proxier *metaProxier) OnNodeUpdate(oldNode, node *v1.Node) {
proxier.ipv4Proxier.OnNodeUpdate(oldNode, node)
proxier.ipv6Proxier.OnNodeUpdate(oldNode, node)
}
// OnNodeDelete is called whenever deletion of an existing node
// object is observed.
func (proxier *metaProxier) OnNodeDelete(node *v1.Node) {
proxier.ipv4Proxier.OnNodeDelete(node)
proxier.ipv6Proxier.OnNodeDelete(node)
}
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *metaProxier) OnNodeSynced() {
proxier.ipv4Proxier.OnNodeSynced()
proxier.ipv6Proxier.OnNodeSynced()
// OnTopologyChange is called whenever change in proxy relevant topology labels is observed.
func (proxier *metaProxier) OnTopologyChange(topologyLabels map[string]string) {
proxier.ipv4Proxier.OnTopologyChange(topologyLabels)
proxier.ipv6Proxier.OnTopologyChange(topologyLabels)
}
// OnServiceCIDRsChanged is called whenever a change is observed

View File

@@ -27,7 +27,6 @@ import (
"net"
"os"
"os/exec"
"reflect"
"strconv"
"strings"
"sync"
@@ -151,10 +150,10 @@ type Proxier struct {
endpointsChanges *proxy.EndpointsChangeTracker
serviceChanges *proxy.ServiceChangeTracker
mu sync.Mutex // protects the following fields
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
nodeLabels map[string]string
mu sync.Mutex // protects the following fields
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
topologyLabels map[string]string
// endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid
// updating nftables with some partial data after kube-proxy restart.
@@ -841,78 +840,16 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.syncProxyRules()
}
// OnNodeAdd is called whenever creation of new node object
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
return
}
// OnTopologyChange is called whenever this node's proxy relevant topology-related labels change.
func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) {
proxier.mu.Lock()
proxier.nodeLabels = map[string]string{}
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.topologyLabels = topologyLabels
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node topology labels", "labels", topologyLabels)
proxier.Sync()
}
// OnNodeUpdate is called whenever modification of an existing
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
if reflect.DeepEqual(proxier.nodeLabels, node.Labels) {
return
}
proxier.mu.Lock()
proxier.nodeLabels = map[string]string{}
for k, v := range node.Labels {
proxier.nodeLabels[k] = v
}
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
// OnNodeDelete is called whenever deletion of an existing node
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.nodeName {
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.nodeName)
return
}
proxier.mu.Lock()
proxier.nodeLabels = nil
proxier.needFullSync = true
proxier.mu.Unlock()
proxier.Sync()
}
// OnNodeSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnNodeSynced() {
}
// OnServiceCIDRsChanged is called whenever a change is observed
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
func (proxier *Proxier) OnServiceCIDRsChanged(cidrs []string) {
@@ -1312,7 +1249,7 @@ func (proxier *Proxier) syncProxyRules() (retryError error) {
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
allEndpoints := proxier.endpointsMap[svcName]
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.topologyLabels)
// skipServiceUpdate is used for all service-related chains and their elements.
// If no changes were done to the service or its endpoints, these objects may be skipped.

View File

@@ -41,7 +41,11 @@ import (
// "Usable endpoints" means Ready endpoints by default, but will fall back to
// Serving-Terminating endpoints (independently for Cluster and Local) if no Ready
// endpoints are available.
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
//
// Note: NodeTopologyConfig.handleNodeEvent (pkg/proxy/config) filters topology labels
// before notifying proxiers. If you modify the logic over here to watch other endpoint
// types or labels, ensure the filtering logic in NodeTopologyConfig is updated accordingly.
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, topologyLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
if len(endpoints) == 0 {
// If there are no endpoints, we have nothing to categorize
return
@@ -51,7 +55,7 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName str
var useServingTerminatingEndpoints bool
if svcInfo.UsesClusterEndpoints() {
zone := nodeLabels[v1.LabelTopologyZone]
zone := topologyLabels[v1.LabelTopologyZone]
topologyMode = topologyModeFromHints(svcInfo, endpoints, nodeName, zone)
clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
if !ep.IsReady() {

View File

@@ -28,7 +28,7 @@ import (
type Provider interface {
config.EndpointSliceHandler
config.ServiceHandler
config.NodeHandler
config.NodeTopologyHandler
config.ServiceCIDRHandler
// Sync immediately synchronizes the Provider's current state to proxy rules.

View File

@@ -44,7 +44,6 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
"k8s.io/kubernetes/pkg/proxy/metrics"
@@ -641,8 +640,6 @@ type endPointsReferenceCountMap map[string]*uint16
type Proxier struct {
// ipFamily defines the IP family which this proxier is tracking.
ipFamily v1.IPFamily
// TODO(imroc): implement node handler for winkernel proxier.
proxyconfig.NoopNodeHandler
// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since policies were synced. For a single object,
@@ -1098,6 +1095,13 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
// TODO(imroc): implement OnTopologyChanged for winkernel proxier.
// OnTopologyChange is called whenever node topology labels are changed.
// The informer is tweaked to listen for updates of the node where this
// instance of kube-proxy is running, this guarantees the changed labels
// are for this node.
func (proxier *Proxier) OnTopologyChange(topologyLabels map[string]string) {}
func (proxier *Proxier) cleanupAllPolicies() {
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*serviceInfo)