add occasional polling to reflector

This commit is contained in:
Daniel Smith
2015-02-26 16:58:00 -08:00
parent 156c505946
commit 554b1c847c
7 changed files with 110 additions and 47 deletions

View File

@@ -49,18 +49,23 @@ type Reflector struct {
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
period time.Duration
resyncPeriod time.Duration
}
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store) *Reflector {
// If resyncPeriod is non-zero, then lists will be executed after every resyncPeriod,
// so that you can use reflectors to periodically process everything as well as
// incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
period: time.Second,
resyncPeriod: resyncPeriod,
}
return r
}
@@ -77,8 +82,25 @@ func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
go util.Until(func() { r.listAndWatch() }, r.period, stopCh)
}
var (
// nothing will ever be sent down this channel
neverExitWatch <-chan time.Time = make(chan time.Time)
// Used to indicate that watching stopped so that a resync could happen.
errorResyncRequested = errors.New("resync channel fired")
)
// resyncChan returns a channel which will receive something when a resync is required.
func (r *Reflector) resyncChan() <-chan time.Time {
if r.resyncPeriod == 0 {
return neverExitWatch
}
return time.After(r.resyncPeriod)
}
func (r *Reflector) listAndWatch() {
var resourceVersion string
exitWatch := r.resyncChan()
list, err := r.listerWatcher.List()
if err != nil {
@@ -114,8 +136,10 @@ func (r *Reflector) listAndWatch() {
}
return
}
if err := r.watchHandler(w, &resourceVersion); err != nil {
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
if err := r.watchHandler(w, &resourceVersion, exitWatch); err != nil {
if err != errorResyncRequested {
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
}
return
}
}
@@ -132,41 +156,47 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
}
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string) error {
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, exitWatch <-chan time.Time) error {
start := time.Now()
eventCount := 0
loop:
for {
event, ok := <-w.ResultChan()
if !ok {
break
select {
case <-exitWatch:
w.Stop()
return errorResyncRequested
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
continue
}
meta, err := meta.Accessor(event.Object)
if err != nil {
glog.Errorf("unable to understand watch event %#v", event)
continue
}
switch event.Type {
case watch.Added:
r.store.Add(event.Object)
case watch.Modified:
r.store.Update(event.Object)
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
r.store.Delete(event.Object)
default:
glog.Errorf("unable to understand watch event %#v", event)
}
*resourceVersion = meta.ResourceVersion()
eventCount++
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
continue
}
meta, err := meta.Accessor(event.Object)
if err != nil {
glog.Errorf("unable to understand watch event %#v", event)
continue
}
switch event.Type {
case watch.Added:
r.store.Add(event.Object)
case watch.Modified:
r.store.Update(event.Object)
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
r.store.Delete(event.Object)
default:
glog.Errorf("unable to understand watch event %#v", event)
}
*resourceVersion = meta.ResourceVersion()
eventCount++
}
watchDuration := time.Now().Sub(start)