mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	@@ -49,6 +49,7 @@ import (
 | 
			
		||||
	certcontroller "k8s.io/kubernetes/pkg/controller/certificates"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/daemon"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/deployment"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/disruption"
 | 
			
		||||
	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/framework/informers"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 | 
			
		||||
@@ -367,6 +368,18 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	groupVersion = "policy/v1alpha1"
 | 
			
		||||
	resources, found = resourceMap[groupVersion]
 | 
			
		||||
	glog.Infof("Attempting to start disruption controller, full resource map %+v", resourceMap)
 | 
			
		||||
	if containsVersion(versions, groupVersion) && found {
 | 
			
		||||
		glog.Infof("Starting %s apis", groupVersion)
 | 
			
		||||
		if containsResource(resources, "poddisruptionbudgets") {
 | 
			
		||||
			glog.Infof("Starting disruption controller")
 | 
			
		||||
			go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), kubeClient).Run(wait.NeverStop)
 | 
			
		||||
			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	groupVersion = "apps/v1alpha1"
 | 
			
		||||
	resources, found = resourceMap[groupVersion]
 | 
			
		||||
	glog.Infof("Attempting to start petset, full resource map %+v", resourceMap)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										42
									
								
								pkg/client/cache/listers.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										42
									
								
								pkg/client/cache/listers.go
									
									
									
									
										vendored
									
									
								
							@@ -26,6 +26,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/batch"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/certificates"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/extensions"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/policy"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/labels"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -710,3 +711,44 @@ func (i *IndexerToNamespaceLister) List(selector labels.Selector) (namespaces []
 | 
			
		||||
 | 
			
		||||
	return namespaces, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type StoreToPodDisruptionBudgetLister struct {
 | 
			
		||||
	Store
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetPodPodDisruptionBudgets returns a list of PodDisruptionBudgets matching a pod.  Returns an error only if no matching PodDisruptionBudgets are found.
 | 
			
		||||
func (s *StoreToPodDisruptionBudgetLister) GetPodPodDisruptionBudgets(pod *api.Pod) (pdbList []policy.PodDisruptionBudget, err error) {
 | 
			
		||||
	var selector labels.Selector
 | 
			
		||||
 | 
			
		||||
	if len(pod.Labels) == 0 {
 | 
			
		||||
		err = fmt.Errorf("no PodDisruptionBudgets found for pod %v because it has no labels", pod.Name)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, m := range s.Store.List() {
 | 
			
		||||
		pdb, ok := m.(*policy.PodDisruptionBudget)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			glog.Errorf("Unexpected: %v is not a PodDisruptionBudget", m)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if pdb.Namespace != pod.Namespace {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		selector, err = unversioned.LabelSelectorAsSelector(pdb.Spec.Selector)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Warningf("invalid selector: %v", err)
 | 
			
		||||
			// TODO(mml): add an event to the PDB
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// If a PDB with a nil or empty selector creeps in, it should match nothing, not everything.
 | 
			
		||||
		if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		pdbList = append(pdbList, *pdb)
 | 
			
		||||
	}
 | 
			
		||||
	if len(pdbList) == 0 {
 | 
			
		||||
		err = fmt.Errorf("could not find PodDisruptionBudget for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -183,6 +183,10 @@ func (c *Client) Rbac() RbacInterface {
 | 
			
		||||
	return c.RbacClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Client) Policy() PolicyInterface {
 | 
			
		||||
	return c.PolicyClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Client) Discovery() discovery.DiscoveryInterface {
 | 
			
		||||
	return c.DiscoveryClient
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										608
									
								
								pkg/controller/disruption/disruption.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										608
									
								
								pkg/controller/disruption/disruption.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,608 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package disruption
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/extensions"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/policy"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/cache"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/record"
 | 
			
		||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/framework"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/types"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/intstr"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/wait"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/workqueue"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/watch"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const statusUpdateRetries = 2
 | 
			
		||||
 | 
			
		||||
type updater func(*policy.PodDisruptionBudget) error
 | 
			
		||||
 | 
			
		||||
type DisruptionController struct {
 | 
			
		||||
	kubeClient *client.Client
 | 
			
		||||
 | 
			
		||||
	pdbStore      cache.Store
 | 
			
		||||
	pdbController *framework.Controller
 | 
			
		||||
	pdbLister     cache.StoreToPodDisruptionBudgetLister
 | 
			
		||||
 | 
			
		||||
	podController framework.ControllerInterface
 | 
			
		||||
	podLister     cache.StoreToPodLister
 | 
			
		||||
 | 
			
		||||
	rcIndexer    cache.Indexer
 | 
			
		||||
	rcController *framework.Controller
 | 
			
		||||
	rcLister     cache.StoreToReplicationControllerLister
 | 
			
		||||
 | 
			
		||||
	rsStore      cache.Store
 | 
			
		||||
	rsController *framework.Controller
 | 
			
		||||
	rsLister     cache.StoreToReplicaSetLister
 | 
			
		||||
 | 
			
		||||
	dStore      cache.Store
 | 
			
		||||
	dController *framework.Controller
 | 
			
		||||
	dLister     cache.StoreToDeploymentLister
 | 
			
		||||
 | 
			
		||||
	queue *workqueue.Type
 | 
			
		||||
 | 
			
		||||
	broadcaster record.EventBroadcaster
 | 
			
		||||
	recorder    record.EventRecorder
 | 
			
		||||
 | 
			
		||||
	getUpdater func() updater
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// controllerAndScale is used to return (controller, scale) pairs from the
 | 
			
		||||
// controller finder functions.
 | 
			
		||||
type controllerAndScale struct {
 | 
			
		||||
	types.UID
 | 
			
		||||
	scale int32
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// podControllerFinder is a function type that maps a pod to a list of
 | 
			
		||||
// controllers and their scale.
 | 
			
		||||
type podControllerFinder func(*api.Pod) ([]controllerAndScale, error)
 | 
			
		||||
 | 
			
		||||
func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClient *client.Client) *DisruptionController {
 | 
			
		||||
	dc := &DisruptionController{
 | 
			
		||||
		kubeClient:    kubeClient,
 | 
			
		||||
		podController: podInformer.GetController(),
 | 
			
		||||
		queue:         workqueue.New(),
 | 
			
		||||
		broadcaster:   record.NewBroadcaster(),
 | 
			
		||||
	}
 | 
			
		||||
	dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
 | 
			
		||||
 | 
			
		||||
	dc.getUpdater = func() updater { return dc.writePdbStatus }
 | 
			
		||||
 | 
			
		||||
	dc.podLister.Indexer = podInformer.GetIndexer()
 | 
			
		||||
 | 
			
		||||
	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
 | 
			
		||||
		AddFunc:    dc.addPod,
 | 
			
		||||
		UpdateFunc: dc.updatePod,
 | 
			
		||||
		DeleteFunc: dc.deletePod,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	dc.pdbStore, dc.pdbController = framework.NewInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
				return dc.kubeClient.Policy().PodDisruptionBudgets(api.NamespaceAll).List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return dc.kubeClient.Policy().PodDisruptionBudgets(api.NamespaceAll).Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		&policy.PodDisruptionBudget{},
 | 
			
		||||
		30*time.Second,
 | 
			
		||||
		framework.ResourceEventHandlerFuncs{
 | 
			
		||||
			AddFunc:    dc.addDb,
 | 
			
		||||
			UpdateFunc: dc.updateDb,
 | 
			
		||||
			DeleteFunc: dc.removeDb,
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
	dc.pdbLister.Store = dc.pdbStore
 | 
			
		||||
 | 
			
		||||
	dc.rcIndexer, dc.rcController = framework.NewIndexerInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
				return dc.kubeClient.ReplicationControllers(api.NamespaceAll).List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return dc.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		&api.ReplicationController{},
 | 
			
		||||
		30*time.Second,
 | 
			
		||||
		framework.ResourceEventHandlerFuncs{},
 | 
			
		||||
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	dc.rcLister.Indexer = dc.rcIndexer
 | 
			
		||||
 | 
			
		||||
	dc.rsStore, dc.rsController = framework.NewInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
				return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		&extensions.ReplicaSet{},
 | 
			
		||||
		30*time.Second,
 | 
			
		||||
		framework.ResourceEventHandlerFuncs{},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	dc.rsLister.Store = dc.rsStore
 | 
			
		||||
 | 
			
		||||
	dc.dStore, dc.dController = framework.NewInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 | 
			
		||||
				return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		&extensions.Deployment{},
 | 
			
		||||
		30*time.Second,
 | 
			
		||||
		framework.ResourceEventHandlerFuncs{},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	dc.dLister.Store = dc.dStore
 | 
			
		||||
 | 
			
		||||
	return dc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO(mml): When controllerRef is implemented (#2210), we *could* simply
 | 
			
		||||
// return controllers without their scales, and access scale type-generically
 | 
			
		||||
// via the scale subresource.  That may not be as much of a win as it sounds,
 | 
			
		||||
// however.  We are accessing everything through the pkg/client/cache API that
 | 
			
		||||
// we have to set up and tune to the types we know we'll be accessing anyway,
 | 
			
		||||
// and we may well need further tweaks just to be able to access scale
 | 
			
		||||
// subresources.
 | 
			
		||||
func (dc *DisruptionController) finders() []podControllerFinder {
 | 
			
		||||
	return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getPodReplicaSets finds replicasets which have no matching deployments.
 | 
			
		||||
func (dc *DisruptionController) getPodReplicaSets(pod *api.Pod) ([]controllerAndScale, error) {
 | 
			
		||||
	cas := []controllerAndScale{}
 | 
			
		||||
	rss, err := dc.rsLister.GetPodReplicaSets(pod)
 | 
			
		||||
	// GetPodReplicaSets returns an error only if no ReplicaSets are found.  We
 | 
			
		||||
	// don't return that as an error to the caller.
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return cas, nil
 | 
			
		||||
	}
 | 
			
		||||
	controllerScale := map[types.UID]int32{}
 | 
			
		||||
	for _, rs := range rss {
 | 
			
		||||
		// GetDeploymentsForReplicaSet returns an error only if no matching
 | 
			
		||||
		// deployments are found.
 | 
			
		||||
		_, err := dc.dLister.GetDeploymentsForReplicaSet(&rs)
 | 
			
		||||
		if err == nil { // A deployment was found, so this finder will not count this RS.
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		controllerScale[rs.UID] = rs.Spec.Replicas
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for uid, scale := range controllerScale {
 | 
			
		||||
		cas = append(cas, controllerAndScale{UID: uid, scale: scale})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return cas, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
 | 
			
		||||
func (dc *DisruptionController) getPodDeployments(pod *api.Pod) ([]controllerAndScale, error) {
 | 
			
		||||
	cas := []controllerAndScale{}
 | 
			
		||||
	rss, err := dc.rsLister.GetPodReplicaSets(pod)
 | 
			
		||||
	// GetPodReplicaSets returns an error only if no ReplicaSets are found.  We
 | 
			
		||||
	// don't return that as an error to the caller.
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return cas, nil
 | 
			
		||||
	}
 | 
			
		||||
	controllerScale := map[types.UID]int32{}
 | 
			
		||||
	for _, rs := range rss {
 | 
			
		||||
		ds, err := dc.dLister.GetDeploymentsForReplicaSet(&rs)
 | 
			
		||||
		// GetDeploymentsForReplicaSet returns an error only if no matching
 | 
			
		||||
		// deployments are found.  In that case we skip this ReplicaSet.
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		for _, d := range ds {
 | 
			
		||||
			controllerScale[d.UID] = d.Spec.Replicas
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for uid, scale := range controllerScale {
 | 
			
		||||
		cas = append(cas, controllerAndScale{UID: uid, scale: scale})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return cas, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) getPodReplicationControllers(pod *api.Pod) ([]controllerAndScale, error) {
 | 
			
		||||
	cas := []controllerAndScale{}
 | 
			
		||||
	rcs, err := dc.rcLister.GetPodControllers(pod)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		for _, rc := range rcs {
 | 
			
		||||
			cas = append(cas, controllerAndScale{UID: rc.UID, scale: rc.Spec.Replicas})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return cas, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
 | 
			
		||||
	glog.V(0).Infof("Starting disruption controller")
 | 
			
		||||
	if dc.kubeClient != nil {
 | 
			
		||||
		glog.V(0).Infof("Sending events to api server.")
 | 
			
		||||
		dc.broadcaster.StartRecordingToSink(dc.kubeClient.Events(""))
 | 
			
		||||
	} else {
 | 
			
		||||
		glog.V(0).Infof("No api server defined - no events will be sent to API server.")
 | 
			
		||||
	}
 | 
			
		||||
	go dc.pdbController.Run(stopCh)
 | 
			
		||||
	go dc.podController.Run(stopCh)
 | 
			
		||||
	go dc.rcController.Run(stopCh)
 | 
			
		||||
	go dc.rsController.Run(stopCh)
 | 
			
		||||
	go dc.dController.Run(stopCh)
 | 
			
		||||
	go wait.Until(dc.worker, time.Second, stopCh)
 | 
			
		||||
	<-stopCh
 | 
			
		||||
	glog.V(0).Infof("Shutting down disruption controller")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) addDb(obj interface{}) {
 | 
			
		||||
	pdb := obj.(*policy.PodDisruptionBudget)
 | 
			
		||||
	glog.V(4).Infof("add DB %q", pdb.Name)
 | 
			
		||||
	dc.enqueuePdb(pdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) updateDb(old, cur interface{}) {
 | 
			
		||||
	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
 | 
			
		||||
	pdb := cur.(*policy.PodDisruptionBudget)
 | 
			
		||||
	glog.V(4).Infof("update DB %q", pdb.Name)
 | 
			
		||||
	dc.enqueuePdb(pdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) removeDb(obj interface{}) {
 | 
			
		||||
	pdb := obj.(*policy.PodDisruptionBudget)
 | 
			
		||||
	glog.V(4).Infof("remove DB %q", pdb.Name)
 | 
			
		||||
	dc.enqueuePdb(pdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) addPod(obj interface{}) {
 | 
			
		||||
	pod := obj.(*api.Pod)
 | 
			
		||||
	glog.V(4).Infof("addPod called on pod %q", pod.Name)
 | 
			
		||||
	pdb := dc.getPdbForPod(pod)
 | 
			
		||||
	if pdb == nil {
 | 
			
		||||
		glog.V(4).Infof("No matching pdb for pod %q", pod.Name)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
 | 
			
		||||
	dc.enqueuePdb(pdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) updatePod(old, cur interface{}) {
 | 
			
		||||
	pod := cur.(*api.Pod)
 | 
			
		||||
	glog.V(4).Infof("updatePod called on pod %q", pod.Name)
 | 
			
		||||
	pdb := dc.getPdbForPod(pod)
 | 
			
		||||
	if pdb == nil {
 | 
			
		||||
		glog.V(4).Infof("No matching pdb for pod %q", pod.Name)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
 | 
			
		||||
	dc.enqueuePdb(pdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) deletePod(obj interface{}) {
 | 
			
		||||
	pod, ok := obj.(*api.Pod)
 | 
			
		||||
	// When a delete is dropped, the relist will notice a pod in the store not
 | 
			
		||||
	// in the list, leading to the insertion of a tombstone object which contains
 | 
			
		||||
	// the deleted key/value. Note that this value might be stale. If the pod
 | 
			
		||||
	// changed labels the new ReplicaSet will not be woken up till the periodic
 | 
			
		||||
	// resync.
 | 
			
		||||
	if !ok {
 | 
			
		||||
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			glog.Errorf("Couldn't get object from tombstone %+v", obj)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		pod, ok = tombstone.Obj.(*api.Pod)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("deletePod called on pod %q", pod.Name)
 | 
			
		||||
	pdb := dc.getPdbForPod(pod)
 | 
			
		||||
	if pdb == nil {
 | 
			
		||||
		glog.V(4).Infof("No matching pdb for pod %q", pod.Name)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name)
 | 
			
		||||
	dc.enqueuePdb(pdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
 | 
			
		||||
	key, err := controller.KeyFunc(pdb)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Cound't get key for PodDisruptionBudget object %+v: %v", pdb, err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	dc.queue.Add(key)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) getPdbForPod(pod *api.Pod) *policy.PodDisruptionBudget {
 | 
			
		||||
	// GetPodPodDisruptionBudgets returns an error only if no
 | 
			
		||||
	// PodDisruptionBudgets are found.  We don't return that as an error to the
 | 
			
		||||
	// caller.
 | 
			
		||||
	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.V(0).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(pdbs) > 1 {
 | 
			
		||||
		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets.  Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
 | 
			
		||||
		glog.Warning(msg)
 | 
			
		||||
		dc.recorder.Event(pod, api.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
 | 
			
		||||
	}
 | 
			
		||||
	return &pdbs[0]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*api.Pod, error) {
 | 
			
		||||
	sel, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector)
 | 
			
		||||
	if sel.Empty() {
 | 
			
		||||
		return []*api.Pod{}, nil
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return []*api.Pod{}, err
 | 
			
		||||
	}
 | 
			
		||||
	podList, err := dc.podLister.Pods(pdb.Namespace).List(sel)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return []*api.Pod{}, err
 | 
			
		||||
	}
 | 
			
		||||
	pods := []*api.Pod{}
 | 
			
		||||
	for i := range podList.Items {
 | 
			
		||||
		pod := podList.Items[i]
 | 
			
		||||
		pods = append(pods, &pod)
 | 
			
		||||
	}
 | 
			
		||||
	return pods, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) worker() {
 | 
			
		||||
	work := func() bool {
 | 
			
		||||
		key, quit := dc.queue.Get()
 | 
			
		||||
		if quit {
 | 
			
		||||
			return quit
 | 
			
		||||
		}
 | 
			
		||||
		defer dc.queue.Done(key)
 | 
			
		||||
		glog.V(4).Infof("Syncing PodDisruptionBudget %q", key.(string))
 | 
			
		||||
		if err := dc.sync(key.(string)); err != nil {
 | 
			
		||||
			glog.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", key.(string), err)
 | 
			
		||||
			// TODO(mml): In order to be safe in the face of a total inability to write state
 | 
			
		||||
			// changes, we should write an expiration timestamp here and consumers
 | 
			
		||||
			// of the PDB state (the /evict subresource handler) should check that
 | 
			
		||||
			// any 'true' state is relatively fresh.
 | 
			
		||||
 | 
			
		||||
			// TODO(mml): file an issue to that effect
 | 
			
		||||
 | 
			
		||||
			// TODO(mml): If we used a workqueue.RateLimitingInterface, we could
 | 
			
		||||
			// improve our behavior (be a better citizen) when we need to retry.
 | 
			
		||||
			dc.queue.Add(key)
 | 
			
		||||
		}
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	for {
 | 
			
		||||
		if quit := work(); quit {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) sync(key string) error {
 | 
			
		||||
	startTime := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		glog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Now().Sub(startTime))
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	obj, exists, err := dc.pdbLister.Store.GetByKey(key)
 | 
			
		||||
	if !exists {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("unable to retrieve PodDisruptionBudget %v from store: %v", key, err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pdb := obj.(*policy.PodDisruptionBudget)
 | 
			
		||||
 | 
			
		||||
	if err := dc.trySync(pdb); err != nil {
 | 
			
		||||
		return dc.failSafe(pdb)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
 | 
			
		||||
	pods, err := dc.getPodsForPdb(pdb)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expectedCount, desiredHealthy, err := dc.getExpectedPodCount(pdb, pods)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	currentHealthy := countHealthyPods(pods)
 | 
			
		||||
	err = dc.updatePdbSpec(pdb, currentHealthy, desiredHealthy, expectedCount)
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*api.Pod) (expectedCount, desiredHealthy int32, err error) {
 | 
			
		||||
	err = nil
 | 
			
		||||
	if pdb.Spec.MinAvailable.Type == intstr.Int {
 | 
			
		||||
		desiredHealthy = pdb.Spec.MinAvailable.IntVal
 | 
			
		||||
		expectedCount = int32(len(pods))
 | 
			
		||||
	} else if pdb.Spec.MinAvailable.Type == intstr.String {
 | 
			
		||||
		// When the user specifies a fraction of pods that must be available, we
 | 
			
		||||
		// use as the fraction's denominator
 | 
			
		||||
		// SUM_{all c in C} scale(c)
 | 
			
		||||
		// where C is the union of C_p1, C_p2, ..., C_pN
 | 
			
		||||
		// and each C_pi is the set of controllers controlling the pod pi
 | 
			
		||||
 | 
			
		||||
		// k8s only defines what will happens when 0 or 1 controllers control a
 | 
			
		||||
		// given pod.  We explicitly exclude the 0 controllers case here, and we
 | 
			
		||||
		// report an error if we find a pod with more than 1 controller.  Thus in
 | 
			
		||||
		// practice each C_pi is a set of exactly 1 controller.
 | 
			
		||||
 | 
			
		||||
		// A mapping from controllers to their scale.
 | 
			
		||||
		controllerScale := map[types.UID]int32{}
 | 
			
		||||
 | 
			
		||||
		// 1. Find the controller(s) for each pod.  If any pod has 0 controllers,
 | 
			
		||||
		// that's an error.  If any pod has more than 1 controller, that's also an
 | 
			
		||||
		// error.
 | 
			
		||||
		for _, pod := range pods {
 | 
			
		||||
			controllerCount := 0
 | 
			
		||||
			for _, finder := range dc.finders() {
 | 
			
		||||
				var controllers []controllerAndScale
 | 
			
		||||
				controllers, err = finder(pod)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				for _, controller := range controllers {
 | 
			
		||||
					controllerScale[controller.UID] = controller.scale
 | 
			
		||||
					controllerCount++
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if controllerCount == 0 {
 | 
			
		||||
				err = fmt.Errorf("asked for percentage, but found no controllers for pod %q", pod.Name)
 | 
			
		||||
				return
 | 
			
		||||
			} else if controllerCount > 1 {
 | 
			
		||||
				err = fmt.Errorf("pod %q has %v>1 controllers", pod.Name, controllerCount)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 2. Add up all the controllers.
 | 
			
		||||
		expectedCount = 0
 | 
			
		||||
		for _, count := range controllerScale {
 | 
			
		||||
			expectedCount += count
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 3. Do the math.
 | 
			
		||||
		var dh int
 | 
			
		||||
		dh, err = intstr.GetValueFromIntOrPercent(&pdb.Spec.MinAvailable, int(expectedCount), true)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		desiredHealthy = int32(dh)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func countHealthyPods(pods []*api.Pod) (currentHealthy int32) {
 | 
			
		||||
Pod:
 | 
			
		||||
	for _, pod := range pods {
 | 
			
		||||
		for _, c := range pod.Status.Conditions {
 | 
			
		||||
			if c.Type == api.PodReady && c.Status == api.ConditionTrue {
 | 
			
		||||
				currentHealthy++
 | 
			
		||||
				continue Pod
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// failSafe is an attempt to at least update the PodDisruptionAllowed field to
 | 
			
		||||
// false if everything something else has failed.  This is one place we
 | 
			
		||||
// implement the  "fail open" part of the design since if we manage to update
 | 
			
		||||
// this field correctly, we will prevent the /evict handler from approving an
 | 
			
		||||
// eviction when it may be unsafe to do so.
 | 
			
		||||
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error {
 | 
			
		||||
	obj, err := api.Scheme.DeepCopy(*pdb)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	newPdb := obj.(policy.PodDisruptionBudget)
 | 
			
		||||
	newPdb.Status.PodDisruptionAllowed = false
 | 
			
		||||
 | 
			
		||||
	return dc.getUpdater()(&newPdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32) error {
 | 
			
		||||
	// We require expectedCount to be > 0 so that PDBs which currently match no
 | 
			
		||||
	// pods are in a safe state when their first pods appear but this controller
 | 
			
		||||
	// has not updated their status yet.  This isn't the only race, but it's a
 | 
			
		||||
	// common one that's easy to detect.
 | 
			
		||||
	disruptionAllowed := currentHealthy >= desiredHealthy && expectedCount > 0
 | 
			
		||||
 | 
			
		||||
	if pdb.Status.CurrentHealthy == currentHealthy && pdb.Status.DesiredHealthy == desiredHealthy && pdb.Status.ExpectedPods == expectedCount && pdb.Status.PodDisruptionAllowed == disruptionAllowed {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	obj, err := api.Scheme.DeepCopy(*pdb)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	newPdb := obj.(policy.PodDisruptionBudget)
 | 
			
		||||
 | 
			
		||||
	newPdb.Status = policy.PodDisruptionBudgetStatus{
 | 
			
		||||
		CurrentHealthy:       currentHealthy,
 | 
			
		||||
		DesiredHealthy:       desiredHealthy,
 | 
			
		||||
		ExpectedPods:         expectedCount,
 | 
			
		||||
		PodDisruptionAllowed: disruptionAllowed,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return dc.getUpdater()(&newPdb)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// refresh tries to re-GET the given PDB.  If there are any errors, it just
 | 
			
		||||
// returns the old PDB.  Intended to be used in a retry loop where it runs a
 | 
			
		||||
// bounded number of times.
 | 
			
		||||
func refresh(pdbClient client.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget {
 | 
			
		||||
	newPdb, err := pdbClient.Get(pdb.Name)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		return newPdb
 | 
			
		||||
	} else {
 | 
			
		||||
		return pdb
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
 | 
			
		||||
	pdbClient := dc.kubeClient.Policy().PodDisruptionBudgets(pdb.Namespace)
 | 
			
		||||
	st := pdb.Status
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) {
 | 
			
		||||
		pdb.Status = st
 | 
			
		||||
		if _, err = pdbClient.UpdateStatus(pdb); err == nil {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										488
									
								
								pkg/controller/disruption/disruption_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										488
									
								
								pkg/controller/disruption/disruption_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,488 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package disruption
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/testapi"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/extensions"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/policy"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/cache"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/intstr"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/uuid"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type pdbStates map[string]policy.PodDisruptionBudget
 | 
			
		||||
 | 
			
		||||
func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error {
 | 
			
		||||
	key, err := controller.KeyFunc(pdb)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	obj, err := api.Scheme.DeepCopy(*pdb)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	(*ps)[key] = obj.(policy.PodDisruptionBudget)
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget {
 | 
			
		||||
	return (*ps)[key]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionAllowed bool, currentHealthy, desiredHealthy, expectedPods int32) {
 | 
			
		||||
	expectedStatus := policy.PodDisruptionBudgetStatus{
 | 
			
		||||
		PodDisruptionAllowed: disruptionAllowed,
 | 
			
		||||
		CurrentHealthy:       currentHealthy,
 | 
			
		||||
		DesiredHealthy:       desiredHealthy,
 | 
			
		||||
		ExpectedPods:         expectedPods,
 | 
			
		||||
	}
 | 
			
		||||
	actualStatus := ps.Get(key).Status
 | 
			
		||||
	if !reflect.DeepEqual(actualStatus, expectedStatus) {
 | 
			
		||||
		t.Fatalf("PDB %q status mismatch.  Expected %+v but got %+v.", key, expectedStatus, actualStatus)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *pdbStates) VerifyDisruptionAllowed(t *testing.T, key string, disruptionAllowed bool) {
 | 
			
		||||
	pdb := ps.Get(key)
 | 
			
		||||
	if pdb.Status.PodDisruptionAllowed != disruptionAllowed {
 | 
			
		||||
		t.Fatalf("PodDisruptionAllowed mismatch for PDB %q.  Expected %v but got %v.", key, disruptionAllowed, pdb.Status.PodDisruptionAllowed)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
 | 
			
		||||
	ps := &pdbStates{}
 | 
			
		||||
 | 
			
		||||
	dc := &DisruptionController{
 | 
			
		||||
		pdbLister:  cache.StoreToPodDisruptionBudgetLister{Store: cache.NewStore(controller.KeyFunc)},
 | 
			
		||||
		podLister:  cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
 | 
			
		||||
		rcLister:   cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
 | 
			
		||||
		rsLister:   cache.StoreToReplicaSetLister{Store: cache.NewStore(controller.KeyFunc)},
 | 
			
		||||
		dLister:    cache.StoreToDeploymentLister{Store: cache.NewStore(controller.KeyFunc)},
 | 
			
		||||
		getUpdater: func() updater { return ps.Set },
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return dc, ps
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func fooBar() map[string]string {
 | 
			
		||||
	return map[string]string{"foo": "bar"}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newSel(labels map[string]string) *unversioned.LabelSelector {
 | 
			
		||||
	return &unversioned.LabelSelector{MatchLabels: labels}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newSelFooBar() *unversioned.LabelSelector {
 | 
			
		||||
	return newSel(map[string]string{"foo": "bar"})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newPodDisruptionBudget(t *testing.T, minAvailable intstr.IntOrString) (*policy.PodDisruptionBudget, string) {
 | 
			
		||||
 | 
			
		||||
	pdb := &policy.PodDisruptionBudget{
 | 
			
		||||
		TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{
 | 
			
		||||
			UID:             uuid.NewUUID(),
 | 
			
		||||
			Name:            "foobar",
 | 
			
		||||
			Namespace:       api.NamespaceDefault,
 | 
			
		||||
			ResourceVersion: "18",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: policy.PodDisruptionBudgetSpec{
 | 
			
		||||
			MinAvailable: minAvailable,
 | 
			
		||||
			Selector:     newSelFooBar(),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pdbName, err := controller.KeyFunc(pdb)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error naming pdb %q: %v", pdb.Name, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return pdb, pdbName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newPod(t *testing.T, name string) (*api.Pod, string) {
 | 
			
		||||
	pod := &api.Pod{
 | 
			
		||||
		TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{
 | 
			
		||||
			UID:             uuid.NewUUID(),
 | 
			
		||||
			Annotations:     make(map[string]string),
 | 
			
		||||
			Name:            name,
 | 
			
		||||
			Namespace:       api.NamespaceDefault,
 | 
			
		||||
			ResourceVersion: "18",
 | 
			
		||||
			Labels:          fooBar(),
 | 
			
		||||
		},
 | 
			
		||||
		Spec: api.PodSpec{},
 | 
			
		||||
		Status: api.PodStatus{
 | 
			
		||||
			Conditions: []api.PodCondition{
 | 
			
		||||
				{Type: api.PodReady, Status: api.ConditionTrue},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	podName, err := controller.KeyFunc(pod)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error naming pod %q: %v", pod.Name, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return pod, podName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newReplicationController(t *testing.T, size int32) (*api.ReplicationController, string) {
 | 
			
		||||
	rc := &api.ReplicationController{
 | 
			
		||||
		TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{
 | 
			
		||||
			UID:             uuid.NewUUID(),
 | 
			
		||||
			Name:            "foobar",
 | 
			
		||||
			Namespace:       api.NamespaceDefault,
 | 
			
		||||
			ResourceVersion: "18",
 | 
			
		||||
			Labels:          fooBar(),
 | 
			
		||||
		},
 | 
			
		||||
		Spec: api.ReplicationControllerSpec{
 | 
			
		||||
			Replicas: size,
 | 
			
		||||
			Selector: fooBar(),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rcName, err := controller.KeyFunc(rc)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error naming RC %q", rc.Name)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return rc, rcName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newDeployment(t *testing.T, size int32) (*extensions.Deployment, string) {
 | 
			
		||||
	d := &extensions.Deployment{
 | 
			
		||||
		TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{
 | 
			
		||||
			UID:             uuid.NewUUID(),
 | 
			
		||||
			Name:            "foobar",
 | 
			
		||||
			Namespace:       api.NamespaceDefault,
 | 
			
		||||
			ResourceVersion: "18",
 | 
			
		||||
			Labels:          fooBar(),
 | 
			
		||||
		},
 | 
			
		||||
		Spec: extensions.DeploymentSpec{
 | 
			
		||||
			Replicas: size,
 | 
			
		||||
			Selector: newSelFooBar(),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dName, err := controller.KeyFunc(d)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error naming Deployment %q: %v", d.Name, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return d, dName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newReplicaSet(t *testing.T, size int32) (*extensions.ReplicaSet, string) {
 | 
			
		||||
	rs := &extensions.ReplicaSet{
 | 
			
		||||
		TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
 | 
			
		||||
		ObjectMeta: api.ObjectMeta{
 | 
			
		||||
			UID:             uuid.NewUUID(),
 | 
			
		||||
			Name:            "foobar",
 | 
			
		||||
			Namespace:       api.NamespaceDefault,
 | 
			
		||||
			ResourceVersion: "18",
 | 
			
		||||
			Labels:          fooBar(),
 | 
			
		||||
		},
 | 
			
		||||
		Spec: extensions.ReplicaSetSpec{
 | 
			
		||||
			Replicas: size,
 | 
			
		||||
			Selector: newSelFooBar(),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rsName, err := controller.KeyFunc(rs)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error naming ReplicaSet %q: %v", rs.Name, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return rs, rsName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func update(t *testing.T, store cache.Store, obj interface{}) {
 | 
			
		||||
	if err := store.Update(obj); err != nil {
 | 
			
		||||
		t.Fatalf("Could not add %+v to %+v: %v", obj, store, err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func add(t *testing.T, store cache.Store, obj interface{}) {
 | 
			
		||||
	if err := store.Add(obj); err != nil {
 | 
			
		||||
		t.Fatalf("Could not add %+v to %+v: %v", obj, store, err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Create one with no selector.  Verify it matches 0 pods.
 | 
			
		||||
func TestNoSelector(t *testing.T) {
 | 
			
		||||
	dc, ps := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(3))
 | 
			
		||||
	pdb.Spec.Selector = &unversioned.LabelSelector{}
 | 
			
		||||
	pod, _ := newPod(t, "yo-yo-yo")
 | 
			
		||||
 | 
			
		||||
	add(t, dc.pdbLister.Store, pdb)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, false, 0, 3, 0)
 | 
			
		||||
 | 
			
		||||
	add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, false, 0, 3, 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Verify that available/expected counts go up as we add pods, then verify that
 | 
			
		||||
// available count goes down when we make a pod unavailable.
 | 
			
		||||
func TestUnavailable(t *testing.T) {
 | 
			
		||||
	dc, ps := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(3))
 | 
			
		||||
	add(t, dc.pdbLister.Store, pdb)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
 | 
			
		||||
	// Add three pods, verifying that the counts go up at each step.
 | 
			
		||||
	pods := []*api.Pod{}
 | 
			
		||||
	for i := int32(0); i < 3; i++ {
 | 
			
		||||
		ps.VerifyPdbStatus(t, pdbName, false, i, 3, i)
 | 
			
		||||
		pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i))
 | 
			
		||||
		pods = append(pods, pod)
 | 
			
		||||
		add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
		dc.sync(pdbName)
 | 
			
		||||
	}
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, true, 3, 3, 3)
 | 
			
		||||
 | 
			
		||||
	// Now set one pod as unavailable
 | 
			
		||||
	pods[0].Status.Conditions = []api.PodCondition{}
 | 
			
		||||
	update(t, dc.podLister.Indexer, pods[0])
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
 | 
			
		||||
	// Verify expected update
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, false, 2, 3, 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Create a pod  with no controller, and verify that a PDB with a percentage
 | 
			
		||||
// specified won't allow a disruption.
 | 
			
		||||
func TestNakedPod(t *testing.T) {
 | 
			
		||||
	dc, ps := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%"))
 | 
			
		||||
	add(t, dc.pdbLister.Store, pdb)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	// This verifies that when a PDB has 0 pods, disruptions are not allowed.
 | 
			
		||||
	ps.VerifyDisruptionAllowed(t, pdbName, false)
 | 
			
		||||
 | 
			
		||||
	pod, _ := newPod(t, "naked")
 | 
			
		||||
	add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
 | 
			
		||||
	ps.VerifyDisruptionAllowed(t, pdbName, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Verify that we count the scale of a ReplicaSet even when it has no Deployment.
 | 
			
		||||
func TestReplicaSet(t *testing.T) {
 | 
			
		||||
	dc, ps := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("20%"))
 | 
			
		||||
	add(t, dc.pdbLister.Store, pdb)
 | 
			
		||||
 | 
			
		||||
	rs, _ := newReplicaSet(t, 10)
 | 
			
		||||
	add(t, dc.rsLister.Store, rs)
 | 
			
		||||
 | 
			
		||||
	pod, _ := newPod(t, "pod")
 | 
			
		||||
	add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, false, 1, 2, 10)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Verify that multiple controllers doesn't allow the PDB to be set true.
 | 
			
		||||
func TestMultipleControllers(t *testing.T) {
 | 
			
		||||
	const rcCount = 2
 | 
			
		||||
	const podCount = 2
 | 
			
		||||
 | 
			
		||||
	dc, ps := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("1%"))
 | 
			
		||||
	add(t, dc.pdbLister.Store, pdb)
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < podCount; i++ {
 | 
			
		||||
		pod, _ := newPod(t, fmt.Sprintf("pod %d", i))
 | 
			
		||||
		add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
	}
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
 | 
			
		||||
	// No controllers yet => no disruption allowed
 | 
			
		||||
	ps.VerifyDisruptionAllowed(t, pdbName, false)
 | 
			
		||||
 | 
			
		||||
	rc, _ := newReplicationController(t, 1)
 | 
			
		||||
	rc.Name = "rc 1"
 | 
			
		||||
	add(t, dc.rcLister.Indexer, rc)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
 | 
			
		||||
	// One RC and 200%>1% healthy => disruption allowed
 | 
			
		||||
	ps.VerifyDisruptionAllowed(t, pdbName, true)
 | 
			
		||||
 | 
			
		||||
	rc, _ = newReplicationController(t, 1)
 | 
			
		||||
	rc.Name = "rc 2"
 | 
			
		||||
	add(t, dc.rcLister.Indexer, rc)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
 | 
			
		||||
	// 100%>1% healthy BUT two RCs => no disruption allowed
 | 
			
		||||
	ps.VerifyDisruptionAllowed(t, pdbName, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestReplicationController(t *testing.T) {
 | 
			
		||||
	// The budget in this test matches foo=bar, but the RC and its pods match
 | 
			
		||||
	// {foo=bar, baz=quux}.  Later, when we add a rogue pod with only a foo=bar
 | 
			
		||||
	// label, it will match the budget but have no controllers, which should
 | 
			
		||||
	// trigger the controller to set PodDisruptionAllowed to false.
 | 
			
		||||
	labels := map[string]string{
 | 
			
		||||
		"foo": "bar",
 | 
			
		||||
		"baz": "quux",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dc, ps := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
	// 67% should round up to 3
 | 
			
		||||
	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("67%"))
 | 
			
		||||
	add(t, dc.pdbLister.Store, pdb)
 | 
			
		||||
	rc, _ := newReplicationController(t, 3)
 | 
			
		||||
	rc.Spec.Selector = labels
 | 
			
		||||
	add(t, dc.rcLister.Indexer, rc)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	// It starts out at 0 expected because, with no pods, the PDB doesn't know
 | 
			
		||||
	// about the RC.  This is a known bug.  TODO(mml): file issue
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, false, 0, 0, 0)
 | 
			
		||||
 | 
			
		||||
	pods := []*api.Pod{}
 | 
			
		||||
 | 
			
		||||
	for i := int32(0); i < 3; i++ {
 | 
			
		||||
		pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
 | 
			
		||||
		pods = append(pods, pod)
 | 
			
		||||
		pod.Labels = labels
 | 
			
		||||
		add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
		dc.sync(pdbName)
 | 
			
		||||
		if i < 2 {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, false, i+1, 3, 3)
 | 
			
		||||
		} else {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, true, 3, 3, 3)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rogue, _ := newPod(t, "rogue")
 | 
			
		||||
	add(t, dc.podLister.Indexer, rogue)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyDisruptionAllowed(t, pdbName, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestTwoControllers(t *testing.T) {
 | 
			
		||||
	// Most of this test is in verifying intermediate cases as we define the
 | 
			
		||||
	// three controllers and create the pods.
 | 
			
		||||
	rcLabels := map[string]string{
 | 
			
		||||
		"foo": "bar",
 | 
			
		||||
		"baz": "quux",
 | 
			
		||||
	}
 | 
			
		||||
	dLabels := map[string]string{
 | 
			
		||||
		"foo": "bar",
 | 
			
		||||
		"baz": "quuux",
 | 
			
		||||
	}
 | 
			
		||||
	dc, ps := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%"))
 | 
			
		||||
	add(t, dc.pdbLister.Store, pdb)
 | 
			
		||||
	rc, _ := newReplicationController(t, 11)
 | 
			
		||||
	rc.Spec.Selector = rcLabels
 | 
			
		||||
	add(t, dc.rcLister.Indexer, rc)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, false, 0, 0, 0)
 | 
			
		||||
 | 
			
		||||
	pods := []*api.Pod{}
 | 
			
		||||
 | 
			
		||||
	for i := int32(0); i < 11; i++ {
 | 
			
		||||
		pod, _ := newPod(t, fmt.Sprintf("quux %d", i))
 | 
			
		||||
		pods = append(pods, pod)
 | 
			
		||||
		pod.Labels = rcLabels
 | 
			
		||||
		if i < 7 {
 | 
			
		||||
			pod.Status.Conditions = []api.PodCondition{}
 | 
			
		||||
		}
 | 
			
		||||
		add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
		dc.sync(pdbName)
 | 
			
		||||
		if i < 7 {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, false, 0, 4, 11)
 | 
			
		||||
		} else if i < 10 {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, false, i-6, 4, 11)
 | 
			
		||||
		} else {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	d, _ := newDeployment(t, 11)
 | 
			
		||||
	d.Spec.Selector = newSel(dLabels)
 | 
			
		||||
	add(t, dc.dLister.Store, d)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11)
 | 
			
		||||
 | 
			
		||||
	rs, _ := newReplicaSet(t, 11)
 | 
			
		||||
	rs.Spec.Selector = newSel(dLabels)
 | 
			
		||||
	rs.Labels = dLabels
 | 
			
		||||
	add(t, dc.rsLister.Store, rs)
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11)
 | 
			
		||||
 | 
			
		||||
	for i := int32(0); i < 11; i++ {
 | 
			
		||||
		pod, _ := newPod(t, fmt.Sprintf("quuux %d", i))
 | 
			
		||||
		pods = append(pods, pod)
 | 
			
		||||
		pod.Labels = dLabels
 | 
			
		||||
		if i < 7 {
 | 
			
		||||
			pod.Status.Conditions = []api.PodCondition{}
 | 
			
		||||
		}
 | 
			
		||||
		add(t, dc.podLister.Indexer, pod)
 | 
			
		||||
		dc.sync(pdbName)
 | 
			
		||||
		if i < 7 {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, false, 4, 7, 22)
 | 
			
		||||
		} else if i < 9 {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, false, 4+i-6, 7, 22)
 | 
			
		||||
		} else {
 | 
			
		||||
			ps.VerifyPdbStatus(t, pdbName, true, 4+i-6, 7, 22)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Now we verify we can bring down 1 pod and a disruption is still permitted,
 | 
			
		||||
	// but if we bring down two, it's not.  Then we make the pod ready again and
 | 
			
		||||
	// verify that a disruption is permitted again.
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, true, 8, 7, 22)
 | 
			
		||||
	pods[10].Status.Conditions = []api.PodCondition{}
 | 
			
		||||
	update(t, dc.podLister.Indexer, pods[10])
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, true, 7, 7, 22)
 | 
			
		||||
 | 
			
		||||
	pods[9].Status.Conditions = []api.PodCondition{}
 | 
			
		||||
	update(t, dc.podLister.Indexer, pods[9])
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, false, 6, 7, 22)
 | 
			
		||||
 | 
			
		||||
	pods[10].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}}
 | 
			
		||||
	update(t, dc.podLister.Indexer, pods[10])
 | 
			
		||||
	dc.sync(pdbName)
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, true, 7, 7, 22)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										104
									
								
								test/e2e/disruption.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										104
									
								
								test/e2e/disruption.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,104 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package e2e
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	. "github.com/onsi/ginkgo"
 | 
			
		||||
	. "github.com/onsi/gomega"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/apis/policy"
 | 
			
		||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/intstr"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/wait"
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e/framework"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var _ = framework.KubeDescribe("DisruptionController [Feature:PodDisruptionbudget]", func() {
 | 
			
		||||
	f := framework.NewDefaultFramework("disruption")
 | 
			
		||||
	var ns string
 | 
			
		||||
	var c *client.Client
 | 
			
		||||
 | 
			
		||||
	BeforeEach(func() {
 | 
			
		||||
		c = f.Client
 | 
			
		||||
		ns = f.Namespace.Name
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	It("should create a PodDisruptionBudget", func() {
 | 
			
		||||
		pdb := policy.PodDisruptionBudget{
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{
 | 
			
		||||
				Name:      "foo",
 | 
			
		||||
				Namespace: ns,
 | 
			
		||||
			},
 | 
			
		||||
			Spec: policy.PodDisruptionBudgetSpec{
 | 
			
		||||
				Selector:     &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
 | 
			
		||||
				MinAvailable: intstr.FromString("1%"),
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
		_, err := c.Policy().PodDisruptionBudgets(ns).Create(&pdb)
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	It("should update PodDisruptionBudget status", func() {
 | 
			
		||||
		pdb := policy.PodDisruptionBudget{
 | 
			
		||||
			ObjectMeta: api.ObjectMeta{
 | 
			
		||||
				Name:      "foo",
 | 
			
		||||
				Namespace: ns,
 | 
			
		||||
			},
 | 
			
		||||
			Spec: policy.PodDisruptionBudgetSpec{
 | 
			
		||||
				Selector:     &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
 | 
			
		||||
				MinAvailable: intstr.FromInt(2),
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
		_, err := c.Policy().PodDisruptionBudgets(ns).Create(&pdb)
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
		for i := 0; i < 2; i++ {
 | 
			
		||||
			pod := &api.Pod{
 | 
			
		||||
				ObjectMeta: api.ObjectMeta{
 | 
			
		||||
					Name:      fmt.Sprintf("pod-%d", i),
 | 
			
		||||
					Namespace: ns,
 | 
			
		||||
					Labels:    map[string]string{"foo": "bar"},
 | 
			
		||||
				},
 | 
			
		||||
				Spec: api.PodSpec{
 | 
			
		||||
					Containers: []api.Container{
 | 
			
		||||
						{
 | 
			
		||||
							Name:  "busybox",
 | 
			
		||||
							Image: "gcr.io/google_containers/echoserver:1.4",
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
					RestartPolicy: api.RestartPolicyAlways,
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			_, err := c.Pods(ns).Create(pod)
 | 
			
		||||
			framework.ExpectNoError(err, "Creating pod %q in namespace %q", pod.Name, ns)
 | 
			
		||||
		}
 | 
			
		||||
		err = wait.PollImmediate(framework.Poll, 60*time.Second, func() (bool, error) {
 | 
			
		||||
			pdb, err := c.Policy().PodDisruptionBudgets(ns).Get("foo")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return false, err
 | 
			
		||||
			}
 | 
			
		||||
			return pdb.Status.PodDisruptionAllowed, nil
 | 
			
		||||
		})
 | 
			
		||||
		Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
})
 | 
			
		||||
		Reference in New Issue
	
	Block a user