Change the eviction metric type and fix rate-limited-timed-queue

This commit is contained in:
gmarek
2016-09-08 12:00:07 +02:00
parent 93c9b05bc9
commit c40a36cab0
5 changed files with 70 additions and 290 deletions

View File

@@ -97,13 +97,15 @@ func (q *UniqueQueue) Replace(value TimedValue) bool {
return false
}
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
// Removes the value from the queue, but keeps it in the set, so it won't be added second time.
// Returns true if something was removed.
func (q *UniqueQueue) RemoveFromQueue(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
q.set.Delete(value)
if !q.set.Has(value) {
return false
}
for i, val := range q.queue {
if val.Value == value {
heap.Remove(&q.queue, i)
@@ -113,6 +115,25 @@ func (q *UniqueQueue) Remove(value string) bool {
return false
}
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
if !q.set.Has(value) {
return false
}
q.set.Delete(value)
for i, val := range q.queue {
if val.Value == value {
heap.Remove(&q.queue, i)
return true
}
}
return true
}
// Returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (TimedValue, bool) {
q.lock.Lock()
@@ -174,7 +195,11 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
// time to execute the next item in the queue.
// time to execute the next item in the queue. The same value is processed only once unless
// Remove is explicitly called on it (it's done by the cancelPodEviction function in NodeController
// when Node becomes Ready again)
// TODO: figure out a good way to do garbage collection for all Nodes that were removed from
// the cluster.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
q.limiterLock.Lock()
@@ -196,7 +221,7 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val.ProcessAt = now.Add(wait + 1)
q.queue.Replace(val)
} else {
q.queue.Remove(val.Value)
q.queue.RemoveFromQueue(val.Value)
}
val, ok = q.queue.Head()
}