test: improve the reset integration tests

Provide a trace for each step of the reset sequence taken, so that if one
of those steps fails, the integration test produces a meaningful message
instead of proceeding and failing somewhere else.

More cleanup/refactoring; should be functionally equivalent.

Fixes #8635

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
Author: Andrey Smirnov
Date:   2024-04-22 18:39:24 +04:00
Parent: 8cdf0f7cb0
Commit: 05fd042bb3
3 changed files with 173 additions and 226 deletions
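The heart of the change: the per-test AssertRebooted / ClearConnectionRefused / HashKubeletCert boilerplate moves into a single base.APISuite.ResetNode helper (added in the third file below), which also logs each reset phase as it completes. A typical call site, as it appears in the updated tests (the EPHEMERAL wipe spec shown is the one several of them use):

    suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
        Reboot:   true,
        Graceful: true,
        SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
            {
                Label: constants.EphemeralPartitionLabel,
                Wipe:  true,
            },
        },
    }, true) // true: also run cluster health checks and verify kubelet cert rotation

The final boolean (runHealthChecks) is false only in the etcd recovery test, where the cluster is intentionally unhealthy until the snapshot is restored.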


@@ -11,6 +11,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"path/filepath"
 	"testing"
 	"time"
@@ -55,8 +56,6 @@ func (suite *EtcdRecoverSuite) TearDownTest() {
 }
 
 // TestSnapshotRecover snapshot etcd, wipes control plane nodes and recovers etcd from a snapshot.
-//
-//nolint:gocyclo
 func (suite *EtcdRecoverSuite) TestSnapshotRecover() {
 	if !suite.Capabilities().SupportsReboot {
 		suite.T().Skip("cluster doesn't support reboot")
@@ -83,34 +82,22 @@ func (suite *EtcdRecoverSuite) TestSnapshotRecover() {
 	suite.Require().NoError(suite.snapshotEtcd(snapshotNode, &snapshot))
 
-	// wipe ephemeral partition on all control plane nodes
-	preReset := map[string]string{}
-
-	for _, node := range controlPlaneNodes {
-		var err error
-
-		preReset[node], err = suite.HashKubeletCert(suite.ctx, node)
+	// leave etcd on all nodes but one
+	for _, node := range controlPlaneNodes[1:] {
+		suite.T().Logf("leaving etcd on node %q", node)
+
+		nodeCtx := client.WithNode(suite.ctx, node)
+
+		_, err := suite.Client.EtcdForfeitLeadership(nodeCtx, &machineapi.EtcdForfeitLeadershipRequest{})
 		suite.Require().NoError(err)
+
+		err = suite.Client.EtcdLeaveCluster(nodeCtx, &machineapi.EtcdLeaveClusterRequest{})
+		suite.Require().NoError(err)
 	}
 
-	errCh := make(chan error)
+	suite.T().Logf("wiping control plane nodes %q", controlPlaneNodes)
 
+	// wipe ephemeral partition on all control plane nodes, starting with the one that still has etcd running
 	for _, node := range controlPlaneNodes {
-		go func() {
-			errCh <- func() error {
-				nodeCtx := client.WithNodes(suite.ctx, node)
-
-				bootIDBefore, err := suite.ReadBootID(nodeCtx)
-				if err != nil {
-					return fmt.Errorf("error reading pre-reset boot ID: %w", err)
-				}
-
-				if err = base.IgnoreGRPCUnavailable(
-					suite.Client.ResetGeneric(
-						nodeCtx, &machineapi.ResetRequest{
+		suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
 			Reboot:   true,
 			Graceful: false,
 			SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
@@ -119,60 +106,25 @@
 					Wipe: true,
 				},
 			},
-						},
-					),
-				); err != nil {
-					return fmt.Errorf("error resetting the node %q: %w", node, err)
-				}
+		}, false)
 	}
 
-				var bootIDAfter string
-
-				return retry.Constant(5 * time.Minute).Retry(
-					func() error {
-						requestCtx, requestCtxCancel := context.WithTimeout(nodeCtx, 5*time.Second)
-						defer requestCtxCancel()
-
-						bootIDAfter, err = suite.ReadBootID(requestCtx)
-						if err != nil {
-							// API might be unresponsive during reboot
-							return retry.ExpectedError(err)
-						}
-
-						if bootIDAfter == bootIDBefore {
-							// bootID should be different after reboot
-							return retry.ExpectedErrorf(
-								"bootID didn't change for node %q: before %s, after %s",
-								node,
-								bootIDBefore,
-								bootIDAfter,
-							)
-						}
-
-						return nil
-					},
-				)
-			}()
-		}()
-	}
-
-	for range controlPlaneNodes {
-		suite.Require().NoError(<-errCh)
-	}
+	// verify that etcd data directory doesn't exist on the nodes
+	for _, node := range controlPlaneNodes {
+		stream, err := suite.Client.MachineClient.List(client.WithNode(suite.ctx, node), &machineapi.ListRequest{Root: filepath.Join(constants.EtcdDataPath, "member")})
+		suite.Require().NoError(err)
+
+		_, err = stream.Recv()
+		suite.Require().Error(err)
+
+		suite.Require().Equal(client.StatusCode(err), codes.Unknown)
+		suite.Require().Contains(client.Status(err).Message(), "no such file or directory")
+	}
 
 	suite.ClearConnectionRefused(suite.ctx, controlPlaneNodes...)
 
 	suite.T().Logf("recovering etcd snapshot at node %q", recoverNode)
 
 	suite.Require().NoError(suite.recoverEtcd(recoverNode, bytes.NewReader(snapshot.Bytes())))
 
 	suite.AssertClusterHealthy(suite.ctx)
-
-	for _, node := range controlPlaneNodes {
-		postReset, err := suite.HashKubeletCert(suite.ctx, node)
-		suite.Require().NoError(err)
-
-		suite.Assert().NotEqual(postReset, preReset[node], "kubelet cert hasn't changed for node %q", node)
-	}
 }
 
 func (suite *EtcdRecoverSuite) snapshotEtcd(snapshotNode string, dest io.Writer) error {
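A note on the etcd steps above: before wiping, every control plane node except the first leaves etcd, so the snapshot can later be restored into a fresh single-member cluster. The sequence (forfeit leadership first, then leave) could be factored into a standalone helper; a minimal sketch using the machineapi and client packages this file already imports (the helper itself is hypothetical, not part of the diff):

    // leaveEtcd asks a node to hand over etcd leadership and then to
    // remove itself from the etcd cluster, leaving the data dir to be wiped.
    func (suite *EtcdRecoverSuite) leaveEtcd(node string) error {
        nodeCtx := client.WithNode(suite.ctx, node)

        // hand over leadership first so the member can leave cleanly
        if _, err := suite.Client.EtcdForfeitLeadership(nodeCtx, &machineapi.EtcdForfeitLeadershipRequest{}); err != nil {
            return fmt.Errorf("error forfeiting etcd leadership on %q: %w", node, err)
        }

        // remove this member from the etcd cluster
        return suite.Client.EtcdLeaveCluster(nodeCtx, &machineapi.EtcdLeaveClusterRequest{})
    }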


@@ -13,7 +13,6 @@ import (
 	"time"
 
 	"github.com/siderolabs/gen/xslices"
-	"github.com/siderolabs/go-retry/retry"
 
 	"github.com/siderolabs/talos/internal/integration/base"
 	machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
@@ -68,48 +67,16 @@ func (suite *ResetSuite) TestResetNodeByNode() {
 		suite.T().Skip("skipping as talos is explicitly trusted booted")
 	}
 
-	initNodeAddress := ""
-
-	for _, node := range suite.Cluster.Info().Nodes {
-		if node.Type == machine.TypeInit {
-			initNodeAddress = node.IPs[0].String()
-
-			break
-		}
-	}
-
 	nodes := suite.DiscoverNodeInternalIPs(suite.ctx)
 	suite.Require().NotEmpty(nodes)
 
 	sort.Strings(nodes)
 
 	for _, node := range nodes {
-		if node == initNodeAddress {
-			// due to the bug with etcd cluster build for the init node after Reset(), skip resetting first node
-			// there's no problem if bootstrap API was used, so this check only protects legacy init nodes
-			suite.T().Log("Skipping init node", node, "due to known issue with etcd")
-
-			continue
-		}
-
 		suite.T().Log("Resetting node", node)
 
-		preReset, err := suite.HashKubeletCert(suite.ctx, node)
-		suite.Require().NoError(err)
-
-		suite.AssertRebooted(
-			suite.ctx, node, func(nodeCtx context.Context) error {
-				// force reboot after reset, as this is the only mode we can test
-				return base.IgnoreGRPCUnavailable(suite.Client.Reset(nodeCtx, true, true))
-			}, 10*time.Minute,
-		)
-
-		suite.ClearConnectionRefused(suite.ctx, node)
-
-		postReset, err := suite.HashKubeletCert(suite.ctx, node)
-		suite.Require().NoError(err)
-
-		suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+		suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+			Reboot:   true,
+			Graceful: true,
+		}, true)
 	}
 }
 
@@ -121,24 +88,10 @@ func (suite *ResetSuite) testResetNoGraceful(nodeType machine.Type) {
 	node := suite.RandomDiscoveredNodeInternalIP(nodeType)
 
 	suite.T().Logf("Resetting %s node !graceful %s", nodeType, node)
 
-	preReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.AssertRebooted(
-		suite.ctx, node, func(nodeCtx context.Context) error {
-			// force reboot after reset, as this is the only mode we can test
-			return base.IgnoreGRPCUnavailable(suite.Client.Reset(nodeCtx, false, true))
-		}, 5*time.Minute,
-	)
-
-	suite.ClearConnectionRefused(suite.ctx, node)
-
-	postReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
+		Reboot:   true,
+		Graceful: false,
+	}, true)
 }
 
 // TestResetNoGracefulWorker resets a worker in !graceful mode.
@@ -157,17 +110,7 @@ func (suite *ResetSuite) TestResetNoGracefulControlplane() {
 func (suite *ResetSuite) TestResetWithSpecEphemeral() {
 	node := suite.RandomDiscoveredNodeInternalIP()
 
 	suite.T().Log("Resetting node with spec=[EPHEMERAL]", node)
 
-	preReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.AssertRebooted(
-		suite.ctx, node, func(nodeCtx context.Context) error {
-			// force reboot after reset, as this is the only mode we can test
-			return base.IgnoreGRPCUnavailable(
-				suite.Client.ResetGeneric(
-					nodeCtx, &machineapi.ResetRequest{
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
 		Reboot:   true,
 		Graceful: true,
 		SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
@@ -176,18 +119,7 @@ func (suite *ResetSuite) TestResetWithSpecEphemeral() {
 				Wipe: true,
 			},
 		},
-					},
-				),
-			)
-		}, 5*time.Minute,
-	)
-
-	suite.ClearConnectionRefused(suite.ctx, node)
-
-	postReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+	}, true)
 }
 
 // TestResetWithSpecState resets only state partition on the node.
@@ -201,12 +133,7 @@ func (suite *ResetSuite) TestResetWithSpecState() {
 	node := suite.RandomDiscoveredNodeInternalIP()
 
 	suite.T().Log("Resetting node with spec=[STATE]", node)
 
-	preReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	disks, err := suite.Client.Disks(client.WithNodes(suite.ctx, node))
+	disks, err := suite.Client.Disks(client.WithNode(suite.ctx, node))
 	suite.Require().NoError(err)
 	suite.Require().NotEmpty(disks.Messages)
 
@@ -219,12 +146,7 @@ func (suite *ResetSuite) TestResetWithSpecState() {
 		},
 	)
 
-	suite.AssertRebooted(
-		suite.ctx, node, func(nodeCtx context.Context) error {
-			// force reboot after reset, as this is the only mode we can test
-			return base.IgnoreGRPCUnavailable(
-				suite.Client.ResetGeneric(
-					nodeCtx, &machineapi.ResetRequest{
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
 		Reboot:   true,
 		Graceful: true,
 		SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
@@ -234,37 +156,26 @@
 			},
 		},
 		UserDisksToWipe: userDisksToWipe,
-					},
-				),
-			)
-		}, 5*time.Minute,
-	)
-
-	suite.ClearConnectionRefused(suite.ctx, node)
-
-	postReset, err := suite.HashKubeletCert(suite.ctx, node)
-	suite.Require().NoError(err)
-
-	suite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
+	}, true)
 }
 
-// TestResetDuringBoot resets the node multiple times, second reset is done
-// before boot sequence is complete.
+// TestResetDuringBoot resets the node while it is in boot sequence.
 func (suite *ResetSuite) TestResetDuringBoot() {
 	node := suite.RandomDiscoveredNodeInternalIP()
 	nodeCtx := client.WithNodes(suite.ctx, node)
 
-	suite.T().Log("Resetting node", node)
+	suite.T().Log("rebooting node", node)
 
-	for range 2 {
-		bootID := suite.ReadBootIDWithRetry(nodeCtx, time.Minute*5)
+	bootIDBefore, err := suite.ReadBootID(nodeCtx)
+	suite.Require().NoError(err)
 
-		err := retry.Constant(5*time.Minute, retry.WithUnits(time.Millisecond*1000)).Retry(
-			func() error {
-				// force reboot after reset, as this is the only mode we can test
-				return retry.ExpectedError(
-					suite.Client.ResetGeneric(
-						client.WithNodes(suite.ctx, node), &machineapi.ResetRequest{
+	suite.Require().NoError(suite.Client.Reboot(nodeCtx))
+
+	suite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 3*time.Minute)
+
+	suite.ClearConnectionRefused(suite.ctx, node)
+
+	suite.ResetNode(suite.ctx, node, &machineapi.ResetRequest{
 		Reboot:   true,
 		Graceful: true,
 		SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
@@ -273,19 +184,7 @@
 			Wipe: true,
 		},
 	},
-						},
-					),
-				)
-			},
-		)
-		suite.Require().NoError(err)
-
-		suite.AssertBootIDChanged(nodeCtx, bootID, node, time.Minute*5)
-	}
-
-	suite.WaitForBootDone(suite.ctx)
-
-	suite.AssertClusterHealthy(suite.ctx)
+	}, true)
}
 
 func init() {
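One piece of TestResetWithSpecState that the hunks above only show in passing is how userDisksToWipe gets built from the machine Disks API. A plausible construction with the xslices helpers this file imports (the exact filtering in the real test may differ; the storage.Disk fields DeviceName and SystemDisk, with storage being the github.com/siderolabs/talos/pkg/machinery/api/storage package, are the assumption here):

    // keep only the node's non-system disks as wipe targets for the reset call
    userDisksToWipe := xslices.Map(
        xslices.Filter(disks.Messages[0].Disks, func(disk *storage.Disk) bool {
            return !disk.SystemDisk
        }),
        func(disk *storage.Disk) string {
            return disk.DeviceName
        },
    )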


@@ -582,6 +582,102 @@ func (apiSuite *APISuite) PatchV1Alpha1Config(provider config.Provider, patch fu
 	return bytes
 }
 
+// ResetNode wraps the reset node sequence with checks, waiting for the reset to finish and verifying the result.
+//
+//nolint:gocyclo
+func (apiSuite *APISuite) ResetNode(ctx context.Context, node string, resetSpec *machineapi.ResetRequest, runHealthChecks bool) {
+	apiSuite.T().Logf("resetting node %q with graceful %v mode %s, system %v, user %v", node, resetSpec.Graceful, resetSpec.Mode, resetSpec.SystemPartitionsToWipe, resetSpec.UserDisksToWipe)
+
+	nodeCtx := client.WithNode(ctx, node)
+
+	// any reset should lead to a reboot, so read boot_id before reboot
+	bootIDBefore, err := apiSuite.ReadBootID(nodeCtx)
+	apiSuite.Require().NoError(err)
+
+	// figure out if EPHEMERAL is going to be reset
+	ephemeralIsGoingToBeReset := false
+
+	if len(resetSpec.SystemPartitionsToWipe) == 0 && len(resetSpec.UserDisksToWipe) == 0 {
+		ephemeralIsGoingToBeReset = true
+	} else {
+		for _, part := range resetSpec.SystemPartitionsToWipe {
+			if part.Label == constants.EphemeralPartitionLabel {
+				ephemeralIsGoingToBeReset = true
+
+				break
+			}
+		}
+	}
+
+	preReset, err := apiSuite.HashKubeletCert(ctx, node)
+	apiSuite.Require().NoError(err)
+
+	resp, err := apiSuite.Client.ResetGenericWithResponse(nodeCtx, resetSpec)
+	apiSuite.Require().NoError(err)
+
+	actorID := resp.Messages[0].ActorId
+
+	eventCh := make(chan client.EventResult)
+
+	// watch for events
+	apiSuite.Require().NoError(apiSuite.Client.EventsWatchV2(nodeCtx, eventCh, client.WithActorID(actorID), client.WithTailEvents(-1)))
+
+	waitTimer := time.NewTimer(5 * time.Minute)
+	defer waitTimer.Stop()
+
+waitLoop:
+	for {
+		select {
+		case ev := <-eventCh:
+			apiSuite.Require().NoError(ev.Error)
+
+			switch msg := ev.Event.Payload.(type) {
+			case *machineapi.SequenceEvent:
+				if msg.Error != nil {
+					apiSuite.FailNow("reset failed", "%s: %s", msg.Error.Message, msg.Error.Code)
+				}
+			case *machineapi.PhaseEvent:
+				if msg.Action == machineapi.PhaseEvent_START && msg.Phase == "unmountSystem" {
+					// about to be reset, break waitLoop
+					break waitLoop
+				}
+
+				if msg.Action == machineapi.PhaseEvent_STOP {
+					apiSuite.T().Logf("reset phase %q finished", msg.Phase)
+				}
+			}
+		case <-waitTimer.C:
+			apiSuite.FailNow("timeout waiting for reset to finish")
+		case <-ctx.Done():
+			apiSuite.FailNow("context canceled")
+		}
+	}
+
+	// wait for the apid to be shut down
+	time.Sleep(10 * time.Second)
+
+	apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, 3*time.Minute)
+
+	apiSuite.ClearConnectionRefused(ctx, node)
+
+	if runHealthChecks {
+		if apiSuite.Cluster != nil {
+			// without cluster state we can't do deep checks, but basic reboot test still works
+			// NB: using `ctx` here to have client talking to init node by default
+			apiSuite.AssertClusterHealthy(ctx)
+		}
+
+		postReset, err := apiSuite.HashKubeletCert(ctx, node)
+		apiSuite.Require().NoError(err)
+
+		if ephemeralIsGoingToBeReset {
+			apiSuite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
+		} else {
+			apiSuite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
+		}
+	}
+}
+
 // TearDownSuite closes Talos API client.
 func (apiSuite *APISuite) TearDownSuite() {
 	if apiSuite.Client != nil {
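ResetNode leans on two boot-ID helpers that predate this diff, ReadBootID and AssertBootIDChanged: the boot ID is captured before the reset and polled afterwards until it changes, which is what proves the node actually went through a reboot. A minimal sketch of the read side, assuming Client.Read returns an io.ReadCloser for a file on the node (the real helper may differ in detail; imports assumed: context, io, strings):

    // ReadBootID returns the node's current boot identifier;
    // the kernel regenerates this UUID on every boot.
    func (apiSuite *APISuite) ReadBootID(ctx context.Context) (string, error) {
        reader, err := apiSuite.Client.Read(ctx, "/proc/sys/kernel/random/boot_id")
        if err != nil {
            return "", err
        }

        defer reader.Close() //nolint:errcheck

        body, err := io.ReadAll(reader)
        if err != nil {
            return "", err
        }

        return strings.TrimSpace(string(body)), nil
    }

AssertBootIDChanged then wraps ReadBootID in a retry loop until the returned value differs from the pre-reset one, the same comparison the first file in this diff used to perform inline.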