mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #85885 from nilo19/qi-vmss-cache
Provider/Azure: Add cache for VMSS.
This commit is contained in:
		@@ -60,6 +60,7 @@ type scaleSet struct {
 | 
				
			|||||||
	// (e.g. master nodes) may not belong to any scale sets.
 | 
						// (e.g. master nodes) may not belong to any scale sets.
 | 
				
			||||||
	availabilitySet VMSet
 | 
						availabilitySet VMSet
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						vmssCache                 *timedCache
 | 
				
			||||||
	vmssVMCache               *timedCache
 | 
						vmssVMCache               *timedCache
 | 
				
			||||||
	availabilitySetNodesCache *timedCache
 | 
						availabilitySetNodesCache *timedCache
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -77,6 +78,11 @@ func newScaleSet(az *Cloud) (VMSet, error) {
 | 
				
			|||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ss.vmssCache, err = ss.newVMSSCache()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
 | 
						ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
@@ -85,6 +91,43 @@ func newScaleSet(az *Cloud) (VMSet, error) {
 | 
				
			|||||||
	return ss, nil
 | 
						return ss, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.VirtualMachineScaleSet, error) {
 | 
				
			||||||
 | 
						getter := func(vmssName string) (*compute.VirtualMachineScaleSet, error) {
 | 
				
			||||||
 | 
							cached, err := ss.vmssCache.Get(vmssKey, crt)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							vmsses := cached.(*sync.Map)
 | 
				
			||||||
 | 
							if vmss, ok := vmsses.Load(vmssName); ok {
 | 
				
			||||||
 | 
								result := vmss.(*vmssEntry)
 | 
				
			||||||
 | 
								return result.vmss, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return nil, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						vmss, err := getter(vmssName)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if vmss != nil {
 | 
				
			||||||
 | 
							return vmss, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						klog.V(3).Infof("Couldn't find VMSS with name %s, refreshing the cache", vmssName)
 | 
				
			||||||
 | 
						ss.vmssCache.Delete(vmssKey)
 | 
				
			||||||
 | 
						vmss, err = getter(vmssName)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if vmss == nil {
 | 
				
			||||||
 | 
							return nil, cloudprovider.InstanceNotFound
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return vmss, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 | 
					// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 | 
				
			||||||
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
 | 
					// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
 | 
				
			||||||
func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
 | 
					func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
 | 
				
			||||||
@@ -903,7 +946,7 @@ func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for vmssName := range vmssNamesMap {
 | 
						for vmssName := range vmssNamesMap {
 | 
				
			||||||
		vmss, err := ss.GetScaleSetWithRetry(service, ss.ResourceGroup, vmssName)
 | 
							vmss, err := ss.getVMSS(vmssName, cacheReadTypeDefault)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -1208,7 +1251,7 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromVMSS(service *v1.Service, backen
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for vmssName := range vmssNamesMap {
 | 
						for vmssName := range vmssNamesMap {
 | 
				
			||||||
		vmss, err := ss.GetScaleSetWithRetry(service, ss.ResourceGroup, vmssName)
 | 
							vmss, err := ss.getVMSS(vmssName, cacheReadTypeDefault)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
 | 
							// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
 | 
				
			||||||
		// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
 | 
							// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -19,6 +19,7 @@ limitations under the License.
 | 
				
			|||||||
package azure
 | 
					package azure
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
@@ -33,10 +34,12 @@ import (
 | 
				
			|||||||
var (
 | 
					var (
 | 
				
			||||||
	vmssNameSeparator = "_"
 | 
						vmssNameSeparator = "_"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						vmssKey                 = "k8svmssKey"
 | 
				
			||||||
	vmssVirtualMachinesKey  = "k8svmssVirtualMachinesKey"
 | 
						vmssVirtualMachinesKey  = "k8svmssVirtualMachinesKey"
 | 
				
			||||||
	availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
 | 
						availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	availabilitySetNodesCacheTTL = 15 * time.Minute
 | 
						availabilitySetNodesCacheTTL = 15 * time.Minute
 | 
				
			||||||
 | 
						vmssTTL                      = 10 * time.Minute
 | 
				
			||||||
	vmssVirtualMachinesTTL       = 10 * time.Minute
 | 
						vmssVirtualMachinesTTL       = 10 * time.Minute
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -48,6 +51,45 @@ type vmssVirtualMachinesEntry struct {
 | 
				
			|||||||
	lastUpdate     time.Time
 | 
						lastUpdate     time.Time
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type vmssEntry struct {
 | 
				
			||||||
 | 
						vmss       *compute.VirtualMachineScaleSet
 | 
				
			||||||
 | 
						lastUpdate time.Time
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ss *scaleSet) newVMSSCache() (*timedCache, error) {
 | 
				
			||||||
 | 
						getter := func(key string) (interface{}, error) {
 | 
				
			||||||
 | 
							localCache := &sync.Map{} // [vmssName]*vmssEntry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							allResourceGroups, err := ss.GetResourceGroups()
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							for _, resourceGroup := range allResourceGroups.List() {
 | 
				
			||||||
 | 
								allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(context.Background(), resourceGroup)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									klog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
 | 
				
			||||||
 | 
									return nil, err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								for _, scaleSet := range allScaleSets {
 | 
				
			||||||
 | 
									if scaleSet.Name == nil || *scaleSet.Name == "" {
 | 
				
			||||||
 | 
										klog.Warning("failed to get the name of VMSS")
 | 
				
			||||||
 | 
										continue
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									localCache.Store(*scaleSet.Name, &vmssEntry{
 | 
				
			||||||
 | 
										vmss:       &scaleSet,
 | 
				
			||||||
 | 
										lastUpdate: time.Now().UTC(),
 | 
				
			||||||
 | 
									})
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return localCache, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return newTimedcache(vmssTTL, getter)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func extractVmssVMName(name string) (string, string, error) {
 | 
					func extractVmssVMName(name string) (string, string, error) {
 | 
				
			||||||
	split := strings.SplitAfter(name, vmssNameSeparator)
 | 
						split := strings.SplitAfter(name, vmssNameSeparator)
 | 
				
			||||||
	if len(split) < 2 {
 | 
						if len(split) < 2 {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user