talos/internal/integration/api/reset.go
Andrey Smirnov 54ed80e244 feat: reset with system disk wipe spec
The idea is to add an option to perform a "selective" reset: the default reset
operation wipes all partitions (triggering a reinstall), while the spec
allows wiping only some of the partitions.

All other operations are performed in exactly the same way for any reset
flow.

Possible use case: reset only the `EPHEMERAL` partition.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
2020-12-10 11:31:07 -08:00
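
As a rough illustration of the selective reset described above (a sketch only, not part of this file), wiping just the EPHEMERAL partition of a single node could look like the snippet below; the request types mirror the ones exercised by the tests in this file, while the helper name and the pre-built machinery client `c` are assumptions:

// Sketch: wipe only the EPHEMERAL partition on one node and reboot it.
// Assumes a *client.Client `c` has already been constructed elsewhere.
func resetEphemeralOnly(ctx context.Context, c *client.Client, node string) error {
	ctx = client.WithNodes(ctx, node)

	return c.ResetGeneric(ctx, &machineapi.ResetRequest{
		Reboot:   true,
		Graceful: true,
		SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
			{
				Label: constants.EphemeralPartitionLabel, // "EPHEMERAL"
				Wipe:  true,
			},
		},
	})
}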


// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// +build integration_api

package api

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"io"
	"sort"
	"testing"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/talos-systems/talos/internal/integration/base"
	machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
	"github.com/talos-systems/talos/pkg/machinery/client"
	"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
	"github.com/talos-systems/talos/pkg/machinery/constants"
)

// ResetSuite ...
type ResetSuite struct {
	base.APISuite

	ctx       context.Context
	ctxCancel context.CancelFunc
}

// SuiteName ...
func (suite *ResetSuite) SuiteName() string {
	return "api.ResetSuite"
}

// SetupTest ...
func (suite *ResetSuite) SetupTest() {
	if testing.Short() {
		suite.T().Skip("skipping in short mode")
	}

	// make sure we abort at some point in time, but give enough room for Resets
	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 30*time.Minute)
}

// TearDownTest ...
func (suite *ResetSuite) TearDownTest() {
	if suite.ctxCancel != nil {
		suite.ctxCancel()
	}
}

// hashKubeletCert returns the SHA256 hash of the kubelet client certificate on the given node.
func (suite *ResetSuite) hashKubeletCert(ctx context.Context, node string) (string, error) {
	reqCtx, reqCtxCancel := context.WithTimeout(ctx, 10*time.Second)
	defer reqCtxCancel()

	reqCtx = client.WithNodes(reqCtx, node)

	reader, errCh, err := suite.Client.Read(reqCtx, "/var/lib/kubelet/pki/kubelet-client-current.pem")
	if err != nil {
		return "", err
	}

	defer reader.Close() //nolint: errcheck

	hash := sha256.New()

	_, err = io.Copy(hash, reader)
	if err != nil {
		return "", err
	}

	// drain the error channel to surface any errors from the streaming read
	for err = range errCh {
		if err != nil {
			return "", err
		}
	}

	return hex.EncodeToString(hash.Sum(nil)), reader.Close()
}

// TestResetNodeByNode Resets cluster node by node, waiting for health between Resets.
func (suite *ResetSuite) TestResetNodeByNode() {
	if !suite.Capabilities().SupportsReboot {
		suite.T().Skip("cluster doesn't support reboot (and reset)")
	}

	if suite.Cluster == nil {
		suite.T().Skip("without full cluster state reset test is not reliable (can't wait for cluster readiness in between resets)")
	}

	initNodeAddress := ""

	for _, node := range suite.Cluster.Info().Nodes {
		if node.Type == machine.TypeInit {
			initNodeAddress = node.PrivateIP.String()
			break
		}
	}

	nodes := suite.DiscoverNodes().Nodes()
	suite.Require().NotEmpty(nodes)

	sort.Strings(nodes)

	for _, node := range nodes {
		if node == initNodeAddress {
			// due to the bug with etcd cluster build for the init node after Reset(), skip resetting first node
			// there's no problem if bootstrap API was used, so this check only protects legacy init nodes
			suite.T().Log("Skipping init node", node, "due to known issue with etcd")

			continue
		}

		suite.T().Log("Resetting node", node)

		preReset, err := suite.hashKubeletCert(suite.ctx, node)
		suite.Require().NoError(err)

		suite.AssertRebooted(suite.ctx, node, func(nodeCtx context.Context) error {
			// force reboot after reset, as this is the only mode we can test
			err = suite.Client.Reset(nodeCtx, true, true)
			if err != nil {
				if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
					// ignore errors if reboot happens before response is fully received
					err = nil
				}
			}

			return err
		}, 10*time.Minute)

		postReset, err := suite.hashKubeletCert(suite.ctx, node)
		suite.Require().NoError(err)

		suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
	}
}

// TestResetNoGraceful resets a worker in !graceful mode to test the flow.
//
// We can't reset a control plane node in !graceful mode as it won't be able to rejoin the cluster.
func (suite *ResetSuite) TestResetNoGraceful() {
	if !suite.Capabilities().SupportsReboot {
		suite.T().Skip("cluster doesn't support reboot (and reset)")
	}

	if suite.Cluster == nil {
		suite.T().Skip("without full cluster state reset test is not reliable (can't wait for cluster readiness in between resets)")
	}

	node := suite.RandomDiscoveredNode(machine.TypeJoin)

	suite.T().Log("Resetting node !graceful", node)

	preReset, err := suite.hashKubeletCert(suite.ctx, node)
	suite.Require().NoError(err)

	suite.AssertRebooted(suite.ctx, node, func(nodeCtx context.Context) error {
		// force reboot after reset, as this is the only mode we can test
		err = suite.Client.Reset(nodeCtx, false, true)
		if err != nil {
			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
				// ignore errors if reboot happens before response is fully received
				err = nil
			}
		}

		return err
	}, 5*time.Minute)

	postReset, err := suite.hashKubeletCert(suite.ctx, node)
	suite.Require().NoError(err)

	suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
}

// TestResetWithSpecEphemeral resets only the ephemeral partition on the node.
//
//nolint: dupl
func (suite *ResetSuite) TestResetWithSpecEphemeral() {
	if !suite.Capabilities().SupportsReboot {
		suite.T().Skip("cluster doesn't support reboot (and reset)")
	}

	if suite.Cluster == nil {
		suite.T().Skip("without full cluster state reset test is not reliable (can't wait for cluster readiness in between resets)")
	}

	node := suite.RandomDiscoveredNode()

	suite.T().Log("Resetting node with spec=[EPHEMERAL]", node)

	preReset, err := suite.hashKubeletCert(suite.ctx, node)
	suite.Require().NoError(err)

	suite.AssertRebooted(suite.ctx, node, func(nodeCtx context.Context) error {
		// force reboot after reset, as this is the only mode we can test
		err = suite.Client.ResetGeneric(nodeCtx, &machineapi.ResetRequest{
			Reboot:   true,
			Graceful: true,
			SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
				{
					Label: constants.EphemeralPartitionLabel,
					Wipe:  true,
				},
			},
		})
		if err != nil {
			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
				// ignore errors if reboot happens before response is fully received
				err = nil
			}
		}

		return err
	}, 5*time.Minute)

	postReset, err := suite.hashKubeletCert(suite.ctx, node)
	suite.Require().NoError(err)

	suite.Assert().NotEqual(preReset, postReset, "reset should lead to new kubelet cert being generated")
}

// TestResetWithSpecState resets only the state partition on the node.
//
// As the ephemeral partition is not reset, the kubelet cert shouldn't change.
//
//nolint: dupl
func (suite *ResetSuite) TestResetWithSpecState() {
	if !suite.Capabilities().SupportsReboot {
		suite.T().Skip("cluster doesn't support reboot (and reset)")
	}

	if suite.Cluster == nil {
		suite.T().Skip("without full cluster state reset test is not reliable (can't wait for cluster readiness in between resets)")
	}

	node := suite.RandomDiscoveredNode()

	suite.T().Log("Resetting node with spec=[STATE]", node)

	preReset, err := suite.hashKubeletCert(suite.ctx, node)
	suite.Require().NoError(err)

	suite.AssertRebooted(suite.ctx, node, func(nodeCtx context.Context) error {
		// force reboot after reset, as this is the only mode we can test
		err = suite.Client.ResetGeneric(nodeCtx, &machineapi.ResetRequest{
			Reboot:   true,
			Graceful: true,
			SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
				{
					Label: constants.StatePartitionLabel,
					Wipe:  true,
				},
			},
		})
		if err != nil {
			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
				// ignore errors if reboot happens before response is fully received
				err = nil
			}
		}

		return err
	}, 5*time.Minute)

	postReset, err := suite.hashKubeletCert(suite.ctx, node)
	suite.Require().NoError(err)

	suite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
}

func init() {
	allSuites = append(allSuites, new(ResetSuite))
}