diff --git a/test/e2e/dra/utils/builder.go b/test/e2e/dra/utils/builder.go index fbb6addbef5..068416d1aa4 100644 --- a/test/e2e/dra/utils/builder.go +++ b/test/e2e/dra/utils/builder.go @@ -40,6 +40,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/test/e2e/framework" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + admissionapi "k8s.io/pod-security-admission/api" "k8s.io/utils/ptr" ) @@ -170,7 +171,7 @@ func (b *Builder) Pod() *v1.Pod { // // It is tempting to use `terminationGraceperiodSeconds: 0`, but that is a very bad // idea because it removes the pod before the kubelet had a chance to react (https://github.com/kubernetes/kubernetes/issues/120671). - pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, b.f.NamespacePodSecurityLevel, "" /* no command = pause */) + pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, admissionapi.LevelRestricted, "" /* no command = pause */) pod.Labels = make(map[string]string) pod.Spec.RestartPolicy = v1.RestartPolicyNever pod.GenerateName = "" diff --git a/test/integration/dra/cluster/.import-restrictions b/test/integration/dra/cluster/.import-restrictions new file mode 100644 index 00000000000..4bd8b5005ec --- /dev/null +++ b/test/integration/dra/cluster/.import-restrictions @@ -0,0 +1,5 @@ +rules: + # This test is a Ginkgo suite and, in contrast to other integration tests, can use the E2E framework. + - selectorRegexp: k8s[.]io/kubernetes/test/e2e + allowedPrefixes: + - "" diff --git a/test/integration/dra/cluster/README.md b/test/integration/dra/cluster/README.md new file mode 100644 index 00000000000..460c996936a --- /dev/null +++ b/test/integration/dra/cluster/README.md @@ -0,0 +1,43 @@ +This directory contains a test suite with automatic upgrade/downgrade tests for +DRA. Conceptually this is like an integration test, in the sense that it +starts/stops cluster components and runs tests against them. + +The difference is that it starts Kubernetes components by running the actual +binaries, relying on local-up-cluster.sh for the logic and configuration +steps. Because local-up-cluster.sh needs additional permissions and +preparations on the host, the test cannot run in "make test-integration" and +just skips itself there. + +To run it: +- Make sure that hack/local-up-cluster.sh works: + - sudo must work + - Set env variables as necessary for your environment. +- Ensure that /var/lib/kubelet/plugins, /var/lib/kubelet/plugins_registry, + and /var/run/cdi are writable. +- Build binaries with `make`. +- Export `KUBERNETES_SERVER_BIN_DIR=$(pwd)/_output/local/bin/linux/amd64` (or + whatever your GOOS/GOARCH and output directory are). +- Optional: export `KUBERNETES_SERVER_CACHE_DIR=$(pwd)/_output/local/bin/linux/amd64/cache-dir` + to reuse downloaded release binaries across test invocations. +- Optional: set ARTIFACTS to store component log files persistently. + Otherwise a test tmp directory is used. +- Invoke as a Go test (no need for the ginkgo CLI), for example: + + go test -v -count=1 ./test/integration/dra/cluster -args -ginkgo.v + dlv test ./test/integration/dra/cluster -- -ginkgo.v + make test WHAT=test/integration/dra/cluster FULL_LOG=true KUBE_TEST_ARGS="-count=1 -args -ginkgo.v" + +`make test` instead of `make test-integration` is intentional: `local-up-cluster.sh` +itself wants to start etcd. `-count=1` ensures that the test runs each time it is invoked. +`-v` and `-ginkgo.v` make the test output visible while the test runs.
+ +To simplify starting from scratch, ./test/integration/dra/cluster/run.sh cleans +up, sets permissions, and then invokes whatever command is specified on the +command line: + + ./test/integration/dra/cluster/run.sh go test ./test/integration/dra/cluster + +The test is implemented as a Ginkgo suite because that allows reusing the same +helper code as in E2E tests. Long-term the goal is to port that helper code to +ktesting, support ktesting in test/e2e, and turn this test into a normal Go +test. diff --git a/test/e2e/dra/gtesting.go b/test/integration/dra/cluster/gtesting_test.go similarity index 99% rename from test/e2e/dra/gtesting.go rename to test/integration/dra/cluster/gtesting_test.go index 207bedeec96..c62216efaaa 100644 --- a/test/e2e/dra/gtesting.go +++ b/test/integration/dra/cluster/gtesting_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dra +package cluster import ( "context" diff --git a/test/integration/dra/cluster/run.sh b/test/integration/dra/cluster/run.sh new file mode 100755 index 00000000000..761d5df1540 --- /dev/null +++ b/test/integration/dra/cluster/run.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ex + +killall etcd || true +sudo rm -rf /tmp/ginkgo* /tmp/*.log /var/run/kubernetes /var/run/cdi /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins /var/lib/kubelet/*_state /var/lib/kubelet/checkpoints /tmp/artifacts +sudo mkdir /var/lib/kubelet/plugins_registry +sudo mkdir /var/lib/kubelet/plugins +sudo mkdir /var/run/cdi +sudo chown "$(id -u)" /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins /var/run/cdi +ARTIFACTS=/tmp/artifacts +KUBERNETES_SERVER_BIN_DIR="$(pwd)/_output/local/bin/$(go env GOOS)/$(go env GOARCH)" +KUBERNETES_SERVER_CACHE_DIR="${KUBERNETES_SERVER_BIN_DIR}/cache-dir" + +export ARTIFACTS KUBERNETES_SERVER_BIN_DIR KUBERNETES_SERVER_CACHE_DIR + +exec "$@" diff --git a/test/integration/dra/cluster/upgradedowngrade_test.go b/test/integration/dra/cluster/upgradedowngrade_test.go new file mode 100644 index 00000000000..a4df376445c --- /dev/null +++ b/test/integration/dra/cluster/upgradedowngrade_test.go @@ -0,0 +1,368 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cluster + +import ( + "archive/tar" + "compress/gzip" + "context" + _ "embed" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path" + "runtime" + "slices" + "strings" + "testing" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/version" + restclient "k8s.io/client-go/rest" + "k8s.io/kubernetes/cmd/kubeadm/app/util/errors" + drautils "k8s.io/kubernetes/test/e2e/dra/utils" + "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" + "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/kubernetes/test/utils/localupcluster" + admissionapi "k8s.io/pod-security-admission/api" +) + +func init() { + // -v=5 may be useful to debug driver operations, but usually isn't needed. + ktesting.SetDefaultVerbosity(2) +} + +var repoRoot = repoRootDefault() + +func currentBinDir() (envName, content string) { + envName = "KUBERNETES_SERVER_BIN_DIR" + content, _ = os.LookupEnv(envName) + return +} + +// repoRootDefault figures out whether an E2E suite is invoked in its directory (as in `go test ./test/e2e`), +// directly in the root (as in `make test-e2e` or `ginkgo ./test/e2e`), or somewhere deep inside +// the _output directory (`ginkgo _output/bin/e2e.test` where `_output/bin` is actually a symlink). +func repoRootDefault() string { + for i := 0; i < 10; i++ { + path := "." + strings.Repeat("/..", i) + if _, err := os.Stat(path + "/test/e2e/framework"); err == nil { + return path + } + } + // Traditional default. + return "../../" +} + +func TestUpgradeDowngrade(t *testing.T) { + if envName, dir := currentBinDir(); dir == "" { + t.Skipf("%s must be set to test DRA upgrade/downgrade scenarios.", envName) + } + suiteConfig, reporterConfig := framework.CreateGinkgoConfig() + ginkgo.RunSpecs(t, "DRA", suiteConfig, reporterConfig) +} + +var _ = ginkgo.Describe("DRA upgrade/downgrade", func() { + // Initialize the default values by registering flags. We don't actually expose those flags. + var fs flag.FlagSet + framework.RegisterCommonFlags(&fs) + framework.RegisterClusterFlags(&fs) + + // Some other things normally done by test/e2e. + e2etestfiles.AddFileSource(e2etestfiles.RootFileSource{Root: repoRoot}) + gomega.RegisterFailHandler(ginkgo.Fail) + + ginkgo.It("works", func(ctx context.Context) { + // TODO: replace with helper code from https://github.com/kubernetes/kubernetes/pull/122481 should that get merged. + tCtx := ktesting.Init(GinkgoContextTB()) + tCtx = ktesting.WithContext(tCtx, ctx) + + // Determine what we need to downgrade to. + tCtx = ktesting.Begin(tCtx, "get source code version") + gitVersion, _, err := sourceVersion(tCtx, repoRoot) + tCtx.ExpectNoError(err, "determine source code version for repo root %q", repoRoot) + version, err := version.ParseGeneric(gitVersion) + tCtx.ExpectNoError(err, "parse version %s of repo root %q", gitVersion, repoRoot) + major, previousMinor := version.Major(), version.Minor()-1 + tCtx = ktesting.End(tCtx) + + // KUBERNETES_SERVER_CACHE_DIR can be set to keep downloaded files across test restarts. + binDir, cacheBinaries := os.LookupEnv("KUBERNETES_SERVER_CACHE_DIR") + if !cacheBinaries { + binDir = tCtx.TempDir() + } + haveBinaries := false + + // Get the previous release, if necessary. 
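+		// A successful stat of the first cluster component binary in the per-version cache directory is taken as a sign that an earlier run already unpacked this release.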
+ previousURL, previousVersion := serverDownloadURL(tCtx, major, previousMinor) + if cacheBinaries { + binDir = path.Join(binDir, previousVersion) + _, err := os.Stat(path.Join(binDir, string(localupcluster.KubeClusterComponents[0]))) + if err == nil { + haveBinaries = true + } + } + if !haveBinaries { + tCtx = ktesting.Begin(tCtx, fmt.Sprintf("download and unpack %s", previousURL)) + req, err := http.NewRequestWithContext(tCtx, http.MethodGet, previousURL, nil) + tCtx.ExpectNoError(err, "construct request") + response, err := http.DefaultClient.Do(req) + tCtx.ExpectNoError(err, "download") + defer func() { + _ = response.Body.Close() + }() + decompress, err := gzip.NewReader(response.Body) + tCtx.ExpectNoError(err, "construct gzip reader") + unpack := tar.NewReader(decompress) + for { + header, err := unpack.Next() + if err == io.EOF { + break + } + base := path.Base(header.Name) + if slices.Contains(localupcluster.KubeClusterComponents, localupcluster.KubeComponentName(base)) { + data, err := io.ReadAll(unpack) + tCtx.ExpectNoError(err, fmt.Sprintf("read content of %s", header.Name)) + tCtx.ExpectNoError(os.MkdirAll(binDir, 0755), "create directory for binaries") + tCtx.ExpectNoError(os.WriteFile(path.Join(binDir, base), data, 0555), fmt.Sprintf("write content of %s", header.Name)) + } + } + tCtx = ktesting.End(tCtx) + } + + tCtx = ktesting.Begin(tCtx, fmt.Sprintf("bring up v%d.%d", major, previousMinor)) + cluster := localupcluster.New(tCtx) + localUpClusterEnv := map[string]string{ + "RUNTIME_CONFIG": "resource.k8s.io/v1beta1,resource.k8s.io/v1beta2", + "FEATURE_GATES": "DynamicResourceAllocation=true", + // *not* needed because driver will run in "local filesystem" mode (= driver.IsLocal): "ALLOW_PRIVILEGED": "1", + } + cluster.Start(tCtx, binDir, localUpClusterEnv) + tCtx = ktesting.End(tCtx) + + restConfig := cluster.LoadConfig(tCtx) + restConfig.UserAgent = fmt.Sprintf("%s -- dra", restclient.DefaultKubernetesUserAgent()) + tCtx = ktesting.WithRESTConfig(tCtx, restConfig) + // TODO: rewrite all DRA test code to use ktesting.TContext once https://github.com/kubernetes/kubernetes/pull/122481 is + // merged, then we don't need to fake a Framework instance. + f := &framework.Framework{ + BaseName: "dra", + Timeouts: framework.NewTimeoutContext(), + ClientSet: tCtx.Client(), + DynamicClient: tCtx.Dynamic(), + + // The driver containers have to run with sufficient privileges to + // modify /var/lib/kubelet/plugins. + NamespacePodSecurityLevel: admissionapi.LevelPrivileged, + } + f.SetClientConfig(restConfig) + + namespace, err := f.CreateNamespace(tCtx, f.BaseName, map[string]string{ + "e2e-framework": f.BaseName, + }) + tCtx.ExpectNoError(err, "create namespace") + f.Namespace = namespace + f.UniqueName = namespace.Name + + tCtx = ktesting.Begin(tCtx, fmt.Sprintf("v%d.%d", major, previousMinor)) + + tCtx.ExpectNoError(e2enode.WaitForAllNodesSchedulable(tCtx, tCtx.Client(), f.Timeouts.NodeSchedulable), "wait for all nodes to be schedulable") + nodes := drautils.NewNodesNow(tCtx, f, 1, 1) + + // Opening sockets locally avoids intermittent errors and delays caused by proxying through the restarted apiserver. + // We could speed up testing by shortening the sync delay in the ResourceSlice controller, but let's better + // test the defaults. 
+ driver := drautils.NewDriverInstance(f) + driver.IsLocal = true + driver.Run(nodes, drautils.DriverResourcesNow(nodes, 1)) + b := drautils.NewBuilderNow(ctx, f, driver) + + claim := b.ExternalClaim() + pod := b.PodExternal() + b.Create(ctx, claim, pod) + b.TestPod(ctx, f, pod) + + tCtx = ktesting.End(tCtx) + + tCtx = ktesting.Begin(tCtx, fmt.Sprintf("update to %s", gitVersion)) + // We could split this up into first updating the apiserver, then control plane components, then restarting kubelet. + // For the purposes of this test we primarily care about full before/after comparisons, so that is not done yet. + // TODO + _, dir := currentBinDir() + restoreOptions := cluster.Modify(tCtx, localupcluster.ModifyOptions{Upgrade: true, BinDir: dir}) + tCtx = ktesting.End(tCtx) + + // The kubelet wipes all ResourceSlices on a restart because it doesn't know which drivers were running. + // Wait for the ResourceSlice controller in the driver to notice and recreate the ResourceSlices. + tCtx = ktesting.Begin(tCtx, "wait for ResourceSlices") + gomega.Eventually(ctx, driver.NewGetSlices()).WithTimeout(5 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames)))) + tCtx = ktesting.End(tCtx) + + // Remove the pod prepared by the previous Kubernetes version. + framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(namespace.Name).Delete(ctx, claim.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, namespace.Name, f.Timeouts.PodDelete)) + + // Create another claim and pod, this time using the latest Kubernetes. + claim = b.ExternalClaim() + pod = b.PodExternal() + pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name + b.Create(ctx, claim, pod) + b.TestPod(ctx, f, pod) + + // Roll back. + tCtx = ktesting.Begin(tCtx, "downgrade") + cluster.Modify(tCtx, restoreOptions) + tCtx = ktesting.End(tCtx) + + // TODO: ensure that kube-controller-manager is up-and-running. + // This works around https://github.com/kubernetes/kubernetes/issues/132334 and can be removed + // once a fix for that is backported. + tCtx = ktesting.Begin(tCtx, "wait for kube-controller-manager") + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) string { + output, _ := cluster.GetSystemLogs(tCtx, localupcluster.KubeControllerManager) + return output + }).Should(gomega.ContainSubstring(`"Caches are synced" controller="resource_claim"`)) + tCtx = ktesting.End(tCtx) + + // We need to clean up explicitly because the normal + // cleanup doesn't work (driver shuts down first). + // + // The retry loops are necessary because of a stale connection + // to the restarted apiserver.
Sometimes, attempts fail with "EOF" as error + // or (even weirder) with + // getting *v1.Pod: pods "tester-2" is forbidden: User "kubernetes-admin" cannot get resource "pods" in API group "" in the namespace "dra-9021" + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error { + return f.ClientSet.ResourceV1beta1().ResourceClaims(namespace.Name).Delete(tCtx, claim.Name, metav1.DeleteOptions{}) + }).Should(gomega.Succeed(), "delete claim after downgrade") + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error { + return f.ClientSet.CoreV1().Pods(namespace.Name).Delete(tCtx, pod.Name, metav1.DeleteOptions{}) + }).Should(gomega.Succeed(), "delete pod after downgrade") + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) *v1.Pod { + pod, err := f.ClientSet.CoreV1().Pods(namespace.Name).Get(tCtx, pod.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + tCtx.ExpectNoError(err, "get pod") + return pod + }).Should(gomega.BeNil(), "no pod after deletion after downgrade") + }) +}) + +// sourceVersion identifies the Kubernetes git version based on hack/print-workspace-status.sh. +// +// Adapted from https://github.com/kubernetes-sigs/kind/blob/3df64e784cc0ea74125b2a2e9877817418afa3af/pkg/build/nodeimage/internal/kube/source.go#L71-L104 +func sourceVersion(tCtx ktesting.TContext, kubeRoot string) (gitVersion string, dockerTag string, err error) { + // Get the version output. + cmd := exec.CommandContext(tCtx, "hack/print-workspace-status.sh") + cmd.Dir = kubeRoot + output, err := cmd.Output() + if err != nil { + return "", "", err + } + + // Parse it. + for _, line := range strings.Split(string(output), "\n") { + parts := strings.SplitN(line, " ", 2) + if len(parts) != 2 { + continue + } + switch parts[0] { + case "gitVersion": + gitVersion = parts[1] + case "STABLE_DOCKER_TAG": + dockerTag = parts[1] + } + } + if gitVersion == "" { + return "", "", errors.Errorf("could not obtain Kubernetes git version: %q", string(output)) + + } + if dockerTag == "" { + return "", "", errors.Errorf("could not obtain docker tag: %q", string(output)) + } + return +} + +// func runCmdIn(tCtx ktesting.TContext, dir string, name string, args ...string) string { +// tCtx.Helper() +// tCtx.Logf("Running command: %s %s", name, strings.Join(args, " ")) +// cmd := exec.CommandContext(tCtx, name, args...) +// cmd.Dir = dir +// var output strings.Builder +// reader, writer := io.Pipe() +// cmd.Stdout = writer +// cmd.Stderr = writer +// tCtx.ExpectNoError(cmd.Start(), "start %s command", name) +// scanner := bufio.NewScanner(reader) +// var wg sync.WaitGroup +// wg.Add(1) +// go func() { +// defer wg.Done() +// for scanner.Scan() { +// line := scanner.Text() +// line = strings.TrimSuffix(line, "\n") +// tCtx.Logf("%s: %s", name, line) +// output.WriteString(line) +// output.WriteByte('\n') +// } +// }() +// result := cmd.Wait() +// tCtx.ExpectNoError(writer.Close(), "close in-memory pipe") +// wg.Wait() +// tCtx.ExpectNoError(result, fmt.Sprintf("%s command failed, output:\n%s", name, output.String())) +// tCtx.ExpectNoError(scanner.Err(), "read %s command output", name) + +// return output.String() +// } + +// serverDownloadURL returns the full URL for a kubernetes-server archive matching +// the current GOOS/GOARCH for the given major/minor version of Kubernetes. +// +// This considers only proper releases.
+func serverDownloadURL(tCtx ktesting.TContext, major, minor uint) (string, string) { + tCtx.Helper() + url := fmt.Sprintf("https://dl.k8s.io/release/stable-%d.%d.txt", major, minor) + get, err := http.NewRequestWithContext(tCtx, http.MethodGet, url, nil) + tCtx.ExpectNoError(err, "construct GET for %s", url) + resp, err := http.DefaultClient.Do(get) + tCtx.ExpectNoError(err, "get %s", url) + if resp.StatusCode != http.StatusOK { + tCtx.Fatalf("get %s: %d - %s", url, resp.StatusCode, resp.Status) + } + if resp.Body == nil { + tCtx.Fatalf("empty response for %s", url) + } + defer func() { + tCtx.ExpectNoError(resp.Body.Close(), "close response body") + }() + version, err := io.ReadAll(resp.Body) + tCtx.ExpectNoError(err, "read response body for %s", url) + return fmt.Sprintf("https://dl.k8s.io/release/%s/kubernetes-server-%s-%s.tar.gz", string(version), runtime.GOOS, runtime.GOARCH), string(version) +} diff --git a/test/utils/localupcluster/cmd.go b/test/utils/localupcluster/cmd.go new file mode 100644 index 00000000000..162ca2c51a6 --- /dev/null +++ b/test/utils/localupcluster/cmd.go @@ -0,0 +1,226 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package localupcluster contains wrapper code around invoking hack/local-up-cluster.sh +// and managing the resulting cluster. +package localupcluster + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "os/exec" + "strings" + "sync" + "sync/atomic" + "time" + + "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/utils/ptr" ) + +type Cmd struct { + // Name is a short, descriptive name. + Name string + + // CommandLine is the complete command line, including the command itself. + CommandLine []string + + // AdditionalEnv gets added to the current environment. + AdditionalEnv map[string]string + + // Log output as it gets printed. + LogOutput bool + + // ProcessOutput, if non-nil, gets called for each line printed by the + // command on stderr or stdout. The line does not include the trailing + // newline. + // + // Called with EOF true when output processing stops. This implies + // that the command has stopped or output processing failed. A non-empty + // line in this case is the output processing error. + ProcessOutput func(output Output) + + // Gather output in a string buffer. That collected output is returned by Wait and Stop. + // If disabled, those methods return the empty string. + GatherOutput bool + + // LogFile specifies a file to write the output to. + // Can be combined with other options for output handling. + // If it's the only one, then the command writes directly + // into the file. + LogFile string + + // KeepRunning ensures that the command is kept running beyond the end of its context, + // i.e. context cancellation is ignored. Such commands have to be stopped explicitly.
+ KeepRunning bool + + cancel func(string) + cmd *exec.Cmd + wg sync.WaitGroup + running atomic.Pointer[bool] + result error + gathering bool + + mutex sync.RWMutex + output strings.Builder +} + +type Output struct { + Line string + EOF bool +} + +func (c *Cmd) Start(tCtx ktesting.TContext) { + tCtx.Helper() + tCtx.Logf("running command %s: %s", c.Name, strings.Join(c.CommandLine, " ")) + if c.KeepRunning { + tCtx = ktesting.WithoutCancel(tCtx) + } + tCtx = ktesting.WithCancel(tCtx) + c.cancel = tCtx.Cancel + c.cmd = exec.CommandContext(tCtx, c.CommandLine[0], c.CommandLine[1:]...) + c.gathering = false + + c.cmd.Env = os.Environ() + for k, v := range c.AdditionalEnv { + c.cmd.Env = append(c.cmd.Env, k+"="+v) + } + + var reader io.Reader + var writer io.WriteCloser + + c.gathering = false + switch { + case c.LogOutput || c.ProcessOutput != nil || c.GatherOutput: + // Process each line through an in-memory pipe. + reader, writer = io.Pipe() + c.gathering = true + case c.LogFile != "": + // Let command write directly. + f, err := os.Create(c.LogFile) + tCtx.ExpectNoError(err, "create log file") + writer = f + } + c.cmd.Stdout = writer + c.cmd.Stderr = writer + + tCtx.ExpectNoError(c.cmd.Start(), "start %s command", c.Name) + c.running.Store(ptr.To(true)) + + if reader != nil { + scanner := bufio.NewScanner(reader) + c.wg.Add(1) + go func() { + defer c.wg.Done() + for scanner.Scan() { + line := scanner.Text() + line = strings.TrimSuffix(line, "\n") + if c.LogOutput { + tCtx.Logf("%s: %s", c.Name, line) + } + if c.ProcessOutput != nil { + c.ProcessOutput(Output{Line: line}) + } + if c.GatherOutput { + c.mutex.Lock() + c.output.WriteString(line) + c.output.WriteByte('\n') + c.mutex.Unlock() + } + } + if c.ProcessOutput != nil { + if err := scanner.Err(); err != nil { + c.ProcessOutput(Output{Line: err.Error(), EOF: true}) + } else { + c.ProcessOutput(Output{EOF: true}) + } + } + }() + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.result = c.cmd.Wait() + now := time.Now() + if reader != nil { + // Has to be closed to stop output processing, otherwise the scanner + // keeps reading because someone might still write something. + _ = writer.Close() + } + if c.LogFile != "" { + f, err := os.OpenFile(c.LogFile, os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + defer func() { + _ = f.Close() + }() + _, _ = fmt.Fprintf(f, "%s: terminated, result: %v\n", now, c.result) + if err := context.Cause(tCtx); err != nil { + _, _ = fmt.Fprintf(f, "%s: killed because command context was canceled: %v\n", now, err) + } + } + } + c.running.Store(ptr.To(false)) + }() +} + +func (c *Cmd) Wait(tCtx ktesting.TContext) string { + return c.wait(tCtx, false) +} + +func (c *Cmd) Stop(tCtx ktesting.TContext, reason string) string { + tCtx.Helper() + if c.cancel == nil { + // Not started... 
+ return "" + } + c.cancel(reason) + return c.wait(tCtx, true) +} + +func (c *Cmd) wait(tCtx ktesting.TContext, killed bool) string { + tCtx.Helper() + c.wg.Wait() + if !killed { + tCtx.ExpectNoError(c.result, fmt.Sprintf("%s command failed, output:\n%s", c.Name, c.output.String())) + } + return c.output.String() +} + +func (c *Cmd) Running() bool { + return ptr.Deref(c.running.Load(), false) +} + +func (c *Cmd) Output(tCtx ktesting.TContext) string { + if c.gathering { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.output.String() + } + + if c.LogFile != "" { + f, err := os.Open(c.LogFile) + tCtx.ExpectNoError(err, "open log file") + content, err := io.ReadAll(f) + tCtx.ExpectNoError(err, "read log file") + return string(content) + } + + return "" +} diff --git a/test/utils/localupcluster/localupcluster.go b/test/utils/localupcluster/localupcluster.go new file mode 100644 index 00000000000..4cc0e142f4b --- /dev/null +++ b/test/utils/localupcluster/localupcluster.go @@ -0,0 +1,488 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package localupcluster contains wrapper code around invoking hack/local-up-cluster.sh +// and managing the resulting cluster. +// +// The basic mode of operation is: +// - local-up-cluster.sh contains all the logic of how to configure and invoke components. +// - local-up-cluster.sh is run in dry-run mode, which prints all commands and their parameters +// without actually running them, except for etcd: etcd's lifecycle is managed as before +// by the caller or local-up-cluster.sh. +// - local-up-cluster.sh is kept running as long as the cluster runs to keep etcd running and +// generated configuration files around. +// - This package takes care of running the commands, including output redirection. +// - It can stop commands and run them again using different binaries to simulate upgrades +// or downgrades. +package localupcluster + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "os" + "path" + "path/filepath" + "regexp" + "slices" + "strconv" + "strings" + "time" + + "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/kubernetes/test/utils/ktesting" ) + +type KubeComponentName string + +// Component names. +// +// They match the names in the local-up-cluster.sh output, if the script runs those components. +const ( + KubeAPIServer = KubeComponentName("kube-apiserver") + KubeControllerManager = KubeComponentName("kube-controller-manager") + KubeScheduler = KubeComponentName("kube-scheduler") + Kubelet = KubeComponentName("kubelet") + KubeProxy = KubeComponentName("kube-proxy") + Kubectl = KubeComponentName("kubectl") + LocalUpCluster = KubeComponentName("local-up-cluster") ) + +// Kubernetes components running in the cluster, in the order in which they need to be started and upgraded.
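+// For a downgrade the same list is walked in reverse, so that the kube-apiserver is always the component running the newest binary during a transition.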
+var KubeClusterComponents = []KubeComponentName{KubeAPIServer, KubeControllerManager, KubeScheduler, Kubelet, KubeProxy} + +// RUN in the local-up-cluster.sh output marks commands that we need to run. +const localUpClusterRunPrefix = "RUN " + +func repoRoot(tCtx ktesting.TContext) string { + for i := 0; ; i++ { + dir := path.Join(".", strings.Repeat("../", i)) + _, err := os.Stat(dir) + tCtx.ExpectNoError(err, "examine parent directory while looking for hack/local-up-cluster.sh (not invoked inside the Kubernetes repository?)") + _, err = os.Stat(path.Join(dir, "hack/local-up-cluster.sh")) + if err == nil { + dir, err = filepath.Abs(dir) + tCtx.ExpectNoError(err, "turn into absolute path") + return dir + } + } +} + +func New(tCtx ktesting.TContext) *Cluster { + tCtx.Helper() + c := &Cluster{} + tCtx.CleanupCtx(c.Stop) + return c +} + +// Cluster represents one cluster. +// +// hack/local-up-cluster.sh must be functional in the current environment. If necessary, +// env variables like CONTAINER_RUNTIME_ENDPOINT have to be set to adapt the script +// to the host system. The current user must have permission to use the container runtime. +// All Kubernetes components run as that user with files stored in a test temp directory. +// +// local-up-cluster.sh does not support more than one cluster per host, so +// tests using this package have to run sequentially. +type Cluster struct { + running map[KubeComponentName]*Cmd + dir string + kubeConfig string + settings map[string]string +} + +// Start brings up the cluster anew. If it was already running, it will be stopped first. +// +// The cluster will be stopped automatically at the end of the test. +// If the ARTIFACTS env variable is set, component log files get written into +// a per-test subdirectory of $ARTIFACTS; otherwise a test temp directory is used. +// +// The binary directory must contain kube-apiserver, kube-controller-manager, +// kube-scheduler, kube-proxy, and kubelet. Those binaries can be from a previous +// Kubernetes release. They will be invoked with parameters as defined in the +// *current* local-up-cluster.sh. This works as long as local-up-cluster.sh in its +// default configuration doesn't depend on something which was added only recently. +func (c *Cluster) Start(tCtx ktesting.TContext, bindir string, localUpClusterEnv map[string]string) { + tCtx.Helper() + c.Stop(tCtx) + tCtx.CleanupCtx(func(tCtx ktesting.TContext) { + // Intentional additional lambda function for source code location in log output. + c.Stop(tCtx) + }) + + if artifacts, ok := os.LookupEnv("ARTIFACTS"); ok { + // Sanitize the name: + // - remove E2E [] tags + // - replace whitespaces and some special characters with hyphens + testName := tCtx.Name() + testName = regexp.MustCompile(`\s*\[[^\]]*\]`).ReplaceAllString(testName, "") + testName = regexp.MustCompile(`[[:space:]/:()\\]+`).ReplaceAllString(testName, "-") + testName = strings.Trim(testName, "-") + c.dir = path.Join(artifacts, testName) + tCtx.ExpectNoError(os.MkdirAll(c.dir, 0766), "create artifacts directory for test") + } else { + c.dir = tCtx.TempDir() + } + c.running = make(map[KubeComponentName]*Cmd) + c.settings = make(map[string]string) + + // Spawn local-up-cluster.sh in background, keep it running (for etcd!), + // parse output to pick up commands and run them in order.
+ lines := make(chan Output, 100) + cmd := &Cmd{ + Name: string(LocalUpCluster), + CommandLine: []string{ + path.Join(repoRoot(tCtx), "hack/local-up-cluster.sh"), + "-o", bindir, + "-d", // dry run + }, + ProcessOutput: func(output Output) { + // Redirect processing into the main goroutine. + lines <- output + }, + AdditionalEnv: localUpClusterEnv, + } + + kubeVerboseStr := cmd.AdditionalEnv["KUBE_VERBOSE"] + kubeVerboseVal, err := strconv.Atoi(kubeVerboseStr) + if kubeVerboseStr == "" { + kubeVerboseVal = 0 + } else { + tCtx.ExpectNoError(err, "KUBE_VERBOSE") + } + if kubeVerboseVal < 2 { + cmd.AdditionalEnv["KUBE_VERBOSE"] = "2" // Enables -x for configuration variable assignments. + } + cmd.Start(tCtx) + c.running[LocalUpCluster] = cmd + +processLocalUpClusterOutput: + for { + select { + case <-tCtx.Done(): + c.Stop(tCtx) + tCtx.Fatalf("interrupted cluster startup: %w", context.Cause(tCtx)) + case output := <-lines: + if c.processLocalUpClusterOutput(tCtx, output) { + break processLocalUpClusterOutput + } + } + } + tCtx.Logf("cluster is running, use KUBECONFIG=%s to access it", c.kubeConfig) +} + +// Matches e.g. "+ API_SECURE_PORT=6443". +var varAssignment = regexp.MustCompile(`^\+ ([A-Z0-9_]+)=(.*)$`) + +func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Output) bool { + if output.EOF { + if output.Line != "" { + tCtx.Fatalf("%s output processing failed: %s", LocalUpCluster, output.Line) + } + tCtx.Fatalf("%s terminated unexpectedly", LocalUpCluster) + } + + tCtx.Logf("local-up-cluster: %s", output.Line) + + if strings.HasPrefix(output.Line, localUpClusterRunPrefix) { + line := output.Line[len(localUpClusterRunPrefix):] + parts := strings.SplitN(line, ": ", 2) + if len(parts) != 2 { + tCtx.Fatalf("unexpected RUN line: %s", output.Line) + } + name := parts[0] + cmdLine := parts[1] + + // Cluster components are kept running. + if slices.Contains(KubeClusterComponents, KubeComponentName(name)) { + c.runKubeComponent(tCtx, KubeComponentName(name), cmdLine) + return false + } + + // Other commands get invoked and need to terminate before we proceed. + c.runCmd(tCtx, name, cmdLine) + return false + } + if m := varAssignment.FindStringSubmatch(output.Line); m != nil { + c.settings[m[1]] = m[2] + if m[1] == "CERT_DIR" { + c.kubeConfig = path.Join(m[2], "admin.kubeconfig") + } + return false + } + if strings.Contains(output.Line, "Local etcd is running. Run commands.") { + // We have seen and processed all commands. + // Time to start testing... + return true + } + + return false +} + +func (c *Cluster) runKubeComponent(tCtx ktesting.TContext, component KubeComponentName, command string) { + commandLine := fromLocalUpClusterOutput(command) + + cmd := &Cmd{ + Name: string(component), + CommandLine: commandLine, + // Number gets bumped when restarting. + LogFile: path.Join(c.dir, fmt.Sprintf("%s-0.log", component)), + // Stopped via Cluster.Stop. + KeepRunning: true, + } + + c.runComponentWithRetry(tCtx, cmd) +} + +func (c *Cluster) runCmd(tCtx ktesting.TContext, name, command string) { + commandLine := fromLocalUpClusterOutput(command) + cmd := &Cmd{ + Name: name, + CommandLine: commandLine, + GatherOutput: true, + } + cmd.Start(tCtx) + cmd.Wait(tCtx) +} + +func fromLocalUpClusterOutput(command string) []string { + // The assumption here is that arguments don't contain spaces. + // We cannot pass the entire string to a shell and let the shell do + // the parsing because some arguments contain special characters like $ + // without quoting them. 
+ return strings.Split(command, " ") +} + +// Stop ensures that the cluster is not running anymore. +func (c *Cluster) Stop(tCtx ktesting.TContext) { + tCtx.Helper() + for _, component := range slices.Backward(KubeClusterComponents) { + cmd := c.running[component] + if cmd == nil { + continue + } + tCtx.Logf("stopping %s", component) + cmd.Stop(tCtx, "stopping cluster") + } +} + +// LoadConfig returns the REST config for the running cluster. +func (c *Cluster) LoadConfig(tCtx ktesting.TContext) *restclient.Config { + tCtx.Helper() + cfg, err := clientcmd.LoadFromFile(c.kubeConfig) + tCtx.ExpectNoError(err, "load KubeConfig") + config, err := clientcmd.NewDefaultClientConfig(*cfg, nil).ClientConfig() + tCtx.ExpectNoError(err, "construct REST config") + return config + +} + +// GetSystemLogs returns the output of the given component, the empty string and false if not started. +func (c *Cluster) GetSystemLogs(tCtx ktesting.TContext, component KubeComponentName) (string, bool) { + cmd, ok := c.running[component] + if !ok { + return "", false + } + return cmd.Output(tCtx), true +} + +type ModifyOptions struct { + // BinDir specifies where to find the replacement Kubernetes components. + // If empty, then only components explicitly listed in FileByComponent + // get modified. + BinDir string + + // FileByComponent overrides BinDir for those components which are specified here. + FileByComponent map[KubeComponentName]string + + // Upgrade determines whether the apiserver gets updated first (upgrade) + // or last (downgrade). + Upgrade bool +} + +func (m ModifyOptions) GetComponentFile(component KubeComponentName) string { + if file, ok := m.FileByComponent[component]; ok { + return file + } + if m.BinDir != "" { + return path.Join(m.BinDir, string(component)) + } + return "" +} + +// Modify changes the cluster as described in the options. +// It returns options that can be passed to Modify unchanged +// to restore the original state. +func (c *Cluster) Modify(tCtx ktesting.TContext, options ModifyOptions) ModifyOptions { + tCtx.Helper() + + restore := ModifyOptions{ + FileByComponent: make(map[KubeComponentName]string), + } + + restore.Upgrade = !options.Upgrade + components := slices.Clone(KubeClusterComponents) + if !options.Upgrade { + slices.Reverse(components) + } + for _, component := range components { + c.modifyComponent(tCtx, options, component, &restore) + } + return restore +} + +func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions, component KubeComponentName, restore *ModifyOptions) { + tCtx.Helper() + tCtx = ktesting.Begin(tCtx, fmt.Sprintf("modify %s", component)) + defer ktesting.End(tCtx) + + // We could also do things like turning feature gates on or off. + // For now we only support replacing the file. + if fileName := options.GetComponentFile(component); fileName != "" { + cmd, ok := c.running[component] + if !ok { + tCtx.Fatal("not running") + } + cmd.Stop(tCtx, "modifying the component") + delete(c.running, component) + + // Find the command (might be wrapped by sudo!). + cmdLine := slices.Clone(cmd.CommandLine) + found := false + for i := range cmdLine { + if path.Base(cmdLine[i]) == string(component) { + found = true + restore.FileByComponent[component] = cmdLine[i] + cmdLine[i] = fileName + break + } + } + if !found { + tCtx.Fatal("binary filename not found") + } + cmd.CommandLine = cmdLine + + // New log file. 
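+		// The name ends in a restart counter; bumping it keeps the output of the previous instance around instead of overwriting it.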
+ m := regexp.MustCompile(`^(.*)-([[:digit:]]+)\.log$`).FindStringSubmatch(cmd.LogFile) + if m == nil { + tCtx.Fatalf("unexpected log file, should have contained number: %s", cmd.LogFile) + } + logNum, _ := strconv.Atoi(m[2]) + cmd.LogFile = fmt.Sprintf("%s-%d.log", m[1], logNum+1) + + c.runComponentWithRetry(tCtx, cmd) + } +} + +func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) { + // Sometimes components fail to come up. We have to retry. + // + // For example, the apiserver's port might not be free again yet (no SO_LINGER!). + // Or kube-controller-manager: + // I0630 13:20:45.046709 61710 serving.go:380] Generated self-signed cert (/var/run/kubernetes/kube-controller-manager.crt, /var/run/kubernetes/kube-controller-manager.key) + // W0630 13:20:45.410578 61710 requestheader_controller.go:204] Unable to get configmap/extension-apiserver-authentication in kube-system. Usually fixed by 'kubectl create rolebinding -n kube-system ROLEBINDING_NAME --role=extension-apiserver-authentication-reader --serviceaccount=YOUR_NS:YOUR_SA' + // E0630 13:20:45.410618 61710 run.go:72] "command failed" err="unable to load configmap based request-header-client-ca-file: configmaps \"extension-apiserver-authentication\" is forbidden: User \"system:kube-controller-manager\" cannot get resource \"configmaps\" in API group \"\" in the namespace \"kube-system\"" + // The kube-controller-manager then is stuck. Perhaps it should retry instead? + for i := 0; ; i++ { + tCtx.Logf("running %s with output redirected to %s", cmd.Name, cmd.LogFile) + cmd.Start(tCtx) + c.running[KubeComponentName(cmd.Name)] = cmd + err := func() (finalErr error) { + tCtx, finalize := ktesting.WithError(tCtx, &finalErr) + defer finalize() + c.checkReadiness(tCtx, cmd) + return nil + }() + if err == nil { + break + } + if !cmd.Running() && i < 10 { + // Retry. + time.Sleep(time.Second) + continue + } + // Re-raise the failure. + tCtx.ExpectNoError(err) + } +} + +func (c *Cluster) checkReadiness(tCtx ktesting.TContext, cmd *Cmd) { + restConfig := c.LoadConfig(tCtx) + tCtx = ktesting.WithRESTConfig(tCtx, restConfig) + tCtx = ktesting.Begin(tCtx, fmt.Sprintf("wait for %s readiness", cmd.Name)) + defer ktesting.End(tCtx) + + switch KubeComponentName(cmd.Name) { + case KubeAPIServer: + c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["API_SECURE_PORT"]) + case KubeScheduler: + c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["SCHEDULER_SECURE_PORT"]) + case KubeControllerManager: + c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["KCM_SECURE_PORT"]) + case KubeProxy: + c.checkHealthz(tCtx, cmd, "http" /* not an error! */, c.settings["API_HOST_IP"], c.settings["PROXY_HEALTHZ_PORT"]) + case Kubelet: + c.checkHealthz(tCtx, cmd, "https", c.settings["KUBELET_HOST"], c.settings["KUBELET_PORT"]) + + // Also wait for the node to be ready. 
+ tCtx = ktesting.Begin(tCtx, "wait for node ready") + defer ktesting.End(tCtx) + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) []corev1.Node { + nodes, err := tCtx.Client().CoreV1().Nodes().List(tCtx, metav1.ListOptions{}) + tCtx.ExpectNoError(err, "list nodes") + return nodes.Items + }).Should(gomega.ConsistOf(gomega.HaveField("Status.Conditions", gomega.ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Type": gomega.Equal(corev1.NodeReady), + "Status": gomega.Equal(corev1.ConditionTrue), + }))))) + } +} + +func (c *Cluster) checkHealthz(tCtx ktesting.TContext, cmd *Cmd, method, hostIP, port string) { + url := fmt.Sprintf("%s://%s:%s/healthz", method, hostIP, port) + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error { + if !cmd.Running() { + return gomega.StopTrying(fmt.Sprintf("%s stopped unexpectedly", cmd.Name)) + } + // Like kube::util::wait_for_url in local-up-cluster.sh we use https, + // but don't check the certificate. + req, err := http.NewRequestWithContext(tCtx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("get %s: %w", url, err) + } + if err := resp.Body.Close(); err != nil { + return fmt.Errorf("close GET response: %w", err) + } + // Any response is fine, we just need to get here. In practice, we get a 403 Forbidden. + return nil + }).Should(gomega.Succeed(), fmt.Sprintf("HTTP GET %s", url)) +}
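
For reviewers: the following is a minimal sketch (not part of the diff) of how test/utils/localupcluster is meant to be driven from a plain Go test. It only uses the API introduced above (New, Start, LoadConfig, Modify and the cleanup-registered Stop); the test name and the OTHER_SERVER_BIN_DIR variable are made up for illustration, and it assumes that ktesting.Init accepts a *testing.T the way other callers of test/utils/ktesting do.

```go
package cluster

import (
	"os"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/test/utils/ktesting"
	"k8s.io/kubernetes/test/utils/localupcluster"
)

// TestClusterLifecycle is purely illustrative and not part of this change.
func TestClusterLifecycle(t *testing.T) {
	binDir := os.Getenv("KUBERNETES_SERVER_BIN_DIR")
	if binDir == "" {
		t.Skip("KUBERNETES_SERVER_BIN_DIR must point to freshly built server binaries")
	}
	tCtx := ktesting.Init(t)

	// New registers Stop as a cleanup callback, so the cluster gets torn down
	// automatically at the end of the test.
	cluster := localupcluster.New(tCtx)
	cluster.Start(tCtx, binDir, map[string]string{
		"FEATURE_GATES": "DynamicResourceAllocation=true",
	})

	// Talk to the running cluster like to any other cluster.
	tCtx = ktesting.WithRESTConfig(tCtx, cluster.LoadConfig(tCtx))
	nodes, err := tCtx.Client().CoreV1().Nodes().List(tCtx, metav1.ListOptions{})
	tCtx.ExpectNoError(err, "list nodes")
	tCtx.Logf("cluster is up with %d node(s)", len(nodes.Items))

	// Swap in binaries from some other build (for example, an unpacked release)
	// and restore the original ones afterwards. OTHER_SERVER_BIN_DIR is a
	// hypothetical variable, not something read by any real tooling.
	if otherBinDir := os.Getenv("OTHER_SERVER_BIN_DIR"); otherBinDir != "" {
		restore := cluster.Modify(tCtx, localupcluster.ModifyOptions{Upgrade: true, BinDir: otherBinDir})
		cluster.Modify(tCtx, restore)
	}
}
```

Keeping the etcd lifecycle inside the long-running local-up-cluster.sh process and only re-executing the component command lines is what makes the Modify round trip cheap: generated configuration files and etcd data survive the binary swap.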