From 249da77371ae8ee76e02284722221e4b1a59499d Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Wed, 20 Jul 2016 18:08:47 -0400 Subject: [PATCH] Extract kubelet node status into separate file --- pkg/kubelet/kubelet.go | 702 +------------------ pkg/kubelet/kubelet_node_status.go | 715 ++++++++++++++++++++ pkg/kubelet/kubelet_node_status_test.go | 860 ++++++++++++++++++++++++ pkg/kubelet/kubelet_test.go | 824 +---------------------- pkg/kubelet/util.go | 10 + 5 files changed, 1594 insertions(+), 1517 deletions(-) create mode 100644 pkg/kubelet/kubelet_node_status.go create mode 100644 pkg/kubelet/kubelet_node_status_test.go diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 03c704d64c9..4b925fd268d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -21,13 +21,11 @@ import ( "fmt" "io" "io/ioutil" - "math" "net" "net/http" "os" "path" "path/filepath" - goRuntime "runtime" "sort" "strings" "sync" @@ -37,9 +35,7 @@ import ( "github.com/golang/glog" cadvisorapi "github.com/google/cadvisor/info/v1" "k8s.io/kubernetes/pkg/api" - apierrors "k8s.io/kubernetes/pkg/api/errors" utilpod "k8s.io/kubernetes/pkg/api/pod" - "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/apis/componentconfig" @@ -83,7 +79,6 @@ import ( "k8s.io/kubernetes/pkg/util/flowcontrol" kubeio "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" - utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -93,7 +88,6 @@ import ( utilvalidation "k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/wait" - "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/volumehelper" "k8s.io/kubernetes/pkg/watch" @@ -770,6 +764,8 @@ type Kubelet struct { // oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up. oneTimeInitializer sync.Once + // flannelExperimentalOverlay determines whether the experimental flannel + // network overlay is active. flannelExperimentalOverlay bool // TODO: Flannelhelper doesn't store any state, we can instantiate it @@ -827,15 +823,6 @@ type Kubelet struct { enableControllerAttachDetach bool } -// dirExists returns true if the path exists and represents a directory. -func dirExists(path string) bool { - s, err := os.Stat(path) - if err != nil { - return false - } - return s.IsDir() -} - // setupDataDirs creates: // 1. the root directory // 2. the pods directory @@ -975,187 +962,6 @@ func (kl *Kubelet) getActivePods() []*api.Pod { return activePods } -// initialNodeStatus determines the initial node status, incorporating node -// labels and information from the cloud provider. -func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { - node := &api.Node{ - ObjectMeta: api.ObjectMeta{ - Name: kl.nodeName, - Labels: map[string]string{ - unversioned.LabelHostname: kl.hostname, - unversioned.LabelOS: goRuntime.GOOS, - unversioned.LabelArch: goRuntime.GOARCH, - }, - }, - Spec: api.NodeSpec{ - Unschedulable: !kl.registerSchedulable, - }, - } - // Initially, set NodeNetworkUnavailable to true. 
- if kl.providerRequiresNetworkingConfiguration() { - node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ - Type: api.NodeNetworkUnavailable, - Status: api.ConditionTrue, - Reason: "NoRouteCreated", - Message: "Node created without a route", - LastTransitionTime: unversioned.NewTime(kl.clock.Now()), - }) - } - - if kl.enableControllerAttachDetach { - if node.Annotations == nil { - node.Annotations = make(map[string]string) - } - - node.Annotations[volumehelper.ControllerManagedAttachAnnotation] = "true" - } - - // @question: should this be place after the call to the cloud provider? which also applies labels - for k, v := range kl.nodeLabels { - if cv, found := node.ObjectMeta.Labels[k]; found { - glog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv) - } - node.ObjectMeta.Labels[k] = v - } - - if kl.cloud != nil { - instances, ok := kl.cloud.Instances() - if !ok { - return nil, fmt.Errorf("failed to get instances from cloud provider") - } - - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - // TODO: ExternalID is deprecated, we'll have to drop this code - externalID, err := instances.ExternalID(kl.nodeName) - if err != nil { - return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err) - } - node.Spec.ExternalID = externalID - - // TODO: We can't assume that the node has credentials to talk to the - // cloudprovider from arbitrary nodes. At most, we should talk to a - // local metadata server here. - node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName) - if err != nil { - return nil, err - } - - instanceType, err := instances.InstanceType(kl.nodeName) - if err != nil { - return nil, err - } - if instanceType != "" { - glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelInstanceType, instanceType) - node.ObjectMeta.Labels[unversioned.LabelInstanceType] = instanceType - } - // If the cloud has zone information, label the node with the zone information - zones, ok := kl.cloud.Zones() - if ok { - zone, err := zones.GetZone() - if err != nil { - return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err) - } - if zone.FailureDomain != "" { - glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneFailureDomain, zone.FailureDomain) - node.ObjectMeta.Labels[unversioned.LabelZoneFailureDomain] = zone.FailureDomain - } - if zone.Region != "" { - glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneRegion, zone.Region) - node.ObjectMeta.Labels[unversioned.LabelZoneRegion] = zone.Region - } - } - } else { - node.Spec.ExternalID = kl.hostname - if kl.autoDetectCloudProvider { - // If no cloud provider is defined - use the one detected by cadvisor - info, err := kl.GetCachedMachineInfo() - if err == nil { - kl.updateCloudProviderFromMachineInfo(node, info) - } - } - } - if err := kl.setNodeStatus(node); err != nil { - return nil, err - } - - return node, nil -} - -// registerWithApiserver registers the node with the cluster master. It is safe -// to call multiple times, but not concurrently (kl.registrationCompleted is -// not locked). 
-func (kl *Kubelet) registerWithApiserver() { - if kl.registrationCompleted { - return - } - step := 100 * time.Millisecond - for { - time.Sleep(step) - step = step * 2 - if step >= 7*time.Second { - step = 7 * time.Second - } - - node, err := kl.initialNodeStatus() - if err != nil { - glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) - continue - } - - glog.V(2).Infof("Attempting to register node %s", node.Name) - if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil { - if !apierrors.IsAlreadyExists(err) { - glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) - continue - } - currentNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName) - if err != nil { - glog.Errorf("error getting node %q: %v", kl.nodeName, err) - continue - } - if currentNode == nil { - glog.Errorf("no node instance returned for %q", kl.nodeName) - continue - } - if currentNode.Spec.ExternalID == node.Spec.ExternalID { - glog.Infof("Node %s was previously registered", node.Name) - kl.registrationCompleted = true - return - } - glog.Errorf( - "Previously %q had externalID %q; now it is %q; will delete and recreate.", - kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID, - ) - if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil { - glog.Errorf("Unable to delete old node: %v", err) - } else { - glog.Errorf("Deleted old node object %q", kl.nodeName) - } - continue - } - glog.Infof("Successfully registered node %s", node.Name) - kl.registrationCompleted = true - return - } -} - -// syncNodeStatus should be called periodically from a goroutine. -// It synchronizes node status to master, registering the kubelet first if -// necessary. -func (kl *Kubelet) syncNodeStatus() { - if kl.kubeClient == nil { - return - } - if kl.registerNode { - // This will exit immediately if it doesn't need to do anything. - kl.registerWithApiserver() - } - if err := kl.updateNodeStatus(); err != nil { - glog.Errorf("Unable to update node status: %v", err) - } -} - // relabelVolumes relabels SELinux volumes to match the pod's // SELinuxOptions specification. This is only needed if the pod uses // hostPID or hostIPC. Otherwise relabeling is delegated to docker. @@ -1678,10 +1484,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { // // Arguments: // -// pod - the pod to sync -// mirrorPod - the mirror pod for the pod to sync, if it is a static pod -// podStatus - the current status (TODO: always from the status manager?) -// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE, DELETE) +// o - the SyncPodOptions for this invocation // // The workflow is: // * If the pod is being created, record pod worker start latency @@ -1855,8 +1658,11 @@ func podUsesHostNetwork(pod *api.Pod) bool { return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork } -// getPullSecretsForPod inspects the Pod and retrieves the referenced pull secrets -// TODO duplicate secrets are being retrieved multiple times and there is no cache. Creating and using a secret manager interface will make this easier to address. +// getPullSecretsForPod inspects the Pod and retrieves the referenced pull +// secrets. +// TODO: duplicate secrets are being retrieved multiple times and there +// is no cache. Creating and using a secret manager interface will make this +// easier to address. 
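+// One possible shape for such a cache, sketched here for illustration only
+// (the secretManager name and method set are hypothetical, not part of this
+// patch):
+//
+//	type secretManager interface {
+//		// GetSecret returns the named secret, serving repeated lookups
+//		// for the same key from a local cache rather than fetching it
+//		// from the apiserver every time.
+//		GetSecret(namespace, name string) (*api.Secret, error)
+//	}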
func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) { pullSecrets := []api.Secret{} @@ -2635,94 +2441,6 @@ func (kl *Kubelet) updateRuntimeUp() { kl.runtimeState.setRuntimeSync(kl.clock.Now()) } -// updateNodeStatus updates node status to master with retries. -func (kl *Kubelet) updateNodeStatus() error { - for i := 0; i < nodeStatusUpdateRetry; i++ { - if err := kl.tryUpdateNodeStatus(); err != nil { - glog.Errorf("Error updating node status, will retry: %v", err) - } else { - return nil - } - } - return fmt.Errorf("update node status exceeds retry count") -} - -// recordNodeStatusEvent records an event of the given type with the given -// message for the node. -func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) { - glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName) - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - kl.recorder.Eventf(kl.nodeRef, eventtype, event, "Node %s status is now: %s", kl.nodeName, event) -} - -// Set addresses for the node. -func (kl *Kubelet) setNodeAddress(node *api.Node) error { - // Set addresses for the node. - if kl.cloud != nil { - instances, ok := kl.cloud.Instances() - if !ok { - return fmt.Errorf("failed to get instances from cloud provider") - } - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface - nodeAddresses, err := instances.NodeAddresses(kl.nodeName) - if err != nil { - return fmt.Errorf("failed to get node address from cloud provider: %v", err) - } - node.Status.Addresses = nodeAddresses - } else { - if kl.nodeIP != nil { - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: kl.nodeIP.String()}, - {Type: api.NodeInternalIP, Address: kl.nodeIP.String()}, - } - } else if addr := net.ParseIP(kl.hostname); addr != nil { - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: addr.String()}, - {Type: api.NodeInternalIP, Address: addr.String()}, - } - } else { - addrs, err := net.LookupIP(node.Name) - if err != nil { - return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) - } else if len(addrs) == 0 { - return fmt.Errorf("no ip address for node %v", node.Name) - } else { - // check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address. - // If no match is found, it uses the IP of the interface with gateway on it. 
- for _, ip := range addrs { - if ip.IsLoopback() { - continue - } - - if ip.To4() != nil { - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: ip.String()}, - {Type: api.NodeInternalIP, Address: ip.String()}, - } - break - } - } - - if len(node.Status.Addresses) == 0 { - ip, err := utilnet.ChooseHostInterface() - if err != nil { - return err - } - - node.Status.Addresses = []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: ip.String()}, - {Type: api.NodeInternalIP, Address: ip.String()}, - } - } - } - } - } - return nil -} - func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadvisorapi.MachineInfo) { if info.CloudProvider != cadvisorapi.UnknownProvider && info.CloudProvider != cadvisorapi.Baremetal { @@ -2735,118 +2453,6 @@ func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadv } } -func (kl *Kubelet) setNodeStatusMachineInfo(node *api.Node) { - // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start - // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. - info, err := kl.GetCachedMachineInfo() - if err != nil { - // TODO(roberthbailey): This is required for test-cmd.sh to pass. - // See if the test should be updated instead. - node.Status.Capacity = api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), - api.ResourceMemory: resource.MustParse("0Gi"), - api.ResourcePods: *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI), - } - glog.Errorf("Error getting machine info: %v", err) - } else { - node.Status.NodeInfo.MachineID = info.MachineID - node.Status.NodeInfo.SystemUUID = info.SystemUUID - node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info) - if kl.podsPerCore > 0 { - node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( - int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI) - } else { - node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( - int64(kl.maxPods), resource.DecimalSI) - } - node.Status.Capacity[api.ResourceNvidiaGPU] = *resource.NewQuantity( - int64(kl.nvidiaGPUs), resource.DecimalSI) - if node.Status.NodeInfo.BootID != "" && - node.Status.NodeInfo.BootID != info.BootID { - // TODO: This requires a transaction, either both node status is updated - // and event is recorded or neither should happen, see issue #6055. - kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.NodeRebooted, - "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID) - } - node.Status.NodeInfo.BootID = info.BootID - } - - // Set Allocatable. - node.Status.Allocatable = make(api.ResourceList) - for k, v := range node.Status.Capacity { - value := *(v.Copy()) - if kl.reservation.System != nil { - value.Sub(kl.reservation.System[k]) - } - if kl.reservation.Kubernetes != nil { - value.Sub(kl.reservation.Kubernetes[k]) - } - if value.Sign() < 0 { - // Negative Allocatable resources don't make sense. - value.Set(0) - } - node.Status.Allocatable[k] = value - } -} - -// Set versioninfo for the node. 
-func (kl *Kubelet) setNodeStatusVersionInfo(node *api.Node) { - verinfo, err := kl.cadvisor.VersionInfo() - if err != nil { - glog.Errorf("Error getting version info: %v", err) - } else { - node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion - node.Status.NodeInfo.OSImage = verinfo.ContainerOsVersion - - runtimeVersion := "Unknown" - if runtimeVer, err := kl.containerRuntime.Version(); err == nil { - runtimeVersion = runtimeVer.String() - } - node.Status.NodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", kl.containerRuntime.Type(), runtimeVersion) - - node.Status.NodeInfo.KubeletVersion = version.Get().String() - // TODO: kube-proxy might be different version from kubelet in the future - node.Status.NodeInfo.KubeProxyVersion = version.Get().String() - } - -} - -// Set daemonEndpoints for the node. -func (kl *Kubelet) setNodeStatusDaemonEndpoints(node *api.Node) { - node.Status.DaemonEndpoints = *kl.daemonEndpoints -} - -// Set images list for the node -func (kl *Kubelet) setNodeStatusImages(node *api.Node) { - // Update image list of this node - var imagesOnNode []api.ContainerImage - containerImages, err := kl.imageManager.GetImageList() - if err != nil { - glog.Errorf("Error getting image list: %v", err) - } else { - // sort the images from max to min, and only set top N images into the node status. - sort.Sort(byImageSize(containerImages)) - if maxImagesInNodeStatus < len(containerImages) { - containerImages = containerImages[0:maxImagesInNodeStatus] - } - - for _, image := range containerImages { - imagesOnNode = append(imagesOnNode, api.ContainerImage{ - Names: append(image.RepoTags, image.RepoDigests...), - SizeBytes: image.Size, - }) - } - } - node.Status.Images = imagesOnNode -} - -// Set the GOOS and GOARCH for this node -func (kl *Kubelet) setNodeStatusGoRuntime(node *api.Node) { - node.Status.NodeInfo.OperatingSystem = goRuntime.GOOS - node.Status.NodeInfo.Architecture = goRuntime.GOARCH -} - type byImageSize []kubecontainer.Image // Sort from max to min @@ -2856,298 +2462,6 @@ func (a byImageSize) Less(i, j int) bool { func (a byImageSize) Len() int { return len(a) } func (a byImageSize) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// Set status for the node. -func (kl *Kubelet) setNodeStatusInfo(node *api.Node) { - kl.setNodeStatusMachineInfo(node) - kl.setNodeStatusVersionInfo(node) - kl.setNodeStatusDaemonEndpoints(node) - kl.setNodeStatusImages(node) - kl.setNodeStatusGoRuntime(node) -} - -// Set Readycondition for the node. -func (kl *Kubelet) setNodeReadyCondition(node *api.Node) { - // NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions. - // This is due to an issue with version skewed kubelet and master components. - // ref: https://github.com/kubernetes/kubernetes/issues/16961 - currentTime := unversioned.NewTime(kl.clock.Now()) - var newNodeReadyCondition api.NodeCondition - if rs := kl.runtimeState.errors(); len(rs) == 0 { - newNodeReadyCondition = api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: "kubelet is posting ready status", - LastHeartbeatTime: currentTime, - } - } else { - newNodeReadyCondition = api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionFalse, - Reason: "KubeletNotReady", - Message: strings.Join(rs, ","), - LastHeartbeatTime: currentTime, - } - } - - // Record any soft requirements that were not met in the container manager. 
- status := kl.containerManager.Status() - if status.SoftRequirements != nil { - newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error()) - } - - readyConditionUpdated := false - needToRecordEvent := false - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == api.NodeReady { - if node.Status.Conditions[i].Status == newNodeReadyCondition.Status { - newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime - } else { - newNodeReadyCondition.LastTransitionTime = currentTime - needToRecordEvent = true - } - node.Status.Conditions[i] = newNodeReadyCondition - readyConditionUpdated = true - break - } - } - if !readyConditionUpdated { - newNodeReadyCondition.LastTransitionTime = currentTime - node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition) - } - if needToRecordEvent { - if newNodeReadyCondition.Status == api.ConditionTrue { - kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeReady) - } else { - kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotReady) - } - } -} - -// setNodeMemoryPressureCondition for the node. -// TODO: this needs to move somewhere centralized... -func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) { - currentTime := unversioned.NewTime(kl.clock.Now()) - var condition *api.NodeCondition - - // Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update. - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == api.NodeMemoryPressure { - condition = &node.Status.Conditions[i] - } - } - - newCondition := false - // If the NodeMemoryPressure condition doesn't exist, create one - if condition == nil { - condition = &api.NodeCondition{ - Type: api.NodeMemoryPressure, - Status: api.ConditionUnknown, - } - // cannot be appended to node.Status.Conditions here because it gets - // copied to the slice. So if we append to the slice here none of the - // updates we make below are reflected in the slice. - newCondition = true - } - - // Update the heartbeat time - condition.LastHeartbeatTime = currentTime - - // Note: The conditions below take care of the case when a new NodeMemoryPressure condition is - // created and as well as the case when the condition already exists. When a new condition - // is created its status is set to api.ConditionUnknown which matches either - // condition.Status != api.ConditionTrue or - // condition.Status != api.ConditionFalse in the conditions below depending on whether - // the kubelet is under memory pressure or not. - if kl.evictionManager.IsUnderMemoryPressure() { - if condition.Status != api.ConditionTrue { - condition.Status = api.ConditionTrue - condition.Reason = "KubeletHasInsufficientMemory" - condition.Message = "kubelet has insufficient memory available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasInsufficientMemory") - } - } else { - if condition.Status != api.ConditionFalse { - condition.Status = api.ConditionFalse - condition.Reason = "KubeletHasSufficientMemory" - condition.Message = "kubelet has sufficient memory available" - condition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientMemory") - } - } - - if newCondition { - node.Status.Conditions = append(node.Status.Conditions, *condition) - } -} - -// Set OODcondition for the node. 
-func (kl *Kubelet) setNodeOODCondition(node *api.Node) { - currentTime := unversioned.NewTime(kl.clock.Now()) - var nodeOODCondition *api.NodeCondition - - // Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update. - for i := range node.Status.Conditions { - if node.Status.Conditions[i].Type == api.NodeOutOfDisk { - nodeOODCondition = &node.Status.Conditions[i] - } - } - - newOODCondition := false - // If the NodeOutOfDisk condition doesn't exist, create one. - if nodeOODCondition == nil { - nodeOODCondition = &api.NodeCondition{ - Type: api.NodeOutOfDisk, - Status: api.ConditionUnknown, - } - // nodeOODCondition cannot be appended to node.Status.Conditions here because it gets - // copied to the slice. So if we append nodeOODCondition to the slice here none of the - // updates we make to nodeOODCondition below are reflected in the slice. - newOODCondition = true - } - - // Update the heartbeat time irrespective of all the conditions. - nodeOODCondition.LastHeartbeatTime = currentTime - - // Note: The conditions below take care of the case when a new NodeOutOfDisk condition is - // created and as well as the case when the condition already exists. When a new condition - // is created its status is set to api.ConditionUnknown which matches either - // nodeOODCondition.Status != api.ConditionTrue or - // nodeOODCondition.Status != api.ConditionFalse in the conditions below depending on whether - // the kubelet is out of disk or not. - if kl.isOutOfDisk() { - if nodeOODCondition.Status != api.ConditionTrue { - nodeOODCondition.Status = api.ConditionTrue - nodeOODCondition.Reason = "KubeletOutOfDisk" - nodeOODCondition.Message = "out of disk space" - nodeOODCondition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeOutOfDisk") - } - } else { - if nodeOODCondition.Status != api.ConditionFalse { - // Update the out of disk condition when the condition status is unknown even if we - // are within the outOfDiskTransitionFrequency duration. We do this to set the - // condition status correctly at kubelet startup. - if nodeOODCondition.Status == api.ConditionUnknown || kl.clock.Since(nodeOODCondition.LastTransitionTime.Time) >= kl.outOfDiskTransitionFrequency { - nodeOODCondition.Status = api.ConditionFalse - nodeOODCondition.Reason = "KubeletHasSufficientDisk" - nodeOODCondition.Message = "kubelet has sufficient disk space available" - nodeOODCondition.LastTransitionTime = currentTime - kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk") - } else { - glog.Infof("Node condition status for OutOfDisk is false, but last transition time is less than %s", kl.outOfDiskTransitionFrequency) - } - } - } - - if newOODCondition { - node.Status.Conditions = append(node.Status.Conditions, *nodeOODCondition) - } -} - -// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus() -var oldNodeUnschedulable bool - -// record if node schedulable change. 
-func (kl *Kubelet) recordNodeSchedulableEvent(node *api.Node) { - if oldNodeUnschedulable != node.Spec.Unschedulable { - if node.Spec.Unschedulable { - kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotSchedulable) - } else { - kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeSchedulable) - } - oldNodeUnschedulable = node.Spec.Unschedulable - } -} - -// Update VolumesInUse field in Node Status -func (kl *Kubelet) setNodeVolumesInUseStatus(node *api.Node) { - node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse() -} - -// setNodeStatus fills in the Status fields of the given Node, overwriting -// any fields that are currently set. -// TODO(madhusudancs): Simplify the logic for setting node conditions and -// refactor the node status condtion code out to a different file. -func (kl *Kubelet) setNodeStatus(node *api.Node) error { - for _, f := range kl.setNodeStatusFuncs { - if err := f(node); err != nil { - return err - } - } - return nil -} - -// defaultNodeStatusFuncs is a factory that generates the default set of setNodeStatus funcs -func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error { - // initial set of node status update handlers, can be modified by Option's - withoutError := func(f func(*api.Node)) func(*api.Node) error { - return func(n *api.Node) error { - f(n) - return nil - } - } - return []func(*api.Node) error{ - kl.setNodeAddress, - withoutError(kl.setNodeStatusInfo), - withoutError(kl.setNodeOODCondition), - withoutError(kl.setNodeMemoryPressureCondition), - withoutError(kl.setNodeReadyCondition), - withoutError(kl.setNodeVolumesInUseStatus), - withoutError(kl.recordNodeSchedulableEvent), - } -} - -// SetNodeStatus returns a functional Option that adds the given node status update handler to the Kubelet -func SetNodeStatus(f func(*api.Node) error) Option { - return func(k *Kubelet) { - k.setNodeStatusFuncs = append(k.setNodeStatusFuncs, f) - } -} - -// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 -// is set, this function will also confirm that cbr0 is configured correctly. -func (kl *Kubelet) tryUpdateNodeStatus() error { - node, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName) - if err != nil { - return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) - } - if node == nil { - return fmt.Errorf("no node instance returned for %q", kl.nodeName) - } - - // Flannel is the authoritative source of pod CIDR, if it's running. - // This is a short term compromise till we get flannel working in - // reservation mode. - if kl.flannelExperimentalOverlay { - flannelPodCIDR := kl.runtimeState.podCIDR() - if node.Spec.PodCIDR != flannelPodCIDR { - node.Spec.PodCIDR = flannelPodCIDR - glog.Infof("Updating podcidr to %v", node.Spec.PodCIDR) - if updatedNode, err := kl.kubeClient.Core().Nodes().Update(node); err != nil { - glog.Warningf("Failed to update podCIDR: %v", err) - } else { - // Update the node resourceVersion so the status update doesn't fail. - node = updatedNode - } - } - } else if kl.reconcileCIDR { - kl.updatePodCIDR(node.Spec.PodCIDR) - } - - if err := kl.setNodeStatus(node); err != nil { - return err - } - // Update the current status on the API server - updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node) - if err == nil { - kl.volumeManager.MarkVolumesAsReportedInUse( - updatedNode.Status.VolumesInUse) - } - return err -} - // GetPhase returns the phase of a pod given its container info. 
// This func is exported to simplify integration with 3rd party kubelet // integrations like kubernetes-mesos. diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go new file mode 100644 index 00000000000..d128b759ff2 --- /dev/null +++ b/pkg/kubelet/kubelet_node_status.go @@ -0,0 +1,715 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "math" + "net" + goRuntime "runtime" + "sort" + "strings" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/events" + utilnet "k8s.io/kubernetes/pkg/util/net" + "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" +) + +// registerWithApiserver registers the node with the cluster master. It is safe +// to call multiple times, but not concurrently (kl.registrationCompleted is +// not locked). +func (kl *Kubelet) registerWithApiserver() { + if kl.registrationCompleted { + return + } + step := 100 * time.Millisecond + for { + time.Sleep(step) + step = step * 2 + if step >= 7*time.Second { + step = 7 * time.Second + } + + node, err := kl.initialNodeStatus() + if err != nil { + glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) + continue + } + + glog.V(2).Infof("Attempting to register node %s", node.Name) + if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil { + if !apierrors.IsAlreadyExists(err) { + glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) + continue + } + currentNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName) + if err != nil { + glog.Errorf("error getting node %q: %v", kl.nodeName, err) + continue + } + if currentNode == nil { + glog.Errorf("no node instance returned for %q", kl.nodeName) + continue + } + if currentNode.Spec.ExternalID == node.Spec.ExternalID { + glog.Infof("Node %s was previously registered", node.Name) + kl.registrationCompleted = true + return + } + glog.Errorf( + "Previously %q had externalID %q; now it is %q; will delete and recreate.", + kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID, + ) + if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil { + glog.Errorf("Unable to delete old node: %v", err) + } else { + glog.Errorf("Deleted old node object %q", kl.nodeName) + } + continue + } + glog.Infof("Successfully registered node %s", node.Name) + kl.registrationCompleted = true + return + } +} + +// initialNodeStatus determines the initial node status, incorporating node +// labels and information from the cloud provider. 
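+// For example, a node with hostname "node-1" on linux/amd64 starts out with
+// labels along these lines (illustrative values; the exact keys come from
+// the unversioned label constants):
+//
+//	kubernetes.io/hostname: node-1
+//	beta.kubernetes.io/os: linux
+//	beta.kubernetes.io/arch: amd64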
+func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
+	node := &api.Node{
+		ObjectMeta: api.ObjectMeta{
+			Name: kl.nodeName,
+			Labels: map[string]string{
+				unversioned.LabelHostname: kl.hostname,
+				unversioned.LabelOS:       goRuntime.GOOS,
+				unversioned.LabelArch:     goRuntime.GOARCH,
+			},
+		},
+		Spec: api.NodeSpec{
+			Unschedulable: !kl.registerSchedulable,
+		},
+	}
+	// Initially, set NodeNetworkUnavailable to true.
+	if kl.providerRequiresNetworkingConfiguration() {
+		node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
+			Type:               api.NodeNetworkUnavailable,
+			Status:             api.ConditionTrue,
+			Reason:             "NoRouteCreated",
+			Message:            "Node created without a route",
+			LastTransitionTime: unversioned.NewTime(kl.clock.Now()),
+		})
+	}
+
+	if kl.enableControllerAttachDetach {
+		if node.Annotations == nil {
+			node.Annotations = make(map[string]string)
+		}
+
+		node.Annotations[volumehelper.ControllerManagedAttachAnnotation] = "true"
+	}
+
+	// @question: should this be placed after the call to the cloud provider, which also applies labels?
+	for k, v := range kl.nodeLabels {
+		if cv, found := node.ObjectMeta.Labels[k]; found {
+			glog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
+		}
+		node.ObjectMeta.Labels[k] = v
+	}
+
+	if kl.cloud != nil {
+		instances, ok := kl.cloud.Instances()
+		if !ok {
+			return nil, fmt.Errorf("failed to get instances from cloud provider")
+		}
+
+		// TODO(roberthbailey): Can we do this without having credentials to talk
+		// to the cloud provider?
+		// TODO: ExternalID is deprecated, we'll have to drop this code
+		externalID, err := instances.ExternalID(kl.nodeName)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
+		}
+		node.Spec.ExternalID = externalID
+
+		// TODO: We can't assume that the node has credentials to talk to the
+		// cloudprovider from arbitrary nodes. At most, we should talk to a
+		// local metadata server here.
+		node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
+		if err != nil {
+			return nil, err
+		}
+
+		instanceType, err := instances.InstanceType(kl.nodeName)
+		if err != nil {
+			return nil, err
+		}
+		if instanceType != "" {
+			glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelInstanceType, instanceType)
+			node.ObjectMeta.Labels[unversioned.LabelInstanceType] = instanceType
+		}
+		// If the cloud has zone information, label the node with the zone information
+		zones, ok := kl.cloud.Zones()
+		if ok {
+			zone, err := zones.GetZone()
+			if err != nil {
+				return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
+			}
+			if zone.FailureDomain != "" {
+				glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneFailureDomain, zone.FailureDomain)
+				node.ObjectMeta.Labels[unversioned.LabelZoneFailureDomain] = zone.FailureDomain
+			}
+			if zone.Region != "" {
+				glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneRegion, zone.Region)
+				node.ObjectMeta.Labels[unversioned.LabelZoneRegion] = zone.Region
+			}
+		}
+	} else {
+		node.Spec.ExternalID = kl.hostname
+		if kl.autoDetectCloudProvider {
+			// If no cloud provider is defined - use the one detected by cadvisor
+			info, err := kl.GetCachedMachineInfo()
+			if err == nil {
+				kl.updateCloudProviderFromMachineInfo(node, info)
+			}
+		}
+	}
+	if err := kl.setNodeStatus(node); err != nil {
+		return nil, err
+	}
+
+	return node, nil
+}
+
+// syncNodeStatus should be called periodically from a goroutine.
+// It synchronizes node status to master, registering the kubelet first if +// necessary. +func (kl *Kubelet) syncNodeStatus() { + if kl.kubeClient == nil { + return + } + if kl.registerNode { + // This will exit immediately if it doesn't need to do anything. + kl.registerWithApiserver() + } + if err := kl.updateNodeStatus(); err != nil { + glog.Errorf("Unable to update node status: %v", err) + } +} + +// updateNodeStatus updates node status to master with retries. +func (kl *Kubelet) updateNodeStatus() error { + for i := 0; i < nodeStatusUpdateRetry; i++ { + if err := kl.tryUpdateNodeStatus(); err != nil { + glog.Errorf("Error updating node status, will retry: %v", err) + } else { + return nil + } + } + return fmt.Errorf("update node status exceeds retry count") +} + +// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 +// is set, this function will also confirm that cbr0 is configured correctly. +func (kl *Kubelet) tryUpdateNodeStatus() error { + node, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName) + if err != nil { + return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) + } + if node == nil { + return fmt.Errorf("no node instance returned for %q", kl.nodeName) + } + + // Flannel is the authoritative source of pod CIDR, if it's running. + // This is a short term compromise till we get flannel working in + // reservation mode. + if kl.flannelExperimentalOverlay { + flannelPodCIDR := kl.runtimeState.podCIDR() + if node.Spec.PodCIDR != flannelPodCIDR { + node.Spec.PodCIDR = flannelPodCIDR + glog.Infof("Updating podcidr to %v", node.Spec.PodCIDR) + if updatedNode, err := kl.kubeClient.Core().Nodes().Update(node); err != nil { + glog.Warningf("Failed to update podCIDR: %v", err) + } else { + // Update the node resourceVersion so the status update doesn't fail. + node = updatedNode + } + } + } else if kl.reconcileCIDR { + kl.updatePodCIDR(node.Spec.PodCIDR) + } + + if err := kl.setNodeStatus(node); err != nil { + return err + } + // Update the current status on the API server + updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node) + if err == nil { + kl.volumeManager.MarkVolumesAsReportedInUse( + updatedNode.Status.VolumesInUse) + } + return err +} + +// recordNodeStatusEvent records an event of the given type with the given +// message for the node. +func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) { + glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName) + // TODO: This requires a transaction, either both node status is updated + // and event is recorded or neither should happen, see issue #6055. + kl.recorder.Eventf(kl.nodeRef, eventtype, event, "Node %s status is now: %s", kl.nodeName, event) +} + +// Set addresses for the node. +func (kl *Kubelet) setNodeAddress(node *api.Node) error { + // Set addresses for the node. + if kl.cloud != nil { + instances, ok := kl.cloud.Instances() + if !ok { + return fmt.Errorf("failed to get instances from cloud provider") + } + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? 
+ // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface + nodeAddresses, err := instances.NodeAddresses(kl.nodeName) + if err != nil { + return fmt.Errorf("failed to get node address from cloud provider: %v", err) + } + node.Status.Addresses = nodeAddresses + } else { + if kl.nodeIP != nil { + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: kl.nodeIP.String()}, + {Type: api.NodeInternalIP, Address: kl.nodeIP.String()}, + } + } else if addr := net.ParseIP(kl.hostname); addr != nil { + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: addr.String()}, + {Type: api.NodeInternalIP, Address: addr.String()}, + } + } else { + addrs, err := net.LookupIP(node.Name) + if err != nil { + return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) + } else if len(addrs) == 0 { + return fmt.Errorf("no ip address for node %v", node.Name) + } else { + // check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address. + // If no match is found, it uses the IP of the interface with gateway on it. + for _, ip := range addrs { + if ip.IsLoopback() { + continue + } + + if ip.To4() != nil { + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: ip.String()}, + {Type: api.NodeInternalIP, Address: ip.String()}, + } + break + } + } + + if len(node.Status.Addresses) == 0 { + ip, err := utilnet.ChooseHostInterface() + if err != nil { + return err + } + + node.Status.Addresses = []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: ip.String()}, + {Type: api.NodeInternalIP, Address: ip.String()}, + } + } + } + } + } + return nil +} + +func (kl *Kubelet) setNodeStatusMachineInfo(node *api.Node) { + // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start + // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. + info, err := kl.GetCachedMachineInfo() + if err != nil { + // TODO(roberthbailey): This is required for test-cmd.sh to pass. + // See if the test should be updated instead. + node.Status.Capacity = api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), + api.ResourceMemory: resource.MustParse("0Gi"), + api.ResourcePods: *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI), + api.ResourceNvidiaGPU: *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI), + } + glog.Errorf("Error getting machine info: %v", err) + } else { + node.Status.NodeInfo.MachineID = info.MachineID + node.Status.NodeInfo.SystemUUID = info.SystemUUID + node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info) + if kl.podsPerCore > 0 { + node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( + int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI) + } else { + node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity( + int64(kl.maxPods), resource.DecimalSI) + } + node.Status.Capacity[api.ResourceNvidiaGPU] = *resource.NewQuantity( + int64(kl.nvidiaGPUs), resource.DecimalSI) + if node.Status.NodeInfo.BootID != "" && + node.Status.NodeInfo.BootID != info.BootID { + // TODO: This requires a transaction, either both node status is updated + // and event is recorded or neither should happen, see issue #6055. 
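+			// (A BootID that differs from the last recorded one is how the
+			// kubelet infers that the node rebooted; the warning event below
+			// is the only record of that reboot.)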
+ kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.NodeRebooted, + "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID) + } + node.Status.NodeInfo.BootID = info.BootID + } + + // Set Allocatable. + node.Status.Allocatable = make(api.ResourceList) + for k, v := range node.Status.Capacity { + value := *(v.Copy()) + if kl.reservation.System != nil { + value.Sub(kl.reservation.System[k]) + } + if kl.reservation.Kubernetes != nil { + value.Sub(kl.reservation.Kubernetes[k]) + } + if value.Sign() < 0 { + // Negative Allocatable resources don't make sense. + value.Set(0) + } + node.Status.Allocatable[k] = value + } +} + +// Set versioninfo for the node. +func (kl *Kubelet) setNodeStatusVersionInfo(node *api.Node) { + verinfo, err := kl.cadvisor.VersionInfo() + if err != nil { + glog.Errorf("Error getting version info: %v", err) + } else { + node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion + node.Status.NodeInfo.OSImage = verinfo.ContainerOsVersion + + runtimeVersion := "Unknown" + if runtimeVer, err := kl.containerRuntime.Version(); err == nil { + runtimeVersion = runtimeVer.String() + } + node.Status.NodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", kl.containerRuntime.Type(), runtimeVersion) + + node.Status.NodeInfo.KubeletVersion = version.Get().String() + // TODO: kube-proxy might be different version from kubelet in the future + node.Status.NodeInfo.KubeProxyVersion = version.Get().String() + } + +} + +// Set daemonEndpoints for the node. +func (kl *Kubelet) setNodeStatusDaemonEndpoints(node *api.Node) { + node.Status.DaemonEndpoints = *kl.daemonEndpoints +} + +// Set images list for the node +func (kl *Kubelet) setNodeStatusImages(node *api.Node) { + // Update image list of this node + var imagesOnNode []api.ContainerImage + containerImages, err := kl.imageManager.GetImageList() + if err != nil { + glog.Errorf("Error getting image list: %v", err) + } else { + // sort the images from max to min, and only set top N images into the node status. + sort.Sort(byImageSize(containerImages)) + if maxImagesInNodeStatus < len(containerImages) { + containerImages = containerImages[0:maxImagesInNodeStatus] + } + + for _, image := range containerImages { + imagesOnNode = append(imagesOnNode, api.ContainerImage{ + Names: append(image.RepoTags, image.RepoDigests...), + SizeBytes: image.Size, + }) + } + } + node.Status.Images = imagesOnNode +} + +// Set the GOOS and GOARCH for this node +func (kl *Kubelet) setNodeStatusGoRuntime(node *api.Node) { + node.Status.NodeInfo.OperatingSystem = goRuntime.GOOS + node.Status.NodeInfo.Architecture = goRuntime.GOARCH +} + +// Set status for the node. +func (kl *Kubelet) setNodeStatusInfo(node *api.Node) { + kl.setNodeStatusMachineInfo(node) + kl.setNodeStatusVersionInfo(node) + kl.setNodeStatusDaemonEndpoints(node) + kl.setNodeStatusImages(node) + kl.setNodeStatusGoRuntime(node) +} + +// Set Ready condition for the node. +func (kl *Kubelet) setNodeReadyCondition(node *api.Node) { + // NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions. + // This is due to an issue with version skewed kubelet and master components. 
+	// ref: https://github.com/kubernetes/kubernetes/issues/16961
+	currentTime := unversioned.NewTime(kl.clock.Now())
+	var newNodeReadyCondition api.NodeCondition
+	if rs := kl.runtimeState.errors(); len(rs) == 0 {
+		newNodeReadyCondition = api.NodeCondition{
+			Type:              api.NodeReady,
+			Status:            api.ConditionTrue,
+			Reason:            "KubeletReady",
+			Message:           "kubelet is posting ready status",
+			LastHeartbeatTime: currentTime,
+		}
+	} else {
+		newNodeReadyCondition = api.NodeCondition{
+			Type:              api.NodeReady,
+			Status:            api.ConditionFalse,
+			Reason:            "KubeletNotReady",
+			Message:           strings.Join(rs, ","),
+			LastHeartbeatTime: currentTime,
+		}
+	}
+
+	// Record any soft requirements that were not met in the container manager.
+	status := kl.containerManager.Status()
+	if status.SoftRequirements != nil {
+		newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
+	}
+
+	readyConditionUpdated := false
+	needToRecordEvent := false
+	for i := range node.Status.Conditions {
+		if node.Status.Conditions[i].Type == api.NodeReady {
+			if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
+				newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
+			} else {
+				newNodeReadyCondition.LastTransitionTime = currentTime
+				needToRecordEvent = true
+			}
+			node.Status.Conditions[i] = newNodeReadyCondition
+			readyConditionUpdated = true
+			break
+		}
+	}
+	if !readyConditionUpdated {
+		newNodeReadyCondition.LastTransitionTime = currentTime
+		node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
+	}
+	if needToRecordEvent {
+		if newNodeReadyCondition.Status == api.ConditionTrue {
+			kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeReady)
+		} else {
+			kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotReady)
+		}
+	}
+}
+
+// setNodeMemoryPressureCondition for the node.
+// TODO: this needs to move somewhere centralized...
+func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) {
+	currentTime := unversioned.NewTime(kl.clock.Now())
+	var condition *api.NodeCondition
+
+	// Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update.
+	for i := range node.Status.Conditions {
+		if node.Status.Conditions[i].Type == api.NodeMemoryPressure {
+			condition = &node.Status.Conditions[i]
+		}
+	}
+
+	newCondition := false
+	// If the NodeMemoryPressure condition doesn't exist, create one
+	if condition == nil {
+		condition = &api.NodeCondition{
+			Type:   api.NodeMemoryPressure,
+			Status: api.ConditionUnknown,
+		}
+		// cannot be appended to node.Status.Conditions here because it gets
+		// copied to the slice. So if we append to the slice here none of the
+		// updates we make below are reflected in the slice.
+		newCondition = true
+	}
+
+	// Update the heartbeat time
+	condition.LastHeartbeatTime = currentTime
+
+	// Note: The conditions below take care of the case when a new NodeMemoryPressure condition is
+	// created as well as the case when the condition already exists. When a new condition
+	// is created its status is set to api.ConditionUnknown which matches either
+	// condition.Status != api.ConditionTrue or
+	// condition.Status != api.ConditionFalse in the conditions below depending on whether
+	// the kubelet is under memory pressure or not.
+	if kl.evictionManager.IsUnderMemoryPressure() {
+		if condition.Status != api.ConditionTrue {
+			condition.Status = api.ConditionTrue
+			condition.Reason = "KubeletHasInsufficientMemory"
+			condition.Message = "kubelet has insufficient memory available"
+			condition.LastTransitionTime = currentTime
+			kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasInsufficientMemory")
+		}
+	} else {
+		if condition.Status != api.ConditionFalse {
+			condition.Status = api.ConditionFalse
+			condition.Reason = "KubeletHasSufficientMemory"
+			condition.Message = "kubelet has sufficient memory available"
+			condition.LastTransitionTime = currentTime
+			kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientMemory")
+		}
+	}
+
+	if newCondition {
+		node.Status.Conditions = append(node.Status.Conditions, *condition)
+	}
+}
+
+// Set OOD condition for the node.
+func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
+	currentTime := unversioned.NewTime(kl.clock.Now())
+	var nodeOODCondition *api.NodeCondition
+
+	// Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update.
+	for i := range node.Status.Conditions {
+		if node.Status.Conditions[i].Type == api.NodeOutOfDisk {
+			nodeOODCondition = &node.Status.Conditions[i]
+		}
+	}
+
+	newOODCondition := false
+	// If the NodeOutOfDisk condition doesn't exist, create one.
+	if nodeOODCondition == nil {
+		nodeOODCondition = &api.NodeCondition{
+			Type:   api.NodeOutOfDisk,
+			Status: api.ConditionUnknown,
+		}
+		// nodeOODCondition cannot be appended to node.Status.Conditions here because it gets
+		// copied to the slice. So if we append nodeOODCondition to the slice here none of the
+		// updates we make to nodeOODCondition below are reflected in the slice.
+		newOODCondition = true
+	}
+
+	// Update the heartbeat time irrespective of all the conditions.
+	nodeOODCondition.LastHeartbeatTime = currentTime
+
+	// Note: The conditions below take care of the case when a new NodeOutOfDisk condition is
+	// created as well as the case when the condition already exists. When a new condition
+	// is created its status is set to api.ConditionUnknown which matches either
+	// nodeOODCondition.Status != api.ConditionTrue or
+	// nodeOODCondition.Status != api.ConditionFalse in the conditions below depending on whether
+	// the kubelet is out of disk or not.
+	if kl.isOutOfDisk() {
+		if nodeOODCondition.Status != api.ConditionTrue {
+			nodeOODCondition.Status = api.ConditionTrue
+			nodeOODCondition.Reason = "KubeletOutOfDisk"
+			nodeOODCondition.Message = "out of disk space"
+			nodeOODCondition.LastTransitionTime = currentTime
+			kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeOutOfDisk")
+		}
+	} else {
+		if nodeOODCondition.Status != api.ConditionFalse {
+			// Update the out of disk condition when the condition status is unknown even if we
+			// are within the outOfDiskTransitionFrequency duration. We do this to set the
+			// condition status correctly at kubelet startup.
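+			// (In effect, the true->false transition is deferred until
+			// outOfDiskTransitionFrequency has elapsed since the last
+			// transition, so a node that briefly recovers disk space does
+			// not flap between OutOfDisk conditions.)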
+			if nodeOODCondition.Status == api.ConditionUnknown || kl.clock.Since(nodeOODCondition.LastTransitionTime.Time) >= kl.outOfDiskTransitionFrequency {
+				nodeOODCondition.Status = api.ConditionFalse
+				nodeOODCondition.Reason = "KubeletHasSufficientDisk"
+				nodeOODCondition.Message = "kubelet has sufficient disk space available"
+				nodeOODCondition.LastTransitionTime = currentTime
+				kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk")
+			} else {
+				glog.Infof("Node condition status for OutOfDisk is false, but last transition time is less than %s", kl.outOfDiskTransitionFrequency)
+			}
+		}
+	}
+
+	if newOODCondition {
+		node.Status.Conditions = append(node.Status.Conditions, *nodeOODCondition)
+	}
+}
+
+// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
+// TODO: why is this a package var?
+var oldNodeUnschedulable bool
+
+// recordNodeSchedulableEvent records an event when the node's schedulable state changes.
+func (kl *Kubelet) recordNodeSchedulableEvent(node *api.Node) {
+	if oldNodeUnschedulable != node.Spec.Unschedulable {
+		if node.Spec.Unschedulable {
+			kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotSchedulable)
+		} else {
+			kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeSchedulable)
+		}
+		oldNodeUnschedulable = node.Spec.Unschedulable
+	}
+}
+
+// Update VolumesInUse field in Node Status
+func (kl *Kubelet) setNodeVolumesInUseStatus(node *api.Node) {
+	node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse()
+}
+
+// setNodeStatus fills in the Status fields of the given Node, overwriting
+// any fields that are currently set.
+// TODO(madhusudancs): Simplify the logic for setting node conditions and
+// refactor the node status condition code out to a different file.
+func (kl *Kubelet) setNodeStatus(node *api.Node) error {
+	for _, f := range kl.setNodeStatusFuncs {
+		if err := f(node); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// defaultNodeStatusFuncs is a factory that generates the default set of
+// setNodeStatus funcs
+func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error {
+	// initial set of node status update handlers, can be modified by Options
+	withoutError := func(f func(*api.Node)) func(*api.Node) error {
+		return func(n *api.Node) error {
+			f(n)
+			return nil
+		}
+	}
+	return []func(*api.Node) error{
+		kl.setNodeAddress,
+		withoutError(kl.setNodeStatusInfo),
+		withoutError(kl.setNodeOODCondition),
+		withoutError(kl.setNodeMemoryPressureCondition),
+		withoutError(kl.setNodeReadyCondition),
+		withoutError(kl.setNodeVolumesInUseStatus),
+		withoutError(kl.recordNodeSchedulableEvent),
+	}
+}
+
+// SetNodeStatus returns a functional Option that adds the given node status
+// update handler to the Kubelet
+func SetNodeStatus(f func(*api.Node) error) Option {
+	return func(k *Kubelet) {
+		k.setNodeStatusFuncs = append(k.setNodeStatusFuncs, f)
+	}
+}
diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
new file mode 100644
index 00000000000..c06551d711c
--- /dev/null
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -0,0 +1,860 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+	"fmt"
+	"reflect"
+	goruntime "runtime"
+	"sort"
+	"strconv"
+	"testing"
+	"time"
+
+	cadvisorapi "github.com/google/cadvisor/info/v1"
+	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
+	"k8s.io/kubernetes/pkg/api"
+	apierrors "k8s.io/kubernetes/pkg/api/errors"
+	"k8s.io/kubernetes/pkg/api/resource"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	"k8s.io/kubernetes/pkg/client/testing/core"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/diff"
+	"k8s.io/kubernetes/pkg/util/rand"
+	"k8s.io/kubernetes/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/version"
+)
+
+// generateTestingImageList generates a random image list and the corresponding expectedImageList.
+func generateTestingImageList(count int) ([]kubecontainer.Image, []api.ContainerImage) {
+	// imageList is a randomly generated image list
+	var imageList []kubecontainer.Image
+	for ; count > 0; count-- {
+		imageItem := kubecontainer.Image{
+			ID:       string(util.NewUUID()),
+			RepoTags: generateImageTags(),
+			Size:     rand.Int63nRange(minImgSize, maxImgSize+1),
+		}
+		imageList = append(imageList, imageItem)
+	}
+
+	// expectedImageList is generated by imageList according to size and maxImagesInNodeStatus
+	// 1. sort the imageList by size
+	sort.Sort(byImageSize(imageList))
+	// 2. convert sorted imageList to api.ContainerImage list
+	var expectedImageList []api.ContainerImage
+	for _, kubeImage := range imageList {
+		apiImage := api.ContainerImage{
+			Names:     kubeImage.RepoTags,
+			SizeBytes: kubeImage.Size,
+		}
+
+		expectedImageList = append(expectedImageList, apiImage)
+	}
+	// 3.
only returns the top maxImagesInNodeStatus images in expectedImageList + return imageList, expectedImageList[0:maxImagesInNodeStatus] +} + +func generateImageTags() []string { + var tagList []string + count := rand.IntnRange(1, maxImageTagsForTest+1) + for ; count > 0; count-- { + tagList = append(tagList, "gcr.io/google_containers:v"+strconv.Itoa(count)) + } + return tagList +} + +func TestUpdateNewNodeStatus(t *testing.T) { + // generate one more than maxImagesInNodeStatus in inputImageList + inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1) + testKubelet := newTestKubeletWithImageList( + t, inputImageList, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 10E9, // 10G + } + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("Start").Return(nil) + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + // Make kubelet report that it has sufficient disk space. + if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { + t.Fatalf("can't update disk space manager: %v", err) + } + + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeOutOfDisk, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OSImage: "Debian GNU/Linux 7 (wheezy)", + OperatingSystem: goruntime.GOOS, + Architecture: goruntime.GOARCH, + ContainerRuntimeVersion: "test://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + api.ResourceNvidiaGPU: 
*resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + Images: expectedImageList, + }, + } + + kubelet.updateRuntimeUp() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + for i, cond := range updatedNode.Status.Conditions { + if cond.LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type) + } + if cond.LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type) + } + updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} + } + + // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 + if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { + t.Errorf("unexpected node condition order. NodeReady should be last.") + } + + if maxImagesInNodeStatus != len(updatedNode.Status.Images) { + t.Errorf("unexpected image list length in node status, expected: %v, got: %v", maxImagesInNodeStatus, len(updatedNode.Status.Images)) + } else { + if !api.Semantic.DeepEqual(expectedNode, updatedNode) { + t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode)) + } + } + +} + +func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("Start").Return(nil) + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + // Make Kubelet report that it has sufficient disk space. 
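+ // (updateDiskSpacePolicy is a test helper that stays behind in kubelet_test.go; it seeds the fake cadvisor filesystem stats so the disk space manager sees the capacities and availabilities passed here. See its definition there for the exact meaning of each numeric argument.)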
+ if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { + t.Fatalf("can't update disk space manager: %v", err) + } + + kubelet.outOfDiskTransitionFrequency = 10 * time.Second + + expectedNodeOutOfDiskCondition := api.NodeCondition{ + Type: api.NodeOutOfDisk, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + } + + kubelet.updateRuntimeUp() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + + var oodCondition api.NodeCondition + for i, cond := range updatedNode.Status.Conditions { + if cond.LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type) + } + if cond.LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type) + } + updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} + if cond.Type == api.NodeOutOfDisk { + oodCondition = updatedNode.Status.Conditions[i] + } + } + + if !reflect.DeepEqual(expectedNodeOutOfDiskCondition, oodCondition) { + t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNodeOutOfDiskCondition, oodCondition)) + } +} + +func TestUpdateExistingNodeStatus(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeOutOfDisk, + Status: api.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(19900E6, 
resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + }, + }, + }}).ReactionChain + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("Start").Return(nil) + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 20E9, + } + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + // Make kubelet report that it is out of disk space. + if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 50, 50, 100, 100); err != nil { + t.Fatalf("can't update disk space manager: %v", err) + } + + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeOutOfDisk, + Status: api.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: unversioned.Time{}, // placeholder + LastTransitionTime: unversioned.Time{}, // placeholder + }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.Time{}, // placeholder + LastTransitionTime: unversioned.Time{}, // placeholder + }, + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OSImage: "Debian GNU/Linux 7 (wheezy)", + OperatingSystem: goruntime.GOOS, + Architecture: goruntime.GOARCH, + ContainerRuntimeVersion: "test://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + // images will be sorted from max to min in node status. 
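+ // (The image names and sizes below match the fake runtime's preset image list from the test fixture; the expectation is only that node status reports them sorted by descending SizeBytes.)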
+ Images: []api.ContainerImage{ + { + Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"}, + SizeBytes: 456, + }, + { + Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"}, + SizeBytes: 123, + }, + }, + }, + } + + kubelet.updateRuntimeUp() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Errorf("unexpected actions: %v", actions) + } + updateAction, ok := actions[1].(core.UpdateAction) + if !ok { + t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + } + updatedNode, ok := updateAction.GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected object type") + } + for i, cond := range updatedNode.Status.Conditions { + // Expect LastHeartbeatTime to be updated to Now, while LastTransitionTime remains the same. + if old := unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; reflect.DeepEqual(cond.LastHeartbeatTime.Rfc3339Copy().UTC(), old) { + t.Errorf("Condition %v LastProbeTime: expected \n%v\n, got \n%v", cond.Type, unversioned.Now(), old) + } + if got, want := cond.LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; !reflect.DeepEqual(got, want) { + t.Errorf("Condition %v LastTransitionTime: expected \n%#v\n, got \n%#v", cond.Type, want, got) + } + updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} + } + + // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 + if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { + t.Errorf("unexpected node condition order. 
NodeReady should be last.") + } + + if !api.Semantic.DeepEqual(expectedNode, updatedNode) { + t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) + } +} + +func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + clock := testKubelet.fakeClock + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: unversioned.NewTime(clock.Now()), + LastTransitionTime: unversioned.NewTime(clock.Now()), + }, + { + + Type: api.NodeOutOfDisk, + Status: api.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: unversioned.NewTime(clock.Now()), + LastTransitionTime: unversioned.NewTime(clock.Now()), + }, + }, + }, + }, + }}).ReactionChain + mockCadvisor := testKubelet.fakeCadvisor + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor.On("Start").Return(nil) + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + kubelet.outOfDiskTransitionFrequency = 5 * time.Second + + ood := api.NodeCondition{ + Type: api.NodeOutOfDisk, + Status: api.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder + LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder + } + noOod := api.NodeCondition{ + Type: api.NodeOutOfDisk, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder + LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder + } + + testCases := []struct { + rootFsAvail uint64 + dockerFsAvail uint64 + expected api.NodeCondition + }{ + { + // NodeOutOfDisk==false + rootFsAvail: 200, + dockerFsAvail: 200, + expected: ood, + }, + { + // NodeOutOfDisk==true + rootFsAvail: 50, + dockerFsAvail: 200, + expected: ood, + }, + { + // NodeOutOfDisk==false + rootFsAvail: 200, + dockerFsAvail: 200, + expected: ood, + }, + { + // NodeOutOfDisk==true + rootFsAvail: 200, + dockerFsAvail: 50, + expected: ood, + }, + { + // NodeOutOfDisk==false + rootFsAvail: 200, + dockerFsAvail: 200, + expected: noOod, + }, + } + + kubelet.updateRuntimeUp() + for tcIdx, tc := range testCases { + // Step by a second + clock.Step(1 * time.Second) + + // Setup expected times. 
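+ // (Each iteration steps the fake clock by one second while outOfDiskTransitionFrequency is five seconds, so only the final test case falls outside the transition window and is expected to flip the condition back to KubeletHasSufficientDisk.)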
+ tc.expected.LastHeartbeatTime = unversioned.NewTime(clock.Now()) + // In the last case, there should be a status transition for NodeOutOfDisk + if tcIdx == len(testCases)-1 { + tc.expected.LastTransitionTime = unversioned.NewTime(clock.Now()) + } + + // Make kubelet report the disk availability specified by the test case + if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, tc.rootFsAvail, tc.dockerFsAvail, 100, 100); err != nil { + t.Fatalf("can't update disk space manager: %v", err) + } + + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Errorf("%d. unexpected actions: %v", tcIdx, actions) + } + updateAction, ok := actions[1].(core.UpdateAction) + if !ok { + t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1]) + } + updatedNode, ok := updateAction.GetObject().(*api.Node) + if !ok { + t.Errorf("%d. unexpected object type", tcIdx) + } + kubeClient.ClearActions() + + var oodCondition api.NodeCondition + for i, cond := range updatedNode.Status.Conditions { + if cond.Type == api.NodeOutOfDisk { + oodCondition = updatedNode.Status.Conditions[i] + } + } + + if !reflect.DeepEqual(tc.expected, oodCondition) { + t.Errorf("%d.\nwant \n%v\n, got \n%v", tcIdx, tc.expected, oodCondition) + } + } +} + +func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + clock := testKubelet.fakeClock + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, + }}).ReactionChain + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("Start").Return(nil) + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 10E9, + } + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + // Make kubelet report that it has sufficient disk space.
+ if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { + t.Fatalf("can't update disk space manager: %v", err) + } + + expectedNode := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeOutOfDisk, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: "kubelet has sufficient disk space available", + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, + {}, //placeholder + }, + NodeInfo: api.NodeSystemInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + KernelVersion: "3.16.0-0.bpo.4-amd64", + OSImage: "Debian GNU/Linux 7 (wheezy)", + OperatingSystem: goruntime.GOOS, + Architecture: goruntime.GOARCH, + ContainerRuntimeVersion: "test://1.5.0", + KubeletVersion: version.Get().String(), + KubeProxyVersion: version.Get().String(), + }, + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, + {Type: api.NodeInternalIP, Address: "127.0.0.1"}, + }, + Images: []api.ContainerImage{ + { + Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"}, + SizeBytes: 456, + }, + { + Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"}, + SizeBytes: 123, + }, + }, + }, + } + + checkNodeStatus := func(status api.ConditionStatus, reason, message string) { + kubeClient.ClearActions() + if err := kubelet.updateNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + actions := kubeClient.Actions() + if len(actions) != 2 { + t.Fatalf("unexpected actions: %v", actions) + } + if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + t.Fatalf("unexpected actions: %v", actions) + } + updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node) + if !ok { + t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + } + + for i, cond := range updatedNode.Status.Conditions { + if cond.LastHeartbeatTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + if cond.LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} + updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} + } + + // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 + lastIndex := len(updatedNode.Status.Conditions) - 1 + if updatedNode.Status.Conditions[lastIndex].Type != api.NodeReady { + t.Errorf("unexpected node condition order. NodeReady should be last.") + } + expectedNode.Status.Conditions[lastIndex] = api.NodeCondition{ + Type: api.NodeReady, + Status: status, + Reason: reason, + Message: message, + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + } + if !api.Semantic.DeepEqual(expectedNode, updatedNode) { + t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode)) + } + } + + readyMessage := "kubelet is posting ready status" + downMessage := "container runtime is down" + + // Should report kubelet not ready if the runtime check is out of date + clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) + kubelet.updateRuntimeUp() + checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) + + // Should report kubelet ready if the runtime check is updated + clock.SetTime(time.Now()) + kubelet.updateRuntimeUp() + checkNodeStatus(api.ConditionTrue, "KubeletReady", readyMessage) + + // Should report kubelet not ready if the runtime check is out of date + clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) + kubelet.updateRuntimeUp() + checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) + + // Should report kubelet not ready if the runtime check failed + fakeRuntime := testKubelet.fakeRuntime + // Inject error into fake runtime status check, node should be NotReady + fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error") + clock.SetTime(time.Now()) + kubelet.updateRuntimeUp() + checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) +} + +func TestUpdateNodeStatusError(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + // No matching node for the kubelet + testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{}}).ReactionChain + + if err := kubelet.updateNodeStatus(); err == nil { + t.Errorf("expected an error from updateNodeStatus, got none") + } + if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry { + t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions()) + } +} + +func TestRegisterExistingNodeWithApiserver(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + kubeClient := testKubelet.fakeKubeClient + kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) { + // Return an error on create. + return true, &api.Node{}, &apierrors.StatusError{ + ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists}, + } + }) + kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { + // Return an existing (matching) node on get.
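+ // (The create reactor above fails with AlreadyExists, so registration should fall back to fetching the existing node and succeed once it matches.)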
+ return true, &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Spec: api.NodeSpec{ExternalID: testKubeletHostname}, + }, nil + }) + kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("no reaction implemented for %s", action) + }) + machineInfo := &cadvisorapi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + mockCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{ + Usage: 400 * mb, + Capacity: 1000 * mb, + Available: 600 * mb, + }, nil) + mockCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{ + Usage: 9 * mb, + Capacity: 10 * mb, + }, nil) + + done := make(chan struct{}) + go func() { + kubelet.registerWithApiserver() + done <- struct{}{} + }() + select { + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for registration") + case <-done: + return + } +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 543ca45b60f..1e9f15d7a67 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -24,9 +24,7 @@ import ( "net" "os" "reflect" - goruntime "runtime" "sort" - "strconv" "testing" "time" @@ -34,7 +32,6 @@ import ( cadvisorapiv2 "github.com/google/cadvisor/info/v2" "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/api" - apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" @@ -68,12 +65,10 @@ import ( "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/util/rand" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/wait" - "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" _ "k8s.io/kubernetes/pkg/volume/host_path" volumetest "k8s.io/kubernetes/pkg/volume/testing" @@ -126,45 +121,6 @@ func newTestKubelet(t *testing.T, controllerAttachDetachEnabled bool) *TestKubel return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled) } -// generateTestingImageList generate randomly generated image list and corresponding expectedImageList. -func generateTestingImageList(count int) ([]kubecontainer.Image, []api.ContainerImage) { - // imageList is randomly generated image list - var imageList []kubecontainer.Image - for ; count > 0; count-- { - imageItem := kubecontainer.Image{ - ID: string(util.NewUUID()), - RepoTags: generateImageTags(), - Size: rand.Int63nRange(minImgSize, maxImgSize+1), - } - imageList = append(imageList, imageItem) - } - - // expectedImageList is generated by imageList according to size and maxImagesInNodeStatus - // 1. sort the imageList by size - sort.Sort(byImageSize(imageList)) - // 2. convert sorted imageList to api.ContainerImage list - var expectedImageList []api.ContainerImage - for _, kubeImage := range imageList { - apiImage := api.ContainerImage{ - Names: kubeImage.RepoTags, - SizeBytes: kubeImage.Size, - } - - expectedImageList = append(expectedImageList, apiImage) - } - // 3. 
only returns the top maxImagesInNodeStatus images in expectedImageList - return imageList, expectedImageList[0:maxImagesInNodeStatus] -} - -func generateImageTags() []string { - var tagList []string - count := rand.IntnRange(1, maxImageTagsForTest+1) - for ; count > 0; count-- { - tagList = append(tagList, "gcr.io/google_containers:v"+strconv.Itoa(count)) - } - return tagList -} - func newTestKubeletWithImageList( t *testing.T, imageList []kubecontainer.Image, @@ -2597,726 +2553,6 @@ func updateDiskSpacePolicy(kubelet *Kubelet, mockCadvisor *cadvisortest.Mock, ro return nil } -func TestUpdateNewNodeStatus(t *testing.T) { - // generate one more than maxImagesInNodeStatus in inputImageList - inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1) - testKubelet := newTestKubeletWithImageList( - t, inputImageList, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain - machineInfo := &cadvisorapi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 10E9, // 10G - } - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("Start").Return(nil) - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorapi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - // Make kubelet report that it has sufficient disk space. - if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { - t.Fatalf("can't update disk space manager: %v", err) - } - - expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeOutOfDisk, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientDisk", - Message: fmt.Sprintf("kubelet has sufficient disk space available"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - { - Type: api.NodeMemoryPressure, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientMemory", - Message: fmt.Sprintf("kubelet has sufficient memory available"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - }, - NodeInfo: api.NodeSystemInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - KernelVersion: "3.16.0-0.bpo.4-amd64", - OSImage: "Debian GNU/Linux 7 (wheezy)", - OperatingSystem: goruntime.GOOS, - Architecture: goruntime.GOARCH, - ContainerRuntimeVersion: "test://1.5.0", - KubeletVersion: version.Get().String(), - KubeProxyVersion: version.Get().String(), - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), - }, - Allocatable: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(1800, 
resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, - {Type: api.NodeInternalIP, Address: "127.0.0.1"}, - }, - Images: expectedImageList, - }, - } - - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected object type") - } - for i, cond := range updatedNode.Status.Conditions { - if cond.LastHeartbeatTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type) - } - if cond.LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type) - } - updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} - updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} - } - - // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 - if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { - t.Errorf("unexpected node condition order. NodeReady should be last.") - } - - if maxImagesInNodeStatus != len(updatedNode.Status.Images) { - t.Errorf("unexpected image list length in node status, expected: %v, got: %v", maxImagesInNodeStatus, len(updatedNode.Status.Images)) - } else { - if !api.Semantic.DeepEqual(expectedNode, updatedNode) { - t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode)) - } - } - -} - -func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain - machineInfo := &cadvisorapi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 1024, - } - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("Start").Return(nil) - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorapi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - // Make Kubelet report that it has sufficient disk space. 
- if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { - t.Fatalf("can't update disk space manager: %v", err) - } - - kubelet.outOfDiskTransitionFrequency = 10 * time.Second - - expectedNodeOutOfDiskCondition := api.NodeCondition{ - Type: api.NodeOutOfDisk, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientDisk", - Message: fmt.Sprintf("kubelet has sufficient disk space available"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - } - - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected object type") - } - - var oodCondition api.NodeCondition - for i, cond := range updatedNode.Status.Conditions { - if cond.LastHeartbeatTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type) - } - if cond.LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type) - } - updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} - updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} - if cond.Type == api.NodeOutOfDisk { - oodCondition = updatedNode.Status.Conditions[i] - } - } - - if !reflect.DeepEqual(expectedNodeOutOfDiskCondition, oodCondition) { - t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNodeOutOfDiskCondition, oodCondition)) - } -} - -func TestUpdateExistingNodeStatus(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeOutOfDisk, - Status: api.ConditionTrue, - Reason: "KubeletOutOfDisk", - Message: "out of disk space", - LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - { - Type: api.NodeMemoryPressure, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientMemory", - Message: fmt.Sprintf("kubelet has sufficient memory available"), - LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - Allocatable: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(19900E6, 
resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - }, - }, - }, - }}).ReactionChain - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("Start").Return(nil) - machineInfo := &cadvisorapi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 20E9, - } - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorapi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - // Make kubelet report that it is out of disk space. - if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 50, 50, 100, 100); err != nil { - t.Fatalf("can't update disk space manager: %v", err) - } - - expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeOutOfDisk, - Status: api.ConditionTrue, - Reason: "KubeletOutOfDisk", - Message: "out of disk space", - LastHeartbeatTime: unversioned.Time{}, // placeholder - LastTransitionTime: unversioned.Time{}, // placeholder - }, - { - Type: api.NodeMemoryPressure, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientMemory", - Message: fmt.Sprintf("kubelet has sufficient memory available"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: unversioned.Time{}, // placeholder - LastTransitionTime: unversioned.Time{}, // placeholder - }, - }, - NodeInfo: api.NodeSystemInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - KernelVersion: "3.16.0-0.bpo.4-amd64", - OSImage: "Debian GNU/Linux 7 (wheezy)", - OperatingSystem: goruntime.GOOS, - Architecture: goruntime.GOARCH, - ContainerRuntimeVersion: "test://1.5.0", - KubeletVersion: version.Get().String(), - KubeProxyVersion: version.Get().String(), - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), - }, - Allocatable: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, - {Type: api.NodeInternalIP, Address: "127.0.0.1"}, - }, - // images will be sorted from max to min in node status. 
- Images: []api.ContainerImage{ - { - Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"}, - SizeBytes: 456, - }, - { - Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"}, - SizeBytes: 123, - }, - }, - }, - } - - kubelet.updateRuntimeUp() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Errorf("unexpected actions: %v", actions) - } - updateAction, ok := actions[1].(core.UpdateAction) - if !ok { - t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) - } - updatedNode, ok := updateAction.GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected object type") - } - for i, cond := range updatedNode.Status.Conditions { - // Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same. - if old := unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; reflect.DeepEqual(cond.LastHeartbeatTime.Rfc3339Copy().UTC(), old) { - t.Errorf("Condition %v LastProbeTime: expected \n%v\n, got \n%v", cond.Type, unversioned.Now(), old) - } - if got, want := cond.LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; !reflect.DeepEqual(got, want) { - t.Errorf("Condition %v LastTransitionTime: expected \n%#v\n, got \n%#v", cond.Type, want, got) - } - updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} - updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} - } - - // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 - if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { - t.Errorf("unexpected node condition order. 
NodeReady should be last.") - } - - if !api.Semantic.DeepEqual(expectedNode, updatedNode) { - t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) - } -} - -func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - clock := testKubelet.fakeClock - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: unversioned.NewTime(clock.Now()), - LastTransitionTime: unversioned.NewTime(clock.Now()), - }, - { - - Type: api.NodeOutOfDisk, - Status: api.ConditionTrue, - Reason: "KubeletOutOfDisk", - Message: "out of disk space", - LastHeartbeatTime: unversioned.NewTime(clock.Now()), - LastTransitionTime: unversioned.NewTime(clock.Now()), - }, - }, - }, - }, - }}).ReactionChain - mockCadvisor := testKubelet.fakeCadvisor - machineInfo := &cadvisorapi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 1024, - } - mockCadvisor.On("Start").Return(nil) - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorapi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - DockerVersion: "1.5.0", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - kubelet.outOfDiskTransitionFrequency = 5 * time.Second - - ood := api.NodeCondition{ - Type: api.NodeOutOfDisk, - Status: api.ConditionTrue, - Reason: "KubeletOutOfDisk", - Message: "out of disk space", - LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder - LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder - } - noOod := api.NodeCondition{ - Type: api.NodeOutOfDisk, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientDisk", - Message: fmt.Sprintf("kubelet has sufficient disk space available"), - LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder - LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder - } - - testCases := []struct { - rootFsAvail uint64 - dockerFsAvail uint64 - expected api.NodeCondition - }{ - { - // NodeOutOfDisk==false - rootFsAvail: 200, - dockerFsAvail: 200, - expected: ood, - }, - { - // NodeOutOfDisk==true - rootFsAvail: 50, - dockerFsAvail: 200, - expected: ood, - }, - { - // NodeOutOfDisk==false - rootFsAvail: 200, - dockerFsAvail: 200, - expected: ood, - }, - { - // NodeOutOfDisk==true - rootFsAvail: 200, - dockerFsAvail: 50, - expected: ood, - }, - { - // NodeOutOfDisk==false - rootFsAvail: 200, - dockerFsAvail: 200, - expected: noOod, - }, - } - - kubelet.updateRuntimeUp() - for tcIdx, tc := range testCases { - // Step by a second - clock.Step(1 * time.Second) - - // Setup expected times. 
- tc.expected.LastHeartbeatTime = unversioned.NewTime(clock.Now()) - // In the last case, there should be a status transition for NodeOutOfDisk - if tcIdx == len(testCases)-1 { - tc.expected.LastTransitionTime = unversioned.NewTime(clock.Now()) - } - - // Make kubelet report that it has sufficient disk space - if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, tc.rootFsAvail, tc.dockerFsAvail, 100, 100); err != nil { - t.Fatalf("can't update disk space manager: %v", err) - } - - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Errorf("%d. unexpected actions: %v", tcIdx, actions) - } - updateAction, ok := actions[1].(core.UpdateAction) - if !ok { - t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1]) - } - updatedNode, ok := updateAction.GetObject().(*api.Node) - if !ok { - t.Errorf("%d. unexpected object type", tcIdx) - } - kubeClient.ClearActions() - - var oodCondition api.NodeCondition - for i, cond := range updatedNode.Status.Conditions { - if cond.Type == api.NodeOutOfDisk { - oodCondition = updatedNode.Status.Conditions[i] - } - } - - if !reflect.DeepEqual(tc.expected, oodCondition) { - t.Errorf("%d.\nwant \n%v\n, got \n%v", tcIdx, tc.expected, oodCondition) - } - } -} - -func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - clock := testKubelet.fakeClock - kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("Start").Return(nil) - machineInfo := &cadvisorapi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 10E9, - } - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorapi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - - // Make kubelet report that it has sufficient disk space. 
- if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { - t.Fatalf("can't update disk space manager: %v", err) - } - - expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeOutOfDisk, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientDisk", - Message: "kubelet has sufficient disk space available", - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - { - Type: api.NodeMemoryPressure, - Status: api.ConditionFalse, - Reason: "KubeletHasSufficientMemory", - Message: fmt.Sprintf("kubelet has sufficient memory available"), - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - }, - {}, //placeholder - }, - NodeInfo: api.NodeSystemInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - KernelVersion: "3.16.0-0.bpo.4-amd64", - OSImage: "Debian GNU/Linux 7 (wheezy)", - OperatingSystem: goruntime.GOOS, - Architecture: goruntime.GOARCH, - ContainerRuntimeVersion: "test://1.5.0", - KubeletVersion: version.Get().String(), - KubeProxyVersion: version.Get().String(), - }, - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), - }, - Allocatable: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI), - api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI), - api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), - api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI), - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, - {Type: api.NodeInternalIP, Address: "127.0.0.1"}, - }, - Images: []api.ContainerImage{ - { - Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"}, - SizeBytes: 456, - }, - { - Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"}, - SizeBytes: 123, - }, - }, - }, - } - - checkNodeStatus := func(status api.ConditionStatus, reason, message string) { - kubeClient.ClearActions() - if err := kubelet.updateNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) - } - actions := kubeClient.Actions() - if len(actions) != 2 { - t.Fatalf("unexpected actions: %v", actions) - } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { - t.Fatalf("unexpected actions: %v", actions) - } - updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node) - if !ok { - t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) - } - - for i, cond := range updatedNode.Status.Conditions { - if cond.LastHeartbeatTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - if cond.LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} - updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} - } - - // Version skew workaround. 
See: https://github.com/kubernetes/kubernetes/issues/16961 - lastIndex := len(updatedNode.Status.Conditions) - 1 - if updatedNode.Status.Conditions[lastIndex].Type != api.NodeReady { - t.Errorf("unexpected node condition order. NodeReady should be last.") - } - expectedNode.Status.Conditions[lastIndex] = api.NodeCondition{ - Type: api.NodeReady, - Status: status, - Reason: reason, - Message: message, - LastHeartbeatTime: unversioned.Time{}, - LastTransitionTime: unversioned.Time{}, - } - if !api.Semantic.DeepEqual(expectedNode, updatedNode) { - t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode)) - } - } - - readyMessage := "kubelet is posting ready status" - downMessage := "container runtime is down" - - // Should report kubelet not ready if the runtime check is out of date - clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) - kubelet.updateRuntimeUp() - checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) - - // Should report kubelet ready if the runtime check is updated - clock.SetTime(time.Now()) - kubelet.updateRuntimeUp() - checkNodeStatus(api.ConditionTrue, "KubeletReady", readyMessage) - - // Should report kubelet not ready if the runtime check is out of date - clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) - kubelet.updateRuntimeUp() - checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) - - // Should report kubelet not ready if the runtime check failed - fakeRuntime := testKubelet.fakeRuntime - // Inject error into fake runtime status check, node should be NotReady - fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error") - clock.SetTime(time.Now()) - kubelet.updateRuntimeUp() - checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) -} - -func TestUpdateNodeStatusError(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - // No matching node for the kubelet - testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{}}).ReactionChain - - if err := kubelet.updateNodeStatus(); err == nil { - t.Errorf("unexpected non error: %v", err) - } - if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry { - t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions()) - } -} - func TestCreateMirrorPod(t *testing.T) { for _, updateType := range []kubetypes.SyncPodType{kubetypes.SyncPodCreate, kubetypes.SyncPodUpdate} { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) @@ -3623,7 +2859,7 @@ func TestPrivilegeContainerAllowed(t *testing.T) { } } -func TestPrivilegeContainerDisallowed(t *testing.T) { +func TestPrivilegedContainerDisallowed(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet.fakeCadvisor.On("VersionInfo").Return(&cadvisorapi.VersionInfo{}, nil) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) @@ -3668,64 +2904,6 @@ func TestFilterOutTerminatedPods(t *testing.T) { } } -func TestRegisterExistingNodeWithApiserver(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - kubeClient := testKubelet.fakeKubeClient - kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) { - // Return an error on create. 
- return true, &api.Node{}, &apierrors.StatusError{ - ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists}, - } - }) - kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { - // Return an existing (matching) node on get. - return true, &api.Node{ - ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, - Spec: api.NodeSpec{ExternalID: testKubeletHostname}, - }, nil - }) - kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - return true, nil, fmt.Errorf("no reaction implemented for %s", action) - }) - machineInfo := &cadvisorapi.MachineInfo{ - MachineID: "123", - SystemUUID: "abc", - BootID: "1b3", - NumCores: 2, - MemoryCapacity: 1024, - } - mockCadvisor := testKubelet.fakeCadvisor - mockCadvisor.On("MachineInfo").Return(machineInfo, nil) - versionInfo := &cadvisorapi.VersionInfo{ - KernelVersion: "3.16.0-0.bpo.4-amd64", - ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", - DockerVersion: "1.5.0", - } - mockCadvisor.On("VersionInfo").Return(versionInfo, nil) - mockCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{ - Usage: 400 * mb, - Capacity: 1000 * mb, - Available: 600 * mb, - }, nil) - mockCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{ - Usage: 9 * mb, - Capacity: 10 * mb, - }, nil) - - done := make(chan struct{}) - go func() { - kubelet.registerWithApiserver() - done <- struct{}{} - }() - select { - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("timed out waiting for registration") - case <-done: - return - } -} - func TestMakePortMappings(t *testing.T) { port := func(name string, protocol api.Protocol, containerPort, hostPort int32, ip string) api.ContainerPort { return api.ContainerPort{ diff --git a/pkg/kubelet/util.go b/pkg/kubelet/util.go index 41a469366ed..0ddb76a86c5 100644 --- a/pkg/kubelet/util.go +++ b/pkg/kubelet/util.go @@ -18,6 +18,7 @@ package kubelet import ( "fmt" + "os" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/capabilities" @@ -118,3 +119,12 @@ func allowHostIPC(pod *api.Pod) (bool, error) { } return false, nil } + +// dirExists returns true if the path exists and represents a directory. +func dirExists(path string) bool { + s, err := os.Stat(path) + if err != nil { + return false + } + return s.IsDir() +}
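
Note on the new extension point: SetNodeStatus (added in kubelet_node_status.go above) lets callers append their own handler to the node status pipeline; setNodeStatus runs every registered func in order and the first error aborts the update. Below is a minimal sketch of how a handler might be wired up, assuming an existing *Kubelet named kl; the handler body and the label key are illustrative assumptions, not part of this patch:

    // Hypothetical handler: stamp an extra label on every status update.
    stampLabel := func(node *api.Node) error {
        if node.ObjectMeta.Labels == nil {
            node.ObjectMeta.Labels = map[string]string{}
        }
        node.ObjectMeta.Labels["example.com/status-stamped"] = "true" // illustrative key
        return nil
    }

    // SetNodeStatus returns an Option; applying it appends the handler to
    // kl.setNodeStatusFuncs, after whatever handlers are already registered.
    opt := SetNodeStatus(stampLabel)
    opt(kl)

Since the default slice from defaultNodeStatusFuncs() ends with recordNodeSchedulableEvent, a handler appended via this Option runs only after all of the default status fields have been filled in.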