mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	fake util.clock tick
This commit is contained in:
		@@ -28,6 +28,7 @@ type Clock interface {
 | 
			
		||||
	Since(time.Time) time.Duration
 | 
			
		||||
	After(d time.Duration) <-chan time.Time
 | 
			
		||||
	Sleep(d time.Duration)
 | 
			
		||||
	Tick(d time.Duration) <-chan time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
@@ -54,6 +55,10 @@ func (RealClock) After(d time.Duration) <-chan time.Time {
 | 
			
		||||
	return time.After(d)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (RealClock) Tick(d time.Duration) <-chan time.Time {
 | 
			
		||||
	return time.Tick(d)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (RealClock) Sleep(d time.Duration) {
 | 
			
		||||
	time.Sleep(d)
 | 
			
		||||
}
 | 
			
		||||
@@ -69,6 +74,8 @@ type FakeClock struct {
 | 
			
		||||
 | 
			
		||||
type fakeClockWaiter struct {
 | 
			
		||||
	targetTime    time.Time
 | 
			
		||||
	stepInterval  time.Duration
 | 
			
		||||
	skipIfBlocked bool
 | 
			
		||||
	destChan      chan<- time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -105,7 +112,22 @@ func (f *FakeClock) After(d time.Duration) <-chan time.Time {
 | 
			
		||||
	return ch
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Move clock by Duration, notify anyone that's called After
 | 
			
		||||
func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
 | 
			
		||||
	f.lock.Lock()
 | 
			
		||||
	defer f.lock.Unlock()
 | 
			
		||||
	tickTime := f.time.Add(d)
 | 
			
		||||
	ch := make(chan time.Time, 1) // hold one tick
 | 
			
		||||
	f.waiters = append(f.waiters, fakeClockWaiter{
 | 
			
		||||
		targetTime:    tickTime,
 | 
			
		||||
		stepInterval:  d,
 | 
			
		||||
		skipIfBlocked: true,
 | 
			
		||||
		destChan:      ch,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return ch
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Move clock by Duration, notify anyone that's called After or Tick
 | 
			
		||||
func (f *FakeClock) Step(d time.Duration) {
 | 
			
		||||
	f.lock.Lock()
 | 
			
		||||
	defer f.lock.Unlock()
 | 
			
		||||
@@ -126,7 +148,23 @@ func (f *FakeClock) setTimeLocked(t time.Time) {
 | 
			
		||||
	for i := range f.waiters {
 | 
			
		||||
		w := &f.waiters[i]
 | 
			
		||||
		if !w.targetTime.After(t) {
 | 
			
		||||
 | 
			
		||||
			if w.skipIfBlocked {
 | 
			
		||||
				select {
 | 
			
		||||
				case w.destChan <- t:
 | 
			
		||||
				default:
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				w.destChan <- t
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if w.stepInterval > 0 {
 | 
			
		||||
				for !w.targetTime.After(t) {
 | 
			
		||||
					w.targetTime = w.targetTime.Add(w.stepInterval)
 | 
			
		||||
				}
 | 
			
		||||
				newWaiters = append(newWaiters, *w)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		} else {
 | 
			
		||||
			newWaiters = append(newWaiters, f.waiters[i])
 | 
			
		||||
		}
 | 
			
		||||
@@ -169,6 +207,12 @@ func (*IntervalClock) After(d time.Duration) <-chan time.Time {
 | 
			
		||||
	panic("IntervalClock doesn't implement After")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Unimplemented, will panic.
 | 
			
		||||
// TODO: make interval clock use FakeClock so this can be implemented.
 | 
			
		||||
func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
 | 
			
		||||
	panic("IntervalClock doesn't implement Tick")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (*IntervalClock) Sleep(d time.Duration) {
 | 
			
		||||
	panic("IntervalClock doesn't implement Sleep")
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -104,3 +104,81 @@ func TestFakeAfter(t *testing.T) {
 | 
			
		||||
		t.Errorf("unexpected non-channel read")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFakeTick(t *testing.T) {
 | 
			
		||||
	tc := NewFakeClock(time.Now())
 | 
			
		||||
	if tc.HasWaiters() {
 | 
			
		||||
		t.Errorf("unexpected waiter?")
 | 
			
		||||
	}
 | 
			
		||||
	oneSec := tc.Tick(time.Second)
 | 
			
		||||
	if !tc.HasWaiters() {
 | 
			
		||||
		t.Errorf("unexpected lack of waiter?")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	oneOhOneSec := tc.Tick(time.Second + time.Millisecond)
 | 
			
		||||
	twoSec := tc.Tick(2 * time.Second)
 | 
			
		||||
	select {
 | 
			
		||||
	case <-oneSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	case <-oneOhOneSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	case <-twoSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tc.Step(999 * time.Millisecond) // t=.999
 | 
			
		||||
	select {
 | 
			
		||||
	case <-oneSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	case <-oneOhOneSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	case <-twoSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tc.Step(time.Millisecond) // t=1.000
 | 
			
		||||
	select {
 | 
			
		||||
	case <-oneSec:
 | 
			
		||||
		// Expected!
 | 
			
		||||
	case <-oneOhOneSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	case <-twoSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	default:
 | 
			
		||||
		t.Errorf("unexpected non-channel read")
 | 
			
		||||
	}
 | 
			
		||||
	tc.Step(time.Millisecond) // t=1.001
 | 
			
		||||
	select {
 | 
			
		||||
	case <-oneSec:
 | 
			
		||||
		// should not double-trigger!
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	case <-oneOhOneSec:
 | 
			
		||||
		// Expected!
 | 
			
		||||
	case <-twoSec:
 | 
			
		||||
		t.Errorf("unexpected channel read")
 | 
			
		||||
	default:
 | 
			
		||||
		t.Errorf("unexpected non-channel read")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tc.Step(time.Second) // t=2.001
 | 
			
		||||
	tc.Step(time.Second) // t=3.001
 | 
			
		||||
	tc.Step(time.Second) // t=4.001
 | 
			
		||||
	tc.Step(time.Second) // t=5.001
 | 
			
		||||
 | 
			
		||||
	// The one second ticker should not accumulate ticks
 | 
			
		||||
	accumulatedTicks := 0
 | 
			
		||||
	drained := false
 | 
			
		||||
	for !drained {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-oneSec:
 | 
			
		||||
			accumulatedTicks++
 | 
			
		||||
		default:
 | 
			
		||||
			drained = true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if accumulatedTicks != 1 {
 | 
			
		||||
		t.Errorf("unexpected number of accumulated ticks: %d", accumulatedTicks)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -41,7 +41,7 @@ func newDelayingQueue(clock util.Clock) DelayingInterface {
 | 
			
		||||
	ret := &delayingType{
 | 
			
		||||
		Interface:       New(),
 | 
			
		||||
		clock:           clock,
 | 
			
		||||
		heartbeat:       time.Tick(maxWait),
 | 
			
		||||
		heartbeat:       clock.Tick(maxWait),
 | 
			
		||||
		stopCh:          make(chan struct{}),
 | 
			
		||||
		waitingForAddCh: make(chan waitFor, 1000),
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user