From 05fd042bb3600541a8e2587b66b8b4c4e9f99c27 Mon Sep 17 00:00:00 2001
From: Andrey Smirnov
Date: Mon, 22 Apr 2024 18:39:24 +0400
Subject: [PATCH] test: improve the reset integration tests

Provide a trace for each step of the reset sequence taken, so if one of
those steps fails, the integration test produces a meaningful message
instead of proceeding and failing somewhere else.

More cleanup/refactoring; should be functionally equivalent.

Fixes #8635

Signed-off-by: Andrey Smirnov
---
 internal/integration/api/etcd-recover.go | 104 ++++--------
 internal/integration/api/reset.go        | 199 ++++++-----------------
 internal/integration/base/api.go         |  96 +++++++++++
 3 files changed, 173 insertions(+), 226 deletions(-)

diff --git a/internal/integration/api/etcd-recover.go b/internal/integration/api/etcd-recover.go
index 14c5203cf..3647317e9 100644
--- a/internal/integration/api/etcd-recover.go
+++ b/internal/integration/api/etcd-recover.go
@@ -11,6 +11,7 @@ import (
     "context"
     "fmt"
     "io"
+    "path/filepath"
     "testing"
     "time"

@@ -55,8 +56,6 @@ func (suite *EtcdRecoverSuite) TearDownTest() {
 }

 // TestSnapshotRecover snapshot etcd, wipes control plane nodes and recovers etcd from a snapshot.
-//
-//nolint:gocyclo
 func (suite *EtcdRecoverSuite) TestSnapshotRecover() {
     if !suite.Capabilities().SupportsReboot {
         suite.T().Skip("cluster doesn't support reboot")
@@ -83,96 +82,49 @@ func (suite *EtcdRecoverSuite) TestSnapshotRecover() {

     suite.Require().NoError(suite.snapshotEtcd(snapshotNode, &snapshot))

-    // wipe ephemeral partition on all control plane nodes
-    preReset := map[string]string{}
+    // leave etcd on all nodes but one
+    for _, node := range controlPlaneNodes[1:] {
+        suite.T().Logf("leaving etcd on node %q", node)

-    for _, node := range controlPlaneNodes {
-        var err error
+        nodeCtx := client.WithNode(suite.ctx, node)

-        preReset[node], err = suite.HashKubeletCert(suite.ctx, node)
+        _, err := suite.Client.EtcdForfeitLeadership(nodeCtx, &machineapi.EtcdForfeitLeadershipRequest{})
+        suite.Require().NoError(err)

+        err = suite.Client.EtcdLeaveCluster(nodeCtx, &machineapi.EtcdLeaveClusterRequest{})
         suite.Require().NoError(err)
     }

-    suite.T().Logf("wiping control plane nodes %q", controlPlaneNodes)
-
-    errCh := make(chan error)
-
+    // wipe ephemeral partition on all control plane nodes, starting with the one that still has etcd running
     for _, node := range controlPlaneNodes {
-        go func() {
-            errCh <- func() error {
-                nodeCtx := client.WithNodes(suite.ctx, node)
-
-                bootIDBefore, err := suite.ReadBootID(nodeCtx)
-                if err != nil {
-                    return fmt.Errorf("error reading pre-reset boot ID: %w", err)
-                }
-
-                if err = base.IgnoreGRPCUnavailable(
-                    suite.Client.ResetGeneric(
-                        nodeCtx, &machineapi.ResetRequest{
-                            Reboot:   true,
-                            Graceful: false,
-                            SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-                                {
-                                    Label: constants.EphemeralPartitionLabel,
-                                    Wipe:  true,
-                                },
-                            },
-                        },
-                    ),
-                ); err != nil {
-                    return fmt.Errorf("error resetting the node %q: %w", node, err)
-                }
-
-                var bootIDAfter string
-
-                return retry.Constant(5 * time.Minute).Retry(
-                    func() error {
-                        requestCtx, requestCtxCancel := context.WithTimeout(nodeCtx, 5*time.Second)
-                        defer requestCtxCancel()
-
-                        bootIDAfter, err = suite.ReadBootID(requestCtx)
-                        if err != nil {
-                            // API might be unresponsive during reboot
-                            return retry.ExpectedError(err)
-                        }
-
-                        if bootIDAfter == bootIDBefore {
-                            // bootID should be different after reboot
-                            return retry.ExpectedErrorf(
-                                "bootID didn't change for node %q: before %s, after %s",
-                                node,
-                                bootIDBefore,
-                                bootIDAfter,
-                            )
-                        }
-
-                        return nil
-                    },
-                )
-            }()
-        }()
+        suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+            Reboot:   true,
+            Graceful: false,
+            SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+                {
+                    Label: constants.EphemeralPartitionLabel,
+                    Wipe:  true,
+                },
+            },
+        }, false)
     }

-    for range controlPlaneNodes {
-        suite.Require().NoError(<-errCh)
-    }
+    // verify that etcd data directory doesn't exist on the nodes
+    for _, node := range controlPlaneNodes {
+        stream, err := suite.Client.MachineClient.List(client.WithNode(suite.ctx, node), &machineapi.ListRequest{Root: filepath.Join(constants.EtcdDataPath, "member")})
+        suite.Require().NoError(err)

-    suite.ClearConnectionRefused(suite.ctx, controlPlaneNodes...)
+        _, err = stream.Recv()
+        suite.Require().Error(err)
+
+        suite.Require().Equal(client.StatusCode(err), codes.Unknown)
+        suite.Require().Contains(client.Status(err).Message(), "no such file or directory")
+    }

     suite.T().Logf("recovering etcd snapshot at node %q", recoverNode)

     suite.Require().NoError(suite.recoverEtcd(recoverNode, bytes.NewReader(snapshot.Bytes())))

     suite.AssertClusterHealthy(suite.ctx)
-
-    for _, node := range controlPlaneNodes {
-        postReset, err := suite.HashKubeletCert(suite.ctx, node)
-        suite.Require().NoError(err)
-
-        suite.Assert().NotEqual(postReset, preReset[node], "kubelet cert hasn't changed for node %q", node)
-    }
 }

 func (suite *EtcdRecoverSuite) snapshotEtcd(snapshotNode string, dest io.Writer) error {
diff --git a/internal/integration/api/reset.go b/internal/integration/api/reset.go
index 67ba67f48..c6481e8f8 100644
--- a/internal/integration/api/reset.go
+++ b/internal/integration/api/reset.go
@@ -13,7 +13,6 @@ import (
     "time"

     "github.com/siderolabs/gen/xslices"
-    "github.com/siderolabs/go-retry/retry"

     "github.com/siderolabs/talos/internal/integration/base"
     machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
@@ -68,48 +67,16 @@ func (suite *ResetSuite) TestResetNodeByNode() {
         suite.T().Skip("skipping as talos is explicitly trusted booted")
     }

-    initNodeAddress := ""
-
-    for _, node := range suite.Cluster.Info().Nodes {
-        if node.Type == machine.TypeInit {
-            initNodeAddress = node.IPs[0].String()
-
-            break
-        }
-    }
-
     nodes := suite.DiscoverNodeInternalIPs(suite.ctx)
     suite.Require().NotEmpty(nodes)

     sort.Strings(nodes)

     for _, node := range nodes {
-        if node == initNodeAddress {
-            // due to the bug with etcd cluster build for the init node after Reset(), skip resetting first node
-            // there's no problem if bootstrap API was used, so this check only protects legacy init nodes
-            suite.T().Log("Skipping init node", node, "due to known issue with etcd")
-
-            continue
-        }
-
-        suite.T().Log("Resetting node", node)
-
-        preReset, err := suite.HashKubeletCert(suite.ctx, node)
-        suite.Require().NoError(err)
-
-        suite.AssertRebooted(
-            suite.ctx, node, func(nodeCtx context.Context) error {
-                // force reboot after reset, as this is the only mode we can test
-                return base.IgnoreGRPCUnavailable(suite.Client.Reset(nodeCtx, true, true))
-            }, 10*time.Minute,
-        )
-
-        suite.ClearConnectionRefused(suite.ctx, node)
-
-        postReset, err := suite.HashKubeletCert(suite.ctx, node)
-        suite.Require().NoError(err)
-
-        suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+        suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+            Reboot:   true,
+            Graceful: true,
+        }, true)
     }
 }

