Merge pull request #133242 from ahmedtd/fix-podcerts

Pod Certificates: Fix kubelet volume host arg order; improve logging
This commit is contained in:
Kubernetes Prow Robot
2025-07-30 09:34:27 -07:00
committed by GitHub
3 changed files with 107 additions and 32 deletions

View File

@@ -109,11 +109,11 @@ type IssuingManager struct {
}
type projectionKey struct {
namespace string
podName string
podUID string
volumeName string
sourceIndex int
Namespace string
PodName string
PodUID string
VolumeName string
SourceIndex int
}
type projectionRecord struct {
@@ -259,13 +259,14 @@ func (m *IssuingManager) queueAllProjectionsForPod(uid types.UID) {
continue
}
m.projectionQueue.Add(projectionKey{
namespace: pod.ObjectMeta.Namespace,
podName: pod.ObjectMeta.Name,
podUID: string(pod.ObjectMeta.UID),
volumeName: v.Name,
sourceIndex: sourceIndex,
})
key := projectionKey{
Namespace: pod.ObjectMeta.Namespace,
PodName: pod.ObjectMeta.Name,
PodUID: string(pod.ObjectMeta.UID),
VolumeName: v.Name,
SourceIndex: sourceIndex,
}
m.projectionQueue.Add(key)
}
}
}
@@ -298,7 +299,7 @@ func (m *IssuingManager) processNextProjection(ctx context.Context) bool {
err := m.handleProjection(ctx, key)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "while handling podCertificate projected volume source", "namespace", key.namespace, "pod", key.podName, "volume", key.volumeName, "sourceIndex", key.sourceIndex)
utilruntime.HandleErrorWithContext(ctx, err, "while handling podCertificate projected volume source", "namespace", key.Namespace, "pod", key.PodName, "volume", key.VolumeName, "sourceIndex", key.SourceIndex)
m.projectionQueue.AddRateLimited(key)
return true
}
@@ -311,7 +312,7 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
// Remember, returning nil from this function indicates that the work item
// was successfully processed, and should be dropped from the queue.
pod, ok := m.podManager.GetPodByUID(types.UID(key.podUID))
pod, ok := m.podManager.GetPodByUID(types.UID(key.PodUID))
if !ok {
// If we can't find the pod anymore, it's been deleted. Clear all our
// internal state associated with the pod and return a nil error so it
@@ -320,7 +321,7 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
m.lock.Lock()
defer m.lock.Unlock()
for k := range m.credStore {
if k.namespace == key.namespace && k.podName == key.podName && k.podUID == key.podUID {
if k.Namespace == key.Namespace && k.PodName == key.PodName && k.PodUID == key.PodUID {
delete(m.credStore, k)
}
}
@@ -330,9 +331,9 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
var source *corev1.PodCertificateProjection
for _, vol := range pod.Spec.Volumes {
if vol.Name == key.volumeName && vol.Projected != nil {
if vol.Name == key.VolumeName && vol.Projected != nil {
for i, volumeSource := range vol.Projected.Sources {
if i == key.sourceIndex && volumeSource.PodCertificate != nil {
if i == key.SourceIndex && volumeSource.PodCertificate != nil {
source = volumeSource.PodCertificate
}
}
@@ -409,7 +410,7 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
// We are working through the initial issuance. We created a PCR, now
// we need to wait for it to reach a terminal state.
pcr, err := m.pcrLister.PodCertificateRequests(key.namespace).Get(state.pcrName)
pcr, err := m.pcrLister.PodCertificateRequests(key.Namespace).Get(state.pcrName)
if k8serrors.IsNotFound(err) && m.clock.Now().After(state.pcrAbandonAt) {
// "Not Found" could be due to informer lag, or because someone
// deleted the PodCertificateRequest. In the first case, the
@@ -423,7 +424,7 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
rec.curState = &credStateInitial{}
return fmt.Errorf("PodCertificateRequest %q appears to have been deleted", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
} else if err != nil {
return fmt.Errorf("while getting PodCertificateRequest %q: %w", key.namespace+"/"+state.pcrName, err)
return fmt.Errorf("while getting PodCertificateRequest %q: %w", key.Namespace+"/"+state.pcrName, err)
}
// If the PodCertificateRequest has moved to a terminal state, update
@@ -525,7 +526,7 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
case *credStateWaitRefresh:
// Check the refresh PodCertificateRequest
pcr, err := m.pcrLister.PodCertificateRequests(key.namespace).Get(state.refreshPCRName)
pcr, err := m.pcrLister.PodCertificateRequests(key.Namespace).Get(state.refreshPCRName)
if k8serrors.IsNotFound(err) && m.clock.Now().After(state.refreshPCRAbandonAt) {
// "Not Found" could be due to informer lag, or because someone
// deleted the PodCertificateRequest. In the first case, the
@@ -543,7 +544,7 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
}
return fmt.Errorf("PodCertificateRequest appears to have been deleted")
} else if err != nil {
return fmt.Errorf("while getting PodCertificateRequest %q: %w", key.namespace+"/"+state.refreshPCRName, err)
return fmt.Errorf("while getting PodCertificateRequest %q: %w", key.Namespace+"/"+state.refreshPCRName, err)
}
// If the PodCertificateRequest has moved to a terminal state, update
@@ -675,24 +676,23 @@ func (m *IssuingManager) createPodCertificateRequest(
}
func (m *IssuingManager) GetPodCertificateCredentialBundle(ctx context.Context, namespace, podName, podUID, volumeName string, sourceIndex int) ([]byte, []byte, error) {
credKey := projectionKey{
Namespace: namespace,
PodName: podName,
PodUID: podUID,
VolumeName: volumeName,
SourceIndex: sourceIndex,
}
var rec *projectionRecord
func() {
m.lock.Lock()
defer m.lock.Unlock()
credKey := projectionKey{
namespace: namespace,
podName: podName,
podUID: podUID,
volumeName: volumeName,
sourceIndex: sourceIndex,
}
rec = m.credStore[credKey]
}()
if rec == nil {
return nil, nil, fmt.Errorf("no credentials yet")
return nil, nil, fmt.Errorf("no credentials yet for key=%v", credKey)
}
rec.lock.Lock()

View File

@@ -266,7 +266,7 @@ func (kvh *kubeletVolumeHost) GetTrustAnchorsBySigner(signerName string, labelSe
}
func (kvh *kubeletVolumeHost) GetPodCertificateCredentialBundle(ctx context.Context, namespace, podName, podUID, volumeName string, sourceIndex int) ([]byte, []byte, error) {
return kvh.podCertificateManager.GetPodCertificateCredentialBundle(ctx, namespace, podName, volumeName, podUID, sourceIndex)
return kvh.podCertificateManager.GetPodCertificateCredentialBundle(ctx, namespace, podName, podUID, volumeName, sourceIndex)
}
func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {

View File

@@ -0,0 +1,75 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
)
type recordingPodCertificateManager struct {
Namespace string
PodName string
PodUID string
VolumeName string
SourceIndex int
}
func (f *recordingPodCertificateManager) GetPodCertificateCredentialBundle(ctx context.Context, namespace, podName, podUID, volumeName string, sourceIndex int) ([]byte, []byte, error) {
f.Namespace = namespace
f.PodName = podName
f.PodUID = podUID
f.VolumeName = volumeName
f.SourceIndex = sourceIndex
return nil, nil, nil
}
func (f *recordingPodCertificateManager) TrackPod(ctx context.Context, pod *corev1.Pod) {}
func (f *recordingPodCertificateManager) ForgetPod(ctx context.Context, pod *corev1.Pod) {}
// Check that GetPodCertificateCredentialBundle forwards its arguments in the
// correct order. Seems excessive, but we got here because I put the arguments
// in the wrong order...
func TestGetPodCertificateCredentialBundle(t *testing.T) {
recorder := &recordingPodCertificateManager{}
kvh := &kubeletVolumeHost{
podCertificateManager: recorder,
}
_, _, err := kvh.GetPodCertificateCredentialBundle(context.Background(), "namespace", "pod-name", "pod-uid", "volume-name", 10)
if err != nil {
t.Fatalf("Unexpected error calling GetPodCertificateCredentialBundle: %v", err)
}
want := &recordingPodCertificateManager{
Namespace: "namespace",
PodName: "pod-name",
PodUID: "pod-uid",
VolumeName: "volume-name",
SourceIndex: 10,
}
if diff := cmp.Diff(recorder, want); diff != "" {
t.Errorf("Wrong input to GetPodCertificateCredentialBundle; diff (-got +want)\n%s", diff)
}
}