mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #46201 from wojtek-t/address_kubeproxy_todos
Automatic merge from submit-queue Address remaining TODOs in kube-proxy. Followup PR from the previous two.
This commit is contained in:
		@@ -30,6 +30,7 @@ import (
 | 
				
			|||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/glog"
 | 
						"github.com/golang/glog"
 | 
				
			||||||
@@ -237,14 +238,8 @@ func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, prev
 | 
				
			|||||||
	change.current = endpointsToEndpointsMap(current, ecm.hostname)
 | 
						change.current = endpointsToEndpointsMap(current, ecm.hostname)
 | 
				
			||||||
	if reflect.DeepEqual(change.previous, change.current) {
 | 
						if reflect.DeepEqual(change.previous, change.current) {
 | 
				
			||||||
		delete(ecm.items, *namespacedName)
 | 
							delete(ecm.items, *namespacedName)
 | 
				
			||||||
		return false
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// TODO: Instead of returning true/false, we should consider returning whether
 | 
						return len(ecm.items) > 0
 | 
				
			||||||
	// the map contains some element or not. Currently, if the change is
 | 
					 | 
				
			||||||
	// "reverting" some previous endpoints update, but there are still some other
 | 
					 | 
				
			||||||
	// modified endpoints, we will return false, even though there are some change
 | 
					 | 
				
			||||||
	// to apply.
 | 
					 | 
				
			||||||
	return true
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newServiceChangeMap() serviceChangeMap {
 | 
					func newServiceChangeMap() serviceChangeMap {
 | 
				
			||||||
@@ -266,14 +261,8 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo
 | 
				
			|||||||
	change.current = serviceToServiceMap(current)
 | 
						change.current = serviceToServiceMap(current)
 | 
				
			||||||
	if reflect.DeepEqual(change.previous, change.current) {
 | 
						if reflect.DeepEqual(change.previous, change.current) {
 | 
				
			||||||
		delete(scm.items, *namespacedName)
 | 
							delete(scm.items, *namespacedName)
 | 
				
			||||||
		return false
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// TODO: Instead of returning true/false, we should consider returning whether
 | 
						return len(scm.items) > 0
 | 
				
			||||||
	// the map contains some element or not. Currently, if the change is
 | 
					 | 
				
			||||||
	// "reverting" some previous endpoints update, but there are still some other
 | 
					 | 
				
			||||||
	// modified endpoints, we will return false, even though there are some change
 | 
					 | 
				
			||||||
	// to apply.
 | 
					 | 
				
			||||||
	return true
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
 | 
					func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
 | 
				
			||||||
@@ -340,6 +329,7 @@ type Proxier struct {
 | 
				
			|||||||
	// with some partial data after kube-proxy restart.
 | 
						// with some partial data after kube-proxy restart.
 | 
				
			||||||
	endpointsSynced bool
 | 
						endpointsSynced bool
 | 
				
			||||||
	servicesSynced  bool
 | 
						servicesSynced  bool
 | 
				
			||||||
 | 
						initialized     int32
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	throttle flowcontrol.RateLimiter
 | 
						throttle flowcontrol.RateLimiter
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -596,41 +586,35 @@ func (proxier *Proxier) SyncLoop() {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (proxier *Proxier) setInitialized(value bool) {
 | 
				
			||||||
 | 
						var initialized int32
 | 
				
			||||||
 | 
						if value {
 | 
				
			||||||
 | 
							initialized = 1
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						atomic.StoreInt32(&proxier.initialized, initialized)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (proxier *Proxier) isInitialized() bool {
 | 
				
			||||||
 | 
						return atomic.LoadInt32(&proxier.initialized) > 0
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 | 
					func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 | 
				
			||||||
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
						namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
				
			||||||
	if proxier.serviceChanges.update(&namespacedName, nil, service) {
 | 
						if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
 | 
				
			||||||
		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
 | 
					 | 
				
			||||||
		// services is not finished, it doesn't make sense to call syncProxyRules
 | 
					 | 
				
			||||||
		// because it will early-return (to avoid resyncing iptables with partial
 | 
					 | 
				
			||||||
		// state right after kube-proxy restart). This can eat a token for calling
 | 
					 | 
				
			||||||
		// syncProxyRules, but is not that critical since it can happen only
 | 
					 | 
				
			||||||
		// after kube-proxy was (re)started.
 | 
					 | 
				
			||||||
		proxier.syncProxyRules()
 | 
							proxier.syncProxyRules()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 | 
					func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 | 
				
			||||||
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
						namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
				
			||||||
	if proxier.serviceChanges.update(&namespacedName, oldService, service) {
 | 
						if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
 | 
				
			||||||
		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
 | 
					 | 
				
			||||||
		// services is not finished, it doesn't make sense to call syncProxyRules
 | 
					 | 
				
			||||||
		// because it will early-return (to avoid resyncing iptables with partial
 | 
					 | 
				
			||||||
		// state right after kube-proxy restart). This can eat a token for calling
 | 
					 | 
				
			||||||
		// syncProxyRules, but is not that critical since it can happen only
 | 
					 | 
				
			||||||
		// after kube-proxy was (re)started.
 | 
					 | 
				
			||||||
		proxier.syncProxyRules()
 | 
							proxier.syncProxyRules()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 | 
					func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 | 
				
			||||||
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
						namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
				
			||||||
	if proxier.serviceChanges.update(&namespacedName, service, nil) {
 | 
						if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
 | 
				
			||||||
		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
 | 
					 | 
				
			||||||
		// services is not finished, it doesn't make sense to call syncProxyRules
 | 
					 | 
				
			||||||
		// because it will early-return (to avoid resyncing iptables with partial
 | 
					 | 
				
			||||||
		// state right after kube-proxy restart). This can eat a token for calling
 | 
					 | 
				
			||||||
		// syncProxyRules, but is not that critical since it can happen only
 | 
					 | 
				
			||||||
		// after kube-proxy was (re)started.
 | 
					 | 
				
			||||||
		proxier.syncProxyRules()
 | 
							proxier.syncProxyRules()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -638,6 +622,7 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 | 
				
			|||||||
func (proxier *Proxier) OnServiceSynced() {
 | 
					func (proxier *Proxier) OnServiceSynced() {
 | 
				
			||||||
	proxier.mu.Lock()
 | 
						proxier.mu.Lock()
 | 
				
			||||||
	proxier.servicesSynced = true
 | 
						proxier.servicesSynced = true
 | 
				
			||||||
 | 
						proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
 | 
				
			||||||
	proxier.mu.Unlock()
 | 
						proxier.mu.Unlock()
 | 
				
			||||||
	// Call it unconditionally - this is called once per lifetime.
 | 
						// Call it unconditionally - this is called once per lifetime.
 | 
				
			||||||
	proxier.syncProxyRules()
 | 
						proxier.syncProxyRules()
 | 
				
			||||||
@@ -688,39 +673,21 @@ func updateServiceMap(
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
 | 
					func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
 | 
				
			||||||
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
						namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
				
			||||||
	if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) {
 | 
						if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
 | 
				
			||||||
		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
 | 
					 | 
				
			||||||
		// services is not finished, it doesn't make sense to call syncProxyRules
 | 
					 | 
				
			||||||
		// because it will early-return (to avoid resyncing iptables with partial
 | 
					 | 
				
			||||||
		// state right after kube-proxy restart). This can eat a token for calling
 | 
					 | 
				
			||||||
		// syncProxyRules, but is not that critical since it can happen only
 | 
					 | 
				
			||||||
		// after kube-proxy was (re)started.
 | 
					 | 
				
			||||||
		proxier.syncProxyRules()
 | 
							proxier.syncProxyRules()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
 | 
					func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
 | 
				
			||||||
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
						namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
				
			||||||
	if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) {
 | 
						if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
 | 
				
			||||||
		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
 | 
					 | 
				
			||||||
		// services is not finished, it doesn't make sense to call syncProxyRules
 | 
					 | 
				
			||||||
		// because it will early-return (to avoid resyncing iptables with partial
 | 
					 | 
				
			||||||
		// state right after kube-proxy restart). This can eat a token for calling
 | 
					 | 
				
			||||||
		// syncProxyRules, but is not that critical since it can happen only
 | 
					 | 
				
			||||||
		// after kube-proxy was (re)started.
 | 
					 | 
				
			||||||
		proxier.syncProxyRules()
 | 
							proxier.syncProxyRules()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
 | 
					func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
 | 
				
			||||||
	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
						namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
 | 
				
			||||||
	if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) {
 | 
						if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
 | 
				
			||||||
		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
 | 
					 | 
				
			||||||
		// services is not finished, it doesn't make sense to call syncProxyRules
 | 
					 | 
				
			||||||
		// because it will early-return (to avoid resyncing iptables with partial
 | 
					 | 
				
			||||||
		// state right after kube-proxy restart). This can eat a token for calling
 | 
					 | 
				
			||||||
		// syncProxyRules, but is not that critical since it can happen only
 | 
					 | 
				
			||||||
		// after kube-proxy was (re)started.
 | 
					 | 
				
			||||||
		proxier.syncProxyRules()
 | 
							proxier.syncProxyRules()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -728,6 +695,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
 | 
				
			|||||||
func (proxier *Proxier) OnEndpointsSynced() {
 | 
					func (proxier *Proxier) OnEndpointsSynced() {
 | 
				
			||||||
	proxier.mu.Lock()
 | 
						proxier.mu.Lock()
 | 
				
			||||||
	proxier.endpointsSynced = true
 | 
						proxier.endpointsSynced = true
 | 
				
			||||||
 | 
						proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
 | 
				
			||||||
	proxier.mu.Unlock()
 | 
						proxier.mu.Unlock()
 | 
				
			||||||
	// Call it unconditionally - this is called once per lifetime.
 | 
						// Call it unconditionally - this is called once per lifetime.
 | 
				
			||||||
	proxier.syncProxyRules()
 | 
						proxier.syncProxyRules()
 | 
				
			||||||
@@ -1531,9 +1499,7 @@ func (proxier *Proxier) syncProxyRules() {
 | 
				
			|||||||
	proxier.iptablesData.Write(proxier.natChains.Bytes())
 | 
						proxier.iptablesData.Write(proxier.natChains.Bytes())
 | 
				
			||||||
	proxier.iptablesData.Write(proxier.natRules.Bytes())
 | 
						proxier.iptablesData.Write(proxier.natRules.Bytes())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if glog.V(5) {
 | 
					 | 
				
			||||||
	glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
 | 
						glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
 | 
						err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
 | 
							glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user