	route controller + azure v6 routes
@@ -62,19 +62,19 @@ type RouteController struct {
 	routes           cloudprovider.Routes
 	kubeClient       clientset.Interface
 	clusterName      string
-	clusterCIDR      *net.IPNet
+	clusterCIDRs     []*net.IPNet
 	nodeLister       corelisters.NodeLister
 	nodeListerSynced cache.InformerSynced
 	broadcaster      record.EventBroadcaster
 	recorder         record.EventRecorder
 }
 
-func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInformer coreinformers.NodeInformer, clusterName string, clusterCIDR *net.IPNet) *RouteController {
+func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInformer coreinformers.NodeInformer, clusterName string, clusterCIDRs []*net.IPNet) *RouteController {
 	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
 	}
 
-	if clusterCIDR == nil {
+	if len(clusterCIDRs) == 0 {
 		klog.Fatal("RouteController: Must specify clusterCIDR.")
 	}
 
@@ -86,7 +86,7 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
 		routes:           routes,
 		kubeClient:       kubeClient,
 		clusterName:      clusterName,
-		clusterCIDR:      clusterCIDR,
+		clusterCIDRs:     clusterCIDRs,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		broadcaster:      eventBroadcaster,
@@ -137,33 +137,52 @@ func (rc *RouteController) reconcileNodeRoutes() error {
 }
 
 func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.Route) error {
-	// nodeCIDRs maps nodeName->nodeCIDR
-	nodeCIDRs := make(map[types.NodeName]string)
+	var l sync.Mutex
+	// for each node a map of podCIDRs and their created status
+	nodeRoutesStatuses := make(map[types.NodeName]map[string]bool)
 	// routeMap maps routeTargetNode->route
-	routeMap := make(map[types.NodeName]*cloudprovider.Route)
+	routeMap := make(map[types.NodeName][]*cloudprovider.Route)
 	for _, route := range routes {
 		if route.TargetNode != "" {
-			routeMap[route.TargetNode] = route
+			routeMap[route.TargetNode] = append(routeMap[route.TargetNode], route)
 		}
 	}
 
 	wg := sync.WaitGroup{}
 	rateLimiter := make(chan struct{}, maxConcurrentRouteCreations)
+	// searches existing routes by node for a matching route
 
 	for _, node := range nodes {
 		// Skip if the node hasn't been assigned a CIDR yet.
-		if node.Spec.PodCIDR == "" {
+		if len(node.Spec.PodCIDRs) == 0 {
 			continue
 		}
 		nodeName := types.NodeName(node.Name)
-		// Check if we have a route for this node w/ the correct CIDR.
-		r := routeMap[nodeName]
-		if r == nil || r.DestinationCIDR != node.Spec.PodCIDR {
-			// If not, create the route.
+		l.Lock()
+		nodeRoutesStatuses[nodeName] = make(map[string]bool)
+		l.Unlock()
+		// for every node, for every cidr
+		for _, podCIDR := range node.Spec.PodCIDRs {
+			// we add it to our nodeCIDRs map here because add and delete go routines run at the same time
+			l.Lock()
+			nodeRoutesStatuses[nodeName][podCIDR] = false
+			l.Unlock()
+			// ignore if already created
+			if hasRoute(routeMap, nodeName, podCIDR) {
+				l.Lock()
+				nodeRoutesStatuses[nodeName][podCIDR] = true // a route for this podCIDR is already created
+				l.Unlock()
+				continue
+			}
+			// if we are here, then a route needs to be created for this node
 			route := &cloudprovider.Route{
 				TargetNode:      nodeName,
-				DestinationCIDR: node.Spec.PodCIDR,
+				DestinationCIDR: podCIDR,
 			}
+			// cloud providers that:
+			// - depend on nameHint
+			// - trying to support dual stack
+			// will have to carefully generate new route names that allow node->(multi cidr)
 			nameHint := string(node.UID)
 			wg.Add(1)
 			go func(nodeName types.NodeName, nameHint string, route *cloudprovider.Route) {
@@ -176,8 +195,6 @@ func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.R
 					klog.Infof("Creating route for node %s %s with hint %s, throttled %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
 					err := rc.routes.CreateRoute(context.TODO(), rc.clusterName, nameHint, route)
 					<-rateLimiter
-
-					rc.updateNetworkingCondition(nodeName, err == nil)
 					if err != nil {
 						msg := fmt.Sprintf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Since(startTime), err)
 						if rc.recorder != nil {
@@ -188,10 +205,13 @@ func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.R
 								UID:       types.UID(nodeName),
 								Namespace: "",
 							}, v1.EventTypeWarning, "FailedToCreateRoute", msg)
+							klog.V(4).Infof(msg)
+							return err
 						}
-						klog.V(4).Infof(msg)
-						return err
 					}
+					l.Lock()
+					nodeRoutesStatuses[nodeName][route.DestinationCIDR] = true
+					l.Unlock()
 					klog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
 					return nil
 				})
@@ -199,23 +219,31 @@ func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.R
 					klog.Errorf("Could not create route %s %s for node %s: %v", nameHint, route.DestinationCIDR, nodeName, err)
 				}
 			}(nodeName, nameHint, route)
-		} else {
-			// Update condition only if it doesn't reflect the current state.
-			_, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeNetworkUnavailable)
-			if condition == nil || condition.Status != v1.ConditionFalse {
-				rc.updateNetworkingCondition(types.NodeName(node.Name), true)
-			}
 		}
-		nodeCIDRs[nodeName] = node.Spec.PodCIDR
 	}
 
+	// searches our bag of node->cidrs for a match
+	nodeHasCidr := func(nodeName types.NodeName, cidr string) bool {
+		l.Lock()
+		defer l.Unlock()
+
+		nodeRoutes := nodeRoutesStatuses[nodeName]
+		if nodeRoutes == nil {
+			return false
+		}
+		_, exist := nodeRoutes[cidr]
+		return exist
+	}
+	// delete routes that are not in use
 	for _, route := range routes {
 		if rc.isResponsibleForRoute(route) {
 			// Check if this route is a blackhole, or applies to a node we know about & has an incorrect CIDR.
-			if route.Blackhole || (nodeCIDRs[route.TargetNode] != route.DestinationCIDR) {
+			if route.Blackhole || !nodeHasCidr(route.TargetNode, route.DestinationCIDR) {
 				wg.Add(1)
 				// Delete the route.
 				go func(route *cloudprovider.Route, startTime time.Time) {
 					defer wg.Done()
+					// respect the rate limiter
+					rateLimiter <- struct{}{}
 					klog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
 					if err := rc.routes.DeleteRoute(context.TODO(), rc.clusterName, route); err != nil {
@@ -229,17 +257,62 @@ func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.R
 		}
 	}
 	wg.Wait()
+
+	// after all routes have been created (or not), we start updating
+	// all nodes' statuses with the outcome
+	for _, node := range nodes {
+		wg.Add(1)
+		nodeRoutes := nodeRoutesStatuses[types.NodeName(node.Name)]
+		allRoutesCreated := true
+
+		if len(nodeRoutes) == 0 {
+			go func(n *v1.Node) {
+				defer wg.Done()
+				klog.Infof("node %v has no routes assigned to it. NodeNetworkUnavailable will be set to true", n.Name)
+				rc.updateNetworkingCondition(n, false)
+			}(node)
+			continue
+		}
+
+		// check if all routes were created. if so, then it should be ready
+		for _, created := range nodeRoutes {
+			if !created {
+				allRoutesCreated = false
+				break
+			}
+		}
+		go func(n *v1.Node) {
+			defer wg.Done()
+			rc.updateNetworkingCondition(n, allRoutesCreated)
+		}(node)
+	}
+	wg.Wait()
 	return nil
 }
 
-func (rc *RouteController) updateNetworkingCondition(nodeName types.NodeName, routeCreated bool) error {
+func (rc *RouteController) updateNetworkingCondition(node *v1.Node, routesCreated bool) error {
+	_, condition := nodeutil.GetNodeCondition(&(node.Status), v1.NodeNetworkUnavailable)
+	if routesCreated && condition != nil && condition.Status == v1.ConditionFalse {
+		klog.V(2).Infof("set node %v with NodeNetworkUnavailable=false was canceled because it is already set", node.Name)
+		return nil
+	}
+
+	if !routesCreated && condition != nil && condition.Status == v1.ConditionTrue {
+		klog.V(2).Infof("set node %v with NodeNetworkUnavailable=true was canceled because it is already set", node.Name)
+		return nil
+	}
+
+	klog.Infof("Patching node status %v with %v previous condition was:%+v", node.Name, routesCreated, condition)
+
+	// either condition is not there, or has a value != to what we need
+	// start setting it
 	err := clientretry.RetryOnConflict(updateNetworkConditionBackoff, func() error {
 		var err error
 		// Patch could also fail, even though the chance is very slim. So we still do
 		// patch in the retry loop.
 		currentTime := metav1.Now()
-		if routeCreated {
-			err = utilnode.SetNodeCondition(rc.kubeClient, nodeName, v1.NodeCondition{
+		if routesCreated {
+			err = utilnode.SetNodeCondition(rc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
 				Type:               v1.NodeNetworkUnavailable,
 				Status:             v1.ConditionFalse,
 				Reason:             "RouteCreated",
@@ -247,7 +320,7 @@ func (rc *RouteController) updateNetworkingCondition(nodeName types.NodeName, ro
 				LastTransitionTime: currentTime,
 			})
 		} else {
-			err = utilnode.SetNodeCondition(rc.kubeClient, nodeName, v1.NodeCondition{
+			err = utilnode.SetNodeCondition(rc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
 				Type:               v1.NodeNetworkUnavailable,
 				Status:             v1.ConditionTrue,
 				Reason:             "NoRouteCreated",
@@ -256,13 +329,13 @@ func (rc *RouteController) updateNetworkingCondition(nodeName types.NodeName, ro
 			})
 		}
 		if err != nil {
-			klog.V(4).Infof("Error updating node %s, retrying: %v", nodeName, err)
+			klog.V(4).Infof("Error updating node %s, retrying: %v", types.NodeName(node.Name), err)
 		}
 		return err
 	})
 
 	if err != nil {
-		klog.Errorf("Error updating node %s: %v", nodeName, err)
+		klog.Errorf("Error updating node %s: %v", node.Name, err)
 	}
 
 	return err
@@ -279,8 +352,24 @@ func (rc *RouteController) isResponsibleForRoute(route *cloudprovider.Route) boo
 	for i := range lastIP {
 		lastIP[i] = cidr.IP[i] | ^cidr.Mask[i]
 	}
-	if !rc.clusterCIDR.Contains(cidr.IP) || !rc.clusterCIDR.Contains(lastIP) {
-		return false
+
+	// check across all cluster cidrs
+	for _, clusterCIDR := range rc.clusterCIDRs {
+		if clusterCIDR.Contains(cidr.IP) || clusterCIDR.Contains(lastIP) {
+			return true
+		}
 	}
-	return true
+	return false
 }
+
+// checks if a node owns a route with a specific cidr
+func hasRoute(rm map[types.NodeName][]*cloudprovider.Route, nodeName types.NodeName, cidr string) bool {
+	if routes, ok := rm[nodeName]; ok {
+		for _, route := range routes {
+			if route.DestinationCIDR == cidr {
+				return true
+			}
+		}
+	}
+	return false
+}
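The core of the change above is that reconcile now keys existing cloud routes by node as a slice (a dual-stack node can own one route per pod CIDR) and records, per node and per CIDR, whether a route already exists or still needs to be created. Below is a minimal, self-contained sketch of that bookkeeping, not part of the commit: Route and NodeName are simplified stand-ins for cloudprovider.Route and types.NodeName, and the node and CIDR values are invented for illustration.

package main

import "fmt"

// Simplified stand-ins for cloudprovider.Route and types.NodeName.
type NodeName string

type Route struct {
	TargetNode      NodeName
	DestinationCIDR string
}

// hasRoute mirrors the helper added by this commit: a node already "owns" a
// CIDR if any of its existing cloud routes points at that CIDR.
func hasRoute(rm map[NodeName][]*Route, nodeName NodeName, cidr string) bool {
	for _, route := range rm[nodeName] {
		if route.DestinationCIDR == cidr {
			return true
		}
	}
	return false
}

func main() {
	// Existing cloud routes keyed by target node. With dual stack a node can
	// appear with more than one route, hence the slice-valued map.
	routeMap := map[NodeName][]*Route{
		"node-a": {{TargetNode: "node-a", DestinationCIDR: "10.244.0.0/24"}},
	}

	// Pod CIDRs assigned to node-a (node.Spec.PodCIDRs in the real controller).
	podCIDRs := []string{"10.244.0.0/24", "fd00:10:244::/64"}

	// nodeRoutesStatuses[node][cidr] records whether a route exists or was created.
	nodeRoutesStatuses := map[NodeName]map[string]bool{"node-a": {}}

	for _, cidr := range podCIDRs {
		nodeRoutesStatuses["node-a"][cidr] = hasRoute(routeMap, "node-a", cidr)
	}
	// Only the IPv6 CIDR still needs a route created; a node whose map still
	// contains a false entry after reconciliation is marked NodeNetworkUnavailable.
	fmt.Println(nodeRoutesStatuses) // map[node-a:map[10.244.0.0/24:true fd00:10:244::/64:false]]
}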
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user
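isResponsibleForRoute now checks the route's destination against every configured cluster CIDR instead of a single one. The sketch below reproduces that containment loop in isolation; it is an assumed standalone helper, not the controller method, and the example CIDRs are made up. Note that, as the commit writes it, a cluster CIDR containing either the first or the last address of the destination is enough to claim responsibility, whereas the old single-CIDR check required both.

package main

import (
	"fmt"
	"net"
)

// containedInAny reports whether any cluster CIDR contains the first or last
// address of the route's destination CIDR, mirroring the loop this commit
// adds to isResponsibleForRoute.
func containedInAny(clusterCIDRs []*net.IPNet, destinationCIDR string) bool {
	_, cidr, err := net.ParseCIDR(destinationCIDR)
	if err != nil {
		return false
	}
	// Last address of the destination CIDR: network address with all host bits set.
	lastIP := make([]byte, len(cidr.IP))
	for i := range lastIP {
		lastIP[i] = cidr.IP[i] | ^cidr.Mask[i]
	}
	for _, clusterCIDR := range clusterCIDRs {
		if clusterCIDR.Contains(cidr.IP) || clusterCIDR.Contains(lastIP) {
			return true
		}
	}
	return false
}

func main() {
	_, v4, _ := net.ParseCIDR("10.244.0.0/16")
	_, v6, _ := net.ParseCIDR("fd00:10:244::/56")
	clusterCIDRs := []*net.IPNet{v4, v6}

	fmt.Println(containedInAny(clusterCIDRs, "10.244.3.0/24"))      // true: inside the IPv4 cluster CIDR
	fmt.Println(containedInAny(clusterCIDRs, "fd00:10:244:7::/64")) // true: inside the IPv6 cluster CIDR
	fmt.Println(containedInAny(clusterCIDRs, "192.168.0.0/24"))     // false: this controller is not responsible
}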