/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipam

import (
	"fmt"
	"net"
	"sync"

	"github.com/golang/glog"

	"k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
	utilnode "k8s.io/kubernetes/pkg/util/node"
)

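// rangeAllocator assigns pod CIDRs to nodes out of a single cluster CIDR,
// tracking used ranges in an in-memory cidrset.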
type rangeAllocator struct {
	client      clientset.Interface
	cidrs       *cidrset.CidrSet
	clusterCIDR *net.IPNet
	maxCIDRs    int

	// nodeLister is able to list/get nodes and is populated by the shared informer passed to
	// NewCIDRRangeAllocator.
	nodeLister corelisters.NodeLister
	// nodesSynced returns true if the node shared informer has been synced at least once.
	nodesSynced cache.InformerSynced

	// Channel used to pass updated Nodes with assigned CIDRs to the background workers.
	// This increases the throughput of CIDR assignment by not blocking on long operations.
	nodeCIDRUpdateChannel chan nodeAndCIDR
	recorder              record.EventRecorder

	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation.
	lock              sync.Mutex
	nodesInProcessing sets.String
}

// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for nodes.
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. NodeList is only nil in testing.
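//
// A minimal usage sketch (client, nodeInformer, nodeList and stopCh are assumed
// to exist; the CIDRs and mask size below are illustrative, not defaults):
//
//	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16")
//	_, serviceCIDR, _ := net.ParseCIDR("10.96.0.0/12")
//	alloc, err := NewCIDRRangeAllocator(client, nodeInformer, clusterCIDR, serviceCIDR, 24, nodeList)
//	if err != nil {
//		// handle error
//	}
//	go alloc.Run(stopCh)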
func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
	if client == nil {
		glog.Fatalf("kubeClient is nil when starting NodeController")
	}

	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
	eventBroadcaster.StartLogging(glog.Infof)
	glog.V(0).Infof("Sending events to api server.")
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})

	set, err := cidrset.NewCIDRSet(clusterCIDR, subNetMaskSize)
	if err != nil {
		return nil, err
	}
	ra := &rangeAllocator{
		client:                client,
		cidrs:                 set,
		clusterCIDR:           clusterCIDR,
		nodeLister:            nodeInformer.Lister(),
		nodesSynced:           nodeInformer.Informer().HasSynced,
		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
		recorder:              recorder,
		nodesInProcessing:     sets.NewString(),
	}

	if serviceCIDR != nil {
		ra.filterOutServiceRange(serviceCIDR)
	} else {
		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
	}

	if nodeList != nil {
		for _, node := range nodeList.Items {
			if node.Spec.PodCIDR == "" {
				glog.Infof("Node %v has no CIDR, ignoring", node.Name)
				continue
			} else {
				glog.Infof("Node %v has CIDR %s, occupying it in CIDR map",
					node.Name, node.Spec.PodCIDR)
			}
			if err := ra.occupyCIDR(&node); err != nil {
				// This will happen if:
				// 1. We find garbage in the podCIDR field. Retrying is useless.
				// 2. CIDR out of range: This means a node CIDR has changed.
				// This error will keep crashing controller-manager.
				return nil, err
			}
		}
	}

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
		UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
			// If the PodCIDR is not empty we either:
			// - already processed a Node that already had a CIDR after NC restarted
			//   (cidr is marked as used),
			// - already processed a Node successfully and allocated a CIDR for it
			//   (cidr is marked as used),
			// - already processed a Node but saw a "timeout" response while the
			//   request eventually got through; in this case we haven't released
			//   the allocated CIDR (cidr is still marked as used).
			// There's a possible error here:
			// - NC sees a new Node and assigns CIDR X to it,
			// - the Update Node call fails with a timeout,
			// - the Node is updated by some other component, NC sees an update and
			//   assigns CIDR Y to the Node,
			// - both CIDR X and CIDR Y are marked as used in the local cache,
			//   even though the Node sees only CIDR Y.
			// The problem here is that in the in-memory cache we see CIDR X as marked,
			// which prevents it from being assigned to any new node. The cluster
			// state is correct.
			// Restart of NC fixes the issue.
			if newNode.Spec.PodCIDR == "" {
				return ra.AllocateOrOccupyCIDR(newNode)
			}
			return nil
		}),
		DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
	})

	return ra, nil
}

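// Run waits for the node informer cache to sync, starts the allocator's worker
// goroutines, and blocks until stopCh is closed.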
func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	glog.Infof("Starting range CIDR allocator")
	defer glog.Infof("Shutting down range CIDR allocator")

	if !controller.WaitForCacheSync("cidrallocator", stopCh, r.nodesSynced) {
		return
	}

	for i := 0; i < cidrUpdateWorkers; i++ {
		go r.worker(stopCh)
	}

	<-stopCh
}

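// worker processes CIDR updates from nodeCIDRUpdateChannel until the channel is
// closed or stopChan is closed, requeueing items whose update failed.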
func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
	for {
		select {
		case workItem, ok := <-r.nodeCIDRUpdateChannel:
			if !ok {
				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
				return
			}
			if err := r.updateCIDRAllocation(workItem); err != nil {
				// Requeue the failed node for update again.
				r.nodeCIDRUpdateChannel <- workItem
			}
		case <-stopChan:
			return
		}
	}
}

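// insertNodeToProcessing marks nodeName as being processed. It returns false if
// the node is already being processed.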
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.nodesInProcessing.Has(nodeName) {
		return false
	}
	r.nodesInProcessing.Insert(nodeName)
	return true
}

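// removeNodeFromProcessing clears the in-processing mark for nodeName.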
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.nodesInProcessing.Delete(nodeName)
}

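// occupyCIDR marks the CIDR already recorded in node.Spec.PodCIDR as used in the
// allocator's CIDR set.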
func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
	defer r.removeNodeFromProcessing(node.Name)
	if node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
	}
	if err := r.cidrs.Occupy(podCIDR); err != nil {
		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
	}
	return nil
}

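// AllocateOrOccupyCIDR marks the node's existing PodCIDR as used, or allocates a
// new CIDR and queues the node so a worker can patch it into the API server.
//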
// WARNING: If you're adding any return calls or deferring any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
	if node == nil {
		return nil
	}
	if !r.insertNodeToProcessing(node.Name) {
		glog.V(2).Infof("Node %v is already in the process of CIDR assignment.", node.Name)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		return r.occupyCIDR(node)
	}
	podCIDR, err := r.cidrs.AllocateNext()
	if err != nil {
		r.removeNodeFromProcessing(node.Name)
		nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to allocate cidr: %v", err)
	}

	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
		nodeName: node.Name,
		cidr:     podCIDR,
	}
	return nil
}

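// ReleaseCIDR returns the node's PodCIDR to the pool of assignable CIDRs.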
func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error {
	if node == nil || node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("Failed to parse CIDR %s on Node %v: %v", node.Spec.PodCIDR, node.Name, err)
	}

	glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
	if err = r.cidrs.Release(podCIDR); err != nil {
		return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
	}
	return err
}

// filterOutServiceRange marks all CIDRs with subNetMaskSize that belong to
// serviceCIDR as used, so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
	// Checks if service CIDR has a nonempty intersection with cluster
	// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
	// clusterCIDR's Mask applied (this means that clusterCIDR contains
	// serviceCIDR) or vice versa (which means that serviceCIDR contains
	// clusterCIDR).
	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
		return
	}

	if err := r.cidrs.Occupy(serviceCIDR); err != nil {
		glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
	}
}

// updateCIDRAllocation assigns a CIDR to the Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
	var err error
	var node *v1.Node
	defer r.removeNodeFromProcessing(data.nodeName)

	podCIDR := data.cidr.String()

	node, err = r.nodeLister.Get(data.nodeName)
	if err != nil {
		glog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", data.nodeName, err)
		return err
	}

	if node.Spec.PodCIDR == podCIDR {
		glog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		glog.Errorf("Node %v already has a CIDR allocated %v. Releasing the new one %v.", node.Name, node.Spec.PodCIDR, podCIDR)
		if err := r.cidrs.Release(data.cidr); err != nil {
			glog.Errorf("Error when releasing CIDR %v", podCIDR)
		}
		return nil
	}
	// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
	for i := 0; i < cidrUpdateRetries; i++ {
		if err = utilnode.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil {
			glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
			return nil
		}
	}
	glog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, podCIDR, err)
	nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
	// We accept the fact that we may leak CIDRs here. This is safer than releasing
	// them in cases when we don't know whether the request went through.
	// A NodeController restart will return all falsely allocated CIDRs to the pool.
	if !apierrors.IsServerTimeout(err) {
		glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", node.Name, err)
		if releaseErr := r.cidrs.Release(data.cidr); releaseErr != nil {
			glog.Errorf("Error releasing allocated CIDR for node %v: %v", node.Name, releaseErr)
		}
	}
	return err
}