mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-01 02:38:12 +00:00 
			
		
		
		
	Add Status in runtime interface and use it for runtime health check.
This commit is contained in:
		| @@ -57,7 +57,10 @@ type Runtime interface { | ||||
| 	Version() (Version, error) | ||||
| 	// APIVersion returns the API version information of the container | ||||
| 	// runtime. This may be different from the runtime engine's version. | ||||
| 	// TODO(random-liu): We should fold this into Version() | ||||
| 	APIVersion() (Version, error) | ||||
| 	// Status returns error if the runtime is unhealthy; nil otherwise. | ||||
| 	Status() error | ||||
| 	// GetPods returns a list containers group by pods. The boolean parameter | ||||
| 	// specifies whether the runtime returns all containers including those already | ||||
| 	// exited and dead containers (used for garbage collection). | ||||
|   | ||||
| @@ -48,6 +48,7 @@ type FakeRuntime struct { | ||||
| 	RuntimeType       string | ||||
| 	Err               error | ||||
| 	InspectErr        error | ||||
| 	StatusErr         error | ||||
| } | ||||
|  | ||||
| // FakeRuntime should implement Runtime. | ||||
| @@ -108,6 +109,7 @@ func (f *FakeRuntime) ClearCalls() { | ||||
| 	f.RuntimeType = "" | ||||
| 	f.Err = nil | ||||
| 	f.InspectErr = nil | ||||
| 	f.StatusErr = nil | ||||
| } | ||||
|  | ||||
| func (f *FakeRuntime) assertList(expect []string, test []string) error { | ||||
| @@ -168,6 +170,14 @@ func (f *FakeRuntime) APIVersion() (Version, error) { | ||||
| 	return &FakeVersion{Version: f.APIVersionInfo}, f.Err | ||||
| } | ||||
|  | ||||
| func (f *FakeRuntime) Status() error { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
|  | ||||
| 	f.CalledFunctions = append(f.CalledFunctions, "Status") | ||||
| 	return f.StatusErr | ||||
| } | ||||
|  | ||||
| func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
|   | ||||
| @@ -53,6 +53,11 @@ func (r *Mock) APIVersion() (Version, error) { | ||||
| 	return args.Get(0).(Version), args.Error(1) | ||||
| } | ||||
|  | ||||
| func (r *Mock) Status() error { | ||||
| 	args := r.Called() | ||||
| 	return args.Error(0) | ||||
| } | ||||
|  | ||||
| func (r *Mock) GetPods(all bool) ([]*Pod, error) { | ||||
| 	args := r.Called(all) | ||||
| 	return args.Get(0).([]*Pod), args.Error(1) | ||||
|   | ||||
| @@ -68,6 +68,26 @@ func NewFakeDockerClientWithVersion(version, apiVersion string) *FakeDockerClien | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (f *FakeDockerClient) InjectError(fn string, err error) { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
| 	f.Errors[fn] = err | ||||
| } | ||||
|  | ||||
| func (f *FakeDockerClient) InjectErrors(errs map[string]error) { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
| 	for fn, err := range errs { | ||||
| 		f.Errors[fn] = err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (f *FakeDockerClient) ClearErrors() { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
| 	f.Errors = map[string]error{} | ||||
| } | ||||
|  | ||||
| func (f *FakeDockerClient) ClearCalls() { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
| @@ -382,7 +402,7 @@ func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.A | ||||
| } | ||||
|  | ||||
| func (f *FakeDockerClient) Version() (*docker.Env, error) { | ||||
| 	return &f.VersionInfo, nil | ||||
| 	return &f.VersionInfo, f.popError("version") | ||||
| } | ||||
|  | ||||
| func (f *FakeDockerClient) Info() (*docker.Env, error) { | ||||
|   | ||||
| @@ -60,7 +60,7 @@ import ( | ||||
| const ( | ||||
| 	DockerType = "docker" | ||||
|  | ||||
| 	MinimumDockerAPIVersion = "1.18" | ||||
| 	minimumDockerAPIVersion = "1.18" | ||||
|  | ||||
| 	// ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified) | ||||
| 	// we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative. | ||||
| @@ -941,6 +941,30 @@ func (dm *DockerManager) APIVersion() (kubecontainer.Version, error) { | ||||
| 	return dockerAPIVersion(version), nil | ||||
| } | ||||
|  | ||||
| // Status returns error if docker daemon is unhealthy, nil otherwise. | ||||
| // Now we do this by checking whether: | ||||
| // 1) `docker version` works | ||||
| // 2) docker version is compatible with minimum requirement | ||||
| func (dm *DockerManager) Status() error { | ||||
| 	return dm.checkVersionCompatibility() | ||||
| } | ||||
|  | ||||
| func (dm *DockerManager) checkVersionCompatibility() error { | ||||
| 	version, err := dm.APIVersion() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	// Verify the docker version. | ||||
| 	result, err := version.Compare(minimumDockerAPIVersion) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to compare current docker version %v with minimum support Docker version %q - %v", version, minimumDockerAPIVersion, err) | ||||
| 	} | ||||
| 	if result < 0 { | ||||
| 		return fmt.Errorf("container runtime version is older than %s", minimumDockerAPIVersion) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // The first version of docker that supports exec natively is 1.3.0 == API 1.15 | ||||
| var dockerAPIVersionWithExec = "1.15" | ||||
|  | ||||
|   | ||||
| @@ -546,7 +546,7 @@ func TestKillContainerInPodWithError(t *testing.T) { | ||||
| 		}, | ||||
| 	} | ||||
| 	fakeDocker.SetFakeRunningContainers(containers) | ||||
| 	fakeDocker.Errors["stop"] = fmt.Errorf("sample error") | ||||
| 	fakeDocker.InjectError("stop", fmt.Errorf("sample error")) | ||||
|  | ||||
| 	if err := manager.KillContainerInPod(kubecontainer.ContainerID{}, &pod.Spec.Containers[0], pod, "test kill container with error."); err == nil { | ||||
| 		t.Errorf("expected error, found nil") | ||||
| @@ -1744,7 +1744,7 @@ func TestSyncPodWithFailure(t *testing.T) { | ||||
| 			ID:   "9876", | ||||
| 			Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_0", | ||||
| 		}}) | ||||
| 		fakeDocker.Errors = test.dockerError | ||||
| 		fakeDocker.InjectErrors(test.dockerError) | ||||
| 		puller.ErrorsToInject = test.pullerError | ||||
| 		pod.Spec.Containers = []api.Container{test.container} | ||||
| 		result := runSyncPod(t, dm, fakeDocker, pod, nil, true) | ||||
| @@ -1865,3 +1865,44 @@ func TestSecurityOptsAreNilWithDockerV19(t *testing.T) { | ||||
| 	} | ||||
| 	assert.NotContains(t, newContainer.HostConfig.SecurityOpt, "seccomp:unconfined", "Pods with Docker versions < 1.10 must not have seccomp disabled by default") | ||||
| } | ||||
|  | ||||
| func TestCheckVersionCompatibility(t *testing.T) { | ||||
| 	apiVersion, err := docker.NewAPIVersion(minimumDockerAPIVersion) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error %v", err) | ||||
| 	} | ||||
| 	type test struct { | ||||
| 		version    string | ||||
| 		compatible bool | ||||
| 	} | ||||
| 	tests := []test{ | ||||
| 		// Minimum apiversion | ||||
| 		{minimumDockerAPIVersion, true}, | ||||
| 		// Invalid apiversion | ||||
| 		{"invalid_api_version", false}, | ||||
| 	} | ||||
| 	for i := range apiVersion { | ||||
| 		apiVersion[i]++ | ||||
| 		// Newer apiversion | ||||
| 		tests = append(tests, test{apiVersion.String(), true}) | ||||
| 		apiVersion[i] -= 2 | ||||
| 		// Older apiversion | ||||
| 		if apiVersion[i] >= 0 { | ||||
| 			tests = append(tests, test{apiVersion.String(), false}) | ||||
| 		} | ||||
| 		apiVersion[i]++ | ||||
| 	} | ||||
|  | ||||
| 	for i, tt := range tests { | ||||
| 		testCase := fmt.Sprintf("test case #%d test version %q", i, tt.version) | ||||
| 		dm, fakeDocker := newTestDockerManagerWithHTTPClientWithVersion(&fakeHTTP{}, "", tt.version) | ||||
| 		err := dm.checkVersionCompatibility() | ||||
| 		assert.Equal(t, tt.compatible, err == nil, testCase) | ||||
| 		if tt.compatible == true { | ||||
| 			// Get docker version error | ||||
| 			fakeDocker.InjectError("version", fmt.Errorf("injected version error")) | ||||
| 			err := dm.checkVersionCompatibility() | ||||
| 			assert.NotNil(t, err, testCase+" version error check") | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -457,7 +457,7 @@ func NewMainKubelet( | ||||
| 	} | ||||
|  | ||||
| 	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{}) | ||||
| 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, klet.isContainerRuntimeVersionCompatible) | ||||
| 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0) | ||||
| 	klet.updatePodCIDR(podCIDR) | ||||
|  | ||||
| 	// setup containerGC | ||||
| @@ -2659,7 +2659,7 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) { | ||||
| } | ||||
|  | ||||
| func (kl *Kubelet) updateRuntimeUp() { | ||||
| 	if _, err := kl.containerRuntime.Version(); err != nil { | ||||
| 	if err := kl.containerRuntime.Status(); err != nil { | ||||
| 		glog.Errorf("Container runtime sanity check failed: %v", err) | ||||
| 		return | ||||
| 	} | ||||
| @@ -3076,26 +3076,6 @@ func SetNodeStatus(f func(*api.Node) error) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // FIXME: Why not combine this with container runtime health check? | ||||
| func (kl *Kubelet) isContainerRuntimeVersionCompatible() error { | ||||
| 	switch kl.GetRuntime().Type() { | ||||
| 	case "docker": | ||||
| 		version, err := kl.GetRuntime().APIVersion() | ||||
| 		if err != nil { | ||||
| 			return nil | ||||
| 		} | ||||
| 		// Verify the docker version. | ||||
| 		result, err := version.Compare(dockertools.MinimumDockerAPIVersion) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("failed to compare current docker version %v with minimum support Docker version %q - %v", version, dockertools.MinimumDockerAPIVersion, err) | ||||
| 		} | ||||
| 		if result < 0 { | ||||
| 			return fmt.Errorf("container runtime version is older than %s", dockertools.MinimumDockerAPIVersion) | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 | ||||
| // is set, this function will also confirm that cbr0 is configured correctly. | ||||
| func (kl *Kubelet) tryUpdateNodeStatus() error { | ||||
|   | ||||
| @@ -102,7 +102,8 @@ type TestKubelet struct { | ||||
|  | ||||
| func newTestKubelet(t *testing.T) *TestKubelet { | ||||
| 	fakeRuntime := &containertest.FakeRuntime{} | ||||
| 	fakeRuntime.VersionInfo = "1.15" | ||||
| 	fakeRuntime.RuntimeType = "test" | ||||
| 	fakeRuntime.VersionInfo = "1.5.0" | ||||
| 	fakeRuntime.ImageList = []kubecontainer.Image{ | ||||
| 		{ | ||||
| 			ID:       "abc", | ||||
| @@ -123,7 +124,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { | ||||
|  | ||||
| 	kubelet.hostname = testKubeletHostname | ||||
| 	kubelet.nodeName = testKubeletHostname | ||||
| 	kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, func() error { return nil }) | ||||
| 	kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false) | ||||
| 	kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil)) | ||||
| 	if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { | ||||
| 		t.Fatalf("can't make a temp rootdir: %v", err) | ||||
| @@ -2654,9 +2655,6 @@ func updateDiskSpacePolicy(kubelet *Kubelet, mockCadvisor *cadvisortest.Mock, ro | ||||
| func TestUpdateNewNodeStatus(t *testing.T) { | ||||
| 	testKubelet := newTestKubelet(t) | ||||
| 	kubelet := testKubelet.kubelet | ||||
| 	fakeRuntime := testKubelet.fakeRuntime | ||||
| 	fakeRuntime.RuntimeType = "docker" | ||||
| 	fakeRuntime.VersionInfo = "1.5.0" | ||||
| 	kubeClient := testKubelet.fakeKubeClient | ||||
| 	kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ | ||||
| 		{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, | ||||
| @@ -2710,7 +2708,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { | ||||
| 				BootID:                  "1b3", | ||||
| 				KernelVersion:           "3.16.0-0.bpo.4-amd64", | ||||
| 				OSImage:                 "Debian GNU/Linux 7 (wheezy)", | ||||
| 				ContainerRuntimeVersion: "docker://1.5.0", | ||||
| 				ContainerRuntimeVersion: "test://1.5.0", | ||||
| 				KubeletVersion:          version.Get().String(), | ||||
| 				KubeProxyVersion:        version.Get().String(), | ||||
| 			}, | ||||
| @@ -2852,197 +2850,9 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDockerRuntimeVersion(t *testing.T) { | ||||
| 	testKubelet := newTestKubelet(t) | ||||
| 	kubelet := testKubelet.kubelet | ||||
| 	fakeRuntime := testKubelet.fakeRuntime | ||||
| 	fakeRuntime.RuntimeType = "docker" | ||||
| 	fakeRuntime.VersionInfo = "1.10.0-rc1-fc24" | ||||
| 	fakeRuntime.APIVersionInfo = "1.22" | ||||
| 	kubeClient := testKubelet.fakeKubeClient | ||||
| 	kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ | ||||
| 		{ | ||||
| 			ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, | ||||
| 			Spec:       api.NodeSpec{}, | ||||
| 			Status: api.NodeStatus{ | ||||
| 				Conditions: []api.NodeCondition{ | ||||
| 					{ | ||||
| 						Type:               api.NodeOutOfDisk, | ||||
| 						Status:             api.ConditionFalse, | ||||
| 						Reason:             "KubeletHasSufficientDisk", | ||||
| 						Message:            fmt.Sprintf("kubelet has sufficient disk space available"), | ||||
| 						LastHeartbeatTime:  unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), | ||||
| 						LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), | ||||
| 					}, | ||||
| 					{ | ||||
| 						Type:               api.NodeReady, | ||||
| 						Status:             api.ConditionTrue, | ||||
| 						Reason:             "KubeletReady", | ||||
| 						Message:            fmt.Sprintf("kubelet is posting ready status"), | ||||
| 						LastHeartbeatTime:  unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), | ||||
| 						LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), | ||||
| 					}, | ||||
| 				}, | ||||
| 				Capacity: api.ResourceList{ | ||||
| 					api.ResourceCPU:    *resource.NewMilliQuantity(3000, resource.DecimalSI), | ||||
| 					api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), | ||||
| 					api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI), | ||||
| 				}, | ||||
| 				Allocatable: api.ResourceList{ | ||||
| 					api.ResourceCPU:    *resource.NewMilliQuantity(2800, resource.DecimalSI), | ||||
| 					api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), | ||||
| 					api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	}}).ReactionChain | ||||
| 	mockCadvisor := testKubelet.fakeCadvisor | ||||
| 	mockCadvisor.On("Start").Return(nil) | ||||
| 	machineInfo := &cadvisorapi.MachineInfo{ | ||||
| 		MachineID:      "123", | ||||
| 		SystemUUID:     "abc", | ||||
| 		BootID:         "1b3", | ||||
| 		NumCores:       2, | ||||
| 		MemoryCapacity: 20E9, | ||||
| 	} | ||||
| 	mockCadvisor.On("MachineInfo").Return(machineInfo, nil) | ||||
| 	versionInfo := &cadvisorapi.VersionInfo{ | ||||
| 		KernelVersion:      "3.16.0-0.bpo.4-amd64", | ||||
| 		ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", | ||||
| 	} | ||||
| 	mockCadvisor.On("VersionInfo").Return(versionInfo, nil) | ||||
|  | ||||
| 	// Make kubelet report that it has sufficient disk space. | ||||
| 	if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil { | ||||
| 		t.Fatalf("can't update disk space manager: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	expectedNode := &api.Node{ | ||||
| 		ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, | ||||
| 		Spec:       api.NodeSpec{}, | ||||
| 		Status: api.NodeStatus{ | ||||
| 			Conditions: []api.NodeCondition{ | ||||
| 				{ | ||||
| 					Type:               api.NodeOutOfDisk, | ||||
| 					Status:             api.ConditionFalse, | ||||
| 					Reason:             "KubeletHasSufficientDisk", | ||||
| 					Message:            fmt.Sprintf("kubelet has sufficient disk space available"), | ||||
| 					LastHeartbeatTime:  unversioned.Time{}, | ||||
| 					LastTransitionTime: unversioned.Time{}, | ||||
| 				}, | ||||
| 				{ | ||||
| 					Type:               api.NodeReady, | ||||
| 					Status:             api.ConditionTrue, | ||||
| 					Reason:             "KubeletReady", | ||||
| 					Message:            fmt.Sprintf("kubelet is posting ready status"), | ||||
| 					LastHeartbeatTime:  unversioned.Time{}, | ||||
| 					LastTransitionTime: unversioned.Time{}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			NodeInfo: api.NodeSystemInfo{ | ||||
| 				MachineID:               "123", | ||||
| 				SystemUUID:              "abc", | ||||
| 				BootID:                  "1b3", | ||||
| 				KernelVersion:           "3.16.0-0.bpo.4-amd64", | ||||
| 				OSImage:                 "Debian GNU/Linux 7 (wheezy)", | ||||
| 				ContainerRuntimeVersion: "docker://1.10.0-rc1-fc24", | ||||
| 				KubeletVersion:          version.Get().String(), | ||||
| 				KubeProxyVersion:        version.Get().String(), | ||||
| 			}, | ||||
| 			Capacity: api.ResourceList{ | ||||
| 				api.ResourceCPU:    *resource.NewMilliQuantity(2000, resource.DecimalSI), | ||||
| 				api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), | ||||
| 				api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI), | ||||
| 			}, | ||||
| 			Allocatable: api.ResourceList{ | ||||
| 				api.ResourceCPU:    *resource.NewMilliQuantity(1800, resource.DecimalSI), | ||||
| 				api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), | ||||
| 				api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI), | ||||
| 			}, | ||||
| 			Addresses: []api.NodeAddress{ | ||||
| 				{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}, | ||||
| 				{Type: api.NodeInternalIP, Address: "127.0.0.1"}, | ||||
| 			}, | ||||
| 			Images: []api.ContainerImage{ | ||||
| 				{ | ||||
| 					Names:     []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"}, | ||||
| 					SizeBytes: 123, | ||||
| 				}, | ||||
| 				{ | ||||
| 					Names:     []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"}, | ||||
| 					SizeBytes: 456, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, kubelet.isContainerRuntimeVersionCompatible) | ||||
| 	kubelet.updateRuntimeUp() | ||||
| 	if err := kubelet.updateNodeStatus(); err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	actions := kubeClient.Actions() | ||||
| 	if len(actions) != 2 { | ||||
| 		t.Fatalf("unexpected actions: %v", actions) | ||||
| 	} | ||||
| 	if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { | ||||
| 		t.Fatalf("unexpected actions: %v", actions) | ||||
| 	} | ||||
| 	updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) | ||||
| 	if !ok { | ||||
| 		t.Errorf("unexpected object type") | ||||
| 	} | ||||
| 	for i, cond := range updatedNode.Status.Conditions { | ||||
| 		if cond.LastHeartbeatTime.IsZero() { | ||||
| 			t.Errorf("unexpected zero last probe timestamp") | ||||
| 		} | ||||
| 		if cond.LastTransitionTime.IsZero() { | ||||
| 			t.Errorf("unexpected zero last transition timestamp") | ||||
| 		} | ||||
| 		updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} | ||||
| 		updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} | ||||
| 	} | ||||
|  | ||||
| 	// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 | ||||
| 	if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { | ||||
| 		t.Errorf("unexpected node condition order. NodeReady should be last.") | ||||
| 	} | ||||
|  | ||||
| 	if !api.Semantic.DeepEqual(expectedNode, updatedNode) { | ||||
| 		t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) | ||||
| 	} | ||||
|  | ||||
| 	// Downgrade docker version, node should be NotReady | ||||
| 	fakeRuntime.RuntimeType = "docker" | ||||
| 	fakeRuntime.VersionInfo = "1.5.0" | ||||
| 	fakeRuntime.APIVersionInfo = "1.17" | ||||
| 	kubelet.updateRuntimeUp() | ||||
| 	if err := kubelet.updateNodeStatus(); err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	actions = kubeClient.Actions() | ||||
| 	if len(actions) != 4 { | ||||
| 		t.Fatalf("unexpected actions: %v", actions) | ||||
| 	} | ||||
| 	if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { | ||||
| 		t.Fatalf("unexpected actions: %v", actions) | ||||
| 	} | ||||
| 	updatedNode, ok = actions[3].(testclient.UpdateAction).GetObject().(*api.Node) | ||||
| 	if !ok { | ||||
| 		t.Errorf("unexpected object type") | ||||
| 	} | ||||
| 	if updatedNode.Status.Conditions[1].Reason != "KubeletNotReady" && | ||||
| 		!strings.Contains(updatedNode.Status.Conditions[1].Message, "container runtime version is older than") { | ||||
| 		t.Errorf("unexpect NodeStatus due to container runtime version") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestUpdateExistingNodeStatus(t *testing.T) { | ||||
| 	testKubelet := newTestKubelet(t) | ||||
| 	kubelet := testKubelet.kubelet | ||||
| 	fakeRuntime := testKubelet.fakeRuntime | ||||
| 	fakeRuntime.RuntimeType = "docker" | ||||
| 	fakeRuntime.VersionInfo = "1.5.0" | ||||
| 	kubeClient := testKubelet.fakeKubeClient | ||||
| 	kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ | ||||
| 		{ | ||||
| @@ -3129,7 +2939,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { | ||||
| 				BootID:                  "1b3", | ||||
| 				KernelVersion:           "3.16.0-0.bpo.4-amd64", | ||||
| 				OSImage:                 "Debian GNU/Linux 7 (wheezy)", | ||||
| 				ContainerRuntimeVersion: "docker://1.5.0", | ||||
| 				ContainerRuntimeVersion: "test://1.5.0", | ||||
| 				KubeletVersion:          version.Get().String(), | ||||
| 				KubeProxyVersion:        version.Get().String(), | ||||
| 			}, | ||||
| @@ -3350,13 +3160,11 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { | ||||
| func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { | ||||
| 	testKubelet := newTestKubelet(t) | ||||
| 	kubelet := testKubelet.kubelet | ||||
| 	clock := testKubelet.fakeClock | ||||
| 	kubeClient := testKubelet.fakeKubeClient | ||||
| 	fakeRuntime := testKubelet.fakeRuntime | ||||
| 	fakeRuntime.RuntimeType = "docker" | ||||
| 	fakeRuntime.VersionInfo = "1.5.0" | ||||
| 	kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{ | ||||
| 		{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}}, | ||||
| 	}}).ReactionChain | ||||
| @@ -3394,14 +3202,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { | ||||
| 					LastHeartbeatTime:  unversioned.Time{}, | ||||
| 					LastTransitionTime: unversioned.Time{}, | ||||
| 				}, | ||||
| 				{ | ||||
| 					Type:               api.NodeReady, | ||||
| 					Status:             api.ConditionFalse, | ||||
| 					Reason:             "KubeletNotReady", | ||||
| 					Message:            fmt.Sprintf("container runtime is down"), | ||||
| 					LastHeartbeatTime:  unversioned.Time{}, | ||||
| 					LastTransitionTime: unversioned.Time{}, | ||||
| 				}, | ||||
| 				{}, //placeholder | ||||
| 			}, | ||||
| 			NodeInfo: api.NodeSystemInfo{ | ||||
| 				MachineID:               "123", | ||||
| @@ -3409,7 +3210,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { | ||||
| 				BootID:                  "1b3", | ||||
| 				KernelVersion:           "3.16.0-0.bpo.4-amd64", | ||||
| 				OSImage:                 "Debian GNU/Linux 7 (wheezy)", | ||||
| 				ContainerRuntimeVersion: "docker://1.5.0", | ||||
| 				ContainerRuntimeVersion: "test://1.5.0", | ||||
| 				KubeletVersion:          version.Get().String(), | ||||
| 				KubeProxyVersion:        version.Get().String(), | ||||
| 			}, | ||||
| @@ -3439,42 +3240,77 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	kubelet.runtimeState = newRuntimeState(time.Duration(0), false, func() error { return nil }) | ||||
|  | ||||
| 	checkNodeStatus := func(status api.ConditionStatus, reason, message string) { | ||||
| 		kubeClient.ClearActions() | ||||
| 		if err := kubelet.updateNodeStatus(); err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 		} | ||||
| 		actions := kubeClient.Actions() | ||||
| 		if len(actions) != 2 { | ||||
| 			t.Fatalf("unexpected actions: %v", actions) | ||||
| 		} | ||||
| 		if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { | ||||
| 			t.Fatalf("unexpected actions: %v", actions) | ||||
| 		} | ||||
| 		updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) | ||||
| 		if !ok { | ||||
| 			t.Errorf("unexpected action type.  expected UpdateAction, got %#v", actions[1]) | ||||
| 		} | ||||
|  | ||||
| 		for i, cond := range updatedNode.Status.Conditions { | ||||
| 			if cond.LastHeartbeatTime.IsZero() { | ||||
| 				t.Errorf("unexpected zero last probe timestamp") | ||||
| 			} | ||||
| 			if cond.LastTransitionTime.IsZero() { | ||||
| 				t.Errorf("unexpected zero last transition timestamp") | ||||
| 			} | ||||
| 			updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} | ||||
| 			updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} | ||||
| 		} | ||||
|  | ||||
| 		// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 | ||||
| 		if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { | ||||
| 			t.Errorf("unexpected node condition order. NodeReady should be last.") | ||||
| 		} | ||||
| 		expectedNode.Status.Conditions[1] = api.NodeCondition{ | ||||
| 			Type:               api.NodeReady, | ||||
| 			Status:             status, | ||||
| 			Reason:             reason, | ||||
| 			Message:            message, | ||||
| 			LastHeartbeatTime:  unversioned.Time{}, | ||||
| 			LastTransitionTime: unversioned.Time{}, | ||||
| 		} | ||||
| 		if !api.Semantic.DeepEqual(expectedNode, updatedNode) { | ||||
| 			t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	readyMessage := "kubelet is posting ready status" | ||||
| 	downMessage := "container runtime is down" | ||||
|  | ||||
| 	// Should report kubelet not ready if the runtime check is out of date | ||||
| 	clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) | ||||
| 	kubelet.updateRuntimeUp() | ||||
| 	if err := kubelet.updateNodeStatus(); err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	actions := kubeClient.Actions() | ||||
| 	if len(actions) != 2 { | ||||
| 		t.Fatalf("unexpected actions: %v", actions) | ||||
| 	} | ||||
| 	if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { | ||||
| 		t.Fatalf("unexpected actions: %v", actions) | ||||
| 	} | ||||
| 	updatedNode, ok := actions[1].(testclient.UpdateAction).GetObject().(*api.Node) | ||||
| 	if !ok { | ||||
| 		t.Errorf("unexpected action type.  expected UpdateAction, got %#v", actions[1]) | ||||
| 	} | ||||
| 	checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) | ||||
|  | ||||
| 	for i, cond := range updatedNode.Status.Conditions { | ||||
| 		if cond.LastHeartbeatTime.IsZero() { | ||||
| 			t.Errorf("unexpected zero last probe timestamp") | ||||
| 		} | ||||
| 		if cond.LastTransitionTime.IsZero() { | ||||
| 			t.Errorf("unexpected zero last transition timestamp") | ||||
| 		} | ||||
| 		updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{} | ||||
| 		updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{} | ||||
| 	} | ||||
| 	// Should report kubelet ready if the runtime check is updated | ||||
| 	clock.SetTime(time.Now()) | ||||
| 	kubelet.updateRuntimeUp() | ||||
| 	checkNodeStatus(api.ConditionTrue, "KubeletReady", readyMessage) | ||||
|  | ||||
| 	// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 | ||||
| 	if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { | ||||
| 		t.Errorf("unexpected node condition order. NodeReady should be last.") | ||||
| 	} | ||||
| 	// Should report kubelet not ready if the runtime check is out of date | ||||
| 	clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime)) | ||||
| 	kubelet.updateRuntimeUp() | ||||
| 	checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) | ||||
|  | ||||
| 	if !api.Semantic.DeepEqual(expectedNode, updatedNode) { | ||||
| 		t.Errorf("unexpected objects: %s", util.ObjectDiff(expectedNode, updatedNode)) | ||||
| 	} | ||||
| 	// Should report kubelet not ready if the runtime check failed | ||||
| 	fakeRuntime := testKubelet.fakeRuntime | ||||
| 	// Inject error into fake runtime status check, node should be NotReady | ||||
| 	fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error") | ||||
| 	clock.SetTime(time.Now()) | ||||
| 	kubelet.updateRuntimeUp() | ||||
| 	checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage) | ||||
| } | ||||
|  | ||||
| func TestUpdateNodeStatusError(t *testing.T) { | ||||
|   | ||||
| @@ -184,13 +184,6 @@ func New(config *Config, | ||||
| 		rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) | ||||
| 	} | ||||
|  | ||||
| 	if err := rkt.checkVersion(minimumRktBinVersion, recommendedRktBinVersion, minimumAppcVersion, minimumRktApiVersion, minimumSystemdVersion); err != nil { | ||||
| 		// TODO(yifan): Latest go-systemd version have the ability to close the | ||||
| 		// dbus connection. However the 'docker/libcontainer' package is using | ||||
| 		// the older go-systemd version, so we can't update the go-systemd version. | ||||
| 		rkt.apisvcConn.Close() | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return rkt, nil | ||||
| } | ||||
|  | ||||
| @@ -1062,7 +1055,12 @@ func (r *Runtime) Version() (kubecontainer.Version, error) { | ||||
| } | ||||
|  | ||||
| func (r *Runtime) APIVersion() (kubecontainer.Version, error) { | ||||
| 	return r.binVersion, nil | ||||
| 	return r.apiVersion, nil | ||||
| } | ||||
|  | ||||
| // Status returns error if rkt is unhealthy, nil otherwise. | ||||
| func (r *Runtime) Status() error { | ||||
| 	return r.checkVersion(minimumRktBinVersion, recommendedRktBinVersion, minimumAppcVersion, minimumRktApiVersion, minimumSystemdVersion) | ||||
| } | ||||
|  | ||||
| // SyncPod syncs the running pod to match the specified desired pod. | ||||
|   | ||||
| @@ -30,7 +30,6 @@ type runtimeState struct { | ||||
| 	internalError            error | ||||
| 	cidr                     string | ||||
| 	initError                error | ||||
| 	runtimeCompatibility     func() error | ||||
| } | ||||
|  | ||||
| func (s *runtimeState) setRuntimeSync(t time.Time) { | ||||
| @@ -85,16 +84,12 @@ func (s *runtimeState) errors() []string { | ||||
| 	if s.internalError != nil { | ||||
| 		ret = append(ret, s.internalError.Error()) | ||||
| 	} | ||||
| 	if err := s.runtimeCompatibility(); err != nil { | ||||
| 		ret = append(ret, err.Error()) | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
|  | ||||
| func newRuntimeState( | ||||
| 	runtimeSyncThreshold time.Duration, | ||||
| 	configureNetwork bool, | ||||
| 	runtimeCompatibility func() error, | ||||
| ) *runtimeState { | ||||
| 	var networkError error = nil | ||||
| 	if configureNetwork { | ||||
| @@ -105,6 +100,5 @@ func newRuntimeState( | ||||
| 		baseRuntimeSyncThreshold: runtimeSyncThreshold, | ||||
| 		networkError:             networkError, | ||||
| 		internalError:            nil, | ||||
| 		runtimeCompatibility:     runtimeCompatibility, | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Random-Liu
					Random-Liu