mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	pkg/client/unversioned/cache -> pkg/client/cache pkg/client/unversioned/record -> pkg/client/record
		
			
				
	
	
		
			225 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			225 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
 | 
						|
 | 
						|
package cache
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
)
 | 
						|
 | 
						|
// Queue is exactly like a Store, but has a Pop() method too.
 | 
						|
type Queue interface {
 | 
						|
	Store
 | 
						|
 | 
						|
	// Pop blocks until it has something to return.
 | 
						|
	Pop() interface{}
 | 
						|
 | 
						|
	// AddIfNotPresent adds a value previously
 | 
						|
	// returned by Pop back into the queue as long
 | 
						|
	// as nothing else (presumably more recent)
 | 
						|
	// has since been added.
 | 
						|
	AddIfNotPresent(interface{}) error
 | 
						|
}
 | 
						|
 | 
						|
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
 | 
						|
// FIFO order processing. If multiple adds/updates of a single item happen while
 | 
						|
// an item is in the queue before it has been processed, it will only be
 | 
						|
// processed once, and when it is processed, the most recent version will be
 | 
						|
// processed. This can't be done with a channel.
 | 
						|
//
 | 
						|
// FIFO solves this use case:
 | 
						|
//  * You want to process every object (exactly) once.
 | 
						|
//  * You want to process the most recent version of the object when you process it.
 | 
						|
//  * You do not want to process deleted objects, they should be removed from the queue.
 | 
						|
//  * You do not want to periodically reprocess objects.
 | 
						|
// Compare with DeltaFIFO for other use cases.
 | 
						|
type FIFO struct {
 | 
						|
	lock sync.RWMutex
 | 
						|
	cond sync.Cond
 | 
						|
	// We depend on the property that items in the set are in the queue and vice versa.
 | 
						|
	items map[string]interface{}
 | 
						|
	queue []string
 | 
						|
	// keyFunc is used to make the key used for queued item insertion and retrieval, and
 | 
						|
	// should be deterministic.
 | 
						|
	keyFunc KeyFunc
 | 
						|
}
 | 
						|
 | 
						|
var (
 | 
						|
	_ = Queue(&FIFO{}) // FIFO is a Queue
 | 
						|
)
 | 
						|
 | 
						|
// Add inserts an item, and puts it in the queue. The item is only enqueued
 | 
						|
// if it doesn't already exist in the set.
 | 
						|
