mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #121310 from nilekhc/polling
[KMSv2] feat: updates encryption config file watch logic to polling
This commit is contained in:
		@@ -504,6 +504,24 @@ func (h *kmsv2PluginProbe) isKMSv2ProviderHealthyAndMaybeRotateDEK(ctx context.C
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// loadConfig parses the encryption configuration file at filepath and returns the parsed config and hash of the file.
 | 
					// loadConfig parses the encryption configuration file at filepath and returns the parsed config and hash of the file.
 | 
				
			||||||
func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, string, error) {
 | 
					func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, string, error) {
 | 
				
			||||||
 | 
						data, contentHash, err := loadDataAndHash(filepath)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, "", fmt.Errorf("error while loading file: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
 | 
				
			||||||
 | 
						if !ok {
 | 
				
			||||||
 | 
							return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return config, contentHash, validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func loadDataAndHash(filepath string) ([]byte, string, error) {
 | 
				
			||||||
	f, err := os.Open(filepath)
 | 
						f, err := os.Open(filepath)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, "", fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
 | 
							return nil, "", fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
 | 
				
			||||||
@@ -518,16 +536,14 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig
 | 
				
			|||||||
		return nil, "", fmt.Errorf("encryption provider configuration file %q is empty", filepath)
 | 
							return nil, "", fmt.Errorf("encryption provider configuration file %q is empty", filepath)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
 | 
						return data, computeEncryptionConfigHash(data), nil
 | 
				
			||||||
	if err != nil {
 | 
					}
 | 
				
			||||||
		return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
 | 
					 | 
				
			||||||
	if !ok {
 | 
					 | 
				
			||||||
		return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return config, computeEncryptionConfigHash(data), validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
 | 
					// GetEncryptionConfigHash reads the encryption configuration file at filepath and returns the hash of the file.
 | 
				
			||||||
 | 
					// It does not attempt to decode or load the config, and serves as a cheap check to determine if the file has changed.
 | 
				
			||||||
 | 
					func GetEncryptionConfigHash(filepath string) (string, error) {
 | 
				
			||||||
 | 
						_, contentHash, err := loadDataAndHash(filepath)
 | 
				
			||||||
 | 
						return contentHash, err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config.
 | 
					// prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2177,3 +2177,48 @@ func logLines(logs string) []string {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return lines
 | 
						return lines
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestGetEncryptionConfigHash(t *testing.T) {
 | 
				
			||||||
 | 
						t.Parallel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						tests := []struct {
 | 
				
			||||||
 | 
							name     string
 | 
				
			||||||
 | 
							filepath string
 | 
				
			||||||
 | 
							wantHash string
 | 
				
			||||||
 | 
							wantErr  string
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:     "empty config file content",
 | 
				
			||||||
 | 
								filepath: "testdata/invalid-configs/kms/invalid-content.yaml",
 | 
				
			||||||
 | 
								wantHash: "",
 | 
				
			||||||
 | 
								wantErr:  `encryption provider configuration file "testdata/invalid-configs/kms/invalid-content.yaml" is empty`,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:     "missing file",
 | 
				
			||||||
 | 
								filepath: "testdata/invalid-configs/kms/file-that-does-not-exist.yaml",
 | 
				
			||||||
 | 
								wantHash: "",
 | 
				
			||||||
 | 
								wantErr:  `error opening encryption provider configuration file "testdata/invalid-configs/kms/file-that-does-not-exist.yaml": open testdata/invalid-configs/kms/file-that-does-not-exist.yaml: no such file or directory`,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:     "valid file",
 | 
				
			||||||
 | 
								filepath: "testdata/valid-configs/secret-box-first.yaml",
 | 
				
			||||||
 | 
								wantHash: "c638c0327dbc3276dd1fcf3e67895d19ebca16b91ae0d19af24ef0759b8e0f66",
 | 
				
			||||||
 | 
								wantErr:  ``,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, tt := range tests {
 | 
				
			||||||
 | 
							tt := tt
 | 
				
			||||||
 | 
							t.Run(tt.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								t.Parallel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								got, err := GetEncryptionConfigHash(tt.filepath)
 | 
				
			||||||
 | 
								if errString(err) != tt.wantErr {
 | 
				
			||||||
 | 
									t.Errorf("GetEncryptionConfigHash() error = %v, wantErr %v", err, tt.wantErr)
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if got != tt.wantHash {
 | 
				
			||||||
 | 
									t.Errorf("GetEncryptionConfigHash() got = %v, want %v", got, tt.wantHash)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,9 +20,9 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/fsnotify/fsnotify"
 | 
					 | 
				
			||||||
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
						utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/server/healthz"
 | 
						"k8s.io/apiserver/pkg/server/healthz"
 | 
				
			||||||
@@ -35,6 +35,9 @@ import (
 | 
				
			|||||||
// workqueueKey is the dummy key used to process change in encryption config file.
 | 
					// workqueueKey is the dummy key used to process change in encryption config file.
 | 
				
			||||||
const workqueueKey = "key"
 | 
					const workqueueKey = "key"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// EncryptionConfigFileChangePollDuration is exposed so that integration tests can crank up the reload speed.
 | 
				
			||||||
 | 
					var EncryptionConfigFileChangePollDuration = time.Minute
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DynamicKMSEncryptionConfigContent which can dynamically handle changes in encryption config file.
 | 
					// DynamicKMSEncryptionConfigContent which can dynamically handle changes in encryption config file.
 | 
				
			||||||
type DynamicKMSEncryptionConfigContent struct {
 | 
					type DynamicKMSEncryptionConfigContent struct {
 | 
				
			||||||
	name string
 | 
						name string
 | 
				
			||||||
@@ -53,6 +56,10 @@ type DynamicKMSEncryptionConfigContent struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// identity of the api server
 | 
						// identity of the api server
 | 
				
			||||||
	apiServerID string
 | 
						apiServerID string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// can be swapped during testing
 | 
				
			||||||
 | 
						getEncryptionConfigHash func(ctx context.Context, filepath string) (string, error)
 | 
				
			||||||
 | 
						loadEncryptionConfig    func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func init() {
 | 
					func init() {
 | 
				
			||||||
@@ -73,77 +80,57 @@ func NewDynamicEncryptionConfiguration(
 | 
				
			|||||||
		dynamicTransformers:            dynamicTransformers,
 | 
							dynamicTransformers:            dynamicTransformers,
 | 
				
			||||||
		queue:                          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
 | 
							queue:                          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
 | 
				
			||||||
		apiServerID:                    apiServerID,
 | 
							apiServerID:                    apiServerID,
 | 
				
			||||||
 | 
							getEncryptionConfigHash: func(_ context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
								return encryptionconfig.GetEncryptionConfigHash(filepath)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							loadEncryptionConfig: encryptionconfig.LoadEncryptionConfig,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	encryptionConfig.queue.Add(workqueueKey) // to avoid missing any file changes that occur in between the initial load and Run
 | 
						encryptionConfig.queue.Add(workqueueKey) // to avoid missing any file changes that occur in between the initial load and Run
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return encryptionConfig
 | 
						return encryptionConfig
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Run starts the controller and blocks until stopCh is closed.
 | 
					// Run starts the controller and blocks until ctx is canceled.
 | 
				
			||||||
func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) {
 | 
					func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) {
 | 
				
			||||||
	defer utilruntime.HandleCrash()
 | 
						defer utilruntime.HandleCrash()
 | 
				
			||||||
	defer d.queue.ShutDown()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	klog.InfoS("Starting controller", "name", d.name)
 | 
						klog.InfoS("Starting controller", "name", d.name)
 | 
				
			||||||
	defer klog.InfoS("Shutting down controller", "name", d.name)
 | 
						defer klog.InfoS("Shutting down controller", "name", d.name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// start worker for processing content
 | 
						var wg sync.WaitGroup
 | 
				
			||||||
	go wait.UntilWithContext(ctx, d.runWorker, time.Second)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// start the loop that watches the encryption config file until stopCh is closed.
 | 
					 | 
				
			||||||
	go wait.UntilWithContext(ctx, func(ctx context.Context) {
 | 
					 | 
				
			||||||
		if err := d.watchEncryptionConfigFile(ctx); err != nil {
 | 
					 | 
				
			||||||
			// if there is an error while setting up or handling the watches, this will ensure that we will process the config file.
 | 
					 | 
				
			||||||
			defer d.queue.Add(workqueueKey)
 | 
					 | 
				
			||||||
			klog.ErrorS(err, "Failed to watch encryption config file, will retry later")
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}, time.Second)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						wg.Add(1)
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							defer utilruntime.HandleCrash()
 | 
				
			||||||
 | 
							defer wg.Done()
 | 
				
			||||||
 | 
							defer d.queue.ShutDown()
 | 
				
			||||||
		<-ctx.Done()
 | 
							<-ctx.Done()
 | 
				
			||||||
}
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(ctx context.Context) error {
 | 
						wg.Add(1)
 | 
				
			||||||
	watcher, err := fsnotify.NewWatcher()
 | 
						go func() {
 | 
				
			||||||
	if err != nil {
 | 
							defer utilruntime.HandleCrash()
 | 
				
			||||||
		return fmt.Errorf("error creating fsnotify watcher: %w", err)
 | 
							defer wg.Done()
 | 
				
			||||||
	}
 | 
							d.runWorker(ctx)
 | 
				
			||||||
	defer watcher.Close()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err = watcher.Add(d.filePath); err != nil {
 | 
						// this function polls changes in the encryption config file by placing a dummy key in the queue.
 | 
				
			||||||
		return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
 | 
						// the 'runWorker' function then picks up this dummy key and processes the changes.
 | 
				
			||||||
	}
 | 
						// the goroutine terminates when 'ctx' is canceled.
 | 
				
			||||||
 | 
						_ = wait.PollUntilContextCancel(
 | 
				
			||||||
 | 
							ctx,
 | 
				
			||||||
 | 
							EncryptionConfigFileChangePollDuration,
 | 
				
			||||||
 | 
							true,
 | 
				
			||||||
 | 
							func(ctx context.Context) (bool, error) {
 | 
				
			||||||
 | 
								// add dummy item to the queue to trigger file content processing.
 | 
				
			||||||
 | 
								d.queue.Add(workqueueKey)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for {
 | 
								// return false to continue polling.
 | 
				
			||||||
		select {
 | 
								return false, nil
 | 
				
			||||||
		case event := <-watcher.Events:
 | 
							},
 | 
				
			||||||
			if err := d.handleWatchEvent(event, watcher); err != nil {
 | 
						)
 | 
				
			||||||
				return err
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		case err := <-watcher.Errors:
 | 
					 | 
				
			||||||
			return fmt.Errorf("received fsnotify error: %w", err)
 | 
					 | 
				
			||||||
		case <-ctx.Done():
 | 
					 | 
				
			||||||
			return nil
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DynamicKMSEncryptionConfigContent) handleWatchEvent(event fsnotify.Event, watcher *fsnotify.Watcher) error {
 | 
						wg.Wait()
 | 
				
			||||||
	// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
 | 
					 | 
				
			||||||
	defer d.queue.Add(workqueueKey)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// return if file has not been removed or renamed.
 | 
					 | 
				
			||||||
	if event.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err := watcher.Remove(d.filePath); err != nil {
 | 
					 | 
				
			||||||
		klog.V(2).InfoS("Failed to remove file watch, it may have been deleted", "file", d.filePath, "err", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if err := watcher.Add(d.filePath); err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// runWorker to process file content
 | 
					// runWorker to process file content
 | 
				
			||||||
@@ -161,6 +148,12 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	defer d.queue.Done(key)
 | 
						defer d.queue.Done(key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						d.processWorkItem(serverCtx, key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return true
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *DynamicKMSEncryptionConfigContent) processWorkItem(serverCtx context.Context, workqueueKey interface{}) {
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		updatedEffectiveConfig  bool
 | 
							updatedEffectiveConfig  bool
 | 
				
			||||||
		err                     error
 | 
							err                     error
 | 
				
			||||||
@@ -188,25 +181,25 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
 | 
				
			|||||||
			metrics.RecordEncryptionConfigAutomaticReloadFailure(d.apiServerID)
 | 
								metrics.RecordEncryptionConfigAutomaticReloadFailure(d.apiServerID)
 | 
				
			||||||
			utilruntime.HandleError(fmt.Errorf("error processing encryption config file %s: %v", d.filePath, err))
 | 
								utilruntime.HandleError(fmt.Errorf("error processing encryption config file %s: %v", d.filePath, err))
 | 
				
			||||||
			// add dummy item back to the queue to trigger file content processing.
 | 
								// add dummy item back to the queue to trigger file content processing.
 | 
				
			||||||
			d.queue.AddRateLimited(key)
 | 
								d.queue.AddRateLimited(workqueueKey)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	encryptionConfiguration, configChanged, err = d.processEncryptionConfig(ctx)
 | 
						encryptionConfiguration, configChanged, err = d.processEncryptionConfig(ctx)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return true
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if !configChanged {
 | 
						if !configChanged {
 | 
				
			||||||
		return true
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(encryptionConfiguration.HealthChecks) != 1 {
 | 
						if len(encryptionConfiguration.HealthChecks) != 1 {
 | 
				
			||||||
		err = fmt.Errorf("unexpected number of healthz checks: %d. Should have only one", len(encryptionConfiguration.HealthChecks))
 | 
							err = fmt.Errorf("unexpected number of healthz checks: %d. Should have only one", len(encryptionConfiguration.HealthChecks))
 | 
				
			||||||
		return true
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// get healthz checks for all new KMS plugins.
 | 
						// get healthz checks for all new KMS plugins.
 | 
				
			||||||
	if err = d.validateNewTransformersHealth(ctx, encryptionConfiguration.HealthChecks[0], encryptionConfiguration.KMSCloseGracePeriod); err != nil {
 | 
						if err = d.validateNewTransformersHealth(ctx, encryptionConfiguration.HealthChecks[0], encryptionConfiguration.KMSCloseGracePeriod); err != nil {
 | 
				
			||||||
		return true
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// update transformers.
 | 
						// update transformers.
 | 
				
			||||||
@@ -223,26 +216,37 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
 | 
				
			|||||||
	klog.V(2).InfoS("Loaded new kms encryption config content", "name", d.name)
 | 
						klog.V(2).InfoS("Loaded new kms encryption config content", "name", d.name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	updatedEffectiveConfig = true
 | 
						updatedEffectiveConfig = true
 | 
				
			||||||
	return true
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// loadEncryptionConfig processes the next set of content from the file.
 | 
					// loadEncryptionConfig processes the next set of content from the file.
 | 
				
			||||||
func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.Context) (
 | 
					func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.Context) (
 | 
				
			||||||
	encryptionConfiguration *encryptionconfig.EncryptionConfiguration,
 | 
						_ *encryptionconfig.EncryptionConfiguration,
 | 
				
			||||||
	configChanged bool,
 | 
						configChanged bool,
 | 
				
			||||||
	err error,
 | 
						_ error,
 | 
				
			||||||
) {
 | 
					) {
 | 
				
			||||||
	// this code path will only execute if reload=true. So passing true explicitly.
 | 
						contentHash, err := d.getEncryptionConfigHash(ctx, d.filePath)
 | 
				
			||||||
	encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(ctx, d.filePath, true, d.apiServerID)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, false, err
 | 
							return nil, false, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// check if encryptionConfig is different from the current. Do nothing if they are the same.
 | 
						// check if encryptionConfig is different from the current. Do nothing if they are the same.
 | 
				
			||||||
	if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash {
 | 
						if contentHash == d.lastLoadedEncryptionConfigHash {
 | 
				
			||||||
		klog.V(4).InfoS("Encryption config has not changed", "name", d.name)
 | 
							klog.V(4).InfoS("Encryption config has not changed (before load)", "name", d.name)
 | 
				
			||||||
		return nil, false, nil
 | 
							return nil, false, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// this code path will only execute if reload=true. So passing true explicitly.
 | 
				
			||||||
 | 
						encryptionConfiguration, err := d.loadEncryptionConfig(ctx, d.filePath, true, d.apiServerID)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// check if encryptionConfig is different from the current (again to avoid TOCTOU). Do nothing if they are the same.
 | 
				
			||||||
 | 
						if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash {
 | 
				
			||||||
 | 
							klog.V(4).InfoS("Encryption config has not changed (after load)", "name", d.name)
 | 
				
			||||||
 | 
							return nil, false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return encryptionConfiguration, true, nil
 | 
						return encryptionConfiguration, true, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,156 +18,374 @@ package controller
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"io"
 | 
						"fmt"
 | 
				
			||||||
	"os"
 | 
						"net/http"
 | 
				
			||||||
	"path/filepath"
 | 
						"strings"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/features"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/server/healthz"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/server/options/encryptionconfig"
 | 
				
			||||||
 | 
						utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
				
			||||||
 | 
						"k8s.io/client-go/util/workqueue"
 | 
				
			||||||
 | 
						featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
				
			||||||
 | 
						"k8s.io/component-base/metrics/legacyregistry"
 | 
				
			||||||
 | 
						"k8s.io/component-base/metrics/testutil"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestProcessEncryptionConfig(t *testing.T) {
 | 
					func TestController(t *testing.T) {
 | 
				
			||||||
	testCases := []struct {
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						const expectedSuccessMetricValue = `
 | 
				
			||||||
 | 
					# HELP apiserver_encryption_config_controller_automatic_reload_success_total [ALPHA] Total number of successful automatic reloads of encryption configuration split by apiserver identity.
 | 
				
			||||||
 | 
					# TYPE apiserver_encryption_config_controller_automatic_reload_success_total counter
 | 
				
			||||||
 | 
					apiserver_encryption_config_controller_automatic_reload_success_total{apiserver_id_hash="sha256:cd8a60cec6134082e9f37e7a4146b4bc14a0bf8a863237c36ec8fdb658c3e027"} 1
 | 
				
			||||||
 | 
					`
 | 
				
			||||||
 | 
						const expectedFailureMetricValue = `
 | 
				
			||||||
 | 
					# HELP apiserver_encryption_config_controller_automatic_reload_failures_total [ALPHA] Total number of failed automatic reloads of encryption configuration split by apiserver identity.
 | 
				
			||||||
 | 
					# TYPE apiserver_encryption_config_controller_automatic_reload_failures_total counter
 | 
				
			||||||
 | 
					apiserver_encryption_config_controller_automatic_reload_failures_total{apiserver_id_hash="sha256:cd8a60cec6134082e9f37e7a4146b4bc14a0bf8a863237c36ec8fdb658c3e027"} 1
 | 
				
			||||||
 | 
					`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						tests := []struct {
 | 
				
			||||||
		name                        string
 | 
							name                        string
 | 
				
			||||||
		filePath    string
 | 
							wantECFileHash              string
 | 
				
			||||||
		expectError bool
 | 
							wantTransformerClosed       bool
 | 
				
			||||||
 | 
							wantLoadCalls               int
 | 
				
			||||||
 | 
							wantHashCalls               int
 | 
				
			||||||
 | 
							wantAddRateLimitedCount     uint64
 | 
				
			||||||
 | 
							wantMetrics                 string
 | 
				
			||||||
 | 
							mockLoadEncryptionConfig    func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error)
 | 
				
			||||||
 | 
							mockGetEncryptionConfigHash func(ctx context.Context, filepath string) (string, error)
 | 
				
			||||||
	}{
 | 
						}{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:        "empty config file",
 | 
								name:                    "when invalid config is provided previous config shouldn't be changed",
 | 
				
			||||||
			filePath:    "testdata/empty_config.yaml",
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
			expectError: true,
 | 
								wantLoadCalls:           1,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             expectedFailureMetricValue,
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 1,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "always changes and never errors", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return nil, fmt.Errorf("empty config file")
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when new valid config is provided it should be updated",
 | 
				
			||||||
 | 
								wantECFileHash:          "some new config hash",
 | 
				
			||||||
 | 
								wantLoadCalls:           1,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantMetrics:             expectedSuccessMetricValue,
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 0,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "always changes and never errors", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return &encryptionconfig.EncryptionConfiguration{
 | 
				
			||||||
 | 
										HealthChecks: []healthz.HealthChecker{
 | 
				
			||||||
 | 
											&mockHealthChecker{
 | 
				
			||||||
 | 
												pluginName: "valid-plugin",
 | 
				
			||||||
 | 
												err:        nil,
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										EncryptionFileContentHash: "some new config hash",
 | 
				
			||||||
 | 
									}, nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when same valid config is provided previous config shouldn't be changed",
 | 
				
			||||||
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
								wantLoadCalls:           1,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             "",
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 0,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "always changes and never errors", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return &encryptionconfig.EncryptionConfiguration{
 | 
				
			||||||
 | 
										HealthChecks: []healthz.HealthChecker{
 | 
				
			||||||
 | 
											&mockHealthChecker{
 | 
				
			||||||
 | 
												pluginName: "valid-plugin",
 | 
				
			||||||
 | 
												err:        nil,
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										// hash of initial "testdata/ec_config.yaml" config file before reloading
 | 
				
			||||||
 | 
										EncryptionFileContentHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
									}, nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when transformer's health check fails previous config shouldn't be changed",
 | 
				
			||||||
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
								wantLoadCalls:           1,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             expectedFailureMetricValue,
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 1,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "always changes and never errors", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return &encryptionconfig.EncryptionConfiguration{
 | 
				
			||||||
 | 
										HealthChecks: []healthz.HealthChecker{
 | 
				
			||||||
 | 
											&mockHealthChecker{
 | 
				
			||||||
 | 
												pluginName: "invalid-plugin",
 | 
				
			||||||
 | 
												err:        fmt.Errorf("mockingly failing"),
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										KMSCloseGracePeriod:       time.Second,
 | 
				
			||||||
 | 
										EncryptionFileContentHash: "anything different",
 | 
				
			||||||
 | 
									}, nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when multiple health checks are present previous config shouldn't be changed",
 | 
				
			||||||
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
								wantLoadCalls:           1,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             expectedFailureMetricValue,
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 1,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "always changes and never errors", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return &encryptionconfig.EncryptionConfiguration{
 | 
				
			||||||
 | 
										HealthChecks: []healthz.HealthChecker{
 | 
				
			||||||
 | 
											&mockHealthChecker{
 | 
				
			||||||
 | 
												pluginName: "valid-plugin",
 | 
				
			||||||
 | 
												err:        nil,
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
											&mockHealthChecker{
 | 
				
			||||||
 | 
												pluginName: "another-valid-plugin",
 | 
				
			||||||
 | 
												err:        nil,
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										EncryptionFileContentHash: "anything different",
 | 
				
			||||||
 | 
									}, nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when invalid health check URL is provided previous config shouldn't be changed",
 | 
				
			||||||
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
								wantLoadCalls:           1,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             expectedFailureMetricValue,
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 1,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "always changes and never errors", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return &encryptionconfig.EncryptionConfiguration{
 | 
				
			||||||
 | 
										HealthChecks: []healthz.HealthChecker{
 | 
				
			||||||
 | 
											&mockHealthChecker{
 | 
				
			||||||
 | 
												pluginName: "invalid\nname",
 | 
				
			||||||
 | 
												err:        nil,
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										EncryptionFileContentHash: "anything different",
 | 
				
			||||||
 | 
									}, nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when config is not updated transformers are closed correctly",
 | 
				
			||||||
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
								wantLoadCalls:           1,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             "",
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 0,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "always changes and never errors", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return &encryptionconfig.EncryptionConfiguration{
 | 
				
			||||||
 | 
										HealthChecks: []healthz.HealthChecker{
 | 
				
			||||||
 | 
											&mockHealthChecker{
 | 
				
			||||||
 | 
												pluginName: "valid-plugin",
 | 
				
			||||||
 | 
												err:        nil,
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										// hash of initial "testdata/ec_config.yaml" config file before reloading
 | 
				
			||||||
 | 
										EncryptionFileContentHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
									}, nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when config hash is not updated transformers are closed correctly",
 | 
				
			||||||
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
								wantLoadCalls:           0,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             "",
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 0,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									// hash of initial "testdata/ec_config.yaml" config file before reloading
 | 
				
			||||||
 | 
									return "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", nil
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return nil, fmt.Errorf("should not be called")
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:                    "when config hash errors transformers are closed correctly",
 | 
				
			||||||
 | 
								wantECFileHash:          "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
 | 
				
			||||||
 | 
								wantLoadCalls:           0,
 | 
				
			||||||
 | 
								wantHashCalls:           1,
 | 
				
			||||||
 | 
								wantTransformerClosed:   true,
 | 
				
			||||||
 | 
								wantMetrics:             expectedFailureMetricValue,
 | 
				
			||||||
 | 
								wantAddRateLimitedCount: 1,
 | 
				
			||||||
 | 
								mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									return "", fmt.Errorf("some io error")
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									return nil, fmt.Errorf("should not be called")
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, testCase := range testCases {
 | 
						for _, test := range tests {
 | 
				
			||||||
		t.Run(testCase.name, func(t *testing.T) {
 | 
							t.Run(test.name, func(t *testing.T) {
 | 
				
			||||||
			ctx := context.Background()
 | 
								serverCtx, closeServer := context.WithCancel(context.Background())
 | 
				
			||||||
			d := NewDynamicEncryptionConfiguration(
 | 
								t.Cleanup(closeServer)
 | 
				
			||||||
				testCase.name,
 | 
					 | 
				
			||||||
				testCase.filePath,
 | 
					 | 
				
			||||||
				nil,
 | 
					 | 
				
			||||||
				"",
 | 
					 | 
				
			||||||
				"",
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
			_, _, err := d.processEncryptionConfig(ctx)
 | 
								legacyregistry.Reset()
 | 
				
			||||||
			if testCase.expectError && err == nil {
 | 
					
 | 
				
			||||||
				t.Fatalf("expected error but got none")
 | 
								// load initial encryption config
 | 
				
			||||||
 | 
								encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(
 | 
				
			||||||
 | 
									serverCtx,
 | 
				
			||||||
 | 
									"testdata/ec_config.yaml",
 | 
				
			||||||
 | 
									true,
 | 
				
			||||||
 | 
									"test-apiserver",
 | 
				
			||||||
 | 
								)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("failed to load encryption config: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			if !testCase.expectError && err != nil {
 | 
					
 | 
				
			||||||
				t.Fatalf("expected no error but got %v", err)
 | 
								d := NewDynamicEncryptionConfiguration(
 | 
				
			||||||
 | 
									"test-controller",
 | 
				
			||||||
 | 
									"does not matter",
 | 
				
			||||||
 | 
									encryptionconfig.NewDynamicTransformers(
 | 
				
			||||||
 | 
										encryptionConfiguration.Transformers,
 | 
				
			||||||
 | 
										encryptionConfiguration.HealthChecks[0],
 | 
				
			||||||
 | 
										closeServer,
 | 
				
			||||||
 | 
										encryptionConfiguration.KMSCloseGracePeriod,
 | 
				
			||||||
 | 
									),
 | 
				
			||||||
 | 
									encryptionConfiguration.EncryptionFileContentHash,
 | 
				
			||||||
 | 
									"test-apiserver",
 | 
				
			||||||
 | 
								)
 | 
				
			||||||
 | 
								d.queue.ShutDown() // we do not use the real queue during tests
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								queue := &mockWorkQueue{
 | 
				
			||||||
 | 
									addCalled: make(chan struct{}),
 | 
				
			||||||
 | 
									cancel:    closeServer,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								d.queue = queue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								var hashCalls, loadCalls int
 | 
				
			||||||
 | 
								d.loadEncryptionConfig = func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
 | 
				
			||||||
 | 
									loadCalls++
 | 
				
			||||||
 | 
									queue.ctx = ctx
 | 
				
			||||||
 | 
									return test.mockLoadEncryptionConfig(ctx, filepath, reload, apiServerID)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								d.getEncryptionConfigHash = func(ctx context.Context, filepath string) (string, error) {
 | 
				
			||||||
 | 
									hashCalls++
 | 
				
			||||||
 | 
									queue.ctx = ctx
 | 
				
			||||||
 | 
									return test.mockGetEncryptionConfigHash(ctx, filepath)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								d.Run(serverCtx) // this should block and run exactly one iteration of the worker loop
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if test.wantECFileHash != d.lastLoadedEncryptionConfigHash {
 | 
				
			||||||
 | 
									t.Errorf("expected encryption config hash %q but got %q", test.wantECFileHash, d.lastLoadedEncryptionConfigHash)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if test.wantLoadCalls != loadCalls {
 | 
				
			||||||
 | 
									t.Errorf("load calls does not match: want=%v, got=%v", test.wantLoadCalls, loadCalls)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if test.wantHashCalls != hashCalls {
 | 
				
			||||||
 | 
									t.Errorf("hash calls does not match: want=%v, got=%v", test.wantHashCalls, hashCalls)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if test.wantTransformerClosed != queue.wasCanceled {
 | 
				
			||||||
 | 
									t.Errorf("transformer closed does not match: want=%v, got=%v", test.wantTransformerClosed, queue.wasCanceled)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if test.wantAddRateLimitedCount != queue.addRateLimitedCount.Load() {
 | 
				
			||||||
 | 
									t.Errorf("queue addRateLimitedCount does not match: want=%v, got=%v", test.wantAddRateLimitedCount, queue.addRateLimitedCount.Load())
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.wantMetrics),
 | 
				
			||||||
 | 
									"apiserver_encryption_config_controller_automatic_reload_success_total",
 | 
				
			||||||
 | 
									"apiserver_encryption_config_controller_automatic_reload_failures_total",
 | 
				
			||||||
 | 
								); err != nil {
 | 
				
			||||||
 | 
									t.Errorf("failed to validate metrics: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestWatchEncryptionConfigFile(t *testing.T) {
 | 
					type mockWorkQueue struct {
 | 
				
			||||||
	testCases := []struct {
 | 
						workqueue.RateLimitingInterface // will panic if any unexpected method is called
 | 
				
			||||||
		name          string
 | 
					 | 
				
			||||||
		generateEvent func(filePath string, cancel context.CancelFunc)
 | 
					 | 
				
			||||||
		expectError   bool
 | 
					 | 
				
			||||||
	}{
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			name:        "file not renamed or removed",
 | 
					 | 
				
			||||||
			expectError: false,
 | 
					 | 
				
			||||||
			generateEvent: func(filePath string, cancel context.CancelFunc) {
 | 
					 | 
				
			||||||
				os.Chtimes(filePath, time.Now(), time.Now())
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
				// wait for the event to be handled
 | 
						closeOnce sync.Once
 | 
				
			||||||
				time.Sleep(1 * time.Second)
 | 
						addCalled chan struct{}
 | 
				
			||||||
				cancel()
 | 
					 | 
				
			||||||
				os.Remove(filePath)
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			name:        "file renamed",
 | 
					 | 
				
			||||||
			expectError: true,
 | 
					 | 
				
			||||||
			generateEvent: func(filePath string, cancel context.CancelFunc) {
 | 
					 | 
				
			||||||
				os.Rename(filePath, filePath+"1")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
				// wait for the event to be handled
 | 
						count       atomic.Uint64
 | 
				
			||||||
				time.Sleep(1 * time.Second)
 | 
						ctx         context.Context
 | 
				
			||||||
				os.Remove(filePath + "1")
 | 
						wasCanceled bool
 | 
				
			||||||
			},
 | 
						cancel      func()
 | 
				
			||||||
		},
 | 
					
 | 
				
			||||||
		{
 | 
						addRateLimitedCount atomic.Uint64
 | 
				
			||||||
			name:        "file removed",
 | 
					}
 | 
				
			||||||
			expectError: true,
 | 
					
 | 
				
			||||||
			generateEvent: func(filePath string, cancel context.CancelFunc) {
 | 
					func (m *mockWorkQueue) Done(item interface{}) {
 | 
				
			||||||
				// allow watcher handle to start
 | 
						m.count.Add(1)
 | 
				
			||||||
				time.Sleep(1 * time.Second)
 | 
						m.wasCanceled = m.ctx.Err() != nil
 | 
				
			||||||
				os.Remove(filePath)
 | 
						m.cancel()
 | 
				
			||||||
			},
 | 
					}
 | 
				
			||||||
		},
 | 
					
 | 
				
			||||||
 | 
					func (m *mockWorkQueue) Get() (item interface{}, shutdown bool) {
 | 
				
			||||||
 | 
						<-m.addCalled
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						switch m.count.Load() {
 | 
				
			||||||
 | 
						case 0:
 | 
				
			||||||
 | 
							return nil, false
 | 
				
			||||||
 | 
						case 1:
 | 
				
			||||||
 | 
							return nil, true
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							panic("too many calls to Get")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, testCase := range testCases {
 | 
					func (m *mockWorkQueue) Add(item interface{}) {
 | 
				
			||||||
		t.Run(testCase.name, func(t *testing.T) {
 | 
						m.closeOnce.Do(func() {
 | 
				
			||||||
			ctx, cancel := context.WithCancel(context.Background())
 | 
							close(m.addCalled)
 | 
				
			||||||
			t.Cleanup(cancel)
 | 
					 | 
				
			||||||
			testFilePath := copyFileForTest(t, "testdata/ec_config.yaml")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			d := NewDynamicEncryptionConfiguration(
 | 
					 | 
				
			||||||
				testCase.name,
 | 
					 | 
				
			||||||
				testFilePath,
 | 
					 | 
				
			||||||
				nil,
 | 
					 | 
				
			||||||
				"",
 | 
					 | 
				
			||||||
				"",
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			errs := make(chan error, 1)
 | 
					 | 
				
			||||||
			go func() {
 | 
					 | 
				
			||||||
				err := d.watchEncryptionConfigFile(ctx)
 | 
					 | 
				
			||||||
				errs <- err
 | 
					 | 
				
			||||||
			}()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			testCase.generateEvent(d.filePath, cancel)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			err := <-errs
 | 
					 | 
				
			||||||
			if testCase.expectError && err == nil {
 | 
					 | 
				
			||||||
				t.Fatalf("expected error but got none")
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			if !testCase.expectError && err != nil {
 | 
					 | 
				
			||||||
				t.Fatalf("expected no error but got %v", err)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func copyFileForTest(t *testing.T, srcFilePath string) string {
 | 
					func (m *mockWorkQueue) ShutDown()                       {}
 | 
				
			||||||
	t.Helper()
 | 
					func (m *mockWorkQueue) AddRateLimited(item interface{}) { m.addRateLimitedCount.Add(1) }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// get directory from source file path
 | 
					type mockHealthChecker struct {
 | 
				
			||||||
	srcDir := filepath.Dir(srcFilePath)
 | 
						pluginName string
 | 
				
			||||||
 | 
						err        error
 | 
				
			||||||
	// get file name from source file path
 | 
					}
 | 
				
			||||||
	srcFileName := filepath.Base(srcFilePath)
 | 
					
 | 
				
			||||||
 | 
					func (m *mockHealthChecker) Check(req *http.Request) error {
 | 
				
			||||||
	// set new file path
 | 
						return m.err
 | 
				
			||||||
	dstFilePath := filepath.Join(srcDir, "test_"+srcFileName)
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// copy src file to dst file
 | 
					func (m *mockHealthChecker) Name() string {
 | 
				
			||||||
	r, err := os.Open(srcFilePath)
 | 
						return m.pluginName
 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("failed to open source file: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer r.Close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	w, err := os.Create(dstFilePath)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("failed to create destination file: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer w.Close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// copy the file
 | 
					 | 
				
			||||||
	_, err = io.Copy(w, r)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("failed to copy file: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	err = w.Close()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("failed to close destination file: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return dstFilePath
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -47,6 +47,7 @@ import (
 | 
				
			|||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/features"
 | 
						"k8s.io/apiserver/pkg/features"
 | 
				
			||||||
	genericapiserver "k8s.io/apiserver/pkg/server"
 | 
						genericapiserver "k8s.io/apiserver/pkg/server"
 | 
				
			||||||
 | 
						encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/storage/value"
 | 
						"k8s.io/apiserver/pkg/storage/value"
 | 
				
			||||||
	aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
 | 
						aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
 | 
				
			||||||
	mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v1beta1"
 | 
						mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v1beta1"
 | 
				
			||||||
@@ -308,6 +309,9 @@ resources:
 | 
				
			|||||||
func TestEncryptionConfigHotReload(t *testing.T) {
 | 
					func TestEncryptionConfigHotReload(t *testing.T) {
 | 
				
			||||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// this makes the test super responsive. It's set to a default of 1 minute.
 | 
				
			||||||
 | 
						encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	storageConfig := framework.SharedEtcd()
 | 
						storageConfig := framework.SharedEtcd()
 | 
				
			||||||
	encryptionConfig := `
 | 
						encryptionConfig := `
 | 
				
			||||||
kind: EncryptionConfiguration
 | 
					kind: EncryptionConfiguration
 | 
				
			||||||
@@ -407,7 +411,7 @@ resources:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod.
 | 
						// implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod.
 | 
				
			||||||
	// wait for config to be observed
 | 
						// wait for config to be observed
 | 
				
			||||||
	verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, test)
 | 
						verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, "", test)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// run storage migration
 | 
						// run storage migration
 | 
				
			||||||
	// get secrets
 | 
						// get secrets
 | 
				
			||||||
@@ -477,6 +481,10 @@ resources:
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// remove old KMS provider
 | 
						// remove old KMS provider
 | 
				
			||||||
 | 
						// verifyIfKMSTransformersSwapped sometimes passes even before the changes in the encryption config file are observed.
 | 
				
			||||||
 | 
						// this causes the metrics tests to fail, which validate two config changes.
 | 
				
			||||||
 | 
						// this may happen when an existing KMS provider is already running (e.g., new-kms-provider-for-secrets in this case).
 | 
				
			||||||
 | 
						// to ensure that the changes are observed, we added one more provider (kms-provider-to-encrypt-all) and are validating it in verifyIfKMSTransformersSwapped.
 | 
				
			||||||
	encryptionConfigWithoutOldProvider := `
 | 
						encryptionConfigWithoutOldProvider := `
 | 
				
			||||||
kind: EncryptionConfiguration
 | 
					kind: EncryptionConfiguration
 | 
				
			||||||
apiVersion: apiserver.config.k8s.io/v1
 | 
					apiVersion: apiserver.config.k8s.io/v1
 | 
				
			||||||
@@ -495,13 +503,25 @@ resources:
 | 
				
			|||||||
       name: new-kms-provider-for-configmaps
 | 
					       name: new-kms-provider-for-configmaps
 | 
				
			||||||
       cachesize: 1000
 | 
					       cachesize: 1000
 | 
				
			||||||
       endpoint: unix:///@new-kms-provider.sock
 | 
					       endpoint: unix:///@new-kms-provider.sock
 | 
				
			||||||
 | 
					  - resources:
 | 
				
			||||||
 | 
					    - '*.*'
 | 
				
			||||||
 | 
					    providers:
 | 
				
			||||||
 | 
					    - kms:
 | 
				
			||||||
 | 
					        name: kms-provider-to-encrypt-all
 | 
				
			||||||
 | 
					        cachesize: 1000
 | 
				
			||||||
 | 
					        endpoint: unix:///@new-encrypt-all-kms-provider.sock
 | 
				
			||||||
 | 
					    - identity: {}
 | 
				
			||||||
`
 | 
					`
 | 
				
			||||||
 | 
						// start new KMS Plugin
 | 
				
			||||||
 | 
						_ = mock.NewBase64Plugin(t, "@new-encrypt-all-kms-provider.sock")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// update encryption config and wait for hot reload
 | 
						// update encryption config and wait for hot reload
 | 
				
			||||||
	updateFile(t, test.configDir, encryptionConfigFileName, []byte(encryptionConfigWithoutOldProvider))
 | 
						updateFile(t, test.configDir, encryptionConfigFileName, []byte(encryptionConfigWithoutOldProvider))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						wantPrefixForEncryptAll := "k8s:enc:kms:v1:kms-provider-to-encrypt-all:"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// wait for config to be observed
 | 
						// wait for config to be observed
 | 
				
			||||||
	verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, test)
 | 
						verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, wantPrefixForEncryptAll, test)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// confirm that reading secrets still works
 | 
						// confirm that reading secrets still works
 | 
				
			||||||
	_, err = test.restClient.CoreV1().Secrets(testNamespace).Get(
 | 
						_, err = test.restClient.CoreV1().Secrets(testNamespace).Get(
 | 
				
			||||||
@@ -801,9 +821,12 @@ resources:
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEncryptionConfigHotReloadFileWatch(t *testing.T) {
 | 
					func TestEncryptionConfigHotReloadFilePolling(t *testing.T) {
 | 
				
			||||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// this makes the test super responsive. It's set to a default of 1 minute.
 | 
				
			||||||
 | 
						encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	testCases := []struct {
 | 
						testCases := []struct {
 | 
				
			||||||
		sleep      time.Duration
 | 
							sleep      time.Duration
 | 
				
			||||||
		name       string
 | 
							name       string
 | 
				
			||||||
@@ -948,7 +971,7 @@ resources:
 | 
				
			|||||||
func verifyPrefixOfSecretResource(t *testing.T, wantPrefix string, test *transformTest) {
 | 
					func verifyPrefixOfSecretResource(t *testing.T, wantPrefix string, test *transformTest) {
 | 
				
			||||||
	// implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod.
 | 
						// implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod.
 | 
				
			||||||
	// wait for config to be observed
 | 
						// wait for config to be observed
 | 
				
			||||||
	verifyIfKMSTransformersSwapped(t, wantPrefix, test)
 | 
						verifyIfKMSTransformersSwapped(t, wantPrefix, "", test)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// run storage migration
 | 
						// run storage migration
 | 
				
			||||||
	secretsList, err := test.restClient.CoreV1().Secrets("").List(
 | 
						secretsList, err := test.restClient.CoreV1().Secrets("").List(
 | 
				
			||||||
@@ -982,7 +1005,7 @@ func verifyPrefixOfSecretResource(t *testing.T, wantPrefix string, test *transfo
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix string, test *transformTest) {
 | 
					func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix, wantPrefixForEncryptAll string, test *transformTest) {
 | 
				
			||||||
	t.Helper()
 | 
						t.Helper()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var swapErr error
 | 
						var swapErr error
 | 
				
			||||||
@@ -1013,6 +1036,29 @@ func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix string, test *trans
 | 
				
			|||||||
			return false, nil
 | 
								return false, nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if wantPrefixForEncryptAll != "" {
 | 
				
			||||||
 | 
								deploymentName := fmt.Sprintf("deployment-%d", idx)
 | 
				
			||||||
 | 
								_, err := test.createDeployment(deploymentName, "default")
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("Failed to create test secret, error: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								rawEnvelope, err := test.readRawRecordFromETCD(test.getETCDPathForResource(test.storageConfig.Prefix, "", "deployments", deploymentName, "default"))
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("failed to read %s from etcd: %v", test.getETCDPathForResource(test.storageConfig.Prefix, "", "deployments", deploymentName, "default"), err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// check prefix
 | 
				
			||||||
 | 
								if !bytes.HasPrefix(rawEnvelope.Kvs[0].Value, []byte(wantPrefixForEncryptAll)) {
 | 
				
			||||||
 | 
									idx++
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									swapErr = fmt.Errorf("expected deployment to be prefixed with %s, but got %s", wantPrefixForEncryptAll, rawEnvelope.Kvs[0].Value)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// return nil error to continue polling till timeout
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return true, nil
 | 
							return true, nil
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	if pollErr == wait.ErrWaitTimeout {
 | 
						if pollErr == wait.ErrWaitTimeout {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user