mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	kube-proxy: improve logging around network programming latency SLI.
This commit is contained in:
		@@ -25,7 +25,7 @@ import (
 | 
			
		||||
 | 
			
		||||
	"k8s.io/klog"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
@@ -197,14 +197,15 @@ type UpdateEndpointMapResult struct {
 | 
			
		||||
	StaleServiceNames []ServicePortName
 | 
			
		||||
	// List of the trigger times for all endpoints objects that changed. It's used to export the
 | 
			
		||||
	// network programming latency.
 | 
			
		||||
	LastChangeTriggerTimes []time.Time
 | 
			
		||||
	// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
 | 
			
		||||
	LastChangeTriggerTimes map[types.NamespacedName][]time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateEndpointsMap updates endpointsMap base on the given changes.
 | 
			
		||||
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
 | 
			
		||||
	result.StaleEndpoints = make([]ServiceEndpoint, 0)
 | 
			
		||||
	result.StaleServiceNames = make([]ServicePortName, 0)
 | 
			
		||||
	result.LastChangeTriggerTimes = make([]time.Time, 0)
 | 
			
		||||
	result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
 | 
			
		||||
 | 
			
		||||
	em.apply(
 | 
			
		||||
		changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
 | 
			
		||||
@@ -287,7 +288,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
 | 
			
		||||
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
 | 
			
		||||
// that were changed and will result in syncing the proxy rules.
 | 
			
		||||
func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
 | 
			
		||||
	staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) {
 | 
			
		||||
	staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
 | 
			
		||||
	if changes == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
@@ -300,8 +301,13 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S
 | 
			
		||||
	}
 | 
			
		||||
	changes.items = make(map[types.NamespacedName]*endpointsChange)
 | 
			
		||||
	metrics.EndpointChangesPending.Set(0)
 | 
			
		||||
	for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes {
 | 
			
		||||
		*lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...)
 | 
			
		||||
	for k, v := range changes.lastChangeTriggerTimes {
 | 
			
		||||
		prev, ok := (*lastChangeTriggerTimes)[k]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			(*lastChangeTriggerTimes)[k] = v
 | 
			
		||||
		} else {
 | 
			
		||||
			(*lastChangeTriggerTimes)[k] = append(prev, v...)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -18,13 +18,12 @@ package proxy
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/davecgh/go-spew/spew"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
@@ -1288,6 +1287,10 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
		return e
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	createName := func(namespace, name string) types.NamespacedName {
 | 
			
		||||
		return types.NamespacedName{Namespace: namespace, Name: name}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints {
 | 
			
		||||
		e := endpoints.DeepCopy()
 | 
			
		||||
		e.Subsets[0].Ports[0].Port++
 | 
			
		||||
@@ -1295,14 +1298,10 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
		return e
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sortTimeSlice := func(data []time.Time) {
 | 
			
		||||
		sort.Slice(data, func(i, j int) bool { return data[i].Before(data[j]) })
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testCases := []struct {
 | 
			
		||||
		name     string
 | 
			
		||||
		scenario func(fp *FakeProxier)
 | 
			
		||||
		expected []time.Time
 | 
			
		||||
		expected map[types.NamespacedName][]time.Time
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "Single addEndpoints",
 | 
			
		||||
@@ -1310,7 +1309,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
				e := createEndpoints("ns", "ep1", t0)
 | 
			
		||||
				fp.addEndpoints(e)
 | 
			
		||||
			},
 | 
			
		||||
			expected: []time.Time{t0},
 | 
			
		||||
			expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "addEndpoints then updatedEndpoints",
 | 
			
		||||
@@ -1321,7 +1320,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
				e1 := modifyEndpoints(e, t1)
 | 
			
		||||
				fp.updateEndpoints(e, e1)
 | 
			
		||||
			},
 | 
			
		||||
			expected: []time.Time{t0, t1},
 | 
			
		||||
			expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0, t1}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Add two endpoints then modify one",
 | 
			
		||||
@@ -1335,7 +1334,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
				e11 := modifyEndpoints(e1, t3)
 | 
			
		||||
				fp.updateEndpoints(e1, e11)
 | 
			
		||||
			},
 | 
			
		||||
			expected: []time.Time{t1, t2, t3},
 | 
			
		||||
			expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t1, t3}, createName("ns", "ep2"): {t2}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Endpoints without annotation set",
 | 
			
		||||
@@ -1344,7 +1343,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
				delete(e.Annotations, v1.EndpointsLastChangeTriggerTime)
 | 
			
		||||
				fp.addEndpoints(e)
 | 
			
		||||
			},
 | 
			
		||||
			expected: []time.Time{},
 | 
			
		||||
			expected: map[types.NamespacedName][]time.Time{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "addEndpoints then deleteEndpoints",
 | 
			
		||||
@@ -1353,7 +1352,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
				fp.addEndpoints(e)
 | 
			
		||||
				fp.deleteEndpoints(e)
 | 
			
		||||
			},
 | 
			
		||||
			expected: []time.Time{},
 | 
			
		||||
			expected: map[types.NamespacedName][]time.Time{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "add then delete then add again",
 | 
			
		||||
@@ -1364,7 +1363,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
				e = modifyEndpoints(e, t2)
 | 
			
		||||
				fp.addEndpoints(e)
 | 
			
		||||
			},
 | 
			
		||||
			expected: []time.Time{t2},
 | 
			
		||||
			expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t2}},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -1375,8 +1374,6 @@ func TestLastChangeTriggerTime(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
		result := fp.endpointsMap.Update(fp.endpointsChanges)
 | 
			
		||||
		got := result.LastChangeTriggerTimes
 | 
			
		||||
		sortTimeSlice(got)
 | 
			
		||||
		sortTimeSlice(tc.expected)
 | 
			
		||||
 | 
			
		||||
		if !reflect.DeepEqual(got, tc.expected) {
 | 
			
		||||
			t.Errorf("%s: Invalid LastChangeTriggerTimes, expected: %v, got: %v",
 | 
			
		||||
 
 | 
			
		||||
@@ -1406,10 +1406,12 @@ func (proxier *Proxier) syncProxyRules() {
 | 
			
		||||
		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
 | 
			
		||||
	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
 | 
			
		||||
		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
 | 
			
		||||
			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
 | 
			
		||||
			metrics.NetworkProgrammingLatency.Observe(latency)
 | 
			
		||||
		klog.V(4).Infof("Network programming took %f seconds", latency)
 | 
			
		||||
			klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Close old local ports and save new ones.
 | 
			
		||||
 
 | 
			
		||||
@@ -1314,10 +1314,12 @@ func (proxier *Proxier) syncProxyRules() {
 | 
			
		||||
		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
 | 
			
		||||
	for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
 | 
			
		||||
		for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
 | 
			
		||||
			latency := metrics.SinceInSeconds(lastChangeTriggerTime)
 | 
			
		||||
			metrics.NetworkProgrammingLatency.Observe(latency)
 | 
			
		||||
		klog.V(4).Infof("Network programming took %f seconds", latency)
 | 
			
		||||
			klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Close old local ports and save new ones.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user