mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 18:28:13 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			171 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			171 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2019 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package pluginmanager
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"reflect"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/stretchr/testify/require"
 | |
| 	"k8s.io/apimachinery/pkg/util/sets"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
 | |
| 
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/config"
 | |
| 	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	socketDir         string
 | |
| 	supportedVersions = []string{"v1beta1", "v1beta2"}
 | |
| )
 | |
| 
 | |
| type fakePluginHandler struct {
 | |
| 	events []string
 | |
| 	sync.RWMutex
 | |
| }
 | |
| 
 | |
| func newFakePluginHandler() *fakePluginHandler {
 | |
| 	return &fakePluginHandler{}
 | |
| }
 | |
| 
 | |
| // ValidatePlugin is a fake method
 | |
| func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
 | |
| 	f.Lock()
 | |
| 	defer f.Unlock()
 | |
| 	f.events = append(f.events, "validate "+pluginName)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // RegisterPlugin is a fake method
 | |
| func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
 | |
| 	f.Lock()
 | |
| 	defer f.Unlock()
 | |
| 	f.events = append(f.events, "register "+pluginName)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // DeRegisterPlugin is a fake method
 | |
| func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
 | |
| 	f.Lock()
 | |
| 	defer f.Unlock()
 | |
| 	f.events = append(f.events, "deregister "+pluginName)
 | |
| }
 | |
| 
 | |
| func (f *fakePluginHandler) Reset() {
 | |
| 	f.Lock()
 | |
| 	defer f.Unlock()
 | |
| 	f.events = nil
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	d, err := os.MkdirTemp("", "plugin_manager_test")
 | |
| 	if err != nil {
 | |
| 		panic(fmt.Sprintf("Could not create a temp directory: %s", d))
 | |
| 	}
 | |
| 
 | |
| 	socketDir = d
 | |
| }
 | |
| 
 | |
| func cleanup(t *testing.T) {
 | |
| 	require.NoError(t, os.RemoveAll(socketDir))
 | |
| 	os.MkdirAll(socketDir, 0755)
 | |
| }
 | |
| 
 | |
| func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
 | |
| 	expected := []string{"validate " + pluginName, "register " + pluginName}
 | |
| 	err := retryWithExponentialBackOff(
 | |
| 		100*time.Millisecond,
 | |
| 		func() (bool, error) {
 | |
| 			fakePluginHandler.Lock()
 | |
| 			defer fakePluginHandler.Unlock()
 | |
| 			if reflect.DeepEqual(fakePluginHandler.events, expected) {
 | |
| 				return true, nil
 | |
| 			}
 | |
| 			t.Logf("expected %#v, got %#v, will retry", expected, fakePluginHandler.events)
 | |
| 			return false, nil
 | |
| 		},
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
 | |
| 	backoff := wait.Backoff{
 | |
| 		Duration: initialDuration,
 | |
| 		Factor:   3,
 | |
| 		Jitter:   0,
 | |
| 		Steps:    6,
 | |
| 	}
 | |
| 	return wait.ExponentialBackoff(backoff, fn)
 | |
| }
 | |
| 
 | |
| func TestPluginRegistration(t *testing.T) {
 | |
| 	defer cleanup(t)
 | |
| 
 | |
| 	pluginManager := newTestPluginManager(socketDir)
 | |
| 
 | |
| 	// Start the plugin manager
 | |
| 	stopChan := make(chan struct{})
 | |
| 	defer close(stopChan)
 | |
| 	go func() {
 | |
| 		sourcesReady := config.NewSourcesReady(func(_ sets.Set[string]) bool { return true })
 | |
| 		pluginManager.Run(sourcesReady, stopChan)
 | |
| 	}()
 | |
| 
 | |
| 	// Add handler for device plugin
 | |
| 	fakeHandler := newFakePluginHandler()
 | |
| 	pluginManager.AddHandler(registerapi.DevicePlugin, fakeHandler)
 | |
| 
 | |
| 	const maxDepth = 3
 | |
| 	// Make sure the plugin manager is aware of the socket in subdirectories
 | |
| 	for i := 0; i < maxDepth; i++ {
 | |
| 		fakeHandler.Reset()
 | |
| 		pluginDir := socketDir
 | |
| 
 | |
| 		for j := 0; j < i; j++ {
 | |
| 			pluginDir = filepath.Join(pluginDir, strconv.Itoa(j))
 | |
| 		}
 | |
| 		require.NoError(t, os.MkdirAll(pluginDir, os.ModePerm))
 | |
| 		socketPath := filepath.Join(pluginDir, fmt.Sprintf("plugin-%d.sock", i))
 | |
| 
 | |
| 		// Add a new plugin
 | |
| 		pluginName := fmt.Sprintf("example-plugin-%d", i)
 | |
| 		p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
 | |
| 		require.NoError(t, p.Serve("v1beta1", "v1beta2"))
 | |
| 
 | |
| 		// Verify that the plugin is registered
 | |
| 		waitForRegistration(t, fakeHandler, pluginName)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newTestPluginManager(sockDir string) PluginManager {
 | |
| 	pm := NewPluginManager(
 | |
| 		sockDir,
 | |
| 		&record.FakeRecorder{},
 | |
| 	)
 | |
| 	return pm
 | |
| }
 | 
