mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Log water mark for incoming queue in cacher
This commit is contained in:
		@@ -162,7 +162,8 @@ type Cacher struct {
 | 
				
			|||||||
	watchers   indexedWatchers
 | 
						watchers   indexedWatchers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Incoming events that should be dispatched to watchers.
 | 
						// Incoming events that should be dispatched to watchers.
 | 
				
			||||||
	incoming chan watchCacheEvent
 | 
						incoming    chan watchCacheEvent
 | 
				
			||||||
 | 
						incomingHWM HighWaterMark
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Handling graceful termination.
 | 
						// Handling graceful termination.
 | 
				
			||||||
	stopLock sync.RWMutex
 | 
						stopLock sync.RWMutex
 | 
				
			||||||
@@ -410,6 +411,10 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cacher) processEvent(event watchCacheEvent) {
 | 
					func (c *Cacher) processEvent(event watchCacheEvent) {
 | 
				
			||||||
 | 
						if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
 | 
				
			||||||
 | 
							// Monitor if this gets backed up, and how much.
 | 
				
			||||||
 | 
							glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	c.incoming <- event
 | 
						c.incoming <- event
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,7 +21,6 @@ import (
 | 
				
			|||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"reflect"
 | 
						"reflect"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"sync/atomic"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
						"k8s.io/kubernetes/pkg/api/unversioned"
 | 
				
			||||||
@@ -47,23 +46,6 @@ const (
 | 
				
			|||||||
	EtcdExpire = "expire"
 | 
						EtcdExpire = "expire"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// HighWaterMark is a thread-safe object for tracking the maximum value seen
 | 
					 | 
				
			||||||
// for some quantity.
 | 
					 | 
				
			||||||
type HighWaterMark int64
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Update returns true if and only if 'current' is the highest value ever seen.
 | 
					 | 
				
			||||||
func (hwm *HighWaterMark) Update(current int64) bool {
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		old := atomic.LoadInt64((*int64)(hwm))
 | 
					 | 
				
			||||||
		if current <= old {
 | 
					 | 
				
			||||||
			return false
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
 | 
					 | 
				
			||||||
			return true
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// TransformFunc attempts to convert an object to another object for use with a watcher.
 | 
					// TransformFunc attempts to convert an object to another object for use with a watcher.
 | 
				
			||||||
type TransformFunc func(runtime.Object) (runtime.Object, error)
 | 
					type TransformFunc func(runtime.Object) (runtime.Object, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -109,8 +91,8 @@ type etcdWatcher struct {
 | 
				
			|||||||
	emit func(watch.Event)
 | 
						emit func(watch.Event)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// HighWaterMarks for performance debugging.
 | 
						// HighWaterMarks for performance debugging.
 | 
				
			||||||
	incomingHWM HighWaterMark
 | 
						incomingHWM storage.HighWaterMark
 | 
				
			||||||
	outgoingHWM HighWaterMark
 | 
						outgoingHWM storage.HighWaterMark
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	cache etcdCache
 | 
						cache etcdCache
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,9 +17,7 @@ limitations under the License.
 | 
				
			|||||||
package etcd
 | 
					package etcd
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"math/rand"
 | 
					 | 
				
			||||||
	rt "runtime"
 | 
						rt "runtime"
 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
@@ -557,34 +555,3 @@ func TestWatchPurposefulShutdown(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("Unexpected event from stopped watcher: %#v", event)
 | 
							t.Errorf("Unexpected event from stopped watcher: %#v", event)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestHighWaterMark(t *testing.T) {
 | 
					 | 
				
			||||||
	var h HighWaterMark
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for i := int64(10); i < 20; i++ {
 | 
					 | 
				
			||||||
		if !h.Update(i) {
 | 
					 | 
				
			||||||
			t.Errorf("unexpected false for %v", i)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if h.Update(i - 1) {
 | 
					 | 
				
			||||||
			t.Errorf("unexpected true for %v", i-1)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	m := int64(0)
 | 
					 | 
				
			||||||
	wg := sync.WaitGroup{}
 | 
					 | 
				
			||||||
	for i := 0; i < 300; i++ {
 | 
					 | 
				
			||||||
		wg.Add(1)
 | 
					 | 
				
			||||||
		v := rand.Int63()
 | 
					 | 
				
			||||||
		go func(v int64) {
 | 
					 | 
				
			||||||
			defer wg.Done()
 | 
					 | 
				
			||||||
			h.Update(v)
 | 
					 | 
				
			||||||
		}(v)
 | 
					 | 
				
			||||||
		if v > m {
 | 
					 | 
				
			||||||
			m = v
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	wg.Wait()
 | 
					 | 
				
			||||||
	if m != int64(h) {
 | 
					 | 
				
			||||||
		t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,6 +20,7 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/meta"
 | 
						"k8s.io/kubernetes/pkg/api/meta"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/validation/path"
 | 
						"k8s.io/kubernetes/pkg/api/validation/path"
 | 
				
			||||||
@@ -149,3 +150,20 @@ func hasPathPrefix(s, pathPrefix string) bool {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return false
 | 
						return false
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// HighWaterMark is a thread-safe object for tracking the maximum value seen
 | 
				
			||||||
 | 
					// for some quantity.
 | 
				
			||||||
 | 
					type HighWaterMark int64
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Update returns true if and only if 'current' is the highest value ever seen.
 | 
				
			||||||
 | 
					func (hwm *HighWaterMark) Update(current int64) bool {
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							old := atomic.LoadInt64((*int64)(hwm))
 | 
				
			||||||
 | 
							if current <= old {
 | 
				
			||||||
 | 
								return false
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
 | 
				
			||||||
 | 
								return true
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -16,7 +16,11 @@ limitations under the License.
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
package storage
 | 
					package storage
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import "testing"
 | 
					import (
 | 
				
			||||||
 | 
						"math/rand"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEtcdParseWatchResourceVersion(t *testing.T) {
 | 
					func TestEtcdParseWatchResourceVersion(t *testing.T) {
 | 
				
			||||||
	testCases := []struct {
 | 
						testCases := []struct {
 | 
				
			||||||
@@ -99,3 +103,34 @@ func TestHasPathPrefix(t *testing.T) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestHighWaterMark(t *testing.T) {
 | 
				
			||||||
 | 
						var h HighWaterMark
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i := int64(10); i < 20; i++ {
 | 
				
			||||||
 | 
							if !h.Update(i) {
 | 
				
			||||||
 | 
								t.Errorf("unexpected false for %v", i)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if h.Update(i - 1) {
 | 
				
			||||||
 | 
								t.Errorf("unexpected true for %v", i-1)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						m := int64(0)
 | 
				
			||||||
 | 
						wg := sync.WaitGroup{}
 | 
				
			||||||
 | 
						for i := 0; i < 300; i++ {
 | 
				
			||||||
 | 
							wg.Add(1)
 | 
				
			||||||
 | 
							v := rand.Int63()
 | 
				
			||||||
 | 
							go func(v int64) {
 | 
				
			||||||
 | 
								defer wg.Done()
 | 
				
			||||||
 | 
								h.Update(v)
 | 
				
			||||||
 | 
							}(v)
 | 
				
			||||||
 | 
							if v > m {
 | 
				
			||||||
 | 
								m = v
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						wg.Wait()
 | 
				
			||||||
 | 
						if m != int64(h) {
 | 
				
			||||||
 | 
							t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user