/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubemark

import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	informersv1 "k8s.io/client-go/informers/core/v1"
	kubeclient "k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/controller"

	"github.com/golang/glog"
)

const (
	namespaceKubemark = "kubemark"
	nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
	numRetries        = 3
)

// KubemarkController is a simplified version of a cloud provider for kubemark.
// It allows adding and deleting nodes from a kubemark cluster and introduces
// node groups by applying labels to kubemark's hollow nodes.
type KubemarkController struct {
	nodeTemplate           *apiv1.ReplicationController
	externalCluster        externalCluster
	kubemarkCluster        kubemarkCluster
	rand                   *rand.Rand
	createNodeQueue        chan string
	nodeGroupQueueSize     map[string]int
	nodeGroupQueueSizeLock sync.Mutex
}

// externalCluster is used to communicate with the external cluster that hosts
// kubemark, in order to be able to list, create and delete hollow nodes
// by manipulating the replication controllers.
type externalCluster struct {
	rcLister  listersv1.ReplicationControllerLister
	rcSynced  cache.InformerSynced
	podLister listersv1.PodLister
	podSynced cache.InformerSynced
	client    kubeclient.Interface
}

// kubemarkCluster is used to delete nodes from the kubemark cluster once their
// respective replication controllers have been deleted and the nodes have
// become unready. This covers for the fact that there is no proper cloud
// provider for kubemark that would take care of deleting the nodes.
type kubemarkCluster struct {
	client            kubeclient.Interface
	nodeLister        listersv1.NodeLister
	nodeSynced        cache.InformerSynced
	nodesToDelete     map[string]bool
	nodesToDeleteLock sync.Mutex
}

// NewKubemarkController creates a KubemarkController using the provided clients to talk to
// the external and kubemark clusters.
func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory,
	kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) {
	rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer)
	podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer)
	controller := &KubemarkController{
		externalCluster: externalCluster{
			rcLister:  listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()),
			rcSynced:  rcInformer.HasSynced,
			podLister: listersv1.NewPodLister(podInformer.GetIndexer()),
			podSynced: podInformer.HasSynced,
			client:    externalClient,
		},
		kubemarkCluster: kubemarkCluster{
			nodeLister:        kubemarkNodeInformer.Lister(),
			nodeSynced:        kubemarkNodeInformer.Informer().HasSynced,
			client:            kubemarkClient,
			nodesToDelete:     make(map[string]bool),
			nodesToDeleteLock: sync.Mutex{},
		},
		rand:                   rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
		createNodeQueue:        make(chan string, 1000),
		nodeGroupQueueSize:     make(map[string]int),
		nodeGroupQueueSizeLock: sync.Mutex{},
	}

	kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: controller.kubemarkCluster.removeUnneededNodes,
	})

	return controller, nil
}

// WaitForCacheSync waits until all caches in the controller are populated.
func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
	return controller.WaitForCacheSync("kubemark", stopCh,
		kubemarkController.externalCluster.rcSynced,
		kubemarkController.externalCluster.podSynced,
		kubemarkController.kubemarkCluster.nodeSynced)
}

// Run populates the node template needed for creation of kubemark nodes and
// starts the worker routine for creating new nodes.
func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
	nodeTemplate, err := kubemarkController.getNodeTemplate()
	if err != nil {
		glog.Fatalf("failed to get node template: %s", err)
	}
	kubemarkController.nodeTemplate = nodeTemplate

	go kubemarkController.runNodeCreation(stopCh)
	<-stopCh
}

// GetNodeNamesForNodeGroup returns the names of the nodes in the node group.
func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
	pods, err := kubemarkController.externalCluster.podLister.List(selector)
	if err != nil {
		return nil, err
	}
	result := make([]string, 0, len(pods))
	for _, pod := range pods {
		result = append(result, pod.ObjectMeta.Name)
	}
	return result, nil
}

// GetNodeGroupSize returns the current observed size of the node group.
func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
	selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
	nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
	if err != nil {
		return 0, err
	}
	return len(nodes), nil
}

// GetNodeGroupTargetSize returns the size of the node group as the sum of its
// current observed size and the number of upcoming nodes.
func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
	kubemarkController.nodeGroupQueueSizeLock.Lock()
	defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
	realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
	if err != nil {
		return realSize, err
	}
	return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
}

// SetNodeGroupSize changes the size of the node group by adding or removing nodes.
func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
	currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
	if err != nil {
		return err
	}
	switch delta := size - currSize; {
	case delta < 0:
		absDelta := -delta
		nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
		if err != nil {
			return err
		}
		if len(nodes) < absDelta {
			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
		}
		for i, node := range nodes {
			if i == absDelta {
				return nil
			}
			if err := kubemarkController.RemoveNodeFromNodeGroup(nodeGroup, node); err != nil {
				return err
			}
		}
	case delta > 0:
		kubemarkController.nodeGroupQueueSizeLock.Lock()
		for i := 0; i < delta; i++ {
			kubemarkController.nodeGroupQueueSize[nodeGroup]++
			kubemarkController.createNodeQueue <- nodeGroup
		}
		kubemarkController.nodeGroupQueueSizeLock.Unlock()
	}

	return nil
}

// GetNodeGroupForNode returns the name of the node group to which the node
// belongs.
func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		return "", fmt.Errorf("node %s does not exist", node)
	}
	nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
	if ok {
		return nodeGroup, nil
	}
	return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
}

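// addNodeToNodeGroup creates a new hollow node in the given node group by
// creating a copy of the node template replication controller labeled with
// the node group name. Creation is retried up to numRetries times.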
func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
	node := kubemarkController.nodeTemplate.DeepCopy()
	node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63())
	node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name}
	node.Spec.Template.Labels = node.Labels

	var err error
	for i := 0; i < numRetries; i++ {
		_, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(node)
		if err == nil {
			return nil
		}
	}

	return err
}

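// RemoveNodeFromNodeGroup deletes the replication controller backing the given
// hollow node and marks the node for deletion from the kubemark cluster.
// It is a no-op if the node does not exist and returns an error if the node
// belongs to a different node group.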
func (kubemarkController *KubemarkController) RemoveNodeFromNodeGroup(nodeGroup string, node string) error {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		glog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
		return nil
	}
	if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
		return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
	}
	policy := metav1.DeletePropagationForeground
	var err error
	for i := 0; i < numRetries; i++ {
		err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
			pod.ObjectMeta.Labels["name"],
			&metav1.DeleteOptions{PropagationPolicy: &policy})
		if err == nil {
			glog.Infof("marking node %s for deletion", node)
			// Mark the node for deletion from the kubemark cluster.
			// Once it becomes unready after the replication controller
			// deletion has been noticed, we will delete it explicitly.
			// This covers for the fact that kubemark does not take care
			// of this itself.
			kubemarkController.kubemarkCluster.markNodeForDeletion(node)
			return nil
		}
	}
	return fmt.Errorf("failed to delete node %s: %v", node, err)
}

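// getReplicationControllerByName returns the replication controller with the
// given name from the external cluster, or nil if it is not found or the
// listing fails.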
func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
	rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, rc := range rcs {
		if rc.ObjectMeta.Name == name {
			return rc
		}
	}
	return nil
}

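// getPodByName returns the hollow node pod with the given name from the
// external cluster, or nil if it is not found or the listing fails.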
func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == name {
			return pod
		}
	}
	return nil
}

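// getNodeNameForPod returns the value of the "name" label of the pod with the
// given name, which identifies the replication controller that owns the pod.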
func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return "", err
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == podName {
			return pod.Labels["name"], nil
		}
	}
	return "", fmt.Errorf("pod %s not found", podName)
}

// getNodeTemplate returns the template for hollow node replication controllers
// by looking for an existing hollow node specification. This requires at least
// one kubemark node to be present on startup.
func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) {
	podName, err := kubemarkController.kubemarkCluster.getHollowNodeName()
	if err != nil {
		return nil, err
	}
	hollowNodeName, err := kubemarkController.getNodeNameForPod(podName)
	if err != nil {
		return nil, err
	}
	if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil {
		nodeTemplate := &apiv1.ReplicationController{
			Spec: apiv1.ReplicationControllerSpec{
				Template: hollowNode.Spec.Template,
			},
		}

		nodeTemplate.Spec.Selector = nil
		nodeTemplate.Namespace = namespaceKubemark
		one := int32(1)
		nodeTemplate.Spec.Replicas = &one

		return nodeTemplate, nil
	}
	return nil, fmt.Errorf("can't get hollow node template")
}

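// runNodeCreation is the worker routine that consumes node group names from
// createNodeQueue and creates a hollow node for each of them, updating the
// pending queue size accordingly, until the stop channel is closed.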
func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
	for {
		select {
		case nodeGroup := <-kubemarkController.createNodeQueue:
			kubemarkController.nodeGroupQueueSizeLock.Lock()
			err := kubemarkController.addNodeToNodeGroup(nodeGroup)
			if err != nil {
				glog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
			} else {
				kubemarkController.nodeGroupQueueSize[nodeGroup]--
			}
			kubemarkController.nodeGroupQueueSizeLock.Unlock()
		case <-stop:
			return
		}
	}
}

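// getHollowNodeName returns the name of an arbitrary hollow node from the
// kubemark cluster, or an error if the cluster has no nodes.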
func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
	nodes, err := kubemarkCluster.nodeLister.List(labels.Everything())
	if err != nil {
		return "", err
	}
	for _, node := range nodes {
		return node.Name, nil
	}
	return "", fmt.Errorf("did not find any hollow nodes in the cluster")
}

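// removeUnneededNodes is the update handler for kubemark node objects. It
// deletes a node from the kubemark cluster once the node has become unready
// and has been marked for deletion.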
func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) {
	node, ok := newObj.(*apiv1.Node)
	if !ok {
		return
	}
	for _, condition := range node.Status.Conditions {
		// Delete the node if it is in an unready state and has been
		// explicitly marked for deletion.
		if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue {
			kubemarkCluster.nodesToDeleteLock.Lock()
			defer kubemarkCluster.nodesToDeleteLock.Unlock()
			if kubemarkCluster.nodesToDelete[node.Name] {
				kubemarkCluster.nodesToDelete[node.Name] = false
				if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil {
					glog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
				}
			}
			return
		}
	}
}

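// markNodeForDeletion records that the given node should be deleted from the
// kubemark cluster once it becomes unready.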
func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) {
	kubemarkCluster.nodesToDeleteLock.Lock()
	defer kubemarkCluster.nodesToDeleteLock.Unlock()
	kubemarkCluster.nodesToDelete[name] = true
}

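// newReplicationControllerInformer returns a shared informer for replication
// controllers in the kubemark namespace of the external cluster.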
func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil)
}

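// newPodInformer returns a shared informer for pods in the kubemark namespace
// of the external cluster.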
func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil)
}
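
// exampleRunKubemarkController is an illustrative sketch, not part of the
// upstream controller, showing one way the pieces above could be wired
// together. The clientsets for the external and kubemark clusters are assumed
// to be constructed elsewhere (e.g. from kubeconfig files), and the function
// name itself is hypothetical.
func exampleRunKubemarkController(externalClient kubeclient.Interface, kubemarkClient kubeclient.Interface, stopCh chan struct{}) error {
	externalInformerFactory := informers.NewSharedInformerFactory(externalClient, 0)
	kubemarkInformerFactory := informers.NewSharedInformerFactory(kubemarkClient, 0)

	kubemarkController, err := NewKubemarkController(externalClient, externalInformerFactory,
		kubemarkClient, kubemarkInformerFactory.Core().V1().Nodes())
	if err != nil {
		return err
	}

	// Start the informers registered above: the replication controller and pod
	// informers created inside NewKubemarkController, and the kubemark node informer.
	externalInformerFactory.Start(stopCh)
	kubemarkInformerFactory.Start(stopCh)
	if !kubemarkController.WaitForCacheSync(stopCh) {
		return fmt.Errorf("timed out waiting for kubemark caches to sync")
	}

	// Run blocks until stopCh is closed, so it is started in its own goroutine.
	go kubemarkController.Run(stopCh)
	return nil
}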