mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #93333 from loburm/fix-logrotate
Fix an issue when rotated logs of dead containers are not removed.
This commit is contained in:
		@@ -38,6 +38,9 @@ type OSInterface interface {
 | 
			
		||||
	Pipe() (r *os.File, w *os.File, err error)
 | 
			
		||||
	ReadDir(dirname string) ([]os.FileInfo, error)
 | 
			
		||||
	Glob(pattern string) ([]string, error)
 | 
			
		||||
	Open(name string) (*os.File, error)
 | 
			
		||||
	OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
 | 
			
		||||
	Rename(oldpath, newpath string) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RealOS is used to dispatch the real system level operations.
 | 
			
		||||
@@ -105,3 +108,18 @@ func (RealOS) ReadDir(dirname string) ([]os.FileInfo, error) {
 | 
			
		||||
func (RealOS) Glob(pattern string) ([]string, error) {
 | 
			
		||||
	return filepath.Glob(pattern)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Open will call os.Open to return the file.
 | 
			
		||||
func (RealOS) Open(name string) (*os.File, error) {
 | 
			
		||||
	return os.Open(name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OpenFile will call os.OpenFile to return the file.
 | 
			
		||||
func (RealOS) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
 | 
			
		||||
	return os.OpenFile(name, flag, perm)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Rename will call os.Rename to rename a file.
 | 
			
		||||
func (RealOS) Rename(oldpath, newpath string) error {
 | 
			
		||||
	return os.Rename(oldpath, newpath)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -30,6 +30,7 @@ type FakeOS struct {
 | 
			
		||||
	ReadDirFn  func(string) ([]os.FileInfo, error)
 | 
			
		||||
	MkdirAllFn func(string, os.FileMode) error
 | 
			
		||||
	SymlinkFn  func(string, string) error
 | 
			
		||||
	GlobFn     func(string, string) bool
 | 
			
		||||
	HostName   string
 | 
			
		||||
	Removes    []string
 | 
			
		||||
	Files      map[string][]*os.FileInfo
 | 
			
		||||
@@ -78,8 +79,12 @@ func (f *FakeOS) RemoveAll(path string) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Create is a fake call that returns nil.
 | 
			
		||||
func (FakeOS) Create(path string) (*os.File, error) {
 | 
			
		||||
// Create is a fake call that creates a virtual file and returns nil.
 | 
			
		||||
func (f *FakeOS) Create(path string) (*os.File, error) {
 | 
			
		||||
	if f.Files == nil {
 | 
			
		||||
		f.Files = make(map[string][]*os.FileInfo)
 | 
			
		||||
	}
 | 
			
		||||
	f.Files[path] = []*os.FileInfo{}
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -111,7 +116,31 @@ func (f *FakeOS) ReadDir(dirname string) ([]os.FileInfo, error) {
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Glob is a fake call that returns nil.
 | 
			
		||||
// Glob is a fake call that returns list of virtual files matching a pattern.
 | 
			
		||||
func (f *FakeOS) Glob(pattern string) ([]string, error) {
 | 
			
		||||
	if f.GlobFn != nil {
 | 
			
		||||
		var res []string
 | 
			
		||||
		for k := range f.Files {
 | 
			
		||||
			if f.GlobFn(pattern, k) {
 | 
			
		||||
				res = append(res, k)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return res, nil
 | 
			
		||||
	}
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Open is a fake call that returns nil.
 | 
			
		||||
func (FakeOS) Open(name string) (*os.File, error) {
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OpenFile is a fake call that return nil.
 | 
			
		||||
func (FakeOS) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Rename is a fake call that return nil.
 | 
			
		||||
func (FakeOS) Rename(oldpath, newpath string) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -585,6 +585,22 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 | 
			
		||||
		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
 | 
			
		||||
		// setup containerLogManager for CRI container runtime
 | 
			
		||||
		containerLogManager, err := logs.NewContainerLogManager(
 | 
			
		||||
			klet.runtimeService,
 | 
			
		||||
			kubeDeps.OSInterface,
 | 
			
		||||
			kubeCfg.ContainerLogMaxSize,
 | 
			
		||||
			int(kubeCfg.ContainerLogMaxFiles),
 | 
			
		||||
		)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		klet.containerLogManager = containerLogManager
 | 
			
		||||
	} else {
 | 
			
		||||
		klet.containerLogManager = logs.NewStubContainerLogManager()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
 | 
			
		||||
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
 | 
			
		||||
		klet.livenessManager,
 | 
			
		||||
@@ -605,6 +621,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 | 
			
		||||
		kubeDeps.RemoteImageService,
 | 
			
		||||
		kubeDeps.ContainerManager.InternalContainerLifecycle(),
 | 
			
		||||
		kubeDeps.dockerLegacyService,
 | 
			
		||||
		klet.containerLogManager,
 | 
			
		||||
		klet.runtimeClassManager,
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -662,21 +679,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 | 
			
		||||
	}
 | 
			
		||||
	klet.imageManager = imageManager
 | 
			
		||||
 | 
			
		||||
	if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
 | 
			
		||||
		// setup containerLogManager for CRI container runtime
 | 
			
		||||
		containerLogManager, err := logs.NewContainerLogManager(
 | 
			
		||||
			klet.runtimeService,
 | 
			
		||||
			kubeCfg.ContainerLogMaxSize,
 | 
			
		||||
			int(kubeCfg.ContainerLogMaxFiles),
 | 
			
		||||
		)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		klet.containerLogManager = containerLogManager
 | 
			
		||||
	} else {
 | 
			
		||||
		klet.containerLogManager = logs.NewStubContainerLogManager()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
 | 
			
		||||
		klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -41,6 +41,7 @@ go_library(
 | 
			
		||||
        "//pkg/kubelet/images:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/kuberuntime/logs:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/lifecycle:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/logs:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/metrics:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/prober/results:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/runtimeclass:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -32,6 +32,7 @@ import (
 | 
			
		||||
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/images"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/logs"
 | 
			
		||||
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -73,6 +74,10 @@ func (f *fakePodStateProvider) IsPodTerminated(uid types.UID) bool {
 | 
			
		||||
 | 
			
		||||
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
 | 
			
		||||
	recorder := &record.FakeRecorder{}
 | 
			
		||||
	logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	kubeRuntimeManager := &kubeGenericRuntimeManager{
 | 
			
		||||
		recorder:           recorder,
 | 
			
		||||
		cpuCFSQuota:        false,
 | 
			
		||||
@@ -88,6 +93,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
 | 
			
		||||
		seccompProfileRoot: fakeSeccompProfileRoot,
 | 
			
		||||
		internalLifecycle:  cm.NewFakeInternalContainerLifecycle(),
 | 
			
		||||
		logReduction:       logreduction.NewLogReduction(identicalErrorDelay),
 | 
			
		||||
		logManager:         logManager,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion)
 | 
			
		||||
 
 | 
			
		||||
@@ -855,19 +855,19 @@ func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {
 | 
			
		||||
 | 
			
		||||
// removeContainerLog removes the container log.
 | 
			
		||||
func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error {
 | 
			
		||||
	// Remove the container log.
 | 
			
		||||
	// Use log manager to remove rotated logs.
 | 
			
		||||
	err := m.logManager.Clean(containerID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	status, err := m.runtimeService.ContainerStatus(containerID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to get container status %q: %v", containerID, err)
 | 
			
		||||
	}
 | 
			
		||||
	labeledInfo := getContainerInfoFromLabels(status.Labels)
 | 
			
		||||
	path := status.GetLogPath()
 | 
			
		||||
	if err := m.osInterface.Remove(path); err != nil && !os.IsNotExist(err) {
 | 
			
		||||
		return fmt.Errorf("failed to remove container %q log %q: %v", containerID, path, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Remove the legacy container log symlink.
 | 
			
		||||
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
 | 
			
		||||
	labeledInfo := getContainerInfoFromLabels(status.Labels)
 | 
			
		||||
	legacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName,
 | 
			
		||||
		labeledInfo.PodNamespace)
 | 
			
		||||
	if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) {
 | 
			
		||||
 
 | 
			
		||||
@@ -19,6 +19,7 @@ package kuberuntime
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
@@ -65,12 +66,22 @@ func TestRemoveContainer(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	containerID := fakeContainers[0].Id
 | 
			
		||||
	fakeOS := m.osInterface.(*containertest.FakeOS)
 | 
			
		||||
	fakeOS.GlobFn = func(pattern, path string) bool {
 | 
			
		||||
		pattern = strings.Replace(pattern, "*", ".*", -1)
 | 
			
		||||
		return regexp.MustCompile(pattern).MatchString(path)
 | 
			
		||||
	}
 | 
			
		||||
	expectedContainerLogPath := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log")
 | 
			
		||||
	expectedContainerLogPathRotated := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log.20060102-150405")
 | 
			
		||||
	expectedContainerLogSymlink := legacyLogSymlink(containerID, "foo", "bar", "new")
 | 
			
		||||
 | 
			
		||||
	fakeOS.Create(expectedContainerLogPath)
 | 
			
		||||
	fakeOS.Create(expectedContainerLogPathRotated)
 | 
			
		||||
 | 
			
		||||
	err = m.removeContainer(containerID)
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
	// Verify container log is removed
 | 
			
		||||
	expectedContainerLogPath := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log")
 | 
			
		||||
	expectedContainerLogSymlink := legacyLogSymlink(containerID, "foo", "bar", "new")
 | 
			
		||||
	assert.Equal(t, fakeOS.Removes, []string{expectedContainerLogPath, expectedContainerLogSymlink})
 | 
			
		||||
 | 
			
		||||
	assert.Equal(t, []string{expectedContainerLogPath, expectedContainerLogPathRotated, expectedContainerLogSymlink}, fakeOS.Removes)
 | 
			
		||||
	// Verify container is removed
 | 
			
		||||
	assert.Contains(t, fakeRuntime.Called, "RemoveContainer")
 | 
			
		||||
	containers, err := fakeRuntime.ListContainers(&runtimeapi.ContainerFilter{Id: containerID})
 | 
			
		||||
 
 | 
			
		||||
@@ -46,6 +46,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/events"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/images"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/logs"
 | 
			
		||||
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/types"
 | 
			
		||||
@@ -127,6 +128,9 @@ type kubeGenericRuntimeManager struct {
 | 
			
		||||
	// A shim to legacy functions for backward compatibility.
 | 
			
		||||
	legacyLogProvider LegacyLogProvider
 | 
			
		||||
 | 
			
		||||
	// Manage container logs.
 | 
			
		||||
	logManager logs.ContainerLogManager
 | 
			
		||||
 | 
			
		||||
	// Manage RuntimeClass resources.
 | 
			
		||||
	runtimeClassManager *runtimeclass.Manager
 | 
			
		||||
 | 
			
		||||
@@ -168,6 +172,7 @@ func NewKubeGenericRuntimeManager(
 | 
			
		||||
	imageService internalapi.ImageManagerService,
 | 
			
		||||
	internalLifecycle cm.InternalContainerLifecycle,
 | 
			
		||||
	legacyLogProvider LegacyLogProvider,
 | 
			
		||||
	logManager logs.ContainerLogManager,
 | 
			
		||||
	runtimeClassManager *runtimeclass.Manager,
 | 
			
		||||
) (KubeGenericRuntime, error) {
 | 
			
		||||
	kubeRuntimeManager := &kubeGenericRuntimeManager{
 | 
			
		||||
@@ -185,6 +190,7 @@ func NewKubeGenericRuntimeManager(
 | 
			
		||||
		keyring:             credentialprovider.NewDockerKeyring(),
 | 
			
		||||
		internalLifecycle:   internalLifecycle,
 | 
			
		||||
		legacyLogProvider:   legacyLogProvider,
 | 
			
		||||
		logManager:          logManager,
 | 
			
		||||
		runtimeClassManager: runtimeClassManager,
 | 
			
		||||
		logReduction:        logreduction.NewLogReduction(identicalErrorDelay),
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -9,6 +9,7 @@ go_library(
 | 
			
		||||
    importpath = "k8s.io/kubernetes/pkg/kubelet/logs",
 | 
			
		||||
    visibility = ["//visibility:public"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/kubelet/container:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
@@ -23,6 +24,7 @@ go_test(
 | 
			
		||||
    srcs = ["container_log_manager_test.go"],
 | 
			
		||||
    embed = [":go_default_library"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/kubelet/container:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/cri-api/pkg/apis/testing:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@ import (
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
@@ -33,6 +34,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	internalapi "k8s.io/cri-api/pkg/apis"
 | 
			
		||||
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
 | 
			
		||||
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
@@ -55,6 +57,8 @@ type ContainerLogManager interface {
 | 
			
		||||
	// TODO(random-liu): Add RotateLogs function and call it under disk pressure.
 | 
			
		||||
	// Start container log manager.
 | 
			
		||||
	Start()
 | 
			
		||||
	// Clean removes all logs of specified container.
 | 
			
		||||
	Clean(containerID string) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LogRotatePolicy is a policy for container log rotation. The policy applies to all
 | 
			
		||||
@@ -142,12 +146,14 @@ func parseMaxSize(size string) (int64, error) {
 | 
			
		||||
 | 
			
		||||
type containerLogManager struct {
 | 
			
		||||
	runtimeService internalapi.RuntimeService
 | 
			
		||||
	osInterface    kubecontainer.OSInterface
 | 
			
		||||
	policy         LogRotatePolicy
 | 
			
		||||
	clock          clock.Clock
 | 
			
		||||
	mutex          sync.Mutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewContainerLogManager creates a new container log manager.
 | 
			
		||||
func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) {
 | 
			
		||||
func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int) (ContainerLogManager, error) {
 | 
			
		||||
	if maxFiles <= 1 {
 | 
			
		||||
		return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
 | 
			
		||||
	}
 | 
			
		||||
@@ -157,12 +163,14 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize s
 | 
			
		||||
	}
 | 
			
		||||
	// policy LogRotatePolicy
 | 
			
		||||
	return &containerLogManager{
 | 
			
		||||
		osInterface:    osInterface,
 | 
			
		||||
		runtimeService: runtimeService,
 | 
			
		||||
		policy: LogRotatePolicy{
 | 
			
		||||
			MaxSize:  parsedMaxSize,
 | 
			
		||||
			MaxFiles: maxFiles,
 | 
			
		||||
		},
 | 
			
		||||
		clock: clock.RealClock{},
 | 
			
		||||
		mutex: sync.Mutex{},
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -176,7 +184,32 @@ func (c *containerLogManager) Start() {
 | 
			
		||||
	}, logMonitorPeriod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Clean removes all logs of specified container (including rotated one).
 | 
			
		||||
func (c *containerLogManager) Clean(containerID string) error {
 | 
			
		||||
	c.mutex.Lock()
 | 
			
		||||
	defer c.mutex.Unlock()
 | 
			
		||||
	status, err := c.runtimeService.ContainerStatus(containerID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to get container status %q: %v", containerID, err)
 | 
			
		||||
	}
 | 
			
		||||
	pattern := fmt.Sprintf("%s*", status.GetLogPath())
 | 
			
		||||
	logs, err := c.osInterface.Glob(pattern)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, l := range logs {
 | 
			
		||||
		if err := c.osInterface.Remove(l); err != nil && !os.IsNotExist(err) {
 | 
			
		||||
			return fmt.Errorf("failed to remove container %q log %q: %v", containerID, l, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *containerLogManager) rotateLogs() error {
 | 
			
		||||
	c.mutex.Lock()
 | 
			
		||||
	defer c.mutex.Unlock()
 | 
			
		||||
	// TODO(#59998): Use kubelet pod cache.
 | 
			
		||||
	containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -197,7 +230,7 @@ func (c *containerLogManager) rotateLogs() error {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		path := status.GetLogPath()
 | 
			
		||||
		info, err := os.Stat(path)
 | 
			
		||||
		info, err := c.osInterface.Stat(path)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if !os.IsNotExist(err) {
 | 
			
		||||
				klog.Errorf("Failed to stat container log %q: %v", path, err)
 | 
			
		||||
@@ -211,7 +244,7 @@ func (c *containerLogManager) rotateLogs() error {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			// The container log should be recovered.
 | 
			
		||||
			info, err = os.Stat(path)
 | 
			
		||||
			info, err = c.osInterface.Stat(path)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				klog.Errorf("Failed to stat container log %q after reopen: %v", path, err)
 | 
			
		||||
				continue
 | 
			
		||||
@@ -269,7 +302,7 @@ func (c *containerLogManager) rotateLog(id, log string) error {
 | 
			
		||||
func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
 | 
			
		||||
	inuse, unused := filterUnusedLogs(logs)
 | 
			
		||||
	for _, l := range unused {
 | 
			
		||||
		if err := os.Remove(l); err != nil {
 | 
			
		||||
		if err := c.osInterface.Remove(l); err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@@ -322,7 +355,7 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error)
 | 
			
		||||
	}
 | 
			
		||||
	i := 0
 | 
			
		||||
	for ; i < len(logs)-maxRotatedFiles; i++ {
 | 
			
		||||
		if err := os.Remove(logs[i]); err != nil {
 | 
			
		||||
		if err := c.osInterface.Remove(logs[i]); err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@@ -332,19 +365,19 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error)
 | 
			
		||||
 | 
			
		||||
// compressLog compresses a log to log.gz with gzip.
 | 
			
		||||
func (c *containerLogManager) compressLog(log string) error {
 | 
			
		||||
	r, err := os.Open(log)
 | 
			
		||||
	r, err := c.osInterface.Open(log)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to open log %q: %v", log, err)
 | 
			
		||||
	}
 | 
			
		||||
	defer r.Close()
 | 
			
		||||
	tmpLog := log + tmpSuffix
 | 
			
		||||
	f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
 | 
			
		||||
	f, err := c.osInterface.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		// Best effort cleanup of tmpLog.
 | 
			
		||||
		os.Remove(tmpLog)
 | 
			
		||||
		c.osInterface.Remove(tmpLog)
 | 
			
		||||
	}()
 | 
			
		||||
	defer f.Close()
 | 
			
		||||
	w := gzip.NewWriter(f)
 | 
			
		||||
@@ -353,11 +386,11 @@ func (c *containerLogManager) compressLog(log string) error {
 | 
			
		||||
		return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
 | 
			
		||||
	}
 | 
			
		||||
	compressedLog := log + compressSuffix
 | 
			
		||||
	if err := os.Rename(tmpLog, compressedLog); err != nil {
 | 
			
		||||
	if err := c.osInterface.Rename(tmpLog, compressedLog); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
 | 
			
		||||
	}
 | 
			
		||||
	// Remove old log file.
 | 
			
		||||
	if err := os.Remove(log); err != nil {
 | 
			
		||||
	if err := c.osInterface.Remove(log); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
@@ -368,14 +401,14 @@ func (c *containerLogManager) compressLog(log string) error {
 | 
			
		||||
func (c *containerLogManager) rotateLatestLog(id, log string) error {
 | 
			
		||||
	timestamp := c.clock.Now().Format(timestampFormat)
 | 
			
		||||
	rotated := fmt.Sprintf("%s.%s", log, timestamp)
 | 
			
		||||
	if err := os.Rename(log, rotated); err != nil {
 | 
			
		||||
	if err := c.osInterface.Rename(log, rotated); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
 | 
			
		||||
	}
 | 
			
		||||
	if err := c.runtimeService.ReopenContainerLog(id); err != nil {
 | 
			
		||||
		// Rename the rotated log back, so that we can try rotating it again
 | 
			
		||||
		// next round.
 | 
			
		||||
		// If kubelet gets restarted at this point, we'll lose original log.
 | 
			
		||||
		if renameErr := os.Rename(rotated, log); renameErr != nil {
 | 
			
		||||
		if renameErr := c.osInterface.Rename(rotated, log); renameErr != nil {
 | 
			
		||||
			// This shouldn't happen.
 | 
			
		||||
			// Report an error if this happens, because we will lose original
 | 
			
		||||
			// log.
 | 
			
		||||
 
 | 
			
		||||
@@ -20,6 +20,10 @@ type containerLogManagerStub struct{}
 | 
			
		||||
 | 
			
		||||
func (*containerLogManagerStub) Start() {}
 | 
			
		||||
 | 
			
		||||
func (*containerLogManagerStub) Clean(containerID string) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewStubContainerLogManager returns an empty ContainerLogManager which does nothing.
 | 
			
		||||
func NewStubContainerLogManager() ContainerLogManager {
 | 
			
		||||
	return &containerLogManagerStub{}
 | 
			
		||||
 
 | 
			
		||||
@@ -28,6 +28,7 @@ import (
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
	"github.com/stretchr/testify/require"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/container"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/clock"
 | 
			
		||||
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
 | 
			
		||||
@@ -90,7 +91,8 @@ func TestRotateLogs(t *testing.T) {
 | 
			
		||||
			MaxSize:  testMaxSize,
 | 
			
		||||
			MaxFiles: testMaxFiles,
 | 
			
		||||
		},
 | 
			
		||||
		clock: clock.NewFakeClock(now),
 | 
			
		||||
		osInterface: container.RealOS{},
 | 
			
		||||
		clock:       clock.NewFakeClock(now),
 | 
			
		||||
	}
 | 
			
		||||
	testLogs := []string{
 | 
			
		||||
		"test-log-1",
 | 
			
		||||
@@ -159,6 +161,77 @@ func TestRotateLogs(t *testing.T) {
 | 
			
		||||
	assert.Equal(t, testLogs[3], logs[4].Name())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestClean(t *testing.T) {
 | 
			
		||||
	dir, err := ioutil.TempDir("", "test-clean")
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
	defer os.RemoveAll(dir)
 | 
			
		||||
 | 
			
		||||
	const (
 | 
			
		||||
		testMaxFiles = 3
 | 
			
		||||
		testMaxSize  = 10
 | 
			
		||||
	)
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	f := critest.NewFakeRuntimeService()
 | 
			
		||||
	c := &containerLogManager{
 | 
			
		||||
		runtimeService: f,
 | 
			
		||||
		policy: LogRotatePolicy{
 | 
			
		||||
			MaxSize:  testMaxSize,
 | 
			
		||||
			MaxFiles: testMaxFiles,
 | 
			
		||||
		},
 | 
			
		||||
		osInterface: container.RealOS{},
 | 
			
		||||
		clock:       clock.NewFakeClock(now),
 | 
			
		||||
	}
 | 
			
		||||
	testLogs := []string{
 | 
			
		||||
		"test-log-1",
 | 
			
		||||
		"test-log-2",
 | 
			
		||||
		"test-log-3",
 | 
			
		||||
		"test-log-2.00000000-000000.gz",
 | 
			
		||||
		"test-log-2.00000000-000001",
 | 
			
		||||
		"test-log-3.00000000-000000.gz",
 | 
			
		||||
		"test-log-3.00000000-000001",
 | 
			
		||||
	}
 | 
			
		||||
	for i := range testLogs {
 | 
			
		||||
		f, err := os.Create(filepath.Join(dir, testLogs[i]))
 | 
			
		||||
		require.NoError(t, err)
 | 
			
		||||
		f.Close()
 | 
			
		||||
	}
 | 
			
		||||
	testContainers := []*critest.FakeContainer{
 | 
			
		||||
		{
 | 
			
		||||
			ContainerStatus: runtimeapi.ContainerStatus{
 | 
			
		||||
				Id:      "container-1",
 | 
			
		||||
				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
 | 
			
		||||
				LogPath: filepath.Join(dir, testLogs[0]),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			ContainerStatus: runtimeapi.ContainerStatus{
 | 
			
		||||
				Id:      "container-2",
 | 
			
		||||
				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
 | 
			
		||||
				LogPath: filepath.Join(dir, testLogs[1]),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			ContainerStatus: runtimeapi.ContainerStatus{
 | 
			
		||||
				Id:      "container-3",
 | 
			
		||||
				State:   runtimeapi.ContainerState_CONTAINER_EXITED,
 | 
			
		||||
				LogPath: filepath.Join(dir, testLogs[2]),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	f.SetFakeContainers(testContainers)
 | 
			
		||||
 | 
			
		||||
	err = c.Clean("container-3")
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
 | 
			
		||||
	logs, err := ioutil.ReadDir(dir)
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
	assert.Len(t, logs, 4)
 | 
			
		||||
	assert.Equal(t, testLogs[0], logs[0].Name())
 | 
			
		||||
	assert.Equal(t, testLogs[1], logs[1].Name())
 | 
			
		||||
	assert.Equal(t, testLogs[3], logs[2].Name())
 | 
			
		||||
	assert.Equal(t, testLogs[4], logs[3].Name())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCleanupUnusedLog(t *testing.T) {
 | 
			
		||||
	dir, err := ioutil.TempDir("", "test-cleanup-unused-log")
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
@@ -178,7 +251,9 @@ func TestCleanupUnusedLog(t *testing.T) {
 | 
			
		||||
		f.Close()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	c := &containerLogManager{}
 | 
			
		||||
	c := &containerLogManager{
 | 
			
		||||
		osInterface: container.RealOS{},
 | 
			
		||||
	}
 | 
			
		||||
	got, err := c.cleanupUnusedLogs(testLogs)
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
	assert.Len(t, got, 2)
 | 
			
		||||
@@ -223,7 +298,10 @@ func TestRemoveExcessLog(t *testing.T) {
 | 
			
		||||
			f.Close()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		c := &containerLogManager{policy: LogRotatePolicy{MaxFiles: test.max}}
 | 
			
		||||
		c := &containerLogManager{
 | 
			
		||||
			policy:      LogRotatePolicy{MaxFiles: test.max},
 | 
			
		||||
			osInterface: container.RealOS{},
 | 
			
		||||
		}
 | 
			
		||||
		got, err := c.removeExcessLogs(testLogs)
 | 
			
		||||
		require.NoError(t, err)
 | 
			
		||||
		require.Len(t, got, len(test.expect))
 | 
			
		||||
@@ -253,7 +331,7 @@ func TestCompressLog(t *testing.T) {
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
 | 
			
		||||
	testLog := testFile.Name()
 | 
			
		||||
	c := &containerLogManager{}
 | 
			
		||||
	c := &containerLogManager{osInterface: container.RealOS{}}
 | 
			
		||||
	require.NoError(t, c.compressLog(testLog))
 | 
			
		||||
	_, err = os.Stat(testLog + compressSuffix)
 | 
			
		||||
	assert.NoError(t, err, "log should be compressed")
 | 
			
		||||
@@ -303,6 +381,7 @@ func TestRotateLatestLog(t *testing.T) {
 | 
			
		||||
		c := &containerLogManager{
 | 
			
		||||
			runtimeService: f,
 | 
			
		||||
			policy:         LogRotatePolicy{MaxFiles: test.maxFiles},
 | 
			
		||||
			osInterface:    container.RealOS{},
 | 
			
		||||
			clock:          clock.NewFakeClock(now),
 | 
			
		||||
		}
 | 
			
		||||
		if test.runtimeError != nil {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user