/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubemark

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	informersv1 "k8s.io/client-go/informers/core/v1"
	kubeclient "k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"

	"k8s.io/klog/v2"
)

const (
	namespaceKubemark = "kubemark"
	nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
	numRetries        = 3
)

// KubemarkController is a simplified version of a cloud provider for kubemark. It allows
// adding and deleting nodes from a kubemark cluster and introduces nodegroups
// by applying labels to kubemark's hollow-nodes.
type KubemarkController struct {
	nodeTemplate           *apiv1.ReplicationController
	externalCluster        externalCluster
	kubemarkCluster        kubemarkCluster
	rand                   *rand.Rand
	createNodeQueue        chan string
	nodeGroupQueueSize     map[string]int
	nodeGroupQueueSizeLock sync.Mutex
}

// externalCluster is used to communicate with the external cluster that hosts
// kubemark, in order to list, create, and delete hollow nodes
// by manipulating the replication controllers.
type externalCluster struct {
	rcLister  listersv1.ReplicationControllerLister
	rcSynced  cache.InformerSynced
	podLister listersv1.PodLister
	podSynced cache.InformerSynced
	client    kubeclient.Interface
}

// kubemarkCluster is used to delete nodes from the kubemark cluster once their
// respective replication controllers have been deleted and the nodes have
// become unready. This covers for the fact that there is no proper cloud
// provider for kubemark that would take care of deleting the nodes.
type kubemarkCluster struct {
	client            kubeclient.Interface
	nodeLister        listersv1.NodeLister
	nodeSynced        cache.InformerSynced
	nodesToDelete     map[string]bool
	nodesToDeleteLock sync.Mutex
}

// NewKubemarkController creates a KubemarkController using the provided clients to talk
// to the external and kubemark clusters.
func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory,
	kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) {
	rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer)
	podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer)
	controller := &KubemarkController{
		externalCluster: externalCluster{
			rcLister:  listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()),
			rcSynced:  rcInformer.HasSynced,
			podLister: listersv1.NewPodLister(podInformer.GetIndexer()),
			podSynced: podInformer.HasSynced,
			client:    externalClient,
		},
		kubemarkCluster: kubemarkCluster{
			nodeLister:        kubemarkNodeInformer.Lister(),
			nodeSynced:        kubemarkNodeInformer.Informer().HasSynced,
			client:            kubemarkClient,
			nodesToDelete:     make(map[string]bool),
			nodesToDeleteLock: sync.Mutex{},
		},
		rand:                   rand.New(rand.NewSource(time.Now().UnixNano())),
		createNodeQueue:        make(chan string, 1000),
		nodeGroupQueueSize:     make(map[string]int),
		nodeGroupQueueSizeLock: sync.Mutex{},
	}

	kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: controller.kubemarkCluster.removeUnneededNodes,
	})

	return controller, nil
}

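// The sketch below is not part of the upstream file. It illustrates one way the
// controller could be wired up from two pre-built clientsets; the factory setup,
// resync periods, and error handling here are assumptions, not the canonical
// integration used by the cluster autoscaler.
func exampleRunKubemarkController(externalClient, kubemarkClient kubeclient.Interface, stopCh chan struct{}) error {
	// Shared informer factories for the external (hosting) cluster and the
	// kubemark cluster; NewKubemarkController registers the replication
	// controller and pod informers it needs on the external factory.
	externalFactory := informers.NewSharedInformerFactory(externalClient, 0)
	kubemarkFactory := informers.NewSharedInformerFactory(kubemarkClient, 0)
	nodeInformer := kubemarkFactory.Core().V1().Nodes()

	controller, err := NewKubemarkController(externalClient, externalFactory, kubemarkClient, nodeInformer)
	if err != nil {
		return err
	}

	// Start the informers only after the controller has registered its event
	// handlers, then wait for the caches before running the controller.
	externalFactory.Start(stopCh)
	kubemarkFactory.Start(stopCh)
	if !controller.WaitForCacheSync(stopCh) {
		return fmt.Errorf("timed out waiting for kubemark controller caches to sync")
	}

	go controller.Run(stopCh)
	return nil
}
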
// WaitForCacheSync waits until all caches in the controller are populated.
func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
	return cache.WaitForNamedCacheSync("kubemark", stopCh,
		kubemarkController.externalCluster.rcSynced,
		kubemarkController.externalCluster.podSynced,
		kubemarkController.kubemarkCluster.nodeSynced)
}

// Run populates the node template needed for creation of kubemark nodes and
// starts the worker routine for creating new nodes.
func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
	nodeTemplate, err := kubemarkController.getNodeTemplate()
	if err != nil {
		klog.Fatalf("failed to get node template: %s", err)
	}
	kubemarkController.nodeTemplate = nodeTemplate

	go kubemarkController.runNodeCreation(stopCh)
	<-stopCh
}

// GetNodeNamesForNodeGroup returns the list of node names in the node group.
func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
	pods, err := kubemarkController.externalCluster.podLister.List(selector)
	if err != nil {
		return nil, err
	}
	result := make([]string, 0, len(pods))
	for _, pod := range pods {
		result = append(result, pod.ObjectMeta.Name)
	}
	return result, nil
}

// GetNodeGroupSize returns the currently observed size of the node group.
func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
	selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
	nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
	if err != nil {
		return 0, err
	}
	return len(nodes), nil
}

// GetNodeGroupTargetSize returns the size of the node group as the sum of its
// currently observed size and the number of upcoming nodes.
func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
	kubemarkController.nodeGroupQueueSizeLock.Lock()
	defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
	realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
	if err != nil {
		return realSize, err
	}
	return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
}

// SetNodeGroupSize changes the size of the node group by adding or removing nodes.
func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
	currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
	if err != nil {
		return err
	}
	switch delta := size - currSize; {
	case delta < 0:
		absDelta := -delta
		nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
		if err != nil {
			return err
		}
		if len(nodes) < absDelta {
			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
		}
		for i, node := range nodes {
			if i == absDelta {
				return nil
			}
			if err := kubemarkController.RemoveNodeFromNodeGroup(nodeGroup, node); err != nil {
				return err
			}
		}
	case delta > 0:
		kubemarkController.nodeGroupQueueSizeLock.Lock()
		kubemarkController.nodeGroupQueueSize[nodeGroup] += delta
		kubemarkController.nodeGroupQueueSizeLock.Unlock()
		for i := 0; i < delta; i++ {
			kubemarkController.createNodeQueue <- nodeGroup
		}
	}

	return nil
}

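// The sketch below is not part of the upstream file. It shows how a caller
// might grow a node group by one node; the "ng-1" group name is a placeholder.
func exampleGrowNodeGroup(controller *KubemarkController) error {
	// The target size already includes nodes that are queued for creation, so
	// using it as the base avoids requesting the same node twice.
	target, err := controller.GetNodeGroupTargetSize("ng-1")
	if err != nil {
		return err
	}
	return controller.SetNodeGroupSize("ng-1", target+1)
}
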
// GetNodeGroupForNode returns the name of the node group to which the node
// belongs.
func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		return "", fmt.Errorf("node %s does not exist", node)
	}
	nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
	if ok {
		return nodeGroup, nil
	}
	return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
}

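// addNodeToNodeGroup creates a replication controller for a new hollow node in
// the external cluster, labeling it with the given node group. The create call
// is retried up to numRetries times.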
func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
	node := kubemarkController.nodeTemplate.DeepCopy()
	node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63())
	node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name}
	node.Spec.Template.Labels = node.Labels

	var err error
	for i := 0; i < numRetries; i++ {
		_, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(context.TODO(), node, metav1.CreateOptions{})
		if err == nil {
			return nil
		}
	}

	return err
}

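// RemoveNodeFromNodeGroup deletes the replication controller backing the given
// hollow node in the external cluster and marks the node for deletion from the
// kubemark cluster.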
func (kubemarkController *KubemarkController) RemoveNodeFromNodeGroup(nodeGroup string, node string) error {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		klog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
		return nil
	}
	if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
		return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
	}
	policy := metav1.DeletePropagationForeground
	var err error
	for i := 0; i < numRetries; i++ {
		err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(context.TODO(), pod.ObjectMeta.Labels["name"],
			metav1.DeleteOptions{PropagationPolicy: &policy})
		if err == nil {
			klog.Infof("marking node %s for deletion", node)
			// Mark node for deletion from kubemark cluster.
			// Once it becomes unready after replication controller
			// deletion has been noticed, we will delete it explicitly.
			// This is to cover for the fact that kubemark does not
			// take care of this itself.
			kubemarkController.kubemarkCluster.markNodeForDeletion(node)
			return nil
		}
	}
	return fmt.Errorf("failed to delete node %s: %v", node, err)
}

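// getReplicationControllerByName returns the replication controller with the
// given name in the external cluster, or nil if it cannot be found.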
func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
	rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, rc := range rcs {
		if rc.ObjectMeta.Name == name {
			return rc
		}
	}
	return nil
}

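// getPodByName returns the hollow-node pod with the given name in the external
// cluster, or nil if it cannot be found.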
func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == name {
			return pod
		}
	}
	return nil
}

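// getNodeNameForPod returns the node name recorded in the "name" label of the
// pod with the given name.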
func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return "", err
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == podName {
			return pod.Labels["name"], nil
		}
	}
	return "", fmt.Errorf("pod %s not found", podName)
}

// getNodeTemplate returns the template for hollow node replication controllers
// by looking for an existing hollow node specification. This requires at least
// one kubemark node to be present on startup.
func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) {
	podName, err := kubemarkController.kubemarkCluster.getHollowNodeName()
	if err != nil {
		return nil, err
	}
	hollowNodeName, err := kubemarkController.getNodeNameForPod(podName)
	if err != nil {
		return nil, err
	}
	if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil {
		nodeTemplate := &apiv1.ReplicationController{
			Spec: apiv1.ReplicationControllerSpec{
				Template: hollowNode.Spec.Template,
			},
		}

		nodeTemplate.Spec.Selector = nil
		nodeTemplate.Namespace = namespaceKubemark
		one := int32(1)
		nodeTemplate.Spec.Replicas = &one

		return nodeTemplate, nil
	}
	return nil, fmt.Errorf("can't get hollow node template")
}

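// runNodeCreation consumes the createNodeQueue, creating a hollow node for each
// queued node group until the stop channel is closed.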
func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
	for {
		select {
		case nodeGroup := <-kubemarkController.createNodeQueue:
			kubemarkController.nodeGroupQueueSizeLock.Lock()
			err := kubemarkController.addNodeToNodeGroup(nodeGroup)
			if err != nil {
				klog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
			} else {
				kubemarkController.nodeGroupQueueSize[nodeGroup]--
			}
			kubemarkController.nodeGroupQueueSizeLock.Unlock()
		case <-stop:
			return
		}
	}
}

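// getHollowNodeName returns the name of an arbitrary hollow node that carries
// the node group label, or an error if no such node exists.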
func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
	selector, _ := labels.Parse(nodeGroupLabel)
	nodes, err := kubemarkCluster.nodeLister.List(selector)
	if err != nil {
		return "", err
	}
	for _, node := range nodes {
		return node.Name, nil
	}
	return "", fmt.Errorf("did not find any hollow nodes in the cluster")
}

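// removeUnneededNodes is the update handler for the kubemark node informer. It
// deletes nodes that have been marked for deletion once they become unready.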
func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) {
	node, ok := newObj.(*apiv1.Node)
	if !ok {
		return
	}
	for _, condition := range node.Status.Conditions {
		// Delete node if it is in unready state, and it has been
		// explicitly marked for deletion.
		if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue {
			kubemarkCluster.nodesToDeleteLock.Lock()
			defer kubemarkCluster.nodesToDeleteLock.Unlock()
			if kubemarkCluster.nodesToDelete[node.Name] {
				kubemarkCluster.nodesToDelete[node.Name] = false
				if err := kubemarkCluster.client.CoreV1().Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{}); err != nil {
					klog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
				}
			}
			return
		}
	}
}

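// markNodeForDeletion records that the named node should be deleted from the
// kubemark cluster once it is observed as unready.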
func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) {
	kubemarkCluster.nodesToDeleteLock.Lock()
	defer kubemarkCluster.nodesToDeleteLock.Unlock()
	kubemarkCluster.nodesToDelete[name] = true
}

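// newReplicationControllerInformer returns a shared informer watching
// replication controllers in the kubemark namespace; in this controller it is
// registered on the external cluster's informer factory.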
func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil)
}

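// newPodInformer returns a shared informer watching pods in the kubemark
// namespace; in this controller it is registered on the external cluster's
// informer factory.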
func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil)
}