mirror of
				https://github.com/lingble/talos.git
				synced 2025-11-04 06:28:09 +00:00 
			
		
		
		
	test: add cluster discovery integration tests
This verifies that members match cluster state and that both cluster registries work in sync producing same discovery data. Fixes #4191 Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
		@@ -8,12 +8,8 @@
 | 
			
		||||
package api
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@@ -23,7 +19,6 @@ import (
 | 
			
		||||
	machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/client"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config/configloader"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/constants"
 | 
			
		||||
@@ -95,7 +90,7 @@ func (suite *ApplyConfigSuite) TestApply() {
 | 
			
		||||
 | 
			
		||||
	nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
	provider, err := suite.readConfigFromNode(nodeCtx)
 | 
			
		||||
	provider, err := suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
	suite.Assert().Nilf(err, "failed to read existing config from node %q: %w", node, err)
 | 
			
		||||
 | 
			
		||||
	cfg, ok := provider.(*v1alpha1.Config)
 | 
			
		||||
@@ -126,7 +121,7 @@ func (suite *ApplyConfigSuite) TestApply() {
 | 
			
		||||
	var newProvider config.Provider
 | 
			
		||||
 | 
			
		||||
	suite.Require().Nilf(retry.Constant(time.Minute, retry.WithUnits(time.Second)).Retry(func() error {
 | 
			
		||||
		newProvider, err = suite.readConfigFromNode(nodeCtx)
 | 
			
		||||
		newProvider, err = suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return retry.ExpectedError(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -149,7 +144,7 @@ func (suite *ApplyConfigSuite) TestApplyOnReboot() {
 | 
			
		||||
 | 
			
		||||
	nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
	provider, err := suite.readConfigFromNode(nodeCtx)
 | 
			
		||||
	provider, err := suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
	suite.Require().NoError(err, "failed to read existing config from node %q", node)
 | 
			
		||||
 | 
			
		||||
	cfg, ok := provider.(*v1alpha1.Config)
 | 
			
		||||
@@ -173,7 +168,7 @@ func (suite *ApplyConfigSuite) TestApplyOnReboot() {
 | 
			
		||||
	// Verify configuration change
 | 
			
		||||
	var newProvider config.Provider
 | 
			
		||||
 | 
			
		||||
	newProvider, err = suite.readConfigFromNode(nodeCtx)
 | 
			
		||||
	newProvider, err = suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
 | 
			
		||||
	suite.Require().NoError(err, "failed to read updated configuration from node %q: %w", node)
 | 
			
		||||
 | 
			
		||||
@@ -208,7 +203,7 @@ func (suite *ApplyConfigSuite) TestApplyConfigRotateEncryptionSecrets() {
 | 
			
		||||
	suite.ClearConnectionRefused(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
	nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
	provider, err := suite.readConfigFromNode(nodeCtx)
 | 
			
		||||
	provider, err := suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
 | 
			
		||||
	suite.Assert().NoError(err)
 | 
			
		||||
 | 
			
		||||
@@ -294,7 +289,7 @@ func (suite *ApplyConfigSuite) TestApplyConfigRotateEncryptionSecrets() {
 | 
			
		||||
		var newProvider config.Provider
 | 
			
		||||
 | 
			
		||||
		suite.Require().Nilf(retry.Constant(time.Minute, retry.WithUnits(time.Second)).Retry(func() error {
 | 
			
		||||
			newProvider, err = suite.readConfigFromNode(nodeCtx)
 | 
			
		||||
			newProvider, err = suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return retry.ExpectedError(err)
 | 
			
		||||
			}
 | 
			
		||||
@@ -325,56 +320,6 @@ func (suite *ApplyConfigSuite) TestApplyConfigRotateEncryptionSecrets() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (suite *ApplyConfigSuite) readConfigFromNode(nodeCtx context.Context) (config.Provider, error) {
 | 
			
		||||
	// Load the current node machine config
 | 
			
		||||
	cfgData := new(bytes.Buffer)
 | 
			
		||||
 | 
			
		||||
	reader, errCh, err := suite.Client.Read(nodeCtx, constants.ConfigPath)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("error creating reader: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer reader.Close() //nolint:errcheck
 | 
			
		||||
 | 
			
		||||
	if err = copyFromReaderWithErrChan(cfgData, reader, errCh); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("error reading: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	provider, err := configloader.NewFromBytes(cfgData.Bytes())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to parse: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return provider, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func copyFromReaderWithErrChan(out io.Writer, in io.Reader, errCh <-chan error) (err error) {
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
 | 
			
		||||
	var chanErr error
 | 
			
		||||
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer wg.Done()
 | 
			
		||||
 | 
			
		||||
		// StreamReader is only singly-buffered, so we need to process any errors as we get them.
 | 
			
		||||
		for chanErr = range errCh {
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			err = chanErr
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	_, err = io.Copy(out, in)
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	allSuites = append(allSuites, new(ApplyConfigSuite))
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										379
									
								
								internal/integration/api/discovery.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										379
									
								
								internal/integration/api/discovery.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,379 @@
 | 
			
		||||
// This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
 | 
			
		||||
 | 
			
		||||
//go:build integration_api
 | 
			
		||||
// +build integration_api
 | 
			
		||||
 | 
			
		||||
package api
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/cosi-project/runtime/pkg/resource"
 | 
			
		||||
	"gopkg.in/yaml.v3"
 | 
			
		||||
	"inet.af/netaddr"
 | 
			
		||||
 | 
			
		||||
	"github.com/talos-systems/talos/internal/integration/base"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/client"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/resources/cluster"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/resources/kubespan"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// DiscoverySuite verifies Discovery API.
 | 
			
		||||
type DiscoverySuite struct {
 | 
			
		||||
	base.APISuite
 | 
			
		||||
 | 
			
		||||
	ctx       context.Context
 | 
			
		||||
	ctxCancel context.CancelFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SuiteName ...
 | 
			
		||||
func (suite *DiscoverySuite) SuiteName() string {
 | 
			
		||||
	return "api.DiscoverySuite"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetupTest ...
 | 
			
		||||
func (suite *DiscoverySuite) SetupTest() {
 | 
			
		||||
	// make sure API calls have timeout
 | 
			
		||||
	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 15*time.Second)
 | 
			
		||||
 | 
			
		||||
	// check that cluster has discovery enabled
 | 
			
		||||
	node := suite.RandomDiscoveredNode()
 | 
			
		||||
	suite.ClearConnectionRefused(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
	nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
	provider, err := suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
	if !provider.Cluster().Discovery().Enabled() {
 | 
			
		||||
		suite.T().Skip("cluster discovery is disabled")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TearDownTest ...
 | 
			
		||||
func (suite *DiscoverySuite) TearDownTest() {
 | 
			
		||||
	if suite.ctxCancel != nil {
 | 
			
		||||
		suite.ctxCancel()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestMembers checks that `talosctl get members` matches expected cluster discovery.
 | 
			
		||||
//
 | 
			
		||||
//nolint:gocyclo
 | 
			
		||||
func (suite *DiscoverySuite) TestMembers() {
 | 
			
		||||
	nodes := suite.DiscoverNodes()
 | 
			
		||||
	expectedTalosVersion := fmt.Sprintf("Talos (%s)", suite.Version)
 | 
			
		||||
 | 
			
		||||
	for _, node := range nodes.Nodes() {
 | 
			
		||||
		nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
		members := suite.getMembers(nodeCtx)
 | 
			
		||||
 | 
			
		||||
		suite.Assert().Len(members, len(nodes.Nodes()))
 | 
			
		||||
 | 
			
		||||
		// do basic check against discovered nodes
 | 
			
		||||
		for _, expectedNode := range nodes.Nodes() {
 | 
			
		||||
			addr, err := netaddr.ParseIP(expectedNode)
 | 
			
		||||
			suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
			found := false
 | 
			
		||||
 | 
			
		||||
			for _, member := range members {
 | 
			
		||||
				for _, memberAddr := range member.TypedSpec().Addresses {
 | 
			
		||||
					if memberAddr.Compare(addr) == 0 {
 | 
			
		||||
						found = true
 | 
			
		||||
 | 
			
		||||
						break
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				if found {
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			suite.Assert().True(found, "addr %s", addr)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// if cluster informantion is available, perform additional checks
 | 
			
		||||
		if suite.Cluster == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		memberByID := make(map[string]*cluster.Member)
 | 
			
		||||
 | 
			
		||||
		for _, member := range members {
 | 
			
		||||
			memberByID[member.Metadata().ID()] = member
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		nodesInfo := suite.Cluster.Info().Nodes
 | 
			
		||||
 | 
			
		||||
		for _, nodeInfo := range nodesInfo {
 | 
			
		||||
			matchingMember := memberByID[nodeInfo.Name]
 | 
			
		||||
			suite.Require().NotNil(matchingMember)
 | 
			
		||||
 | 
			
		||||
			suite.Assert().Equal(nodeInfo.Type, matchingMember.TypedSpec().MachineType)
 | 
			
		||||
			suite.Assert().Equal(expectedTalosVersion, matchingMember.TypedSpec().OperatingSystem)
 | 
			
		||||
			suite.Assert().Equal(nodeInfo.Name, matchingMember.TypedSpec().Hostname)
 | 
			
		||||
 | 
			
		||||
			for _, nodeIPStd := range nodeInfo.IPs {
 | 
			
		||||
				nodeIP, ok := netaddr.FromStdIP(nodeIPStd)
 | 
			
		||||
				suite.Assert().True(ok)
 | 
			
		||||
 | 
			
		||||
				found := false
 | 
			
		||||
 | 
			
		||||
				for _, memberAddr := range matchingMember.TypedSpec().Addresses {
 | 
			
		||||
					if memberAddr.Compare(nodeIP) == 0 {
 | 
			
		||||
						found = true
 | 
			
		||||
 | 
			
		||||
						break
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				suite.Assert().True(found, "addr %s", nodeIP)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestRegistries checks that all registries produce same raw Affiliate data.
 | 
			
		||||
func (suite *DiscoverySuite) TestRegistries() {
 | 
			
		||||
	registries := []string{"k8s/", "service/"}
 | 
			
		||||
 | 
			
		||||
	nodes := suite.DiscoverNodes()
 | 
			
		||||
 | 
			
		||||
	for _, node := range nodes.Nodes() {
 | 
			
		||||
		nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
		members := suite.getMembers(nodeCtx)
 | 
			
		||||
		localIdentity := suite.getNodeIdentity(nodeCtx)
 | 
			
		||||
		rawAffiliates := suite.getAffiliates(nodeCtx, cluster.RawNamespaceName)
 | 
			
		||||
 | 
			
		||||
		// raw affiliates don't contain the local node
 | 
			
		||||
		suite.Assert().Len(rawAffiliates, len(registries)*(len(members)-1))
 | 
			
		||||
 | 
			
		||||
		rawAffiliatesByID := make(map[string]*cluster.Affiliate)
 | 
			
		||||
 | 
			
		||||
		for _, rawAffiliate := range rawAffiliates {
 | 
			
		||||
			rawAffiliatesByID[rawAffiliate.Metadata().ID()] = rawAffiliate
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// every member except for local identity member should be discovered via each registry
 | 
			
		||||
		for _, member := range members {
 | 
			
		||||
			if member.TypedSpec().NodeID == localIdentity.TypedSpec().NodeID {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			for _, registry := range registries {
 | 
			
		||||
				rawAffiliate := rawAffiliatesByID[registry+member.TypedSpec().NodeID]
 | 
			
		||||
				suite.Require().NotNil(rawAffiliate)
 | 
			
		||||
 | 
			
		||||
				suite.Assert().Equal(member.TypedSpec().Hostname, rawAffiliate.TypedSpec().Hostname)
 | 
			
		||||
				suite.Assert().Equal(member.TypedSpec().Addresses, rawAffiliate.TypedSpec().Addresses)
 | 
			
		||||
				suite.Assert().Equal(member.TypedSpec().OperatingSystem, rawAffiliate.TypedSpec().OperatingSystem)
 | 
			
		||||
				suite.Assert().Equal(member.TypedSpec().MachineType, rawAffiliate.TypedSpec().MachineType)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestKubeSpanPeers verifies that KubeSpan peer specs are populated, and that peer statuses are available.
 | 
			
		||||
func (suite *DiscoverySuite) TestKubeSpanPeers() {
 | 
			
		||||
	if !suite.Capabilities().RunsTalosKernel {
 | 
			
		||||
		suite.T().Skip("not running Talos kernel")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// check that cluster has KubeSpan enabled
 | 
			
		||||
	node := suite.RandomDiscoveredNode()
 | 
			
		||||
	suite.ClearConnectionRefused(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
	nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
	provider, err := suite.ReadConfigFromNode(nodeCtx)
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
	if !provider.Machine().Network().KubeSpan().Enabled() {
 | 
			
		||||
		suite.T().Skip("KubeSpan is disabled")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	nodes := suite.DiscoverNodes().Nodes()
 | 
			
		||||
 | 
			
		||||
	for _, node := range nodes {
 | 
			
		||||
		nodeCtx := client.WithNodes(suite.ctx, node)
 | 
			
		||||
 | 
			
		||||
		peerSpecs := suite.getKubeSpanPeerSpecs(nodeCtx)
 | 
			
		||||
		suite.Assert().Len(peerSpecs, len(nodes)-1)
 | 
			
		||||
 | 
			
		||||
		peerStatuses := suite.getKubeSpanPeerStatuses(nodeCtx)
 | 
			
		||||
		suite.Assert().Len(peerStatuses, len(nodes)-1)
 | 
			
		||||
 | 
			
		||||
		for _, status := range peerStatuses {
 | 
			
		||||
			suite.Assert().Equal(kubespan.PeerStateUp, status.TypedSpec().State)
 | 
			
		||||
			suite.Assert().False(status.TypedSpec().Endpoint.IsZero())
 | 
			
		||||
			suite.Assert().Greater(status.TypedSpec().ReceiveBytes, int64(0))
 | 
			
		||||
			suite.Assert().Greater(status.TypedSpec().TransmitBytes, int64(0))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//nolint:dupl
 | 
			
		||||
func (suite *DiscoverySuite) getMembers(nodeCtx context.Context) []*cluster.Member {
 | 
			
		||||
	var result []*cluster.Member
 | 
			
		||||
 | 
			
		||||
	memberList, err := suite.Client.Resources.List(nodeCtx, cluster.NamespaceName, cluster.MemberType)
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		resp, err := memberList.Recv()
 | 
			
		||||
		if err == io.EOF {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		if resp.Resource == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// TODO: this is hackery to decode back into Member resource
 | 
			
		||||
		//       this should be fixed once we introduce protobuf properly
 | 
			
		||||
		b, err := yaml.Marshal(resp.Resource.Spec())
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		member := cluster.NewMember(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID())
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(yaml.Unmarshal(b, member.TypedSpec()))
 | 
			
		||||
 | 
			
		||||
		result = append(result, member)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return result
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (suite *DiscoverySuite) getNodeIdentity(nodeCtx context.Context) *cluster.Identity {
 | 
			
		||||
	list, err := suite.Client.Resources.Get(nodeCtx, cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity)
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
	suite.Require().Len(list, 1)
 | 
			
		||||
 | 
			
		||||
	resp := list[0]
 | 
			
		||||
 | 
			
		||||
	// TODO: this is hackery to decode back into Member resource
 | 
			
		||||
	//       this should be fixed once we introduce protobuf properly
 | 
			
		||||
	b, err := yaml.Marshal(resp.Resource.Spec())
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
	identity := cluster.NewIdentity(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID())
 | 
			
		||||
 | 
			
		||||
	suite.Require().NoError(yaml.Unmarshal(b, identity.TypedSpec()))
 | 
			
		||||
 | 
			
		||||
	return identity
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//nolint:dupl
 | 
			
		||||
func (suite *DiscoverySuite) getAffiliates(nodeCtx context.Context, namespace resource.Namespace) []*cluster.Affiliate {
 | 
			
		||||
	var result []*cluster.Affiliate
 | 
			
		||||
 | 
			
		||||
	affiliateList, err := suite.Client.Resources.List(nodeCtx, namespace, cluster.AffiliateType)
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		resp, err := affiliateList.Recv()
 | 
			
		||||
		if err == io.EOF {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		if resp.Resource == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// TODO: this is hackery to decode back into Affiliate resource
 | 
			
		||||
		//       this should be fixed once we introduce protobuf properly
 | 
			
		||||
		b, err := yaml.Marshal(resp.Resource.Spec())
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		affiliate := cluster.NewAffiliate(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID())
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(yaml.Unmarshal(b, affiliate.TypedSpec()))
 | 
			
		||||
 | 
			
		||||
		result = append(result, affiliate)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return result
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//nolint:dupl
 | 
			
		||||
func (suite *DiscoverySuite) getKubeSpanPeerSpecs(nodeCtx context.Context) []*kubespan.PeerSpec {
 | 
			
		||||
	var result []*kubespan.PeerSpec
 | 
			
		||||
 | 
			
		||||
	peerList, err := suite.Client.Resources.List(nodeCtx, kubespan.NamespaceName, kubespan.PeerSpecType)
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		resp, err := peerList.Recv()
 | 
			
		||||
		if err == io.EOF {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		if resp.Resource == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// TODO: this is hackery to decode back into KubeSpanPeerSpec resource
 | 
			
		||||
		//       this should be fixed once we introduce protobuf properly
 | 
			
		||||
		b, err := yaml.Marshal(resp.Resource.Spec())
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		peerSpec := kubespan.NewPeerSpec(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID())
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(yaml.Unmarshal(b, peerSpec.TypedSpec()))
 | 
			
		||||
 | 
			
		||||
		result = append(result, peerSpec)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return result
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//nolint:dupl
 | 
			
		||||
func (suite *DiscoverySuite) getKubeSpanPeerStatuses(nodeCtx context.Context) []*kubespan.PeerStatus {
 | 
			
		||||
	var result []*kubespan.PeerStatus
 | 
			
		||||
 | 
			
		||||
	peerList, err := suite.Client.Resources.List(nodeCtx, kubespan.NamespaceName, kubespan.PeerStatusType)
 | 
			
		||||
	suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		resp, err := peerList.Recv()
 | 
			
		||||
		if err == io.EOF {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		if resp.Resource == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// TODO: this is hackery to decode back into KubeSpanPeerStatus resource
 | 
			
		||||
		//       this should be fixed once we introduce protobuf properly
 | 
			
		||||
		b, err := yaml.Marshal(resp.Resource.Spec())
 | 
			
		||||
		suite.Require().NoError(err)
 | 
			
		||||
 | 
			
		||||
		peerStatus := kubespan.NewPeerStatus(resp.Resource.Metadata().Namespace(), resp.Resource.Metadata().ID())
 | 
			
		||||
 | 
			
		||||
		suite.Require().NoError(yaml.Unmarshal(b, peerStatus.TypedSpec()))
 | 
			
		||||
 | 
			
		||||
		result = append(result, peerStatus)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return result
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	allSuites = append(allSuites, new(DiscoverySuite))
 | 
			
		||||
}
 | 
			
		||||
@@ -8,6 +8,7 @@
 | 
			
		||||
package base
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"context"
 | 
			
		||||
	"crypto/sha256"
 | 
			
		||||
	"encoding/hex"
 | 
			
		||||
@@ -16,6 +17,7 @@ import (
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/suite"
 | 
			
		||||
@@ -28,7 +30,10 @@ import (
 | 
			
		||||
	machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/client"
 | 
			
		||||
	clientconfig "github.com/talos-systems/talos/pkg/machinery/client/config"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config/configloader"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/constants"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/provision"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/provision/access"
 | 
			
		||||
)
 | 
			
		||||
@@ -323,6 +328,57 @@ func (apiSuite *APISuite) HashKubeletCert(ctx context.Context, node string) (str
 | 
			
		||||
	return hex.EncodeToString(hash.Sum(nil)), reader.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadConfigFromNode reads machine configuration from the node.
 | 
			
		||||
func (apiSuite *APISuite) ReadConfigFromNode(nodeCtx context.Context) (config.Provider, error) {
 | 
			
		||||
	// Load the current node machine config
 | 
			
		||||
	cfgData := new(bytes.Buffer)
 | 
			
		||||
 | 
			
		||||
	reader, errCh, err := apiSuite.Client.Read(nodeCtx, constants.ConfigPath)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("error creating reader: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer reader.Close() //nolint:errcheck
 | 
			
		||||
 | 
			
		||||
	if err = copyFromReaderWithErrChan(cfgData, reader, errCh); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("error reading: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	provider, err := configloader.NewFromBytes(cfgData.Bytes())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to parse: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return provider, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func copyFromReaderWithErrChan(out io.Writer, in io.Reader, errCh <-chan error) (err error) {
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
 | 
			
		||||
	var chanErr error
 | 
			
		||||
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer wg.Done()
 | 
			
		||||
 | 
			
		||||
		// StreamReader is only singly-buffered, so we need to process any errors as we get them.
 | 
			
		||||
		for chanErr = range errCh {
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			err = chanErr
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	_, err = io.Copy(out, in)
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TearDownSuite closes Talos API client.
 | 
			
		||||
func (apiSuite *APISuite) TearDownSuite() {
 | 
			
		||||
	if apiSuite.Client != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -8,6 +8,7 @@ import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"net"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
 | 
			
		||||
	"github.com/talos-systems/talos/pkg/provision"
 | 
			
		||||
@@ -63,7 +64,7 @@ func (p *provisioner) Reflect(ctx context.Context, clusterName, stateDirectory s
 | 
			
		||||
		res.clusterInfo.Nodes = append(res.clusterInfo.Nodes,
 | 
			
		||||
			provision.NodeInfo{
 | 
			
		||||
				ID:   node.ID,
 | 
			
		||||
				Name: node.Names[0],
 | 
			
		||||
				Name: strings.TrimLeft(node.Names[0], "/"),
 | 
			
		||||
				Type: t,
 | 
			
		||||
 | 
			
		||||
				IPs: []net.IP{net.ParseIP(node.NetworkSettings.Networks[res.clusterInfo.Network.Name].IPAddress)},
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,8 @@
 | 
			
		||||
 | 
			
		||||
package kubespan
 | 
			
		||||
 | 
			
		||||
import "fmt"
 | 
			
		||||
 | 
			
		||||
//go:generate stringer -type=PeerState -linecomment
 | 
			
		||||
 | 
			
		||||
// PeerState is KubeSpan peer current state.
 | 
			
		||||
@@ -14,6 +16,22 @@ func (v PeerState) MarshalText() ([]byte, error) {
 | 
			
		||||
	return []byte(v.String()), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UnmarshalText implements encoding.TextUnmarshaler.
 | 
			
		||||
func (v *PeerState) UnmarshalText(b []byte) error {
 | 
			
		||||
	switch string(b) {
 | 
			
		||||
	case "unknown":
 | 
			
		||||
		*v = PeerStateUnknown
 | 
			
		||||
	case "up":
 | 
			
		||||
		*v = PeerStateUp
 | 
			
		||||
	case "down":
 | 
			
		||||
		*v = PeerStateDown
 | 
			
		||||
	default:
 | 
			
		||||
		return fmt.Errorf("unsupported value for PeerState: %q", string(b))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PeerState constants.
 | 
			
		||||
const (
 | 
			
		||||
	PeerStateUnknown PeerState = iota // unknown
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user