Implement bulk polling of volumes
This implements bulk volume polling using the ideas presented by Justin in https://github.com/kubernetes/kubernetes/pull/39564, but changes the implementation to use an interface, so other volume plugin implementations are unaffected.
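Before the diff itself: the sketch below is a minimal, self-contained distillation of the pattern this commit introduces. The interface and method names mirror the diff, but NodeName, Spec, and fakePlugin are simplified stand-ins for illustration only, not the real Kubernetes types.

package main

import "fmt"

// Stand-ins for the real types (types.NodeName from k8s.io/apimachinery and
// volume.Spec from pkg/volume), simplified here for illustration.
type NodeName string

type Spec struct{ Name string }

// VolumePlugin mirrors the new capability flag added to the plugin interface.
type VolumePlugin interface {
	GetPluginName() string
	SupportsBulkVolumeVerification() bool
}

// BulkVolumeVerifier mirrors the optional attacher interface added in pkg/volume:
// one call verifies every volume on every node, instead of one call per node.
type BulkVolumeVerifier interface {
	BulkVerifyVolumes(volumesByNode map[NodeName][]*Spec) (map[NodeName]map[*Spec]bool, error)
}

// fakePlugin is a toy implementation that reports every volume as attached.
type fakePlugin struct{}

func (p *fakePlugin) GetPluginName() string                { return "example.com/fake" }
func (p *fakePlugin) SupportsBulkVolumeVerification() bool { return true }

func (p *fakePlugin) BulkVerifyVolumes(volumesByNode map[NodeName][]*Spec) (map[NodeName]map[*Spec]bool, error) {
	result := make(map[NodeName]map[*Spec]bool)
	for node, specs := range volumesByNode {
		result[node] = make(map[*Spec]bool)
		for _, spec := range specs {
			// A real verifier would issue a single batched cloud-provider query here.
			result[node][spec] = true
		}
	}
	return result, nil
}

func main() {
	var plugin VolumePlugin = &fakePlugin{}

	volumesByNode := map[NodeName][]*Spec{
		"node-1": {{Name: "vol-a"}, {Name: "vol-b"}},
		"node-2": {{Name: "vol-c"}},
	}

	// The executor gates on the capability flag and type-asserts the attacher;
	// plugins that return false keep the old per-node verification path.
	verifier, ok := plugin.(BulkVolumeVerifier)
	if !ok || !plugin.SupportsBulkVolumeVerification() {
		fmt.Println("falling back to per-node verification")
		return
	}

	attached, err := verifier.BulkVerifyVolumes(volumesByNode)
	if err != nil {
		// On error, volumes are assumed still attached; state is left untouched.
		fmt.Println("bulk verification failed:", err)
		return
	}
	for node, specs := range attached {
		for spec, stillAttached := range specs {
			fmt.Printf("node %s: volume %s attached=%v\n", node, spec.Name, stillAttached)
		}
	}
}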
@@ -330,8 +330,8 @@ type Volumes interface {
 	// Check if the volume is already attached to the node with the specified NodeName
 	DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeName) (bool, error)
 
-	// Check if a list of volumes are attached to the node with the specified NodeName
-	DisksAreAttached(diskNames []KubernetesVolumeID, nodeName types.NodeName) (map[KubernetesVolumeID]bool, error)
+	// Check if disks specified in argument map are still attached to their respective nodes.
+	DisksAreAttached(map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error)
 }
 
 // InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@@ -1761,36 +1761,66 @@ func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeN
 	return false, nil
 }
 
-func (c *Cloud) DisksAreAttached(diskNames []KubernetesVolumeID, nodeName types.NodeName) (map[KubernetesVolumeID]bool, error) {
-	idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
-	attached := make(map[KubernetesVolumeID]bool)
-	for _, diskName := range diskNames {
-		volumeID, err := diskName.mapToAWSVolumeID()
-		if err != nil {
-			return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
-		}
-		idToDiskName[volumeID] = diskName
-		attached[diskName] = false
-	}
-	_, instance, err := c.getFullInstance(nodeName)
-	if err != nil {
-		if err == cloudprovider.InstanceNotFound {
-			// If instance no longer exists, safe to assume volume is not attached.
-			glog.Warningf(
-				"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
-				nodeName,
-				diskNames)
-			return attached, nil
-		}
-
-		return attached, err
-	}
-	for _, blockDevice := range instance.BlockDeviceMappings {
-		volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
-		diskName, found := idToDiskName[volumeID]
-		if found {
-			// Disk is still attached to node
-			attached[diskName] = true
-		}
-	}
+func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error) {
+	attached := make(map[types.NodeName]map[KubernetesVolumeID]bool)
+
+	if len(nodeDisks) == 0 {
+		return attached, nil
+	}
+
+	dnsNameSlice := []string{}
+	for nodeName, diskNames := range nodeDisks {
+		for _, diskName := range diskNames {
+			setNodeDisk(attached, diskName, nodeName, false)
+		}
+		dnsNameSlice = append(dnsNameSlice, mapNodeNameToPrivateDNSName(nodeName))
+	}
+
+	awsInstances, err := c.getInstancesByNodeNames(dnsNameSlice)
+	if err != nil {
+		// When there is an error fetching instance information
+		// it is safer to return nil and let volume information not be touched.
+		return nil, err
+	}
+
+	if len(awsInstances) == 0 {
+		glog.V(2).Infof("DisksAreAttached will assume no disks are attached to any node on AWS cluster.")
+		return attached, nil
+	}
+
+	awsInstanceMap := make(map[types.NodeName]*ec2.Instance)
+	for _, awsInstance := range awsInstances {
+		awsInstanceMap[mapInstanceToNodeName(awsInstance)] = awsInstance
+	}
+
+	// Note that we check that the volume is attached to the correct node, not that it is attached to _a_ node
+	for nodeName, diskNames := range nodeDisks {
+		awsInstance := awsInstanceMap[nodeName]
+		if awsInstance == nil {
+			// If instance no longer exists, safe to assume volume is not attached.
+			glog.Warningf(
+				"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
+				nodeName,
+				diskNames)
+			continue
+		}
+
+		idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
+		for _, diskName := range diskNames {
+			volumeID, err := diskName.mapToAWSVolumeID()
+			if err != nil {
+				return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
+			}
+			idToDiskName[volumeID] = diskName
+		}
+
+		for _, blockDevice := range awsInstance.BlockDeviceMappings {
+			volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
+			diskName, found := idToDiskName[volumeID]
+			if found {
+				// Disk is still attached to node
+				setNodeDisk(attached, diskName, nodeName, true)
+			}
+		}
+	}
 
 	return attached, nil
 }
@@ -3130,7 +3160,24 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 			return c.lastInstancesByNodeNames, nil
 		}
 	}
-	names := aws.StringSlice(nodeNames.List())
+	instances, err := c.getInstancesByNodeNames(nodeNames.List())
+
+	if err != nil {
+		return nil, err
+	}
+
+	if len(instances) == 0 {
+		return nil, nil
+	}
+
+	glog.V(2).Infof("Caching instances for %v", nodeNames)
+	c.lastNodeNames = nodeNames
+	c.lastInstancesByNodeNames = instances
+	return instances, nil
+}
+
+func (c *Cloud) getInstancesByNodeNames(nodeNames []string) ([]*ec2.Instance, error) {
+	names := aws.StringSlice(nodeNames)
 
 	nodeNameFilter := &ec2.Filter{
 		Name:   aws.String("private-dns-name"),
@@ -3152,10 +3199,6 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 		glog.V(3).Infof("Failed to find any instances %v", nodeNames)
 		return nil, nil
 	}
-
-	glog.V(2).Infof("Caching instances for %v", nodeNames)
-	c.lastNodeNames = nodeNames
-	c.lastInstancesByNodeNames = instances
 	return instances, nil
 }
 
@@ -3235,3 +3278,18 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
 	awsInstance := newAWSInstance(c.ec2, instance)
 	return awsInstance, instance, err
 }
+
+func setNodeDisk(
+	nodeDiskMap map[types.NodeName]map[KubernetesVolumeID]bool,
+	volumeID KubernetesVolumeID,
+	nodeName types.NodeName,
+	check bool) {
+
+	volumeMap := nodeDiskMap[nodeName]
+
+	if volumeMap == nil {
+		volumeMap = make(map[KubernetesVolumeID]bool)
+		nodeDiskMap[nodeName] = volumeMap
+	}
+	volumeMap[volumeID] = check
+}

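To make the reworked cloud-provider contract concrete, a caller of the new map-based DisksAreAttached might look like this sketch; checkAttachments and the node and volume names are invented for illustration, while the signature matches the interface change above.

package attachcheck

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
)

// checkAttachments verifies a batch of volumes across several nodes with a
// single call, instead of calling the cloud provider once per node.
func checkAttachments(cloud *aws.Cloud) error {
	// Hypothetical node names and volume IDs, for illustration only.
	nodeDisks := map[types.NodeName][]aws.KubernetesVolumeID{
		"ip-10-0-0-1.ec2.internal": {"vol-0123", "vol-0456"},
		"ip-10-0-0-2.ec2.internal": {"vol-0789"},
	}

	attached, err := cloud.DisksAreAttached(nodeDisks)
	if err != nil {
		// On provider errors the result is nil: callers should leave the
		// recorded attachment state untouched rather than mark volumes detached.
		return err
	}

	for node, disks := range attached {
		for disk, stillAttached := range disks {
			if !stillAttached {
				fmt.Printf("volume %s is no longer attached to node %s\n", disk, node)
			}
		}
	}
	return nil
}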
@@ -121,12 +121,7 @@ func (rc *reconciler) updateSyncTime() {
 
 func (rc *reconciler) syncStates() {
 	volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
-	for nodeName, volumes := range volumesPerNode {
-		err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld)
-		if err != nil {
-			glog.Errorf("Error in syncing states for volumes: %v", err)
-		}
-	}
+	rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
 }
 
 func (rc *reconciler) reconcile() {

@@ -1134,6 +1134,10 @@ func (plugin *mockVolumePlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *mockVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) {
 	return nil, nil
 }

@@ -78,37 +78,67 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, nodeName
 }
 
 func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
-	volumesAttachedCheck := make(map[*volume.Spec]bool)
-	volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
-	volumeIDList := []aws.KubernetesVolumeID{}
-	for _, spec := range specs {
-		volumeSource, _, err := getVolumeSource(spec)
-		if err != nil {
-			glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
-			continue
-		}
-
-		name := aws.KubernetesVolumeID(volumeSource.VolumeID)
-		volumeIDList = append(volumeIDList, name)
-		volumesAttachedCheck[spec] = true
-		volumeSpecMap[name] = spec
-	}
-	attachedResult, err := attacher.awsVolumes.DisksAreAttached(volumeIDList, nodeName)
-	if err != nil {
-		// Log error and continue with attach
-		glog.Errorf(
-			"Error checking if volumes (%v) is already attached to current node (%q). err=%v",
-			volumeIDList, nodeName, err)
-		return volumesAttachedCheck, err
-	}
-
-	for volumeID, attached := range attachedResult {
-		if !attached {
-			spec := volumeSpecMap[volumeID]
-			volumesAttachedCheck[spec] = false
-			glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
-		}
-	}
-	return volumesAttachedCheck, nil
-}
+	glog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for AWS", nodeName)
+	volumeNodeMap := map[types.NodeName][]*volume.Spec{
+		nodeName: specs,
+	}
+	nodeVolumesResult := make(map[*volume.Spec]bool)
+	nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
+	if err != nil {
+		glog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
+		return nodeVolumesResult, err
+	}
+
+	if result, ok := nodesVerificationMap[nodeName]; ok {
+		return result, nil
+	}
+	return nodeVolumesResult, nil
+}
+
+func (attacher *awsElasticBlockStoreAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
+	volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
+	diskNamesByNode := make(map[types.NodeName][]aws.KubernetesVolumeID)
+	volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
+
+	for nodeName, volumeSpecs := range volumesByNode {
+		for _, volumeSpec := range volumeSpecs {
+			volumeSource, _, err := getVolumeSource(volumeSpec)
+
+			if err != nil {
+				glog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
+				continue
+			}
+
+			name := aws.KubernetesVolumeID(volumeSource.VolumeID)
+			diskNamesByNode[nodeName] = append(diskNamesByNode[nodeName], name)
+
+			nodeDisk, nodeDiskExists := volumesAttachedCheck[nodeName]
+
+			if !nodeDiskExists {
+				nodeDisk = make(map[*volume.Spec]bool)
+			}
+			nodeDisk[volumeSpec] = true
+			volumeSpecMap[name] = volumeSpec
+			volumesAttachedCheck[nodeName] = nodeDisk
+		}
+	}
+	attachedResult, err := attacher.awsVolumes.DisksAreAttached(diskNamesByNode)
+
+	if err != nil {
+		glog.Errorf("Error checking if volumes are attached to nodes err = %v", err)
+		return volumesAttachedCheck, err
+	}
+
+	for nodeName, nodeDisks := range attachedResult {
+		for diskName, attached := range nodeDisks {
+			if !attached {
+				spec := volumeSpecMap[diskName]
+				setNodeDisk(volumesAttachedCheck, spec, nodeName, false)
+			}
+		}
+	}
+
+	return volumesAttachedCheck, nil
+}
@@ -249,3 +279,17 @@ func (detacher *awsElasticBlockStoreDetacher) Detach(deviceMountPath string, nod
 func (detacher *awsElasticBlockStoreDetacher) UnmountDevice(deviceMountPath string) error {
 	return volumeutil.UnmountPath(deviceMountPath, detacher.mounter)
 }
+
+func setNodeDisk(
+	nodeDiskMap map[types.NodeName]map[*volume.Spec]bool,
+	volumeSpec *volume.Spec,
+	nodeName types.NodeName,
+	check bool) {
+
+	volumeMap := nodeDiskMap[nodeName]
+	if volumeMap == nil {
+		volumeMap = make(map[*volume.Spec]bool)
+		nodeDiskMap[nodeName] = volumeMap
+	}
+	volumeMap[volumeSpec] = check
+}

@@ -321,7 +321,7 @@ func (testcase *testcase) DiskIsAttached(diskName aws.KubernetesVolumeID, nodeNa
 	return expected.isAttached, expected.ret
 }
 
-func (testcase *testcase) DisksAreAttached(diskNames []aws.KubernetesVolumeID, nodeName types.NodeName) (map[aws.KubernetesVolumeID]bool, error) {
+func (testcase *testcase) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {
 	return nil, errors.New("Not implemented")
 }
 
@@ -91,6 +91,10 @@ func (plugin *awsElasticBlockStorePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *awsElasticBlockStorePlugin) SupportsBulkVolumeVerification() bool {
+	return true
+}
+
 func (plugin *awsElasticBlockStorePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -107,6 +107,10 @@ func (plugin *azureDataDiskPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *azureDataDiskPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *azureDataDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -83,6 +83,10 @@ func (plugin *azureFilePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *azureFilePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *azureFilePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -76,6 +76,10 @@ func (plugin *cephfsPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *cephfsPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *cephfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -101,6 +101,10 @@ func (plugin *cinderPlugin) RequiresRemount() bool {
 
 func (plugin *cinderPlugin) SupportsMountOption() bool {
 	return true
+
+}
+func (plugin *cinderPlugin) SupportsBulkVolumeVerification() bool {
+	return false
 }
 
 func (plugin *cinderPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {

@@ -80,6 +80,10 @@ func (plugin *configMapPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *configMapPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *configMapPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &configMapVolumeMounter{
 		configMapVolume: &configMapVolume{spec.Name(), pod.UID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}},

@@ -86,6 +86,10 @@ func (plugin *downwardAPIPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *downwardAPIPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *downwardAPIPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	v := &downwardAPIVolume{
 		volName: spec.Name(),

@@ -94,6 +94,10 @@ func (plugin *emptyDirPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *emptyDirPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *emptyDirPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return plugin.newMounterInternal(spec, pod, plugin.host.GetMounter(), &realMountDetector{plugin.host.GetMounter()}, opts)
 }

@@ -82,6 +82,10 @@ func (plugin *fcPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *fcPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *fcPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -177,6 +177,10 @@ func (plugin *flexVolumePlugin) unsupported(commands ...string) {
 	plugin.unsupportedCommands = append(plugin.unsupportedCommands, commands...)
 }
 
+func (plugin *flexVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 // Returns true iff the given command is known to be unsupported.
 func (plugin *flexVolumePlugin) isUnsupported(command string) bool {
 	plugin.Lock()

@@ -116,6 +116,10 @@ func (p *flockerPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *flockerPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -86,6 +86,10 @@ func (plugin *gcePersistentDiskPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *gcePersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *gcePersistentDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -85,6 +85,10 @@ func (plugin *gitRepoPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *gitRepoPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *gitRepoPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &gitRepoVolumeMounter{
 		gitRepoVolume: &gitRepoVolume{

@@ -120,6 +120,10 @@ func (plugin *glusterfsPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *glusterfsPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *glusterfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -87,6 +87,10 @@ func (plugin *hostPathPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *hostPathPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *hostPathPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -86,6 +86,10 @@ func (plugin *iscsiPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *iscsiPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *iscsiPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -92,6 +92,10 @@ func (plugin *nfsPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *nfsPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *nfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -83,6 +83,10 @@ func (plugin *photonPersistentDiskPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *photonPersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *photonPersistentDiskPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
 	return plugin.newMounterInternal(spec, pod.UID, &PhotonDiskUtil{}, plugin.host.GetMounter())
 }

@@ -112,6 +112,11 @@ type VolumePlugin interface {
 	// Specifying mount options in a volume plugin that doesn't support
 	// user specified mount options will result in error creating persistent volumes
 	SupportsMountOption() bool
+
+	// SupportsBulkVolumeVerification checks if volume plugin type is capable
+	// of enabling bulk polling of all nodes. This can speed up verification of
+	// attached volumes by quite a bit, but underlying plugin must support it.
+	SupportsBulkVolumeVerification() bool
 }
 
 // PersistentVolumePlugin is an extended interface of VolumePlugin and is used

@@ -81,6 +81,10 @@ func (plugin *testPlugins) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *testPlugins) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *testPlugins) NewMounter(spec *Spec, podRef *v1.Pod, opts VolumeOptions) (Mounter, error) {
 	return nil, nil
 }

@@ -179,6 +179,10 @@ func (plugin *portworxVolumePlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *portworxVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func getVolumeSource(
 	spec *volume.Spec) (*v1.PortworxVolumeSource, bool, error) {
 	if spec.Volume != nil && spec.Volume.PortworxVolume != nil {

@@ -96,6 +96,10 @@ func (plugin *projectedPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *projectedPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *projectedPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &projectedVolumeMounter{
 		projectedVolume: &projectedVolume{

@@ -122,6 +122,10 @@ func (plugin *quobytePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *quobytePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *quobytePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -90,6 +90,10 @@ func (plugin *rbdPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *rbdPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,

@@ -89,6 +89,10 @@ func (plugin *secretPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *secretPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *secretPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &secretVolumeMounter{
 		secretVolume: &secretVolume{

@@ -207,6 +207,10 @@ func (plugin *FakeVolumePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *FakeVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *FakeVolumePlugin) NewMounter(spec *Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
 	plugin.Lock()
 	defer plugin.Unlock()

@@ -40,6 +40,7 @@ go_test(
     deps = [
         "//pkg/api/v1:go_default_library",
         "//pkg/util/mount:go_default_library",
+        "//pkg/volume:go_default_library",
         "//pkg/volume/util/types:go_default_library",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/types",

@@ -24,6 +24,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/golang/glog"
+
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/util/mount"
@@ -59,12 +61,16 @@ type OperationExecutor interface {
 	// It then updates the actual state of the world to reflect that.
 	AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
 
-	// VerifyVolumesAreAttached verifies the given list of volumes to see whether they are still attached to the node.
+	// VerifyVolumesAreAttachedPerNode verifies the given list of volumes to see whether they are still attached to the node.
 	// If any volume is not attached right now, it will update the actual state of the world to reflect that.
 	// Note that this operation could be operated concurrently with other attach/detach operations.
 	// In theory (but very unlikely in practice), a race condition among these operations might mark a volume as detached
 	// even if it is attached. But the reconciler can correct this in a short period of time.
-	VerifyVolumesAreAttached(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
+	VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
+
+	// VerifyVolumesAreAttached verifies the volumes in use across the entire cluster and whether they are still attached to their nodes.
+	// If any volume is not attached right now, it will update the actual state of the world to reflect that.
+	VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)
 
 	// DetachVolume detaches the volume from the node specified in
 	// volumeToDetach, and updates the actual state of the world to reflect
@@ -387,8 +393,84 @@ func (oe *operationExecutor) DetachVolume(
 	return oe.pendingOperations.Run(
 		volumeToDetach.VolumeName, "" /* podName */, detachFunc)
 }
 
+func (oe *operationExecutor) VerifyVolumesAreAttached(
+	attachedVolumes map[types.NodeName][]AttachedVolume,
+	actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
+
+	// A map of plugin names to the nodes on which they exist, with the volumes they manage
+	bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
+	volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
+
+	for node, nodeAttachedVolumes := range attachedVolumes {
+		for _, volumeAttached := range nodeAttachedVolumes {
+			volumePlugin, err :=
+				oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
+
+			if err != nil || volumePlugin == nil {
+				glog.Errorf(
+					"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
+					volumeAttached.VolumeName,
+					volumeAttached.VolumeSpec.Name(),
+					volumeAttached.NodeName,
+					err)
+				continue
+			}
+
+			pluginName := volumePlugin.GetPluginName()
+
+			if volumePlugin.SupportsBulkVolumeVerification() {
+				pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
+
+				if !pluginNodesExist {
+					pluginNodes = make(map[types.NodeName][]*volume.Spec)
+				}
+
+				volumeSpecList, nodeExists := pluginNodes[node]
+				if !nodeExists {
+					volumeSpecList = []*volume.Spec{}
+				}
+				volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
+				pluginNodes[node] = volumeSpecList
+
+				bulkVerifyPluginsByNode[pluginName] = pluginNodes
+				volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
+
+				if !mapExists {
+					volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
+				}
+				volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
+				volumeSpecMapByPlugin[pluginName] = volumeSpecMap
+				continue
+			}
+
+			// If the plugin doesn't support bulk volume polling it is best to poll individually
+			nodeError := oe.VerifyVolumesAreAttachedPerNode(nodeAttachedVolumes, node, actualStateOfWorld)
+			if nodeError != nil {
+				glog.Errorf("BulkVerifyVolumes.VerifyVolumesAreAttached failed verifying volumes on node %q with %v", node, nodeError)
+			}
+			break
+		}
+	}
+
+	for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
+		bulkVerifyVolumeFunc, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
+			pluginNodeVolumes,
+			pluginName,
+			volumeSpecMapByPlugin[pluginName],
+			actualStateOfWorld)
+		if err != nil {
+			glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
+		}
+		// Ugly hack to ensure we don't do parallel bulk polling of the same volume plugin
+		uniquePluginName := v1.UniqueVolumeName(pluginName)
+		err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc)
+		if err != nil {
+			glog.Errorf("BulkVerifyVolumes.Run error bulk volume verification for plugin %q with %v", pluginName, err)
+		}
+	}
+}
+
-func (oe *operationExecutor) VerifyVolumesAreAttached(
+func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
 	attachedVolumes []AttachedVolume,
 	nodeName types.NodeName,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {

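The "ugly hack" above is worth a note: pendingOperations normally keys operations by volume name, so reusing the plugin name as the key guarantees at most one bulk poll per plugin at a time. Below is a minimal sketch of that keyed-deduplication idea, with a hypothetical keyedRunner standing in for the real pendingOperations (whose actual API is richer).

package keyedrun

import (
	"errors"
	"sync"
)

// keyedRunner is a hypothetical stand-in for the executor's pendingOperations:
// at most one operation may be in flight per key.
type keyedRunner struct {
	mu       sync.Mutex
	inFlight map[string]bool
}

var errAlreadyRunning = errors.New("an operation with this key is already running")

// Run starts op in the background unless another operation with the same key
// is still in flight, mirroring how the executor keys bulk verification by
// plugin name so each plugin has at most one poll running.
func (r *keyedRunner) Run(key string, op func() error) error {
	r.mu.Lock()
	if r.inFlight[key] {
		r.mu.Unlock()
		return errAlreadyRunning
	}
	if r.inFlight == nil {
		r.inFlight = make(map[string]bool)
	}
	r.inFlight[key] = true
	r.mu.Unlock()

	go func() {
		defer func() {
			r.mu.Lock()
			delete(r.inFlight, key)
			r.mu.Unlock()
		}()
		_ = op() // in the real executor the operation logs its own errors
	}()
	return nil
}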
@@ -26,6 +26,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/util/mount"
+	"k8s.io/kubernetes/pkg/volume"
 	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
 )
 
@@ -197,7 +198,7 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
 
 	// Act
 	for i := 0; i < numVolumesToVerifyAttached; i++ {
-		oe.VerifyVolumesAreAttached(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
+		oe.VerifyVolumesAreAttachedPerNode(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
 	}
 
 	// Assert
@@ -281,6 +282,21 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v
 	}, nil
 }
 
+func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
+	pluginNodeVolumes map[types.NodeName][]*volume.Spec,
+	pluginName string,
+	volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
+	actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (func() error, error) {
+	return func() error {
+		startOperationAndBlock(fopg.ch, fopg.quit)
+		return nil
+	}, nil
+}
+
+func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
+	return nil
+}
+
 func getTestPodWithSecret(podName, secretName string) *v1.Pod {
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{

@@ -88,6 +88,14 @@ type OperationGenerator interface {
 
 	// Generates the function needed to check if the attach_detach controller has attached the volume plugin
 	GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error)
+
+	// GetVolumePluginMgr returns the volume plugin manager
+	GetVolumePluginMgr() *volume.VolumePluginMgr
+
+	GenerateBulkVolumeVerifyFunc(
+		map[types.NodeName][]*volume.Spec,
+		string,
+		map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (func() error, error)
 }
 
 func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
@@ -167,6 +175,71 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
 	}, nil
 }
 
+func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
+	pluginNodeVolumes map[types.NodeName][]*volume.Spec,
+	pluginName string,
+	volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
+	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
+
+	return func() error {
+		attachableVolumePlugin, err :=
+			og.volumePluginMgr.FindAttachablePluginByName(pluginName)
+		if err != nil || attachableVolumePlugin == nil {
+			glog.Errorf(
+				"BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v",
+				pluginName,
+				err)
+			return nil
+		}
+
+		volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
+
+		if newAttacherErr != nil {
+			glog.Errorf(
+				"BulkVerifyVolumes failed for getting plugin %q with: %v",
+				attachableVolumePlugin,
+				newAttacherErr)
+			return nil
+		}
+		bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier)
+
+		if !ok {
+			glog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier)
+			return nil
+		}
+
+		attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes)
+		if bulkAttachErr != nil {
+			glog.Errorf("BulkVerifyVolume.BulkVerifyVolumes error checking volumes are attached with %v", bulkAttachErr)
+			return nil
+		}
+
+		for nodeName, volumeSpecs := range pluginNodeVolumes {
+			for _, volumeSpec := range volumeSpecs {
+				nodeVolumeSpecs, nodeChecked := attached[nodeName]
+
+				if !nodeChecked {
+					glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q, leaving volume %q as attached",
+						nodeName,
+						volumeSpec.Name())
+					continue
+				}
+
+				check := nodeVolumeSpecs[volumeSpec]
+
+				if !check {
+					glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and volume %q",
+						nodeName,
+						volumeSpec.Name())
+					actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName)
+				}
+			}
+		}
+
+		return nil
+	}, nil
+}
 
 func (og *operationGenerator) GenerateAttachVolumeFunc(
 	volumeToAttach VolumeToAttach,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
@@ -233,6 +306,10 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
 	}, nil
 }
 
+func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
+	return og.volumePluginMgr
+}
+
 func (og *operationGenerator) GenerateDetachVolumeFunc(
 	volumeToDetach AttachedVolume,
 	verifySafeToDetach bool,

@@ -187,6 +187,14 @@ type Attacher interface {
 	MountDevice(spec *Spec, devicePath string, deviceMountPath string) error
 }
 
+type BulkVolumeVerifier interface {
+	// BulkVerifyVolumes checks whether the given volumes are still attached to
+	// their respective nodes in the cluster. It returns a map from volume spec to the check result.
+	// If an error occurs during the check, an error should be returned and the volumes
+	// on the nodes should be assumed to be still attached.
+	BulkVerifyVolumes(volumesByNode map[types.NodeName][]*Spec) (map[types.NodeName]map[*Spec]bool, error)
+}
+
 // Detacher can detach a volume from a node.
 type Detacher interface {
 	// Detach the given device from the node with the given Name.

@@ -84,6 +84,10 @@ func (plugin *vsphereVolumePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *vsphereVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *vsphereVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
 	return plugin.newMounterInternal(spec, pod.UID, &VsphereDiskUtil{}, plugin.host.GetMounter())
 }

@@ -63,7 +63,7 @@ func (c *mockVolumes) DiskIsAttached(volumeName aws.KubernetesVolumeID, nodeName
 	return false, fmt.Errorf("not implemented")
 }
 
-func (c *mockVolumes) DisksAreAttached(diskNames []aws.KubernetesVolumeID, nodeName types.NodeName) (map[aws.KubernetesVolumeID]bool, error) {
+func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {
 	return nil, fmt.Errorf("not implemented")
 }

Hemant Kumar