@@ -121,24 +88,10 @@ func (suite *ResetSuite) testResetNoGraceful(nodeType machine.Type) {

     node := suite.RandomDiscoveredNodeInternalIP(nodeType)

-    suite.T().Logf("Resetting %s node !graceful %s", nodeType, node)
-
-    preReset, err := suite.HashKubeletCert(suite.ctx, node)
-    suite.Require().NoError(err)
-
-    suite.AssertRebooted(
-        suite.ctx, node, func(nodeCtx context.Context) error {
-            // force reboot after reset, as this is the only mode we can test
-            return base.IgnoreGRPCUnavailable(suite.Client.Reset(nodeCtx, false, true))
-        }, 5*time.Minute,
-    )
-
-    suite.ClearConnectionRefused(suite.ctx, node)
-
-    postReset, err := suite.HashKubeletCert(suite.ctx, node)
-    suite.Require().NoError(err)
-
-    suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+    suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+        Reboot:   true,
+        Graceful: false,
+    }, true)
 }

 // TestResetNoGracefulWorker resets a worker in !graceful mode.
@@ -157,37 +110,16 @@ func (suite *ResetSuite) TestResetNoGracefulControlplane() {
 func (suite *ResetSuite) TestResetWithSpecEphemeral() {
     node := suite.RandomDiscoveredNodeInternalIP()

-    suite.T().Log("Resetting node with spec=[EPHEMERAL]", node)
-
-    preReset, err := suite.HashKubeletCert(suite.ctx, node)
-    suite.Require().NoError(err)
-
-    suite.AssertRebooted(
-        suite.ctx, node, func(nodeCtx context.Context) error {
-            // force reboot after reset, as this is the only mode we can test
-            return base.IgnoreGRPCUnavailable(
-                suite.Client.ResetGeneric(
-                    nodeCtx, &machineapi.ResetRequest{
-                        Reboot:   true,
-                        Graceful: true,
-                        SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-                            {
-                                Label: constants.EphemeralPartitionLabel,
-                                Wipe:  true,
-                            },
-                        },
-                    },
-                ),
-            )
-        }, 5*time.Minute,
-    )
-
-    suite.ClearConnectionRefused(suite.ctx, node)
-
-    postReset, err := suite.HashKubeletCert(suite.ctx, node)
-    suite.Require().NoError(err)
-
-    suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+    suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+        Reboot:   true,
+        Graceful: true,
+        SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+            {
+                Label: constants.EphemeralPartitionLabel,
+                Wipe:  true,
+            },
+        },
+    }, true)
 }

 // TestResetWithSpecState resets only state partition on the node.
