DRA integration: add upgrade/downgrade testing with local-up-cluster.sh
The test brings up the cluster itself and uses that control over the individual components to run through an upgrade/downgrade scenario. Version skew testing (running tests while the cluster is partially up- or downgraded) could be added.

The new helper code for managing the cluster is written so that it can be used both in an integration test and in an E2E test. https://github.com/kubernetes/kubernetes/pull/122481 could make that a bit easier in an E2E test, but is not absolutely required.

In contrast to running on a normal cluster, pods need no privileges. Instead, the caller has to make sure that the test itself can write into the system directories used by the cluster.
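For orientation, here is a condensed sketch of the flow that the new helper code enables. It is distilled from test/integration/dra/cluster/upgradedowngrade_test.go below; `oldBinDir` and `newBinDir` are placeholder paths, and error handling as well as the DRA-specific assertions are omitted:

// Condensed sketch, not the actual test; see upgradedowngrade_test.go below.
func upgradeDowngradeSketch(tCtx ktesting.TContext, oldBinDir, newBinDir string) {
	// New registers a cleanup callback which stops the cluster at the end of the test.
	cluster := localupcluster.New(tCtx)

	// Bring up a cluster from the previous release's binaries, driven by the
	// current hack/local-up-cluster.sh in dry-run mode.
	cluster.Start(tCtx, oldBinDir, map[string]string{
		"FEATURE_GATES": "DynamicResourceAllocation=true",
	})

	// Upgrade: each component gets restarted with the new binary, apiserver first.
	// The returned options restore the previous state when passed back in.
	restore := cluster.Modify(tCtx, localupcluster.ModifyOptions{
		Upgrade: true,
		BinDir:  newBinDir,
	})

	// ... run workloads against the upgraded cluster here ...

	// Downgrade: the same mechanism in reverse order, apiserver last.
	cluster.Modify(tCtx, restore)
}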
@@ -40,6 +40,7 @@ import (
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	admissionapi "k8s.io/pod-security-admission/api"
 	"k8s.io/utils/ptr"
 )

@@ -170,7 +171,7 @@ func (b *Builder) Pod() *v1.Pod {
 	//
 	// It is tempting to use `terminationGracePeriodSeconds: 0`, but that is a very bad
 	// idea because it removes the pod before the kubelet had a chance to react (https://github.com/kubernetes/kubernetes/issues/120671).
-	pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, b.f.NamespacePodSecurityLevel, "" /* no command = pause */)
+	pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, admissionapi.LevelRestricted, "" /* no command = pause */)
 	pod.Labels = make(map[string]string)
 	pod.Spec.RestartPolicy = v1.RestartPolicyNever
 	pod.GenerateName = ""

test/integration/dra/cluster/.import-restrictions (new file, 5 lines)
@@ -0,0 +1,5 @@
rules:
  # This test is a Ginkgo suite and, in contrast to other integration tests, can use the E2E framework.
  - selectorRegexp: k8s[.]io/kubernetes/test/e2e
    allowedPrefixes:
      - ""

test/integration/dra/cluster/README.md (new file, 43 lines)
@@ -0,0 +1,43 @@
This directory contains a test suite with automatic upgrade/downgrade tests for
DRA. Conceptually this is like an integration test, in the sense that it
starts/stops cluster components and runs tests against them.

The difference is that it starts Kubernetes components by running the actual
binaries, relying on local-up-cluster.sh for the logic and configuration
steps. Because local-up-cluster.sh needs additional permissions and
preparations on the host, the test cannot run in "make test-integration" and
just skips itself there.

To run it:
- Make sure that hack/local-up-cluster.sh works:
  - sudo must work
  - Set env variables as necessary for your environment.
  - Ensure that /var/lib/kubelet/plugins, /var/lib/kubelet/plugins_registry,
    and /var/run/cdi are writable.
- Build binaries with `make`.
- Export `KUBERNETES_SERVER_BIN_DIR=$(pwd)/_output/local/bin/linux/amd64` (or
  whatever your GOOS/GOARCH and output directory are).
- Optional: export `KUBERNETES_SERVER_CACHE_DIR=$(pwd)/_output/local/bin/linux/amd64/cache-dir`
  to reuse downloaded release binaries across test invocations.
- Optional: set ARTIFACTS to store component log files persistently.
  Otherwise a test tmp directory is used.
- Invoke as a Go test (no need for the ginkgo CLI), for example:

      go test -v -count=1 ./test/integration/dra/cluster -args -ginkgo.v
      dlv test ./test/integration/dra/cluster -- -ginkgo.v
      make test WHAT=test/integration/dra/cluster FULL_LOG=true KUBE_TEST_ARGS="-count=1 -args -ginkgo.v"

`make test` instead of `make test-integration` is intentional: `local-up-cluster.sh`
itself wants to start etcd. `-count=1` ensures that the test runs each time it is
invoked. `-v` and `-ginkgo.v` make the test output visible while the test runs.

To simplify starting from scratch, ./test/integration/dra/cluster/run.sh cleans
up, sets permissions, and then invokes whatever command is specified on the
command line:

    ./test/integration/dra/cluster/run.sh go test ./test/integration/dra/cluster

The test is implemented as a Ginkgo suite because that allows reusing the same
helper code as in E2E tests. Long-term the goal is to port that helper code to
ktesting, support ktesting in test/e2e, and turn this test into a normal Go
test.

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package dra
+package cluster

 import (
 	"context"

test/integration/dra/cluster/run.sh (new executable file, 31 lines)
@@ -0,0 +1,31 @@
#!/bin/sh

# Copyright 2025 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -ex

killall etcd || true
sudo rm -rf /tmp/ginkgo* /tmp/*.log /var/run/kubernetes /var/run/cdi /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins /var/lib/kubelet/*_state /var/lib/kubelet/checkpoints /tmp/artifacts
sudo mkdir /var/lib/kubelet/plugins_registry
sudo mkdir /var/lib/kubelet/plugins
sudo mkdir /var/run/cdi
sudo chown "$(id -u)" /var/lib/kubelet/plugins_registry /var/lib/kubelet/plugins /var/run/cdi
ARTIFACTS=/tmp/artifacts
KUBERNETES_SERVER_BIN_DIR="$(pwd)/_output/local/bin/$(go env GOOS)/$(go env GOARCH)"
KUBERNETES_SERVER_CACHE_DIR="${KUBERNETES_SERVER_BIN_DIR}/cache-dir"

export ARTIFACTS KUBERNETES_SERVER_BIN_DIR KUBERNETES_SERVER_CACHE_DIR

exec "$@"

test/integration/dra/cluster/upgradedowngrade_test.go (new file, 368 lines)
@@ -0,0 +1,368 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
	"archive/tar"
	"compress/gzip"
	"context"
	_ "embed"
	"flag"
	"fmt"
	"io"
	"net/http"
	"os"
	"os/exec"
	"path"
	"runtime"
	"slices"
	"strings"
	"testing"
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/version"
	restclient "k8s.io/client-go/rest"
	"k8s.io/kubernetes/cmd/kubeadm/app/util/errors"
	drautils "k8s.io/kubernetes/test/e2e/dra/utils"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
	"k8s.io/kubernetes/test/utils/ktesting"
	"k8s.io/kubernetes/test/utils/localupcluster"
	admissionapi "k8s.io/pod-security-admission/api"
)

func init() {
	// -v=5 may be useful to debug driver operations, but usually isn't needed.
	ktesting.SetDefaultVerbosity(2)
}

var repoRoot = repoRootDefault()

func currentBinDir() (envName, content string) {
	envName = "KUBERNETES_SERVER_BIN_DIR"
	content, _ = os.LookupEnv(envName)
	return
}

// repoRootDefault figures out whether an E2E suite is invoked in its directory (as in `go test ./test/e2e`),
// directly in the root (as in `make test-e2e` or `ginkgo ./test/e2e`), or somewhere deep inside
// the _output directory (`ginkgo _output/bin/e2e.test` where `_output/bin` is actually a symlink).
func repoRootDefault() string {
	for i := 0; i < 10; i++ {
		path := "." + strings.Repeat("/..", i)
		if _, err := os.Stat(path + "/test/e2e/framework"); err == nil {
			return path
		}
	}
	// Traditional default.
	return "../../"
}

func TestUpgradeDowngrade(t *testing.T) {
	if envName, dir := currentBinDir(); dir == "" {
		t.Skipf("%s must be set to test DRA upgrade/downgrade scenarios.", envName)
	}
	suiteConfig, reporterConfig := framework.CreateGinkgoConfig()
	ginkgo.RunSpecs(t, "DRA", suiteConfig, reporterConfig)
}

var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
	// Initialize the default values by registering flags. We don't actually expose those flags.
	var fs flag.FlagSet
	framework.RegisterCommonFlags(&fs)
	framework.RegisterClusterFlags(&fs)

	// Some other things normally done by test/e2e.
	e2etestfiles.AddFileSource(e2etestfiles.RootFileSource{Root: repoRoot})
	gomega.RegisterFailHandler(ginkgo.Fail)

	ginkgo.It("works", func(ctx context.Context) {
		// TODO: replace with helper code from https://github.com/kubernetes/kubernetes/pull/122481 should that get merged.
		tCtx := ktesting.Init(GinkgoContextTB())
		tCtx = ktesting.WithContext(tCtx, ctx)

		// Determine what we need to downgrade to.
		tCtx = ktesting.Begin(tCtx, "get source code version")
		gitVersion, _, err := sourceVersion(tCtx, repoRoot)
		tCtx.ExpectNoError(err, "determine source code version for repo root %q", repoRoot)
		version, err := version.ParseGeneric(gitVersion)
		tCtx.ExpectNoError(err, "parse version %s of repo root %q", gitVersion, repoRoot)
		major, previousMinor := version.Major(), version.Minor()-1
		tCtx = ktesting.End(tCtx)

		// KUBERNETES_SERVER_CACHE_DIR can be set to keep downloaded files across test restarts.
		binDir, cacheBinaries := os.LookupEnv("KUBERNETES_SERVER_CACHE_DIR")
		if !cacheBinaries {
			binDir = tCtx.TempDir()
		}
		haveBinaries := false

		// Get the previous release, if necessary.
		previousURL, previousVersion := serverDownloadURL(tCtx, major, previousMinor)
		if cacheBinaries {
			binDir = path.Join(binDir, previousVersion)
			_, err := os.Stat(path.Join(binDir, string(localupcluster.KubeClusterComponents[0])))
			if err == nil {
				haveBinaries = true
			}
		}
		if !haveBinaries {
			tCtx = ktesting.Begin(tCtx, fmt.Sprintf("download and unpack %s", previousURL))
			req, err := http.NewRequestWithContext(tCtx, http.MethodGet, previousURL, nil)
			tCtx.ExpectNoError(err, "construct request")
			response, err := http.DefaultClient.Do(req)
			tCtx.ExpectNoError(err, "download")
			defer func() {
				_ = response.Body.Close()
			}()
			decompress, err := gzip.NewReader(response.Body)
			tCtx.ExpectNoError(err, "construct gzip reader")
			unpack := tar.NewReader(decompress)
			for {
				header, err := unpack.Next()
				if err == io.EOF {
					break
				}
				base := path.Base(header.Name)
				if slices.Contains(localupcluster.KubeClusterComponents, localupcluster.KubeComponentName(base)) {
					data, err := io.ReadAll(unpack)
					tCtx.ExpectNoError(err, fmt.Sprintf("read content of %s", header.Name))
					tCtx.ExpectNoError(os.MkdirAll(binDir, 0755), "create directory for binaries")
					tCtx.ExpectNoError(os.WriteFile(path.Join(binDir, base), data, 0555), fmt.Sprintf("write content of %s", header.Name))
				}
			}
			tCtx = ktesting.End(tCtx)
		}

		tCtx = ktesting.Begin(tCtx, fmt.Sprintf("bring up v%d.%d", major, previousMinor))
		cluster := localupcluster.New(tCtx)
		localUpClusterEnv := map[string]string{
			"RUNTIME_CONFIG": "resource.k8s.io/v1beta1,resource.k8s.io/v1beta2",
			"FEATURE_GATES":  "DynamicResourceAllocation=true",
			// *not* needed because driver will run in "local filesystem" mode (= driver.IsLocal): "ALLOW_PRIVILEGED": "1",
		}
		cluster.Start(tCtx, binDir, localUpClusterEnv)
		tCtx = ktesting.End(tCtx)

		restConfig := cluster.LoadConfig(tCtx)
		restConfig.UserAgent = fmt.Sprintf("%s -- dra", restclient.DefaultKubernetesUserAgent())
		tCtx = ktesting.WithRESTConfig(tCtx, restConfig)
		// TODO: rewrite all DRA test code to use ktesting.TContext once https://github.com/kubernetes/kubernetes/pull/122481 is
		// merged, then we don't need to fake a Framework instance.
		f := &framework.Framework{
			BaseName:      "dra",
			Timeouts:      framework.NewTimeoutContext(),
			ClientSet:     tCtx.Client(),
			DynamicClient: tCtx.Dynamic(),

			// The driver containers have to run with sufficient privileges to
			// modify /var/lib/kubelet/plugins.
			NamespacePodSecurityLevel: admissionapi.LevelPrivileged,
		}
		f.SetClientConfig(restConfig)

		namespace, err := f.CreateNamespace(tCtx, f.BaseName, map[string]string{
			"e2e-framework": f.BaseName,
		})
		tCtx.ExpectNoError(err, "create namespace")
		f.Namespace = namespace
		f.UniqueName = namespace.Name

		tCtx = ktesting.Begin(tCtx, fmt.Sprintf("v%d.%d", major, previousMinor))

		tCtx.ExpectNoError(e2enode.WaitForAllNodesSchedulable(tCtx, tCtx.Client(), f.Timeouts.NodeSchedulable), "wait for all nodes to be schedulable")
		nodes := drautils.NewNodesNow(tCtx, f, 1, 1)

		// Opening sockets locally avoids intermittent errors and delays caused by proxying through the restarted apiserver.
		// We could speed up testing by shortening the sync delay in the ResourceSlice controller, but let's better
		// test the defaults.
		driver := drautils.NewDriverInstance(f)
		driver.IsLocal = true
		driver.Run(nodes, drautils.DriverResourcesNow(nodes, 1))
		b := drautils.NewBuilderNow(ctx, f, driver)

		claim := b.ExternalClaim()
		pod := b.PodExternal()
		b.Create(ctx, claim, pod)
		b.TestPod(ctx, f, pod)

		tCtx = ktesting.End(tCtx)

		tCtx = ktesting.Begin(tCtx, fmt.Sprintf("update to %s", gitVersion))
		// We could split this up into first updating the apiserver, then control plane components, then restarting kubelet.
		// For the purpose of this test we primarily care about full before/after comparisons, so this is not done yet.
		// TODO
		_, dir := currentBinDir()
		restoreOptions := cluster.Modify(tCtx, localupcluster.ModifyOptions{Upgrade: true, BinDir: dir})
		tCtx = ktesting.End(tCtx)

		// The kubelet wipes all ResourceSlices on a restart because it doesn't know which drivers were running.
		// Wait for the ResourceSlice controller in the driver to notice and recreate the ResourceSlices.
		tCtx = ktesting.Begin(tCtx, "wait for ResourceSlices")
		gomega.Eventually(ctx, driver.NewGetSlices()).WithTimeout(5 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
		tCtx = ktesting.End(tCtx)

		// Remove the pod prepared by the previous Kubernetes.
		framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(namespace.Name).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
		framework.ExpectNoError(f.ClientSet.CoreV1().Pods(namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
		framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, namespace.Name, f.Timeouts.PodDelete))

		// Create another claim and pod, this time using the latest Kubernetes.
		claim = b.ExternalClaim()
		pod = b.PodExternal()
		pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name
		b.Create(ctx, claim, pod)
		b.TestPod(ctx, f, pod)

		// Roll back.
		tCtx = ktesting.Begin(tCtx, "downgrade")
		cluster.Modify(tCtx, restoreOptions)
		tCtx = ktesting.End(tCtx)

		// TODO: ensure that kube-controller-manager is up-and-running.
		// This works around https://github.com/kubernetes/kubernetes/issues/132334 and can be removed
		// once a fix for that is backported.
		tCtx = ktesting.Begin(tCtx, "wait for kube-controller-manager")
		ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) string {
			output, _ := cluster.GetSystemLogs(tCtx, localupcluster.KubeControllerManager)
			return output
		}).Should(gomega.ContainSubstring(`"Caches are synced" controller="resource_claim"`))
		tCtx = ktesting.End(tCtx)

		// We need to clean up explicitly because the normal
		// cleanup doesn't work (driver shuts down first).
		//
		// The retry loops are necessary because of a stale connection
		// to the restarted apiserver. Sometimes, attempts fail with "EOF" as error
		// or (even weirder) with
		//    getting *v1.Pod: pods "tester-2" is forbidden: User "kubernetes-admin" cannot get resource "pods" in API group "" in the namespace "dra-9021"
		ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
			return f.ClientSet.ResourceV1beta1().ResourceClaims(namespace.Name).Delete(tCtx, claim.Name, metav1.DeleteOptions{})
		}).Should(gomega.Succeed(), "delete claim after downgrade")
		ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
			return f.ClientSet.CoreV1().Pods(namespace.Name).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
		}).Should(gomega.Succeed(), "delete pod after downgrade")
		ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) *v1.Pod {
			pod, err := f.ClientSet.CoreV1().Pods(namespace.Name).Get(tCtx, pod.Name, metav1.GetOptions{})
			if apierrors.IsNotFound(err) {
				return nil
			}
			tCtx.ExpectNoError(err, "get pod")
			return pod
		}).Should(gomega.BeNil(), "no pod after deletion after downgrade")
	})
})

// sourceVersion identifies the Kubernetes git version based on hack/print-workspace-status.sh.
//
// Adapted from https://github.com/kubernetes-sigs/kind/blob/3df64e784cc0ea74125b2a2e9877817418afa3af/pkg/build/nodeimage/internal/kube/source.go#L71-L104
func sourceVersion(tCtx ktesting.TContext, kubeRoot string) (gitVersion string, dockerTag string, err error) {
	// Get the version output.
	cmd := exec.CommandContext(tCtx, "hack/print-workspace-status.sh")
	cmd.Dir = kubeRoot
	output, err := cmd.Output()
	if err != nil {
		return "", "", err
	}

	// Parse it.
	for _, line := range strings.Split(string(output), "\n") {
		parts := strings.SplitN(line, " ", 2)
		if len(parts) != 2 {
			continue
		}
		switch parts[0] {
		case "gitVersion":
			gitVersion = parts[1]
		case "STABLE_DOCKER_TAG":
			dockerTag = parts[1]
		}
	}
	if gitVersion == "" {
		return "", "", errors.Errorf("could not obtain Kubernetes git version: %q", string(output))
	}
	if dockerTag == "" {
		return "", "", errors.Errorf("could not obtain docker tag: %q", string(output))
	}
	return
}

// func runCmdIn(tCtx ktesting.TContext, dir string, name string, args ...string) string {
// 	tCtx.Helper()
// 	tCtx.Logf("Running command: %s %s", name, strings.Join(args, " "))
// 	cmd := exec.CommandContext(tCtx, name, args...)
// 	cmd.Dir = dir
// 	var output strings.Builder
// 	reader, writer := io.Pipe()
// 	cmd.Stdout = writer
// 	cmd.Stderr = writer
// 	tCtx.ExpectNoError(cmd.Start(), "start %s command", name)
// 	scanner := bufio.NewScanner(reader)
// 	var wg sync.WaitGroup
// 	wg.Add(1)
// 	go func() {
// 		defer wg.Done()
// 		for scanner.Scan() {
// 			line := scanner.Text()
// 			line = strings.TrimSuffix(line, "\n")
// 			tCtx.Logf("%s: %s", name, line)
// 			output.WriteString(line)
// 			output.WriteByte('\n')
// 		}
// 	}()
// 	result := cmd.Wait()
// 	tCtx.ExpectNoError(writer.Close(), "close in-memory pipe")
// 	wg.Wait()
// 	tCtx.ExpectNoError(result, fmt.Sprintf("%s command failed, output:\n%s", name, output.String()))
// 	tCtx.ExpectNoError(scanner.Err(), "read %s command output", name)
//
// 	return output.String()
// }

// serverDownloadURL returns the full URL for a kubernetes-server archive matching
// the current GOOS/GOARCH for the given major/minor version of Kubernetes.
//
// This considers only proper releases.
func serverDownloadURL(tCtx ktesting.TContext, major, minor uint) (string, string) {
	tCtx.Helper()
	url := fmt.Sprintf("https://dl.k8s.io/release/stable-%d.%d.txt", major, minor)
	get, err := http.NewRequestWithContext(tCtx, http.MethodGet, url, nil)
	tCtx.ExpectNoError(err, "construct GET for %s", url)
	resp, err := http.DefaultClient.Do(get)
	tCtx.ExpectNoError(err, "get %s", url)
	if resp.StatusCode != http.StatusOK {
		tCtx.Fatalf("get %s: %d - %s", url, resp.StatusCode, resp.Status)
	}
	if resp.Body == nil {
		tCtx.Fatalf("empty response for %s", url)
	}
	defer func() {
		tCtx.ExpectNoError(resp.Body.Close(), "close response body")
	}()
	version, err := io.ReadAll(resp.Body)
	tCtx.ExpectNoError(err, "read response body for %s", url)
	return fmt.Sprintf("https://dl.k8s.io/release/%s/kubernetes-server-%s-%s.tar.gz", string(version), runtime.GOOS, runtime.GOARCH), string(version)
}

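For example (illustration only, numbers not from the commit): with major=1 and minor=33, serverDownloadURL first fetches https://dl.k8s.io/release/stable-1.33.txt; if that resolves to, say, v1.33.2, the function returns https://dl.k8s.io/release/v1.33.2/kubernetes-server-linux-amd64.tar.gz on linux/amd64, plus the version string itself.
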
test/utils/localupcluster/cmd.go (new file, 226 lines)
@@ -0,0 +1,226 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package localupcluster contains wrapper code around invoking hack/local-up-cluster.sh
// and managing the resulting cluster.
package localupcluster

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/kubernetes/test/utils/ktesting"
	"k8s.io/utils/ptr"
)

type Cmd struct {
	// Name is a short, descriptive name.
	Name string

	// CommandLine is the complete command line, including the command itself.
	CommandLine []string

	// AdditionalEnv gets added to the current environment.
	AdditionalEnv map[string]string

	// Log output as it gets printed.
	LogOutput bool

	// ProcessOutput, if non-nil, gets called for each line printed by the
	// command on stderr or stdout. The line does not include the trailing
	// newline.
	//
	// Called with EOF true when output processing stops. This implies
	// that the command has stopped or output processing failed. A non-empty
	// line in this case is the output processing error.
	ProcessOutput func(output Output)

	// Gather output in a string buffer. That collected output is returned by Wait and Stop.
	// If disabled, those methods return the empty string.
	GatherOutput bool

	// LogFile specifies a file to write the output to.
	// Can be combined with other options for output handling.
	// If it's the only one, then the command writes directly
	// into the file.
	LogFile string

	// KeepRunning ensures that the command is kept running beyond the end of its context,
	// i.e. context cancellation is ignored. Such commands have to be stopped explicitly.
	KeepRunning bool

	cancel    func(string)
	cmd       *exec.Cmd
	wg        sync.WaitGroup
	running   atomic.Pointer[bool]
	result    error
	gathering bool

	mutex  sync.RWMutex
	output strings.Builder
}

type Output struct {
	Line string
	EOF  bool
}

func (c *Cmd) Start(tCtx ktesting.TContext) {
	tCtx.Helper()
	tCtx.Logf("running command %s: %s", c.Name, strings.Join(c.CommandLine, " "))
	if c.KeepRunning {
		tCtx = ktesting.WithoutCancel(tCtx)
	}
	tCtx = ktesting.WithCancel(tCtx)
	c.cancel = tCtx.Cancel
	c.cmd = exec.CommandContext(tCtx, c.CommandLine[0], c.CommandLine[1:]...)

	c.cmd.Env = os.Environ()
	for k, v := range c.AdditionalEnv {
		c.cmd.Env = append(c.cmd.Env, k+"="+v)
	}

	var reader io.Reader
	var writer io.WriteCloser

	c.gathering = false
	switch {
	case c.LogOutput || c.ProcessOutput != nil || c.GatherOutput:
		// Process each line through an in-memory pipe.
		reader, writer = io.Pipe()
		c.gathering = true
	case c.LogFile != "":
		// Let command write directly.
		f, err := os.Create(c.LogFile)
		tCtx.ExpectNoError(err, "create log file")
		writer = f
	}
	c.cmd.Stdout = writer
	c.cmd.Stderr = writer

	tCtx.ExpectNoError(c.cmd.Start(), "start %s command", c.Name)
	c.running.Store(ptr.To(true))

	if reader != nil {
		scanner := bufio.NewScanner(reader)
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			for scanner.Scan() {
				line := scanner.Text()
				line = strings.TrimSuffix(line, "\n")
				if c.LogOutput {
					tCtx.Logf("%s: %s", c.Name, line)
				}
				if c.ProcessOutput != nil {
					c.ProcessOutput(Output{Line: line})
				}
				if c.GatherOutput {
					c.mutex.Lock()
					c.output.WriteString(line)
					c.output.WriteByte('\n')
					c.mutex.Unlock()
				}
			}
			if c.ProcessOutput != nil {
				if err := scanner.Err(); err != nil {
					c.ProcessOutput(Output{Line: err.Error(), EOF: true})
				} else {
					c.ProcessOutput(Output{EOF: true})
				}
			}
		}()
	}

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		c.result = c.cmd.Wait()
		now := time.Now()
		if reader != nil {
			// Has to be closed to stop output processing, otherwise the scanner
			// keeps reading because someone might still write something.
			_ = writer.Close()
		}
		if c.LogFile != "" {
			f, err := os.OpenFile(c.LogFile, os.O_WRONLY|os.O_APPEND, 0666)
			if err == nil {
				defer func() {
					_ = f.Close()
				}()
				_, _ = fmt.Fprintf(f, "%s: terminated, result: %v\n", now, c.result)
				if err := context.Cause(tCtx); err != nil {
					_, _ = fmt.Fprintf(f, "%s: killed because command context was canceled: %v\n", now, err)
				}
			}
		}
		c.running.Store(ptr.To(false))
	}()
}

func (c *Cmd) Wait(tCtx ktesting.TContext) string {
	return c.wait(tCtx, false)
}

func (c *Cmd) Stop(tCtx ktesting.TContext, reason string) string {
	tCtx.Helper()
	if c.cancel == nil {
		// Not started...
		return ""
	}
	c.cancel(reason)
	return c.wait(tCtx, true)
}

func (c *Cmd) wait(tCtx ktesting.TContext, killed bool) string {
	tCtx.Helper()
	c.wg.Wait()
	if !killed {
		tCtx.ExpectNoError(c.result, fmt.Sprintf("%s command failed, output:\n%s", c.Name, c.output.String()))
	}
	return c.output.String()
}

func (c *Cmd) Running() bool {
	return ptr.Deref(c.running.Load(), false)
}

func (c *Cmd) Output(tCtx ktesting.TContext) string {
	if c.gathering {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		return c.output.String()
	}

	if c.LogFile != "" {
		f, err := os.Open(c.LogFile)
		tCtx.ExpectNoError(err, "open log file")
		content, err := io.ReadAll(f)
		tCtx.ExpectNoError(err, "read log file")
		return string(content)
	}

	return ""
}

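Illustration only (not part of the commit): a minimal sketch of driving Cmd by hand, assuming a ktesting.TContext from a surrounding test. It exercises only the exported API defined above:

func demoCmd(tCtx ktesting.TContext) {
	cmd := &localupcluster.Cmd{
		Name:         "demo",
		CommandLine:  []string{"sleep", "60"},
		GatherOutput: true,
		// KeepRunning detaches the command from tCtx cancellation;
		// it then has to be stopped explicitly, as Cluster.Stop does.
		KeepRunning: true,
	}
	cmd.Start(tCtx)
	if cmd.Running() {
		// Stop cancels the command's private context, waits for output
		// processing to drain, and returns the gathered output.
		output := cmd.Stop(tCtx, "demo finished")
		tCtx.Logf("gathered output: %q", output)
	}
}
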
test/utils/localupcluster/localupcluster.go (new file, 488 lines)
@@ -0,0 +1,488 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package localupcluster contains wrapper code around invoking hack/local-up-cluster.sh
// and managing the resulting cluster.
//
// The basic mode of operation is:
//   - local-up-cluster.sh contains all the logic of how to configure and invoke components.
//   - local-up-cluster.sh is run in dry-run mode, which prints all commands and their parameters
//     without actually running them, except for etcd: etcd's lifecycle is managed as before
//     by the caller or local-up-cluster.sh.
//   - local-up-cluster.sh is kept running as long as the cluster runs to keep etcd running and
//     generated configuration files around.
//   - This package takes care of running the commands, including output redirection.
//   - It can stop commands and run them again using different binaries to simulate upgrades
//     or downgrades.
package localupcluster

import (
	"context"
	"crypto/tls"
	"fmt"
	"net/http"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/onsi/gomega"
	"github.com/onsi/gomega/gstruct"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/kubernetes/test/utils/ktesting"
)

type KubeComponentName string

// Component names.
//
// They match the names in the local-up-cluster.sh output, if the script runs those components.
const (
	KubeAPIServer         = KubeComponentName("kube-apiserver")
	KubeControllerManager = KubeComponentName("kube-controller-manager")
	KubeScheduler         = KubeComponentName("kube-scheduler")
	Kubelet               = KubeComponentName("kubelet")
	KubeProxy             = KubeComponentName("kube-proxy")
	Kubectl               = KubeComponentName("kubectl")
	LocalUpCluster        = KubeComponentName("local-up-cluster")
)

// Kubernetes components running in the cluster, in the order in which they need to be started and upgraded.
var KubeClusterComponents = []KubeComponentName{KubeAPIServer, KubeControllerManager, KubeScheduler, Kubelet, KubeProxy}

// RUN <name> <command line> in the local-up-cluster.sh output marks commands that we need to run.
const localUpClusterRunPrefix = "RUN "

func repoRoot(tCtx ktesting.TContext) string {
	for i := 0; ; i++ {
		dir := path.Join(".", strings.Repeat("../", i))
		_, err := os.Stat(dir)
		tCtx.ExpectNoError(err, "examine parent directory while looking for hack/local-up-cluster.sh (not invoked inside the Kubernetes repository?)")
		_, err = os.Stat(path.Join(dir, "hack/local-up-cluster.sh"))
		if err == nil {
			dir, err = filepath.Abs(dir)
			tCtx.ExpectNoError(err, "turn into absolute path")
			return dir
		}
	}
}

func New(tCtx ktesting.TContext) *Cluster {
	tCtx.Helper()
	c := &Cluster{}
	tCtx.CleanupCtx(c.Stop)
	return c
}

// Cluster represents one cluster.
//
// hack/local-up-cluster.sh must be functional in the current environment. If necessary,
// env variables like CONTAINER_RUNTIME_ENDPOINT have to be set to adapt the script
// to the host system. The current user must have permission to use the container runtime.
// All Kubernetes components run as that user with files stored in a test temp directory.
//
// local-up-cluster.sh does not support more than one cluster per host, so
// tests using this package have to run sequentially.
type Cluster struct {
	running    map[KubeComponentName]*Cmd
	dir        string
	kubeConfig string
	settings   map[string]string
}

// Start brings up the cluster anew. If it was already running, it will be stopped first.
//
// The cluster will be stopped automatically at the end of the test.
// If the ARTIFACTS env variable is set, log files of the cluster
// components get written to $ARTIFACTS/<test name>, otherwise to a
// test temp directory.
//
// The binary directory must contain kube-apiserver, kube-controller-manager,
// kube-scheduler, kube-proxy, and kubelet. Those binaries can be from a previous
// Kubernetes release. They will be invoked with parameters as defined in the
// *current* local-up-cluster.sh. This works as long as local-up-cluster.sh in its
// default configuration doesn't depend on something which was added only recently.
func (c *Cluster) Start(tCtx ktesting.TContext, bindir string, localUpClusterEnv map[string]string) {
	tCtx.Helper()
	c.Stop(tCtx)
	tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
		// Intentional additional lambda function for source code location in log output.
		c.Stop(tCtx)
	})

	if artifacts, ok := os.LookupEnv("ARTIFACTS"); ok {
		// Sanitize the name:
		// - remove E2E [] tags
		// - replace whitespaces and some special characters with hyphens
		testName := tCtx.Name()
		testName = regexp.MustCompile(`\s*\[[^\]]*\]`).ReplaceAllString(testName, "")
		testName = regexp.MustCompile(`[[:space:]/:()\\]+`).ReplaceAllString(testName, "-")
		testName = strings.Trim(testName, "-")
		c.dir = path.Join(artifacts, testName)
		tCtx.ExpectNoError(os.MkdirAll(c.dir, 0766), "create artifacts directory for test")
	} else {
		c.dir = tCtx.TempDir()
	}
	c.running = make(map[KubeComponentName]*Cmd)
	c.settings = make(map[string]string)

	// Spawn local-up-cluster.sh in background, keep it running (for etcd!),
	// parse output to pick up commands and run them in order.
	lines := make(chan Output, 100)
	cmd := &Cmd{
		Name: string(LocalUpCluster),
		CommandLine: []string{
			path.Join(repoRoot(tCtx), "hack/local-up-cluster.sh"),
			"-o", bindir,
			"-d", // dry run
		},
		ProcessOutput: func(output Output) {
			// Redirect processing into the main goroutine.
			lines <- output
		},
		AdditionalEnv: localUpClusterEnv,
	}

	kubeVerboseStr := cmd.AdditionalEnv["KUBE_VERBOSE"]
	kubeVerboseVal, err := strconv.Atoi(kubeVerboseStr)
	if kubeVerboseStr == "" {
		kubeVerboseVal = 0
	} else {
		tCtx.ExpectNoError(err, "KUBE_VERBOSE")
	}
	if kubeVerboseVal < 2 {
		cmd.AdditionalEnv["KUBE_VERBOSE"] = "2" // Enables -x for configuration variable assignments.
	}
	cmd.Start(tCtx)
	c.running[LocalUpCluster] = cmd

processLocalUpClusterOutput:
	for {
		select {
		case <-tCtx.Done():
			c.Stop(tCtx)
			tCtx.Fatalf("interrupted cluster startup: %v", context.Cause(tCtx))
		case output := <-lines:
			if c.processLocalUpClusterOutput(tCtx, output) {
				break processLocalUpClusterOutput
			}
		}
	}
	tCtx.Logf("cluster is running, use KUBECONFIG=%s to access it", c.kubeConfig)
}

// Matches e.g. "+ API_SECURE_PORT=6443".
var varAssignment = regexp.MustCompile(`^\+ ([A-Z0-9_]+)=(.*)$`)

func (c *Cluster) processLocalUpClusterOutput(tCtx ktesting.TContext, output Output) bool {
	if output.EOF {
		if output.Line != "" {
			tCtx.Fatalf("%s output processing failed: %s", LocalUpCluster, output.Line)
		}
		tCtx.Fatalf("%s terminated unexpectedly", LocalUpCluster)
	}

	tCtx.Logf("local-up-cluster: %s", output.Line)

	if strings.HasPrefix(output.Line, localUpClusterRunPrefix) {
		line := output.Line[len(localUpClusterRunPrefix):]
		parts := strings.SplitN(line, ": ", 2)
		if len(parts) != 2 {
			tCtx.Fatalf("unexpected RUN line: %s", output.Line)
		}
		name := parts[0]
		cmdLine := parts[1]

		// Cluster components are kept running.
		if slices.Contains(KubeClusterComponents, KubeComponentName(name)) {
			c.runKubeComponent(tCtx, KubeComponentName(name), cmdLine)
			return false
		}

		// Other commands get invoked and need to terminate before we proceed.
		c.runCmd(tCtx, name, cmdLine)
		return false
	}
	if m := varAssignment.FindStringSubmatch(output.Line); m != nil {
		c.settings[m[1]] = m[2]
		if m[1] == "CERT_DIR" {
			c.kubeConfig = path.Join(m[2], "admin.kubeconfig")
		}
		return false
	}
	if strings.Contains(output.Line, "Local etcd is running. Run commands.") {
		// We have seen and processed all commands.
		// Time to start testing...
		return true
	}

	return false
}

func (c *Cluster) runKubeComponent(tCtx ktesting.TContext, component KubeComponentName, command string) {
	commandLine := fromLocalUpClusterOutput(command)

	cmd := &Cmd{
		Name:        string(component),
		CommandLine: commandLine,
		// Number gets bumped when restarting.
		LogFile: path.Join(c.dir, fmt.Sprintf("%s-0.log", component)),
		// Stopped via Cluster.Stop.
		KeepRunning: true,
	}

	c.runComponentWithRetry(tCtx, cmd)
}

func (c *Cluster) runCmd(tCtx ktesting.TContext, name, command string) {
	commandLine := fromLocalUpClusterOutput(command)
	cmd := &Cmd{
		Name:         name,
		CommandLine:  commandLine,
		GatherOutput: true,
	}
	cmd.Start(tCtx)
	cmd.Wait(tCtx)
}

func fromLocalUpClusterOutput(command string) []string {
	// The assumption here is that arguments don't contain spaces.
	// We cannot pass the entire string to a shell and let the shell do
	// the parsing because some arguments contain special characters like $
	// without quoting them.
	return strings.Split(command, " ")
}

// Stop ensures that the cluster is not running anymore.
func (c *Cluster) Stop(tCtx ktesting.TContext) {
	tCtx.Helper()
	for _, component := range slices.Backward(KubeClusterComponents) {
		cmd := c.running[component]
		if cmd == nil {
			continue
		}
		tCtx.Logf("stopping %s", component)
		cmd.Stop(tCtx, "stopping cluster")
	}
}

// LoadConfig returns the REST config for the running cluster.
func (c *Cluster) LoadConfig(tCtx ktesting.TContext) *restclient.Config {
	tCtx.Helper()
	cfg, err := clientcmd.LoadFromFile(c.kubeConfig)
	tCtx.ExpectNoError(err, "load KubeConfig")
	config, err := clientcmd.NewDefaultClientConfig(*cfg, nil).ClientConfig()
	tCtx.ExpectNoError(err, "construct REST config")
	return config
}

// GetSystemLogs returns the output of the given component, or the empty string
// and false if the component was not started.
func (c *Cluster) GetSystemLogs(tCtx ktesting.TContext, component KubeComponentName) (string, bool) {
	cmd, ok := c.running[component]
	if !ok {
		return "", false
	}
	return cmd.Output(tCtx), true
}

type ModifyOptions struct {
	// BinDir specifies where to find the replacement Kubernetes components.
	// If empty, then only components explicitly listed in FileByComponent
	// get modified.
	BinDir string

	// FileByComponent overrides BinDir for those components which are specified here.
	FileByComponent map[KubeComponentName]string

	// Upgrade determines whether the apiserver gets updated first (upgrade)
	// or last (downgrade).
	Upgrade bool
}

func (m ModifyOptions) GetComponentFile(component KubeComponentName) string {
	if file, ok := m.FileByComponent[component]; ok {
		return file
	}
	if m.BinDir != "" {
		return path.Join(m.BinDir, string(component))
	}
	return ""
}

// Modify changes the cluster as described in the options.
// It returns options that can be passed to Modify unchanged
// to restore the original state.
func (c *Cluster) Modify(tCtx ktesting.TContext, options ModifyOptions) ModifyOptions {
	tCtx.Helper()

	restore := ModifyOptions{
		FileByComponent: make(map[KubeComponentName]string),
	}

	restore.Upgrade = !options.Upgrade
	components := slices.Clone(KubeClusterComponents)
	if !options.Upgrade {
		slices.Reverse(components)
	}
	for _, component := range components {
		c.modifyComponent(tCtx, options, component, &restore)
	}
	return restore
}

func (c *Cluster) modifyComponent(tCtx ktesting.TContext, options ModifyOptions, component KubeComponentName, restore *ModifyOptions) {
	tCtx.Helper()
	tCtx = ktesting.Begin(tCtx, fmt.Sprintf("modify %s", component))
	defer ktesting.End(tCtx)

	// We could also do things like turning feature gates on or off.
	// For now we only support replacing the file.
	if fileName := options.GetComponentFile(component); fileName != "" {
		cmd, ok := c.running[component]
		if !ok {
			tCtx.Fatal("not running")
		}
		cmd.Stop(tCtx, "modifying the component")
		delete(c.running, component)

		// Find the command (might be wrapped by sudo!).
		cmdLine := slices.Clone(cmd.CommandLine)
		found := false
		for i := range cmdLine {
			if path.Base(cmdLine[i]) == string(component) {
				found = true
				restore.FileByComponent[component] = cmdLine[i]
				cmdLine[i] = fileName
				break
			}
		}
		if !found {
			tCtx.Fatal("binary filename not found")
		}
		cmd.CommandLine = cmdLine

		// New log file.
		m := regexp.MustCompile(`^(.*)-([[:digit:]]+)\.log$`).FindStringSubmatch(cmd.LogFile)
		if m == nil {
			tCtx.Fatalf("unexpected log file, should have contained number: %s", cmd.LogFile)
		}
		logNum, _ := strconv.Atoi(m[2])
		cmd.LogFile = fmt.Sprintf("%s-%d.log", m[1], logNum+1)

		c.runComponentWithRetry(tCtx, cmd)
	}
}

func (c *Cluster) runComponentWithRetry(tCtx ktesting.TContext, cmd *Cmd) {
	// Sometimes components fail to come up. We have to retry.
	//
	// For example, the apiserver's port might not be free again yet (no SO_LINGER!).
	// Or kube-controller-manager:
	//    I0630 13:20:45.046709   61710 serving.go:380] Generated self-signed cert (/var/run/kubernetes/kube-controller-manager.crt, /var/run/kubernetes/kube-controller-manager.key)
	//    W0630 13:20:45.410578   61710 requestheader_controller.go:204] Unable to get configmap/extension-apiserver-authentication in kube-system. Usually fixed by 'kubectl create rolebinding -n kube-system ROLEBINDING_NAME --role=extension-apiserver-authentication-reader --serviceaccount=YOUR_NS:YOUR_SA'
	//    E0630 13:20:45.410618   61710 run.go:72] "command failed" err="unable to load configmap based request-header-client-ca-file: configmaps \"extension-apiserver-authentication\" is forbidden: User \"system:kube-controller-manager\" cannot get resource \"configmaps\" in API group \"\" in the namespace \"kube-system\""
	// The kube-controller-manager then is stuck. Perhaps it should retry instead?
	for i := 0; ; i++ {
		tCtx.Logf("running %s with output redirected to %s", cmd.Name, cmd.LogFile)
		cmd.Start(tCtx)
		c.running[KubeComponentName(cmd.Name)] = cmd
		err := func() (finalErr error) {
			tCtx, finalize := ktesting.WithError(tCtx, &finalErr)
			defer finalize()
			c.checkReadiness(tCtx, cmd)
			return nil
		}()
		if err == nil {
			break
		}
		if !cmd.Running() && i < 10 {
			// Retry.
			time.Sleep(time.Second)
			continue
		}
		// Re-raise the failure.
		tCtx.ExpectNoError(err)
	}
}

func (c *Cluster) checkReadiness(tCtx ktesting.TContext, cmd *Cmd) {
	restConfig := c.LoadConfig(tCtx)
	tCtx = ktesting.WithRESTConfig(tCtx, restConfig)
	tCtx = ktesting.Begin(tCtx, fmt.Sprintf("wait for %s readiness", cmd.Name))
	defer ktesting.End(tCtx)

	switch KubeComponentName(cmd.Name) {
	case KubeAPIServer:
		c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["API_SECURE_PORT"])
	case KubeScheduler:
		c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["SCHEDULER_SECURE_PORT"])
	case KubeControllerManager:
		c.checkHealthz(tCtx, cmd, "https", c.settings["API_HOST_IP"], c.settings["KCM_SECURE_PORT"])
	case KubeProxy:
		c.checkHealthz(tCtx, cmd, "http" /* not an error! */, c.settings["API_HOST_IP"], c.settings["PROXY_HEALTHZ_PORT"])
	case Kubelet:
		c.checkHealthz(tCtx, cmd, "https", c.settings["KUBELET_HOST"], c.settings["KUBELET_PORT"])

		// Also wait for the node to be ready.
		tCtx = ktesting.Begin(tCtx, "wait for node ready")
		defer ktesting.End(tCtx)
		ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) []corev1.Node {
			nodes, err := tCtx.Client().CoreV1().Nodes().List(tCtx, metav1.ListOptions{})
			tCtx.ExpectNoError(err, "list nodes")
			return nodes.Items
		}).Should(gomega.ConsistOf(gomega.HaveField("Status.Conditions", gomega.ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{
			"Type":   gomega.Equal(corev1.NodeReady),
			"Status": gomega.Equal(corev1.ConditionTrue),
		})))))
	}
}

func (c *Cluster) checkHealthz(tCtx ktesting.TContext, cmd *Cmd, method, hostIP, port string) {
	url := fmt.Sprintf("%s://%s:%s/healthz", method, hostIP, port)
	ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
		if !cmd.Running() {
			return gomega.StopTrying(fmt.Sprintf("%s stopped unexpectedly", cmd.Name))
		}
		// Like kube::util::wait_for_url in local-up-cluster.sh we use https,
		// but don't check the certificate.
		req, err := http.NewRequestWithContext(tCtx, http.MethodGet, url, nil)
		if err != nil {
			return fmt.Errorf("create request: %w", err)
		}
		tr := &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		}
		client := &http.Client{Transport: tr}
		resp, err := client.Do(req)
		if err != nil {
			return fmt.Errorf("get %s: %w", url, err)
		}
		if err := resp.Body.Close(); err != nil {
			return fmt.Errorf("close GET response: %w", err)
		}
		// Any response is fine, we just need to get here. In practice, we get a 403 Forbidden.
		return nil
	}).Should(gomega.Succeed(), fmt.Sprintf("HTTP GET %s", url))
}
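
To make the dry-run protocol concrete (illustration only, sample lines invented): local-up-cluster.sh output interleaves `-x` style variable assignments with `RUN <name>: <command>` markers, and processLocalUpClusterOutput above dispatches on exactly these two shapes. A self-contained sketch:

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// The same two patterns as in the package above.
	runPrefix := "RUN "
	varAssignment := regexp.MustCompile(`^\+ ([A-Z0-9_]+)=(.*)$`)

	// Invented sample lines in the shape of local-up-cluster.sh dry-run output.
	lines := []string{
		"+ CERT_DIR=/var/run/kubernetes",
		"RUN kube-apiserver: sudo /usr/local/bin/kube-apiserver --authorization-mode=RBAC",
	}
	for _, line := range lines {
		if strings.HasPrefix(line, runPrefix) {
			parts := strings.SplitN(line[len(runPrefix):], ": ", 2)
			// parts[0] is the component name, parts[1] the command line, which
			// gets split on single spaces (no shell is involved).
			fmt.Printf("start %q: %v\n", parts[0], strings.Split(parts[1], " "))
			continue
		}
		if m := varAssignment.FindStringSubmatch(line); m != nil {
			// Settings like CERT_DIR get remembered, e.g. to locate admin.kubeconfig.
			fmt.Printf("setting %s=%s\n", m[1], m[2])
		}
	}
}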