func (f *FIFO) Add(obj interface{}) error {
 | 
						|
	id, err := f.keyFunc(obj)
 | 
						|
	if err != nil {
 | 
						|
		return KeyError{obj, err}
 | 
						|
	}
 | 
						|
	f.lock.Lock()
 | 
						|
	defer f.lock.Unlock()
 | 
						|
	if _, exists := f.items[id]; !exists {
 | 
						|
		f.queue = append(f.queue, id)
 | 
						|
	}
 | 
						|
	f.items[id] = obj
 | 
						|
	f.cond.Broadcast()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
 | 
						|
// present in the set, it is neither enqueued nor added to the set.
 | 
						|
//
 | 
						|
// This is useful in a single producer/consumer scenario so that the consumer can
 | 
						|
// safely retry items without contending with the producer and potentially enqueueing
 | 
						|
// stale items.
 | 
						|
func (f *FIFO) AddIfNotPresent(obj interface{}) error {
 | 
						|
	id, err := f.keyFunc(obj)
 | 
						|
	if err != nil {
 | 
						|
		return KeyError{obj, err}
 | 
						|
	}
 | 
						|
	f.lock.Lock()
 | 
						|
	defer f.lock.Unlock()
 | 
						|
	if _, exists := f.items[id]; exists {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	f.queue = append(f.queue, id)
 | 
						|
	f.items[id] = obj
 | 
						|
	f.cond.Broadcast()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Update is the same as Add in this implementation.
 | 
						|
func (f *FIFO) Update(obj interface{}) error {
 | 
						|
	return f.Add(obj)
 | 
						|
}
 | 
						|
 | 
						|
// Delete removes an item. It doesn't add it to the queue, because
 | 
						|
// this implementation assumes the consumer only cares about the objects,
 | 
						|
// not the order in which they were created/added.
 | 
						|
func (f *FIFO) Delete(obj interface{}) error {
 | 
						|
	id, err := f.keyFunc(obj)
 | 
						|
	if err != nil {
 | 
						|
		return KeyError{obj, err}
 | 
						|
	}
 | 
						|
	f.lock.Lock()
 | 
						|
	defer f.lock.Unlock()
 | 
						|
	delete(f.items, id)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// List returns a list of all the items.
 | 
						|
func (f *FIFO) List() []interface{} {
 | 
						|
	f.lock.RLock()
 | 
						|
	defer f.lock.RUnlock()
 | 
						|
	list := make([]interface{}, 0, len(f.items))
 | 
						|
	for _, item := range f.items {
 | 
						|
		list = append(list, item)
 | 
						|
	}
 | 
						|
	return list
 | 
						|
}
 | 
						|
 | 
						|
// ListKeys returns a list of all the keys of the objects currently
 | 
						|
// in the FIFO.
 | 
						|
func (f *FIFO) ListKeys() []string {
 | 
						|
	f.lock.RLock()
 | 
						|
	defer f.lock.RUnlock()
 | 
						|
	list := make([]string, 0, len(f.items))
 | 
						|
	for key := range f.items {
 | 
						|
		list = append(list, key)
 | 
						|
	}
 | 
						|
	return list
 | 
						|
}
 | 
						|
 | 
						|
// Get returns the requested item, or sets exists=false.
 | 
						|
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
 | 
						|
	key, err := f.keyFunc(obj)
 | 
						|
	if err != nil {
 | 
						|
		return nil, false, KeyError{obj, err}
 | 
						|
	}
 | 
						|
	return f.GetByKey(key)
 | 
						|
}
 | 
						|
 | 
						|
// GetByKey returns the requested item, or sets exists=false.
 | 
						|
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
 | 
						|
	f.lock.RLock()
 | 
						|
	defer f.lock.RUnlock()
 | 
						|
	item, exists = f.items[key]
 | 
						|
	return item, exists, nil
 | 
						|
}
 | 
						|
 | 
						|
// Pop waits until an item is ready and returns it. If multiple items are
 | 
						|
// ready, they are returned in the order in which they were added/updated.
 | 
						|
// The item is removed from the queue (and the store) before it is returned,
 | 
						|
// so if you don't successfully process it, you need to add it back with
 | 
						|
// AddIfNotPresent().
 | 
						|
func (f *FIFO) Pop() interface{} {
 | 
						|
	f.lock.Lock()
 | 
						|
	defer f.lock.Unlock()
 | 
						|
	for {
 | 
						|
		for len(f.queue) == 0 {
 | 
						|
			f.cond.Wait()
 | 
						|
		}
 | 
						|
		id := f.queue[0]
 | 
						|
		f.queue = f.queue[1:]
 | 
						|
		item, ok := f.items[id]
 | 
						|
		if !ok {
 | 
						|
			// Item may have been deleted subsequently.
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		delete(f.items, id)
 | 
						|
		return item
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Replace will delete the contents of 'f', using instead the given map.
 | 
						|
// 'f' takes ownership of the map, you should not reference the map again
 | 
						|
// after calling this function. f's queue is reset, too; upon return, it
 | 
						|
// will contain the items in the map, in no particular order.
 | 
						|
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
 | 
						|
	items := map[string]interface{}{}
 | 
						|
	for _, item := range list {
 | 
						|
		key, err := f.keyFunc(item)
 | 
						|
		if err != nil {
 | 
						|
			return KeyError{item, err}
 | 
						|
		}
 | 
						|
		items[key] = item
 | 
						|
	}
 | 
						|
 | 
						|
	f.lock.Lock()
 | 
						|
	defer f.lock.Unlock()
 | 
						|
	f.items = items
 | 
						|
	f.queue = f.queue[:0]
 | 
						|
	for id := range items {
 | 
						|
		f.queue = append(f.queue, id)
 | 
						|
	}
 | 
						|
	if len(f.queue) > 0 {
 | 
						|
		f.cond.Broadcast()
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// NewFIFO returns a Store which can be used to queue up items to
 | 
						|
// process.
 | 
						|
func NewFIFO(keyFunc KeyFunc) *FIFO {
 | 
						|
	f := &FIFO{
 | 
						|
		items:   map[string]interface{}{},
 | 
						|
		queue:   []string{},
 | 
						|
		keyFunc: keyFunc,
 | 
						|
	}
 | 
						|
	f.cond.L = &f.lock
 | 
						|
	return f
 | 
						|
}
 |