@@ -201,12 +133,7 @@ func (suite *ResetSuite) TestResetWithSpecState() {

     node := suite.RandomDiscoveredNodeInternalIP()

-    suite.T().Log("Resetting node with spec=[STATE]", node)
-
-    preReset, err := suite.HashKubeletCert(suite.ctx, node)
-    suite.Require().NoError(err)
-
-    disks, err := suite.Client.Disks(client.WithNodes(suite.ctx, node))
+    disks, err := suite.Client.Disks(client.WithNode(suite.ctx, node))
     suite.Require().NoError(err)
     suite.Require().NotEmpty(disks.Messages)

@@ -219,73 +146,45 @@ func (suite *ResetSuite) TestResetWithSpecState() {
         },
     )

-    suite.AssertRebooted(
-        suite.ctx, node, func(nodeCtx context.Context) error {
-            // force reboot after reset, as this is the only mode we can test
-            return base.IgnoreGRPCUnavailable(
-                suite.Client.ResetGeneric(
-                    nodeCtx, &machineapi.ResetRequest{
-                        Reboot:   true,
-                        Graceful: true,
-                        SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-                            {
-                                Label: constants.StatePartitionLabel,
-                                Wipe:  true,
-                            },
-                        },
-                        UserDisksToWipe: userDisksToWipe,
-                    },
-                ),
-            )
-        }, 5*time.Minute,
-    )
-
-    suite.ClearConnectionRefused(suite.ctx, node)
-
-    postReset, err := suite.HashKubeletCert(suite.ctx, node)
-    suite.Require().NoError(err)
-
-    suite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
+    suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+        Reboot:   true,
+        Graceful: true,
+        SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+            {
+                Label: constants.StatePartitionLabel,
+                Wipe:  true,
+            },
+        },
+        UserDisksToWipe: userDisksToWipe,
+    }, true)
 }

