DRA: add device taint eviction controller

The controller is derived from the node taint eviction controller.
In contrast to that controller, it tracks the UID of pods so that it
does not delete the wrong pod when a pod gets replaced by a new
instance with the same name.
Author: Patrick Ohly
Date: 2025-03-07 15:21:52 +01:00
Parent: 13d04d4a92
Commit: a027b439e5
17 changed files with 2888 additions and 1599 deletions
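
A minimal sketch of that UID check, as a hypothetical stand-alone helper built only on the client-go delete API (the real logic lives in addConditionAndDeletePod below):

	package main

	import (
		"context"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/types"
		clientset "k8s.io/client-go/kubernetes"
	)

	// deletePodByUID deletes a pod only if it still has the recorded UID.
	// A replacement pod that reuses the name has a different UID, so the
	// API server rejects the delete with a Conflict error and the pod is
	// left alone.
	func deletePodByUID(ctx context.Context, c clientset.Interface, namespace, name string, uid types.UID) error {
		return c.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{
			Preconditions: &metav1.Preconditions{UID: &uid},
		})
	}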


@@ -588,6 +588,7 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor {
// feature gated
register(newStorageVersionGarbageCollectorControllerDescriptor())
register(newResourceClaimControllerDescriptor())
register(newDeviceTaintEvictionControllerDescriptor())
register(newLegacyServiceAccountTokenCleanerControllerDescriptor())
register(newValidatingAdmissionPolicyStatusControllerDescriptor())
register(newTaintEvictionControllerDescriptor())


@@ -93,6 +93,7 @@ func TestControllerNamesDeclaration(t *testing.T) {
names.EphemeralVolumeController,
names.StorageVersionGarbageCollectorController,
names.ResourceClaimController,
names.DeviceTaintEvictionController,
names.LegacyServiceAccountTokenCleanerController,
names.ValidatingAdmissionPolicyStatusController,
names.ServiceCIDRController,


@@ -41,6 +41,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
pkgcontroller "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/devicetainteviction"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
@@ -231,6 +232,32 @@ func startTaintEvictionController(ctx context.Context, controllerContext Control
return nil, true, nil
}
func newDeviceTaintEvictionControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.DeviceTaintEvictionController,
initFunc: startDeviceTaintEvictionController,
requiredFeatureGates: []featuregate.Feature{
// TODO update app.TestFeatureGatedControllersShouldNotDefineAliases when removing these feature gates.
features.DynamicResourceAllocation,
features.DRADeviceTaints,
},
}
}
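// Note (added for clarity, not in the original source): requiredFeatureGates is
// conjunctive, i.e. the controller manager starts this controller only when both
// DynamicResourceAllocation and DRADeviceTaints are enabled.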
func startDeviceTaintEvictionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
deviceTaintEvictionController := devicetainteviction.New(
controllerContext.ClientBuilder.ClientOrDie(names.DeviceTaintEvictionController),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Resource().V1beta1().ResourceClaims(),
controllerContext.InformerFactory.Resource().V1beta1().ResourceSlices(),
controllerContext.InformerFactory.Resource().V1alpha3().DeviceTaintRules(),
controllerContext.InformerFactory.Resource().V1beta1().DeviceClasses(),
controllerName,
)
go deviceTaintEvictionController.Run(ctx)
return nil, true, nil
}
func newCloudNodeLifecycleControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: cpnames.CloudNodeLifecycleController,


@@ -69,6 +69,7 @@ const (
NodeIpamController = "node-ipam-controller"
NodeLifecycleController = "node-lifecycle-controller"
TaintEvictionController = "taint-eviction-controller"
DeviceTaintEvictionController = "device-taint-eviction-controller"
PersistentVolumeBinderController = "persistentvolume-binder-controller"
PersistentVolumeAttachDetachController = "persistentvolume-attach-detach-controller"
PersistentVolumeExpanderController = "persistentvolume-expander-controller"


@@ -37,6 +37,7 @@ source "${KUBE_ROOT}/hack/lib/util.sh"
# See: https://github.com/kubernetes/kubernetes/issues/89267
allowed_prometheus_importers=(
./cluster/images/etcd-version-monitor/etcd-version-monitor.go
./pkg/controller/devicetainteviction/device_taint_eviction_test.go
./staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram.go
./staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_test.go
./staging/src/k8s.io/component-base/metrics/prometheusextension/timing_histogram_vec.go


@@ -0,0 +1,916 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devicetainteviction
import (
"context"
"fmt"
"math"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/go-cmp/cmp" //nolint:depguard // Discouraged for production use (https://github.com/kubernetes/kubernetes/issues/104821) but has no good alternative for logging.
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
coreinformers "k8s.io/client-go/informers/core/v1"
resourcealphainformers "k8s.io/client-go/informers/resource/v1alpha3"
resourceinformers "k8s.io/client-go/informers/resource/v1beta1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/dynamic-resource-allocation/resourceclaim"
resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/devicetainteviction/metrics"
"k8s.io/kubernetes/pkg/controller/tainteviction"
utilpod "k8s.io/kubernetes/pkg/util/pod"
)
const (
// retries is the number of times that the controller tries to delete a pod
// that needs to be evicted.
retries = 5
)
// Controller listens to Taint changes of DRA devices and Toleration changes of ResourceClaims,
// then deletes Pods which use ResourceClaims that don't tolerate a NoExecute taint.
// Pods which have already reached a final state (i.e. terminated) don't need to be deleted.
//
// All of the logic which identifies pods which need to be evicted runs in the
// handle* event handlers. They don't call any blocking method. All the blocking
// calls happen in a [tainteviction.TimedWorkerQueue], using the context passed to Run.
//
// The [resourceslicetracker] takes care of applying taints defined in DeviceTaintRules
// to ResourceSlices. This controller receives modified ResourceSlices with all
// applicable taints from that tracker and doesn't need to care whether a taint
// came from the DRA driver or from a DeviceTaintRule.
type Controller struct {
name string
// logger is the general-purpose logger to be used for background activities.
logger klog.Logger
// eventLogger is specifically for logging during event handling. It may be nil
// if no such logging is desired.
eventLogger *klog.Logger
client clientset.Interface
recorder record.EventRecorder
podInformer coreinformers.PodInformer
podLister corelisters.PodLister
claimInformer resourceinformers.ResourceClaimInformer
sliceInformer resourceinformers.ResourceSliceInformer
taintInformer resourcealphainformers.DeviceTaintRuleInformer
classInformer resourceinformers.DeviceClassInformer
haveSynced []cache.InformerSynced
metrics metrics.Metrics
// evictPod ensures that the pod gets evicted at the specified time.
// It doesn't block.
evictPod func(pod tainteviction.NamespacedObject, fireAt time.Time)
// cancelEvict cancels eviction set up with evictPod earlier.
// Idempotent, returns false if there was nothing to cancel.
cancelEvict func(pod tainteviction.NamespacedObject) bool
// allocatedClaims holds all currently known allocated claims.
allocatedClaims map[types.NamespacedName]allocatedClaim // A value type is slightly more efficient in BenchmarkTaintUntaint (fewer allocations).
// pools indexes all slices by driver and pool name.
pools map[poolID]pool
hasSynced atomic.Int32
}
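// Example (test-only sketch; scheduled is a hypothetical variable): evictPod and
// cancelEvict can be set before Run to observe scheduling decisions, because Run
// chains to a previously installed hook:
//
//	scheduled := map[types.NamespacedName]time.Time{}
//	var mu sync.Mutex
//	tc.evictPod = func(pod tainteviction.NamespacedObject, fireAt time.Time) {
//		mu.Lock()
//		defer mu.Unlock()
//		scheduled[pod.NamespacedName] = fireAt
//	}
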
type poolID struct {
driverName, poolName string
}
type pool struct {
slices sets.Set[*resourceapi.ResourceSlice]
maxGeneration int64
}
// addSlice adds one slice to the pool.
func (p *pool) addSlice(slice *resourceapi.ResourceSlice) {
if slice == nil {
return
}
if p.slices == nil {
p.slices = sets.New[*resourceapi.ResourceSlice]()
p.maxGeneration = math.MinInt64
}
p.slices.Insert(slice)
// Adding a slice can only increase the generation.
if slice.Spec.Pool.Generation > p.maxGeneration {
p.maxGeneration = slice.Spec.Pool.Generation
}
}
// removeSlice removes a slice. It must have been added before.
func (p *pool) removeSlice(slice *resourceapi.ResourceSlice) {
if slice == nil {
return
}
p.slices.Delete(slice)
// Removing a slice might have decreased the generation to
// that of some other slice.
if slice.Spec.Pool.Generation == p.maxGeneration {
maxGeneration := int64(math.MinInt64)
for slice := range p.slices {
if slice.Spec.Pool.Generation > maxGeneration {
maxGeneration = slice.Spec.Pool.Generation
}
}
p.maxGeneration = maxGeneration
}
}
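// Example (illustrative): a pool with slices at generations {3, 5, 5} has
// maxGeneration 5. Removing one generation-5 slice keeps maxGeneration at 5;
// removing the other one drops it back to 3 and the generation-3 slice
// becomes current again.
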
// getTaintedDevices returns all device taints with NoExecute effect in slices
// at the pool's current generation. The result is sorted by device name.
func (p pool) getTaintedDevices() []taintedDevice {
var buffer []taintedDevice
for slice := range p.slices {
if slice.Spec.Pool.Generation != p.maxGeneration {
continue
}
for _, device := range slice.Spec.Devices {
if device.Basic == nil {
// Unknown device type, not supported.
continue
}
for _, taint := range device.Basic.Taints {
if taint.Effect != resourceapi.DeviceTaintEffectNoExecute {
continue
}
buffer = append(buffer, taintedDevice{deviceName: device.Name, taint: taint})
}
}
}
// slices.SortFunc is more efficient than sort.Slice here.
slices.SortFunc(buffer, func(a, b taintedDevice) int {
return strings.Compare(a.deviceName, b.deviceName)
})
return buffer
}
// getDevice looks up one device by name. Outdated slices are ignored.
func (p pool) getDevice(deviceName string) *resourceapi.BasicDevice {
for slice := range p.slices {
if slice.Spec.Pool.Generation != p.maxGeneration {
continue
}
for _, device := range slice.Spec.Devices {
if device.Basic == nil {
// Unknown device type, not supported.
continue
}
if device.Name == deviceName {
return device.Basic
}
}
}
return nil
}
type taintedDevice struct {
deviceName string
taint resourceapi.DeviceTaint
}
// allocatedClaim is a ResourceClaim which has an allocation result. It
// may or may not be tainted such that pods need to be evicted.
type allocatedClaim struct {
*resourceapi.ResourceClaim
// evictionTime, if non-nil, is the time at which pods using this claim need to be evicted.
// This is the smallest value of all such per-device values.
// For each device, the value is calculated as `<time of setting the taint> +
// <toleration seconds, 0 if not set>`.
evictionTime *metav1.Time
}
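// Example (illustrative): a NoExecute taint with TimeAdded = 12:00:00 and a
// matching toleration with TolerationSeconds = 30 yields evictionTime =
// 12:00:30. Without any matching toleration the eviction time is 12:00:00,
// i.e. pods get evicted immediately.
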
func (tc *Controller) deletePodHandler(c clientset.Interface, emitEventFunc func(tainteviction.NamespacedObject)) func(ctx context.Context, fireAt time.Time, args *tainteviction.WorkArgs) error {
return func(ctx context.Context, fireAt time.Time, args *tainteviction.WorkArgs) error {
klog.FromContext(ctx).Info("Deleting pod", "pod", args.Object)
var err error
for i := 0; i < retries; i++ {
err = addConditionAndDeletePod(ctx, c, args.Object, &emitEventFunc)
if apierrors.IsNotFound(err) {
// Not a problem, the work is done.
// But we didn't do it, so don't
// bump the metric.
return nil
}
if err == nil {
tc.metrics.PodDeletionsTotal.Inc()
tc.metrics.PodDeletionsLatency.Observe(time.Since(fireAt).Seconds())
return nil
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, podRef tainteviction.NamespacedObject, emitEventFunc *func(tainteviction.NamespacedObject)) (err error) {
pod, err := c.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{})
if err != nil {
return err
}
if pod.UID != podRef.UID {
// This special error suppresses event logging in our caller and prevents further retries.
// We can stop because the pod we were meant to evict is already gone and happens to
// be replaced by some other pod which reuses the same name.
return apierrors.NewNotFound(v1.SchemeGroupVersion.WithResource("pods").GroupResource(), pod.Name)
}
// Emit the event only once, and only if we are actually doing something.
if *emitEventFunc != nil {
(*emitEventFunc)(podRef)
*emitEventFunc = nil
}
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByDeviceTaintManager",
Message: "Device Taint manager: deleting due to NoExecute taint",
})
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
// Unlikely, but it could happen that the pod we got above got replaced with
// another pod using the same name in the meantime. Include a precondition
// to prevent that race. This delete attempt then fails and the next one detects
// the new pod and stops retrying.
return c.CoreV1().Pods(podRef.Namespace).Delete(ctx, podRef.Name, metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{
UID: &podRef.UID,
},
})
}
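// Example (test sketch with hypothetical fixtures): exercising
// addConditionAndDeletePod against a fake clientset from
// k8s.io/client-go/kubernetes/fake:
//
//	c := fake.NewSimpleClientset(pod) // pod has Namespace, Name, UID set
//	podRef := tainteviction.NamespacedObject{
//		NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
//		UID:            pod.UID,
//	}
//	emit := func(tainteviction.NamespacedObject) {}
//	err := addConditionAndDeletePod(ctx, c, podRef, &emit)
//	// On success the pod got a DisruptionTarget condition and was deleted.
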
// New creates a new Controller that will use passed clientset to communicate with the API server.
// Spawns no goroutines. That happens in Run.
func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInformer resourceinformers.ResourceClaimInformer, sliceInformer resourceinformers.ResourceSliceInformer, taintInformer resourcealphainformers.DeviceTaintRuleInformer, classInformer resourceinformers.DeviceClassInformer, controllerName string) *Controller {
metrics.Register() // It would be nicer to pass the controller name here, but that probably would break generating https://kubernetes.io/docs/reference/instrumentation/metrics.
tc := &Controller{
name: controllerName,
client: c,
podInformer: podInformer,
podLister: podInformer.Lister(),
claimInformer: claimInformer,
sliceInformer: sliceInformer,
taintInformer: taintInformer,
classInformer: classInformer,
allocatedClaims: make(map[types.NamespacedName]allocatedClaim),
pools: make(map[poolID]pool),
// Instantiate all informers now to ensure that they get started.
haveSynced: []cache.InformerSynced{
podInformer.Informer().HasSynced,
claimInformer.Informer().HasSynced,
sliceInformer.Informer().HasSynced,
taintInformer.Informer().HasSynced,
classInformer.Informer().HasSynced,
},
metrics: metrics.Global,
}
return tc
}
// Run starts the controller which will run until the context is done.
func (tc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
logger.Info("Starting", "controller", tc.name)
defer logger.Info("Shutting down controller", "controller", tc.name)
tc.logger = logger
// Doing debug logging?
if loggerV := logger.V(6); loggerV.Enabled() {
tc.eventLogger = &loggerV
}
// Delayed construction of broadcaster because it spawns goroutines.
// tc.recorder.Eventf is a local in-memory operation which never
// blocks, so it is safe to call from an event handler. The
// actual API calls then happen in those spawned goroutines.
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
tc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: tc.name}).WithLogger(logger)
defer eventBroadcaster.Shutdown()
taintEvictionQueue := tainteviction.CreateWorkerQueue(tc.deletePodHandler(tc.client, tc.emitPodDeletionEvent))
evictPod := tc.evictPod
tc.evictPod = func(podRef tainteviction.NamespacedObject, fireAt time.Time) {
// Only relevant for testing.
if evictPod != nil {
evictPod(podRef, fireAt)
}
taintEvictionQueue.UpdateWork(ctx, &tainteviction.WorkArgs{Object: podRef}, time.Now(), fireAt)
}
cancelEvict := tc.cancelEvict
tc.cancelEvict = func(podRef tainteviction.NamespacedObject) bool {
if cancelEvict != nil {
cancelEvict(podRef)
}
return taintEvictionQueue.CancelWork(logger, podRef.NamespacedName.String())
}
// Start events processing pipeline.
eventBroadcaster.StartStructuredLogging(3)
if tc.client != nil {
logger.Info("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
} else {
logger.Error(nil, "kubeClient is nil", "controller", tc.name)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
// mutex serializes event processing.
var mutex sync.Mutex
claimHandler, _ := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
claim, ok := obj.(*resourceapi.ResourceClaim)
if !ok {
logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", obj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handleClaimChange(nil, claim)
},
UpdateFunc: func(oldObj, newObj any) {
oldClaim, ok := oldObj.(*resourceapi.ResourceClaim)
if !ok {
logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", oldObj))
return
}
newClaim, ok := newObj.(*resourceapi.ResourceClaim)
if !ok {
logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", newObj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handleClaimChange(oldClaim, newClaim)
},
DeleteFunc: func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
claim, ok := obj.(*resourceapi.ResourceClaim)
if !ok {
logger.Error(nil, "Expected ResourceClaim", "actual", fmt.Sprintf("%T", obj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handleClaimChange(claim, nil)
},
})
defer func() {
_ = tc.claimInformer.Informer().RemoveEventHandler(claimHandler)
}()
tc.haveSynced = append(tc.haveSynced, claimHandler.HasSynced)
podHandler, _ := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
pod, ok := obj.(*v1.Pod)
if !ok {
logger.Error(nil, "Expected ResourcePod", "actual", fmt.Sprintf("%T", obj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handlePodChange(nil, pod)
},
UpdateFunc: func(oldObj, newObj any) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", oldObj))
return
}
newPod, ok := newObj.(*v1.Pod)
if !ok {
logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", newObj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handlePodChange(oldPod, newPod)
},
DeleteFunc: func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
pod, ok := obj.(*v1.Pod)
if !ok {
logger.Error(nil, "Expected Pod", "actual", fmt.Sprintf("%T", obj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handlePodChange(pod, nil)
},
})
defer func() {
_ = tc.podInformer.Informer().RemoveEventHandler(podHandler)
}()
tc.haveSynced = append(tc.haveSynced, podHandler.HasSynced)
opts := resourceslicetracker.Options{
EnableDeviceTaints: true,
SliceInformer: tc.sliceInformer,
TaintInformer: tc.taintInformer,
ClassInformer: tc.classInformer,
KubeClient: tc.client,
}
sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts)
if err != nil {
logger.Info("Failed to initialize ResourceSlice tracker; device taint processing leading to Pod eviction is now paused", "err", err)
return
}
tc.haveSynced = append(tc.haveSynced, sliceTracker.HasSynced)
defer sliceTracker.Stop()
// Wait for tracker to sync before we react to events.
// This doesn't have to be perfect, it merely avoids unnecessary
// work which might be done as events get emitted for intermediate
// state.
if !cache.WaitForNamedCacheSyncWithContext(ctx, tc.haveSynced...) {
return
}
logger.V(1).Info("Underlying informers have synced")
_, _ = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", obj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handleSliceChange(nil, slice)
},
UpdateFunc: func(oldObj, newObj any) {
oldSlice, ok := oldObj.(*resourceapi.ResourceSlice)
if !ok {
logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", oldObj))
return
}
newSlice, ok := newObj.(*resourceapi.ResourceSlice)
if !ok {
logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", newObj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handleSliceChange(oldSlice, newSlice)
},
DeleteFunc: func(obj any) {
// No need to check for DeletedFinalStateUnknown here, the resourceslicetracker doesn't use that.
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
logger.Error(nil, "Expected ResourceSlice", "actual", fmt.Sprintf("%T", obj))
return
}
mutex.Lock()
defer mutex.Unlock()
tc.handleSliceChange(slice, nil)
},
})
// sliceTracker.AddEventHandler blocked while delivering events for all known
// ResourceSlices. Therefore our own state is up-to-date once we get here.
tc.hasSynced.Store(1)
<-ctx.Done()
}
func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) {
claim := newClaim
if claim == nil {
claim = oldClaim
}
name := newNamespacedName(claim)
if tc.eventLogger != nil {
// This is intentionally very verbose for debugging.
tc.eventLogger.Info("ResourceClaim changed", "claimObject", name, "oldClaim", klog.Format(oldClaim), "newClaim", klog.Format(newClaim), "diff", cmp.Diff(oldClaim, newClaim))
}
// Deleted?
if newClaim == nil {
delete(tc.allocatedClaims, name)
tc.handlePods(claim)
return
}
// Added?
if oldClaim == nil {
if claim.Status.Allocation == nil {
return
}
tc.allocatedClaims[name] = allocatedClaim{
ResourceClaim: claim,
evictionTime: tc.evictionTime(claim.Status.Allocation),
}
tc.handlePods(claim)
return
}
// If we have two claims, the UID might still be different. Unlikely, but not impossible...
// Treat this like a remove + add.
if oldClaim.UID != newClaim.UID {
tc.handleClaimChange(oldClaim, nil)
tc.handleClaimChange(nil, newClaim)
return
}
syncBothClaims := func() {
// ReservedFor may have changed. If it did, sync both old and new lists,
// otherwise only once (same list).
if !slices.Equal(oldClaim.Status.ReservedFor, newClaim.Status.ReservedFor) {
tc.handlePods(oldClaim)
tc.handlePods(newClaim)
} else {
tc.handlePods(claim)
}
}
// Allocation added?
if oldClaim.Status.Allocation == nil && newClaim.Status.Allocation != nil {
tc.allocatedClaims[name] = allocatedClaim{
ResourceClaim: claim,
evictionTime: tc.evictionTime(claim.Status.Allocation),
}
syncBothClaims()
return
}
// Allocation removed?
if oldClaim.Status.Allocation != nil && newClaim.Status.Allocation == nil {
delete(tc.allocatedClaims, name)
syncBothClaims()
return
}
// Allocated before and after?
if claim.Status.Allocation != nil {
// The Allocation is immutable, so we don't need to recompute the eviction
// time. Storing the newer claim is enough.
tc.allocatedClaims[name] = allocatedClaim{
ResourceClaim: claim,
evictionTime: tc.allocatedClaims[name].evictionTime,
}
syncBothClaims()
return
}
// If we get here, nothing changed.
}
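// Example (illustrative): an update which sets Status.Allocation and extends
// Status.ReservedFor in a single step takes the "Allocation added?" branch;
// because the ReservedFor lists differ, the pods of both the old and the new
// claim get synced.
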
// evictionTime returns the earliest time at which pods using the allocation
// need to be evicted: for each NoExecute taint on an allocated device, that is
// the taint's TimeAdded plus the tolerated seconds, if any. Taints which are
// tolerated forever are ignored. The result is nil if no eviction is needed.
func (tc *Controller) evictionTime(allocation *resourceapi.AllocationResult) *metav1.Time {
var evictionTime *metav1.Time
for _, allocatedDevice := range allocation.Devices.Results {
device := tc.pools[poolID{driverName: allocatedDevice.Driver, poolName: allocatedDevice.Pool}].getDevice(allocatedDevice.Device)
if device == nil {
// Unknown device? Can't be tainted...
continue
}
nextTaint:
for _, taint := range device.Taints {
if taint.Effect != resourceapi.DeviceTaintEffectNoExecute {
continue
}
newEvictionTime := taint.TimeAdded
haveToleration := false
tolerationSeconds := int64(math.MaxInt64)
for _, toleration := range allocatedDevice.Tolerations {
if toleration.Effect == resourceapi.DeviceTaintEffectNoExecute &&
resourceclaim.ToleratesTaint(toleration, taint) {
if toleration.TolerationSeconds == nil {
// Tolerate forever -> ignore taint.
continue nextTaint
}
newTolerationSeconds := *toleration.TolerationSeconds
if newTolerationSeconds < 0 {
newTolerationSeconds = 0
}
if newTolerationSeconds < tolerationSeconds {
tolerationSeconds = newTolerationSeconds
}
haveToleration = true
}
}
if haveToleration {
newEvictionTime = &metav1.Time{Time: newEvictionTime.Add(time.Duration(tolerationSeconds) * time.Second)}
}
if evictionTime == nil {
evictionTime = newEvictionTime
continue
}
if newEvictionTime != nil && newEvictionTime.Before(evictionTime) {
evictionTime = newEvictionTime
}
}
}
return evictionTime
}
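// Example (illustrative): with two allocated devices, one tainted at 12:00:00
// with a matching TolerationSeconds = 60 (eviction at 12:01:00) and one tainted
// at 12:00:30 without any matching toleration (eviction at 12:00:30),
// evictionTime returns 12:00:30.
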
func (tc *Controller) handleSliceChange(oldSlice, newSlice *resourceapi.ResourceSlice) {
slice := newSlice
if slice == nil {
slice = oldSlice
}
poolID := poolID{
driverName: slice.Spec.Driver,
poolName: slice.Spec.Pool.Name,
}
if tc.eventLogger != nil {
// This is intentionally very verbose for debugging.
tc.eventLogger.Info("ResourceSlice changed", "pool", poolID, "oldSlice", klog.Format(oldSlice), "newSlice", klog.Format(newSlice), "diff", cmp.Diff(oldSlice, newSlice))
}
// Determine old and new device taints. Only devices
// where something changes trigger additional checks for claims
// using them.
//
// The pre-allocated slices are small enough to be allocated on
// the stack (https://stackoverflow.com/a/69187698/222305).
p := tc.pools[poolID]
oldDeviceTaints := p.getTaintedDevices()
p.removeSlice(oldSlice)
p.addSlice(newSlice)
if len(p.slices) == 0 {
delete(tc.pools, poolID)
} else {
tc.pools[poolID] = p
}
newDeviceTaints := p.getTaintedDevices()
// Now determine differences. This depends on both slices having been sorted
// by device name.
if len(oldDeviceTaints) == 0 && len(newDeviceTaints) == 0 {
// Both empty, no changes.
return
}
modifiedDevices := sets.New[string]()
o, n := 0, 0
for o < len(oldDeviceTaints) || n < len(newDeviceTaints) {
// Iterate over devices in both slices with the same name.
for o < len(oldDeviceTaints) && n < len(newDeviceTaints) && oldDeviceTaints[o].deviceName == newDeviceTaints[n].deviceName {
if !apiequality.Semantic.DeepEqual(oldDeviceTaints[o].taint, newDeviceTaints[n].taint) { // TODO: hard-code the comparison?
modifiedDevices.Insert(oldDeviceTaints[o].deviceName)
}
o++
n++
}
// Step over old devices which were removed.
newDeviceName := ""
if n < len(newDeviceTaints) {
newDeviceName = newDeviceTaints[n].deviceName
}
for o < len(oldDeviceTaints) && oldDeviceTaints[o].deviceName != newDeviceName {
modifiedDevices.Insert(oldDeviceTaints[o].deviceName)
o++
}
// Step over new devices which were added.
oldDeviceName := ""
if o < len(oldDeviceTaints) {
oldDeviceName = oldDeviceTaints[o].deviceName
}
if n < len(newDeviceTaints) && newDeviceTaints[n].deviceName != oldDeviceName {
modifiedDevices.Insert(newDeviceTaints[n].deviceName)
n++
}
}
// Now find all claims using at least one modified device,
// update their eviction time, and handle their consuming pods.
for name, claim := range tc.allocatedClaims {
if !usesDevice(claim.Status.Allocation, poolID, modifiedDevices) {
continue
}
newEvictionTime := tc.evictionTime(claim.ResourceClaim.Status.Allocation)
if newEvictionTime.Equal(claim.evictionTime) {
// No change.
continue
}
claim.evictionTime = newEvictionTime
tc.allocatedClaims[name] = claim
// We could collect pods which depend on claims with changes.
// In practice, most pods probably depend on one claim, so
// it is probably more efficient to avoid building such a map
// to make the common case simple.
tc.handlePods(claim.ResourceClaim)
}
}
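// Example (illustrative): old NoExecute taints on devices [a, b, c], new taints
// on [b, d]. The merge in handleSliceChange walks both name-sorted lists and
// flags a and c (taints removed), d (taint added), and b only if its taint
// changed.
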
func usesDevice(allocation *resourceapi.AllocationResult, pool poolID, modifiedDevices sets.Set[string]) bool {
for _, device := range allocation.Devices.Results {
if device.Driver == pool.driverName &&
device.Pool == pool.poolName &&
modifiedDevices.Has(device.Device) {
return true
}
}
return false
}
func (tc *Controller) handlePodChange(oldPod, newPod *v1.Pod) {
pod := newPod
if pod == nil {
pod = oldPod
}
if tc.eventLogger != nil {
// This is intentionally very verbose for debugging.
tc.eventLogger.Info("Pod changed", "pod", klog.KObj(pod), "oldPod", klog.Format(oldPod), "newPod", klog.Format(newPod), "diff", cmp.Diff(oldPod, newPod))
}
if newPod == nil {
// Nothing left to do for it. No need to emit an event here, it's gone.
tc.cancelEvict(newObject(oldPod))
return
}
// Pods get updated quite frequently. There's no need
// to check them again unless something changed regarding
// their claims.
//
// In particular this prevents adding the pod again
// directly after the eviction condition got added
// to it.
if oldPod != nil &&
apiequality.Semantic.DeepEqual(oldPod.Status.ResourceClaimStatuses, newPod.Status.ResourceClaimStatuses) {
return
}
tc.handlePod(newPod)
}
func (tc *Controller) handlePods(claim *resourceapi.ResourceClaim) {
for _, consumer := range claim.Status.ReservedFor {
if consumer.APIGroup == "" && consumer.Resource == "pods" {
pod, err := tc.podInformer.Lister().Pods(claim.Namespace).Get(consumer.Name)
if err != nil {
if apierrors.IsNotFound(err) {
continue
}
// Should not happen.
utilruntime.HandleErrorWithLogger(tc.logger, err, "retrieve pod from cache")
continue
}
if pod.UID != consumer.UID {
// Not the pod we were looking for. Check the remaining consumers.
continue
}
tc.handlePod(pod)
}
}
}
func (tc *Controller) handlePod(pod *v1.Pod) {
// Not scheduled yet? No need to evict.
if pod.Spec.NodeName == "" {
return
}
// If any claim in use by the pod is tainted such that the taint is not tolerated,
// the pod needs to be evicted.
var evictionTime *metav1.Time
for i := range pod.Spec.ResourceClaims {
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
if err != nil {
// Not created yet or unsupported. Definitely not tainted.
continue
}
if claimName == nil {
// Claim not needed.
continue
}
allocatedClaim, ok := tc.allocatedClaims[types.NamespacedName{Namespace: pod.Namespace, Name: *claimName}]
if !ok {
// Referenced, but not found or not allocated. Also not tainted.
continue
}
if mustCheckOwner && resourceclaim.IsForPod(pod, allocatedClaim.ResourceClaim) != nil {
// Claim and pod don't match. Ignore the claim.
continue
}
if !resourceclaim.IsReservedForPod(pod, allocatedClaim.ResourceClaim) {
// The pod isn't the one which is allowed and/or supposed to use the claim.
// Perhaps that pod instance already got deleted and we are looking at its
// replacement under the same name. Either way, ignore.
continue
}
if allocatedClaim.evictionTime == nil {
continue
}
if evictionTime == nil || allocatedClaim.evictionTime.Before(evictionTime) {
evictionTime = allocatedClaim.evictionTime
}
}
podRef := newObject(pod)
if evictionTime != nil {
tc.evictPod(podRef, evictionTime.Time)
} else {
tc.cancelWorkWithEvent(podRef)
}
}
func (tc *Controller) cancelWorkWithEvent(podRef tainteviction.NamespacedObject) {
if tc.cancelEvict(podRef) {
tc.emitCancelPodDeletionEvent(podRef)
}
}
func (tc *Controller) emitPodDeletionEvent(podRef tainteviction.NamespacedObject) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: podRef.Name,
Namespace: podRef.Namespace,
UID: podRef.UID,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "DeviceTaintManagerEviction", "Marking for deletion")
}
func (tc *Controller) emitCancelPodDeletionEvent(podRef tainteviction.NamespacedObject) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: podRef.Name,
Namespace: podRef.Namespace,
UID: podRef.UID,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "DeviceTaintManagerEviction", "Cancelling deletion")
}
func newNamespacedName(obj metav1.Object) types.NamespacedName {
return types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
}
func newObject(obj metav1.Object) tainteviction.NamespacedObject {
return tainteviction.NamespacedObject{
NamespacedName: newNamespacedName(obj),
UID: obj.GetUID(),
}
}

File diff suppressed because it is too large.


@@ -1,5 +1,5 @@
/*
Copyright 2023 The Kubernetes Authors.
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package tainteviction contains the logic implementing taint-based eviction
// for Pods running on Nodes with NoExecute taints.
package tainteviction
// Package devicetainteviction contains the logic implementing taint-based eviction
// for Pods using tainted devices (https://github.com/kubernetes/enhancements/issues/5055).
package devicetainteviction


@@ -1,5 +1,5 @@
/*
Copyright 2023 The Kubernetes Authors.
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -23,30 +23,11 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
)
const taintEvictionControllerSubsystem = "taint_eviction_controller"
// controllerSubsystem must be kept in sync with the controller name in cmd/kube-controller-manager/names.
const controllerSubsystem = "device_taint_eviction_controller"
var (
// PodDeletionsTotal counts the number of Pods deleted by TaintEvictionController since its start.
PodDeletionsTotal = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: taintEvictionControllerSubsystem,
Name: "pod_deletions_total",
Help: "Total number of Pods deleted by TaintEvictionController since its start.",
StabilityLevel: metrics.ALPHA,
},
)
// PodDeletionsLatency tracks the latency, in seconds, between the time when a taint effect has been activated
// for the Pod and its deletion.
PodDeletionsLatency = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: taintEvictionControllerSubsystem,
Name: "pod_deletion_duration_seconds",
Help: "Latency, in seconds, between the time when a taint effect has been activated for the Pod and its deletion via TaintEvictionController.",
Buckets: []float64{0.005, 0.025, 0.1, 0.5, 1, 2.5, 10, 30, 60, 120, 180, 240}, // 5ms to 4m
StabilityLevel: metrics.ALPHA,
},
)
Global = New()
)
var registerMetrics sync.Once
@@ -54,7 +35,54 @@ var registerMetrics sync.Once
// Register registers TaintEvictionController metrics.
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(PodDeletionsTotal)
legacyregistry.MustRegister(PodDeletionsLatency)
legacyregistry.MustRegister(Global.PodDeletionsTotal)
legacyregistry.MustRegister(Global.PodDeletionsLatency)
})
}
// New returns new instances of all metrics for testing in parallel.
// Optionally, buckets for the histograms can be specified.
func New(buckets ...float64) Metrics {
if len(buckets) == 0 {
buckets = []float64{0.005, 0.025, 0.1, 0.5, 1, 2.5, 10, 30, 60, 120, 180, 240} // 5ms to 4m
}
m := Metrics{
KubeRegistry: metrics.NewKubeRegistry(),
PodDeletionsTotal: metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: controllerSubsystem,
Name: "pod_deletions_total",
Help: "Total number of Pods deleted by DeviceTaintEvictionController since its start.",
StabilityLevel: metrics.ALPHA,
},
),
PodDeletionsLatency: metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: controllerSubsystem,
Name: "pod_deletion_duration_seconds",
Help: "Latency, in seconds, between the time when a device taint effect has been activated and a Pod's deletion via DeviceTaintEvictionController.",
Buckets: []float64{0.005, 0.025, 0.1, 0.5, 1, 2.5, 10, 30, 60, 120, 180, 240}, // 5ms to 4m
StabilityLevel: metrics.ALPHA,
},
),
}
// This has to be done after construction, otherwise ./hack/update-generated-stable-metrics.sh
// fails to find the default buckets.
if len(buckets) > 0 {
m.PodDeletionsLatency.HistogramOpts.Buckets = buckets
}
m.KubeRegistry.MustRegister(m.PodDeletionsTotal, m.PodDeletionsLatency)
return m
}
// Metrics contains all metrics supported by the device taint eviction controller.
// It implements [metrics.Gatherer].
type Metrics struct {
metrics.KubeRegistry
PodDeletionsTotal *metrics.Counter
PodDeletionsLatency *metrics.Histogram
}
var _ metrics.Gatherer = Metrics{}
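// Example (test sketch): tests can run in parallel with their own Metrics
// instance and custom buckets instead of using Global:
//
//	m := New(0.1, 1, 10)
//	m.PodDeletionsTotal.Inc()
//	// Metrics implements metrics.Gatherer, so the result can be checked with
//	// testutil.GatherAndCompare from k8s.io/component-base/metrics/testutil.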


@@ -1,614 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tainteviction
import (
"context"
"fmt"
"hash/fnv"
"io"
"math"
"sync"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
corev1informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/tainteviction/metrics"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
utilpod "k8s.io/kubernetes/pkg/util/pod"
)
const (
// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
// the number of workers up making it a parameter of Run() function.
// NodeUpdateChannelSize defines the size of channel for node update events.
NodeUpdateChannelSize = 10
// UpdateWorkerSize defines the size of workers for node update or/and pod update.
UpdateWorkerSize = 8
podUpdateChannelSize = 1
retries = 5
)
type nodeUpdateItem struct {
nodeName string
}
type podUpdateItem struct {
podName string
podNamespace string
nodeName string
}
func hash(val string, max int) int {
hasher := fnv.New32a()
io.WriteString(hasher, val)
return int(hasher.Sum32() % uint32(max))
}
// GetPodsByNodeNameFunc returns the list of pods assigned to the specified node.
type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
// Controller listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type Controller struct {
name string
client clientset.Interface
broadcaster record.EventBroadcaster
recorder record.EventRecorder
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
getPodsAssignedToNode GetPodsByNodeNameFunc
taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node
taintedNodesLock sync.Mutex
taintedNodes map[string][]v1.Taint
nodeUpdateChannels []chan nodeUpdateItem
podUpdateChannels []chan podUpdateItem
nodeUpdateQueue workqueue.TypedInterface[nodeUpdateItem]
podUpdateQueue workqueue.TypedInterface[podUpdateItem]
}
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName), controllerName string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.FromContext(ctx).Info("Deleting pod", "controller", controllerName, "pod", args.NamespacedName)
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
err = addConditionAndDeletePod(ctx, c, name, ns)
if err == nil {
metrics.PodDeletionsTotal.Inc()
metrics.PodDeletionsLatency.Observe(float64(time.Since(fireAt) * time.Second))
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) {
pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByTaintManager",
Message: "Taint manager: deleting due to NoExecute taint",
})
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
}
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
result := []v1.Taint{}
for i := range taints {
if taints[i].Effect == v1.TaintEffectNoExecute {
result = append(result, taints[i])
}
}
return result
}
// getMinTolerationTime returns minimal toleration time from the given slice, or -1 if it's infinite.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
minTolerationTime := int64(math.MaxInt64)
if len(tolerations) == 0 {
return 0
}
for i := range tolerations {
if tolerations[i].TolerationSeconds != nil {
tolerationSeconds := *(tolerations[i].TolerationSeconds)
if tolerationSeconds <= 0 {
return 0
} else if tolerationSeconds < minTolerationTime {
minTolerationTime = tolerationSeconds
}
}
}
if minTolerationTime == int64(math.MaxInt64) {
return -1
}
return time.Duration(minTolerationTime) * time.Second
}
// New creates a new Controller that will use passed clientset to communicate with the API server.
func New(ctx context.Context, c clientset.Interface, podInformer corev1informers.PodInformer, nodeInformer corev1informers.NodeInformer, controllerName string) (*Controller, error) {
logger := klog.FromContext(ctx)
metrics.Register()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
podIndexer := podInformer.Informer().GetIndexer()
tm := &Controller{
name: controllerName,
client: c,
broadcaster: eventBroadcaster,
recorder: recorder,
podLister: podInformer.Lister(),
podListerSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
getPodsAssignedToNode: func(nodeName string) ([]*v1.Pod, error) {
objs, err := podIndexer.ByIndex("spec.nodeName", nodeName)
if err != nil {
return nil, err
}
pods := make([]*v1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
pods = append(pods, pod)
}
return pods, nil
},
taintedNodes: make(map[string][]v1.Taint),
nodeUpdateQueue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[nodeUpdateItem]{Name: "noexec_taint_node"}),
podUpdateQueue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[podUpdateItem]{Name: "noexec_taint_pod"}),
}
tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent, tm.name))
_, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
tm.PodUpdated(nil, pod)
},
UpdateFunc: func(prev, obj interface{}) {
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
tm.PodUpdated(prevPod, newPod)
},
DeleteFunc: func(obj interface{}) {
pod, isPod := obj.(*v1.Pod)
// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
if !isPod {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
logger.Error(nil, "Received unexpected object", "object", obj)
return
}
pod, ok = deletedState.Obj.(*v1.Pod)
if !ok {
logger.Error(nil, "DeletedFinalStateUnknown contained non-Pod object", "object", deletedState.Obj)
return
}
}
tm.PodUpdated(pod, nil)
},
})
if err != nil {
return nil, fmt.Errorf("unable to add pod event handler: %w", err)
}
_, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
tm.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
tm.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
tm.NodeUpdated(node, nil)
return nil
}),
})
if err != nil {
return nil, fmt.Errorf("unable to add node event handler: %w", err)
}
return tm, nil
}
// Run starts the controller which will run in loop until `stopCh` is closed.
func (tc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
logger.Info("Starting", "controller", tc.name)
defer logger.Info("Shutting down controller", "controller", tc.name)
// Start events processing pipeline.
tc.broadcaster.StartStructuredLogging(3)
if tc.client != nil {
logger.Info("Sending events to api server")
tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
} else {
logger.Error(nil, "kubeClient is nil", "controller", tc.name)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
defer tc.broadcaster.Shutdown()
defer tc.nodeUpdateQueue.ShutDown()
defer tc.podUpdateQueue.ShutDown()
// wait for the cache to be synced
if !cache.WaitForNamedCacheSync(tc.name, ctx.Done(), tc.podListerSynced, tc.nodeListerSynced) {
return
}
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
nodeUpdate, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(nodeUpdate)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
}
}(ctx.Done())
go func(stopCh <-chan struct{}) {
for {
podUpdate, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
// The fact that pods are processed by the same worker as nodes is used to avoid races
// between node worker setting tc.taintedNodes and pod worker reading this to decide
// whether to delete pod.
// It's possible that even without this assumption this code is still correct.
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(podUpdate)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
}
}(ctx.Done())
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(ctx, i, wg.Done, ctx.Done())
}
wg.Wait()
}
func (tc *Controller) worker(ctx context.Context, worker int, done func(), stopCh <-chan struct{}) {
defer done()
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest the controller should be handled as soon as possible -
// we don't want user (or system) to wait until PodUpdate queue is drained before it can
// start evicting Pods from tainted Nodes.
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(ctx, nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(ctx, nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(ctx, podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}
// PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *Controller) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
podName := ""
podNamespace := ""
nodeName := ""
oldTolerations := []v1.Toleration{}
if oldPod != nil {
podName = oldPod.Name
podNamespace = oldPod.Namespace
nodeName = oldPod.Spec.NodeName
oldTolerations = oldPod.Spec.Tolerations
}
newTolerations := []v1.Toleration{}
if newPod != nil {
podName = newPod.Name
podNamespace = newPod.Namespace
nodeName = newPod.Spec.NodeName
newTolerations = newPod.Spec.Tolerations
}
if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
return
}
updateItem := podUpdateItem{
podName: podName,
podNamespace: podNamespace,
nodeName: nodeName,
}
tc.podUpdateQueue.Add(updateItem)
}
// NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *Controller) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
nodeName := ""
oldTaints := []v1.Taint{}
if oldNode != nil {
nodeName = oldNode.Name
oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
}
newTaints := []v1.Taint{}
if newNode != nil {
nodeName = newNode.Name
newTaints = getNoExecuteTaints(newNode.Spec.Taints)
}
if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
return
}
updateItem := nodeUpdateItem{
nodeName: nodeName,
}
tc.nodeUpdateQueue.Add(updateItem)
}
func (tc *Controller) cancelWorkWithEvent(logger klog.Logger, nsName types.NamespacedName) {
if tc.taintEvictionQueue.CancelWork(logger, nsName.String()) {
tc.emitCancelPodDeletionEvent(nsName)
}
}
func (tc *Controller) processPodOnNode(
ctx context.Context,
podNamespacedName types.NamespacedName,
nodeName string,
tolerations []v1.Toleration,
taints []v1.Taint,
now time.Time,
) {
logger := klog.FromContext(ctx)
if len(taints) == 0 {
tc.cancelWorkWithEvent(logger, podNamespacedName)
}
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
logger.V(2).Info("Not all taints are tolerated after update for pod on node", "pod", podNamespacedName.String(), "node", klog.KRef("", nodeName))
// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
tc.cancelWorkWithEvent(logger, podNamespacedName)
tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), now, now)
return
}
minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns negative value to denote infinite toleration.
if minTolerationTime < 0 {
logger.V(4).Info("Current tolerations for pod tolerate forever, cancelling any scheduled deletion", "pod", podNamespacedName.String())
tc.cancelWorkWithEvent(logger, podNamespacedName)
return
}
startTime := now
triggerTime := startTime.Add(minTolerationTime)
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
}
tc.cancelWorkWithEvent(logger, podNamespacedName)
}
tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
func (tc *Controller) handlePodUpdate(ctx context.Context, podUpdate podUpdateItem) {
pod, err := tc.podLister.Pods(podUpdate.podNamespace).Get(podUpdate.podName)
logger := klog.FromContext(ctx)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
logger.V(4).Info("Noticed pod deletion", "pod", podNamespacedName)
tc.cancelWorkWithEvent(logger, podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
logger.V(4).Info("Noticed pod update", "pod", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that Node was deleted, or Taints were removed before, which triggered
// eviction cancelling if it was needed.
if !ok {
return
}
tc.processPodOnNode(ctx, podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
func (tc *Controller) handleNodeUpdate(ctx context.Context, nodeUpdate nodeUpdateItem) {
node, err := tc.nodeLister.Get(nodeUpdate.nodeName)
logger := klog.FromContext(ctx)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
logger.V(4).Info("Noticed node deletion", "node", klog.KRef("", nodeUpdate.nodeName))
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// Create or Update
logger.V(4).Info("Noticed node update", "node", klog.KObj(node))
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
logger.V(4).Info("Updating known taints on node", "node", klog.KObj(node), "taints", taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
// This is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
// getPodsAssignedToNode can be delayed as long as all future updates to pods will call
// tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods.
pods, err := tc.getPodsAssignedToNode(node.Name)
if err != nil {
logger.Error(err, "Failed to get pods assigned to node", "node", klog.KObj(node))
return
}
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
logger.V(4).Info("All taints were removed from the node. Cancelling all evictions...", "node", klog.KObj(node))
for i := range pods {
tc.cancelWorkWithEvent(logger, types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
for _, pod := range pods {
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(ctx, podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
func (tc *Controller) emitPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
}
func (tc *Controller) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}


@@ -1,941 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tainteviction
import (
"context"
"fmt"
goruntime "runtime"
"sort"
"testing"
"time"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller/testutil"
)
var timeForControllerToProgressForSanityCheck = 20 * time.Millisecond
func getPodsAssignedToNode(ctx context.Context, c *fake.Clientset) GetPodsByNodeNameFunc {
return func(nodeName string) ([]*corev1.Pod, error) {
selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
pods, err := c.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
return []*corev1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v: %w", nodeName, err)
}
rPods := make([]*corev1.Pod, len(pods.Items))
for i := range pods.Items {
rPods[i] = &pods.Items[i]
}
return rPods, nil
}
}
func createNoExecuteTaint(index int) corev1.Taint {
now := metav1.Now()
return corev1.Taint{
Key: "testTaint" + fmt.Sprintf("%v", index),
Value: "test" + fmt.Sprintf("%v", index),
Effect: corev1.TaintEffectNoExecute,
TimeAdded: &now,
}
}
func addToleration(pod *corev1.Pod, index int, duration int64) *corev1.Pod {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if duration < 0 {
pod.Spec.Tolerations = []corev1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: corev1.TaintEffectNoExecute}}
} else {
pod.Spec.Tolerations = []corev1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &duration}}
}
return pod
}
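// addTaintsToNode overwrites the node's taints with NoExecute taints generated
// from indices; the key and value parameters are currently unused.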
func addTaintsToNode(node *corev1.Node, key, value string, indices []int) *corev1.Node {
taints := []corev1.Taint{}
for _, index := range indices {
taints = append(taints, createNoExecuteTaint(index))
}
node.Spec.Taints = taints
return node
}
var alwaysReady = func() bool { return true }
func setupNewController(ctx context.Context, fakeClientSet *fake.Clientset) (*Controller, cache.Indexer, cache.Indexer) {
informerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0)
podIndexer := informerFactory.Core().V1().Pods().Informer().GetIndexer()
nodeIndexer := informerFactory.Core().V1().Nodes().Informer().GetIndexer()
mgr, _ := New(ctx, fakeClientSet, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), "taint-eviction-controller")
mgr.podListerSynced = alwaysReady
mgr.nodeListerSynced = alwaysReady
mgr.getPodsAssignedToNode = getPodsAssignedToNode(ctx, fakeClientSet)
return mgr, podIndexer, nodeIndexer
}
type timestampedPod struct {
names []string
timestamp time.Duration
}
type durationSlice []timestampedPod
func (a durationSlice) Len() int { return len(a) }
func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a durationSlice) Less(i, j int) bool { return a[i].timestamp < a[j].timestamp }
func TestFilterNoExecuteTaints(t *testing.T) {
taints := []corev1.Taint{
{
Key: "one",
Value: "one",
Effect: corev1.TaintEffectNoExecute,
},
{
Key: "two",
Value: "two",
Effect: corev1.TaintEffectNoSchedule,
},
}
taints = getNoExecuteTaints(taints)
if len(taints) != 1 || taints[0].Key != "one" {
t.Errorf("Filtering doesn't work. Got %v", taints)
}
}
func TestCreatePod(t *testing.T) {
testCases := []struct {
description string
pod *corev1.Pod
taintedNodes map[string][]corev1.Taint
expectPatch bool
expectDelete bool
}{
{
description: "not scheduled - ignore",
pod: testutil.NewPod("pod1", ""),
taintedNodes: map[string][]corev1.Taint{},
expectDelete: false,
},
{
description: "scheduled on untainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]corev1.Taint{},
expectDelete: false,
},
{
description: "schedule on tainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
{
description: "schedule on tainted Node with finite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite invalid toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 2, -1),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: []corev1.Pod{*item.pod}})
controller, podIndexer, _ := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = item.taintedNodes
podIndexer.Add(item.pod)
controller.PodUpdated(nil, item.pod)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
func TestDeletePod(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
}
controller.PodUpdated(testutil.NewPod("pod1", "node1"), nil)
// wait a bit to make sure nothing panics
time.Sleep(timeForControllerToProgressForSanityCheck)
}
func TestUpdatePod(t *testing.T) {
testCases := []struct {
description string
prevPod *corev1.Pod
awaitForScheduledEviction bool
newPod *corev1.Pod
taintedNodes map[string][]corev1.Taint
expectPatch bool
expectDelete bool
skipOnWindows bool
}{
{
description: "scheduling onto tainted Node",
prevPod: testutil.NewPod("pod1", ""),
newPod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
{
description: "scheduling onto tainted Node with toleration",
prevPod: addToleration(testutil.NewPod("pod1", ""), 1, -1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "removing toleration",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
newPod: testutil.NewPod("pod1", "node1"),
awaitForScheduledEviction: true,
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
{
description: "lengthening toleration shouldn't work",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
awaitForScheduledEviction: true,
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
skipOnWindows: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
if item.skipOnWindows && goruntime.GOOS == "windows" {
// TODO: remove skip once the flaking test has been fixed.
t.Skip("Skip flaking test on Windows.")
}
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: []corev1.Pod{*item.prevPod}})
controller, podIndexer, _ := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = item.taintedNodes
go controller.Run(ctx)
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
if item.awaitForScheduledEviction {
nsName := types.NamespacedName{Namespace: item.prevPod.Namespace, Name: item.prevPod.Name}
err := wait.PollImmediate(time.Millisecond*10, time.Second, func() (bool, error) {
scheduledEviction := controller.taintEvictionQueue.GetWorkerUnsafe(nsName.String())
return scheduledEviction != nil, nil
})
if err != nil {
t.Fatalf("Failed to await for scheduled eviction: %q", err)
}
}
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
func TestCreateNode(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
node *corev1.Node
expectPatch bool
expectDelete bool
}{
{
description: "Creating Node matching already assigned Pod",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: testutil.NewNode("node1"),
expectPatch: false,
expectDelete: false,
},
{
description: "Creating tainted Node matching already assigned Pod",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "Creating tainted Node matching already assigned tolerating Pod",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: false,
expectDelete: false,
},
}
for _, item := range testCases {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.node)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(nil, item.node)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
}
}
func TestDeleteNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
}
go controller.Run(ctx)
controller.NodeUpdated(testutil.NewNode("node1"), nil)
// wait until controller.taintedNodes is empty
err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
controller.taintedNodesLock.Lock()
defer controller.taintedNodesLock.Unlock()
_, ok := controller.taintedNodes["node1"]
return !ok, nil
})
if err != nil {
t.Errorf("Failed to await for processing node deleted: %q", err)
}
cancel()
}
func TestUpdateNode(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
oldNode *corev1.Node
newNode *corev1.Node
expectPatch bool
expectDelete bool
additionalSleep time.Duration
}{
{
description: "Added taint, expect node patched and deleted",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "Added tolerated taint",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
},
{
description: "Only one added taint tolerated",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectPatch: true,
expectDelete: true,
},
{
description: "Taint removed",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
},
oldNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
newNode: testutil.NewNode("node1"),
expectDelete: false,
additionalSleep: 1500 * time.Millisecond,
},
{
description: "Pod with multiple tolerations are evicted when first one runs out",
pods: []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
},
Spec: corev1.PodSpec{
NodeName: "node1",
Tolerations: []corev1.Toleration{
{Key: "testTaint1", Value: "test1", Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &[]int64{1}[0]},
{Key: "testTaint2", Value: "test2", Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &[]int64{100}[0]},
},
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectPatch: true,
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(item.oldNode, item.newNode)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
})
}
}
func TestUpdateNodeWithMultipleTaints(t *testing.T) {
taint1 := createNoExecuteTaint(1)
taint2 := createNoExecuteTaint(2)
minute := int64(60)
pod := testutil.NewPod("pod1", "node1")
pod.Spec.Tolerations = []corev1.Toleration{
{Key: taint1.Key, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoExecute},
{Key: taint2.Key, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &minute},
}
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
untaintedNode := testutil.NewNode("node1")
doubleTaintedNode := testutil.NewNode("node1")
doubleTaintedNode.Spec.Taints = []corev1.Taint{taint1, taint2}
singleTaintedNode := testutil.NewNode("node1")
singleTaintedNode.Spec.Taints = []corev1.Taint{taint1}
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(pod)
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
// no taint
nodeIndexer.Add(untaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion with no taints")
}
// no taint -> infinitely tolerated taint
nodeIndexer.Update(singleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion with permanently tolerated taint")
}
// infinitely tolerated taint -> temporarily tolerated taint
nodeIndexer.Update(doubleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) == nil {
t.Fatalf("pod not queued for deletion after addition of temporarily tolerated taint")
}
// temporarily tolerated taint -> infinitely tolerated taint
nodeIndexer.Update(singleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion after removal of temporarily tolerated taint")
}
// verify pod is not deleted
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
t.Error("Unexpected deletion")
}
}
cancel()
}
func TestUpdateNodeWithMultiplePods(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
oldNode *corev1.Node
newNode *corev1.Node
expectedDeleteTimes durationSlice
}{
{
description: "Pods with different toleration times are evicted appropriately",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectedDeleteTimes: durationSlice{
{[]string{"pod1"}, 0},
{[]string{"pod2"}, time.Second},
},
},
{
description: "Evict all pods not matching all taints instantly",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectedDeleteTimes: durationSlice{
{[]string{"pod1", "pod2", "pod3"}, 0},
},
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
t.Logf("Starting testcase %q", item.description)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
sort.Sort(item.expectedDeleteTimes)
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(item.oldNode, item.newNode)
startedAt := time.Now()
for i := range item.expectedDeleteTimes {
if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp {
// compute a grace duration to give the controller time to process updates. Choose
// big enough intervals in the test cases above to avoid flakes.
var increment time.Duration
if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp {
increment = 500 * time.Millisecond
} else {
increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2))
}
sleepTime := item.expectedDeleteTimes[i].timestamp - time.Since(startedAt) + increment
if sleepTime < 0 {
sleepTime = 0
}
t.Logf("Sleeping for %v", sleepTime)
time.Sleep(sleepTime)
}
for _, podName := range item.expectedDeleteTimes[i].names {
deleted := false
for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
if podName == deleteAction.GetName() {
deleted = true
}
}
if !deleted {
t.Errorf("Failed to deleted pod %v after %v", podName, delay)
}
}
for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
deletedPodName := deleteAction.GetName()
expected := false
for _, podName := range item.expectedDeleteTimes[i].names {
if podName == deletedPodName {
expected = true
}
}
if !expected {
t.Errorf("Pod %v was deleted even though it shouldn't have", deletedPodName)
}
}
fakeClientset.ClearActions()
}
})
}
}
func TestGetMinTolerationTime(t *testing.T) {
one := int64(1)
two := int64(2)
oneSec := 1 * time.Second
tests := []struct {
tolerations []corev1.Toleration
expected time.Duration
}{
{
tolerations: []corev1.Toleration{},
expected: 0,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: nil,
},
},
expected: -1,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: &one,
},
{
TolerationSeconds: &two,
},
},
expected: oneSec,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: &one,
},
{
TolerationSeconds: nil,
},
},
expected: oneSec,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: nil,
},
{
TolerationSeconds: &one,
},
},
expected: oneSec,
},
}
for _, test := range tests {
got := getMinTolerationTime(test.tolerations)
if got != test.expected {
t.Errorf("Incorrect min toleration time: got %v, expected %v", got, test.expected)
}
}
}
// TestEventualConsistency verifies that if getPodsAssignedToNode returns incomplete data
// (e.g. due to watch latency), the controller will reconcile the remaining pods eventually.
// This scenario is partially covered by TestUpdatePod, but given that this is an important
// property of the TaintManager, it's better to have an explicit test for it.
func TestEventualConsistency(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
prevPod *corev1.Pod
newPod *corev1.Pod
oldNode *corev1.Node
newNode *corev1.Node
expectPatch bool
expectDelete bool
}{
{
description: "existing pod2 scheduled onto tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: testutil.NewPod("pod2", ""),
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "existing pod2 with taint toleration scheduled onto tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: addToleration(testutil.NewPod("pod2", ""), 1, 100),
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "new pod2 created on tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "new pod2 with tait toleration created on tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
controller, podIndexer, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
if item.prevPod != nil {
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
}
// First we simulate NodeUpdate that should delete 'pod1'. It doesn't know about 'pod2' yet.
controller.NodeUpdated(item.oldNode, item.newNode)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
fakeClientset.ClearActions()
// And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well.
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgressForSanityCheck)
})
}
}
func verifyPodActions(t *testing.T, description string, fakeClientset *fake.Clientset, expectPatch, expectDelete bool) {
t.Helper()
podPatched := false
podDeleted := false
// use Poll instead of PollImmediate to give the controller some processing time, so that
// the expected actions are likely to have been sent already
err := wait.Poll(10*time.Millisecond, 5*time.Second, func() (bool, error) {
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" {
podPatched = true
}
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
return podPatched == expectPatch && podDeleted == expectDelete, nil
})
if err != nil {
t.Errorf("Failed waiting for the expected actions: %q", err)
}
if podPatched != expectPatch {
t.Errorf("[%v]Unexpected test result. Expected patch %v, got %v", description, expectPatch, podPatched)
}
if podDeleted != expectDelete {
t.Errorf("[%v]Unexpected test result. Expected delete %v, got %v", description, expectDelete, podDeleted)
}
}
// TestPodDeletionEvent verifies that the emitted events are as expected.
func TestPodDeletionEvent(t *testing.T) {
f := func(path cmp.Path) bool {
switch path.String() {
// These fields change at runtime, so ignore them
case "LastTimestamp", "FirstTimestamp", "ObjectMeta.Name":
return true
}
return false
}
t.Run("emitPodDeletionEvent", func(t *testing.T) {
controller := &Controller{}
recorder := testutil.NewFakeRecorder()
controller.recorder = recorder
controller.emitPodDeletionEvent(types.NamespacedName{
Name: "test",
Namespace: "test",
})
want := []*corev1.Event{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
InvolvedObject: corev1.ObjectReference{
Kind: "Pod",
APIVersion: "v1",
Namespace: "test",
Name: "test",
},
Reason: "TaintManagerEviction",
Type: "Normal",
Count: 1,
Message: "Marking for deletion Pod test/test",
Source: corev1.EventSource{Component: "nodeControllerTest"},
},
}
if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 {
t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff)
}
})
t.Run("emitCancelPodDeletionEvent", func(t *testing.T) {
controller := &Controller{}
recorder := testutil.NewFakeRecorder()
controller.recorder = recorder
controller.emitCancelPodDeletionEvent(types.NamespacedName{
Name: "test",
Namespace: "test",
})
want := []*corev1.Event{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
InvolvedObject: corev1.ObjectReference{
Kind: "Pod",
APIVersion: "v1",
Namespace: "test",
Name: "test",
},
Reason: "TaintManagerEviction",
Type: "Normal",
Count: 1,
Message: "Cancelling deletion of Pod test/test",
Source: corev1.EventSource{Component: "nodeControllerTest"},
},
}
if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 {
t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff)
}
})
}

View File

@@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tainteviction
import (
"k8s.io/apimachinery/pkg/types"
)
// NamespacedObject comprises a resource name with a mandatory namespace
// and optional UID. It gets rendered as "<namespace>/<name>[:<uid>]"
// (text output) or as an object (JSON output).
type NamespacedObject struct {
types.NamespacedName
UID types.UID
}
// String returns the general purpose string representation
func (n NamespacedObject) String() string {
if n.UID != "" {
return n.Namespace + string(types.Separator) + n.Name + ":" + string(n.UID)
}
return n.Namespace + string(types.Separator) + n.Name
}
// MarshalLog emits a struct containing the required key/value pairs
func (n NamespacedObject) MarshalLog() interface{} {
return struct {
Name string `json:"name"`
Namespace string `json:"namespace,omitempty"`
UID types.UID `json:"uid,omitempty"`
}{
Name: n.Name,
Namespace: n.Namespace,
UID: n.UID,
}
}
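As an illustration (not part of the commit): a minimal in-package sketch, with a made-up
UID, of how the two renderings differ. It assumes "encoding/json" and "fmt" are imported.

func demoNamespacedObject() {
	obj := NamespacedObject{
		NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
		UID:            "7f9c", // hypothetical UID
	}
	// Text output via String: "default/pod1:7f9c" (without a UID it would be "default/pod1").
	fmt.Println(obj.String())
	// Structured output via MarshalLog, e.g. for klog's JSON backend:
	// {"name":"pod1","namespace":"default","uid":"7f9c"}
	out, _ := json.Marshal(obj.MarshalLog())
	fmt.Println(string(out))
}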

View File

@@ -106,11 +106,11 @@ type Controller struct {
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName), controllerName string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.FromContext(ctx).Info("Deleting pod", "controller", controllerName, "pod", args.NamespacedName)
ns := args.Object.Namespace
name := args.Object.Name
klog.FromContext(ctx).Info("Deleting pod", "controller", controllerName, "pod", args.Object)
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
emitEventFunc(args.Object.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {

View File

@@ -28,18 +28,23 @@ import (
// WorkArgs keeps arguments that will be passed to the function executed by the worker.
type WorkArgs struct {
NamespacedName types.NamespacedName
// Object is the work item. The UID is only set if it was set when adding the work item.
Object NamespacedObject
}
// KeyFromWorkArgs creates a key for the given `WorkArgs`
// KeyFromWorkArgs creates a key for the given `WorkArgs`.
//
// The key is the same as the NamespacedName of the object in the work item,
// i.e. the UID is ignored. There cannot be two different
// work items with the same NamespacedName and different UIDs.
func (w *WorkArgs) KeyFromWorkArgs() string {
return w.NamespacedName.String()
return w.Object.NamespacedName.String()
}
// NewWorkArgs is a helper function to create new `WorkArgs`
// NewWorkArgs is a helper function to create new `WorkArgs` without a UID.
func NewWorkArgs(name, namespace string) *WorkArgs {
return &WorkArgs{
NamespacedName: types.NamespacedName{Namespace: namespace, Name: name},
Object: NamespacedObject{NamespacedName: types.NamespacedName{Namespace: namespace, Name: name}},
}
}
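A short sketch (not part of the commit; the UIDs are invented for illustration) of the
property documented above: the queue key ignores the UID, so two work items that differ
only in their UID map to the same key and cannot be queued side by side.

func demoKeyIgnoresUID() {
	a := &WorkArgs{Object: NamespacedObject{
		NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "pod1"},
		UID:            "uid-1", // hypothetical
	}}
	b := &WorkArgs{Object: NamespacedObject{
		NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "pod1"},
		UID:            "uid-2", // hypothetical
	}}
	// Both keys are "ns1/pod1", so adding b while a is pending is a no-op.
	fmt.Println(a.KeyFromWorkArgs() == b.KeyFromWorkArgs()) // true
}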
@@ -102,31 +107,59 @@ func CreateWorkerQueue(f func(ctx context.Context, fireAt time.Time, args *WorkA
func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
logger := klog.FromContext(ctx)
logger.V(4).Info("Firing worker", "item", key, "firedTime", fireAt)
err := q.workFunc(ctx, fireAt, args)
q.Lock()
defer q.Unlock()
logger.V(4).Info("Worker finished, removing", "item", key, "err", err)
delete(q.workers, key)
return err
}
}
// AddWork adds a work item to the WorkerQueue which will be executed not earlier than `fireAt`.
// If an item with the same key already exists, the new one is ignored;
// use UpdateWork to replace a pending item.
func (q *TimedWorkerQueue) AddWork(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time) {
key := args.KeyFromWorkArgs()
logger := klog.FromContext(ctx)
logger.V(4).Info("Adding TimedWorkerQueue item and to be fired at firedTime", "item", key, "createTime", createdAt, "firedTime", fireAt)
q.Lock()
defer q.Unlock()
if _, exists := q.workers[key]; exists {
logger.Info("Trying to add already existing work, skipping", "args", args)
logger.V(4).Info("Trying to add already existing work, skipping", "item", key, "createTime", createdAt, "firedTime", fireAt)
return
}
logger.V(4).Info("Adding TimedWorkerQueue item and to be fired at firedTime", "item", key, "createTime", createdAt, "firedTime", fireAt)
worker := createWorker(ctx, args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock)
q.workers[key] = worker
}
// UpdateWork adds or replaces a work item such that it will be executed not earlier than `fireAt`.
// This is a cheap no-op when the old and new fireAt are the same.
func (q *TimedWorkerQueue) UpdateWork(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time) {
key := args.KeyFromWorkArgs()
logger := klog.FromContext(ctx)
q.Lock()
defer q.Unlock()
if worker, exists := q.workers[key]; exists {
if worker.FireAt.Compare(fireAt) == 0 {
logger.V(4).Info("Keeping existing work, same time", "item", key, "createTime", worker.CreatedAt, "firedTime", worker.FireAt)
return
}
logger.V(4).Info("Replacing existing work", "item", key, "createTime", worker.CreatedAt, "firedTime", worker.FireAt)
worker.Cancel()
}
logger.V(4).Info("Adding TimedWorkerQueue item and to be fired at firedTime", "item", key, "createTime", createdAt, "firedTime", fireAt)
worker := createWorker(ctx, args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock)
q.workers[key] = worker
}
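Roughly, the difference between the two entry points looks like this (a sketch under the
signatures above; the callback body and the durations are invented):

func demoQueueSemantics(ctx context.Context) {
	q := CreateWorkerQueue(func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
		klog.FromContext(ctx).Info("Work item fired", "item", args.Object)
		return nil
	})
	now := time.Now()
	args := NewWorkArgs("pod1", "default")
	q.AddWork(ctx, args, now, now.Add(10*time.Second))
	// Ignored: AddWork never replaces a pending item.
	q.AddWork(ctx, args, now, now.Add(5*time.Second))
	// Replaces the pending item because the new fireAt differs.
	q.UpdateWork(ctx, args, now, now.Add(5*time.Second))
}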
// CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.
// The key must be the same as the one returned by WorkArgs.KeyFromWorkArgs, i.e.
// the result of NamespacedName.String.
func (q *TimedWorkerQueue) CancelWork(logger klog.Logger, key string) bool {
q.Lock()
defer q.Unlock()

View File

@@ -95,7 +95,7 @@ var (
// Node with "instance-1" device and no device attributes.
workerNode = &st.MakeNode().Name(nodeName).Label("kubernetes.io/hostname", nodeName).Node
workerNodeSlice = st.MakeResourceSlice(nodeName, driver).Device("instance-1", nil).Obj()
workerNodeSlice = st.MakeResourceSlice(nodeName, driver).Device("instance-1").Obj()
// Node with same device, but now with a "healthy" boolean attribute.
workerNode2 = &st.MakeNode().Name(node2Name).Label("kubernetes.io/hostname", node2Name).Node

View File

@@ -1188,9 +1188,23 @@ func (wrapper *ResourceSliceWrapper) Devices(names ...string) *ResourceSliceWrap
return wrapper
}
// Device sets the devices field of the inner object.
func (wrapper *ResourceSliceWrapper) Device(name string, attrs map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) *ResourceSliceWrapper {
wrapper.Spec.Devices = append(wrapper.Spec.Devices, resourceapi.Device{Name: name, Basic: &resourceapi.BasicDevice{Attributes: attrs}})
// Device extends the devices field of the inner object.
// The device must have a name and may have arbitrary additional fields.
func (wrapper *ResourceSliceWrapper) Device(name string, otherFields ...any) *ResourceSliceWrapper {
device := resourceapi.Device{Name: name, Basic: &resourceapi.BasicDevice{}}
for _, field := range otherFields {
switch typedField := field.(type) {
case map[resourceapi.QualifiedName]resourceapi.DeviceAttribute:
device.Basic.Attributes = typedField
case map[resourceapi.QualifiedName]resourceapi.DeviceCapacity:
device.Basic.Capacity = typedField
case resourceapi.DeviceTaint:
device.Basic.Taints = append(device.Basic.Taints, typedField)
default:
panic(fmt.Sprintf("expected a type which matches a field in BasicDevice, got %T", field))
}
}
wrapper.Spec.Devices = append(wrapper.Spec.Devices, device)
return wrapper
}
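For example (a sketch, not part of the commit; the attribute and taint values are invented,
and it assumes the DeviceTaint type with its NoExecute effect from the resource API used in
this series, plus k8s.io/utils/ptr):

slice := MakeResourceSlice("node1", "driver.example.com").
	Device("instance-1",
		// Mix and match: attributes, capacity, and taints in a single call.
		map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
			"healthy": {BoolValue: ptr.To(true)},
		},
		resourceapi.DeviceTaint{
			Key:    "example.com/taint",
			Value:  "true",
			Effect: resourceapi.DeviceTaintEffectNoExecute,
		},
	).
	Obj()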

View File

@@ -215,6 +215,24 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
eventsRule(),
},
})
if utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints) {
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
// Same name as in k8s.io/kubernetes/cmd/kube-controller-manager/names.
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "device-taint-eviction-controller"},
Rules: []rbacv1.PolicyRule{
// Deletes pods to evict them.
rbacv1helpers.NewRule("get", "list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
// Sets pod conditions.
rbacv1helpers.NewRule("update", "patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
// The rest is read-only.
rbacv1helpers.NewRule("get", "list", "watch").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(resourceGroup).Resources("resourceslices").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(resourceGroup).Resources("deviceclasses").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(resourceGroup).Resources("devicetaintrules").RuleOrDie(),
eventsRule(),
},
})
}
}
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{