Merge pull request #59716 from feiskyer/vmss-disk

Automatic merge from submit-queue (batch tested with PRs 59489, 59716). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Add AzureDisk support for vmss nodes

**What this PR does / why we need it**:

This PR adds AzureDisk support for vmss nodes (a simplified sketch of the resulting dispatch follows the list). Changes include:

- Upgrade the vmss API to 2017-12-01
- Upgrade the vmss clients to the new API version
- Abstract AzureDisk operations for vmss and vmas
- Add AzureDisk support for vmss
- Fix unit tests and fake clients
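
A minimal sketch of the dispatch pattern this abstraction enables (simplified types and signatures with hypothetical names, not this PR's actual code; the real `VMSet` interface carries the full disk-operation set and Azure SDK types):

```go
package main

import "fmt"

// VMSet stands in for the interface this PR extends with disk operations
// (AttachDisk, DetachDiskByName, GetDiskLun, GetNextDiskLun, DisksAreAttached).
type VMSet interface {
	AttachDisk(diskName, diskURI, nodeName string) error
}

type availabilitySet struct{}

func (as *availabilitySet) AttachDisk(diskName, diskURI, nodeName string) error {
	fmt.Printf("vmas path: attach %s to %s via VirtualMachinesClient\n", diskName, nodeName)
	return nil
}

type scaleSet struct {
	availabilitySet // a vmss cluster can still contain vmas-managed nodes
}

func (ss *scaleSet) AttachDisk(diskName, diskURI, nodeName string) error {
	// The real code first checks isNodeManagedByAvailabilitySet and falls
	// back to the embedded vmas implementation for such nodes.
	fmt.Printf("vmss path: attach %s to %s via VirtualMachineScaleSetVMsClient\n", diskName, nodeName)
	return nil
}

func main() {
	var vmSet VMSet = &scaleSet{}
	_ = vmSet.AttachDisk("pvc-disk-0", "https://example.blob.core.windows.net/vhds/pvc-disk-0.vhd", "vmss-node-000000")
}
```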
 
**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #43287

**Special notes for your reviewer**:

~~Depends on #59652 (the first two commits are from #59652).~~

**Release note**:

```release-note
Add AzureDisk support for vmss nodes
```
Authored by Kubernetes Submit Queue on 2018-02-14 00:14:34 -08:00, committed by GitHub.
69 changed files with 16648 additions and 554 deletions.


@@ -14,7 +14,9 @@ go_library(
"azure_blobDiskController.go",
"azure_cache.go",
"azure_client.go",
"azure_controllerCommon.go",
"azure_controller_common.go",
"azure_controller_standard.go",
"azure_controller_vmss.go",
"azure_fakes.go",
"azure_file.go",
"azure_instance_metadata.go",
@@ -44,6 +46,7 @@ go_library(
"//vendor/github.com/Azure/azure-sdk-for-go/arm/disk:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/arm/network:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/arm/storage:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/storage:go_default_library",
"//vendor/github.com/Azure/go-autorest/autorest:go_default_library",
"//vendor/github.com/Azure/go-autorest/autorest/adal:go_default_library",
@@ -90,6 +93,7 @@ go_test(
"//vendor/github.com/Azure/azure-sdk-for-go/arm/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/arm/network:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/arm/storage:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute:go_default_library",
"//vendor/github.com/Azure/go-autorest/autorest:go_default_library",
"//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",


@@ -17,13 +17,17 @@ limitations under the License.
package azure
import (
"k8s.io/apimachinery/pkg/util/wait"
"context"
"net/http"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
computepreview "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute"
"github.com/Azure/go-autorest/autorest"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)
// requestBackoff if backoff is disabled in cloud provider it
@@ -345,6 +349,15 @@ func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualM
})
}
// UpdateVmssVMWithRetry invokes az.VirtualMachineScaleSetVMsClient.Update with exponential backoff retry
func (az *Cloud) UpdateVmssVMWithRetry(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters computepreview.VirtualMachineScaleSetVM) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
resp, err := az.VirtualMachineScaleSetVMsClient.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters)
glog.V(10).Infof("VirtualMachinesClient.CreateOrUpdate(%s,%s): end", VMScaleSetName, instanceID)
return processHTTPRetryResponse(resp, err)
})
}
// A wait.ConditionFunc function to deal with common HTTP backoff response conditions
func processRetryResponse(resp autorest.Response, err error) (bool, error) {
if isSuccessHTTPResponse(resp) {
@@ -380,3 +393,36 @@ func isSuccessHTTPResponse(resp autorest.Response) bool {
}
return false
}
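// shouldRetryHTTPRequest reports whether the request should be retried: true when the call itself failed, or when the server answered with HTTP 4xx/5xx.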
func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
if err != nil {
return true
}
if resp != nil {
// HTTP 4xx or 5xx suggests we should retry
if 399 < resp.StatusCode && resp.StatusCode < 600 {
return true
}
}
return false
}
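// processHTTPRetryResponse adapts an HTTP result to a wait.ConditionFunc: done on HTTP 2xx, another retry (with the error suppressed) on retriable failures, and stop otherwise.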
func processHTTPRetryResponse(resp *http.Response, err error) (bool, error) {
if resp != nil {
// HTTP 2xx suggests a successful response
if 199 < resp.StatusCode && resp.StatusCode < 300 {
return true, nil
}
}
if shouldRetryHTTPRequest(resp, err) {
if resp != nil {
glog.Errorf("backoff: failure with HTTP response %d, will retry, err=%v", resp.StatusCode, err)
} else {
glog.Errorf("backoff: failure, will retry, err=%v", err)
}
// suppress the error object so that backoff process continues
return false, nil
}
// Fall-through: stop periodic backoff
return true, nil
}


@@ -17,12 +17,15 @@ limitations under the License.
package azure
import (
"context"
"net/http"
"time"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/disk"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/azure-sdk-for-go/arm/storage"
computepreview "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/golang/glog"
@@ -79,21 +82,20 @@ type SecurityGroupsClient interface {
List(resourceGroupName string) (result network.SecurityGroupListResult, err error)
}
// VirtualMachineScaleSetsClient defines needed functions for azure compute.VirtualMachineScaleSetsClient
// VirtualMachineScaleSetsClient defines needed functions for azure computepreview.VirtualMachineScaleSetsClient
type VirtualMachineScaleSetsClient interface {
CreateOrUpdate(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error)
Get(resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error)
List(resourceGroupName string) (result compute.VirtualMachineScaleSetListResult, err error)
ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetListResult) (result compute.VirtualMachineScaleSetListResult, err error)
UpdateInstances(resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error)
CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters computepreview.VirtualMachineScaleSet) (resp *http.Response, err error)
Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result computepreview.VirtualMachineScaleSet, err error)
List(ctx context.Context, resourceGroupName string) (result []computepreview.VirtualMachineScaleSet, err error)
UpdateInstances(ctx context.Context, resourceGroupName string, VMScaleSetName string, VMInstanceIDs computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs) (resp *http.Response, err error)
}
// VirtualMachineScaleSetVMsClient defines needed functions for azure compute.VirtualMachineScaleSetVMsClient
// VirtualMachineScaleSetVMsClient defines needed functions for azure computepreview.VirtualMachineScaleSetVMsClient
type VirtualMachineScaleSetVMsClient interface {
Get(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVM, err error)
GetInstanceView(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVMInstanceView, err error)
List(resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result compute.VirtualMachineScaleSetVMListResult, err error)
ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetVMListResult) (result compute.VirtualMachineScaleSetVMListResult, err error)
Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string) (result computepreview.VirtualMachineScaleSetVM, err error)
GetInstanceView(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string) (result computepreview.VirtualMachineScaleSetVMInstanceView, err error)
List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []computepreview.VirtualMachineScaleSetVM, err error)
Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters computepreview.VirtualMachineScaleSetVM) (resp *http.Response, err error)
}
// RoutesClient defines needed functions for azure network.RoutesClient
@@ -138,6 +140,10 @@ type azVirtualMachinesClient struct {
rateLimiter flowcontrol.RateLimiter
}
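// getContextWithCancel returns a fresh background context together with its cancel function, for the context-based calls of the 2017-12-01 SDK.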
func getContextWithCancel() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}
func newAzVirtualMachinesClient(config *azClientConfig) *azVirtualMachinesClient {
virtualMachinesClient := compute.NewVirtualMachinesClient(config.subscriptionID)
virtualMachinesClient.BaseURI = config.resourceManagerEndpoint
@@ -603,12 +609,12 @@ func (az *azSecurityGroupsClient) List(resourceGroupName string) (result network
// azVirtualMachineScaleSetsClient implements VirtualMachineScaleSetsClient.
type azVirtualMachineScaleSetsClient struct {
client compute.VirtualMachineScaleSetsClient
client computepreview.VirtualMachineScaleSetsClient
rateLimiter flowcontrol.RateLimiter
}
func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachineScaleSetsClient {
virtualMachineScaleSetsClient := compute.NewVirtualMachineScaleSetsClient(config.subscriptionID)
virtualMachineScaleSetsClient := computepreview.NewVirtualMachineScaleSetsClient(config.subscriptionID)
virtualMachineScaleSetsClient.BaseURI = config.resourceManagerEndpoint
virtualMachineScaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(config.servicePrincipalToken)
virtualMachineScaleSetsClient.PollingDelay = 5 * time.Second
@@ -620,23 +626,26 @@ func newAzVirtualMachineScaleSetsClient(config *azClientConfig) *azVirtualMachin
}
}
func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error) {
func (az *azVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters computepreview.VirtualMachineScaleSet) (resp *http.Response, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): start", resourceGroupName, VMScaleSetName)
defer func() {
glog.V(10).Infof("azVirtualMachineScaleSetsClient.CreateOrUpdate(%q,%q): end", resourceGroupName, VMScaleSetName)
}()
errChan := make(chan error, 1)
mc := newMetricContext("vmss", "create_or_update", resourceGroupName, az.client.SubscriptionID)
resultChan, proxyErrChan := az.client.CreateOrUpdate(resourceGroupName, VMScaleSetName, parameters, cancel)
err := <-proxyErrChan
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
mc.Observe(err)
errChan <- err
return resultChan, errChan
if err != nil {
return future.Response(), err
}
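// Block until the long-running operation behind the returned future completes.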
err = future.WaitForCompletion(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
func (az *azVirtualMachineScaleSetsClient) Get(resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error) {
func (az *azVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result computepreview.VirtualMachineScaleSet, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetsClient.Get(%q,%q): start", resourceGroupName, VMScaleSetName)
defer func() {
@@ -644,12 +653,12 @@ func (az *azVirtualMachineScaleSetsClient) Get(resourceGroupName string, VMScale
}()
mc := newMetricContext("vmss", "get", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.Get(resourceGroupName, VMScaleSetName)
result, err = az.client.Get(ctx, resourceGroupName, VMScaleSetName)
mc.Observe(err)
return
}
func (az *azVirtualMachineScaleSetsClient) List(resourceGroupName string) (result compute.VirtualMachineScaleSetListResult, err error) {
func (az *azVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGroupName string) (result []computepreview.VirtualMachineScaleSet, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetsClient.List(%q,%q): start", resourceGroupName)
defer func() {
@@ -657,48 +666,51 @@ func (az *azVirtualMachineScaleSetsClient) List(resourceGroupName string) (resul
}()
mc := newMetricContext("vmss", "list", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.List(resourceGroupName)
iterator, err := az.client.ListComplete(ctx, resourceGroupName)
mc.Observe(err)
return
if err != nil {
return nil, err
}
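// Drain all pages of the result; an error from iterator.Next() surfaces at the top of the next loop iteration.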
result = make([]computepreview.VirtualMachineScaleSet, 0)
for ; iterator.NotDone(); err = iterator.Next() {
if err != nil {
return nil, err
}
result = append(result, iterator.Value())
}
return result, nil
}
func (az *azVirtualMachineScaleSetsClient) ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetListResult) (result compute.VirtualMachineScaleSetListResult, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetsClient.ListNextResults(%q): start", lastResults)
defer func() {
glog.V(10).Infof("azVirtualMachineScaleSetsClient.ListNextResults(%q): end", lastResults)
}()
mc := newMetricContext("vmss", "list_next_results", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.ListNextResults(lastResults)
mc.Observe(err)
return
}
func (az *azVirtualMachineScaleSetsClient) UpdateInstances(resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error) {
func (az *azVirtualMachineScaleSetsClient) UpdateInstances(ctx context.Context, resourceGroupName string, VMScaleSetName string, VMInstanceIDs computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs) (resp *http.Response, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetsClient.UpdateInstances(%q,%q,%q): start", resourceGroupName, VMScaleSetName, VMInstanceIDs)
defer func() {
glog.V(10).Infof("azVirtualMachineScaleSetsClient.UpdateInstances(%q,%q,%q): end", resourceGroupName, VMScaleSetName, VMInstanceIDs)
}()
errChan := make(chan error, 1)
mc := newMetricContext("vmss", "update_instances", resourceGroupName, az.client.SubscriptionID)
resultChan, proxyErrChan := az.client.UpdateInstances(resourceGroupName, VMScaleSetName, VMInstanceIDs, cancel)
err := <-proxyErrChan
future, err := az.client.UpdateInstances(ctx, resourceGroupName, VMScaleSetName, VMInstanceIDs)
mc.Observe(err)
errChan <- err
return resultChan, errChan
if err != nil {
return future.Response(), err
}
err = future.WaitForCompletion(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
// azVirtualMachineScaleSetVMsClient implements VirtualMachineScaleSetVMsClient.
type azVirtualMachineScaleSetVMsClient struct {
client compute.VirtualMachineScaleSetVMsClient
client computepreview.VirtualMachineScaleSetVMsClient
rateLimiter flowcontrol.RateLimiter
}
func newAzVirtualMachineScaleSetVMsClient(config *azClientConfig) *azVirtualMachineScaleSetVMsClient {
virtualMachineScaleSetVMsClient := compute.NewVirtualMachineScaleSetVMsClient(config.subscriptionID)
virtualMachineScaleSetVMsClient := computepreview.NewVirtualMachineScaleSetVMsClient(config.subscriptionID)
virtualMachineScaleSetVMsClient.BaseURI = config.resourceManagerEndpoint
virtualMachineScaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(config.servicePrincipalToken)
virtualMachineScaleSetVMsClient.PollingDelay = 5 * time.Second
@@ -710,7 +722,7 @@ func newAzVirtualMachineScaleSetVMsClient(config *azClientConfig) *azVirtualMach
}
}
func (az *azVirtualMachineScaleSetVMsClient) Get(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVM, err error) {
func (az *azVirtualMachineScaleSetVMsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string) (result computepreview.VirtualMachineScaleSetVM, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Get(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID)
defer func() {
@@ -718,12 +730,12 @@ func (az *azVirtualMachineScaleSetVMsClient) Get(resourceGroupName string, VMSca
}()
mc := newMetricContext("vmssvm", "get", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.Get(resourceGroupName, VMScaleSetName, instanceID)
result, err = az.client.Get(ctx, resourceGroupName, VMScaleSetName, instanceID)
mc.Observe(err)
return
}
func (az *azVirtualMachineScaleSetVMsClient) GetInstanceView(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVMInstanceView, err error) {
func (az *azVirtualMachineScaleSetVMsClient) GetInstanceView(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string) (result computepreview.VirtualMachineScaleSetVMInstanceView, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.GetInstanceView(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID)
defer func() {
@@ -731,12 +743,12 @@ func (az *azVirtualMachineScaleSetVMsClient) GetInstanceView(resourceGroupName s
}()
mc := newMetricContext("vmssvm", "get_instance_view", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.GetInstanceView(resourceGroupName, VMScaleSetName, instanceID)
result, err = az.client.GetInstanceView(ctx, resourceGroupName, VMScaleSetName, instanceID)
mc.Observe(err)
return
}
func (az *azVirtualMachineScaleSetVMsClient) List(resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result compute.VirtualMachineScaleSetVMListResult, err error) {
func (az *azVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []computepreview.VirtualMachineScaleSetVM, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.List(%q,%q,%q): start", resourceGroupName, virtualMachineScaleSetName, filter)
defer func() {
@@ -744,22 +756,41 @@ func (az *azVirtualMachineScaleSetVMsClient) List(resourceGroupName string, virt
}()
mc := newMetricContext("vmssvm", "list", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.List(resourceGroupName, virtualMachineScaleSetName, filter, selectParameter, expand)
iterator, err := az.client.ListComplete(ctx, resourceGroupName, virtualMachineScaleSetName, filter, selectParameter, expand)
mc.Observe(err)
return
if err != nil {
return nil, err
}
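// Drain all pages of the result; an error from iterator.Next() surfaces at the top of the next loop iteration.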
result = make([]computepreview.VirtualMachineScaleSetVM, 0)
for ; iterator.NotDone(); err = iterator.Next() {
if err != nil {
return nil, err
}
result = append(result, iterator.Value())
}
return result, nil
}
func (az *azVirtualMachineScaleSetVMsClient) ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetVMListResult) (result compute.VirtualMachineScaleSetVMListResult, err error) {
func (az *azVirtualMachineScaleSetVMsClient) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters computepreview.VirtualMachineScaleSetVM) (resp *http.Response, err error) {
az.rateLimiter.Accept()
glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.ListNextResults(%q,%q,%q): start", lastResults)
glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Update(%q,%q,%q): start", resourceGroupName, VMScaleSetName, instanceID)
defer func() {
glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.ListNextResults(%q,%q,%q): end", lastResults)
glog.V(10).Infof("azVirtualMachineScaleSetVMsClient.Update(%q,%q,%q): end", resourceGroupName, VMScaleSetName, instanceID)
}()
mc := newMetricContext("vmssvm", "list_next_results", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.ListNextResults(lastResults)
mc := newMetricContext("vmssvm", "update", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters)
mc.Observe(err)
return
if err != nil {
return future.Response(), err
}
err = future.WaitForCompletion(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
// azRoutesClient implements RoutesClient.


@@ -0,0 +1,192 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"k8s.io/apimachinery/pkg/types"
kwait "k8s.io/apimachinery/pkg/util/wait"
)
const (
storageAccountNameTemplate = "pvc%s"
// for limits check https://docs.microsoft.com/en-us/azure/azure-subscription-service-limits#storage-limits
maxStorageAccounts = 100 // max # is 200 (250 with special request). this allows 100 for everything else including stand alone disks
maxDisksPerStorageAccounts = 60
storageAccountUtilizationBeforeGrowing = 0.5
maxLUN = 64 // max number of LUNs per VM
errLeaseFailed = "AcquireDiskLeaseFailed"
errLeaseIDMissing = "LeaseIdMissing"
errContainerNotFound = "ContainerNotFound"
errDiskBlobNotFound = "DiskBlobNotFound"
)
var defaultBackOff = kwait.Backoff{
Steps: 20,
Duration: 2 * time.Second,
Factor: 1.5,
Jitter: 0.0,
}
type controllerCommon struct {
subscriptionID string
location string
storageEndpointSuffix string
resourceGroup string
cloud *Cloud
}
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
// 1. vmType is standard, attach with availabilitySet.AttachDisk.
if c.cloud.VMType == vmTypeStandard {
return c.cloud.vmSet.AttachDisk(isManagedDisk, diskName, diskURI, nodeName, lun, cachingMode)
}
// 2. vmType is Virtual Machine Scale Set (vmss), convert vmSet to scaleSet.
ss, ok := c.cloud.vmSet.(*scaleSet)
if !ok {
return fmt.Errorf("error of converting vmSet (%q) to scaleSet with vmType %q", c.cloud.vmSet, c.cloud.VMType)
}
// 3. If the node is managed by availability set, then attach with availabilitySet.AttachDisk.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName))
if err != nil {
return err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.AttachDisk(isManagedDisk, diskName, diskURI, nodeName, lun, cachingMode)
}
// 4. Node is managed by vmss, attach with scaleSet.AttachDisk.
return ss.AttachDisk(isManagedDisk, diskName, diskURI, nodeName, lun, cachingMode)
}
// DetachDiskByName detaches a vhd from host. The vhd can be identified by diskName or diskURI.
func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
// 1. vmType is standard, detach with availabilitySet.DetachDiskByName.
if c.cloud.VMType == vmTypeStandard {
return c.cloud.vmSet.DetachDiskByName(diskName, diskURI, nodeName)
}
// 2. vmType is Virtual Machine Scale Set (vmss), convert vmSet to scaleSet.
ss, ok := c.cloud.vmSet.(*scaleSet)
if !ok {
return fmt.Errorf("error of converting vmSet (%q) to scaleSet with vmType %q", c.cloud.vmSet, c.cloud.VMType)
}
// 3. If the node is managed by availability set, then detach with availabilitySet.DetachDiskByName.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName))
if err != nil {
return err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.DetachDiskByName(diskName, diskURI, nodeName)
}
// 4. Node is managed by vmss, detach with scaleSet.DetachDiskByName.
return ss.DetachDiskByName(diskName, diskURI, nodeName)
}
// GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI.
func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
// 1. vmType is standard, get with availabilitySet.GetDiskLun.
if c.cloud.VMType == vmTypeStandard {
return c.cloud.vmSet.GetDiskLun(diskName, diskURI, nodeName)
}
// 2. vmType is Virtual Machine Scale Set (vmss), convert vmSet to scaleSet.
ss, ok := c.cloud.vmSet.(*scaleSet)
if !ok {
return -1, fmt.Errorf("error of converting vmSet (%q) to scaleSet with vmType %q", c.cloud.vmSet, c.cloud.VMType)
}
// 3. If the node is managed by availability set, then get with availabilitySet.GetDiskLun.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName))
if err != nil {
return -1, err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetDiskLun(diskName, diskURI, nodeName)
}
// 4. Node is managed by vmss, get with scaleSet.GetDiskLun.
return ss.GetDiskLun(diskName, diskURI, nodeName)
}
// GetNextDiskLun searches all vhd attachments on the host and finds an unused lun. Returns -1 if all luns are used.
func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
// 1. vmType is standard, get with availabilitySet.GetNextDiskLun.
if c.cloud.VMType == vmTypeStandard {
return c.cloud.vmSet.GetNextDiskLun(nodeName)
}
// 2. vmType is Virtual Machine Scale Set (vmss), convert vmSet to scaleSet.
ss, ok := c.cloud.vmSet.(*scaleSet)
if !ok {
return -1, fmt.Errorf("error of converting vmSet (%q) to scaleSet with vmType %q", c.cloud.vmSet, c.cloud.VMType)
}
// 3. If the node is managed by availability set, then get with availabilitySet.GetNextDiskLun.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName))
if err != nil {
return -1, err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetNextDiskLun(nodeName)
}
// 4. Node is managed by vmss, get with scaleSet.GetNextDiskLun.
return ss.GetNextDiskLun(nodeName)
}
// DisksAreAttached checks whether the given volumes are attached to the node with the specified NodeName.
func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
// 1. vmType is standard, check with availabilitySet.DisksAreAttached.
if c.cloud.VMType == vmTypeStandard {
return c.cloud.vmSet.DisksAreAttached(diskNames, nodeName)
}
// 2. vmType is Virtual Machine Scale Set (vmss), convert vmSet to scaleSet.
ss, ok := c.cloud.vmSet.(*scaleSet)
if !ok {
return nil, fmt.Errorf("error of converting vmSet (%q) to scaleSet with vmType %q", c.cloud.vmSet, c.cloud.VMType)
}
// 3. If the node is managed by availability set, then check with availabilitySet.DisksAreAttached.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName))
if err != nil {
return nil, err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.DisksAreAttached(diskNames, nodeName)
}
// 4. Node is managed by vmss, check with scaleSet.DisksAreAttached.
return ss.DisksAreAttached(diskNames, nodeName)
}


@@ -1,5 +1,5 @@
/*
Copyright 2017 The Kubernetes Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,50 +19,18 @@ package azure
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
kwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/cloudprovider"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
)
const (
storageAccountNameTemplate = "pvc%s"
// for limits check https://docs.microsoft.com/en-us/azure/azure-subscription-service-limits#storage-limits
maxStorageAccounts = 100 // max # is 200 (250 with special request). this allows 100 for everything else including stand alone disks
maxDisksPerStorageAccounts = 60
storageAccountUtilizationBeforeGrowing = 0.5
maxLUN = 64 // max number of LUNs per VM
errLeaseFailed = "AcquireDiskLeaseFailed"
errLeaseIDMissing = "LeaseIdMissing"
errContainerNotFound = "ContainerNotFound"
errDiskBlobNotFound = "DiskBlobNotFound"
)
var defaultBackOff = kwait.Backoff{
Steps: 20,
Duration: 2 * time.Second,
Factor: 1.5,
Jitter: 0.0,
}
type controllerCommon struct {
subscriptionID string
location string
storageEndpointSuffix string
resourceGroup string
cloud *Cloud
}
// AttachDisk attaches a vhd to vm
// the vhd must exist, can be identified by diskName, diskURI, and lun.
func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
vm, err := c.cloud.getVirtualMachine(nodeName)
func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
vm, err := as.getVirtualMachine(nodeName)
if err != nil {
return err
}
@@ -101,16 +69,16 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
},
}
vmName := mapNodeNameToVMName(nodeName)
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", c.resourceGroup, vmName)
respChan, errChan := c.cloud.VirtualMachinesClient.CreateOrUpdate(c.resourceGroup, vmName, newVM, nil)
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", as.resourceGroup, vmName)
respChan, errChan := as.VirtualMachinesClient.CreateOrUpdate(as.resourceGroup, vmName, newVM, nil)
resp := <-respChan
err = <-errChan
if c.cloud.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", c.resourceGroup, vmName)
retryErr := c.cloud.CreateOrUpdateVMWithRetry(vmName, newVM)
if as.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", as.resourceGroup, vmName)
retryErr := as.CreateOrUpdateVMWithRetry(vmName, newVM)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", c.resourceGroup, vmName)
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", as.resourceGroup, vmName)
}
}
if err != nil {
@@ -119,20 +87,20 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
if strings.Contains(detail, errLeaseFailed) || strings.Contains(detail, errDiskBlobNotFound) {
// if lease cannot be acquired or disk not found, immediately detach the disk and return the original error
glog.Infof("azureDisk - err %s, try detach", detail)
c.cloud.DetachDiskByName(diskName, diskURI, nodeName)
as.DetachDiskByName(diskName, diskURI, nodeName)
}
} else {
glog.V(4).Info("azureDisk - azure attach succeeded")
// Invalidate the cache right after updating
c.cloud.vmCache.Delete(vmName)
as.cloud.vmCache.Delete(vmName)
}
return err
}
// DetachDiskByName detaches a vhd from host
// the vhd can be identified by diskName or diskURI
func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
vm, err := c.cloud.getVirtualMachine(nodeName)
func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
vm, err := as.getVirtualMachine(nodeName)
if err != nil {
// if host doesn't exist, no need to detach
glog.Warningf("azureDisk - cannot find node %s, skip detaching disk %s", nodeName, diskName)
@@ -166,16 +134,16 @@ func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName t
},
}
vmName := mapNodeNameToVMName(nodeName)
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", c.resourceGroup, vmName)
respChan, errChan := c.cloud.VirtualMachinesClient.CreateOrUpdate(c.resourceGroup, vmName, newVM, nil)
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", as.resourceGroup, vmName)
respChan, errChan := as.VirtualMachinesClient.CreateOrUpdate(as.resourceGroup, vmName, newVM, nil)
resp := <-respChan
err = <-errChan
if c.cloud.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", c.resourceGroup, vmName)
retryErr := c.cloud.CreateOrUpdateVMWithRetry(vmName, newVM)
if as.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", as.resourceGroup, vmName)
retryErr := as.CreateOrUpdateVMWithRetry(vmName, newVM)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", c.cloud.ResourceGroup, vmName)
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", as.ResourceGroup, vmName)
}
}
if err != nil {
@@ -183,14 +151,14 @@ func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName t
} else {
glog.V(4).Info("azureDisk - azure disk detach succeeded")
// Invalidate the cache right after updating
c.cloud.vmCache.Delete(vmName)
as.cloud.vmCache.Delete(vmName)
}
return err
}
// GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI
func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
vm, err := c.cloud.getVirtualMachine(nodeName)
func (as *availabilitySet) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
vm, err := as.getVirtualMachine(nodeName)
if err != nil {
return -1, err
}
@@ -209,8 +177,8 @@ func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.N
// GetNextDiskLun searches all vhd attachment on the host and find unused lun
// return -1 if all luns are used
func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
vm, err := c.cloud.getVirtualMachine(nodeName)
func (as *availabilitySet) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
vm, err := as.getVirtualMachine(nodeName)
if err != nil {
return -1, err
}
@@ -230,12 +198,12 @@ func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error
}
// DisksAreAttached checks if a list of volumes are attached to the node with the specified NodeName
func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
func (as *availabilitySet) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
vm, err := c.cloud.getVirtualMachine(nodeName)
vm, err := as.getVirtualMachine(nodeName)
if err == cloudprovider.InstanceNotFound {
// if host doesn't exist, no need to detach
glog.Warningf("azureDisk - Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.",


@@ -0,0 +1,214 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"fmt"
"strings"
"github.com/Azure/azure-sdk-for-go/arm/compute"
computepreview "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
)
// AttachDisk attaches a vhd to vm.
// The vhd must exist and can be identified by diskName, diskURI, and lun.
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
ssName, instanceID, vm, err := ss.getVmssVM(string(nodeName))
if err != nil {
return err
}
disks := *vm.StorageProfile.DataDisks
if isManagedDisk {
disks = append(disks,
computepreview.DataDisk{
Name: &diskName,
Lun: &lun,
Caching: computepreview.CachingTypes(cachingMode),
CreateOption: "attach",
ManagedDisk: &computepreview.ManagedDiskParameters{
ID: &diskURI,
},
})
} else {
disks = append(disks,
computepreview.DataDisk{
Name: &diskName,
Vhd: &computepreview.VirtualHardDisk{
URI: &diskURI,
},
Lun: &lun,
Caching: computepreview.CachingTypes(cachingMode),
CreateOption: "attach",
})
}
vm.StorageProfile.DataDisks = &disks
ctx, cancel := getContextWithCancel()
defer cancel()
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", ss.resourceGroup, nodeName)
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, ss.resourceGroup, ssName, instanceID, vm)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", ss.resourceGroup, nodeName)
retryErr := ss.UpdateVmssVMWithRetry(ctx, ss.resourceGroup, ssName, instanceID, vm)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", ss.resourceGroup, nodeName)
}
}
if err != nil {
detail := err.Error()
if strings.Contains(detail, errLeaseFailed) || strings.Contains(detail, errDiskBlobNotFound) {
// if lease cannot be acquired or disk not found, immediately detach the disk and return the original error
glog.Infof("azureDisk - err %s, try detach", detail)
ss.DetachDiskByName(diskName, diskURI, nodeName)
}
} else {
glog.V(4).Info("azureDisk - azure attach succeeded")
// Invalidate the cache right after updating
ss.vmssVMCache.Delete(ss.makeVmssVMName(ssName, instanceID))
}
return err
}
// DetachDiskByName detaches a vhd from the host.
// The vhd can be identified by diskName or diskURI.
func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
ssName, instanceID, vm, err := ss.getVmssVM(string(nodeName))
if err != nil {
return err
}
disks := *vm.StorageProfile.DataDisks
bFoundDisk := false
for i, disk := range disks {
if disk.Lun != nil && (disk.Name != nil && diskName != "" && *disk.Name == diskName) ||
(disk.Vhd != nil && disk.Vhd.URI != nil && diskURI != "" && *disk.Vhd.URI == diskURI) ||
(disk.ManagedDisk != nil && diskURI != "" && *disk.ManagedDisk.ID == diskURI) {
// found the disk
glog.V(4).Infof("azureDisk - detach disk: name %q uri %q", diskName, diskURI)
disks = append(disks[:i], disks[i+1:]...)
bFoundDisk = true
break
}
}
if !bFoundDisk {
return fmt.Errorf("detach azure disk failure, disk %s not found, diskURI: %s", diskName, diskURI)
}
vm.StorageProfile.DataDisks = &disks
ctx, cancel := getContextWithCancel()
defer cancel()
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", ss.resourceGroup, nodeName)
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, ss.resourceGroup, ssName, instanceID, vm)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", ss.resourceGroup, nodeName)
retryErr := ss.UpdateVmssVMWithRetry(ctx, ss.resourceGroup, ssName, instanceID, vm)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", ss.resourceGroup, nodeName)
}
}
if err != nil {
glog.Errorf("azureDisk - azure disk detach %q from %s failed, err: %v", diskName, nodeName, err)
} else {
glog.V(4).Info("azureDisk - azure detach succeeded")
// Invalidate the cache right after updating
ss.vmssVMCache.Delete(ss.makeVmssVMName(ssName, instanceID))
}
return err
}
// GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI
func (ss *scaleSet) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
_, _, vm, err := ss.getVmssVM(string(nodeName))
if err != nil {
return -1, err
}
disks := *vm.StorageProfile.DataDisks
for _, disk := range disks {
if disk.Lun != nil && (disk.Name != nil && diskName != "" && *disk.Name == diskName) ||
(disk.Vhd != nil && disk.Vhd.URI != nil && diskURI != "" && *disk.Vhd.URI == diskURI) ||
(disk.ManagedDisk != nil && *disk.ManagedDisk.ID == diskURI) {
// found the disk
glog.V(4).Infof("azureDisk - find disk: lun %d name %q uri %q", *disk.Lun, diskName, diskURI)
return *disk.Lun, nil
}
}
return -1, fmt.Errorf("Cannot find Lun for disk %s", diskName)
}
// GetNextDiskLun searches all vhd attachments on the host and finds an unused lun.
// Returns -1 if all luns are used.
func (ss *scaleSet) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
_, _, vm, err := ss.getVmssVM(string(nodeName))
if err != nil {
return -1, err
}
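// Mark every LUN already in use, then return the first free slot.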
used := make([]bool, maxLUN)
disks := *vm.StorageProfile.DataDisks
for _, disk := range disks {
if disk.Lun != nil {
used[*disk.Lun] = true
}
}
for k, v := range used {
if !v {
return int32(k), nil
}
}
return -1, fmt.Errorf("All Luns are used")
}
// DisksAreAttached checks whether the given volumes are attached to the node with the specified NodeName.
func (ss *scaleSet) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
_, _, vm, err := ss.getVmssVM(string(nodeName))
if err != nil {
if err == cloudprovider.InstanceNotFound {
// if host doesn't exist, no need to detach
glog.Warningf("azureDisk - Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.",
nodeName, diskNames)
return attached, nil
}
return attached, err
}
disks := *vm.StorageProfile.DataDisks
for _, disk := range disks {
for _, diskName := range diskNames {
if disk.Name != nil && diskName != "" && *disk.Name == diskName {
attached[diskName] = true
}
}
}
return attached, nil
}


@@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"math/rand"
"net/http"
@@ -28,6 +29,7 @@ import (
"github.com/Azure/azure-sdk-for-go/arm/disk"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/azure-sdk-for-go/arm/storage"
computepreview "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/to"
)
@@ -629,48 +631,39 @@ func getRandomIPPtr() *string {
type fakeVirtualMachineScaleSetVMsClient struct {
mutex *sync.Mutex
FakeStore map[string]map[string]compute.VirtualMachineScaleSetVM
FakeStore map[string]map[string]computepreview.VirtualMachineScaleSetVM
}
func newFakeVirtualMachineScaleSetVMsClient() *fakeVirtualMachineScaleSetVMsClient {
fVMC := &fakeVirtualMachineScaleSetVMsClient{}
fVMC.FakeStore = make(map[string]map[string]compute.VirtualMachineScaleSetVM)
fVMC.FakeStore = make(map[string]map[string]computepreview.VirtualMachineScaleSetVM)
fVMC.mutex = &sync.Mutex{}
return fVMC
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) setFakeStore(store map[string]map[string]compute.VirtualMachineScaleSetVM) {
func (fVMC *fakeVirtualMachineScaleSetVMsClient) setFakeStore(store map[string]map[string]computepreview.VirtualMachineScaleSetVM) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
fVMC.FakeStore = store
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result compute.VirtualMachineScaleSetVMListResult, err error) {
func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []computepreview.VirtualMachineScaleSetVM, err error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
value := []compute.VirtualMachineScaleSetVM{}
result = []computepreview.VirtualMachineScaleSetVM{}
if _, ok := fVMC.FakeStore[resourceGroupName]; ok {
for _, v := range fVMC.FakeStore[resourceGroupName] {
value = append(value, v)
result = append(result, v)
}
}
result.Response.Response = &http.Response{
StatusCode: http.StatusOK,
}
result.NextLink = nil
result.Value = &value
return result, nil
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetVMListResult) (result compute.VirtualMachineScaleSetVMListResult, err error) {
return result, nil
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) Get(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVM, err error) {
func (fVMC *fakeVirtualMachineScaleSetVMsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string) (result computepreview.VirtualMachineScaleSetVM, err error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
@@ -687,8 +680,8 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) Get(resourceGroupName string, V
}
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) GetInstanceView(resourceGroupName string, VMScaleSetName string, instanceID string) (result compute.VirtualMachineScaleSetVMInstanceView, err error) {
_, err = fVMC.Get(resourceGroupName, VMScaleSetName, instanceID)
func (fVMC *fakeVirtualMachineScaleSetVMsClient) GetInstanceView(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string) (result computepreview.VirtualMachineScaleSetVMInstanceView, err error) {
_, err = fVMC.Get(ctx, resourceGroupName, VMScaleSetName, instanceID)
if err != nil {
return result, err
}
@@ -696,54 +689,53 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) GetInstanceView(resourceGroupNa
return result, nil
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters computepreview.VirtualMachineScaleSetVM) (resp *http.Response, err error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
vmKey := fmt.Sprintf("%s_%s", VMScaleSetName, instanceID)
if scaleSetMap, ok := fVMC.FakeStore[resourceGroupName]; ok {
if _, ok := scaleSetMap[vmKey]; ok {
scaleSetMap[vmKey] = parameters
}
}
return nil, nil
}
type fakeVirtualMachineScaleSetsClient struct {
mutex *sync.Mutex
FakeStore map[string]map[string]compute.VirtualMachineScaleSet
FakeStore map[string]map[string]computepreview.VirtualMachineScaleSet
}
func newFakeVirtualMachineScaleSetsClient() *fakeVirtualMachineScaleSetsClient {
fVMSSC := &fakeVirtualMachineScaleSetsClient{}
fVMSSC.FakeStore = make(map[string]map[string]compute.VirtualMachineScaleSet)
fVMSSC.FakeStore = make(map[string]map[string]computepreview.VirtualMachineScaleSet)
fVMSSC.mutex = &sync.Mutex{}
return fVMSSC
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) setFakeStore(store map[string]map[string]compute.VirtualMachineScaleSet) {
func (fVMSSC *fakeVirtualMachineScaleSetsClient) setFakeStore(store map[string]map[string]computepreview.VirtualMachineScaleSet) {
fVMSSC.mutex.Lock()
defer fVMSSC.mutex.Unlock()
fVMSSC.FakeStore = store
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) CreateOrUpdate(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error) {
func (fVMSSC *fakeVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters computepreview.VirtualMachineScaleSet) (resp *http.Response, err error) {
fVMSSC.mutex.Lock()
defer fVMSSC.mutex.Unlock()
resultChan := make(chan compute.VirtualMachineScaleSet, 1)
errChan := make(chan error, 1)
var result compute.VirtualMachineScaleSet
var err error
defer func() {
resultChan <- result
errChan <- err
close(resultChan)
close(errChan)
}()
if _, ok := fVMSSC.FakeStore[resourceGroupName]; !ok {
fVMSSC.FakeStore[resourceGroupName] = make(map[string]compute.VirtualMachineScaleSet)
fVMSSC.FakeStore[resourceGroupName] = make(map[string]computepreview.VirtualMachineScaleSet)
}
fVMSSC.FakeStore[resourceGroupName][VMScaleSetName] = parameters
result = fVMSSC.FakeStore[resourceGroupName][VMScaleSetName]
result.Response.Response = &http.Response{
StatusCode: http.StatusOK,
}
err = nil
return resultChan, errChan
return nil, nil
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) Get(resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error) {
func (fVMSSC *fakeVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result computepreview.VirtualMachineScaleSet, err error) {
fVMSSC.mutex.Lock()
defer fVMSSC.mutex.Unlock()
@@ -759,45 +751,22 @@ func (fVMSSC *fakeVirtualMachineScaleSetsClient) Get(resourceGroupName string, V
}
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) List(resourceGroupName string) (result compute.VirtualMachineScaleSetListResult, err error) {
func (fVMSSC *fakeVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGroupName string) (result []computepreview.VirtualMachineScaleSet, err error) {
fVMSSC.mutex.Lock()
defer fVMSSC.mutex.Unlock()
value := []compute.VirtualMachineScaleSet{}
result = []computepreview.VirtualMachineScaleSet{}
if _, ok := fVMSSC.FakeStore[resourceGroupName]; ok {
for _, v := range fVMSSC.FakeStore[resourceGroupName] {
value = append(value, v)
result = append(result, v)
}
}
result.Response.Response = &http.Response{
StatusCode: http.StatusOK,
}
result.NextLink = nil
result.Value = &value
return result, nil
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) ListNextResults(resourceGroupName string, lastResults compute.VirtualMachineScaleSetListResult) (result compute.VirtualMachineScaleSetListResult, err error) {
return result, nil
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) UpdateInstances(resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error) {
resultChan := make(chan compute.OperationStatusResponse, 1)
errChan := make(chan error, 1)
var result compute.OperationStatusResponse
var err error
defer func() {
resultChan <- result
errChan <- err
close(resultChan)
close(errChan)
}()
result.Response.Response = &http.Response{
StatusCode: http.StatusOK,
}
err = nil
return resultChan, errChan
func (fVMSSC *fakeVirtualMachineScaleSetsClient) UpdateInstances(ctx context.Context, resourceGroupName string, VMScaleSetName string, VMInstanceIDs computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs) (resp *http.Response, err error) {
return nil, nil
}
type fakeRoutesClient struct {


@@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
"k8s.io/api/core/v1"
@@ -56,4 +57,15 @@ type VMSet interface {
EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
EnsureBackendPoolDeleted(poolID, vmSetName string) error
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error
// DetachDiskByName detaches a vhd from host. The vhd can be identified by diskName or diskURI.
DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error
// GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI.
GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error)
// GetNextDiskLun searches all vhd attachments on the host and finds an unused lun. Returns -1 if all luns are used.
GetNextDiskLun(nodeName types.NodeName) (int32, error)
// DisksAreAttached checks whether the given volumes are attached to the node with the specified NodeName.
DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)
}


@@ -24,8 +24,8 @@ import (
"strconv"
"strings"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
computepreview "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/glog"
@@ -89,17 +89,7 @@ func newScaleSet(az *Cloud) (VMSet, error) {
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm compute.VirtualMachineScaleSetVM, err error) {
// Known node not managed by scale sets.
managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName)
if err != nil {
return "", "", vm, err
}
if managedByAS {
glog.V(8).Infof("Found node %q in availabilitySetNodesCache", nodeName)
return "", "", vm, ErrorNotVmssInstance
}
func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm computepreview.VirtualMachineScaleSetVM, err error) {
instanceID, err = getScaleSetVMInstanceID(nodeName)
if err != nil {
return ssName, instanceID, vm, err
@@ -125,12 +115,12 @@ func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm co
return ssName, instanceID, vm, cloudprovider.InstanceNotFound
}
return ssName, instanceID, *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
return ssName, instanceID, *(cachedVM.(*computepreview.VirtualMachineScaleSetVM)), nil
}
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
// The node must belong to one of scale sets.
func (ss *scaleSet) getVmssVMByInstanceID(scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
func (ss *scaleSet) getVmssVMByInstanceID(scaleSetName, instanceID string) (vm computepreview.VirtualMachineScaleSetVM, err error) {
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
cachedVM, err := ss.vmssVMCache.Get(vmName)
if err != nil {
@@ -142,7 +132,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(scaleSetName, instanceID string) (vm c
return vm, cloudprovider.InstanceNotFound
}
return *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
return *(cachedVM.(*computepreview.VirtualMachineScaleSetVM)), nil
}
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
@@ -259,7 +249,7 @@ func (ss *scaleSet) GetIPByNodeName(nodeName, vmSetName string) (string, error)
}
// This returns the full identifier of the primary NIC for the given VM.
func (ss *scaleSet) getPrimaryInterfaceID(machine compute.VirtualMachineScaleSetVM) (string, error) {
func (ss *scaleSet) getPrimaryInterfaceID(machine computepreview.VirtualMachineScaleSetVM) (string, error) {
if len(*machine.NetworkProfile.NetworkInterfaces) == 1 {
return *(*machine.NetworkProfile.NetworkInterfaces)[0].ID, nil
}
@@ -300,95 +290,36 @@ func extractScaleSetNameByExternalID(externalID string) (string, error) {
return matches[1], nil
}
// listScaleSetsWithRetry lists scale sets with exponential backoff retry.
func (ss *scaleSet) listScaleSetsWithRetry() ([]string, error) {
// listScaleSets lists all scale sets.
func (ss *scaleSet) listScaleSets() ([]string, error) {
var err error
var result compute.VirtualMachineScaleSetListResult
allScaleSets := make([]string, 0)
ctx, cancel := getContextWithCancel()
defer cancel()
backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
result, err = ss.VirtualMachineScaleSetsClient.List(ss.ResourceGroup)
if err != nil {
glog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", ss.ResourceGroup, err)
return false, err
}
return true, nil
})
if backoffError != nil {
return nil, backoffError
allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, ss.ResourceGroup)
if err != nil {
glog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
return nil, err
}
appendResults := (result.Value != nil && len(*result.Value) > 0)
for appendResults {
for _, scaleSet := range *result.Value {
allScaleSets = append(allScaleSets, *scaleSet.Name)
}
appendResults = false
if result.NextLink != nil {
backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
result, err = ss.VirtualMachineScaleSetsClient.ListNextResults(ss.ResourceGroup, result)
if err != nil {
glog.Errorf("VirtualMachineScaleSetsClient.ListNextResults for %v failed: %v", ss.ResourceGroup, err)
return false, err
}
return true, nil
})
if backoffError != nil {
return nil, backoffError
}
appendResults = (result.Value != nil && len(*result.Value) > 0)
}
ssNames := make([]string, len(allScaleSets))
for i := range allScaleSets {
ssNames[i] = *(allScaleSets[i].Name)
}
return allScaleSets, nil
return ssNames, nil
}
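The manual NextLink loop disappears because the wrapped 2017-12-01 client drains pagination itself. Roughly, with the wrapper's internals assumed rather than shown in this hunk:

```go
// Sketch of the client wrapper: ListComplete returns an iterator that
// transparently fetches follow-up pages, so NextLink handling moves out
// of the cloud provider entirely.
func (az *azVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGroupName string) ([]computepreview.VirtualMachineScaleSet, error) {
	iterator, err := az.client.ListComplete(ctx, resourceGroupName)
	if err != nil {
		return nil, err
	}

	result := make([]computepreview.VirtualMachineScaleSet, 0)
	for ; iterator.NotDone(); err = iterator.Next() {
		if err != nil {
			return nil, err
		}
		result = append(result, iterator.Value())
	}
	return result, nil
}
```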
// listScaleSetVMsWithRetry lists VMs belonging to the specified scale set with exponential backoff retry.
func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.VirtualMachineScaleSetVM, error) {
// listScaleSetVMs lists VMs belonging to the specified scale set.
func (ss *scaleSet) listScaleSetVMs(scaleSetName string) ([]computepreview.VirtualMachineScaleSetVM, error) {
var err error
var result compute.VirtualMachineScaleSetVMListResult
allVMs := make([]compute.VirtualMachineScaleSetVM, 0)
backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
result, err = ss.VirtualMachineScaleSetVMsClient.List(ss.ResourceGroup, scaleSetName, "", "", string(compute.InstanceView))
if err != nil {
glog.Errorf("VirtualMachineScaleSetVMsClient.List for %v failed: %v", scaleSetName, err)
return false, err
}
return true, nil
})
if backoffError != nil {
return nil, backoffError
}
appendResults := (result.Value != nil && len(*result.Value) > 0)
for appendResults {
allVMs = append(allVMs, *result.Value...)
appendResults = false
if result.NextLink != nil {
backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
result, err = ss.VirtualMachineScaleSetVMsClient.ListNextResults(ss.ResourceGroup, result)
if err != nil {
glog.Errorf("VirtualMachineScaleSetVMsClient.ListNextResults for %v failed: %v", scaleSetName, err)
return false, err
}
return true, nil
})
if backoffError != nil {
return nil, backoffError
}
appendResults = (result.Value != nil && len(*result.Value) > 0)
}
ctx, cancel := getContextWithCancel()
defer cancel()
allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, ss.ResourceGroup, scaleSetName, "", "", string(computepreview.InstanceView))
if err != nil {
glog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", err)
return nil, err
}
return allVMs, nil
@@ -517,8 +448,8 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Int
}
// getScaleSetWithRetry gets a scale set with exponential backoff retry
func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineScaleSet, bool, error) {
var result compute.VirtualMachineScaleSet
func (ss *scaleSet) getScaleSetWithRetry(name string) (computepreview.VirtualMachineScaleSet, bool, error) {
var result computepreview.VirtualMachineScaleSet
var exists bool
err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
@@ -531,7 +462,7 @@ func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineSca
if cached != nil {
exists = true
result = *(cached.(*compute.VirtualMachineScaleSet))
result = *(cached.(*computepreview.VirtualMachineScaleSet))
}
return true, nil
@@ -541,7 +472,7 @@ func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineSca
}
// getPrimaryNetworkConfiguration gets the primary network interface configuration for the scale set.
func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) {
func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]computepreview.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*computepreview.VirtualMachineScaleSetNetworkConfiguration, error) {
networkConfigurations := *networkConfigurationList
if len(networkConfigurations) == 1 {
return &networkConfigurations[0], nil
@@ -557,7 +488,7 @@ func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]c
return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", scaleSetName)
}
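The middle of this function is elided by the hunk; presumably it walks the list and returns the configuration flagged primary, along these lines:

```go
// Sketch of the elided selection loop: with multiple configurations,
// return the one whose Primary flag is set.
for idx := range networkConfigurations {
	networkConfig := &networkConfigurations[idx]
	if networkConfig.Primary != nil && *networkConfig.Primary {
		return networkConfig, nil
	}
}
```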
func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *computepreview.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*computepreview.VirtualMachineScaleSetIPConfiguration, error) {
ipConfigurations := *config.IPConfigurations
if len(ipConfigurations) == 1 {
return &ipConfigurations[0], nil
@@ -574,24 +505,24 @@ func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachine
}
// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSSWithRetry(virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
func (ss *scaleSet) createOrUpdateVMSSWithRetry(virtualMachineScaleSet computepreview.VirtualMachineScaleSet) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet, nil)
resp := <-respChan
err := <-errChan
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
return processRetryResponse(resp.Response, err)
return processHTTPRetryResponse(resp, err)
})
}
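Every migrated call site builds a cancellable context through `getContextWithCancel`; presumably a thin helper like:

```go
// Sketch: a cancellable background context for outgoing ARM calls, so
// the deferred cancel() releases any resources tied to the request.
func getContextWithCancel() (context.Context, context.CancelFunc) {
	return context.WithCancel(context.Background())
}
```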
// updateVMSSInstancesWithRetry invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry.
func (ss *scaleSet) updateVMSSInstancesWithRetry(scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
func (ss *scaleSet) updateVMSSInstancesWithRetry(scaleSetName string, vmInstanceIDs computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
respChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, scaleSetName, vmInstanceIDs, nil)
resp := <-respChan
err := <-errChan
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, scaleSetName, vmInstanceIDs)
glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): end", scaleSetName)
return processRetryResponse(resp.Response, err)
return processHTTPRetryResponse(resp, err)
})
}
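Both wrappers now hand a bare `*http.Response` to `processHTTPRetryResponse` instead of an SDK response object. Its body is outside this hunk; a plausible shape, assuming 2xx terminates the backoff loop and retriable statuses keep it going:

```go
// Sketch: adapt an HTTP result to wait.ExponentialBackoff semantics,
// where (true, nil) stops the loop and (false, nil) schedules a retry.
func processHTTPRetryResponse(resp *http.Response, err error) (bool, error) {
	if resp != nil && resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices {
		// 2xx: done, stop retrying.
		return true, nil
	}

	if shouldRetryHTTPRequest(resp, err) {
		// Retriable (e.g. 429 or 5xx): suppress the error so the
		// backoff loop continues with the next attempt.
		glog.Errorf("processHTTPRetryResponse: backoff failure, will retry, err=%v", err)
		return false, nil
	}

	// Non-retriable: stop backing off.
	return true, nil
}
```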
@@ -624,7 +555,7 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
// Update primary IP configuration's LoadBalancerBackendAddressPools.
foundPool := false
newBackendPools := []compute.SubResource{}
newBackendPools := []computepreview.SubResource{}
if primaryIPConfiguration.LoadBalancerBackendAddressPools != nil {
newBackendPools = *primaryIPConfiguration.LoadBalancerBackendAddressPools
}
@@ -636,17 +567,17 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
}
if !foundPool {
newBackendPools = append(newBackendPools,
compute.SubResource{
computepreview.SubResource{
ID: to.StringPtr(backendPoolID),
})
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
ctx, cancel := getContextWithCancel()
defer cancel()
glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating", serviceName, vmSetName)
respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, vmSetName, virtualMachineScaleSet, nil)
resp := <-respChan
err := <-errChan
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmSetName, virtualMachineScaleSet)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
if retryErr != nil {
@@ -682,14 +613,14 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
}
// Update instances to the latest VMSS model.
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
vmInstanceIDs := computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}
respChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, vmSetName, vmInstanceIDs, nil)
resp := <-respChan
err = <-errChan
glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
ctx, cancel := getContextWithCancel()
defer cancel()
instanceResp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, vmSetName, vmInstanceIDs)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs)
if retryErr != nil {
@@ -734,7 +665,7 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
return nil
}
existingBackendPools := *primaryIPConfiguration.LoadBalancerBackendAddressPools
newBackendPools := []compute.SubResource{}
newBackendPools := []computepreview.SubResource{}
foundPool := false
for i := len(existingBackendPools) - 1; i >= 0; i-- {
curPool := existingBackendPools[i]
@@ -752,11 +683,11 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
// Update scale set with backoff.
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", vmSetName)
respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, vmSetName, virtualMachineScaleSet, nil)
resp := <-respChan
err = <-errChan
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmSetName, virtualMachineScaleSet)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", vmSetName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
if retryErr != nil {
@@ -770,14 +701,12 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
// Update instances to the latest VMSS model.
instanceIDs := []string{"*"}
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
vmInstanceIDs := computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}
updateRespChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, vmSetName, vmInstanceIDs, nil)
updateResp := <-updateRespChan
err = <-errChan
instanceResp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, vmSetName, vmInstanceIDs)
glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryAPIRequest(updateResp.Response, err) {
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances scale set (%s) - updating, err=%v", vmSetName, err)
retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs)
if retryErr != nil {
@@ -793,11 +722,9 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
// TODO: remove this workaround once the root cause is figured out.
if len(newBackendPools) == 0 {
glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating second time", vmSetName)
respChan, errChan = ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, vmSetName, virtualMachineScaleSet, nil)
resp = <-respChan
err = <-errChan
resp, err = ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmSetName, virtualMachineScaleSet)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", vmSetName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
if retryErr != nil {

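These vmss helpers are the substrate for the PR's new azure_controller_vmss.go. A condensed sketch of the vmss attach path under the shapes shown above (disk parameters and the managed-vs-blob split elided; field names assumed from the 2017-12-01 API):

```go
// Sketch: attach a data disk to a vmss instance, then persist the
// updated VM model through the 2017-12-01 client.
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode computepreview.CachingTypes) error {
	ssName, instanceID, vm, err := ss.getVmssVM(string(nodeName))
	if err != nil {
		return err
	}

	disks := *vm.StorageProfile.DataDisks
	disks = append(disks, computepreview.DataDisk{
		Name:         &diskName,
		Lun:          &lun,
		Caching:      cachingMode,
		CreateOption: "attach",
		// Managed disks would set ManagedDisk.ID; blob disks set Vhd.URI.
	})
	vm.StorageProfile.DataDisks = &disks

	ctx, cancel := getContextWithCancel()
	defer cancel()
	resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, ss.ResourceGroup, ssName, instanceID, vm)
	if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
		// Same fallback pattern as the CreateOrUpdate/UpdateInstances
		// call sites above.
		return ss.UpdateVmssVMWithRetry(ctx, ss.ResourceGroup, ssName, instanceID, vm)
	}
	return err
}
```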
View File

@@ -58,7 +58,9 @@ func (ss *scaleSet) extractVmssVMName(name string) (string, string, error) {
func (ss *scaleSet) newVmssCache() (*timedCache, error) {
getter := func(key string) (interface{}, error) {
result, err := ss.VirtualMachineScaleSetsClient.Get(ss.ResourceGroup, key)
ctx, cancel := getContextWithCancel()
defer cancel()
result, err := ss.VirtualMachineScaleSetsClient.Get(ctx, ss.ResourceGroup, key)
exists, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr
@@ -76,14 +78,14 @@ func (ss *scaleSet) newVmssCache() (*timedCache, error) {
func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
getter := func(key string) (interface{}, error) {
scaleSetNames, err := ss.listScaleSetsWithRetry()
scaleSetNames, err := ss.listScaleSets()
if err != nil {
return nil, err
}
localCache := make(nodeNameToScaleSetMapping)
for _, ssName := range scaleSetNames {
vms, err := ss.listScaleSetVMsWithRetry(ssName)
vms, err := ss.listScaleSetVMs(ssName)
if err != nil {
return nil, err
}
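The rest of this getter is unchanged and elided; presumably it indexes each VM by computer name, along these lines:

```go
// Sketch: map every VM's OS computer name to its scale set so later
// nodeName -> scaleSetName lookups are cache hits. Requires "strings".
for _, vm := range vms {
	if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
		glog.Warningf("failed to get computerName for vmssVM (%q)", ssName)
		continue
	}
	computerName := strings.ToLower(*vm.OsProfile.ComputerName)
	localCache[computerName] = ssName
}
```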
@@ -136,7 +138,9 @@ func (ss *scaleSet) newVmssVMCache() (*timedCache, error) {
return nil, nil
}
result, err := ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, ssName, instanceID)
ctx, cancel := getContextWithCancel()
defer cancel()
result, err := ss.VirtualMachineScaleSetVMsClient.Get(ctx, ss.ResourceGroup, ssName, instanceID)
exists, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr

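Both cache getters above distinguish "does not exist" from a genuine failure via `checkResourceExistsFromError`. A sketch, assuming autorest's DetailedError carries the HTTP status:

```go
// Sketch: treat HTTP 404 as (exists=false, nil); surface anything else.
// Requires "net/http" and "github.com/Azure/go-autorest/autorest".
func checkResourceExistsFromError(err error) (bool, error) {
	if err == nil {
		return true, nil
	}
	v, ok := err.(autorest.DetailedError)
	if !ok {
		return false, err
	}
	if v.StatusCode == http.StatusNotFound {
		return false, nil
	}
	return false, v
}
```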
View File

@@ -20,7 +20,7 @@ import (
"fmt"
"testing"
"github.com/Azure/azure-sdk-for-go/arm/compute"
computepreview "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute"
"github.com/stretchr/testify/assert"
)
@@ -37,8 +37,8 @@ func newTestScaleSet(scaleSetName string, vmList []string) (*scaleSet, error) {
func setTestVirtualMachineCloud(ss *Cloud, scaleSetName string, vmList []string) {
virtualMachineScaleSetsClient := newFakeVirtualMachineScaleSetsClient()
scaleSets := make(map[string]map[string]compute.VirtualMachineScaleSet)
scaleSets["rg"] = map[string]compute.VirtualMachineScaleSet{
scaleSets := make(map[string]map[string]computepreview.VirtualMachineScaleSet)
scaleSets["rg"] = map[string]computepreview.VirtualMachineScaleSet{
scaleSetName: {
Name: &scaleSetName,
},
@@ -46,24 +46,24 @@ func setTestVirtualMachineCloud(ss *Cloud, scaleSetName string, vmList []string)
virtualMachineScaleSetsClient.setFakeStore(scaleSets)
virtualMachineScaleSetVMsClient := newFakeVirtualMachineScaleSetVMsClient()
ssVMs := make(map[string]map[string]compute.VirtualMachineScaleSetVM)
ssVMs["rg"] = make(map[string]compute.VirtualMachineScaleSetVM)
ssVMs := make(map[string]map[string]computepreview.VirtualMachineScaleSetVM)
ssVMs["rg"] = make(map[string]computepreview.VirtualMachineScaleSetVM)
for i := range vmList {
ID := fmt.Sprintf("/subscriptions/script/resourceGroups/rg/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%d", scaleSetName, i)
nodeName := vmList[i]
instanceID := fmt.Sprintf("%d", i)
vmName := fmt.Sprintf("%s_%s", scaleSetName, instanceID)
networkInterfaces := []compute.NetworkInterfaceReference{
networkInterfaces := []computepreview.NetworkInterfaceReference{
{
ID: &nodeName,
},
}
ssVMs["rg"][vmName] = compute.VirtualMachineScaleSetVM{
VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{
OsProfile: &compute.OSProfile{
ssVMs["rg"][vmName] = computepreview.VirtualMachineScaleSetVM{
VirtualMachineScaleSetVMProperties: &computepreview.VirtualMachineScaleSetVMProperties{
OsProfile: &computepreview.OSProfile{
ComputerName: &nodeName,
},
NetworkProfile: &compute.NetworkProfile{
NetworkProfile: &computepreview.NetworkProfile{
NetworkInterfaces: &networkInterfaces,
},
},