-// TestResetDuringBoot resets the node multiple times, second reset is done
-// before boot sequence is complete.
+// TestResetDuringBoot resets the node while it is in boot sequence.
 func (suite *ResetSuite) TestResetDuringBoot() {
     node := suite.RandomDiscoveredNodeInternalIP()
     nodeCtx := client.WithNodes(suite.ctx, node)

-    suite.T().Log("Resetting node", node)
+    suite.T().Log("rebooting node", node)

-    for range 2 {
-        bootID := suite.ReadBootIDWithRetry(nodeCtx, time.Minute*5)
+    bootIDBefore, err := suite.ReadBootID(nodeCtx)
+    suite.Require().NoError(err)

-        err := retry.Constant(5*time.Minute, retry.WithUnits(time.Millisecond*1000)).Retry(
-            func() error {
-                // force reboot after reset, as this is the only mode we can test
-                return retry.ExpectedError(
-                    suite.Client.ResetGeneric(
-                        client.WithNodes(suite.ctx, node), &machineapi.ResetRequest{
-                            Reboot:   true,
-                            Graceful: true,
-                            SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
-                                {
-                                    Label: constants.EphemeralPartitionLabel,
-                                    Wipe:  true,
-                                },
-                            },
-                        },
-                    ),
-                )
+    suite.Require().NoError(suite.Client.Reboot(nodeCtx))
+
+    suite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 3*time.Minute)
+
+    suite.ClearConnectionRefused(suite.ctx, node)
+
+    suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+        Reboot:   true,
+        Graceful: true,
+        SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
+            {
+                Label: constants.EphemeralPartitionLabel,
+                Wipe:  true,
             },
-        )
-
-        suite.Require().NoError(err)
-
-        suite.AssertBootIDChanged(nodeCtx, bootID, node, time.Minute*5)
-    }
-
-    suite.WaitForBootDone(suite.ctx)
-    suite.AssertClusterHealthy(suite.ctx)
+        },
+    }, true)
 }

 func init() {
diff --git a/internal/integration/base/api.go b/internal/integration/base/api.go
index e2738f5bc..e9608e34b 100644
--- a/internal/integration/base/api.go
+++ b/internal/integration/base/api.go
@@ -582,6 +582,102 @@ func (apiSuite *APISuite) PatchV1Alpha1Config(provider config.Provider, patch fu
     return bytes
 }

+// ResetNode wraps the reset node sequence with checks, waiting for the reset to finish and verifying the result.
+//
+//nolint:gocyclo
+func (apiSuite *APISuite) ResetNode(ctx context.Context, node string, resetSpec *machineapi.ResetRequest, runHealthChecks bool) {
+    apiSuite.T().Logf("resetting node %q with graceful %v mode %s, system %v, user %v", node, resetSpec.Graceful, resetSpec.Mode, resetSpec.SystemPartitionsToWipe, resetSpec.UserDisksToWipe)
+
+    nodeCtx := client.WithNode(ctx, node)
+
+    // any reset should lead to a reboot, so read boot_id before reboot
+    bootIDBefore, err := apiSuite.ReadBootID(nodeCtx)
+    apiSuite.Require().NoError(err)
+
+    // figure out if EPHEMERAL is going to be reset
+    ephemeralIsGoingToBeReset := false
+
+    if len(resetSpec.SystemPartitionsToWipe) == 0 && len(resetSpec.UserDisksToWipe) == 0 {
+        ephemeralIsGoingToBeReset = true
+    } else {
+        for _, part := range resetSpec.SystemPartitionsToWipe {
+            if part.Label == constants.EphemeralPartitionLabel {
+                ephemeralIsGoingToBeReset = true
+
+                break
+            }
+        }
+    }
+
+    preReset, err := apiSuite.HashKubeletCert(ctx, node)
+    apiSuite.Require().NoError(err)
+
+    resp, err := apiSuite.Client.ResetGenericWithResponse(nodeCtx, resetSpec)
+    apiSuite.Require().NoError(err)
+
+    actorID := resp.Messages[0].ActorId
+
+    eventCh := make(chan client.EventResult)
+
+    // watch for events
+    apiSuite.Require().NoError(apiSuite.Client.EventsWatchV2(nodeCtx, eventCh, client.WithActorID(actorID), client.WithTailEvents(-1)))
+
+    waitTimer := time.NewTimer(5 * time.Minute)
+    defer waitTimer.Stop()
+
+waitLoop:
+    for {
+        select {
+        case ev := <-eventCh:
+            apiSuite.Require().NoError(ev.Error)
+
+            switch msg := ev.Event.Payload.(type) {
+            case *machineapi.SequenceEvent:
+                if msg.Error != nil {
+                    apiSuite.FailNow("reset failed", "%s: %s", msg.Error.Message, msg.Error.Code)
+                }
+            case *machineapi.PhaseEvent:
+                if msg.Action == machineapi.PhaseEvent_START && msg.Phase == "unmountSystem" {
+                    // about to be reset, break waitLoop
+                    break waitLoop
+                }
+
+                if msg.Action == machineapi.PhaseEvent_STOP {
+                    apiSuite.T().Logf("reset phase %q finished", msg.Phase)
+                }
+            }
+        case <-waitTimer.C:
+            apiSuite.FailNow("timeout waiting for reset to finish")
+        case <-ctx.Done():
+            apiSuite.FailNow("context canceled")
+        }
+    }
+
+    // wait for the apid to be shut down
+    time.Sleep(10 * time.Second)
+
+    apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 3*time.Minute)
+
+    apiSuite.ClearConnectionRefused(ctx, node)
+
+    if runHealthChecks {
+        if apiSuite.Cluster != nil {
+            // without cluster state we can't do deep checks, but basic reboot test still works
+            // NB: using `ctx` here to have client talking to init node by default
+            apiSuite.AssertClusterHealthy(ctx)
+        }
+
+        postReset, err := apiSuite.HashKubeletCert(ctx, node)
+        apiSuite.Require().NoError(err)
+
+        if ephemeralIsGoingToBeReset {
+            apiSuite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+        } else {
+            apiSuite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
+        }
+    }
+}
+
 // TearDownSuite closes Talos API client.
 func (apiSuite *APISuite) TearDownSuite() {
     if apiSuite.Client != nil {