NodeController sets NodeTaints instead of deleting Pods

This commit is contained in:
gmarek
2017-02-06 13:58:48 +01:00
parent 46dda7e32a
commit d88af7806c
10 changed files with 534 additions and 94 deletions

View File

@@ -45,6 +45,7 @@ import (
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/system"
@@ -63,6 +64,16 @@ var (
// The minimum kubelet version for which the nodecontroller
// can safely flip pod.Status to NotReady.
podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0")
UnreachableTaintTemplate = &v1.Taint{
Key: metav1.TaintNodeUnreachable,
Effect: v1.TaintEffectNoExecute,
}
NotReadyTaintTemplate = &v1.Taint{
Key: metav1.TaintNodeNotReady,
Effect: v1.TaintEffectNoExecute,
}
)
const (
@@ -132,8 +143,10 @@ type NodeController struct {
// Lock to access evictor workers
evictorLock sync.Mutex
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*RateLimitedTimedQueue
podEvictionTimeout time.Duration
zonePodEvictor map[string]*RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNotReadyOrUnreachableTainer map[string]*RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
@@ -166,6 +179,10 @@ type NodeController struct {
// if set to true NodeController will start TaintManager that will evict Pods from
// tainted nodes, if they're not tolerated.
runTaintManager bool
// if set to true NodeController will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// taints instead of evicting Pods itself.
useTaintBasedEvictions bool
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -190,7 +207,8 @@ func NewNodeController(
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool,
runTaintManager bool) (*NodeController, error) {
runTaintManager bool,
useTaintBasedEvictions bool) (*NodeController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"})
eventBroadcaster.StartLogging(glog.Infof)
@@ -216,30 +234,32 @@ func NewNodeController(
}
nc := &NodeController{
cloud: cloud,
knownNodeSet: make(map[string]*v1.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: metav1.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *v1.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState),
runTaintManager: runTaintManager,
cloud: cloud,
knownNodeSet: make(map[string]*v1.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneNotReadyOrUnreachableTainer: make(map[string]*RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
lookupIP: net.LookupIP,
now: metav1.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *v1.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState),
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
@@ -426,38 +446,100 @@ func (nc *NodeController) Run() {
go nc.taintManager.Run(wait.NeverStop)
}
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zoneNotReadyOrUnreachableTainer {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNotReadyOrUnreachableTainer[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
return true, 0
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
// retry in 50 millisecond
return false, 50 * time.Millisecond
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
_, condition := v1.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
if condition.Status == v1.ConditionFalse {
taintToAdd = *NotReadyTaintTemplate
oppositeTaint = *UnreachableTaintTemplate
} else if condition.Status == v1.ConditionUnknown {
taintToAdd = *UnreachableTaintTemplate
oppositeTaint = *NotReadyTaintTemplate
} else {
// It seems that the Node is ready again, so there's no need to taint it.
return true, 0
}
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.Infof("Pods awaiting deletion due to NodeController eviction")
}
return true, 0
})
}
}, nodeEvictionPeriod, wait.NeverStop)
taintToAdd.TimeAdded = metav1.Now()
err = controller.AddOrUpdateTaintOnNode(nc.kubeClient, value.Value, &taintToAdd)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to taint %v unresponsive Node %q: %v",
taintToAdd.Key,
value.Value,
err))
return false, 0
}
err = controller.RemoveTaintOffNode(nc.kubeClient, value.Value, &oppositeTaint, node)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to remove %v unneeded taint from unresponsive Node %q: %v",
oppositeTaint.Key,
value.Value,
err))
return false, 0
}
return true, 0
})
}
}, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
} else if err != nil {
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.Infof("Pods awaiting deletion due to NodeController eviction")
}
return true, 0
})
}
}, nodeEvictionPeriod, wait.NeverStop)
}
}()
}
@@ -478,15 +560,26 @@ func (nc *NodeController) monitorNodeStatus() error {
nc.knownNodeSet[added[i].Name] = added[i]
// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
zone := utilnode.GetZoneKey(added[i])
if _, found := nc.zonePodEvictor[zone]; !found {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
if _, found := nc.zoneStates[zone]; !found {
nc.zoneStates[zone] = stateInitial
if !nc.useTaintBasedEvictions {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
} else {
nc.zoneNotReadyOrUnreachableTainer[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
// Init the metric for the new zone.
glog.Infof("Initializing eviction metric for zone: %v", zone)
EvictionsNumber.WithLabelValues(zone).Add(0)
}
nc.cancelPodEviction(added[i])
if nc.useTaintBasedEvictions {
nc.markNodeAsHealthy(added[i])
} else {
nc.cancelPodEviction(added[i])
}
}
for i := range deleted {
@@ -532,21 +625,61 @@ func (nc *NodeController) monitorNodeStatus() error {
decisionTimestamp := nc.now()
if currentReadyCondition != nil {
// Check eviction timeout against decisionTimestamp
if observedReadyCondition.Status == v1.ConditionFalse &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
glog.V(2).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
if observedReadyCondition.Status == v1.ConditionFalse {
if nc.useTaintBasedEvictions {
if nc.markNodeForTainting(node) {
glog.V(2).Infof("Tainting Node %v with NotReady taint on %v",
node.Name,
decisionTimestamp,
)
}
} else {
if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
glog.V(2).Infof("Evicting pods on node %s: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nc.nodeStatusMap[node.Name].readyTransitionTimestamp,
nc.podEvictionTimeout,
)
}
}
}
}
if observedReadyCondition.Status == v1.ConditionUnknown &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
glog.V(2).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
if observedReadyCondition.Status == v1.ConditionUnknown {
if nc.useTaintBasedEvictions {
if nc.markNodeForTainting(node) {
glog.V(2).Infof("Tainting Node %v with NotReady taint on %v",
node.Name,
decisionTimestamp,
)
}
} else {
if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node) {
glog.V(2).Infof("Evicting pods on node %s: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nc.nodeStatusMap[node.Name].readyTransitionTimestamp,
nc.podEvictionTimeout-gracePeriod,
)
}
}
}
}
if observedReadyCondition.Status == v1.ConditionTrue {
if nc.cancelPodEviction(node) {
glog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
if nc.useTaintBasedEvictions {
removed, err := nc.markNodeAsHealthy(node)
if err != nil {
glog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
}
if removed {
glog.V(2).Infof("Node %s is healthy again, removing all taints", node.Name)
}
} else {
if nc.cancelPodEviction(node) {
glog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
}
@@ -600,6 +733,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
}
newZoneStates[k] = newState
if _, had := nc.zoneStates[k]; !had {
glog.Errorf("Setting initial state for unseen zone: %v", k)
nc.zoneStates[k] = stateInitial
}
}
@@ -629,11 +763,22 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
if allAreFullyDisrupted {
glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
nc.cancelPodEviction(nodes[i])
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsHealthy(nodes[i])
if err != nil {
glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
} else {
nc.cancelPodEviction(nodes[i])
}
}
// We stop all evictions.
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].SwapLimiter(0)
for k := range nc.zoneStates {
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
} else {
nc.zonePodEvictor[k].SwapLimiter(0)
}
}
for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption
@@ -653,7 +798,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
nc.nodeStatusMap[nodes[i].Name] = v
}
// We reset all rate limiters to settings appropriate for the given state.
for k := range nc.zonePodEvictor {
for k := range nc.zoneStates {
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
nc.zoneStates[k] = newZoneStates[k]
}
@@ -676,13 +821,27 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
switch state {
case stateNormal:
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
} else {
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
}
case statePartialDisruption:
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
}
case stateFullDisruption:
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
}
}
}
@@ -898,6 +1057,28 @@ func (nc *NodeController) evictPods(node *v1.Node) bool {
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *NodeController) markNodeForTainting(node *v1.Node) bool {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.zoneNotReadyOrUnreachableTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *NodeController) markNodeAsHealthy(node *v1.Node) (bool, error) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, UnreachableTaintTemplate, node)
if err != nil {
glog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
return false, err
}
err = controller.RemoveTaintOffNode(nc.kubeClient, node.Name, NotReadyTaintTemplate, node)
if err != nil {
glog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
return false, err
}
return nc.zoneNotReadyOrUnreachableTainer[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}
// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
return nc.evictionLimiterQPS

View File

@@ -100,6 +100,7 @@ func NewNodeControllerFromClient(
nodeCIDRMaskSize,
allocateNodeCIDRs,
false,
false,
)
if err != nil {
return nil, err
@@ -573,11 +574,15 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
zones := testutil.GetZones(item.fakeNodeHandler)
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
return true, 0
})
if _, ok := nodeController.zonePodEvictor[zone]; ok {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
return true, 0
})
} else {
t.Fatalf("Zone %v was unitialized!", zone)
}
}
podEvicted := false