Merge pull request #130119 from npinaeva/nft-restart

[kube-proxy: nftables] Optimize kube-proxy restart time
This commit is contained in:
Kubernetes Prow Robot
2025-03-04 10:17:44 -08:00
committed by GitHub
2 changed files with 241 additions and 51 deletions

View File

@@ -163,6 +163,7 @@ type Proxier struct {
// updating nftables with some partial data after kube-proxy restart.
endpointSlicesSynced bool
servicesSynced bool
syncedOnce bool
lastFullSync time.Time
needFullSync bool
initialized int32
@@ -712,13 +713,13 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
})
}
// flush containers
proxier.clusterIPs.reset(tx)
proxier.serviceIPs.reset(tx)
proxier.firewallIPs.reset(tx)
proxier.noEndpointServices.reset(tx)
proxier.noEndpointNodePorts.reset(tx)
proxier.serviceNodePorts.reset(tx)
// read or flush containers
proxier.clusterIPs.readOrReset(tx, proxier.nftables, proxier.logger)
proxier.serviceIPs.readOrReset(tx, proxier.nftables, proxier.logger)
proxier.firewallIPs.readOrReset(tx, proxier.nftables, proxier.logger)
proxier.noEndpointServices.readOrReset(tx, proxier.nftables, proxier.logger)
proxier.noEndpointNodePorts.readOrReset(tx, proxier.nftables, proxier.logger)
proxier.serviceNodePorts.readOrReset(tx, proxier.nftables, proxier.logger)
}
// CleanupLeftovers removes all nftables rules and chains created by the Proxier
@@ -1082,19 +1083,30 @@ func newNFTElementStorage(containerType, containerName string) *nftElementStorag
return c
}
// reset clears the internal state and flushes the nftables map/set.
func (s *nftElementStorage) reset(tx *knftables.Transaction) {
// readOrReset updates the existing elements from the nftables map/set.
// If reading fails, it clears the internal state and flushes the nftables map/set.
func (s *nftElementStorage) readOrReset(tx *knftables.Transaction, nftables knftables.Interface, logger klog.Logger) {
clear(s.elements)
if s.containerType == "set" {
tx.Flush(&knftables.Set{
Name: s.containerName,
})
} else {
tx.Flush(&knftables.Map{
Name: s.containerName,
})
defer s.resetLeftoverKeys()
elems, err := nftables.ListElements(context.TODO(), s.containerType, s.containerName)
if err != nil && !knftables.IsNotFound(err) {
if s.containerType == "set" {
tx.Flush(&knftables.Set{
Name: s.containerName,
})
} else {
tx.Flush(&knftables.Map{
Name: s.containerName,
})
}
logger.Error(err, "Failed to list nftables elements", "containerName", s.containerName, "containerType", s.containerType)
return
}
for _, elem := range elems {
newKey := joinNFTSlice(elem.Key)
newValue := joinNFTSlice(elem.Value)
s.elements[newKey] = newValue
}
s.resetLeftoverKeys()
}
// resetLeftoverKeys is only called internally by nftElementStorage methods.
@@ -1178,6 +1190,7 @@ func (proxier *Proxier) syncProxyRules() {
doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod)
defer func() {
proxier.syncedOnce = true
metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
if !doFullSync {
metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
@@ -1252,6 +1265,26 @@ func (proxier *Proxier) syncProxyRules() {
ipvX_addr = "ipv6_addr"
}
var existingChains sets.Set[string]
existingChainsList, err := proxier.nftables.List(context.TODO(), "chain")
if err == nil {
existingChains = sets.New(existingChainsList...)
} else {
proxier.logger.Error(err, "Failed to list existing chains")
}
var existingAffinitySets sets.Set[string]
existingSets, err := proxier.nftables.List(context.TODO(), "sets")
if err == nil {
existingAffinitySets = sets.New[string]()
for _, set := range existingSets {
if isAffinitySetName(set) {
existingAffinitySets.Insert(set)
}
}
} else {
proxier.logger.Error(err, "Failed to list existing sets")
}
// Accumulate service/endpoint chains and affinity sets to keep.
activeChains := sets.New[string]()
activeAffinitySets := sets.New[string]()
@@ -1295,7 +1328,8 @@ func (proxier *Proxier) syncProxyRules() {
// Note the endpoint chains that will be used
for _, ep := range allLocallyReachableEndpoints {
if epInfo, ok := ep.(*endpointInfo); ok {
ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate)
ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate ||
proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo))
// Note the affinity sets that will be used
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
activeAffinitySets.Insert(epInfo.affinitySetName)
@@ -1737,6 +1771,10 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
if proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo) {
// If the EP chain is already updated, we can skip it.
continue
}
endpointChain := epInfo.chainName
// Handle traffic that loops back to the originator with SNAT.
@@ -1776,36 +1814,26 @@ func (proxier *Proxier) syncProxyRules() {
// short amount of time later that the chain is now unreferenced. So we flush them
// now, and record the time that they become stale in staleChains so they can be
// deleted later.
existingChains, err := proxier.nftables.List(context.TODO(), "chains")
if err == nil {
for _, chain := range existingChains {
if isServiceChainName(chain) {
if !activeChains.Has(chain) {
tx.Flush(&knftables.Chain{
Name: chain,
})
proxier.staleChains[chain] = start
} else {
delete(proxier.staleChains, chain)
}
for chain := range existingChains {
if isServiceChainName(chain) {
if !activeChains.Has(chain) {
tx.Flush(&knftables.Chain{
Name: chain,
})
proxier.staleChains[chain] = start
} else {
delete(proxier.staleChains, chain)
}
}
} else if !knftables.IsNotFound(err) {
proxier.logger.Error(err, "Failed to list nftables chains: stale chains will not be deleted")
}
// OTOH, we can immediately delete any stale affinity sets
existingSets, err := proxier.nftables.List(context.TODO(), "sets")
if err == nil {
for _, set := range existingSets {
if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
tx.Delete(&knftables.Set{
Name: set,
})
}
for set := range existingAffinitySets {
if !activeAffinitySets.Has(set) {
tx.Delete(&knftables.Set{
Name: set,
})
}
} else if !knftables.IsNotFound(err) {
proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
}
proxier.clusterIPs.cleanupLeftoverKeys(tx)
@@ -1871,6 +1899,30 @@ func (proxier *Proxier) syncProxyRules() {
}
}
// epChainSkipUpdate returns true if the EP chain doesn't need to be updated.
// This is an optimization for the first sync after kube-proxy restart: endpoint
// chains whose expected contents already exist in nftables (judged from the
// pre-listed existingChains and existingAffinitySets) can be left untouched.
func (proxier *Proxier) epChainSkipUpdate(existingChains, existingAffinitySets sets.Set[string], svcInfo *servicePortInfo, epInfo *endpointInfo) bool {
if proxier.syncedOnce {
// We only skip updating EP chains during the first sync to speed up kube-proxy restart, otherwise return false.
return false
}
if existingChains == nil || existingAffinitySets == nil {
// listing existing objects failed, can't skip updating
return false
}
// EP chain can have up to 3 rules:
// - loopback masquerade rule
// - includes the endpoint IP
// - affinity rule when session affinity is set to ClientIP
// - includes the affinity set name
// - DNAT rule
// - includes the endpoint IP + port
// EP chain name includes the endpoint IP + port => loopback and DNAT rules are pre-defined by the chain name.
// When session affinity is set to ClientIP, the affinity set is created for local endpoints.
// Therefore, we can check that session affinity hasn't changed by checking if the affinity set exists.
wantAffinitySet := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
return existingChains.Has(epInfo.chainName) && wantAffinitySet == existingAffinitySets.Has(epInfo.affinitySetName)
}
func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {