feat: support disk usage command in talosctl

Usage example:

```bash
talosctl du --nodes 10.5.0.2 /var -H -d 2
NODE       NAME
10.5.0.2   8.4 kB   etc
10.5.0.2   1.3 GB   lib
10.5.0.2   16 MB    log
10.5.0.2   25 kB    run
10.5.0.2   4.1 kB   tmp
10.5.0.2   1.3 GB   .
```

Supported flags:
- `-a` writes counts for all files, not just directories.
- `-d` recursion depth
- '-H' humanize size outputs.
- '-t' size threshold (skip files if < size or > size).

Fixes: https://github.com/talos-systems/talos/issues/2504

Signed-off-by: Artem Chernyshev <artem.0xD2@gmail.com>
This commit is contained in:
Artem Chernyshev
2020-10-13 14:31:31 +03:00
committed by talos-bot
parent 17b6ce0a83
commit e7e99cf1b3
10 changed files with 2062 additions and 1165 deletions

View File

@@ -31,6 +31,7 @@ service MachineService {
rpc Hostname(google.protobuf.Empty) returns (HostnameResponse);
rpc Kubeconfig(google.protobuf.Empty) returns (stream common.Data);
rpc List(ListRequest) returns (stream FileInfo);
rpc DiskUsage(DiskUsageRequest) returns (stream DiskUsageInfo);
rpc LoadAvg(google.protobuf.Empty) returns (LoadAvgResponse);
rpc Logs(LogsRequest) returns (stream common.Data);
rpc Memory(google.protobuf.Empty) returns (MemoryResponse);
@@ -270,6 +271,20 @@ message ListRequest {
int32 recursion_depth = 3;
}
// DiskUsageRequest describes a request to list disk usage of directories and regular files
message DiskUsageRequest {
// RecursionDepth indicates how many levels of subdirectories should be
// recursed. The default (0) indicates that no limit should be enforced.
int32 recursion_depth = 1;
// All write sizes for all files, not just directories.
bool all = 2;
// Threshold exclude entries smaller than SIZE if positive,
// or entries greater than SIZE if negative.
int64 threshold = 3;
// DiskUsagePaths is the list of directories to calculate disk usage for.
repeated string paths = 4;
}
// FileInfo describes a file or directory's information
message FileInfo {
common.Metadata metadata = 1;
@@ -292,6 +307,20 @@ message FileInfo {
string relative_name = 9;
}
// DiskUsageInfo describes a file or directory's information for du command
message DiskUsageInfo {
common.Metadata metadata = 1;
// Name is the name (including prefixed path) of the file or directory
string name = 2;
// Size indicates the number of bytes contained within the file
int64 size = 3;
// Error describes any error encountered while trying to read the file
// information.
string error = 4;
// RelativeName is the name of the file or directory relative to the RootPath
string relative_name = 5;
}
// The messages message containing the requested df stats.
message Mounts {
common.Metadata metadata = 1;

View File

@@ -0,0 +1,132 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package talos
import (
"context"
"fmt"
"io"
"os"
"text/tabwriter"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
humanize "github.com/dustin/go-humanize"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
)
var (
all bool
threshold int64
)
// duCmd represents the du command.
var duCmd = &cobra.Command{
Use: "usage [path1] [path2] ... [pathN]",
Aliases: []string{"du"},
Short: "Retrieve a disk usage",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
return WithClient(func(ctx context.Context, c *client.Client) error {
var paths []string
if len(args) == 0 {
paths = []string{"/"}
} else {
paths = args
}
stream, err := c.DiskUsage(ctx, &machineapi.DiskUsageRequest{
RecursionDepth: recursionDepth,
All: all,
Threshold: threshold,
Paths: paths,
})
if err != nil {
return fmt.Errorf("error fetching logs: %s", err)
}
addedHeader := false
defaultNode := client.RemotePeer(stream.Context())
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
multipleNodes := false
node := defaultNode
stringifySize := func(s int64) string {
if humanizeFlag {
return humanize.Bytes(uint64(s))
}
return fmt.Sprintf("%d", s)
}
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
return w.Flush()
}
return fmt.Errorf("error streaming results: %s", err)
}
if info.Error != "" {
fmt.Fprintf(os.Stderr, "%s: error reading file %s: %s\n", node, info.Name, info.Error)
continue
}
pattern := "%s\t%s\n"
size := stringifySize(info.Size)
args := []interface{}{
size, info.RelativeName,
}
if info.Metadata != nil && info.Metadata.Hostname != "" {
multipleNodes = true
node = info.Metadata.Hostname
}
if !addedHeader {
if multipleNodes {
fmt.Fprintln(w, "NODE\tSIZE\tNAME")
} else {
fmt.Fprintln(w, "SIZE\tNAME")
}
addedHeader = true
}
if info.Metadata != nil && info.Metadata.Error != "" {
fmt.Fprintf(os.Stderr, "%s: %s\n", node, info.Metadata.Error)
continue
}
if multipleNodes {
pattern = "%s\t%s\t%s\n"
args = append([]interface{}{node}, args...)
}
fmt.Fprintf(w, pattern, args...)
}
})
},
}
func init() {
duCmd.Flags().BoolVarP(&humanizeFlag, "humanize", "H", false, "humanize size and time in the output")
duCmd.Flags().BoolVarP(&all, "all", "a", false, "write counts for all files, not just directories")
duCmd.Flags().Int64VarP(&threshold, "threshold", "t", 0, "threshold exclude entries smaller than SIZE if positive, or entries greater than SIZE if negative")
duCmd.Flags().Int32VarP(&recursionDepth, "depth", "d", 0, "maximum recursion depth")
addCommand(duCmd)
}

