mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 18:28:13 +00:00 
			
		
		
		
	Refactor GCE wrapper library to allow execution from E2E test suite
This reverts commit147b6911f5, reversing changes made to6fd986065b.
This commit is contained in:
		| @@ -97,6 +97,7 @@ export PATH=$(dirname "${e2e_test}"):"${PATH}" | ||||
|   --provider="${KUBERNETES_PROVIDER}" \ | ||||
|   --gce-project="${PROJECT:-}" \ | ||||
|   --gce-zone="${ZONE:-}" \ | ||||
|   --gce-service-account="${GCE_SERVICE_ACCOUNT:-}" \ | ||||
|   --gke-cluster="${CLUSTER_NAME:-}" \ | ||||
|   --kube-master="${KUBE_MASTER:-}" \ | ||||
|   --cluster-tag="${CLUSTER_ID:-}" \ | ||||
|   | ||||
| @@ -105,6 +105,7 @@ func-dest | ||||
| fuzz-iters | ||||
| gather-resource-usage | ||||
| gce-project | ||||
| gce-service-account | ||||
| gce-zone | ||||
| gke-cluster | ||||
| go-header-file | ||||
|   | ||||
| @@ -62,13 +62,12 @@ const ( | ||||
|  | ||||
| // GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine. | ||||
| type GCECloud struct { | ||||
| 	service          *compute.Service | ||||
| 	containerService *container.Service | ||||
| 	projectID        string | ||||
| 	zone             string | ||||
| 	instanceID       string | ||||
| 	externalID       string | ||||
| 	networkURL       string | ||||
| 	service           *compute.Service | ||||
| 	containerService  *container.Service | ||||
| 	projectID         string | ||||
| 	zone              string | ||||
| 	networkURL        string | ||||
| 	useMetadataServer bool | ||||
| } | ||||
|  | ||||
| type Config struct { | ||||
| @@ -101,7 +100,7 @@ func getProjectAndZone() (string, string, error) { | ||||
| 	return projectID, zone, nil | ||||
| } | ||||
|  | ||||
| func getInstanceID() (string, error) { | ||||
| func getInstanceIDViaMetadata() (string, error) { | ||||
| 	result, err := metadata.Get("instance/hostname") | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| @@ -113,7 +112,7 @@ func getInstanceID() (string, error) { | ||||
| 	return parts[0], nil | ||||
| } | ||||
|  | ||||
| func getCurrentExternalID() (string, error) { | ||||
| func getCurrentExternalIDViaMetadata() (string, error) { | ||||
| 	externalID, err := metadata.Get("instance/id") | ||||
| 	if err != nil { | ||||
| 		return "", fmt.Errorf("couldn't get external ID: %v", err) | ||||
| @@ -121,7 +120,7 @@ func getCurrentExternalID() (string, error) { | ||||
| 	return externalID, nil | ||||
| } | ||||
|  | ||||
| func getNetworkName() (string, error) { | ||||
| func getNetworkNameViaMetadata() (string, error) { | ||||
| 	result, err := metadata.Get("instance/network-interfaces/0/network") | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| @@ -133,28 +132,32 @@ func getNetworkName() (string, error) { | ||||
| 	return parts[3], nil | ||||
| } | ||||
|  | ||||
| func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, error) { | ||||
| 	networkList, err := svc.Networks.List(projectID).Do() | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
| 	if networkList == nil || len(networkList.Items) <= 0 { | ||||
| 		return "", fmt.Errorf("GCE Network List call returned no networks for project %q.", projectID) | ||||
| 	} | ||||
|  | ||||
| 	return networkList.Items[0].Name, nil | ||||
| } | ||||
|  | ||||
| // newGCECloud creates a new instance of GCECloud. | ||||
| func newGCECloud(config io.Reader) (*GCECloud, error) { | ||||
| 	projectID, zone, err := getProjectAndZone() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	// TODO: if we want to use this on a machine that doesn't have the http://metadata server | ||||
| 	// e.g. on a user's machine (not VM) somewhere, we need to have an alternative for | ||||
| 	// instance id lookup. | ||||
| 	instanceID, err := getInstanceID() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	externalID, err := getCurrentExternalID() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	networkName, err := getNetworkName() | ||||
|  | ||||
| 	networkName, err := getNetworkNameViaMetadata() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	networkURL := gceNetworkURL(projectID, networkName) | ||||
|  | ||||
| 	tokenSource := google.ComputeTokenSource("") | ||||
| 	if config != nil { | ||||
| 		var cfg Config | ||||
| @@ -176,23 +179,54 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { | ||||
| 			tokenSource = newAltTokenSource(cfg.Global.TokenURL, cfg.Global.TokenBody) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return CreateGCECloud(projectID, zone, networkURL, tokenSource, true /* useMetadataServer */) | ||||
| } | ||||
|  | ||||
| // Creates a GCECloud object using the specified parameters. | ||||
| // If no networkUrl is specified, loads networkName via rest call. | ||||
| // If no tokenSource is specified, uses oauth2.DefaultTokenSource. | ||||
| func CreateGCECloud(projectID, zone, networkURL string, tokenSource oauth2.TokenSource, useMetadataServer bool) (*GCECloud, error) { | ||||
| 	if tokenSource == nil { | ||||
| 		var err error | ||||
| 		tokenSource, err = google.DefaultTokenSource( | ||||
| 			oauth2.NoContext, | ||||
| 			compute.CloudPlatformScope, | ||||
| 			compute.ComputeScope) | ||||
| 		glog.Infof("Using DefaultTokenSource %#v", tokenSource) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} else { | ||||
| 		glog.Infof("Using existing Token Source %#v", tokenSource) | ||||
| 	} | ||||
|  | ||||
| 	client := oauth2.NewClient(oauth2.NoContext, tokenSource) | ||||
| 	svc, err := compute.New(client) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	containerSvc, err := container.New(client) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if networkURL == "" { | ||||
| 		networkName, err := getNetworkNameViaAPICall(svc, projectID) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		networkURL = gceNetworkURL(projectID, networkName) | ||||
| 	} | ||||
|  | ||||
| 	return &GCECloud{ | ||||
| 		service:          svc, | ||||
| 		containerService: containerSvc, | ||||
| 		projectID:        projectID, | ||||
| 		zone:             zone, | ||||
| 		instanceID:       instanceID, | ||||
| 		externalID:       externalID, | ||||
| 		networkURL:       networkURL, | ||||
| 		service:           svc, | ||||
| 		containerService:  containerSvc, | ||||
| 		projectID:         projectID, | ||||
| 		zone:              zone, | ||||
| 		networkURL:        networkURL, | ||||
| 		useMetadataServer: useMetadataServer, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| @@ -1368,16 +1402,31 @@ func (gce *GCECloud) NodeAddresses(_ string) ([]api.NodeAddress, error) { | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (gce *GCECloud) isCurrentInstance(instance string) bool { | ||||
| 	return gce.instanceID == canonicalizeInstanceName(instance) | ||||
| // isCurrentInstance uses metadata server to check if specified instanceID matches current machine's instanceID | ||||
| func (gce *GCECloud) isCurrentInstance(instanceID string) bool { | ||||
| 	currentInstanceID, err := getInstanceIDViaMetadata() | ||||
| 	if err != nil { | ||||
| 		// Log and swallow error | ||||
| 		glog.Errorf("Failed to fetch instanceID via Metadata: %v", err) | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	return currentInstanceID == canonicalizeInstanceName(instanceID) | ||||
| } | ||||
|  | ||||
| // ExternalID returns the cloud provider ID of the specified instance (deprecated). | ||||
| func (gce *GCECloud) ExternalID(instance string) (string, error) { | ||||
| 	// if we are asking about the current instance, just go to metadata | ||||
| 	if gce.isCurrentInstance(instance) { | ||||
| 		return gce.externalID, nil | ||||
| 	if gce.useMetadataServer { | ||||
| 		// Use metadata, if possible, to fetch ID. See issue #12000 | ||||
| 		if gce.isCurrentInstance(instance) { | ||||
| 			externalInstanceID, err := getCurrentExternalIDViaMetadata() | ||||
| 			if err == nil { | ||||
| 				return externalInstanceID, nil | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Fallback to GCE API call if metadata server fails to retrieve ID | ||||
| 	inst, err := gce.getInstanceByName(instance) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| @@ -1494,7 +1543,29 @@ func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) { | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (gce *GCECloud) AttachDisk(diskName string, readOnly bool) error { | ||||
| func (gce *GCECloud) CreateDisk(name string, sizeGb int64) error { | ||||
| 	diskToCreate := &compute.Disk{ | ||||
| 		Name:   name, | ||||
| 		SizeGb: sizeGb, | ||||
| 	} | ||||
| 	createOp, err := gce.service.Disks.Insert(gce.projectID, gce.zone, diskToCreate).Do() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return gce.waitForZoneOp(createOp) | ||||
| } | ||||
|  | ||||
| func (gce *GCECloud) DeleteDisk(diskToDelete string) error { | ||||
| 	deleteOp, err := gce.service.Disks.Delete(gce.projectID, gce.zone, diskToDelete).Do() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return gce.waitForZoneOp(deleteOp) | ||||
| } | ||||
|  | ||||
| func (gce *GCECloud) AttachDisk(diskName, instanceID string, readOnly bool) error { | ||||
| 	disk, err := gce.getDisk(diskName) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -1505,7 +1576,7 @@ func (gce *GCECloud) AttachDisk(diskName string, readOnly bool) error { | ||||
| 	} | ||||
| 	attachedDisk := gce.convertDiskToAttachedDisk(disk, readWrite) | ||||
|  | ||||
| 	attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, gce.zone, gce.instanceID, attachedDisk).Do() | ||||
| 	attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, gce.zone, instanceID, attachedDisk).Do() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -1513,8 +1584,8 @@ func (gce *GCECloud) AttachDisk(diskName string, readOnly bool) error { | ||||
| 	return gce.waitForZoneOp(attachOp) | ||||
| } | ||||
|  | ||||
| func (gce *GCECloud) DetachDisk(devicePath string) error { | ||||
| 	detachOp, err := gce.service.Instances.DetachDisk(gce.projectID, gce.zone, gce.instanceID, devicePath).Do() | ||||
| func (gce *GCECloud) DetachDisk(devicePath, instanceID string) error { | ||||
| 	detachOp, err := gce.service.Instances.DetachDisk(gce.projectID, gce.zone, instanceID, devicePath).Do() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -1522,6 +1593,22 @@ func (gce *GCECloud) DetachDisk(devicePath string) error { | ||||
| 	return gce.waitForZoneOp(detachOp) | ||||
| } | ||||
|  | ||||
| func (gce *GCECloud) DiskIsAttached(diskName, instanceID string) (bool, error) { | ||||
| 	instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceID).Do() | ||||
| 	if err != nil { | ||||
| 		return false, err | ||||
| 	} | ||||
|  | ||||
| 	for _, disk := range instance.Disks { | ||||
| 		if disk.DeviceName == diskName { | ||||
| 			// Disk is still attached to node | ||||
| 			return true, nil | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return false, nil | ||||
| } | ||||
|  | ||||
| func (gce *GCECloud) getDisk(diskName string) (*compute.Disk, error) { | ||||
| 	return gce.service.Disks.Get(gce.projectID, gce.zone, diskName).Do() | ||||
| } | ||||
|   | ||||
| @@ -294,3 +294,7 @@ func (f *PersistentVolumeRecycler) GetMounter() mount.Interface { | ||||
| func (f *PersistentVolumeRecycler) GetWriter() ioutil.Writer { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (f *PersistentVolumeRecycler) GetHostName() string { | ||||
| 	return "" | ||||
| } | ||||
|   | ||||
| @@ -93,6 +93,11 @@ func (vh *volumeHost) GetWriter() io.Writer { | ||||
| 	return vh.kubelet.writer | ||||
| } | ||||
|  | ||||
| // Returns the hostname of the host kubelet is running on | ||||
| func (vh *volumeHost) GetHostName() string { | ||||
| 	return vh.kubelet.hostname | ||||
| } | ||||
|  | ||||
| func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) { | ||||
| 	plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -132,7 +132,7 @@ func attachDiskAndVerify(b *gcePersistentDiskBuilder, sdBeforeSet sets.String) ( | ||||
| 			glog.Warningf("Retrying attach for GCE PD %q (retry count=%v).", b.pdName, numRetries) | ||||
| 		} | ||||
|  | ||||
| 		if err := gceCloud.AttachDisk(b.pdName, b.readOnly); err != nil { | ||||
| 		if err := gceCloud.AttachDisk(b.pdName, b.plugin.host.GetHostName(), b.readOnly); err != nil { | ||||
| 			glog.Errorf("Error attaching PD %q: %v", b.pdName, err) | ||||
| 			time.Sleep(errorSleepDuration) | ||||
| 			continue | ||||
| @@ -206,7 +206,7 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) { | ||||
| 			glog.Warningf("Retrying detach for GCE PD %q (retry count=%v).", c.pdName, numRetries) | ||||
| 		} | ||||
|  | ||||
| 		if err := gceCloud.DetachDisk(c.pdName); err != nil { | ||||
| 		if err := gceCloud.DetachDisk(c.pdName, c.plugin.host.GetHostName()); err != nil { | ||||
| 			glog.Errorf("Error detaching PD %q: %v", c.pdName, err) | ||||
| 			time.Sleep(errorSleepDuration) | ||||
| 			continue | ||||
|   | ||||
| @@ -156,6 +156,9 @@ type VolumeHost interface { | ||||
|  | ||||
| 	// Get writer interface for writing data to disk. | ||||
| 	GetWriter() io.Writer | ||||
|  | ||||
| 	// Returns the hostname of the host kubelet is running on | ||||
| 	GetHostName() string | ||||
| } | ||||
|  | ||||
| // VolumePluginMgr tracks registered plugins. | ||||
|   | ||||
| @@ -92,6 +92,11 @@ func (f *fakeVolumeHost) NewWrapperCleaner(spec *Spec, podUID types.UID) (Cleane | ||||
| 	return plug.NewCleaner(spec.Name(), podUID) | ||||
| } | ||||
|  | ||||
| // Returns the hostname of the host kubelet is running on | ||||
| func (f *fakeVolumeHost) GetHostName() string { | ||||
| 	return "fakeHostName" | ||||
| } | ||||
|  | ||||
| func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin { | ||||
| 	if _, ok := config.OtherAttributes["fake-property"]; ok { | ||||
| 		return []VolumePlugin{ | ||||
|   | ||||
| @@ -30,9 +30,12 @@ import ( | ||||
| 	"github.com/onsi/ginkgo/config" | ||||
| 	"github.com/onsi/ginkgo/reporters" | ||||
| 	"github.com/onsi/gomega" | ||||
| 	"golang.org/x/oauth2" | ||||
| 	"golang.org/x/oauth2/google" | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" | ||||
| 	"k8s.io/kubernetes/pkg/cloudprovider" | ||||
| 	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" | ||||
| 	"k8s.io/kubernetes/pkg/util" | ||||
| ) | ||||
|  | ||||
| @@ -73,6 +76,7 @@ func init() { | ||||
| 	flag.StringVar(&cloudConfig.MasterName, "kube-master", "", "Name of the kubernetes master. Only required if provider is gce or gke") | ||||
| 	flag.StringVar(&cloudConfig.ProjectID, "gce-project", "", "The GCE project being used, if applicable") | ||||
| 	flag.StringVar(&cloudConfig.Zone, "gce-zone", "", "GCE zone being used, if applicable") | ||||
| 	flag.StringVar(&cloudConfig.ServiceAccount, "gce-service-account", "", "GCE service account to use for GCE API calls, if applicable") | ||||
| 	flag.StringVar(&cloudConfig.Cluster, "gke-cluster", "", "GKE name of cluster being used, if applicable") | ||||
| 	flag.StringVar(&cloudConfig.NodeInstanceGroup, "node-instance-group", "", "Name of the managed instance group for nodes. Valid only for gce, gke or aws") | ||||
| 	flag.IntVar(&cloudConfig.NumNodes, "num-nodes", -1, "Number of nodes in the cluster") | ||||
| @@ -102,6 +106,23 @@ func TestE2E(t *testing.T) { | ||||
| 		glog.Info("The --provider flag is not set.  Treating as a conformance test.  Some tests may not be run.") | ||||
| 	} | ||||
|  | ||||
| 	if testContext.Provider == "gce" || testContext.Provider == "gke" { | ||||
| 		var err error | ||||
| 		Logf("Fetching cloud provider for %q\r\n", testContext.Provider) | ||||
| 		var tokenSource oauth2.TokenSource | ||||
| 		tokenSource = nil | ||||
| 		if cloudConfig.ServiceAccount != "" { | ||||
| 			// Use specified service account for auth | ||||
| 			Logf("Using service account %q as token source.", cloudConfig.ServiceAccount) | ||||
| 			tokenSource = google.ComputeTokenSource(cloudConfig.ServiceAccount) | ||||
| 		} | ||||
| 		cloudConfig.Provider, err = gcecloud.CreateGCECloud(testContext.CloudConfig.ProjectID, testContext.CloudConfig.Zone, "" /* networkUrl */, tokenSource, false /* useMetadataServer */) | ||||
| 		if err != nil { | ||||
| 			glog.Fatal("Error building GCE provider: ", err) | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	if testContext.Provider == "aws" { | ||||
| 		awsConfig := "[Global]\n" | ||||
| 		if cloudConfig.Zone == "" { | ||||
|   | ||||
							
								
								
									
										152
									
								
								test/e2e/pd.go
									
									
									
									
									
								
							
							
						
						
									
										152
									
								
								test/e2e/pd.go
									
									
									
									
									
								
							| @@ -18,8 +18,8 @@ package e2e | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"google.golang.org/api/googleapi" | ||||
| 	mathrand "math/rand" | ||||
| 	"os/exec" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| @@ -31,6 +31,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/api/unversioned" | ||||
| 	client "k8s.io/kubernetes/pkg/client/unversioned" | ||||
| 	awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" | ||||
| 	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" | ||||
| 	"k8s.io/kubernetes/pkg/fields" | ||||
| 	"k8s.io/kubernetes/pkg/labels" | ||||
| 	"k8s.io/kubernetes/pkg/util" | ||||
| @@ -69,7 +70,7 @@ var _ = Describe("Pod Disks", func() { | ||||
| 		SkipUnlessProviderIs("gce", "gke", "aws") | ||||
|  | ||||
| 		By("creating PD") | ||||
| 		diskName, err := createPD() | ||||
| 		diskName, err := createPDWithRetry() | ||||
| 		expectNoError(err, "Error creating PD") | ||||
|  | ||||
| 		host0Pod := testPDPod([]string{diskName}, host0Name, false /* readOnly */, 1 /* numContainers */) | ||||
| @@ -77,14 +78,12 @@ var _ = Describe("Pod Disks", func() { | ||||
| 		containerName := "mycontainer" | ||||
|  | ||||
| 		defer func() { | ||||
| 			By("cleaning up PD-RW test environment") | ||||
| 			// Teardown pods, PD. Ignore errors. | ||||
| 			// Teardown should do nothing unless test failed. | ||||
| 			By("cleaning up PD-RW test environment") | ||||
| 			podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)) | ||||
| 			podClient.Delete(host1Pod.Name, api.NewDeleteOptions(0)) | ||||
| 			detachPD(host0Name, diskName) | ||||
| 			detachPD(host1Name, diskName) | ||||
| 			deletePDWithRetry(diskName) | ||||
| 			detachAndDeletePDs(diskName, []string{host0Name, host1Name}) | ||||
| 		}() | ||||
|  | ||||
| 		By("submitting host0Pod to kubernetes") | ||||
| @@ -117,9 +116,6 @@ var _ = Describe("Pod Disks", func() { | ||||
| 		By("deleting host1Pod") | ||||
| 		expectNoError(podClient.Delete(host1Pod.Name, api.NewDeleteOptions(0)), "Failed to delete host1Pod") | ||||
|  | ||||
| 		By(fmt.Sprintf("deleting PD %q", diskName)) | ||||
| 		deletePDWithRetry(diskName) | ||||
|  | ||||
| 		return | ||||
| 	}) | ||||
|  | ||||
| @@ -127,7 +123,7 @@ var _ = Describe("Pod Disks", func() { | ||||
| 		SkipUnlessProviderIs("gce", "gke") | ||||
|  | ||||
| 		By("creating PD") | ||||
| 		diskName, err := createPD() | ||||
| 		diskName, err := createPDWithRetry() | ||||
| 		expectNoError(err, "Error creating PD") | ||||
|  | ||||
| 		rwPod := testPDPod([]string{diskName}, host0Name, false /* readOnly */, 1 /* numContainers */) | ||||
| @@ -141,10 +137,7 @@ var _ = Describe("Pod Disks", func() { | ||||
| 			podClient.Delete(rwPod.Name, api.NewDeleteOptions(0)) | ||||
| 			podClient.Delete(host0ROPod.Name, api.NewDeleteOptions(0)) | ||||
| 			podClient.Delete(host1ROPod.Name, api.NewDeleteOptions(0)) | ||||
|  | ||||
| 			detachPD(host0Name, diskName) | ||||
| 			detachPD(host1Name, diskName) | ||||
| 			deletePDWithRetry(diskName) | ||||
| 			detachAndDeletePDs(diskName, []string{host0Name, host1Name}) | ||||
| 		}() | ||||
|  | ||||
| 		By("submitting rwPod to ensure PD is formatted") | ||||
| @@ -171,18 +164,13 @@ var _ = Describe("Pod Disks", func() { | ||||
|  | ||||
| 		By("deleting host1ROPod") | ||||
| 		expectNoError(podClient.Delete(host1ROPod.Name, api.NewDeleteOptions(0)), "Failed to delete host1ROPod") | ||||
|  | ||||
| 		By(fmt.Sprintf("deleting PD %q", diskName)) | ||||
| 		deletePDWithRetry(diskName) | ||||
|  | ||||
| 		expectNoError(err, "Error deleting PD") | ||||
| 	}) | ||||
|  | ||||
| 	It("should schedule a pod w/ a RW PD shared between multiple containers, write to PD, delete pod, verify contents, and repeat in rapid succession", func() { | ||||
| 		SkipUnlessProviderIs("gce", "gke", "aws") | ||||
|  | ||||
| 		By("creating PD") | ||||
| 		diskName, err := createPD() | ||||
| 		diskName, err := createPDWithRetry() | ||||
| 		expectNoError(err, "Error creating PD") | ||||
| 		numContainers := 4 | ||||
|  | ||||
| @@ -193,8 +181,7 @@ var _ = Describe("Pod Disks", func() { | ||||
| 			// Teardown pods, PD. Ignore errors. | ||||
| 			// Teardown should do nothing unless test failed. | ||||
| 			podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)) | ||||
| 			detachPD(host0Name, diskName) | ||||
| 			deletePDWithRetry(diskName) | ||||
| 			detachAndDeletePDs(diskName, []string{host0Name}) | ||||
| 		}() | ||||
|  | ||||
| 		fileAndContentToVerify := make(map[string]string) | ||||
| @@ -225,21 +212,16 @@ var _ = Describe("Pod Disks", func() { | ||||
| 			By("deleting host0Pod") | ||||
| 			expectNoError(podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)), "Failed to delete host0Pod") | ||||
| 		} | ||||
|  | ||||
| 		By(fmt.Sprintf("deleting PD %q", diskName)) | ||||
| 		deletePDWithRetry(diskName) | ||||
|  | ||||
| 		return | ||||
| 	}) | ||||
|  | ||||
| 	It("should schedule a pod w/two RW PDs both mounted to one container, write to PD, verify contents, delete pod, recreate pod, verify contents, and repeat in rapid succession", func() { | ||||
| 		SkipUnlessProviderIs("gce", "gke", "aws") | ||||
|  | ||||
| 		By("creating PD1") | ||||
| 		disk1Name, err := createPD() | ||||
| 		disk1Name, err := createPDWithRetry() | ||||
| 		expectNoError(err, "Error creating PD1") | ||||
| 		By("creating PD2") | ||||
| 		disk2Name, err := createPD() | ||||
| 		disk2Name, err := createPDWithRetry() | ||||
| 		expectNoError(err, "Error creating PD2") | ||||
|  | ||||
| 		host0Pod := testPDPod([]string{disk1Name, disk2Name}, host0Name, false /* readOnly */, 1 /* numContainers */) | ||||
| @@ -249,10 +231,8 @@ var _ = Describe("Pod Disks", func() { | ||||
| 			// Teardown pods, PD. Ignore errors. | ||||
| 			// Teardown should do nothing unless test failed. | ||||
| 			podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)) | ||||
| 			detachPD(host0Name, disk1Name) | ||||
| 			detachPD(host0Name, disk2Name) | ||||
| 			deletePDWithRetry(disk1Name) | ||||
| 			deletePDWithRetry(disk2Name) | ||||
| 			detachAndDeletePDs(disk1Name, []string{host0Name}) | ||||
| 			detachAndDeletePDs(disk2Name, []string{host0Name}) | ||||
| 		}() | ||||
|  | ||||
| 		containerName := "mycontainer" | ||||
| @@ -286,16 +266,23 @@ var _ = Describe("Pod Disks", func() { | ||||
| 			By("deleting host0Pod") | ||||
| 			expectNoError(podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)), "Failed to delete host0Pod") | ||||
| 		} | ||||
|  | ||||
| 		By(fmt.Sprintf("deleting PD1 %q", disk1Name)) | ||||
| 		deletePDWithRetry(disk1Name) | ||||
| 		By(fmt.Sprintf("deleting PD2 %q", disk2Name)) | ||||
| 		deletePDWithRetry(disk2Name) | ||||
|  | ||||
| 		return | ||||
| 	}) | ||||
| }) | ||||
|  | ||||
| func createPDWithRetry() (string, error) { | ||||
| 	newDiskName := "" | ||||
| 	var err error | ||||
| 	for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) { | ||||
| 		if newDiskName, err = createPD(); err != nil { | ||||
| 			Logf("Couldn't create a new PD. Sleeping 5 seconds (%v)", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		Logf("Successfully created a new PD: %q.", newDiskName) | ||||
| 		break | ||||
| 	} | ||||
| 	return newDiskName, err | ||||
| } | ||||
|  | ||||
| func deletePDWithRetry(diskName string) { | ||||
| 	var err error | ||||
| 	for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) { | ||||
| @@ -303,7 +290,7 @@ func deletePDWithRetry(diskName string) { | ||||
| 			Logf("Couldn't delete PD %q. Sleeping 5 seconds (%v)", diskName, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		Logf("Deleted PD %v", diskName) | ||||
| 		Logf("Successfully deleted PD %q.", diskName) | ||||
| 		break | ||||
| 	} | ||||
| 	expectNoError(err, "Error deleting PD") | ||||
| @@ -325,9 +312,12 @@ func createPD() (string, error) { | ||||
| 	if testContext.Provider == "gce" || testContext.Provider == "gke" { | ||||
| 		pdName := fmt.Sprintf("%s-%s", testContext.prefix, string(util.NewUUID())) | ||||
|  | ||||
| 		zone := testContext.CloudConfig.Zone | ||||
| 		// TODO: make this hit the compute API directly instead of shelling out to gcloud. | ||||
| 		err := exec.Command("gcloud", "compute", "--quiet", "--project="+testContext.CloudConfig.ProjectID, "disks", "create", "--zone="+zone, "--size=10GB", pdName).Run() | ||||
| 		gceCloud, err := getGCECloud() | ||||
| 		if err != nil { | ||||
| 			return "", err | ||||
| 		} | ||||
|  | ||||
| 		err = gceCloud.CreateDisk(pdName, 10 /* sizeGb */) | ||||
| 		if err != nil { | ||||
| 			return "", err | ||||
| 		} | ||||
| @@ -345,19 +335,20 @@ func createPD() (string, error) { | ||||
|  | ||||
| func deletePD(pdName string) error { | ||||
| 	if testContext.Provider == "gce" || testContext.Provider == "gke" { | ||||
| 		zone := testContext.CloudConfig.Zone | ||||
|  | ||||
| 		// TODO: make this hit the compute API directly. | ||||
| 		cmd := exec.Command("gcloud", "compute", "--quiet", "--project="+testContext.CloudConfig.ProjectID, "disks", "delete", "--zone="+zone, pdName) | ||||
| 		data, err := cmd.CombinedOutput() | ||||
| 		gceCloud, err := getGCECloud() | ||||
| 		if err != nil { | ||||
| 			dataStr := string(data) | ||||
| 			if strings.Contains(dataStr, "was not found") { | ||||
| 				Logf("PD deletion implicitly succeeded because PD %q does not exist.", pdName) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		err = gceCloud.DeleteDisk(pdName) | ||||
|  | ||||
| 		if err != nil { | ||||
| 			if gerr, ok := err.(*googleapi.Error); ok && len(gerr.Errors) > 0 && gerr.Errors[0].Reason == "notFound" { | ||||
| 				// PD already exists, ignore error. | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			Logf("Error deleting PD: %s (%v)", dataStr, err) | ||||
| 			Logf("Error deleting PD %q: %v", pdName, err) | ||||
| 		} | ||||
| 		return err | ||||
| 	} else { | ||||
| @@ -373,10 +364,23 @@ func detachPD(hostName, pdName string) error { | ||||
| 	if testContext.Provider == "gce" || testContext.Provider == "gke" { | ||||
| 		instanceName := strings.Split(hostName, ".")[0] | ||||
|  | ||||
| 		zone := testContext.CloudConfig.Zone | ||||
| 		gceCloud, err := getGCECloud() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		err = gceCloud.DetachDisk(pdName, instanceName) | ||||
| 		if err != nil { | ||||
| 			if gerr, ok := err.(*googleapi.Error); ok && strings.Contains(gerr.Message, "Invalid value for field 'disk'") { | ||||
| 				// PD already detached, ignore error. | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			Logf("Error detaching PD %q: %v", pdName, err) | ||||
| 		} | ||||
|  | ||||
| 		return err | ||||
|  | ||||
| 		// TODO: make this hit the compute API directly. | ||||
| 		return exec.Command("gcloud", "compute", "--quiet", "--project="+testContext.CloudConfig.ProjectID, "detach-disk", "--zone="+zone, "--disk="+pdName, instanceName).Run() | ||||
| 	} else { | ||||
| 		volumes, ok := testContext.CloudConfig.Provider.(awscloud.Volumes) | ||||
| 		if !ok { | ||||
| @@ -457,19 +461,19 @@ func testPDPod(diskNames []string, targetHost string, readOnly bool, numContaine | ||||
| // Waits for specified PD to to detach from specified hostName | ||||
| func waitForPDDetach(diskName, hostName string) error { | ||||
| 	if testContext.Provider == "gce" || testContext.Provider == "gke" { | ||||
| 		for start := time.Now(); time.Since(start) < gcePDDetachTimeout; time.Sleep(gcePDDetachPollTime) { | ||||
| 			zone := testContext.CloudConfig.Zone | ||||
| 		gceCloud, err := getGCECloud() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 			cmd := exec.Command("gcloud", "compute", "--project="+testContext.CloudConfig.ProjectID, "instances", "describe", "--zone="+zone, hostName) | ||||
| 			data, err := cmd.CombinedOutput() | ||||
| 		for start := time.Now(); time.Since(start) < gcePDDetachTimeout; time.Sleep(gcePDDetachPollTime) { | ||||
| 			diskAttached, err := gceCloud.DiskIsAttached(diskName, hostName) | ||||
| 			if err != nil { | ||||
| 				Logf("Error waiting for PD %q to detach from node %q. 'gcloud compute instances describe' failed with %s (%v)", diskName, hostName, string(data), err) | ||||
| 				Logf("Error waiting for PD %q to detach from node %q. 'DiskIsAttached(...)' failed with %v", diskName, hostName, err) | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			dataStr := strings.ToLower(string(data)) | ||||
| 			diskName = strings.ToLower(diskName) | ||||
| 			if !strings.Contains(string(dataStr), diskName) { | ||||
| 			if !diskAttached { | ||||
| 				// Specified disk does not appear to be attached to specified node | ||||
| 				Logf("GCE PD %q appears to have successfully detached from %q.", diskName, hostName) | ||||
| 				return nil | ||||
| @@ -483,3 +487,23 @@ func waitForPDDetach(diskName, hostName string) error { | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func getGCECloud() (*gcecloud.GCECloud, error) { | ||||
| 	gceCloud, ok := testContext.CloudConfig.Provider.(*gcecloud.GCECloud) | ||||
|  | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("failed to convert CloudConfig.Provider to GCECloud: %#v", testContext.CloudConfig.Provider) | ||||
| 	} | ||||
|  | ||||
| 	return gceCloud, nil | ||||
| } | ||||
|  | ||||
| func detachAndDeletePDs(diskName string, hosts []string) { | ||||
| 	for _, host := range hosts { | ||||
| 		detachPD(host, diskName) | ||||
| 		By(fmt.Sprintf("Waiting for PD %q to detach from %q", diskName, host)) | ||||
| 		waitForPDDetach(diskName, host) | ||||
| 	} | ||||
| 	By(fmt.Sprintf("Deleting PD %q", diskName)) | ||||
| 	deletePDWithRetry(diskName) | ||||
| } | ||||
|   | ||||
| @@ -114,6 +114,7 @@ type CloudConfig struct { | ||||
| 	NodeInstanceGroup string | ||||
| 	NumNodes          int | ||||
| 	ClusterTag        string | ||||
| 	ServiceAccount    string | ||||
|  | ||||
| 	Provider cloudprovider.Interface | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 saadali
					saadali