Revert "plumb context from CRI calls through kubelet"

This reverts commit f43b4f1b95.
Antonio Ojea
2022-11-02 13:37:23 +00:00
parent 548d135ea6
commit 9c2b333925
115 changed files with 1183 additions and 1440 deletions
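
The change being reverted had threaded a caller-supplied context.Context through the kubelet's node-status path (e.g. updateNodeStatus(ctx)); the reverted state drops the parameter and falls back to context.TODO() at the client-go call sites. Below is a minimal, self-contained sketch of the two styles the diff toggles between — nodeStatusUpdater and patchNodeStatus are hypothetical stand-ins, not the kubelet's actual types:

package main

import (
	"context"
	"fmt"
)

// nodeStatusUpdater is a hypothetical stand-in for the kubelet's node-status path.
type nodeStatusUpdater struct{}

// Plumbed style (the change being reverted): the caller's context flows through
// each call, so cancellation and deadlines propagate to the API call.
func (u *nodeStatusUpdater) updateNodeStatusCtx(ctx context.Context) error {
	return u.patchNodeStatus(ctx)
}

// Reverted style: no context parameter; context.TODO() is supplied where the
// client API requires a context, so caller cancellation cannot propagate.
func (u *nodeStatusUpdater) updateNodeStatus() error {
	return u.patchNodeStatus(context.TODO())
}

// patchNodeStatus stands in for a client-go call such as Nodes().Get(ctx, ...).
func (u *nodeStatusUpdater) patchNodeStatus(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	u := &nodeStatusUpdater{}

	cancel() // cancellation is observed only by the plumbed variant
	fmt.Println(u.updateNodeStatusCtx(ctx)) // context.Canceled
	fmt.Println(u.updateNodeStatus())       // <nil>
}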


@@ -177,7 +177,6 @@ func TestUpdateNewNodeStatus(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
-ctx := context.Background()
// generate one more in inputImageList than we configure the Kubelet to report,
// or 5 images if unlimited
numTestImages := int(tc.nodeStatusMaxImages) + 1
@@ -291,7 +290,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
kubelet.updateRuntimeUp()
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
require.Len(t, actions, 2)
require.True(t, actions[1].Matches("patch", "nodes"))
@@ -316,7 +315,6 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
func TestUpdateExistingNodeStatus(t *testing.T) {
-ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@@ -480,7 +478,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
kubelet.updateRuntimeUp()
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
assert.Len(t, actions, 2)
@@ -508,7 +506,6 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
-ctx := context.Background()
if testing.Short() {
t.Skip("skipping test in short mode.")
}
@@ -562,7 +559,7 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
}
// should return an error, but not hang
-assert.Error(t, kubelet.updateNodeStatus(ctx))
+assert.Error(t, kubelet.updateNodeStatus())
// should have attempted multiple times
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry {
@@ -575,7 +572,6 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
}
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
-ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@@ -685,13 +681,13 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
checkNodeStatus := func(status v1.ConditionStatus, reason string) {
kubeClient.ClearActions()
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
require.Len(t, actions, 2)
require.True(t, actions[1].Matches("patch", "nodes"))
require.Equal(t, actions[1].GetSubresource(), "status")
-updatedNode, err := kubeClient.CoreV1().Nodes().Get(ctx, testKubeletHostname, metav1.GetOptions{})
+updatedNode, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), testKubeletHostname, metav1.GetOptions{})
require.NoError(t, err, "can't apply node status patch")
for i, cond := range updatedNode.Status.Conditions {
@@ -785,19 +781,17 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
}
func TestUpdateNodeStatusError(t *testing.T) {
-ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
// No matching node for the kubelet
testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain
-assert.Error(t, kubelet.updateNodeStatus(ctx))
+assert.Error(t, kubelet.updateNodeStatus())
assert.Len(t, testKubelet.fakeKubeClient.Actions(), nodeStatusUpdateRetry)
}
func TestUpdateNodeStatusWithLease(t *testing.T) {
-ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
clock := testKubelet.fakeClock
@@ -917,7 +911,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
// Update node status when node status is created.
// Report node status.
kubelet.updateRuntimeUp()
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
assert.Len(t, actions, 2)
@@ -940,7 +934,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
// Update node status again when nothing is changed (except heartbeat time).
// Report node status if it has exceeded the duration of nodeStatusReportFrequency.
clock.Step(time.Minute)
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
// 2 more action (There were 2 actions before).
actions = kubeClient.Actions()
@@ -965,7 +959,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
// Update node status again when nothing is changed (except heartbeat time).
// Do not report node status if it is within the duration of nodeStatusReportFrequency.
clock.Step(10 * time.Second)
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 4 actions before).
actions = kubeClient.Actions()
@@ -983,7 +977,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
newMachineInfo := oldMachineInfo.Clone()
newMachineInfo.MemoryCapacity = uint64(newMemoryCapacity)
kubelet.setCachedMachineInfo(newMachineInfo)
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
// 2 more action (There were 5 actions before).
actions = kubeClient.Actions()
@@ -1015,7 +1009,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
updatedNode.Spec.PodCIDR = podCIDRs[0]
updatedNode.Spec.PodCIDRs = podCIDRs
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now")
// 2 more action (There were 7 actions before).
actions = kubeClient.Actions()
@@ -1028,7 +1022,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
clock.Step(10 * time.Second)
assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated")
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
// Only 1 more action (There were 9 actions before).
actions = kubeClient.Actions()
assert.Len(t, actions, 10)
@@ -1084,7 +1078,6 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
-ctx := context.Background()
// Setup
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@@ -1101,7 +1094,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
kubelet.volumeManager = fakeVolumeManager
// Only test VolumesInUse setter
-kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{
+kubelet.setNodeStatusFuncs = []func(*v1.Node) error{
nodestatus.VolumesInUse(kubelet.volumeManager.ReconcilerStatesHasBeenSynced,
kubelet.volumeManager.GetVolumesInUse),
}
@@ -1110,7 +1103,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain
// Execute
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
// Validate
actions := kubeClient.Actions()
@@ -1352,7 +1345,6 @@ func TestTryRegisterWithApiServer(t *testing.T) {
}
func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
-ctx := context.Background()
const nodeStatusMaxImages = 5
// generate one more in inputImageList than we configure the Kubelet to report
@@ -1411,7 +1403,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
}
kubelet.updateRuntimeUp()
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
require.Len(t, actions, 2)
require.True(t, actions[1].Matches("patch", "nodes"))
@@ -2825,7 +2817,6 @@ func TestUpdateNodeAddresses(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
-ctx := context.Background()
oldNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
@@ -2841,15 +2832,15 @@ func TestUpdateNodeAddresses(t *testing.T) {
},
}
-_, err := kubeClient.CoreV1().Nodes().Update(ctx, oldNode, metav1.UpdateOptions{})
+_, err := kubeClient.CoreV1().Nodes().Update(context.TODO(), oldNode, metav1.UpdateOptions{})
assert.NoError(t, err)
-kubelet.setNodeStatusFuncs = []func(context.Context, *v1.Node) error{
-func(_ context.Context, node *v1.Node) error {
+kubelet.setNodeStatusFuncs = []func(*v1.Node) error{
+func(node *v1.Node) error {
node.Status.Addresses = expectedNode.Status.Addresses
return nil
},
}
-assert.NoError(t, kubelet.updateNodeStatus(ctx))
+assert.NoError(t, kubelet.updateNodeStatus())
actions := kubeClient.Actions()
lastAction := actions[len(actions)-1]