View File

@@ -12,7 +12,7 @@ import (
"text/tabwriter"
"time"
"github.com/dustin/go-humanize"
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

View File

@@ -53,6 +53,7 @@ A CLI for out-of-band management of Kubernetes nodes created by Talos
* [talosctl time](talosctl_time.md) - Gets current server time
* [talosctl upgrade](talosctl_upgrade.md) - Upgrade Talos on the target node
* [talosctl upgrade-k8s](talosctl_upgrade-k8s.md) - Upgrade Kubernetes control plane in the Talos cluster.
* [talosctl usage](talosctl_usage.md) - Retrieve a disk usage
* [talosctl validate](talosctl_validate.md) - Validate config
* [talosctl version](talosctl_version.md) - Prints the version

View File

@@ -68,6 +68,7 @@ func main() {
// all existing streaming methods
for _, methodName := range []string{
"/machine.MachineService/Copy",
"/machine.MachineService/DiskUsage",
"/machine.MachineService/Dmesg",
"/machine.MachineService/Events",
"/machine.MachineService/Kubeconfig",

View File

@@ -26,7 +26,7 @@ import (
"github.com/containerd/containerd/oci"
criconstants "github.com/containerd/cri/pkg/constants"
"github.com/golang/protobuf/ptypes/empty"
"github.com/hashicorp/go-multierror"
multierror "github.com/hashicorp/go-multierror"
"github.com/prometheus/procfs"
"github.com/rs/xid"
"golang.org/x/sys/unix"
@@ -505,6 +505,187 @@ func (s *Server) List(req *machine.ListRequest, obj machine.MachineService_ListS
return nil
}
// DiskUsage implements the machine.MachineServer interface.
func (s *Server) DiskUsage(req *machine.DiskUsageRequest, obj machine.MachineService_DiskUsageServer) error { //nolint: gocyclo
if req == nil {
req = new(machine.DiskUsageRequest)
}
for _, path := range req.Paths {
if !strings.HasPrefix(path, OSPathSeparator) {
// Make sure we use complete paths
path = OSPathSeparator + path
}
path = strings.TrimSuffix(path, OSPathSeparator)
if path == "" {
path = "/"
}
_, err := os.Stat(path)
if err == os.ErrNotExist {
err = obj.Send(
&machine.DiskUsageInfo{
Name: path,
RelativeName: path,
Error: err.Error(),
},
)
if err != nil {
return err
}
continue
}
files, err := archiver.Walker(obj.Context(), path, archiver.WithMaxRecurseDepth(-1))
if err != nil {
err = obj.Send(
&machine.DiskUsageInfo{
Name: path,
RelativeName: path,
Error: err.Error(),
},
)
if err != nil {
return err
}
continue
}
folders := map[string]*machine.DiskUsageInfo{}
// send a record back to client if the message shouldn't be skipped
// at the same time use record information for folder size estimation
sendSize := func(info *machine.DiskUsageInfo, depth int32, isDir bool) error {
prefix := strings.TrimRight(filepath.Dir(info.Name), "/")
if folder, ok := folders[prefix]; ok {
folder.Size += info.Size
}
// recursion depth check
skip := depth >= req.RecursionDepth && req.RecursionDepth > 0
// skip files check
skip = skip || !isDir && !req.All
// threshold check
skip = skip || req.Threshold > 0 && info.Size < req.Threshold
skip = skip || req.Threshold < 0 && info.Size > -req.Threshold
if skip {
return nil
}
return obj.Send(info)
}
var (
depth int32
prefix = path
rootDepth = int32(strings.Count(path, archiver.OSPathSeparator))
)
// flush all folder sizes until we get to the common prefix
flushFolders := func(prefix, nextPrefix string) error {
for !strings.HasPrefix(nextPrefix, prefix) {
currentDepth := int32(strings.Count(prefix, archiver.OSPathSeparator)) - rootDepth
if folder, ok := folders[prefix]; ok {
err = sendSize(folder, currentDepth, true)
if err != nil {
return err
}
delete(folders, prefix)
}
prefix = strings.TrimRight(filepath.Dir(prefix), "/")
}
return nil
}
for fi := range files {
if fi.Error != nil {
err = obj.Send(
&machine.DiskUsageInfo{
Name: fi.FullPath,
RelativeName: fi.RelPath,
Error: fi.Error.Error(),
},
)
} else {
currentDepth := int32(strings.Count(fi.FullPath, archiver.OSPathSeparator)) - rootDepth
size := fi.FileInfo.Size()
if size < 0 {
size = 0
}
// kcore file size gives wrong value, this code should be smarter when it reads it
// TODO: figure out better way to skip such file
if fi.FullPath == "/proc/kcore" {
size = 0
}
if fi.FileInfo.IsDir() {
folders[strings.TrimRight(fi.FullPath, "/")] = &machine.DiskUsageInfo{
Name: fi.FullPath,
RelativeName: fi.RelPath,
Size: size,
}
} else {
err = sendSize(&machine.DiskUsageInfo{
Name: fi.FullPath,
RelativeName: fi.RelPath,
Size: size,
}, currentDepth, false)
if err != nil {
return err
}
}
// depth goes down when walker gets to the next sibling folder
if currentDepth < depth {
nextPrefix := fi.FullPath
err = flushFolders(prefix, nextPrefix)
if err != nil {
return err
}
prefix = nextPrefix
}
if fi.FileInfo.IsDir() {
prefix = fi.FullPath
}
depth = currentDepth
}
}
if path != "" {
p := strings.TrimRight(path, "/")
if folder, ok := folders[p]; ok {
err = flushFolders(prefix, p)
if err != nil {
return err
}
err = sendSize(folder, 0, true)
if err != nil {
return err
}
}
}
return nil
}
return nil
}
// Mounts implements the machine.MachineServer interface.
func (s *Server) Mounts(ctx context.Context, in *empty.Empty) (reply *machine.MountsResponse, err error) {
file, err := os.Open("/proc/mounts")

View File

@@ -0,0 +1,135 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// +build integration_api
package api
import (
"context"
"io"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/talos-systems/talos/internal/integration/base"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
)
// DiskUsageSuite verifies Logs API.
type DiskUsageSuite struct {
base.APISuite
ctx context.Context
ctxCancel context.CancelFunc
nodeCtx context.Context
}
// SuiteName ...
func (suite *DiskUsageSuite) SuiteName() string {
return "api.DiskUsageSuite"
}
// SetupTest ...
func (suite *DiskUsageSuite) SetupTest() {
// make sure API calls have timeout
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute)
suite.nodeCtx = client.WithNodes(suite.ctx, suite.RandomDiscoveredNode())
}
// TearDownTest ...
func (suite *DiskUsageSuite) TearDownTest() {
if suite.ctxCancel != nil {
suite.ctxCancel()
}
}
// TestDiskUsageRequests compares results of disk usage requests with different parameters.
func (suite *DiskUsageSuite) TestDiskUsageRequests() {
type testParams struct {
recursionDepth int32
all bool
paths []string
}
defaultPaths := []string{
"/etc",
"/bin",
}
cases := []*testParams{
&testParams{
recursionDepth: 0,
all: false,
paths: defaultPaths,
},
&testParams{
recursionDepth: 1,
all: false,
paths: defaultPaths,
},
&testParams{
recursionDepth: 0,
all: true,
paths: defaultPaths,
},
&testParams{
recursionDepth: 1,
all: true,
paths: defaultPaths,
},
&testParams{
recursionDepth: 0,
all: true,
paths: append([]string{"/this/is/going/to/fail"}, defaultPaths...),
},
}
sizes := map[string]int64{}
for _, params := range cases {
lookupPaths := map[string]bool{}
for _, path := range params.paths {
lookupPaths[path] = true
}
stream, err := suite.Client.DiskUsage(
suite.nodeCtx,
&machineapi.DiskUsageRequest{
Paths: params.paths,
RecursionDepth: params.recursionDepth,
All: params.all,
},
)
suite.Require().NoError(err)
responseCount := 0
for {
info, err := stream.Recv()
responseCount++
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
break
}
suite.Require().NoError(err)
}
if size, ok := sizes[info.Name]; ok {
suite.Require().EqualValues(size, info.Size)
}
sizes[info.Name] = info.Size
}
suite.Require().Greater(responseCount, 1)
}
}
func init() {
allSuites = append(allSuites, new(DiskUsageSuite))
}

View File

@@ -0,0 +1,134 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// +build integration_cli
package cli
import (
"fmt"
"strconv"
"strings"
"github.com/talos-systems/talos/internal/integration/base"
)
// DiskUsageSuite verifies dmesg command.
type DiskUsageSuite struct {
base.CLISuite
}
// SuiteName ...
func (suite *DiskUsageSuite) SuiteName() string {
return "cli.DiskUsageSuite"
}
type duInfo struct {
size int64
name string
node string
}
func splitLine(line string) []string {
columns := []string{}
parts := strings.Split(line, " ")
for _, part := range parts {
if part != "" {
columns = append(columns, strings.TrimSpace(part))
}
}
return columns
}
func parseLine(line string) (*duInfo, error) {
columns := splitLine(line)
if len(columns) < 2 || len(columns) > 3 {
return nil, fmt.Errorf("failed to parse line %s", line)
}
res := &duInfo{}
offset := 0
if len(columns) == 3 {
res.node = columns[0]
offset += 1
}
size, err := strconv.ParseInt(columns[offset], 10, 64)
if err != nil {
return nil, err
}
res.size = size
res.name = columns[offset+1]
return res, nil
}
// TestSuccess runs comand with success.
func (suite *DiskUsageSuite) TestSuccess() {
folder := "/var"
node := suite.RandomDiscoveredNode()
var folderSize int64 = 4096
suite.RunCLI([]string{"list", "--nodes", node, folder, "-l"},
base.StdoutMatchFunc(func(stdout string) error {
lines := strings.Split(strings.TrimSpace(stdout), "\n")
if len(lines) == 1 {
return fmt.Errorf("expected lines > 0")
}
parts := splitLine(lines[1])
var err error
folderSize, err = strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return err
}
return nil
}))
// check total calculation
suite.RunCLI([]string{"usage", "--nodes", node, folder, "-d2", "--all"},
base.StdoutMatchFunc(func(stdout string) error {
lines := strings.Split(strings.TrimSpace(stdout), "\n")
if len(lines) == 1 {
return fmt.Errorf("expected lines > 0")
}
var totalExpected int64
for _, line := range lines[1 : len(lines)-1] {
info, err := parseLine(line)
if err != nil {
return err
}
totalExpected += info.size
}
// add folder size
totalExpected += folderSize
info, err := parseLine(lines[len(lines)-1])
if err != nil {
return err
}
if info.size != totalExpected {
return fmt.Errorf("folder size was calculated incorrectly. Expected %d, got %d", totalExpected, info.size)
}
return nil
}))
}
// TestError runs comand with error.
func (suite *DiskUsageSuite) TestError() {
suite.RunCLI([]string{"usage", "--nodes", suite.RandomDiscoveredNode(), "/no/such/folder/here/just/for/sure"},
base.StderrNotEmpty(), base.StdoutEmpty())
}
func init() {
allSuites = append(allSuites, new(DiskUsageSuite))
}

File diff suppressed because it is too large Load Diff

View File

@@ -563,6 +563,11 @@ func (c *Client) LS(ctx context.Context, req *machineapi.ListRequest) (stream ma
return c.MachineClient.List(ctx, req)
}
// DiskUsage implements the proto.OSClient interface.
func (c *Client) DiskUsage(ctx context.Context, req *machineapi.DiskUsageRequest) (stream machineapi.MachineService_DiskUsageClient, err error) {
return c.MachineClient.DiskUsage(ctx, req)
}
// Copy implements the proto.OSClient interface.
func (c *Client) Copy(ctx context.Context, rootPath string) (io.ReadCloser, <-chan error, error) {
stream, err := c.MachineClient.Copy(ctx, &machineapi.CopyRequest{