From 2f426fdba6fa0458c4a47dd22b37cc920d9f2a98 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 15 Jun 2021 16:29:08 +0200 Subject: [PATCH 1/2] devicemanager: checkpoint: support pre-1.20 data The commit a8b8995ef241e93e9486d475126450f33f24ef4e changed the content of the data kubelet writes in the checkpoint. Unfortunately, the checkpoint restore code was not updated, so if we upgrade kubelet from pre-1.20 to 1.20+, the device manager cannot anymore restore its state correctly. The only trace of this misbehaviour is this line in the kubelet logs: ``` W0615 07:31:49.744770 4852 manager.go:244] Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: json: cannot unmarshal array into Go struct field PodDevicesEntry.Data.PodDeviceEntries.DeviceIDs of type checkpoint.DevicesPerNUMA ``` If we hit this bug, the device allocation info is indeed NOT up-to-date up until the device plugins register themselves again. This can take up to few minutes, depending on the specific device plugin. While the device manager state is inconsistent: 1. the kubelet will NOT update the device availability to zero, so the scheduler will send pods towards the inconsistent kubelet. 2. at pod admission time, the device manager allocation will not trigger, so pods will be admitted without devices actually being allocated to them. To fix these issues, we add support to the device manager to read pre-1.20 checkpoint data. We retroactively call this format "v1". Signed-off-by: Francesco Romani --- .../cm/devicemanager/checkpoint/checkpoint.go | 16 ++- .../devicemanager/checkpoint/checkpointv1.go | 124 ++++++++++++++++++ pkg/kubelet/cm/devicemanager/manager.go | 43 +++++- pkg/kubelet/cm/devicemanager/manager_test.go | 23 ++++ 4 files changed, 193 insertions(+), 13 deletions(-) create mode 100644 pkg/kubelet/cm/devicemanager/checkpoint/checkpointv1.go diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go index 0430fcd1d2b..4620004f170 100644 --- a/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go +++ b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go @@ -27,7 +27,7 @@ import ( // DeviceManagerCheckpoint defines the operations to retrieve pod devices type DeviceManagerCheckpoint interface { checkpointmanager.Checkpoint - GetData() ([]PodDevicesEntry, map[string][]string) + GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) } // DevicesPerNUMA represents device ids obtained from device plugin per NUMA node id @@ -72,9 +72,12 @@ func (dev DevicesPerNUMA) Devices() sets.String { return result } -// New returns an instance of Checkpoint -func New(devEntries []PodDevicesEntry, - devices map[string][]string) DeviceManagerCheckpoint { +// New returns an instance of Checkpoint - must be an alias for the most recent version +func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint { + return NewV2(devEntries, devices) +} + +func NewV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint { return &Data{ Data: checkpointData{ PodDeviceEntries: devEntries, @@ -99,7 +102,8 @@ func (cp *Data) VerifyChecksum() error { return cp.Checksum.Verify(cp.Data) } -// GetData returns device entries and registered devices -func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) { +// GetDataInLatestFormat returns device entries and registered devices in the *most recent* +// checkpoint format, *not* in the original format stored on disk. +func (cp *Data) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) { return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices } diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/checkpointv1.go b/pkg/kubelet/cm/devicemanager/checkpoint/checkpointv1.go new file mode 100644 index 00000000000..65238caa196 --- /dev/null +++ b/pkg/kubelet/cm/devicemanager/checkpoint/checkpointv1.go @@ -0,0 +1,124 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpoint + +import ( + "encoding/json" + "hash/fnv" + "strings" + + "github.com/davecgh/go-spew/spew" + + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" +) + +// PodDevicesEntry connects pod information to devices, without topology information (k8s <= 1.19) +type PodDevicesEntryV1 struct { + PodUID string + ContainerName string + ResourceName string + DeviceIDs []string + AllocResp []byte +} + +// checkpointData struct is used to store pod to device allocation information +// in a checkpoint file, without topology information (k8s <= 1.19) +type checkpointDataV1 struct { + PodDeviceEntries []PodDevicesEntryV1 + RegisteredDevices map[string][]string +} + +// checksum compute the checksum using the same algorithms (and data type names) k8s 1.19 used. +// We need this special code path to be able to correctly validate the checksum k8s 1.19 wrote. +// credits to https://github.com/kubernetes/kubernetes/pull/102717/commits/353f93895118d2ffa2d59a29a1fbc225160ea1d6 +func (cp checkpointDataV1) checksum() checksum.Checksum { + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + + object := printer.Sprintf("%#v", cp) + object = strings.Replace(object, "checkpointDataV1", "checkpointData", 1) + object = strings.Replace(object, "PodDevicesEntryV1", "PodDevicesEntry", -1) + hash := fnv.New32a() + printer.Fprintf(hash, "%v", object) + return checksum.Checksum(hash.Sum32()) +} + +// Data holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format +type DataV1 struct { + Data checkpointDataV1 + Checksum checksum.Checksum +} + +// New returns an instance of Checkpoint, in V1 (k8s <= 1.19) format. +// Users should avoid creating checkpoints in formats different than the most recent one, +// use the old formats only to validate existing checkpoint and convert them to most recent +// format. The only exception should be test code. +func NewV1(devEntries []PodDevicesEntryV1, + devices map[string][]string) DeviceManagerCheckpoint { + return &DataV1{ + Data: checkpointDataV1{ + PodDeviceEntries: devEntries, + RegisteredDevices: devices, + }, + } +} + +// MarshalCheckpoint is needed to implement the Checkpoint interface, but should not be called anymore +func (cp *DataV1) MarshalCheckpoint() ([]byte, error) { + klog.InfoS("Marshalling a device manager V1 checkpoint") + cp.Checksum = cp.Data.checksum() + return json.Marshal(*cp) +} + +// MarshalCheckpoint returns marshalled data +func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum verifies that passed checksum is same as calculated checksum +func (cp *DataV1) VerifyChecksum() error { + if cp.Checksum != cp.Data.checksum() { + return errors.ErrCorruptCheckpoint + } + return nil +} + +// GetDataInLatestFormat returns device entries and registered devices in the *most recent* +// checkpoint format, *not* in the original format stored on disk. +func (cp *DataV1) GetDataInLatestFormat() ([]PodDevicesEntry, map[string][]string) { + var podDevs []PodDevicesEntry + for _, entryV1 := range cp.Data.PodDeviceEntries { + devsPerNuma := NewDevicesPerNUMA() + // no NUMA cell affinity was recorded. The only possible choice + // is to set all the devices affine to node 0. + devsPerNuma[0] = entryV1.DeviceIDs + podDevs = append(podDevs, PodDevicesEntry{ + PodUID: entryV1.PodUID, + ContainerName: entryV1.ContainerName, + ResourceName: entryV1.ResourceName, + DeviceIDs: devsPerNuma, + AllocResp: entryV1.AllocResp, + }) + } + return podDevs, cp.Data.RegisteredDevices +} diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index a048167a73e..db6a015031a 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -599,20 +599,33 @@ func (m *ManagerImpl) writeCheckpoint() error { // Reads device to container allocation information from disk, and populates // m.allocatedDevices accordingly. func (m *ManagerImpl) readCheckpoint() error { - registeredDevs := make(map[string][]string) - devEntries := make([]checkpoint.PodDevicesEntry, 0) - cp := checkpoint.New(devEntries, registeredDevs) - err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp) + // the vast majority of time we restore a compatible checkpoint, so we try + // the current version first. Trying to restore older format checkpoints is + // relevant only in the kubelet upgrade flow, which happens once in a + // (long) while. + cp, err := m.getCheckpointV2() if err != nil { if err == errors.ErrCheckpointNotFound { - klog.InfoS("Failed to retrieve checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err) + // no point in trying anything else + klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err) return nil } - return err + + var errv1 error + // one last try: maybe it's a old format checkpoint? + cp, errv1 = m.getCheckpointV1() + if errv1 != nil { + klog.InfoS("Failed to read checkpoint V1 file", "err", errv1) + // intentionally return the parent error. We expect to restore V1 checkpoints + // a tiny fraction of time, so what matters most is the current checkpoint read error. + return err + } + klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint) } + m.mutex.Lock() defer m.mutex.Unlock() - podDevices, registeredDevs := cp.GetData() + podDevices, registeredDevs := cp.GetDataInLatestFormat() m.podDevices.fromCheckpointData(podDevices) m.allocatedDevices = m.podDevices.devices() for resource := range registeredDevs { @@ -625,6 +638,22 @@ func (m *ManagerImpl) readCheckpoint() error { return nil } +func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, error) { + registeredDevs := make(map[string][]string) + devEntries := make([]checkpoint.PodDevicesEntry, 0) + cp := checkpoint.New(devEntries, registeredDevs) + err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp) + return cp, err +} + +func (m *ManagerImpl) getCheckpointV1() (checkpoint.DeviceManagerCheckpoint, error) { + registeredDevs := make(map[string][]string) + devEntries := make([]checkpoint.PodDevicesEntryV1, 0) + cp := checkpoint.NewV1(devEntries, registeredDevs) + err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp) + return cp, err +} + // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. func (m *ManagerImpl) UpdateAllocatedDevices() { if !m.sourcesReady.AllReady() { diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 0f7ce32164c..b96b88baf6c 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -1288,3 +1289,25 @@ func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]p } return res } + +const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint" + +var oldCheckpoint string = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}` + +func TestReadPreNUMACheckpoint(t *testing.T) { + socketDir, socketName, _, err := tmpSocketDir() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + + err = ioutil.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644) + require.NoError(t, err) + + topologyStore := topologymanager.NewFakeManager() + nodes := []cadvisorapi.Node{{Id: 0}} + m, err := newManagerImpl(socketName, nodes, topologyStore) + require.NoError(t, err) + + // TODO: we should not calling private methods, but among the existing tests we do anyway + err = m.readCheckpoint() + require.NoError(t, err) +} From b382b6cd0a6fd6d26a3f62d70132d95fa7d29f7c Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 26 Oct 2021 09:31:04 +0200 Subject: [PATCH 2/2] node: e2e: add test for the checkpoint recovery Add a e2e test to exercise the checkpoint recovery flow. This means we need to actually create a old (V1, pre-1.20) checkpoint, but if we do it only in the e2e test, it's still fine. Signed-off-by: Francesco Romani --- test/e2e_node/device_manager_test.go | 370 +++++++++++++++++++++++++ test/e2e_node/topology_manager_test.go | 37 ++- test/e2e_node/util.go | 12 + 3 files changed, 412 insertions(+), 7 deletions(-) create mode 100644 test/e2e_node/device_manager_test.go diff --git a/test/e2e_node/device_manager_test.go b/test/e2e_node/device_manager_test.go new file mode 100644 index 00000000000..a990e56849a --- /dev/null +++ b/test/e2e_node/device_manager_test.go @@ -0,0 +1,370 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2enode + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "sort" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" + "k8s.io/kubernetes/pkg/kubelet/apis/podresources" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/util" + + "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" +) + +const ( + devicePluginDir = "/var/lib/kubelet/device-plugins" + checkpointName = "kubelet_internal_checkpoint" +) + +// Serial because the test updates kubelet configuration. +var _ = SIGDescribe("Device Manager [Serial] [Feature:DeviceManager][NodeFeature:DeviceManager]", func() { + checkpointFullPath := filepath.Join(devicePluginDir, checkpointName) + f := framework.NewDefaultFramework("devicemanager-test") + + ginkgo.Context("With SRIOV devices in the system", func() { + // this test wants to reproduce what happened in https://github.com/kubernetes/kubernetes/issues/102880 + ginkgo.It("should be able to recover V1 (aka pre-1.20) checkpoint data and reject pods before device re-registration", func() { + if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 { + e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device") + } + + oldCfg := enablePodResourcesFeatureGateInKubelet(f) + defer func() { + // restore kubelet config + setOldKubeletConfig(f, oldCfg) + + // Delete state file to allow repeated runs + deleteStateFile() + }() + + configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile) + sd := setupSRIOVConfigOrFail(f, configMap) + + waitForSRIOVResources(f, sd) + + cntName := "gu-container" + // we create and delete a pod to make sure the internal device manager state contains a pod allocation + ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 1 core, 1 %s device", sd.resourceName)) + var initCtnAttrs []tmCtnAttribute + ctnAttrs := []tmCtnAttribute{ + { + ctnName: cntName, + cpuRequest: "1000m", + cpuLimit: "1000m", + deviceName: sd.resourceName, + deviceRequest: "1", + deviceLimit: "1", + }, + } + + podName := "gu-pod-rec-pre-1" + framework.Logf("creating pod %s attrs %v", podName, ctnAttrs) + pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs) + pod = f.PodClient().CreateSync(pod) + + // now we need to simulate a node drain, so we remove all the pods, including the sriov device plugin. + + ginkgo.By("deleting the pod") + // note we delete right now because we know the current implementation of devicemanager will NOT + // clean up on pod deletion. When this changes, the deletion needs to be done after the test is done. + deletePodSyncByName(f, pod.Name) + waitForAllContainerRemoval(pod.Name, pod.Namespace) + + ginkgo.By("teardown the sriov device plugin") + // since we will NOT be recreating the plugin, we clean up everything now + teardownSRIOVConfigOrFail(f, sd) + + ginkgo.By("stopping the kubelet") + killKubelet("SIGSTOP") + + ginkgo.By("rewriting the kubelet checkpoint file as v1") + err := rewriteCheckpointAsV1(devicePluginDir, checkpointName) + // make sure we remove any leftovers + defer os.Remove(checkpointFullPath) + framework.ExpectNoError(err) + + // this mimics a kubelet restart after the upgrade + // TODO: is SIGTERM (less brutal) good enough? + ginkgo.By("killing the kubelet") + killKubelet("SIGKILL") + + ginkgo.By("waiting for the kubelet to be ready again") + // Wait for the Kubelet to be ready. + gomega.Eventually(func() bool { + nodes, err := e2enode.TotalReady(f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + // note we DO NOT start the sriov device plugin. This is intentional. + // issue#102880 reproduces because of a race on startup caused by corrupted device manager + // state which leads to v1.Node object not updated on apiserver. + // So to hit the issue we need to receive the pod *before* the device plugin registers itself. + // The simplest and safest way to reproduce is just avoid to run the device plugin again + + podName = "gu-pod-rec-post-2" + framework.Logf("creating pod %s attrs %v", podName, ctnAttrs) + pod = makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs) + + pod = f.PodClient().Create(pod) + err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, pod.Name, "Failed", 30*time.Second, func(pod *v1.Pod) (bool, error) { + if pod.Status.Phase != v1.PodPending { + return true, nil + } + return false, nil + }) + framework.ExpectNoError(err) + pod, err = f.PodClient().Get(context.TODO(), pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + if pod.Status.Phase != v1.PodFailed { + framework.Failf("pod %s not failed: %v", pod.Name, pod.Status) + } + + framework.Logf("checking pod %s status reason (%s)", pod.Name, pod.Status.Reason) + if !isUnexpectedAdmissionError(pod) { + framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason) + } + + deletePodSyncByName(f, pod.Name) + }) + + ginkgo.It("should be able to recover V1 (aka pre-1.20) checkpoint data and update topology info on device re-registration", func() { + if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 { + e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device") + } + + oldCfg := enablePodResourcesFeatureGateInKubelet(f) + defer func() { + // restore kubelet config + setOldKubeletConfig(f, oldCfg) + + // Delete state file to allow repeated runs + deleteStateFile() + }() + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err) + + configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile) + + sd := setupSRIOVConfigOrFail(f, configMap) + waitForSRIOVResources(f, sd) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err) + + resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{}) + conn.Close() + framework.ExpectNoError(err) + + suitableDevs := 0 + for _, dev := range resp.GetDevices() { + for _, node := range dev.GetTopology().GetNodes() { + if node.GetID() != 0 { + suitableDevs++ + } + } + } + if suitableDevs == 0 { + teardownSRIOVConfigOrFail(f, sd) + e2eskipper.Skipf("no devices found on NUMA Cell other than 0") + } + + cntName := "gu-container" + // we create and delete a pod to make sure the internal device manager state contains a pod allocation + ginkgo.By(fmt.Sprintf("Successfully admit one guaranteed pod with 1 core, 1 %s device", sd.resourceName)) + var initCtnAttrs []tmCtnAttribute + ctnAttrs := []tmCtnAttribute{ + { + ctnName: cntName, + cpuRequest: "1000m", + cpuLimit: "1000m", + deviceName: sd.resourceName, + deviceRequest: "1", + deviceLimit: "1", + }, + } + + podName := "gu-pod-rec-pre-1" + framework.Logf("creating pod %s attrs %v", podName, ctnAttrs) + pod := makeTopologyManagerTestPod(podName, ctnAttrs, initCtnAttrs) + pod = f.PodClient().CreateSync(pod) + + // now we need to simulate a node drain, so we remove all the pods, including the sriov device plugin. + + ginkgo.By("deleting the pod") + // note we delete right now because we know the current implementation of devicemanager will NOT + // clean up on pod deletion. When this changes, the deletion needs to be done after the test is done. + deletePodSyncByName(f, pod.Name) + waitForAllContainerRemoval(pod.Name, pod.Namespace) + + ginkgo.By("teardown the sriov device plugin") + // no need to delete the config now (speed up later) + deleteSRIOVPodOrFail(f, sd) + + ginkgo.By("stopping the kubelet") + killKubelet("SIGSTOP") + + ginkgo.By("rewriting the kubelet checkpoint file as v1") + err = rewriteCheckpointAsV1(devicePluginDir, checkpointName) + // make sure we remove any leftovers + defer os.Remove(checkpointFullPath) + framework.ExpectNoError(err) + + // this mimics a kubelet restart after the upgrade + // TODO: is SIGTERM (less brutal) good enough? + ginkgo.By("killing the kubelet") + killKubelet("SIGKILL") + + ginkgo.By("waiting for the kubelet to be ready again") + // Wait for the Kubelet to be ready. + gomega.Eventually(func() bool { + nodes, err := e2enode.TotalReady(f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + sd2 := &sriovData{ + configMap: sd.configMap, + serviceAccount: sd.serviceAccount, + } + sd2.pod = createSRIOVPodOrFail(f) + defer teardownSRIOVConfigOrFail(f, sd2) + waitForSRIOVResources(f, sd2) + + compareSRIOVResources(sd, sd2) + + cli, conn, err = podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err) + defer conn.Close() + + resp2, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{}) + framework.ExpectNoError(err) + + cntDevs := stringifyContainerDevices(resp.GetDevices()) + cntDevs2 := stringifyContainerDevices(resp2.GetDevices()) + if cntDevs != cntDevs2 { + framework.Failf("different allocatable resources expected %v got %v", cntDevs, cntDevs2) + } + }) + + }) +}) + +func compareSRIOVResources(expected, got *sriovData) { + if expected.resourceName != got.resourceName { + framework.Failf("different SRIOV resource name: expected %q got %q", expected.resourceName, got.resourceName) + } + if expected.resourceAmount != got.resourceAmount { + framework.Failf("different SRIOV resource amount: expected %d got %d", expected.resourceAmount, got.resourceAmount) + } +} + +func isUnexpectedAdmissionError(pod *v1.Pod) bool { + re := regexp.MustCompile(`Unexpected.*Admission.*Error`) + return re.MatchString(pod.Status.Reason) +} + +func rewriteCheckpointAsV1(dir, name string) error { + ginkgo.By(fmt.Sprintf("Creating temporary checkpoint manager (dir=%q)", dir)) + checkpointManager, err := checkpointmanager.NewCheckpointManager(dir) + if err != nil { + return err + } + cp := checkpoint.New(make([]checkpoint.PodDevicesEntry, 0), make(map[string][]string)) + err = checkpointManager.GetCheckpoint(name, cp) + if err != nil { + return err + } + + ginkgo.By(fmt.Sprintf("Read checkpoint %q %#v", name, cp)) + + podDevices, registeredDevs := cp.GetDataInLatestFormat() + podDevicesV1 := convertPodDeviceEntriesToV1(podDevices) + cpV1 := checkpoint.NewV1(podDevicesV1, registeredDevs) + + blob, err := cpV1.MarshalCheckpoint() + if err != nil { + return err + } + + // TODO: why `checkpointManager.CreateCheckpoint(name, cpV1)` doesn't seem to work? + ckPath := filepath.Join(dir, name) + ioutil.WriteFile(filepath.Join("/tmp", name), blob, 0600) + return ioutil.WriteFile(ckPath, blob, 0600) +} + +func convertPodDeviceEntriesToV1(entries []checkpoint.PodDevicesEntry) []checkpoint.PodDevicesEntryV1 { + entriesv1 := []checkpoint.PodDevicesEntryV1{} + for _, entry := range entries { + deviceIDs := []string{} + for _, perNUMANodeDevIDs := range entry.DeviceIDs { + deviceIDs = append(deviceIDs, perNUMANodeDevIDs...) + } + entriesv1 = append(entriesv1, checkpoint.PodDevicesEntryV1{ + PodUID: entry.PodUID, + ContainerName: entry.ContainerName, + ResourceName: entry.ResourceName, + DeviceIDs: deviceIDs, + AllocResp: entry.AllocResp, + }) + } + return entriesv1 +} + +func stringifyContainerDevices(devs []*kubeletpodresourcesv1.ContainerDevices) string { + entries := []string{} + for _, dev := range devs { + devIDs := dev.GetDeviceIds() + if devIDs != nil { + for _, devID := range dev.DeviceIds { + nodes := dev.GetTopology().GetNodes() + if nodes != nil { + for _, node := range nodes { + entries = append(entries, fmt.Sprintf("%s[%s]@NUMA=%d", dev.ResourceName, devID, node.GetID())) + } + } else { + entries = append(entries, fmt.Sprintf("%s[%s]@NUMA=none", dev.ResourceName, devID)) + } + } + } else { + entries = append(entries, dev.ResourceName) + } + } + sort.Strings(entries) + return strings.Join(entries, ", ") +} diff --git a/test/e2e_node/topology_manager_test.go b/test/e2e_node/topology_manager_test.go index 6b43ffd6256..ab207325e3d 100644 --- a/test/e2e_node/topology_manager_test.go +++ b/test/e2e_node/topology_manager_test.go @@ -505,6 +505,15 @@ type sriovData struct { } func setupSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sriovData { + sd := createSRIOVConfigOrFail(f, configMap) + + e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute) + + sd.pod = createSRIOVPodOrFail(f) + return sd +} + +func createSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sriovData { var err error ginkgo.By(fmt.Sprintf("Creating configMap %v/%v", metav1.NamespaceSystem, configMap.Name)) @@ -522,8 +531,13 @@ func setupSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sr framework.Failf("unable to create test serviceAccount %s: %v", serviceAccount.Name, err) } - e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute) + return &sriovData{ + configMap: configMap, + serviceAccount: serviceAccount, + } +} +func createSRIOVPodOrFail(f *framework.Framework) *v1.Pod { dp := getSRIOVDevicePluginPod() dp.Spec.NodeName = framework.TestContext.NodeName @@ -536,11 +550,7 @@ func setupSRIOVConfigOrFail(f *framework.Framework, configMap *v1.ConfigMap) *sr } framework.ExpectNoError(err) - return &sriovData{ - configMap: configMap, - serviceAccount: serviceAccount, - pod: dpPod, - } + return dpPod } // waitForSRIOVResources waits until enough SRIOV resources are avaailable, expecting to complete within the timeout. @@ -560,7 +570,7 @@ func waitForSRIOVResources(f *framework.Framework, sd *sriovData) { framework.Logf("Detected SRIOV allocatable devices name=%q amount=%d", sd.resourceName, sd.resourceAmount) } -func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { +func deleteSRIOVPodOrFail(f *framework.Framework, sd *sriovData) { var err error gp := int64(0) deleteOptions := metav1.DeleteOptions{ @@ -571,6 +581,14 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { err = f.ClientSet.CoreV1().Pods(sd.pod.Namespace).Delete(context.TODO(), sd.pod.Name, deleteOptions) framework.ExpectNoError(err) waitForAllContainerRemoval(sd.pod.Name, sd.pod.Namespace) +} + +func removeSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { + var err error + gp := int64(0) + deleteOptions := metav1.DeleteOptions{ + GracePeriodSeconds: &gp, + } ginkgo.By(fmt.Sprintf("Deleting configMap %v/%v", metav1.NamespaceSystem, sd.configMap.Name)) err = f.ClientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Delete(context.TODO(), sd.configMap.Name, deleteOptions) @@ -581,6 +599,11 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { framework.ExpectNoError(err) } +func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { + deleteSRIOVPodOrFail(f, sd) + removeSRIOVConfigOrFail(f, sd) +} + func runTMScopeResourceAlignmentTestSuite(f *framework.Framework, configMap *v1.ConfigMap, reservedSystemCPUs, policy string, numaNodes, coreCount int) { threadsPerCore := getSMTLevel() sd := setupSRIOVConfigOrFail(f, configMap) diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index a609011af52..a25fc772a9a 100644 --- a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -452,6 +452,18 @@ func stopKubelet() func() { } } +// killKubelet sends a signal (SIGINT, SIGSTOP, SIGTERM...) to the running kubelet +func killKubelet(sig string) { + kubeletServiceName := findKubeletServiceName(true) + + // reset the kubelet service start-limit-hit + stdout, err := exec.Command("sudo", "systemctl", "reset-failed", kubeletServiceName).CombinedOutput() + framework.ExpectNoError(err, "Failed to reset kubelet start-limit-hit with systemctl: %v, %v", err, stdout) + + stdout, err = exec.Command("sudo", "systemctl", "kill", "-s", sig, kubeletServiceName).CombinedOutput() + framework.ExpectNoError(err, "Failed to stop kubelet with systemctl: %v, %v", err, stdout) +} + func kubeletHealthCheck(url string) bool { insecureTransport := http.DefaultTransport.(*http.Transport).Clone() insecureTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}