mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Replace calls to time.After with time.NewTimer for explicit stopping
This commit is contained in:
		
							
								
								
									
										17
									
								
								pkg/client/cache/reflector.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										17
									
								
								pkg/client/cache/reflector.go
									
									
									
									
										vendored
									
									
								
							@@ -106,17 +106,24 @@ var (
 | 
				
			|||||||
	errorStopRequested = errors.New("Stop requested")
 | 
						errorStopRequested = errors.New("Stop requested")
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// resyncChan returns a channel which will receive something when a resync is required.
 | 
					// resyncChan returns a channel which will receive something when a resync is
 | 
				
			||||||
func (r *Reflector) resyncChan() <-chan time.Time {
 | 
					// required, and a cleanup function.
 | 
				
			||||||
 | 
					func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
 | 
				
			||||||
	if r.resyncPeriod == 0 {
 | 
						if r.resyncPeriod == 0 {
 | 
				
			||||||
		return neverExitWatch
 | 
							return neverExitWatch, func() bool { return false }
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return time.After(r.resyncPeriod)
 | 
						// The cleanup function is required: imagine the scenario where watches
 | 
				
			||||||
 | 
						// always fail so we end up listing frequently. Then, if we don't
 | 
				
			||||||
 | 
						// manually stop the timer, we could end up with many timers active
 | 
				
			||||||
 | 
						// concurrently.
 | 
				
			||||||
 | 
						t := time.NewTimer(r.resyncPeriod)
 | 
				
			||||||
 | 
						return t.C, t.Stop
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
 | 
					func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
 | 
				
			||||||
	var resourceVersion string
 | 
						var resourceVersion string
 | 
				
			||||||
	resyncCh := r.resyncChan()
 | 
						resyncCh, cleanup := r.resyncChan()
 | 
				
			||||||
 | 
						defer cleanup()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	list, err := r.listerWatcher.List()
 | 
						list, err := r.listerWatcher.List()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										16
									
								
								pkg/client/cache/reflector_test.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										16
									
								
								pkg/client/cache/reflector_test.go
									
									
									
									
										vendored
									
									
								
							@@ -18,6 +18,7 @@ package cache
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"math/rand"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
@@ -95,7 +96,8 @@ func TestRunUntil(t *testing.T) {
 | 
				
			|||||||
func TestReflector_resyncChan(t *testing.T) {
 | 
					func TestReflector_resyncChan(t *testing.T) {
 | 
				
			||||||
	s := NewStore(MetaNamespaceKeyFunc)
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
	g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
 | 
						g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
 | 
				
			||||||
	a, b := g.resyncChan(), time.After(100*time.Millisecond)
 | 
						a, _ := g.resyncChan()
 | 
				
			||||||
 | 
						b := time.After(100 * time.Millisecond)
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
	case <-a:
 | 
						case <-a:
 | 
				
			||||||
		t.Logf("got timeout as expected")
 | 
							t.Logf("got timeout as expected")
 | 
				
			||||||
@@ -104,6 +106,18 @@ func TestReflector_resyncChan(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func BenchmarkReflector_resyncChanMany(b *testing.B) {
 | 
				
			||||||
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
						g := NewReflector(&testLW{}, &api.Pod{}, s, 25*time.Millisecond)
 | 
				
			||||||
 | 
						// The improvement to this (calling the timer's Stop() method) makes
 | 
				
			||||||
 | 
						// this benchmark about 40% faster.
 | 
				
			||||||
 | 
						for i := 0; i < b.N; i++ {
 | 
				
			||||||
 | 
							g.resyncPeriod = time.Duration(rand.Float64() * float64(time.Millisecond) * 25)
 | 
				
			||||||
 | 
							_, stop := g.resyncChan()
 | 
				
			||||||
 | 
							stop()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestReflector_watchHandlerError(t *testing.T) {
 | 
					func TestReflector_watchHandlerError(t *testing.T) {
 | 
				
			||||||
	s := NewStore(MetaNamespaceKeyFunc)
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
	g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
 | 
						g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -57,7 +57,7 @@ const (
 | 
				
			|||||||
	// specifically targeted at the case where some problem prevents an update
 | 
						// specifically targeted at the case where some problem prevents an update
 | 
				
			||||||
	// of expectations, without it the RC could stay asleep forever. This should
 | 
						// of expectations, without it the RC could stay asleep forever. This should
 | 
				
			||||||
	// be set based on the expected latency of watch events.
 | 
						// be set based on the expected latency of watch events.
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
	// TODO: Set this per expectation, based on its size.
 | 
						// TODO: Set this per expectation, based on its size.
 | 
				
			||||||
	// Currently an rc can service (create *and* observe the watch events for said
 | 
						// Currently an rc can service (create *and* observe the watch events for said
 | 
				
			||||||
	// creation) about 10-20 pods a second, so it takes about 3.5 min to service
 | 
						// creation) about 10-20 pods a second, so it takes about 3.5 min to service
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -29,6 +29,8 @@ import (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *Master) serviceWriterLoop(stop chan struct{}) {
 | 
					func (m *Master) serviceWriterLoop(stop chan struct{}) {
 | 
				
			||||||
 | 
						t := time.NewTicker(10 * time.Second)
 | 
				
			||||||
 | 
						defer t.Stop()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		// Update service & endpoint records.
 | 
							// Update service & endpoint records.
 | 
				
			||||||
		// TODO: when it becomes possible to change this stuff,
 | 
							// TODO: when it becomes possible to change this stuff,
 | 
				
			||||||
@@ -49,12 +51,14 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) {
 | 
				
			|||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-stop:
 | 
							case <-stop:
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		case <-time.After(10 * time.Second):
 | 
							case <-t.C:
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *Master) roServiceWriterLoop(stop chan struct{}) {
 | 
					func (m *Master) roServiceWriterLoop(stop chan struct{}) {
 | 
				
			||||||
 | 
						t := time.NewTicker(10 * time.Second)
 | 
				
			||||||
 | 
						defer t.Stop()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		// Update service & endpoint records.
 | 
							// Update service & endpoint records.
 | 
				
			||||||
		// TODO: when it becomes possible to change this stuff,
 | 
							// TODO: when it becomes possible to change this stuff,
 | 
				
			||||||
@@ -74,7 +78,7 @@ func (m *Master) roServiceWriterLoop(stop chan struct{}) {
 | 
				
			|||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-stop:
 | 
							case <-stop:
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		case <-time.After(10 * time.Second):
 | 
							case <-t.C:
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -366,16 +366,16 @@ const syncInterval = 5 * time.Second
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
 | 
					// SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
 | 
				
			||||||
func (proxier *Proxier) SyncLoop() {
 | 
					func (proxier *Proxier) SyncLoop() {
 | 
				
			||||||
 | 
						t := time.NewTicker(syncInterval)
 | 
				
			||||||
 | 
						defer t.Stop()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							<-t.C
 | 
				
			||||||
		case <-time.After(syncInterval):
 | 
							glog.V(3).Infof("Periodic sync")
 | 
				
			||||||
			glog.V(3).Infof("Periodic sync")
 | 
							if err := iptablesInit(proxier.iptables); err != nil {
 | 
				
			||||||
			if err := iptablesInit(proxier.iptables); err != nil {
 | 
								glog.Errorf("Failed to ensure iptables: %v", err)
 | 
				
			||||||
				glog.Errorf("Failed to ensure iptables: %v", err)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			proxier.ensurePortals()
 | 
					 | 
				
			||||||
			proxier.cleanupStaleStickySessions()
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							proxier.ensurePortals()
 | 
				
			||||||
 | 
							proxier.cleanupStaleStickySessions()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -89,7 +89,12 @@ func poller(interval, timeout time.Duration) WaitFunc {
 | 
				
			|||||||
			defer tick.Stop()
 | 
								defer tick.Stop()
 | 
				
			||||||
			var after <-chan time.Time
 | 
								var after <-chan time.Time
 | 
				
			||||||
			if timeout != 0 {
 | 
								if timeout != 0 {
 | 
				
			||||||
				after = time.After(timeout)
 | 
									// time.After is more convenient, but it
 | 
				
			||||||
 | 
									// potentially leaves timers around much longer
 | 
				
			||||||
 | 
									// than necessary if we exit early.
 | 
				
			||||||
 | 
									timer := time.NewTimer(timeout)
 | 
				
			||||||
 | 
									after = timer.C
 | 
				
			||||||
 | 
									defer timer.Stop()
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			for {
 | 
								for {
 | 
				
			||||||
				select {
 | 
									select {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user