mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			146 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			146 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2015 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package node
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"k8s.io/apimachinery/pkg/types"
 | 
						|
 | 
						|
	"github.com/golang/glog"
 | 
						|
)
 | 
						|
 | 
						|
// WorkArgs keeps arguments that will be passed to the function executed by the worker.
 | 
						|
type WorkArgs struct {
 | 
						|
	NamespacedName types.NamespacedName
 | 
						|
}
 | 
						|
 | 
						|
// KeyFromWorkArgs creates a key for the given `WorkArgs`
 | 
						|
func (w *WorkArgs) KeyFromWorkArgs() string {
 | 
						|
	return w.NamespacedName.String()
 | 
						|
}
 | 
						|
 | 
						|
// NewWorkArgs is a helper function to create new `WorkArgs`
 | 
						|
func NewWorkArgs(name, namespace string) *WorkArgs {
 | 
						|
	return &WorkArgs{types.NamespacedName{Namespace: namespace, Name: name}}
 | 
						|
}
 | 
						|
 | 
						|
// TimedWorker is a responsible for executing a function no earlier than at FireAt time.
 | 
						|
type TimedWorker struct {
 | 
						|
	WorkItem  *WorkArgs
 | 
						|
	CreatedAt time.Time
 | 
						|
	FireAt    time.Time
 | 
						|
	Timer     *time.Timer
 | 
						|
}
 | 
						|
 | 
						|
// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
 | 
						|
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
 | 
						|
	delay := fireAt.Sub(createdAt)
 | 
						|
	if delay <= 0 {
 | 
						|
		go f(args)
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	timer := time.AfterFunc(delay, func() { f(args) })
 | 
						|
	return &TimedWorker{
 | 
						|
		WorkItem:  args,
 | 
						|
		CreatedAt: createdAt,
 | 
						|
		FireAt:    fireAt,
 | 
						|
		Timer:     timer,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Cancel cancels the execution of function by the `TimedWorker`
 | 
						|
func (w *TimedWorker) Cancel() {
 | 
						|
	if w != nil {
 | 
						|
		w.Timer.Stop()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TimedWorkerQueue keeps a set of TimedWorkers that are still wait for execution.
 | 
						|
type TimedWorkerQueue struct {
 | 
						|
	sync.Mutex
 | 
						|
	// map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
 | 
						|
	workers  map[string]*TimedWorker
 | 
						|
	workFunc func(args *WorkArgs) error
 | 
						|
}
 | 
						|
 | 
						|
// CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute
 | 
						|
// given function `f`.
 | 
						|
func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue {
 | 
						|
	return &TimedWorkerQueue{
 | 
						|
		workers:  make(map[string]*TimedWorker),
 | 
						|
		workFunc: f,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs) error {
 | 
						|
	return func(args *WorkArgs) error {
 | 
						|
		err := q.workFunc(args)
 | 
						|
		q.Lock()
 | 
						|
		defer q.Unlock()
 | 
						|
		if err == nil {
 | 
						|
			// To avoid duplicated calls we keep the key in the queue, to prevent
 | 
						|
			// subsequent additions.
 | 
						|
			q.workers[key] = nil
 | 
						|
		} else {
 | 
						|
			delete(q.workers, key)
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
 | 
						|
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
 | 
						|
	key := args.KeyFromWorkArgs()
 | 
						|
	glog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)
 | 
						|
 | 
						|
	q.Lock()
 | 
						|
	defer q.Unlock()
 | 
						|
	if _, exists := q.workers[key]; exists {
 | 
						|
		glog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
 | 
						|
	q.workers[key] = worker
 | 
						|
}
 | 
						|
 | 
						|
// CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.
 | 
						|
func (q *TimedWorkerQueue) CancelWork(key string) bool {
 | 
						|
	q.Lock()
 | 
						|
	defer q.Unlock()
 | 
						|
	worker, found := q.workers[key]
 | 
						|
	result := false
 | 
						|
	if found {
 | 
						|
		glog.V(4).Infof("Cancelling TimedWorkerQueue item %v at %v", key, time.Now())
 | 
						|
		if worker != nil {
 | 
						|
			result = true
 | 
						|
			worker.Cancel()
 | 
						|
		}
 | 
						|
		delete(q.workers, key)
 | 
						|
	}
 | 
						|
	return result
 | 
						|
}
 | 
						|
 | 
						|
// GetWorkerUnsafe returns a TimedWorker corresponding to the given key.
 | 
						|
// Unsafe method - workers have attached goroutines which can fire afater this function is called.
 | 
						|
func (q *TimedWorkerQueue) GetWorkerUnsafe(key string) *TimedWorker {
 | 
						|
	q.Lock()
 | 
						|
	defer q.Unlock()
 | 
						|
	return q.workers[key]
 | 
						|
}
 |