mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	set nil cache entry based on old cache
fix tests update old entry logic
This commit is contained in:
		@@ -38,6 +38,9 @@ const (
 | 
			
		||||
	// active/expired. If entry doesn't exist in cache, then data is fetched
 | 
			
		||||
	// using getter, saved in cache and returned
 | 
			
		||||
	cacheReadTypeUnsafe
 | 
			
		||||
	// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry
 | 
			
		||||
	// is not expired
 | 
			
		||||
	cacheReadTypeForceRefresh
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// getFunc defines a getter function for timedCache.
 | 
			
		||||
@@ -120,20 +123,20 @@ func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) {
 | 
			
		||||
	entry.lock.Lock()
 | 
			
		||||
	defer entry.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	// entry exists
 | 
			
		||||
	if entry.data != nil {
 | 
			
		||||
	// entry exists and if cache is not force refreshed
 | 
			
		||||
	if entry.data != nil && crt != cacheReadTypeForceRefresh {
 | 
			
		||||
		// allow unsafe read, so return data even if expired
 | 
			
		||||
		if crt == cacheReadTypeUnsafe {
 | 
			
		||||
			return entry.data, nil
 | 
			
		||||
		}
 | 
			
		||||
		// if cached data is not expired, return cached data
 | 
			
		||||
		if time.Since(entry.createdOn) < t.ttl {
 | 
			
		||||
		if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < t.ttl {
 | 
			
		||||
			return entry.data, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// Data is not cached yet or cache data is expired, cache it by getter.
 | 
			
		||||
	// entry is locked before getting to ensure concurrent gets don't result in
 | 
			
		||||
	// multiple ARM calls.
 | 
			
		||||
	// Data is not cached yet, cache data is expired or requested force refresh
 | 
			
		||||
	// cache it by getter. entry is locked before getting to ensure concurrent
 | 
			
		||||
	// gets don't result in multiple ARM calls.
 | 
			
		||||
	data, err := t.getter(key)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
 
 | 
			
		||||
@@ -204,3 +204,23 @@ func TestCacheNoConcurrentGet(t *testing.T) {
 | 
			
		||||
	assert.Equal(t, 1, dataSource.called)
 | 
			
		||||
	assert.Equal(t, val, v, "cache should get correct data")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCacheForceRefresh(t *testing.T) {
 | 
			
		||||
	key := "key1"
 | 
			
		||||
	val := &fakeDataObj{}
 | 
			
		||||
	data := map[string]*fakeDataObj{
 | 
			
		||||
		key: val,
 | 
			
		||||
	}
 | 
			
		||||
	dataSource, cache := newFakeCache(t)
 | 
			
		||||
	dataSource.set(data)
 | 
			
		||||
 | 
			
		||||
	v, err := cache.Get(key, cacheReadTypeDefault)
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	assert.Equal(t, 1, dataSource.called)
 | 
			
		||||
	assert.Equal(t, val, v, "cache should get correct data")
 | 
			
		||||
 | 
			
		||||
	v, err = cache.Get(key, cacheReadTypeForceRefresh)
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	assert.Equal(t, 2, dataSource.called)
 | 
			
		||||
	assert.Equal(t, val, v, "should refetch unexpired data as forced refresh")
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -130,19 +130,21 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua
 | 
			
		||||
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 | 
			
		||||
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
 | 
			
		||||
func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
 | 
			
		||||
	getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
 | 
			
		||||
	getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
 | 
			
		||||
		var found bool
 | 
			
		||||
		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return "", "", nil, err
 | 
			
		||||
			return "", "", nil, found, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		virtualMachines := cached.(*sync.Map)
 | 
			
		||||
		if vm, ok := virtualMachines.Load(nodeName); ok {
 | 
			
		||||
			result := vm.(*vmssVirtualMachinesEntry)
 | 
			
		||||
			return result.vmssName, result.instanceID, result.virtualMachine, nil
 | 
			
		||||
			found = true
 | 
			
		||||
			return result.vmssName, result.instanceID, result.virtualMachine, found, nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return "", "", nil, nil
 | 
			
		||||
		return "", "", nil, found, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err := getScaleSetVMInstanceID(nodeName)
 | 
			
		||||
@@ -150,22 +152,24 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin
 | 
			
		||||
		return "", "", nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	vmssName, instanceID, vm, err := getter(nodeName)
 | 
			
		||||
	vmssName, instanceID, vm, found, err := getter(nodeName, crt)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", "", nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if vm != nil {
 | 
			
		||||
 | 
			
		||||
	if !found {
 | 
			
		||||
		klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
 | 
			
		||||
		vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return "", "", nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if found && vm != nil {
 | 
			
		||||
		return vmssName, instanceID, vm, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
 | 
			
		||||
	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
 | 
			
		||||
	vmssName, instanceID, vm, err = getter(nodeName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", "", nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if vm == nil {
 | 
			
		||||
	if !found || vm == nil {
 | 
			
		||||
		return "", "", nil, cloudprovider.InstanceNotFound
 | 
			
		||||
	}
 | 
			
		||||
	return vmssName, instanceID, vm, nil
 | 
			
		||||
@@ -196,7 +200,7 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
 | 
			
		||||
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
 | 
			
		||||
// The node must belong to one of scale sets.
 | 
			
		||||
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
 | 
			
		||||
	getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
 | 
			
		||||
	getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
 | 
			
		||||
		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, false, err
 | 
			
		||||
@@ -219,21 +223,21 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
 | 
			
		||||
		return vm, found, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	vm, found, err := getter()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if found {
 | 
			
		||||
		return vm, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
 | 
			
		||||
	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
 | 
			
		||||
	vm, found, err = getter()
 | 
			
		||||
	vm, found, err := getter(crt)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if !found {
 | 
			
		||||
		klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
 | 
			
		||||
		vm, found, err = getter(cacheReadTypeForceRefresh)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if found && vm != nil {
 | 
			
		||||
		return vm, nil
 | 
			
		||||
	}
 | 
			
		||||
	if !found || vm == nil {
 | 
			
		||||
		return nil, cloudprovider.InstanceNotFound
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -111,6 +111,26 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 | 
			
		||||
	getter := func(key string) (interface{}, error) {
 | 
			
		||||
		localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
 | 
			
		||||
 | 
			
		||||
		oldCache := make(map[string]vmssVirtualMachinesEntry)
 | 
			
		||||
 | 
			
		||||
		if ss.vmssVMCache != nil {
 | 
			
		||||
			// get old cache before refreshing the cache
 | 
			
		||||
			entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			if exists {
 | 
			
		||||
				cached := entry.(*cacheEntry).data
 | 
			
		||||
				if cached != nil {
 | 
			
		||||
					virtualMachines := cached.(*sync.Map)
 | 
			
		||||
					virtualMachines.Range(func(key, value interface{}) bool {
 | 
			
		||||
						oldCache[key.(string)] = *value.(*vmssVirtualMachinesEntry)
 | 
			
		||||
						return true
 | 
			
		||||
					})
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		allResourceGroups, err := ss.GetResourceGroups()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
@@ -143,8 +163,38 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 | 
			
		||||
						virtualMachine: &vm,
 | 
			
		||||
						lastUpdate:     time.Now().UTC(),
 | 
			
		||||
					})
 | 
			
		||||
 | 
			
		||||
					if _, exists := oldCache[computerName]; exists {
 | 
			
		||||
						delete(oldCache, computerName)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// add old missing cache data with nil entries to prevent aggressive
 | 
			
		||||
			// ARM calls during cache invalidation
 | 
			
		||||
			for name, vmEntry := range oldCache {
 | 
			
		||||
				// if the nil cache entry has existed for 15 minutes in the cache
 | 
			
		||||
				// then it should not be added back to the cache
 | 
			
		||||
				if vmEntry.virtualMachine == nil || time.Since(vmEntry.lastUpdate) > 15*time.Minute {
 | 
			
		||||
					klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				lastUpdate := time.Now().UTC()
 | 
			
		||||
				if vmEntry.virtualMachine == nil {
 | 
			
		||||
					// if this is already a nil entry then keep the time the nil
 | 
			
		||||
					// entry was first created, so we can cleanup unwanted entries
 | 
			
		||||
					lastUpdate = vmEntry.lastUpdate
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				klog.V(5).Infof("adding old entries to new cache for %s", name)
 | 
			
		||||
				localCache.Store(name, &vmssVirtualMachinesEntry{
 | 
			
		||||
					resourceGroup:  vmEntry.resourceGroup,
 | 
			
		||||
					vmssName:       vmEntry.vmssName,
 | 
			
		||||
					instanceID:     vmEntry.instanceID,
 | 
			
		||||
					virtualMachine: nil,
 | 
			
		||||
					lastUpdate:     lastUpdate,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return localCache, nil
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user