	Parallelize taint manager
This commit parallelizes the NoExecuteTaintManager: instead of a single goroutine reading one node-update channel and one pod-update channel, Run() now fans updates out across per-worker channels, hashing each update's node name so that all updates for a given node are handled by the same worker.
@@ -18,6 +18,8 @@ package scheduler
 
 import (
 	"fmt"
+	"hash/fnv"
+	"io"
 	"sync"
 	"time"
 
@@ -58,6 +60,32 @@ type podUpdateItem struct {
 	newTolerations []v1.Toleration
 }
 
+func (n *nodeUpdateItem) name() string {
+	if n.newNode != nil {
+		return n.newNode.ObjectMeta.Name
+	}
+	if n.oldNode != nil {
+		return n.oldNode.ObjectMeta.Name
+	}
+	return ""
+}
+
+func (p *podUpdateItem) nodeName() string {
+	if p.newPod != nil {
+		return p.newPod.Spec.NodeName
+	}
+	if p.oldPod != nil {
+		return p.oldPod.Spec.NodeName
+	}
+	return ""
+}
+
+func hash(val string) int {
+	hasher := fnv.New32a()
+	io.WriteString(hasher, val)
+	return int(hasher.Sum32())
+}
+
 // NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
 // from Nodes tainted with NoExecute Taints.
 type NoExecuteTaintManager struct {
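The new hash function above is the heart of the sharding scheme: FNV-32a over a name, reduced modulo the worker count, yields a stable worker index, so every update for a given node (and every pod update, keyed by its node name) is handled by the same goroutine, preserving per-node ordering. A minimal, self-contained sketch of that scheme (shardFor and numWorkers are illustrative names, not part of this commit):

package main

import (
	"fmt"
	"hash/fnv"
	"io"
)

// shardFor maps a node name to a worker index the same way the commit's
// hash()%workers indexing does. Here the modulo is taken in uint32 so the
// result stays non-negative even on platforms where int is 32 bits wide.
func shardFor(nodeName string, numWorkers int) int {
	hasher := fnv.New32a()
	io.WriteString(hasher, nodeName)
	return int(hasher.Sum32() % uint32(numWorkers))
}

func main() {
	// The same name always lands on the same worker, so updates for one
	// node are never processed concurrently by two workers.
	for _, name := range []string{"node-a", "node-b", "node-a"} {
		fmt.Printf("%s -> worker %d\n", name, shardFor(name, 8))
	}
}

Note that the commit itself converts Sum32() to int before taking the modulo; on 64-bit platforms the two are equivalent, but doing the modulo in uint32 avoids a negative index where int is 32 bits.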
@@ -69,8 +97,8 @@ type NoExecuteTaintManager struct {
 	taintedNodesLock sync.Mutex
 	taintedNodes     map[string][]v1.Taint
 
-	nodeUpdateChannel chan *nodeUpdateItem
-	podUpdateChannel  chan *podUpdateItem
+	nodeUpdateChannels []chan *nodeUpdateItem
+	podUpdateChannels  []chan *podUpdateItem
 
 	nodeUpdateQueue workqueue.Interface
 	podUpdateQueue  workqueue.Interface
@@ -160,11 +188,9 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
 	}
 
 	tm := &NoExecuteTaintManager{
-		client:            c,
-		recorder:          recorder,
-		taintedNodes:      make(map[string][]v1.Taint),
-		nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
-		podUpdateChannel:  make(chan *podUpdateItem, podUpdateChannelSize),
+		client:       c,
+		recorder:     recorder,
+		taintedNodes: make(map[string][]v1.Taint),
 
 		nodeUpdateQueue: workqueue.New(),
 		podUpdateQueue:  workqueue.New(),
@@ -177,6 +203,15 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
 // Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
 func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 	glog.V(0).Infof("Starting NoExecuteTaintManager")
+
+	// TODO: Figure out a reasonable number of workers and propagate the
+	// number of workers up making it a parameter of Run() function.
+	workers := 8
+	for i := 0; i < workers; i++ {
+		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, nodeUpdateChannelSize))
+		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize))
+	}
+
 	// Functions that are responsible for taking work items out of the workqueues and putting them
 	// into channels.
 	go func(stopCh <-chan struct{}) {
@@ -186,10 +221,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 				break
 			}
 			nodeUpdate := item.(*nodeUpdateItem)
+			hash := hash(nodeUpdate.name())
 			select {
 			case <-stopCh:
 				break
-			case tc.nodeUpdateChannel <- nodeUpdate:
+			case tc.nodeUpdateChannels[hash%workers] <- nodeUpdate:
 			}
 		}
 	}(stopCh)
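This goroutine (and its twin for pods in the next hunk) is the dispatcher half of the design: a single reader drains the workqueue and routes each item to a per-worker channel, with the select ensuring shutdown is never blocked by a full channel. A rough standalone sketch of that shape, using a plain buffered channel in place of the workqueue (dispatch, update, and source are illustrative names, not the commit's API):

package main

import (
	"fmt"
	"hash/fnv"
	"io"
)

type update struct{ node string }

// dispatch drains source and forwards each update to the worker channel
// picked by hashing the node name. If stopCh closes while the target
// channel is full, the dispatcher gives up instead of blocking forever.
func dispatch(source <-chan update, workers []chan update, stopCh <-chan struct{}) {
	for u := range source {
		hasher := fnv.New32a()
		io.WriteString(hasher, u.node)
		idx := int(hasher.Sum32() % uint32(len(workers)))
		select {
		case <-stopCh:
			return
		case workers[idx] <- u:
		}
	}
}

func main() {
	source := make(chan update, 4)
	stopCh := make(chan struct{})
	workers := []chan update{make(chan update, 4), make(chan update, 4)}

	source <- update{node: "node-a"}
	source <- update{node: "node-b"}
	close(source) // dispatch returns once the buffered source is drained

	dispatch(source, workers, stopCh)

	for i, ch := range workers {
		close(ch)
		for u := range ch {
			fmt.Printf("worker %d received update for %s\n", i, u.node)
		}
	}
}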
@@ -201,14 +237,26 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 				break
 			}
 			podUpdate := item.(*podUpdateItem)
+			hash := hash(podUpdate.nodeName())
 			select {
 			case <-stopCh:
 				break
-			case tc.podUpdateChannel <- podUpdate:
+			case tc.podUpdateChannels[hash%workers] <- podUpdate:
 			}
 		}
 	}(stopCh)
 
+	wg := sync.WaitGroup{}
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go tc.worker(i, wg.Done, stopCh)
+	}
+	wg.Wait()
+}
+
+func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
+	defer done()
+
 	// When processing events we want to prioritize Node updates over Pod updates,
 	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
 	// we don't want user (or system) to wait until PodUpdate queue is drained before it can
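Run() now ends by spawning the worker pool and parking on wg.Wait(), and the per-worker loop moves into the new worker() method; passing wg.Done in as the done callback keeps worker() decoupled from the WaitGroup itself. A compact sketch of this spawn-and-wait shape (the worker body is stubbed out; in the commit it is the select loop shown in the next hunk):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 8
	stopCh := make(chan struct{})

	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		// Like the commit, hand each goroutine its index and a done
		// callback rather than the WaitGroup itself.
		go func(worker int, done func()) {
			defer done()
			fmt.Printf("worker %d running\n", worker)
			<-stopCh // a real worker loops over its channels here
		}(i, wg.Done)
	}

	close(stopCh) // normally the caller closes this to stop the manager
	wg.Wait()     // Run() blocks here until every worker has exited
}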
@@ -216,15 +264,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 	for {
 		select {
 		case <-stopCh:
-			break
-		case nodeUpdate := <-tc.nodeUpdateChannel:
+			return
+		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
 			tc.handleNodeUpdate(nodeUpdate)
-		case podUpdate := <-tc.podUpdateChannel:
+		case podUpdate := <-tc.podUpdateChannels[worker]:
 			// If we found a Pod update we need to empty Node queue first.
 		priority:
 			for {
 				select {
-				case nodeUpdate := <-tc.nodeUpdateChannel:
+				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
 					tc.handleNodeUpdate(nodeUpdate)
 				default:
 					break priority
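The labeled break in this last hunk is worth a note: `priority:` names the inner for loop, so `break priority` exits the whole drain loop rather than just the select, and the `default` case makes each iteration non-blocking. The effect is that a worker empties all of its pending node updates before touching the pod update it just received. A toy illustration of the same construct (nodeUpdates is an illustrative name):

package main

import "fmt"

func main() {
	nodeUpdates := make(chan string, 8)
	nodeUpdates <- "node-a tainted"
	nodeUpdates <- "node-b untainted"

	// Drain every pending node update without blocking; "break priority"
	// leaves the labeled for loop, not just the enclosing select.
priority:
	for {
		select {
		case n := <-nodeUpdates:
			fmt.Println("handled:", n)
		default:
			break priority
		}
	}
	fmt.Println("node queue empty; now handle the pod update")
}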