mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #72709 from changyaowei/pleg_relist
When pleg channel is full, discard events and record its count
This commit is contained in:
		@@ -39,6 +39,7 @@ const (
 | 
			
		||||
	CgroupManagerOperationsKey   = "cgroup_manager_latency_microseconds"
 | 
			
		||||
	PodWorkerStartLatencyKey     = "pod_worker_start_latency_microseconds"
 | 
			
		||||
	PLEGRelistLatencyKey         = "pleg_relist_latency_microseconds"
 | 
			
		||||
	PLEGDiscardEventsKey         = "pleg_discard_events"
 | 
			
		||||
	PLEGRelistIntervalKey        = "pleg_relist_interval_microseconds"
 | 
			
		||||
	EvictionStatsAgeKey          = "eviction_stats_age_microseconds"
 | 
			
		||||
	VolumeStatsCapacityBytesKey  = "volume_stats_capacity_bytes"
 | 
			
		||||
@@ -124,6 +125,14 @@ var (
 | 
			
		||||
			Help:      "Latency in microseconds for relisting pods in PLEG.",
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
	PLEGDiscardEvents = prometheus.NewCounterVec(
 | 
			
		||||
		prometheus.CounterOpts{
 | 
			
		||||
			Subsystem: KubeletSubsystem,
 | 
			
		||||
			Name:      PLEGDiscardEventsKey,
 | 
			
		||||
			Help:      "The number of discard events in PLEG.",
 | 
			
		||||
		},
 | 
			
		||||
		[]string{},
 | 
			
		||||
	)
 | 
			
		||||
	PLEGRelistInterval = prometheus.NewSummary(
 | 
			
		||||
		prometheus.SummaryOpts{
 | 
			
		||||
			Subsystem: KubeletSubsystem,
 | 
			
		||||
@@ -246,6 +255,7 @@ func Register(containerCache kubecontainer.RuntimeCache, collectors ...prometheu
 | 
			
		||||
		prometheus.MustRegister(ContainersPerPodCount)
 | 
			
		||||
		prometheus.MustRegister(newPodAndContainerCollector(containerCache))
 | 
			
		||||
		prometheus.MustRegister(PLEGRelistLatency)
 | 
			
		||||
		prometheus.MustRegister(PLEGDiscardEvents)
 | 
			
		||||
		prometheus.MustRegister(PLEGRelistInterval)
 | 
			
		||||
		prometheus.MustRegister(RuntimeOperations)
 | 
			
		||||
		prometheus.MustRegister(RuntimeOperationsLatency)
 | 
			
		||||
 
 | 
			
		||||
@@ -265,7 +265,12 @@ func (g *GenericPLEG) relist() {
 | 
			
		||||
			if events[i].Type == ContainerChanged {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			g.eventChannel <- events[i]
 | 
			
		||||
			select {
 | 
			
		||||
			case g.eventChannel <- events[i]:
 | 
			
		||||
			default:
 | 
			
		||||
				metrics.PLEGDiscardEvents.WithLabelValues().Inc()
 | 
			
		||||
				klog.Error("event channel is full, discard this relist() cycle event")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -34,6 +34,8 @@ import (
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	testContainerRuntimeType = "fooRuntime"
 | 
			
		||||
	// largeChannelCap is a large enough capacity to hold all events in a single test.
 | 
			
		||||
	largeChannelCap = 100
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type TestGenericPLEG struct {
 | 
			
		||||
@@ -43,6 +45,10 @@ type TestGenericPLEG struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newTestGenericPLEG() *TestGenericPLEG {
 | 
			
		||||
	return newTestGenericPLEGWithChannelSize(largeChannelCap)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG {
 | 
			
		||||
	fakeRuntime := &containertest.FakeRuntime{}
 | 
			
		||||
	clock := clock.NewFakeClock(time.Time{})
 | 
			
		||||
	// The channel capacity should be large enough to hold all events in a
 | 
			
		||||
@@ -50,7 +56,7 @@ func newTestGenericPLEG() *TestGenericPLEG {
 | 
			
		||||
	pleg := &GenericPLEG{
 | 
			
		||||
		relistPeriod: time.Hour,
 | 
			
		||||
		runtime:      fakeRuntime,
 | 
			
		||||
		eventChannel: make(chan *PodLifecycleEvent, 100),
 | 
			
		||||
		eventChannel: make(chan *PodLifecycleEvent, eventChannelCap),
 | 
			
		||||
		podRecords:   make(podRecords),
 | 
			
		||||
		clock:        clock,
 | 
			
		||||
	}
 | 
			
		||||
@@ -157,6 +163,65 @@ func TestRelisting(t *testing.T) {
 | 
			
		||||
	verifyEvents(t, expected, actual)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestEventChannelFull test when channel is full, the events will be discard.
 | 
			
		||||
func TestEventChannelFull(t *testing.T) {
 | 
			
		||||
	testPleg := newTestGenericPLEGWithChannelSize(4)
 | 
			
		||||
	pleg, runtime := testPleg.pleg, testPleg.runtime
 | 
			
		||||
	ch := pleg.Watch()
 | 
			
		||||
	// The first relist should send a PodSync event to each pod.
 | 
			
		||||
	runtime.AllPodList = []*containertest.FakePod{
 | 
			
		||||
		{Pod: &kubecontainer.Pod{
 | 
			
		||||
			ID: "1234",
 | 
			
		||||
			Containers: []*kubecontainer.Container{
 | 
			
		||||
				createTestContainer("c1", kubecontainer.ContainerStateExited),
 | 
			
		||||
				createTestContainer("c2", kubecontainer.ContainerStateRunning),
 | 
			
		||||
				createTestContainer("c3", kubecontainer.ContainerStateUnknown),
 | 
			
		||||
			},
 | 
			
		||||
		}},
 | 
			
		||||
		{Pod: &kubecontainer.Pod{
 | 
			
		||||
			ID: "4567",
 | 
			
		||||
			Containers: []*kubecontainer.Container{
 | 
			
		||||
				createTestContainer("c1", kubecontainer.ContainerStateExited),
 | 
			
		||||
			},
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	pleg.relist()
 | 
			
		||||
	// Report every running/exited container if we see them for the first time.
 | 
			
		||||
	expected := []*PodLifecycleEvent{
 | 
			
		||||
		{ID: "1234", Type: ContainerStarted, Data: "c2"},
 | 
			
		||||
		{ID: "4567", Type: ContainerDied, Data: "c1"},
 | 
			
		||||
		{ID: "1234", Type: ContainerDied, Data: "c1"},
 | 
			
		||||
	}
 | 
			
		||||
	actual := getEventsFromChannel(ch)
 | 
			
		||||
	verifyEvents(t, expected, actual)
 | 
			
		||||
 | 
			
		||||
	runtime.AllPodList = []*containertest.FakePod{
 | 
			
		||||
		{Pod: &kubecontainer.Pod{
 | 
			
		||||
			ID: "1234",
 | 
			
		||||
			Containers: []*kubecontainer.Container{
 | 
			
		||||
				createTestContainer("c2", kubecontainer.ContainerStateExited),
 | 
			
		||||
				createTestContainer("c3", kubecontainer.ContainerStateRunning),
 | 
			
		||||
			},
 | 
			
		||||
		}},
 | 
			
		||||
		{Pod: &kubecontainer.Pod{
 | 
			
		||||
			ID: "4567",
 | 
			
		||||
			Containers: []*kubecontainer.Container{
 | 
			
		||||
				createTestContainer("c4", kubecontainer.ContainerStateRunning),
 | 
			
		||||
			},
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	pleg.relist()
 | 
			
		||||
	// event channel is full, discard events
 | 
			
		||||
	expected = []*PodLifecycleEvent{
 | 
			
		||||
		{ID: "1234", Type: ContainerRemoved, Data: "c1"},
 | 
			
		||||
		{ID: "1234", Type: ContainerDied, Data: "c2"},
 | 
			
		||||
		{ID: "1234", Type: ContainerStarted, Data: "c3"},
 | 
			
		||||
		{ID: "4567", Type: ContainerRemoved, Data: "c1"},
 | 
			
		||||
	}
 | 
			
		||||
	actual = getEventsFromChannel(ch)
 | 
			
		||||
	verifyEvents(t, expected, actual)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestDetectingContainerDeaths(t *testing.T) {
 | 
			
		||||
	// Vary the number of relists after the container started and before the
 | 
			
		||||
	// container died to account for the changes in pleg's internal states.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user