mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	pkg/proxy: move proxier health eventing to cmd/kube-proxy
Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
		@@ -574,7 +574,7 @@ func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master strin
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(config.HealthzBindAddress) > 0 {
 | 
						if len(config.HealthzBindAddress) > 0 {
 | 
				
			||||||
		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
 | 
							s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = s.platformSetup()
 | 
						err = s.platformSetup()
 | 
				
			||||||
@@ -822,16 +822,17 @@ func (s *ProxyServer) Run() error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// TODO(thockin): make it possible for healthz and metrics to be on the same port.
 | 
						// TODO(thockin): make it possible for healthz and metrics to be on the same port.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var errCh chan error
 | 
						var healthzErrCh, metricsErrCh chan error
 | 
				
			||||||
	if s.Config.BindAddressHardFail {
 | 
						if s.Config.BindAddressHardFail {
 | 
				
			||||||
		errCh = make(chan error)
 | 
							healthzErrCh = make(chan error)
 | 
				
			||||||
 | 
							metricsErrCh = make(chan error)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Start up a healthz server if requested
 | 
						// Start up a healthz server if requested
 | 
				
			||||||
	serveHealthz(s.HealthzServer, errCh)
 | 
						serveHealthz(s.HealthzServer, healthzErrCh)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Start up a metrics server if requested
 | 
						// Start up a metrics server if requested
 | 
				
			||||||
	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)
 | 
						serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
 | 
						noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -896,7 +897,13 @@ func (s *ProxyServer) Run() error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	go s.Proxier.SyncLoop()
 | 
						go s.Proxier.SyncLoop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return <-errCh
 | 
						select {
 | 
				
			||||||
 | 
						case err = <-healthzErrCh:
 | 
				
			||||||
 | 
							s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", err.Error())
 | 
				
			||||||
 | 
						case err = <-metricsErrCh:
 | 
				
			||||||
 | 
							s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, "FailedToStartMetricServer", "StartKubeProxy", err.Error())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *ProxyServer) birthCry() {
 | 
					func (s *ProxyServer) birthCry() {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -470,7 +470,7 @@ func TestHealthzServer(t *testing.T) {
 | 
				
			|||||||
	httpFactory := newFakeHTTPServerFactory()
 | 
						httpFactory := newFakeHTTPServerFactory()
 | 
				
			||||||
	fakeClock := testingclock.NewFakeClock(time.Now())
 | 
						fakeClock := testingclock.NewFakeClock(time.Now())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
 | 
						hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
 | 
				
			||||||
	server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
 | 
						server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	hsTest := &serverTest{
 | 
						hsTest := &serverTest{
 | 
				
			||||||
@@ -524,7 +524,7 @@ func TestLivezServer(t *testing.T) {
 | 
				
			|||||||
	httpFactory := newFakeHTTPServerFactory()
 | 
						httpFactory := newFakeHTTPServerFactory()
 | 
				
			||||||
	fakeClock := testingclock.NewFakeClock(time.Now())
 | 
						fakeClock := testingclock.NewFakeClock(time.Now())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
 | 
						hs := newProxierHealthServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
 | 
				
			||||||
	server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
 | 
						server := hs.httpFactory.New(hs.addr, livezHandler{hs: hs})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	hsTest := &serverTest{
 | 
						hsTest := &serverTest{
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -23,9 +23,7 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	"k8s.io/client-go/tools/events"
 | 
					 | 
				
			||||||
	"k8s.io/klog/v2"
 | 
						"k8s.io/klog/v2"
 | 
				
			||||||
	api "k8s.io/kubernetes/pkg/apis/core"
 | 
					 | 
				
			||||||
	"k8s.io/kubernetes/pkg/proxy/metrics"
 | 
						"k8s.io/kubernetes/pkg/proxy/metrics"
 | 
				
			||||||
	"k8s.io/utils/clock"
 | 
						"k8s.io/utils/clock"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -68,8 +66,6 @@ type proxierHealthServer struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	addr          string
 | 
						addr          string
 | 
				
			||||||
	healthTimeout time.Duration
 | 
						healthTimeout time.Duration
 | 
				
			||||||
	recorder      events.EventRecorder
 | 
					 | 
				
			||||||
	nodeRef       *v1.ObjectReference
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	lastUpdated         atomic.Value
 | 
						lastUpdated         atomic.Value
 | 
				
			||||||
	oldestPendingQueued atomic.Value
 | 
						oldestPendingQueued atomic.Value
 | 
				
			||||||
@@ -77,19 +73,17 @@ type proxierHealthServer struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewProxierHealthServer returns a proxier health http server.
 | 
					// NewProxierHealthServer returns a proxier health http server.
 | 
				
			||||||
func NewProxierHealthServer(addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) ProxierHealthUpdater {
 | 
					func NewProxierHealthServer(addr string, healthTimeout time.Duration) ProxierHealthUpdater {
 | 
				
			||||||
	return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout, recorder, nodeRef)
 | 
						return newProxierHealthServer(stdNetListener{}, stdHTTPServerFactory{}, clock.RealClock{}, addr, healthTimeout)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder events.EventRecorder, nodeRef *v1.ObjectReference) *proxierHealthServer {
 | 
					func newProxierHealthServer(listener listener, httpServerFactory httpServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *proxierHealthServer {
 | 
				
			||||||
	hs := &proxierHealthServer{
 | 
						hs := &proxierHealthServer{
 | 
				
			||||||
		listener:      listener,
 | 
							listener:      listener,
 | 
				
			||||||
		httpFactory:   httpServerFactory,
 | 
							httpFactory:   httpServerFactory,
 | 
				
			||||||
		clock:         c,
 | 
							clock:         c,
 | 
				
			||||||
		addr:          addr,
 | 
							addr:          addr,
 | 
				
			||||||
		healthTimeout: healthTimeout,
 | 
							healthTimeout: healthTimeout,
 | 
				
			||||||
		recorder:      recorder,
 | 
					 | 
				
			||||||
		nodeRef:       nodeRef,
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// The node is eligible (and thus the proxy healthy) while it's starting up
 | 
						// The node is eligible (and thus the proxy healthy) while it's starting up
 | 
				
			||||||
	// and until we've processed the first node event that indicates the
 | 
						// and until we've processed the first node event that indicates the
 | 
				
			||||||
@@ -166,12 +160,7 @@ func (hs *proxierHealthServer) Run() error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	listener, err := hs.listener.Listen(hs.addr)
 | 
						listener, err := hs.listener.Listen(hs.addr)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		msg := fmt.Sprintf("failed to start proxier healthz on %s: %v", hs.addr, err)
 | 
							return fmt.Errorf("failed to start proxier healthz on %s: %v", hs.addr, err)
 | 
				
			||||||
		// TODO(thockin): move eventing back to caller
 | 
					 | 
				
			||||||
		if hs.recorder != nil {
 | 
					 | 
				
			||||||
			hs.recorder.Eventf(hs.nodeRef, nil, api.EventTypeWarning, "FailedToStartProxierHealthcheck", "StartKubeProxy", msg)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return fmt.Errorf("%v", msg)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
 | 
						klog.V(3).InfoS("Starting healthz HTTP server", "address", hs.addr)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user