mirror of
https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-11-01 18:58:18 +00:00
Resource quota observes deletes faster
This commit is contained in:
@@ -24,53 +24,156 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// ResourceQuotaController is responsible for tracking quota usage status in the system
|
||||
type ResourceQuotaController struct {
|
||||
// Must have authority to list all resources in the system, and update quota status
|
||||
kubeClient client.Interface
|
||||
syncTime <-chan time.Time
|
||||
|
||||
// An index of resource quota objects by namespace
|
||||
rqIndexer cache.Indexer
|
||||
// Watches changes to all resource quota
|
||||
rqController *framework.Controller
|
||||
// A store of pods, populated by the podController
|
||||
podStore cache.StoreToPodLister
|
||||
// Watches changes to all pods (so we can optimize release of compute resources)
|
||||
podController *framework.Controller
|
||||
// ResourceQuota objects that need to be synchronized
|
||||
queue *workqueue.Type
|
||||
// To allow injection of syncUsage for testing.
|
||||
syncHandler func(quota api.ResourceQuota) error
|
||||
syncHandler func(key string) error
|
||||
// function that controls full recalculation of quota usage
|
||||
resyncPeriod controller.ResyncPeriodFunc
|
||||
}
|
||||
|
||||
// NewResourceQuotaController creates a new ResourceQuotaController
|
||||
func NewResourceQuotaController(kubeClient client.Interface) *ResourceQuotaController {
|
||||
func NewResourceQuotaController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *ResourceQuotaController {
|
||||
|
||||
rm := &ResourceQuotaController{
|
||||
kubeClient: kubeClient,
|
||||
rq := &ResourceQuotaController{
|
||||
kubeClient: kubeClient,
|
||||
queue: workqueue.New(),
|
||||
resyncPeriod: resyncPeriod,
|
||||
}
|
||||
|
||||
rq.rqIndexer, rq.rqController = framework.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return rq.kubeClient.ResourceQuotas(api.NamespaceAll).List(unversioned.ListOptions{})
|
||||
},
|
||||
WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
|
||||
return rq.kubeClient.ResourceQuotas(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.ResourceQuota{},
|
||||
resyncPeriod(),
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: rq.enqueueResourceQuota,
|
||||
UpdateFunc: func(old, cur interface{}) {
	// We are only interested in observing updates to quota.spec to drive updates to quota.status.
	// We ignore all updates to quota.Status because they are all driven by this controller.
	// IMPORTANT:
	// We do not use this function to queue up a full quota recalculation. To do so, would require
	// us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
	// that cannot be backed by a cache and result in a full query of a namespace's content, we do not
	// want to pay the price on spurious status updates. As a result, we have a separate routine that is
	// responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
	oldResourceQuota := old.(*api.ResourceQuota)
	curResourceQuota := cur.(*api.ResourceQuota)
	// Compare the previous spec against the current spec. Comparing against the
	// current status instead would treat a genuine spec change as "no change"
	// (the status still mirrors the old spec at that point) and drop the event.
	if api.Semantic.DeepEqual(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
		return
	}
	glog.V(4).Infof("Observed updated quota spec for %v/%v", curResourceQuota.Namespace, curResourceQuota.Name)
	rq.enqueueResourceQuota(curResourceQuota)
},
|
||||
// This will enter the sync loop and no-op, because the resource quota has been deleted from the store.
|
||||
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
|
||||
// way of achieving this is by performing a `stop` operation on the controller.
|
||||
DeleteFunc: rq.enqueueResourceQuota,
|
||||
},
|
||||
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
// We use this pod controller to rapidly observe when a pod deletion occurs in order to
|
||||
// release compute resources from any associated quota.
|
||||
rq.podStore.Store, rq.podController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return rq.kubeClient.Pods(api.NamespaceAll).List(unversioned.ListOptions{})
|
||||
},
|
||||
WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
|
||||
return rq.kubeClient.Pods(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&api.Pod{},
|
||||
resyncPeriod(),
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: rq.deletePod,
|
||||
},
|
||||
)
|
||||
|
||||
// set the synchronization handler
|
||||
rm.syncHandler = rm.syncResourceQuota
|
||||
return rm
|
||||
rq.syncHandler = rq.syncResourceQuotaFromKey
|
||||
return rq
|
||||
}
|
||||
|
||||
// Run begins watching and syncing.
|
||||
func (rm *ResourceQuotaController) Run(period time.Duration) {
|
||||
rm.syncTime = time.Tick(period)
|
||||
go util.Until(func() { rm.synchronize() }, period, util.NeverStop)
|
||||
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
|
||||
func (rq *ResourceQuotaController) enqueueAll() {
|
||||
defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
|
||||
for _, k := range rq.rqIndexer.ListKeys() {
|
||||
rq.queue.Add(k)
|
||||
}
|
||||
}
|
||||
|
||||
func (rm *ResourceQuotaController) synchronize() {
|
||||
var resourceQuotas []api.ResourceQuota
|
||||
list, err := rm.kubeClient.ResourceQuotas(api.NamespaceAll).List(unversioned.ListOptions{})
|
||||
// obj could be an *api.ResourceQuota, or a DeletionFinalStateUnknown marker item.
|
||||
func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Synchronization error: %v (%#v)", err, err)
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
resourceQuotas = list.Items
|
||||
for ix := range resourceQuotas {
|
||||
glog.V(4).Infof("periodic sync of %v/%v", resourceQuotas[ix].Namespace, resourceQuotas[ix].Name)
|
||||
err := rm.syncHandler(resourceQuotas[ix])
|
||||
if err != nil {
|
||||
glog.Errorf("Error synchronizing: %v", err)
|
||||
}
|
||||
rq.queue.Add(key)
|
||||
}
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (rq *ResourceQuotaController) worker() {
|
||||
for {
|
||||
func() {
|
||||
key, quit := rq.queue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer rq.queue.Done(key)
|
||||
err := rq.syncHandler(key.(string))
|
||||
if err != nil {
|
||||
util.HandleError(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Run begins quota controller using the specified number of workers
|
||||
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer util.HandleCrash()
|
||||
go rq.rqController.Run(stopCh)
|
||||
go rq.podController.Run(stopCh)
|
||||
for i := 0; i < workers; i++ {
|
||||
go util.Until(rq.worker, time.Second, stopCh)
|
||||
}
|
||||
go util.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down ResourceQuotaController")
|
||||
rq.queue.ShutDown()
|
||||
}
|
||||
|
||||
// FilterQuotaPods eliminates pods that no longer have a cost against the quota
|
||||
// pods that have a restart policy of always are always returned
|
||||
// pods that are in a failed state, but have a restart policy of on failure are always returned
|
||||
@@ -100,8 +203,29 @@ func FilterQuotaPods(pods []api.Pod) []*api.Pod {
|
||||
return result
|
||||
}
|
||||
|
||||
// syncResourceQuotaFromKey syncs a quota key
|
||||
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
|
||||
obj, exists, err := rq.rqIndexer.GetByKey(key)
|
||||
if !exists {
|
||||
glog.Infof("Resource quota has been deleted %v", key)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
glog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
|
||||
rq.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
quota := *obj.(*api.ResourceQuota)
|
||||
return rq.syncResourceQuota(quota)
|
||||
}
|
||||
|
||||
// syncResourceQuota runs a complete sync of current status
|
||||
func (rm *ResourceQuotaController) syncResourceQuota(quota api.ResourceQuota) (err error) {
|
||||
func (rq *ResourceQuotaController) syncResourceQuota(quota api.ResourceQuota) (err error) {
|
||||
|
||||
// quota is dirty if any part of spec hard limits differs from the status hard limits
|
||||
dirty := !api.Semantic.DeepEqual(quota.Spec.Hard, quota.Status.Hard)
|
||||
@@ -141,7 +265,7 @@ func (rm *ResourceQuotaController) syncResourceQuota(quota api.ResourceQuota) (e
|
||||
|
||||
pods := &api.PodList{}
|
||||
if set[api.ResourcePods] || set[api.ResourceMemory] || set[api.ResourceCPU] {
|
||||
pods, err = rm.kubeClient.Pods(usage.Namespace).List(unversioned.ListOptions{})
|
||||
pods, err = rq.kubeClient.Pods(usage.Namespace).List(unversioned.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -164,31 +288,31 @@ func (rm *ResourceQuotaController) syncResourceQuota(quota api.ResourceQuota) (e
|
||||
case api.ResourcePods:
|
||||
value = resource.NewQuantity(int64(len(filteredPods)), resource.DecimalSI)
|
||||
case api.ResourceServices:
|
||||
items, err := rm.kubeClient.Services(usage.Namespace).List(unversioned.ListOptions{})
|
||||
items, err := rq.kubeClient.Services(usage.Namespace).List(unversioned.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
|
||||
case api.ResourceReplicationControllers:
|
||||
items, err := rm.kubeClient.ReplicationControllers(usage.Namespace).List(unversioned.ListOptions{})
|
||||
items, err := rq.kubeClient.ReplicationControllers(usage.Namespace).List(unversioned.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
|
||||
case api.ResourceQuotas:
|
||||
items, err := rm.kubeClient.ResourceQuotas(usage.Namespace).List(unversioned.ListOptions{})
|
||||
items, err := rq.kubeClient.ResourceQuotas(usage.Namespace).List(unversioned.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
|
||||
case api.ResourceSecrets:
|
||||
items, err := rm.kubeClient.Secrets(usage.Namespace).List(unversioned.ListOptions{})
|
||||
items, err := rq.kubeClient.Secrets(usage.Namespace).List(unversioned.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
|
||||
case api.ResourcePersistentVolumeClaims:
|
||||
items, err := rm.kubeClient.PersistentVolumeClaims(usage.Namespace).List(unversioned.ListOptions{})
|
||||
items, err := rq.kubeClient.PersistentVolumeClaims(usage.Namespace).List(unversioned.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -210,7 +334,7 @@ func (rm *ResourceQuotaController) syncResourceQuota(quota api.ResourceQuota) (e
|
||||
|
||||
// update the usage only if it changed
|
||||
if dirty {
|
||||
_, err = rm.kubeClient.ResourceQuotas(usage.Namespace).UpdateStatus(&usage)
|
||||
_, err = rq.kubeClient.ResourceQuotas(usage.Namespace).UpdateStatus(&usage)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -280,3 +404,38 @@ func PodHasRequests(pod *api.Pod, resourceName api.ResourceName) bool {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// When a pod is deleted, enqueue the quota that manages the pod and update its expectations.
|
||||
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
|
||||
func (rq *ResourceQuotaController) deletePod(obj interface{}) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
// When a delete is dropped, the relist will notice a pod in the store not
|
||||
// in the list, leading to the insertion of a tombstone object which contains
|
||||
// the deleted key/value. Note that this value might be stale. If the pod
|
||||
// changed labels the new rc will not be woken up till the periodic resync.
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a quota records the deletion", obj, rq.resyncPeriod())
|
||||
return
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*api.Pod)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before quota records the deletion", obj, rq.resyncPeriod())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
quotas, err := rq.rqIndexer.Index("namespace", pod)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't find resource quota associated with pod %+v, could take up to %v before a quota records the deletion", obj, rq.resyncPeriod())
|
||||
}
|
||||
if len(quotas) == 0 {
|
||||
glog.V(4).Infof("No resource quota associated with namespace %q", pod.Namespace)
|
||||
return
|
||||
}
|
||||
for i := range quotas {
|
||||
quota := quotas[i].(*api.ResourceQuota)
|
||||
rq.enqueueResourceQuota(quota)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user